# Dask Delayed

Materials:
* Makrushin S.V. Lecture 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* Jesse C. Daniel. Data Science with Python and Dask.


## Tasks to work through together

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Write two functions that simulate a CPU-bound task and an IO-bound task:

`cpu_task()`: generates 100,000 random numbers and returns their sum (without using `numpy`)

`io_task()`: sleeps for 0.1 s, then generates a random number and returns it

Measure the execution time of 100 sequential calls to each function. Reduce the execution time by parallelizing the calls with `dask.delayed`. Investigate how the execution time depends on the chosen `scheduler`.

In [1]:
import random
import time
import dask

def cpu_task():
    lst = [random.randint(0,10) for _ in range(100_000)]
    return sum(lst)

def io_task():
    time.sleep(0.1)
    return random.randint(0,10)

In [2]:
cpu_task()

499076

In [3]:
%%time
res = [cpu_task() for _ in range(100)]

CPU times: user 4.74 s, sys: 14.2 ms, total: 4.76 s
Wall time: 4.77 s


In [4]:
%%time
res = [io_task() for _ in range(100)]

CPU times: user 3.87 ms, sys: 1.45 ms, total: 5.32 ms
Wall time: 10.4 s


In [5]:
from dask import delayed

In [6]:
cpu_task_delayed = delayed(cpu_task)

In [7]:
cpu_task_delayed().compute()

499290
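Calling a `delayed` function only records a task; nothing runs until `compute` is called. The sketch below illustrates this with a hypothetical `square` function and a `calls` list (neither is part of the assignment). It also shows that `dask.compute(lazy)` returns a one-element tuple wrapping the list, while `dask.compute(*lazy)` returns the results directly. The synchronous scheduler is chosen here only to keep the example deterministic.

```python
import dask
from dask import delayed

calls = []  # records every real invocation of square (illustrative only)


def square(x):
    calls.append(x)
    return x * x


# Building the task list does not execute square.
lazy = [delayed(square)(i) for i in range(4)]
calls_before_compute = len(calls)

# Passing the list itself: the result is a tuple with one element, the list.
as_list = dask.compute(lazy, scheduler="synchronous")

# Unpacking the list: the result is a flat tuple of values.
unpacked = dask.compute(*lazy, scheduler="synchronous")

print(calls_before_compute, as_list, unpacked)
```

The timing cells in this notebook pass the whole list, so `res_computed` there is a tuple whose first element is the list of results.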

In [8]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="threading")

CPU times: user 4.49 s, sys: 37.6 ms, total: 4.52 s
Wall time: 4.51 s


In [9]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="multiprocessing")

CPU times: user 23 ms, sys: 31.5 ms, total: 54.5 ms
Wall time: 1.4 s


In [10]:
io_task_delayed = delayed(io_task)

In [11]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="threading")

CPU times: user 17.8 ms, sys: 4.58 ms, total: 22.4 ms
Wall time: 1.35 s


In [12]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="multiprocessing")

CPU times: user 38.7 ms, sys: 34.6 ms, total: 73.3 ms
Wall time: 1.89 s
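The scheduler comparison can be wrapped in a small helper instead of repeating `%%time` cells. This is a sketch under assumptions: `timed_compute` and `short_io_task` are illustrative names, the sleep is shortened to 0.05 s, and `num_workers=8` is passed explicitly so that the thread pool size does not depend on the machine.

```python
import time

import dask
from dask import delayed


def short_io_task(delay=0.05):
    # Stand-in for an IO-bound call: the thread just waits.
    time.sleep(delay)
    return 1


def timed_compute(tasks, **compute_kwargs):
    # Run the delayed tasks and return (results, elapsed seconds).
    start = time.perf_counter()
    results = dask.compute(*tasks, **compute_kwargs)
    return results, time.perf_counter() - start


tasks = [delayed(short_io_task)() for _ in range(8)]

sync_results, sync_time = timed_compute(tasks, scheduler="synchronous")
thread_results, thread_time = timed_compute(tasks, scheduler="threads", num_workers=8)

print(f"synchronous: {sync_time:.2f} s, threads: {thread_time:.2f} s")
```

Sleeping threads do not compete for the interpreter, which is consistent with the IO-bound measurements above: threads help `io_task`, while `cpu_task` only speeds up with processes.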


1. Write a function that reads an XML file from the `reviewers_full` directory and builds a list of dictionaries with the following keys: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. In the source file some of these values are stored as tags and others as tag attributes. For any given person, some of these keys may be missing.



In [13]:
import glob
from bs4 import BeautifulSoup
from typing import List
from enum import Enum

In [14]:
%%file xml2dict.py
from bs4 import BeautifulSoup
from typing import List
from enum import Enum

class BSTypes(Enum):
    TAG = "tag"
    ATTR = "attribute"

XML_FIELDS_LIST = [
    {"name":"id", "type":BSTypes.TAG},
    {"name":"username", "type" : BSTypes.TAG},
    {"name": "name", "type" : BSTypes.TAG},
    {"name": "sex", "type" : BSTypes.TAG},
    {"name" : "country", "type" : BSTypes.TAG},
    {"name": "mail", "type" : BSTypes.TAG},
    {"name": "registered", "type" : BSTypes.TAG},
    {"name" : "birthdate", "type": BSTypes.TAG},
    {"name": "prefix", "type" : BSTypes.ATTR, "parent" : None},
    {"name": "code", "type" : BSTypes.ATTR, "parent" : "country"},
]

def xml2dict(path: str) -> List[dict]:
    # Use a context manager so the file handle is closed after reading
    with open(path, "r") as file:
        content = file.read()
    soup = BeautifulSoup(content,'xml')
    result_list = []
    
    for user in soup.find_all('user'):
        current_dict = {}
        
        for item in XML_FIELDS_LIST:
            
            # The value is stored as an ordinary tag
            if item["type"] == BSTypes.TAG:
                item_name = item["name"]
                current_obj = user.find(item_name)
                if current_obj is not None:
                    current_dict[item_name] = current_obj.get_text()
            
            # The value is stored as an attribute, so look up the parent tag first
            else:
                
                # If no parent is specified, the attribute belongs to the current user element itself
                item_parent = user.find(item["parent"]) if item["parent"] is not None else user
                item_name = item["name"]
                
                # Check for None because the current user may lack this parent tag or this attribute
                if item_parent is not None:
                    current_obj = item_parent.get(item_name)
                    if current_obj is not None:
                        current_dict[item_name] = current_obj

        result_list.append(current_dict)
    
    return result_list

Overwriting xml2dict.py
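The function above stores the two attributes under the keys `prefix` and `code`, whereas the task asks for `name_prefix` and `country_code`. A minimal sketch of the same tag/attribute split with the requested key names, using only the standard library: the sample XML is an assumption (the root element name `users` and the exact layout are not shown in the source), and unlike BeautifulSoup's `find`, `Element.find` here looks at direct children only.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample; the real files in reviewers_full may be laid out differently.
SAMPLE_XML = """
<users>
  <user prefix="Mrs.">
    <id>394270</id>
    <username>bridgesdennis</username>
    <name>Melissa Vaughn</name>
  </user>
  <user>
    <id>2199952</id>
    <username>eric77</username>
    <country code="BR">Brazil</country>
  </user>
</users>
"""

TAG_FIELDS = ["id", "username", "name", "sex", "country", "mail", "registered", "birthdate"]


def parse_users(xml_text):
    root = ET.fromstring(xml_text)
    users = []
    for user in root.iter("user"):
        record = {}
        # Values stored as child tags
        for tag in TAG_FIELDS:
            node = user.find(tag)
            if node is not None and node.text is not None:
                record[tag] = node.text.strip()
        # Attribute on the user element itself
        prefix = user.get("prefix")
        if prefix is not None:
            record["name_prefix"] = prefix
        # Attribute on the country child tag
        country = user.find("country")
        if country is not None and country.get("code") is not None:
            record["country_code"] = country.get("code")
        users.append(record)
    return users


parsed_users = parse_users(SAMPLE_XML)
print(parsed_users)
```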


In [15]:
from xml2dict import xml2dict, XML_FIELDS_LIST

In [16]:
files_list = glob.glob("./data/reviewers_full/reviewers_full_*.xml")
main_result = []
for path in files_list:
    
    buffer_result = xml2dict(path)
    main_result.extend(buffer_result)


print(len(main_result))
for field in XML_FIELDS_LIST:
    res_field = sum(map(lambda x: x.get(field["name"]) is not None, main_result))
    print(f"{field['name']} -> {res_field}")

226570
id -> 226570
username -> 226570
name -> 113368
sex -> 113395
country -> 113144
mail -> 113188
registered -> 112752
birthdate -> 112825
prefix -> 56683
code -> 102074
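The per-field counts can also be collected in a single pass over the records. A small sketch with `collections.Counter`; `count_present_keys` and the three sample records are illustrative, not taken from the data set.

```python
from collections import Counter


def count_present_keys(records):
    # Count, for every key, how many records carry a non-None value for it.
    counter = Counter()
    for record in records:
        counter.update(key for key, value in record.items() if value is not None)
    return counter


sample_records = [
    {"id": "1", "username": "a", "sex": "F"},
    {"id": "2", "username": "b"},
    {"id": "3", "username": "c", "sex": None},
]
key_counts = count_present_keys(sample_records)
print(key_counts)
```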


2. Measure the execution time of the function from task 1 on all files in the `reviewers_full` directory. Reduce the execution time using `dask.delayed`.

In [17]:
dask.config.set(scheduler='processes') 

<dask.config.set at 0x11c66e970>

In [18]:
%%time
for path in files_list:
    xml2dict(path)

CPU times: user 39.1 s, sys: 159 ms, total: 39.3 s
Wall time: 39.5 s


In [19]:
xml2dict_delayed = delayed(xml2dict)

In [20]:
%%time
res = [xml2dict_delayed(path) for path in files_list]
res_computed = dask.compute(res, scheduler="processes")

CPU times: user 84.1 ms, sys: 42 ms, total: 126 ms
Wall time: 37.8 s


In [21]:
%%time
res = [xml2dict_delayed(path) for path in files_list]
res_computed = dask.compute(res, scheduler="threads")

CPU times: user 38 s, sys: 6.55 s, total: 44.6 s
Wall time: 39.3 s


In [22]:
import multiprocessing as mp

In [23]:
%%time
if __name__ == "__main__":
    with mp.Pool(processes=len(files_list)) as pool:
        counters = pool.map(xml2dict, files_list)

CPU times: user 105 ms, sys: 76 ms, total: 181 ms
Wall time: 10.4 s


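**Conclusion.** `dask.delayed` gives no speedup here: 37.8 s with the `processes` scheduler and 39.3 s with `threads`, versus 39.5 s sequentially. The function itself is not the bottleneck, because a manual `multiprocessing.Pool` finishes the same work in 10.4 s.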

3. Decorate the function from task 1 with `dask.delayed` and create a list `reviewers` of 5 `delayed` objects (one per file). From this list, create a `dask.bag` using `db.from_delayed`. Add a `birth_year` key that stores the person's year of birth. Keep only the people who were __definitely__ born after 1980. Convert the `id` field to an integer.

In [24]:
import dask.bag as db
from datetime import datetime
from typing import Dict, Any

In [25]:
# To avoid duplicating xml2dict just to add the @delayed decorator, wrap the existing function with dask.delayed; the function is the same
reviewers = [dask.delayed(xml2dict)(path) for path in files_list]
len(reviewers)

5

In [26]:
def my_formater(item : Dict[str, Any]) -> Dict[str, Any]:
    
    # Convert the id field to an integer
    item["id"] = int(item["id"])
    
    if item.get("birthdate") is not None:
        datetime_obj = datetime.strptime(item["birthdate"], '%Y-%m-%d')
        item["birthdate"] = datetime_obj
        # Add the birth_year key that stores the person's year of birth
        item["birth_year"] = datetime_obj.year
        
    return item

b = db.from_delayed(reviewers).map(my_formater)
b.take(3)

({'id': 394270,
  'username': 'bridgesdennis',
  'name': 'Melissa Vaughn',
  'sex': 'F',
  'mail': 'carmengonzales@hotmail.com',
  'birthdate': datetime.datetime(1992, 7, 28, 0, 0),
  'prefix': 'Mrs.',
  'birth_year': 1992},
 {'id': 512192,
  'username': 'vanessawilson',
  'name': 'Matthew Roach',
  'birthdate': datetime.datetime(1998, 8, 17, 0, 0),
  'birth_year': 1998},
 {'id': 2199952,
  'username': 'eric77',
  'sex': 'F',
  'country': 'Brazil',
  'code': 'BR'})
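The word "definitely" in the task matters because some records, like the third one above, have no `birthdate` at all and therefore cannot be confirmed as born after 1980. A plain-Python sketch of the same formatting and filtering logic follows. The function names and the last two records are illustrative, and returning a copy instead of mutating the input is a design choice of this sketch, not of the cell above.

```python
from datetime import datetime


def with_birth_year(record):
    # Work on a copy so the caller's dictionary is left untouched.
    updated = dict(record)
    updated["id"] = int(updated["id"])
    birthdate = updated.get("birthdate")
    if birthdate is not None:
        parsed = datetime.strptime(birthdate, "%Y-%m-%d")
        updated["birthdate"] = parsed
        updated["birth_year"] = parsed.year
    return updated


def born_after_1980(record):
    # Records without a known birth year are rejected.
    birth_year = record.get("birth_year")
    return birth_year is not None and birth_year > 1980


raw_records = [
    {"id": "394270", "birthdate": "1992-07-28"},
    {"id": "2199952"},                            # no birthdate: cannot be confirmed
    {"id": "10", "birthdate": "1975-01-02"},      # hypothetical record
    {"id": "11", "birthdate": "1980-12-31"},      # hypothetical boundary case
]
kept = [r for r in map(with_birth_year, raw_records) if born_after_1980(r)]
print([r["id"] for r in kept])
```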

In [27]:
def my_filter(item):
    return item.get("birth_year") is not None and item["birth_year"] > 1980

In [28]:
# Keep only the people who were definitely born after 1980
result_bag = b.filter(my_filter)
result_bag.take(5)

({'id': 394270,
  'username': 'bridgesdennis',
  'name': 'Melissa Vaughn',
  'sex': 'F',
  'mail': 'carmengonzales@hotmail.com',
  'birthdate': datetime.datetime(1992, 7, 28, 0, 0),
  'prefix': 'Mrs.',
  'birth_year': 1992},
 {'id': 512192,
  'username': 'vanessawilson',
  'name': 'Matthew Roach',
  'birthdate': datetime.datetime(1998, 8, 17, 0, 0),
  'birth_year': 1998},
 {'id': 352465,
  'username': 'cindypierce',
  'name': 'Katherine Coleman',
  'country': 'Slovakia (Slovak Republic)',
  'birthdate': datetime.datetime(1988, 8, 10, 0, 0),
  'birth_year': 1988},
 {'id': 223092,
  'username': 'hintonlevi',
  'name': 'Phillip Smith',
  'sex': 'M',
  'mail': 'daniellenelson@hotmail.com',
  'birthdate': datetime.datetime(1998, 2, 13, 0, 0),
  'prefix': 'Mr.',
  'birth_year': 1998},
 {'id': 85183,
  'username': 'christensenrebecca',
  'name': 'Christopher Raymond',
  'sex': 'M',
  'country': 'Macao',
  'mail': 'mccartystephanie@hotmail.com',
  'birthdate': datetime.datetime(1981, 3, 8, 0, 

4. From the `dask.bag` obtained in task 3, create a `dask.dataframe` using `bag.to_dataframe`. Set the `id` column as the index.

In [29]:
result_df = result_bag.to_dataframe().set_index("id")
result_df

npartitions=5,username,name,sex,mail,birthdate,prefix,birth_year
1676,object,object,object,object,datetime64[ns],object,int64
367135,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...
2000077241,...,...,...,...,...,...,...
2002372706,...,...,...,...,...,...,...


In [30]:
result_df.head()

id,username,name,sex,mail,birthdate,prefix,birth_year
1676,lgeorge,,M,,1983-06-24,,1983
1792,qbeard,,F,rachel20@hotmail.com,1986-03-12,,1986
1938,adambrown,William Fisher,,,1991-11-11,,1991
2046,vthompson,Emily Sanford,F,omelendez@yahoo.com,1981-11-27,,1981
2095,djohnson,Jennifer Hawkins,F,,1984-09-23,Mrs.,1984


In [31]:
result_df.tail()

id,username,name,sex,mail,birthdate,prefix,birth_year
2002370648,wbradford,,,,2010-07-14,,2010
2002371420,walkershirley,Travis Harris,M,,1994-02-16,Mr.,1994
2002371627,travisbrown,,,kdoyle@gmail.com,2000-06-11,,2000
2002371716,greid,Jose Mata,,,2007-07-01,,2007
2002372706,gibbsnicholas,Jeffrey Rivera,M,thomas67@gmail.com,2007-05-20,,2007


5. We call a review negative if its rating is 0, 1, or 2. Load the negative reviews from the files of the `reviews_full` archive (__Lab 12__) as a `dask.DataFrame`. Count the reviews grouped by the user who left them. Merge the result with the table obtained in task 4.

In [32]:
import dask.dataframe as dd
import re
import json

In [33]:
def my_loader(data) -> dict:

    json_str, path = data
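    # The rating is encoded in the file name; the dot is escaped so it matches literally
    file_number = re.findall(r'reviews_([0-9])\.json', path)
    
    # re.findall always returns a list, so only its length needs checking
    if len(file_number) != 1:
        raise ValueError("Cannot extract the file number from the path")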
    
    json_obj = json.loads(json_str)
    json_obj["rating"] = int(file_number[0])
    json_obj["date"] = datetime.strptime(json_obj["date"], '%Y-%m-%d')
    return json_obj
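The loader relies on the rating being encoded in the file name. A standalone sketch of that parsing step follows, so it can be checked without Dask. `parse_review`, `RATING_PATTERN`, and the sample path are illustrative names, and the sample line only mimics the fields visible in the outputs of this notebook.

```python
import json
import re
from datetime import datetime

# One digit between "reviews_" and ".json" at the end of the path.
RATING_PATTERN = re.compile(r"reviews_(\d)\.json$")


def parse_review(line, path):
    match = RATING_PATTERN.search(path)
    if match is None:
        raise ValueError(f"Cannot extract the rating from {path!r}")
    record = json.loads(line)
    record["rating"] = int(match.group(1))
    record["date"] = datetime.strptime(record["date"], "%Y-%m-%d")
    return record


sample_line = '{"user_id": 452355, "recipe_id": 292657, "date": "2016-05-08", "review": "WOW!!!"}'
parsed = parse_review(sample_line, "data/reviews_full/reviews_0.json")
print(parsed)
```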

In [34]:
buf = db.read_text(urlpath="../12_dask_bag/data/reviews_full/reviews_*.json", include_path=True).map(my_loader)
reviews_df = buf.to_dataframe()

In [35]:
reviews_df_filtered = reviews_df[(reviews_df['rating'] > -1) & (reviews_df['rating'] < 3)]

In [36]:
reviews_df_filtered.head()

Unnamed: 0,user_id,recipe_id,date,review,rating
0,452355,292657,2016-05-08,WOW!!! This is the best. I have never been abl...,0
1,329304,433404,2006-06-14,This was good but the dressing needed somethin...,0
2,227932,2008187,1985-11-19,"Very good,it was a hit for my family. I used 6...",0
3,171468,270716,2019-05-21,Made for ZWT-8 Family Picks after I saw these ...,0
4,91392,1159916,1972-09-18,Very nice slaw. I especially like that it does...,0


In [37]:
reviews_df_filtered.tail()

Unnamed: 0,user_id,recipe_id,date,review,rating


In [38]:
# Count the reviews grouped by the user who left them
reviews_df_filtered.groupby("user_id")["recipe_id"].count().compute()

user_id
1533           64
1535          441
1634           36
1676           29
1755            4
             ... 
2002366476      1
2002368940      1
2002369279      1
2002369650      1
2002372706      1
Name: recipe_id, Length: 145069, dtype: int64

In [39]:
result_df.head()

id,username,name,sex,mail,birthdate,prefix,birth_year
1676,lgeorge,,M,,1983-06-24,,1983
1792,qbeard,,F,rachel20@hotmail.com,1986-03-12,,1986
1938,adambrown,William Fisher,,,1991-11-11,,1991
2046,vthompson,Emily Sanford,F,omelendez@yahoo.com,1981-11-27,,1981
2095,djohnson,Jennifer Hawkins,F,,1984-09-23,Mrs.,1984


In [40]:
reviews_df_filtered.head()

Unnamed: 0,user_id,recipe_id,date,review,rating
0,452355,292657,2016-05-08,WOW!!! This is the best. I have never been abl...,0
1,329304,433404,2006-06-14,This was good but the dressing needed somethin...,0
2,227932,2008187,1985-11-19,"Very good,it was a hit for my family. I used 6...",0
3,171468,270716,2019-05-21,Made for ZWT-8 Family Picks after I saw these ...,0
4,91392,1159916,1972-09-18,Very nice slaw. I especially like that it does...,0


In [41]:
reviews_df_filtered[reviews_df_filtered["user_id"] == 1676].compute()

Unnamed: 0,user_id,recipe_id,date,review,rating
12516,1676,43431,2013-12-25,I am not sure what happened. I didn't have any...,0
25382,1676,1219040,1996-02-06,Half the neighborhood gathered in a house acro...,0
45384,1676,108019,2018-12-25,my brother made this and it was awesome! i mad...,0
73233,1676,742672,1990-08-30,Very good! I used a hot salsa and added some ...,0
103017,1676,693834,2008-03-21,I had some leftover egg whites from making gin...,0
128670,1676,1033304,2013-06-28,The family LOVED this meal. Best part is even...,0
144659,1676,1390478,2006-03-27,OH MY! These are so good! 5 Stars hands down...,0
152317,1676,1083489,2006-10-06,"This is Oh, so good! Made for ZWT #7 and the W...",0
167282,1676,1382120,2013-04-05,I made belgian waffles and definitely needed a...,0
168510,1676,1369133,2017-05-11,These sure were tasty little guys. We really e...,0


In [42]:
result_df.loc[1676].compute()

id,username,name,sex,mail,birthdate,prefix,birth_year
1676,lgeorge,,M,,1983-06-24,,1983


In [43]:
# Merge the result with the table obtained in task 4
result_merged = result_df.merge(reviews_df_filtered, how='inner', left_index=True, right_on="user_id")

In [44]:
result_merged[result_merged["user_id"] == 1676].compute()

Unnamed: 0,username,name,sex,mail,birthdate,prefix,birth_year,user_id,recipe_id,date,review,rating
12516,lgeorge,,M,,1983-06-24,,1983,1676,43431,2013-12-25,I am not sure what happened. I didn't have any...,0
25382,lgeorge,,M,,1983-06-24,,1983,1676,1219040,1996-02-06,Half the neighborhood gathered in a house acro...,0
45384,lgeorge,,M,,1983-06-24,,1983,1676,108019,2018-12-25,my brother made this and it was awesome! i mad...,0
73233,lgeorge,,M,,1983-06-24,,1983,1676,742672,1990-08-30,Very good! I used a hot salsa and added some ...,0
103017,lgeorge,,M,,1983-06-24,,1983,1676,693834,2008-03-21,I had some leftover egg whites from making gin...,0
128670,lgeorge,,M,,1983-06-24,,1983,1676,1033304,2013-06-28,The family LOVED this meal. Best part is even...,0
144659,lgeorge,,M,,1983-06-24,,1983,1676,1390478,2006-03-27,OH MY! These are so good! 5 Stars hands down...,0
152317,lgeorge,,M,,1983-06-24,,1983,1676,1083489,2006-10-06,"This is Oh, so good! Made for ZWT #7 and the W...",0
167282,lgeorge,,M,,1983-06-24,,1983,1676,1382120,2013-04-05,I made belgian waffles and definitely needed a...,0
168510,lgeorge,,M,,1983-06-24,,1983,1676,1369133,2017-05-11,These sure were tasty little guys. We really e...,0


In [45]:
result_merged.tail()

Unnamed: 0,username,name,sex,mail,birthdate,prefix,birth_year,user_id,recipe_id,date,review,rating
75999,walkershirley,Travis Harris,M,,1994-02-16,Mr.,1994,2002371420,1803798,2018-08-08,I found this to be super easy and quite tasty....,1
126811,travisbrown,,,kdoyle@gmail.com,2000-06-11,,2000,2002371627,153647,2018-12-19,Best thing about this recipe? I didn't have to...,0
47693,travisbrown,,,kdoyle@gmail.com,2000-06-11,,2000,2002371627,745689,2011-07-28,This Meatloaf is the only meatloaf i make in m...,2
485660,greid,Jose Mata,,,2007-07-01,,2007,2002371716,203755,2018-12-19,The chocolate lace cookie is not truly a lace ...,0
37856,gibbsnicholas,Jeffrey Rivera,M,thomas67@gmail.com,2007-05-20,,2007,2002372706,147999,2017-01-10,DELICIOUS AND FUN!!! My family always made red...,2
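The merge above joins the user table with the individual negative reviews. The task can also be read as asking for the per-user counts to be merged instead. A sketch of that variant is below, written with pandas on toy data for brevity. The usernames come from the outputs above, the review rows are made up, and the assumption that the same `groupby`, `size`, and `merge` calls carry over to `dask.dataframe` has not been checked against the real data.

```python
import pandas as pd

# Toy stand-in for the user table from task 4 (indexed by id).
users = pd.DataFrame(
    {"id": [1676, 1792, 1938], "username": ["lgeorge", "qbeard", "adambrown"]}
).set_index("id")

# Toy stand-in for the reviews; the ratings here are invented.
reviews = pd.DataFrame(
    {
        "user_id": [1676, 1676, 1792, 1938, 555],
        "recipe_id": [43431, 108019, 10, 11, 12],
        "rating": [0, 2, 5, 1, 0],
    }
)

# Negative reviews are those rated 0, 1, or 2.
negative = reviews[reviews["rating"].isin([0, 1, 2])]

# One row per user with the number of negative reviews.
counts = negative.groupby("user_id").size().rename("negative_reviews")

# Inner merge on the index keeps only users present in both tables.
merged = users.merge(counts.to_frame(), left_index=True, right_index=True, how="inner")
print(merged)
```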


#### [version 2]
* Clarified the wording of task 1