# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* Jesse C. Daniel. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из каталога `reviewers_full` и по данным этого файла формирует список словарей, содержащих следующие ключи: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [8]:
import bs4


def read_xml(path):
    with open(path) as f:
        xml = bs4.BeautifulSoup(f, 'lxml-xml')

    users = []
    for data in xml.find_all('user'):
        user = {}

        id_tag = data.find('id')
        if id_tag:
            user['id'] = id_tag.get_text()

        username_tag = data.find('username')
        if username_tag:
            user['username'] = username_tag.get_text()

        name_tag = data.find('name')
        if name_tag:
            user['name'] = name_tag.get_text()

        sex_tag = data.find('sex')
        if sex_tag:
            user['sex'] = sex_tag.get_text()

        mail_tag = data.find('mail')
        if mail_tag:
            user['mail'] = mail_tag.get_text()

        registered_tag = data.find('registered')
        if registered_tag:
            user['registered'] = registered_tag.get_text()

        birthdate_tag = data.find('birthdate')
        if birthdate_tag:
            user['birthdate'] = birthdate_tag.get_text()

        name_prefix = user.get('prefix', None)
        if name_prefix:
            user['name_prefix'] = name_prefix

        country_tag = data.find('country')
        if country_tag:
            user['country'] = country_tag.get_text()
            country_code = country_tag.get('code', None)
            if country_code:
                user['country_code'] = country_code

        required_keys = [
            'id',
            'username',
            'name',
            'sex',
            'country',
            'mail',
            'registered',
            'birthdate',
            'name_prefix',
            'country_code'
        ]
        for key in required_keys:
            user[key] = user.get(key)

        users.append(user)

    return users


In [9]:
read_xml('data/reviewers_full/reviewers_full_0.xml')

[{'id': '556011',
  'username': 'gabrielacalhoun',
  'sex': 'F',
  'birthdate': '1988-01-25',
  'name': None,
  'country': None,
  'mail': None,
  'registered': None,
  'name_prefix': None,
  'country_code': None},
 {'id': '1251087',
  'username': 'qbaxter',
  'mail': 'qware@gmail.com',
  'birthdate': '1985-01-19',
  'country': 'Norway',
  'country_code': 'NO',
  'name': None,
  'sex': None,
  'registered': None,
  'name_prefix': None},
 {'id': '537188',
  'username': 'crosschristopher',
  'name': 'Dana Moore',
  'mail': 'stephaniestrong@yahoo.com',
  'registered': '2018-11-21',
  'birthdate': '1955-07-03',
  'sex': None,
  'country': None,
  'name_prefix': None,
  'country_code': None},
 {'id': '250427',
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'country': 'Cuba',
  'country_code': 'CU',
  'sex': None,
  'name_prefix': None},
 {'id': '2945188',
  'username': 'gambledanielle',
  'name

2. Измерьте время выполнения функции из задания 1 на всех файлах из каталога `reviewers_full`. Ускорьте время выполнения, используя `dask.delayed`.

In [10]:
%%time

for i in range(5):
    read_xml(f'data/reviewers_full/reviewers_full_{i}.xml')

Wall time: 58.9 s


In [11]:
from dask import delayed
import dask

read_xml_delayed = delayed(read_xml)

In [12]:
%%time

delayed_tasks = []
for i in range(5):
    delayed_tasks.append(read_xml_delayed(f'data/reviewers_full/reviewers_full_{i}.xml'))

dask.compute(delayed_tasks)

Wall time: 1min 40s


([[{'id': '556011',
    'username': 'gabrielacalhoun',
    'sex': 'F',
    'birthdate': '1988-01-25',
    'name': None,
    'country': None,
    'mail': None,
    'registered': None,
    'name_prefix': None,
    'country_code': None},
   {'id': '1251087',
    'username': 'qbaxter',
    'mail': 'qware@gmail.com',
    'birthdate': '1985-01-19',
    'country': 'Norway',
    'country_code': 'NO',
    'name': None,
    'sex': None,
    'registered': None,
    'name_prefix': None},
   {'id': '537188',
    'username': 'crosschristopher',
    'name': 'Dana Moore',
    'mail': 'stephaniestrong@yahoo.com',
    'registered': '2018-11-21',
    'birthdate': '1955-07-03',
    'sex': None,
    'country': None,
    'name_prefix': None,
    'country_code': None},
   {'id': '250427',
    'username': 'karen27',
    'name': 'Jennifer Horne',
    'mail': 'wjarvis@yahoo.com',
    'registered': '2013-11-20',
    'birthdate': '2007-04-30',
    'country': 'Cuba',
    'country_code': 'CU',
    'sex': None,
    

3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [13]:
@delayed
def read_xml(path):
    with open(path) as f:
        xml = bs4.BeautifulSoup(f, 'lxml-xml')

    users = []
    for data in xml.find_all('user'):
        user = {}

        id_tag = data.find('id')
        if id_tag:
            user['id'] = id_tag.get_text()

        username_tag = data.find('username')
        if username_tag:
            user['username'] = username_tag.get_text()

        name_tag = data.find('name')
        if name_tag:
            user['name'] = name_tag.get_text()

        sex_tag = data.find('sex')
        if sex_tag:
            user['sex'] = sex_tag.get_text()

        mail_tag = data.find('mail')
        if mail_tag:
            user['mail'] = mail_tag.get_text()

        registered_tag = data.find('registered')
        if registered_tag:
            user['registered'] = registered_tag.get_text()

        birthdate_tag = data.find('birthdate')
        if birthdate_tag:
            user['birthdate'] = birthdate_tag.get_text()

        name_prefix = user.get('prefix', None)
        if name_prefix:
            user['name_prefix'] = name_prefix

        country_tag = data.find('country')
        if country_tag:
            user['country'] = country_tag.get_text()
            country_code = country_tag.get('code', None)
            if country_code:
                user['country_code'] = country_code

        required_keys = [
            'id',
            'username',
            'name',
            'sex',
            'country',
            'mail',
            'registered',
            'birthdate',
            'name_prefix',
            'country_code'
        ]
        for key in required_keys:
            user[key] = user.get(key)

        users.append(user)

    return users

In [14]:
reviewers = []
for i in range(5):
    reviewers.append(read_xml_delayed(f'data/reviewers_full/reviewers_full_{i}.xml'))
    
print(reviewers)

[Delayed('read_xml-92873af4-1cca-4474-8508-986363b62868'), Delayed('read_xml-c35855a3-72c2-4e08-ab8f-48ae46fcdf62'), Delayed('read_xml-63e9f295-2040-4f12-bb6f-8d782d298f3d'), Delayed('read_xml-1be84618-a339-4a43-bd59-043fbf3ccbab'), Delayed('read_xml-fe9e4c6e-1657-48bf-b60a-86452f7d9b0a')]


In [15]:
import dask.bag as db

bag = db.from_delayed(reviewers)

In [16]:
def add_birth_year(data):
    new_data = data.copy()

    birthdate = new_data.get('birthdate')
    if birthdate:
        new_data['birth_year'] = int(birthdate[:4])

    return new_data


def filter_by_birth_year(data):
    birth_year = data.get('birth_year')
    if birth_year:
        return birth_year > 1980
    return False


def id_to_int(data):
    new_data = data.copy()
    new_data['id'] = int(new_data['id'])
    return new_data


proc_bag = bag.map(add_birth_year).filter(filter_by_birth_year).map(id_to_int)
proc_bag.take(3)

({'id': 556011,
  'username': 'gabrielacalhoun',
  'sex': 'F',
  'birthdate': '1988-01-25',
  'name': None,
  'country': None,
  'mail': None,
  'registered': None,
  'name_prefix': None,
  'country_code': None,
  'birth_year': 1988},
 {'id': 1251087,
  'username': 'qbaxter',
  'mail': 'qware@gmail.com',
  'birthdate': '1985-01-19',
  'country': 'Norway',
  'country_code': 'NO',
  'name': None,
  'sex': None,
  'registered': None,
  'name_prefix': None,
  'birth_year': 1985},
 {'id': 250427,
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'country': 'Cuba',
  'country_code': 'CU',
  'sex': None,
  'name_prefix': None,
  'birth_year': 2007})

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [17]:
reviewers_df = proc_bag.to_dataframe()
reviewers_df = reviewers_df.set_index('id')
reviewers_df.head()

Unnamed: 0_level_0,username,sex,birthdate,name,country,mail,registered,name_prefix,country_code,birth_year
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1676,lgeorge,M,1983-06-24,,,,,,,1983
1792,qbeard,F,1986-03-12,,Guinea,rachel20@hotmail.com,,,GN,1986
1938,adambrown,,1991-11-11,William Fisher,New Caledonia,,2019-05-03,,NC,1991
2046,vthompson,F,1981-11-27,Emily Sanford,United Arab Emirates,omelendez@yahoo.com,2001-10-30,,AE,1981
2095,djohnson,F,1984-09-23,Jennifer Hawkins,Jamaica,,,,JM,1984


5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [18]:
import re
import json

pattern = re.compile(r'^.*_(\d+)\.json$')


def loads(x):
    data = json.loads(x[0])
    user_id = data['user_id']
    rating = int(pattern.match(x[1]).groups()[0])
    return {
        'user_id': user_id,
        'rating': rating
    }


reviews_bag = db.read_text('data/reviews_full/*.json', include_path=True).map(loads).filter(lambda x: x['rating'] < 3)

reviews_bag.take(3)

({'user_id': 452355, 'rating': 0},
 {'user_id': 329304, 'rating': 0},
 {'user_id': 227932, 'rating': 0})

In [19]:
count = reviews_bag.to_dataframe().groupby('user_id').count().rename(columns={'rating': 'count'})

In [20]:
import dask.dataframe as dd

merged_df = dd.merge(reviewers_df, count, how='left', left_index=True, right_index=True)

In [21]:
merged_df.compute()

Unnamed: 0_level_0,username,sex,birthdate,name,country,mail,registered,name_prefix,country_code,birth_year,count
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
1676,lgeorge,M,1983-06-24,,,,,,,1983,29.0
1792,qbeard,F,1986-03-12,,Guinea,rachel20@hotmail.com,,,GN,1986,14.0
1938,adambrown,,1991-11-11,William Fisher,New Caledonia,,2019-05-03,,NC,1991,3.0
2046,vthompson,F,1981-11-27,Emily Sanford,United Arab Emirates,omelendez@yahoo.com,2001-10-30,,AE,1981,3.0
2095,djohnson,F,1984-09-23,Jennifer Hawkins,Jamaica,,,,JM,1984,
...,...,...,...,...,...,...,...,...,...,...,...
2002370648,wbradford,,2010-07-14,,,,,,,2010,1.0
2002371420,walkershirley,M,1994-02-16,Travis Harris,Vietnam,,2009-08-15,,VN,1994,1.0
2002371627,travisbrown,,2000-06-11,,,kdoyle@gmail.com,2005-03-10,,,2000,2.0
2002371716,greid,,2007-07-01,Jose Mata,Lithuania,,2002-01-06,,LT,2007,1.0


#### [версия 2]
* Уточнена формулировка задачи 1