# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* Jesse C. Daniel. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [None]:
import random
import time

def cpu_task():
  lst = [random.randint(0, 10) for _ in range(100_000)]
  return sum(lst)

def io_task():
  time.sleep(0.1)
  return random.randint(0, 10)

In [None]:
%%time
res = [cpu_task() for _ in range(100)]

CPU times: user 14.2 s, sys: 38.8 ms, total: 14.2 s
Wall time: 14.2 s


In [None]:
%%time
res = [io_task() for _ in range(100)]

CPU times: user 41 ms, sys: 6.05 ms, total: 47 ms
Wall time: 10 s


In [None]:
import dask

In [None]:
cpu_task_delayed = dask.delayed(cpu_task)

In [None]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res)

CPU times: user 14.1 s, sys: 127 ms, total: 14.2 s
Wall time: 14.2 s


In [None]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'threading')

CPU times: user 15.6 s, sys: 151 ms, total: 15.7 s
Wall time: 16.1 s


In [None]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'multiprocessing')

CPU times: user 200 ms, sys: 44.7 ms, total: 245 ms
Wall time: 15 s


In [None]:
io_task_delayed = dask.delayed(io_task)

In [None]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'threading')

CPU times: user 54.8 ms, sys: 12.3 ms, total: 67.1 ms
Wall time: 5.04 s


In [None]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'multiprocessing')

CPU times: user 118 ms, sys: 31.4 ms, total: 150 ms
Wall time: 5.12 s


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из каталога `reviewers_full` и по данным этого файла формирует список словарей, содержащих следующие ключи: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [24]:
import os
import dask.bag as db
import dask
from bs4 import BeautifulSoup

In [57]:
def read(number = 0):
    with open(os.path.join('data','reviewers_full',f'reviewers_full_{number}.xml')) as f:
        data = BeautifulSoup(f,'xml')
    parsed_xml = []
    keys = ['id','username','name','sex','country','mail','registered','birthdate','name_prefix','country_code']
    for users in data.find_all('users'):
        for user in users.find_all('user'):
            resulting_dict = {}
            for key in keys:
                if user.find(key):
                    resulting_dict[key] = user.find(key).text
                else:
                    resulting_dict[key] = 'Nan'
            parsed_xml.append(resulting_dict)
    return(parsed_xml)

In [70]:
int(1)

1

In [58]:
read()

[{'id': '556011',
  'username': 'gabrielacalhoun',
  'name': '',
  'sex': 'F',
  'country': '',
  'mail': '',
  'registered': '',
  'birthdate': '1988-01-25',
  'name_prefix': '',
  'country_code': ''},
 {'id': '1251087',
  'username': 'qbaxter',
  'name': '',
  'sex': '',
  'country': 'Norway',
  'mail': 'qware@gmail.com',
  'registered': '',
  'birthdate': '1985-01-19',
  'name_prefix': '',
  'country_code': ''},
 {'id': '537188',
  'username': 'crosschristopher',
  'name': 'Dana Moore',
  'sex': '',
  'country': '',
  'mail': 'stephaniestrong@yahoo.com',
  'registered': '2018-11-21',
  'birthdate': '1955-07-03',
  'name_prefix': '',
  'country_code': ''},
 {'id': '250427',
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'sex': '',
  'country': 'Cuba',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'name_prefix': '',
  'country_code': ''},
 {'id': '2945188',
  'username': 'gambledanielle',
  'name': 'Henry Harris',
  'sex': '',
  'c

2. Измерьте время выполнения функции из задания 1 на всех файлах из каталога `reviewers_full`. Ускорьте время выполнения, используя `dask.delayed`.

In [59]:
%%time
files = range(0,5)
_ = [read(i) for i in files]

CPU times: user 1min 8s, sys: 357 ms, total: 1min 8s
Wall time: 1min 8s


In [60]:
%%time
read_delayed = dask.delayed(read)
_ =dask.compute([read_delayed(i) for i in files],scheduler = 'multiprocessing')

CPU times: user 193 ms, sys: 188 ms, total: 381 ms
Wall time: 1min 5s


3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [61]:
@dask.delayed
def read_decorated(number = 0):
    with open(os.path.join('data','reviewers_full',f'reviewers_full_{number}.xml')) as f:
        data = BeautifulSoup(f,'xml')
    parsed_xml = []
    keys = ['id','username','name','sex','country','mail','registered','birthdate','name_prefix','country_code']
    for users in data.find_all('users'):
        for user in users.find_all('user'):
            resulting_dict = {}
            for key in keys:
                if user.find(key):
                    resulting_dict[key] = user.find(key).text
                else:
                    resulting_dict[key] = ''
            parsed_xml.append(resulting_dict)
    return(parsed_xml)

In [62]:
reviewers = db.from_delayed([read_decorated(i) for i in files])

In [75]:
def make_int(review):
    review.update({'id': int(review['id'])})
    return review

def filter_birthdate(review):
    birthyear = review['birthdate'][:4]
    try:
        birthyear = int(birthyear)
    except:
        birthyear = 0
    if birthyear > 1980:
        return review

filtered = reviewers.map(make_int).filter(filter_birthdate).compute()
filtered

[{'id': 556011,
  'username': 'gabrielacalhoun',
  'name': '',
  'sex': 'F',
  'country': '',
  'mail': '',
  'registered': '',
  'birthdate': '1988-01-25',
  'name_prefix': '',
  'country_code': ''},
 {'id': 1251087,
  'username': 'qbaxter',
  'name': '',
  'sex': '',
  'country': 'Norway',
  'mail': 'qware@gmail.com',
  'registered': '',
  'birthdate': '1985-01-19',
  'name_prefix': '',
  'country_code': ''},
 {'id': 250427,
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'sex': '',
  'country': 'Cuba',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'name_prefix': '',
  'country_code': ''},
 {'id': 452355,
  'username': 'smullen',
  'name': 'Cynthia Johnson',
  'sex': 'F',
  'country': '',
  'mail': '',
  'registered': '',
  'birthdate': '2005-03-29',
  'name_prefix': '',
  'country_code': ''},
 {'id': 2056668,
  'username': 'franklinhancock',
  'name': 'Michele Richards',
  'sex': '',
  'country': 'Namibia',
  'mail': 'claudiamccoy

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [81]:
df = reviewers.map(make_int).filter(filter_birthdate).to_dataframe().set_index('id')
df

Unnamed: 0_level_0,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1676,object,object,object,object,object,object,object,object,object
367135,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
2000249591,...,...,...,...,...,...,...,...,...
2002372706,...,...,...,...,...,...,...,...,...


5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

#### [версия 2]
* Уточнена формулировка задачи 1

In [83]:
reviews_full = dask.dataframe.read_json(['../12_dask_bag/data/reviews_full/reviews_0.json',
                                         '../12_dask_bag/data/reviews_full/reviews_1.json',
                                         '../12_dask_bag/data/reviews_full/reviews_2.json'])

In [84]:
grouped = reviews_full.groupby('user_id').review.count()
grouped

Dask Series Structure:
npartitions=1
    int64
      ...
Name: review, dtype: int64
Dask Name: series-groupby-count-agg, 10 tasks

In [89]:
grouped.compute()

user_id
1533           64
1535          441
1634           36
1676           29
1755            4
             ... 
2002366476      1
2002368940      1
2002369279      1
2002369650      1
2002372706      1
Name: review, Length: 145069, dtype: int64

In [90]:
resulting_df = df.join(grouped.rename('reviews_count'))
resulting_df.head()

Unnamed: 0_level_0,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code,reviews_count
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1676,lgeorge,,M,,,,1983-06-24,,,29.0
1792,qbeard,,F,Guinea,rachel20@hotmail.com,,1986-03-12,,,14.0
1938,adambrown,William Fisher,,New Caledonia,,2019-05-03,1991-11-11,,,3.0
2046,vthompson,Emily Sanford,F,United Arab Emirates,omelendez@yahoo.com,2001-10-30,1981-11-27,,,3.0
2095,djohnson,Jennifer Hawkins,F,Jamaica,,,1984-09-23,,,
