# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* JESSE C. DANIEL. Data Science with Python and Dask.


In [1]:
from __future__ import annotations

import random
import time
from functools import partial
from pathlib import Path
from typing import Callable

import bs4
import dask
import dask.bag as db
import re
import json
import dask.dataframe as dd
from dask.delayed import Delayed

In [2]:
DATA_DIR = Path('data/')
SRC_DIR = Path('src/')
OUTPUT_DIR = Path('output/')
if not OUTPUT_DIR.exists():
    OUTPUT_DIR.mkdir()

## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [3]:
def cpu_task():
    return sum([random.random() for _ in range(100_000)])


In [4]:
def io_task():
    time.sleep(0.1)
    return random.random()


In [5]:
def task_runner(task: Callable, n: int = 100) -> None:
    for _ in range(n):
        task()


In [6]:
%%timeit

task_runner(cpu_task)

796 ms ± 36.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
%%timeit

task_runner(io_task)

10.1 s ± 5.78 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
delayed_cpu_task = dask.delayed(cpu_task)
delayed_io_task = dask.delayed(io_task)

In [9]:
def delayed_task_runner(delayed_task: Delayed, n: int = 100, scheduler=None) -> None:
    tasks = [delayed_task() for _ in range(n)]
    dask.compute(tasks, scheduler=scheduler)


In [10]:
%%timeit

delayed_task_runner(delayed_cpu_task, scheduler='threads')

818 ms ± 11.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
%%timeit

delayed_task_runner(delayed_cpu_task, scheduler='processes')

894 ms ± 94.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
%%timeit

delayed_task_runner(delayed_io_task, scheduler='threads')

1.33 s ± 3.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [13]:
%%timeit

delayed_task_runner(delayed_io_task, scheduler='processes')

2.12 s ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из каталога `reviewers_full` и по данным этого файла формирует список словарей, содержащих следующие ключи: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать.


In [14]:
def get_text(name: str, tag: bs4.element.Tag) -> str | None:
    _tag = tag.find(name)
    return _tag and _tag.get_text()


In [15]:
def parse_reviewers_xml(path: str) -> list[dict]:
    with open(path) as f:
        soup = bs4.BeautifulSoup(f)

    user_list = []
    for user in soup.find_all('user'):
        user_get_text = partial(get_text, tag=user)

        id_ = user_get_text('id')
        username = user_get_text('username')
        name = user_get_text('name')
        sex = user_get_text('sex')
        mail = user_get_text('mail')
        registered = user_get_text('registered')
        birthdate = user_get_text('birthdate')
        name_prefix = user.get('prefix', None)
        country_tag = user.find('country')
        if country_tag:
            country = country_tag.get_text()
            country_code = country_tag.get('code', None)
        else:
            country = country_code = None

        user_list.append({
            'id': id_,
            'username': username,
            'name': name,
            'sex': sex,
            'mail': mail,
            'registered': registered,
            'birthdate': birthdate,
            'name_prefix': name_prefix,
            'country': country,
            'country_code': country_code,
        })

    return user_list


In [16]:
parse_reviewers_xml(DATA_DIR.joinpath('reviewers_full/reviewers_full_0.xml'))[:5]

[{'id': '556011',
  'username': 'gabrielacalhoun',
  'name': None,
  'sex': 'F',
  'mail': None,
  'registered': None,
  'birthdate': '1988-01-25',
  'name_prefix': 'Mrs.',
  'country': None,
  'country_code': None},
 {'id': '1251087',
  'username': 'qbaxter',
  'name': None,
  'sex': None,
  'mail': 'qware@gmail.com',
  'registered': None,
  'birthdate': '1985-01-19',
  'name_prefix': None,
  'country': 'Norway',
  'country_code': 'NO'},
 {'id': '537188',
  'username': 'crosschristopher',
  'name': 'Dana Moore',
  'sex': None,
  'mail': 'stephaniestrong@yahoo.com',
  'registered': '2018-11-21',
  'birthdate': '1955-07-03',
  'name_prefix': None,
  'country': None,
  'country_code': None},
 {'id': '250427',
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'sex': None,
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'name_prefix': None,
  'country': 'Cuba',
  'country_code': 'CU'},
 {'id': '2945188',
  'username': 'gambledanielle',
  'na

2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

In [17]:
path_list = [DATA_DIR.joinpath(f'reviewers_full/reviewers_full_{i}.xml') for i in range(5)]

In [18]:
%%time

for path in path_list:
    parse_reviewers_xml(path)

Wall time: 1min 4s


In [19]:
parse_reviewers_xml_delayed = dask.delayed(parse_reviewers_xml, pure=True)

In [None]:
%%time

dask.compute([parse_reviewers_xml_delayed(path) for path in path_list], scheduler='threads')

In [None]:
%%time

dask.compute([parse_reviewers_xml_delayed(path) for path in path_list], scheduler='processes')

In [None]:
%%time

dask.compute([parse_reviewers_xml_delayed(path) for path in path_list], scheduler='processes')

In [None]:
from src.worker import parse_reviewers_xml as parse_reviewers_xml_
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

In [None]:
%%time

with ThreadPoolExecutor() as pool:
    pool.map(parse_reviewers_xml_, path_list)

In [None]:
%%time

with ProcessPoolExecutor() as pool:
    pool.map(parse_reviewers_xml_, path_list)

3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [None]:
users_bag = db.from_delayed([parse_reviewers_xml_delayed(path) for path in path_list])

In [None]:
processed_users_bag = (users_bag
                 .map(lambda x: {**x, 'birth_year': int(x['birthdate'][:4]) if x['birthdate'] else None})
                 .filter(lambda x: x['birth_year'] and x['birth_year'] > 1980)
                 .map(lambda x: {**x, 'id': int(x['id'])}))

In [None]:
processed_users_bag.take(5)

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [None]:
users_df: dask.dataframe.DataFrame = processed_users_bag.to_dataframe().set_index('id')

In [None]:
users_df.head()

5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [None]:
def is_negative(rating: int):
    return rating < 3


In [None]:
pattern = re.compile(r'^.*reviews_(\d+)\.json$')


def loads(element: tuple[str, str]) -> dict:
    data, path = element
    return {
        'user_id': json.loads(data)['user_id'],
        'rating': int(pattern.match(path).groups()[0])
    }


reviews_bag = db.read_text(
    DATA_DIR.joinpath('reviews_full/*.json'),
    include_path=True,
    blocksize=30_000_000,
).map(loads).filter(lambda x: is_negative(x['rating']))

reviews_bag.take(3)

In [None]:
reviews_df = reviews_bag.to_dataframe()

In [None]:
rating_count_df = reviews_df.groupby('user_id').count().rename(columns={'rating': 'n_neg_reviews'})

In [None]:
joined_df = users_df.join(rating_count_df)

In [None]:
result = joined_df.compute()

In [None]:
result