# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* JESSE C. DANIEL. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [21]:
import bs4
import dask
import glob
import json
import dask.bag as db
import dask.dataframe as dd

## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из архива `reviewers_full.zip` и по данным этого файла формирует список словарей, содержащих следующие ключи: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [4]:
def read(path):
    with open(path) as file:
        xml = bs4.BeautifulSoup(file, 'lxml-xml')
    
    users = []
    keys = ['id', 'username', 'name', 'sex', 'country', 'mail', 'registered', 'birthdate', 'prefix', 'country']
    
    for data in xml.find_all('user'):
        user = {}
        for key in keys:
            tag = data.find(key)
            user[key] = tag.get_text() if tag else None
            if key == 'country' and tag:
                country_code = tag.get('code', None)
                user['country_code'] = country_code if country_code else None
                
        users.append(user)
        
    return users     

In [5]:
read('data/14_delayed_data/reviewers_full/reviewers_full_0.xml')

[{'id': '556011',
  'username': 'gabrielacalhoun',
  'name': None,
  'sex': 'F',
  'country': None,
  'mail': None,
  'registered': None,
  'birthdate': '1988-01-25',
  'prefix': None},
 {'id': '1251087',
  'username': 'qbaxter',
  'name': None,
  'sex': None,
  'country': 'Norway',
  'country_code': 'NO',
  'mail': 'qware@gmail.com',
  'registered': None,
  'birthdate': '1985-01-19',
  'prefix': None},
 {'id': '537188',
  'username': 'crosschristopher',
  'name': 'Dana Moore',
  'sex': None,
  'country': None,
  'mail': 'stephaniestrong@yahoo.com',
  'registered': '2018-11-21',
  'birthdate': '1955-07-03',
  'prefix': None},
 {'id': '250427',
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'sex': None,
  'country': 'Cuba',
  'country_code': 'CU',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'prefix': None},
 {'id': '2945188',
  'username': 'gambledanielle',
  'name': 'Henry Harris',
  'sex': None,
  'country': 'Serbia',
  'country_

2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

In [7]:
%%time
files = glob.glob("data/14_delayed_data/reviewers_full/*.xml")

for file in files:
    read(file)

Wall time: 2min 7s


3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [12]:
@dask.delayed
def read(path):
    with open(path) as file:
        xml = bs4.BeautifulSoup(file, 'lxml-xml')
    
    users = []
    keys = ['id', 'username', 'name', 'sex', 'country', 'mail', 'registered', 'birthdate', 'prefix', 'country']
    
    for data in xml.find_all('user'):
        user = {}
        for key in keys:
            tag = data.find(key)
            user[key] = tag.get_text() if tag else None
            if key == 'country' and tag:
                country_code = tag.get('code', None)
                user['country_code'] = country_code if country_code else None
                
        users.append(user)
        
    return users 

In [13]:
%%time
files = glob.glob("data/14_delayed_data/reviewers_full/*.xml")
reviewers = []
for file in files:
    reviewers.append(read(file))

Wall time: 1 ms


In [14]:
reviewers_db = db.from_delayed(reviewers)

In [18]:
def id_to_int(obj):
    obj['id'] = int(obj['id'])
    return obj

def add_birthday_year(obj):
    obj['birth_year'] = int(obj['birthdate'].split('-')[0]) if obj['birthdate'] else None
    return obj

def filter_birth_year(obj):
    return obj['birth_year'] > 1980 if obj['birth_year'] else False


process_reviewers_db = reviewers_db.map(id_to_int).map(add_birthday_year).filter(filter_birth_year)
process_reviewers_db.take(5)

({'id': 556011,
  'username': 'gabrielacalhoun',
  'name': None,
  'sex': 'F',
  'country': None,
  'mail': None,
  'registered': None,
  'birthdate': '1988-01-25',
  'prefix': None,
  'birth_year': 1988},
 {'id': 1251087,
  'username': 'qbaxter',
  'name': None,
  'sex': None,
  'country': 'Norway',
  'country_code': 'NO',
  'mail': 'qware@gmail.com',
  'registered': None,
  'birthdate': '1985-01-19',
  'prefix': None,
  'birth_year': 1985},
 {'id': 250427,
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'sex': None,
  'country': 'Cuba',
  'country_code': 'CU',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'prefix': None,
  'birth_year': 2007},
 {'id': 452355,
  'username': 'smullen',
  'name': 'Cynthia Johnson',
  'sex': 'F',
  'country': None,
  'mail': None,
  'registered': None,
  'birthdate': '2005-03-29',
  'prefix': None,
  'birth_year': 2005},
 {'id': 2056668,
  'username': 'franklinhancock',
  'name': 'Michele Richards',
  

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [19]:
reviewers_df = process_reviewers_db.to_dataframe()
reviewers_df = reviewers_df.set_index('id')
reviewers_df.head()

Unnamed: 0_level_0,username,name,sex,country,mail,registered,birthdate,prefix,birth_year
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1676,lgeorge,,M,,,,1983-06-24,,1983
1792,qbeard,,F,Guinea,rachel20@hotmail.com,,1986-03-12,,1986
1938,adambrown,William Fisher,,New Caledonia,,2019-05-03,1991-11-11,,1991
2046,vthompson,Emily Sanford,F,United Arab Emirates,omelendez@yahoo.com,2001-10-30,1981-11-27,,1981
2095,djohnson,Jennifer Hawkins,F,Jamaica,,,1984-09-23,,1984


5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [26]:
files = glob.glob("../12/data/reviews_full/*[0-2].json")
jsons = db.read_text(files).map(json.loads).to_dataframe()
jsons.head()

Unnamed: 0,user_id,recipe_id,date,review
0,452355,292657,2016-05-08,WOW!!! This is the best. I have never been abl...
1,329304,433404,2006-06-14,This was good but the dressing needed somethin...
2,227932,2008187,1985-11-19,"Very good,it was a hit for my family. I used 6..."
3,171468,270716,2019-05-21,Made for ZWT-8 Family Picks after I saw these ...
4,91392,1159916,1972-09-18,Very nice slaw. I especially like that it does...


In [57]:
count_df = jsons.groupby('user_id').count().rename(columns={'review': 'count'})
count_df.compute()

Unnamed: 0_level_0,recipe_id,date,count
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1533,64,64,64
1535,441,441,441
1634,36,36,36
1676,29,29,29
1755,4,4,4
...,...,...,...
2002366476,1,1,1
2002368940,1,1,1
2002369279,1,1,1
2002369650,1,1,1


In [58]:
merged_df = dd.merge(jsons, count_df['count'], how='left', left_index=True, right_index=True)

In [59]:
merged_df.compute()

Unnamed: 0,user_id,recipe_id,date,review,count
0,452355,292657,2016-05-08,WOW!!! This is the best. I have never been abl...,
1,329304,433404,2006-06-14,This was good but the dressing needed somethin...,
2,227932,2008187,1985-11-19,"Very good,it was a hit for my family. I used 6...",
3,171468,270716,2019-05-21,Made for ZWT-8 Family Picks after I saw these ...,
4,91392,1159916,1972-09-18,Very nice slaw. I especially like that it does...,
...,...,...,...,...,...
112556,383346,45919,2016-08-21,This made a wonderful and delicious breakfast!...,
112557,353659,253948,2015-04-24,"I made this over the weekend, I filled it with...",
112558,50969,304989,2017-12-20,A solid introduction to a grand Mediterranean ...,
112559,666723,1792397,2021-06-08,I absolutely LOVE your recipe and I have made ...,
