# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* JESSE C. DANIEL. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

In [184]:
from bs4 import BeautifulSoup
import time
import random
from dask import delayed, compute
import dask.bag as db
import dask.dataframe as dd

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [2]:
def cpu_task():
    return sum([random.randint(0,10) for _ in range(100_000)])


In [3]:
%%time
res =[cpu_task() for _ in range(100)]

Wall time: 6.85 s


In [5]:
def io_task():
    time.sleep(0.1)
    return random.randint(0, 10)

In [6]:
%%time
res =[io_task() for _ in range(100)]

Wall time: 10.9 s


In [8]:
cpu_task_delayed = delayed(cpu_task)

In [10]:
cpu_task_delayed().compute()

500276

In [20]:
%%time
res =[cpu_task_delayed() for _ in range(100)]
res = compute(res, scheduler='multiprocessing')

Wall time: 2.85 s


In [17]:
io_task_delayed = delayed(io_task)

In [19]:
%%time
res =[io_task_delayed() for _ in range(100)]
res = compute(res, scheduler='multiprocessing')

Wall time: 2.23 s


In [21]:
%%time
res =[io_task_delayed() for _ in range(100)]
res = compute(res, scheduler='threading')

Wall time: 1.42 s


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из архива `reviewers_full.zip` и по данным этого файла формирует список словарей, содержащих следующие ключи: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [106]:
def parse_xml(n):
    with open(f'reviewers_full/reviewers_full_{n}.xml', 'r', encoding='utf8') as fp:
        data = BeautifulSoup(fp, 'xml')
    revs_lst = []
    for user in data.find_all('user'):
        rev_dic = {}
        if (user.find('username')):
            rev_dic['username'] = user.username.text
        if (user.find('id')):
            rev_dic['id'] = user.id.text
        if (user.find('name')):
            rev_dic['name'] = user.find('name').text
        if (user.find('sex')):
            rev_dic['sex'] = user.sex.text
        if (user.find('country')):
            rev_dic['country'] = user.country.text
            if ('code' in user.country.attrs.keys()):
                rev_dic['country_code'] = user.country.attrs['code']
        if (user.find('mail')):
            rev_dic['mail'] = user.mail.text
        if (user.find('registered')):
            rev_dic['registered'] = user.registered.text
        if (user.find('birthdate')):
            rev_dic['birthdate'] = user.birthdate.text
        if ('name_prefix' in user.attrs.keys()):
            rev_dic['name_prefix'] = user.attrs.prefix
        revs_lst.append(rev_dic)
    return revs_lst

2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

In [74]:
%%time
res = [parse_xml(i) for i in range(5)]

Wall time: 1min 25s


In [77]:
res[4][-1]

{'username': 'anthony04',
 'sex': 'M',
 'registered': '2001-11-05',
 'birthdate': '1990-08-16'}

In [108]:
parse_xml_delayed = delayed(parse_xml)

In [82]:
%%time
res = [parse_xml_delayed(i) for i in range(5)]
res = compute(res, scheduler='multiprocessing')

Wall time: 1min 19s


In [88]:
res[0][4][-1]

{'username': 'anthony04',
 'sex': 'M',
 'registered': '2001-11-05',
 'birthdate': '1990-08-16'}

3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [109]:
reviewers = [parse_xml_delayed(i) for i in range(5)]
reviewers

[Delayed('parse_xml-090eb431-3c31-4d2a-adaa-4995689c3756'),
 Delayed('parse_xml-c649b426-07c3-41b1-b910-fee5e70d63d0'),
 Delayed('parse_xml-ddbf2762-ee1a-484f-b8f2-42ac17bdbbc5'),
 Delayed('parse_xml-fe26056b-b159-40d4-80db-68ab44253790'),
 Delayed('parse_xml-cd12b84b-9a1c-4100-871c-b28c4aecbb08')]

In [110]:
reviewers_bag = db.from_delayed(reviewers)

In [111]:
def preproc(reviewer):
    reviewer['id'] = int(reviewer['id'])
    if ('birthdate' in reviewer.keys()):
        reviewer['birth_year'] = int(reviewer['birthdate'][:4])
        if reviewer['birth_year'] < 1980:
            return
    return reviewer

In [168]:
proc_bag = reviewers_bag.map(preproc).filter(bool)

In [169]:
proc_bag_res = proc_bag.compute()

In [172]:
proc_bag_res[0]

{'username': 'gabrielacalhoun',
 'id': 556011,
 'sex': 'F',
 'birthdate': '1988-01-25',
 'birth_year': 1988}

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [176]:
reviewers_dd = proc_bag.to_dataframe(meta={'id':'int64','username': 'object', 'name': 'object', 'sex': 'object', 'country': 'object', 'mail': 'object', 'registered':'datetime64[ns]', 'birthdate':'datetime64[ns]', 'name_prefix': 'object', 'country_code': 'object', 'birth_year': 'float64'})

In [182]:
reviewers_dd = reviewers_dd.set_index('id')

In [183]:
reviewers_dd.head()

Unnamed: 0_level_0,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code,birth_year
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1535,gina35,Robert Graham,,Algeria,,NaT,NaT,,DZ,
1581,ehenry,Scott Burton,,,salazardiana@gmail.com,2007-01-07,NaT,,,
1634,pughdawn,Connor Ibarra,,Angola,,2019-07-03,NaT,,AO,
1676,lgeorge,,M,,,NaT,1983-06-24,,,1983.0
1755,amycarter,,M,,ybrandt@yahoo.com,NaT,NaT,,,


5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [185]:
neg_revs = dd.read_json(['./reviews_full/reviews_0.json','./reviews_full/reviews_1.json','./reviews_full/reviews_2.json'])  

In [190]:
neg_revs_gr = neg_revs.groupby('user_id').recipe_id.count()

In [217]:
neg_revs_dd = neg_revs_gr.to_frame(name='n_recipes')
neg_revs_dd

Unnamed: 0_level_0,n_recipes
npartitions=1,Unnamed: 1_level_1
,int64
,...


In [218]:
neg_revs_dd.compute()

Unnamed: 0_level_0,n_recipes
user_id,Unnamed: 1_level_1
1533,64
1535,441
1634,36
1676,29
1755,4
...,...
2002366476,1
2002368940,1
2002369279,1
2002369650,1


In [222]:
reviewers_dd_merged = reviewers_dd.merge(neg_revs_dd, 'left', left_index=True, right_index=True)

In [223]:
reviewers_dd_merged.head()

Unnamed: 0_level_0,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code,birth_year,n_recipes
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
1535,gina35,Robert Graham,,Algeria,,NaT,NaT,,DZ,,441.0
1581,ehenry,Scott Burton,,,salazardiana@gmail.com,2007-01-07,NaT,,,,1.0
1634,pughdawn,Connor Ibarra,,Angola,,2019-07-03,NaT,,AO,,36.0
1676,lgeorge,,M,,,NaT,1983-06-24,,,1983.0,29.0
1755,amycarter,,M,,ybrandt@yahoo.com,NaT,NaT,,,,4.0
