# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* JESSE C. DANIEL. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [3]:
import random
import time

def cpu_task():
    return sum([random.randint(0, 10) for _ in range(100_000)])

def io_task():
    time.sleep(0.1)
    return random.randint(0, 10)

In [2]:
%%time
res = [cpu_task() for _ in range(100)]

CPU times: user 9.95 s, sys: 177 ms, total: 10.1 s
Wall time: 10.9 s


In [4]:
%%time
res = [io_task() for _ in range(100)]

CPU times: user 5.54 ms, sys: 3.77 ms, total: 9.31 ms
Wall time: 10.3 s


In [40]:
import dask.delayed
import dask

In [7]:
cpu_task_delayed = dask.delayed(cpu_task)


In [8]:
%%time
res = [cpu_task_delayed() for _ in range(100)]

CPU times: user 5.73 ms, sys: 2.77 ms, total: 8.5 ms
Wall time: 7.54 ms


In [9]:
res[0]

Delayed('cpu_task-53d194b2-76bc-42e2-957f-7c7cf3f59960')

In [10]:
%%time
dask.compute(res)

CPU times: user 9.4 s, sys: 184 ms, total: 9.58 s
Wall time: 9.85 s


([498400,
  499665,
  501939,
  499901,
  500605,
  499276,
  499646,
  500730,
  499099,
  498819,
  498789,
  499527,
  499173,
  499471,
  499531,
  499473,
  500557,
  499408,
  498479,
  498925,
  499816,
  499546,
  502301,
  500317,
  500137,
  500768,
  498616,
  499532,
  501895,
  501611,
  499793,
  501672,
  499065,
  500079,
  499341,
  500204,
  499823,
  499438,
  502186,
  501588,
  499894,
  499891,
  498232,
  499913,
  499879,
  498753,
  500960,
  500002,
  500170,
  500380,
  499846,
  499294,
  500603,
  500096,
  499058,
  498961,
  499924,
  498658,
  499944,
  498471,
  499720,
  499130,
  499708,
  499410,
  500424,
  500445,
  499666,
  499352,
  500828,
  501393,
  500057,
  498958,
  499745,
  500092,
  498578,
  501052,
  500007,
  501928,
  500262,
  498976,
  500755,
  498426,
  501141,
  501516,
  501775,
  501104,
  500825,
  501578,
  499901,
  499309,
  499436,
  500073,
  500663,
  498764,
  498405,
  498618,
  500288,
  498926,
  500978,
  500418],

In [12]:
%%time
res1 = dask.compute(res, scheduler = 'multiprocessing')

CPU times: user 82.7 ms, sys: 37.8 ms, total: 121 ms
Wall time: 5.73 s


In [13]:
%%time
res1 = dask.compute(res, scheduler = 'threading')

CPU times: user 9.79 s, sys: 172 ms, total: 9.96 s
Wall time: 10.3 s


In [14]:
io_task_delayed = dask.delayed(io_task)

In [15]:
%%time
res = [io_task_delayed() for _ in range(100)]
res1 = dask.compute(res)

CPU times: user 38.6 ms, sys: 15.4 ms, total: 54 ms
Wall time: 2.63 s


In [16]:
%%time
res = [io_task_delayed() for _ in range(100)]
res1 = dask.compute(res, scheduler = 'multiprocessing')

CPU times: user 109 ms, sys: 46.4 ms, total: 156 ms
Wall time: 3.15 s


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из архива `reviewers_full.zip` и по данным этого файла формирует список словарей, содержащих следующие ключи: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [26]:
# распаковываем из зип файла
from zipfile import ZipFile

with ZipFile('./data/reviewers_full.zip', 'r') as zip_file:
    zip_file.extractall('./data')

In [163]:
from bs4 import BeautifulSoup

def func1(i):
    itog = []

    with open(f'./data/reviewers_full/reviewers_full_{i}.xml', 'r', encoding='utf8') as bsf:
        ab = BeautifulSoup(bsf, 'xml')

    res = []
    for user in ab.users.find_all('user'):    
        idd = user.find('id').text if user.find('id') else ''
        un = user.find('username').text if user.find('username') else ''
        nm = user.find('name').text if user.find('name') else ''
        nmpr = user.get("prefix")
        sx = user.find('sex').text if user.find('sex') else ''
        cn = user.find('country').text if user.find('country') else ''
        cncode = user.find('country').get("code") if user.find('country') else ''
        ml = user.find('mail').text if user.find('mail') else ''
        rg = user.find('registered').text if user.find('registered') else ''
        bd = user.find('birthdate').text if user.find('birthdate') else ''
        res.append({'id':idd, 'username':un, 'name':nm, 'name_prefix':nmpr, 'sex':sx, 'country':cn, 'mail':ml, 'registered':rg, 'birthdate':bd, "birth_year":'', "country_code":cncode})

    return res

In [164]:
func1(0)

[{'id': '556011',
  'username': 'gabrielacalhoun',
  'name': '',
  'name_prefix': 'Mrs.',
  'sex': 'F',
  'country': '',
  'mail': '',
  'registered': '',
  'birthdate': '1988-01-25',
  'birth_year': '',
  'country_code': ''},
 {'id': '1251087',
  'username': 'qbaxter',
  'name': '',
  'name_prefix': None,
  'sex': '',
  'country': 'Norway',
  'mail': 'qware@gmail.com',
  'registered': '',
  'birthdate': '1985-01-19',
  'birth_year': '',
  'country_code': 'NO'},
 {'id': '537188',
  'username': 'crosschristopher',
  'name': 'Dana Moore',
  'name_prefix': None,
  'sex': '',
  'country': '',
  'mail': 'stephaniestrong@yahoo.com',
  'registered': '2018-11-21',
  'birthdate': '1955-07-03',
  'birth_year': '',
  'country_code': ''},
 {'id': '250427',
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'name_prefix': None,
  'sex': '',
  'country': 'Cuba',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'birth_year': '',
  'country_code': 'CU'},


2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

In [165]:
%%time
res2 = [func1(i) for i in range(5)]

CPU times: user 2min 29s, sys: 2.71 s, total: 2min 32s
Wall time: 2min 44s


In [166]:
%%time
func1_delayed = dask.delayed(func1)
res = [func1_delayed(i) for i in range(5)]
res22 = dask.compute(res, scheduler = 'multiprocessing')

CPU times: user 929 ms, sys: 1.18 s, total: 2.11 s
Wall time: 2min 17s


3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [56]:
from dask import delayed
import dask.bag as db
from bs4 import BeautifulSoup

In [115]:
@delayed
def func2(i):
    itog = []

    with open(f'./data/reviewers_full/reviewers_full_{i}.xml', 'r', encoding='utf8') as bsf:
        ab = BeautifulSoup(bsf, 'xml')

    res = []
    for user in ab.users.find_all('user'):    
        idd = user.find('id').text if user.find('id') else ''
        un = user.find('username').text if user.find('username') else ''
        nm = user.find('name').text if user.find('name') else ''
        nmpr = user.get("prefix")
        sx = user.find('sex').text if user.find('sex') else ''
        cn = user.find('country').text if user.find('country') else ''
        cncode = user.find('country').get("code") if user.find('country') else ''
        ml = user.find('mail').text if user.find('mail') else ''
        rg = user.find('registered').text if user.find('registered') else ''
        bd = user.find('birthdate').text if user.find('birthdate') else ''
        
        res.append({'id':idd, 'username':un, 'name':nm, 'name_prefix':nmpr, 'sex':sx, 'country':cn, 'mail':ml, 'registered':rg, 'birthdate':bd, "birth_year":'', "country_code":cncode})

    return res

In [116]:
reviewers = db.from_delayed([func2(i) for i in range(5)])

In [121]:
reviewers

dask.bag<bag-from-delayed, npartitions=5>

In [117]:
def edit(x):
    x.update({'birth_year':x['birthdate'][:4]})
    x.update({'id': int(x['id'])})
    return x

In [118]:
def year(x):
    if x['birth_year'] > '1980':
        return x

In [119]:
itog = reviewers.map(edit).filter(year)

In [120]:
itog.compute()

[{'id': 556011,
  'username': 'gabrielacalhoun',
  'name': '',
  'name_prefix': 'Mrs.',
  'sex': 'F',
  'country': '',
  'mail': '',
  'registered': '',
  'birthdate': '1988-01-25',
  'birth_year': '1988',
  'country_code': ''},
 {'id': 1251087,
  'username': 'qbaxter',
  'name': '',
  'name_prefix': None,
  'sex': '',
  'country': 'Norway',
  'mail': 'qware@gmail.com',
  'registered': '',
  'birthdate': '1985-01-19',
  'birth_year': '1985',
  'country_code': 'NO'},
 {'id': 250427,
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'name_prefix': None,
  'sex': '',
  'country': 'Cuba',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'birth_year': '2007',
  'country_code': 'CU'},
 {'id': 452355,
  'username': 'smullen',
  'name': 'Cynthia Johnson',
  'name_prefix': 'Miss',
  'sex': 'F',
  'country': '',
  'mail': '',
  'registered': '',
  'birthdate': '2005-03-29',
  'birth_year': '2005',
  'country_code': ''},
 {'id': 2056668,
  'username

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [122]:
df_reviewers = itog.to_dataframe().set_index('id')

In [123]:
df_reviewers

Unnamed: 0_level_0,username,name,name_prefix,sex,country,mail,registered,birthdate,birth_year,country_code
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1676,object,object,object,object,object,object,object,object,object,object
364847,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
2000210387,...,...,...,...,...,...,...,...,...,...
2002372706,...,...,...,...,...,...,...,...,...,...


5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [124]:
import dask.dataframe as dd

In [125]:
dd5 = dd.read_json(['../12_dask_bag/reviews_full/reviews_0.json',
                  '../12_dask_bag/reviews_full/reviews_1.json',
                  '../12_dask_bag/reviews_full/reviews_2.json'])

In [149]:
dd5.head()

Unnamed: 0,user_id,recipe_id,date,review
0,452355,292657,2016-05-08,WOW!!! This is the best. I have never been abl...
1,329304,433404,2006-06-14,This was good but the dressing needed somethin...
2,227932,2008187,1985-11-19,"Very good,it was a hit for my family. I used 6..."
3,171468,270716,2019-05-21,Made for ZWT-8 Family Picks after I saw these ...
4,91392,1159916,1972-09-18,Very nice slaw. I especially like that it does...


In [160]:
group_us_rec = dd5.groupby('user_id').review.count()
group_us_rec

Dask Series Structure:
npartitions=1
    int64
      ...
Name: review, dtype: int64
Dask Name: series-groupby-count-agg, 10 tasks

In [161]:
itog_merge = df_reviewers.merge(group_us_rec.rename('amount_of_reviews').to_frame())

In [162]:
itog_merge.head()

Unnamed: 0,username,name,name_prefix,sex,country,mail,registered,birthdate,birth_year,country_code,amount_of_reviews
1676,lgeorge,,,M,,,,1983-06-24,1983,,29
1792,qbeard,,,F,Guinea,rachel20@hotmail.com,,1986-03-12,1986,GN,14
1938,adambrown,William Fisher,,,New Caledonia,,2019-05-03,1991-11-11,1991,NC,3
2046,vthompson,Emily Sanford,,F,United Arab Emirates,omelendez@yahoo.com,2001-10-30,1981-11-27,1981,AE,3
2178,michaeljones,Anthony Santiago,Mr.,M,Haiti,jessicaharris@gmail.com,2013-12-12,1985-08-12,1985,HT,20
