# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* Jesse C. Daniel. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [1]:
import random
import time
import dask

def cpu_task():
    lst = [random.randint(0,10) for _ in range(100_000)]
    return sum(lst)

def io_task():
    time.sleep(0.1)
    return random.randint(0,10)

In [2]:
cpu_task()

500279

In [3]:
%%time
res = [cpu_task() for _ in range(100)]

Wall time: 6 s


In [4]:
%%time
res = [io_task() for _ in range(100)]

Wall time: 10.9 s


In [5]:
from dask import delayed

In [6]:
cpu_task_delayed = delayed(cpu_task)

In [7]:
cpu_task_delayed().compute()

500245

In [8]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="threading")

Wall time: 6.49 s


In [9]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="multiprocessing")

Wall time: 2.76 s


In [10]:
io_task_delayed = delayed(io_task)

In [11]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="threading")

Wall time: 997 ms


In [12]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler="multiprocessing")

Wall time: 2.11 s


## Лабораторная работа 14

In [42]:
from bs4 import BeautifulSoup
from datetime import datetime
import glob
import dask
import dask.bag as db
import json
import re

1. Напишите функцию, которая считывает файл формата xml из каталога `reviewers_full` и по данным этого файла формирует список словарей, содержащих следующие ключи: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [13]:
tag_list = ['id', 'username', 'name', 'sex', 'country', 'mail', 'registered', 'birthdate']

def reader(path):
    file = open(path,"r").read()
    data = BeautifulSoup(file,'xml')
    result_list = []
    
    for user in data.find_all('user'):
        user_dict = {}
        if user.get('prefix') is not None:
            user_dict['name_prefix'] = user.get('prefix')
        
        for tag in tag_list:
            user_tag = user.find(tag)
            if user_tag is not None:
                if tag == 'birthdate':
                    user_dict[tag] = datetime.strptime(user_tag.get_text(), '%Y-%m-%d')
                else:
                    user_dict[tag] = user_tag.get_text()

                if tag == 'country' and user.find(tag).get('code') is not None:
                    user_dict['country_code'] = user.find(tag).get('code')

        result_list.append(user_dict)
    
    return result_list

In [14]:
files_list = glob.glob("../data/reviewers_full/reviewers_full_*.xml")
result = []
for path in files_list:
    result.extend(reader(path))

result[:5]

[{'name_prefix': 'Mrs.',
  'id': '556011',
  'username': 'gabrielacalhoun',
  'sex': 'F',
  'birthdate': datetime.datetime(1988, 1, 25, 0, 0)},
 {'id': '1251087',
  'username': 'qbaxter',
  'country': 'Norway',
  'country_code': 'NO',
  'mail': 'qware@gmail.com',
  'birthdate': datetime.datetime(1985, 1, 19, 0, 0)},
 {'id': '537188',
  'username': 'crosschristopher',
  'name': 'Dana Moore',
  'mail': 'stephaniestrong@yahoo.com',
  'registered': '2018-11-21',
  'birthdate': datetime.datetime(1955, 7, 3, 0, 0)},
 {'id': '250427',
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'country': 'Cuba',
  'country_code': 'CU',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': datetime.datetime(2007, 4, 30, 0, 0)},
 {'id': '2945188',
  'username': 'gambledanielle',
  'name': 'Henry Harris',
  'country': 'Serbia',
  'country_code': 'RS',
  'registered': '2011-04-08'}]

2. Измерьте время выполнения функции из задания 1 на всех файлах из каталога `reviewers_full`. Ускорьте время выполнения, используя `dask.delayed`.

In [15]:
%%time
for path in files_list:
    reader(path)

Wall time: 1min 34s


In [20]:
delayed_reader = dask.delayed(reader)

In [22]:
%%time
dask.compute([delayed_reader(path) for path in files_list], scheduler="processes")

Wall time: 1min 24s


([[{'name_prefix': 'Mrs.',
    'id': '556011',
    'username': 'gabrielacalhoun',
    'sex': 'F',
    'birthdate': datetime.datetime(1988, 1, 25, 0, 0)},
   {'id': '1251087',
    'username': 'qbaxter',
    'country': 'Norway',
    'country_code': 'NO',
    'mail': 'qware@gmail.com',
    'birthdate': datetime.datetime(1985, 1, 19, 0, 0)},
   {'id': '537188',
    'username': 'crosschristopher',
    'name': 'Dana Moore',
    'mail': 'stephaniestrong@yahoo.com',
    'registered': '2018-11-21',
    'birthdate': datetime.datetime(1955, 7, 3, 0, 0)},
   {'id': '250427',
    'username': 'karen27',
    'name': 'Jennifer Horne',
    'country': 'Cuba',
    'country_code': 'CU',
    'mail': 'wjarvis@yahoo.com',
    'registered': '2013-11-20',
    'birthdate': datetime.datetime(2007, 4, 30, 0, 0)},
   {'id': '2945188',
    'username': 'gambledanielle',
    'name': 'Henry Harris',
    'country': 'Serbia',
    'country_code': 'RS',
    'registered': '2011-04-08'},
   {'name_prefix': 'Miss',
    'id':

3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [23]:
reviewers = [dask.delayed(reader)(path) for path in files_list]

In [32]:
reviewers_bag = db.from_delayed(reviewers)

In [None]:
def add_year(item):
    if item.get('birthdate') is not None:
        item['birth_year'] = item.get('birthdate').year
        
    item['id'] = int(item['id'])

    return item

reviewers_bag_new = reviewers_bag.map(add_year).filter(lambda x: x.get('birth_year') is not None and x['birth_year'] > 1980)
reviewers_bag_new.take(5)

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [None]:
reviewers_df = reviewers_bag_new.to_dataframe().set_index('id')
reviewers_df.head(5)

5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [43]:
 def add_rating(data):
    string, path = data
    json_obj = json.loads(string)
    json_obj['rating'] = int(re.findall('reviews_([0-9]).json', path)[0])
    json_obj['date'] = datetime.strptime(json_obj['date'], '%Y-%m-%d')
    return json_obj

reviews = db.read_text(urlpath='../data/reviews_full/reviews_*.json', include_path=True).map(add_rating).to_dataframe()
reviews.head(5)

Unnamed: 0,user_id,recipe_id,date,review,rating
0,452355,292657,2016-05-08,WOW!!! This is the best. I have never been abl...,0
1,329304,433404,2006-06-14,This was good but the dressing needed somethin...,0
2,227932,2008187,1985-11-19,"Very good,it was a hit for my family. I used 6...",0
3,171468,270716,2019-05-21,Made for ZWT-8 Family Picks after I saw these ...,0
4,91392,1159916,1972-09-18,Very nice slaw. I especially like that it does...,0


In [None]:
reviews_neg = reviews[(reviews['rating'] > -1) & (reviews['rating'] < 3)]
reviews_neg_count = reviews_neg.groupby(reviews_neg['user_id'])['recipe_id'].count().compute()
reviews_neg_count

In [None]:
reviewers_bad_count = reviewers_df.merge(reviews_neg_count, how='inner', left_index=True, right_on='user_id')
reviewers_bad_count

#### [версия 2]
* Уточнена формулировка задачи 1