# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* JESSE C. DANIEL. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [None]:
import time
import random
import dask
from dask import delayed


def cpu_task():
    numbers = [random.random() for _ in range(100_000)]
    return sum(numbers)


def io_task():
    time.sleep(0.1)
    return random.random()


start_time = time.time()
for _ in range(100):
    cpu_task()
cpu_time = time.time() - start_time
print(f"CPU-bound task time: {cpu_time:.4f} seconds")

start_time = time.time()
for _ in range(100):
    io_task()
io_time = time.time() - start_time
print(f"IO-bound task time: {io_time:.4f} seconds")


CPU-bound task time: 1.2302 seconds
IO-bound task time: 10.0188 seconds


In [None]:
@delayed
def cpu_task():
    numbers = [random.random() for _ in range(100_000)]
    return sum(numbers)

@delayed
def io_task():
    time.sleep(0.1)
    return random.random()


start_time = time.time()
tasks = []
for _ in range(100):
    tasks.append(cpu_task())
dask.delayed()(tasks).compute()
cpu_time = time.time() - start_time
print(f"CPU-bound task time: {cpu_time:.4f} seconds")

start_time = time.time()
for _ in range(100):
    io_task().compute()
io_time = time.time() - start_time
print(f"IO-bound task time: {io_time:.4f} seconds")

CPU-bound task time: 0.6940 seconds
IO-bound task time: 10.0897 seconds


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из архива `reviewers_full.zip` и по данным этого файла формирует список словарей, содержащих следующие ключи: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать.



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from lxml import etree

def parse_xml_file(file_path):
    tree = etree.parse(file_path)
    root = tree.getroot()

    reviewers = []
    for user in root.findall('user'):
        reviewer_dict = {}
        reviewer_dict['id'] = user.findtext('id')
        reviewer_dict['username'] = user.findtext('username')
        reviewer_dict['name'] = user.findtext('name')
        reviewer_dict['sex'] = user.findtext('sex')
        reviewer_dict['country'] = user.findtext('country')
        reviewer_dict['mail'] = user.findtext('mail')
        reviewer_dict['registered'] = user.findtext('registered')
        reviewer_dict['birthdate'] = user.findtext('birthdate')

        if user.get('prefix') is not None:
            reviewer_dict['name_prefix'] = user.get('prefix')
        else:
            reviewer_dict['name_prefix'] = None

        country_elem = user.find('country')
        if country_elem is not None and country_elem.get('code') is not None:
            reviewer_dict['country_code'] = country_elem.get('code')
        else:
            reviewer_dict['country_code'] = None

        reviewers.append(reviewer_dict)

    return reviewers

2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

In [None]:
from dask import delayed

@delayed
def parse_xml_file_delayed(file_path):
    tree = etree.parse(file_path)
    root = tree.getroot()

    reviewers = []
    for user in root.findall('user'):
        reviewer_dict = {}
        reviewer_dict['id'] = user.findtext('id')
        reviewer_dict['username'] = user.findtext('username')
        reviewer_dict['name'] = user.findtext('name')
        reviewer_dict['sex'] = user.findtext('sex')
        reviewer_dict['country'] = user.findtext('country')
        reviewer_dict['mail'] = user.findtext('mail')
        reviewer_dict['registered'] = user.findtext('registered')
        reviewer_dict['birthdate'] = user.findtext('birthdate')

        if user.get('prefix') is not None:
            reviewer_dict['name_prefix'] = user.get('prefix')
        else:
            reviewer_dict['name_prefix'] = None

        country_elem = user.find('country')
        if country_elem is not None and country_elem.get('code') is not None:
            reviewer_dict['country_code'] = country_elem.get('code')
        else:
            reviewer_dict['country_code'] = None

        reviewers.append(reviewer_dict)

    return reviewers

In [None]:
%%time
for i in range (5):
  parse_xml_file(f"/content/drive/MyDrive/tobd/reviewers_full/reviewers_full_{i}.xml");


CPU times: user 5.96 s, sys: 27.6 ms, total: 5.99 s
Wall time: 6.08 s


In [None]:
%%time
for i in range (5):
  parse_xml_file_delayed(f"/content/drive/MyDrive/tobd/reviewers_full/reviewers_full_{i}.xml").compute();

CPU times: user 4.86 s, sys: 100 ms, total: 4.96 s
Wall time: 4.98 s


3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [None]:
import dask.bag as db

delayed = [parse_xml_file_delayed(f"/content/drive/MyDrive/tobd/reviewers_full/reviewers_full_{i}.xml") for i in range (5) ]
b = db.from_delayed(delayed).map(lambda x: {'birth_year': int(x['birthdate'][0:4]) if x and x.get('birthdate') else None, **x})
b = b.filter(lambda x: x['birth_year'] and x['birth_year'] > 1980 and x['birth_year'] is not None)
b = b.map(lambda x: {**x, 'id': int(x['id'])})


In [None]:
b.compute()

[{'birth_year': 1988,
  'id': 556011,
  'username': 'gabrielacalhoun',
  'name': None,
  'sex': 'F',
  'country': None,
  'mail': None,
  'registered': None,
  'birthdate': '1988-01-25',
  'name_prefix': 'Mrs.',
  'country_code': None},
 {'birth_year': 1985,
  'id': 1251087,
  'username': 'qbaxter',
  'name': None,
  'sex': None,
  'country': 'Norway',
  'mail': 'qware@gmail.com',
  'registered': None,
  'birthdate': '1985-01-19',
  'name_prefix': None,
  'country_code': 'NO'},
 {'birth_year': 2007,
  'id': 250427,
  'username': 'karen27',
  'name': 'Jennifer Horne',
  'sex': None,
  'country': 'Cuba',
  'mail': 'wjarvis@yahoo.com',
  'registered': '2013-11-20',
  'birthdate': '2007-04-30',
  'name_prefix': None,
  'country_code': 'CU'},
 {'birth_year': 2005,
  'id': 452355,
  'username': 'smullen',
  'name': 'Cynthia Johnson',
  'sex': 'F',
  'country': None,
  'mail': None,
  'registered': None,
  'birthdate': '2005-03-29',
  'name_prefix': 'Miss',
  'country_code': None},
 {'birth_y

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [None]:
df4 = b.to_dataframe().set_index('id').compute()

In [None]:
df4

Unnamed: 0_level_0,birth_year,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1676,1983,lgeorge,,M,,,,1983-06-24,,
1792,1986,qbeard,,F,Guinea,rachel20@hotmail.com,,1986-03-12,,GN
1938,1991,adambrown,William Fisher,,New Caledonia,,2019-05-03,1991-11-11,,NC
2046,1981,vthompson,Emily Sanford,F,United Arab Emirates,omelendez@yahoo.com,2001-10-30,1981-11-27,,AE
2095,1984,djohnson,Jennifer Hawkins,F,Jamaica,,,1984-09-23,Mrs.,JM
...,...,...,...,...,...,...,...,...,...,...
2002370648,2010,wbradford,,,,,,2010-07-14,,
2002371420,1994,walkershirley,Travis Harris,M,Vietnam,,2009-08-15,1994-02-16,Mr.,VN
2002371627,2000,travisbrown,,,,kdoyle@gmail.com,2005-03-10,2000-06-11,,
2002371716,2007,greid,Jose Mata,,Lithuania,,2002-01-06,2007-07-01,,LT


5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [None]:
import dask.dataframe as dd
def modifiedLoads(x):
  dct = json.loads(x[0])
  dct["rating"] = int(x[1][-6])
  return dct

bag = db.read_text("/content/drive/MyDrive/tobd/reviews_full/reviews_*.json", include_path=True).map(modifiedLoads)
df = bag.to_dataframe()
df = df[df["rating"] < 3]
count = df.groupby('user_id')['rating'].count().compute()


In [None]:
dd.concat([df4, count.to_frame()], axis=1).dropna().compute()

Unnamed: 0,birth_year,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code,rating
2178,1985.0,michaeljones,Anthony Santiago,M,Haiti,jessicaharris@gmail.com,2013-12-12,1985-08-12,Mr.,HT,20.0
2611,1982.0,graymargaret,Leah Brooks,F,Ireland,gomezangela@hotmail.com,2011-08-04,1982-03-01,Ms.,IE,3.0
9748,2010.0,sreeves,Janet Walters,F,Andorra,lowemargaret@yahoo.com,2018-07-20,2010-01-30,Mrs.,AD,88.0
10054,1983.0,whiterichard,Patrick Hinton,M,Palau,brian76@gmail.com,2002-01-13,1983-03-20,Mr.,PW,3.0
12678,2018.0,evansjonathon,Douglas Thomas,M,Namibia,donaldpierce@yahoo.com,2013-03-04,2018-01-14,Mr.,,2.0
...,...,...,...,...,...,...,...,...,...,...,...
2002254771,2008.0,michellearmstrong,Connie Carter,F,Marshall Islands,rebekahkerr@yahoo.com,2003-05-19,2008-06-10,Dr.,MH,1.0
2002268118,2014.0,ledwards,Kristin Stanley,F,Ireland,patrick22@hotmail.com,2005-10-07,2014-02-12,Mrs.,IE,6.0
2002337783,2013.0,kjones,Tanya Porter,F,Denmark,kgreene@gmail.com,2007-03-17,2013-03-06,Dr.,DK,1.0
2002348851,2004.0,thompsonjoshua,Melissa Jackson,F,Bahrain,michaelmccoy@hotmail.com,2018-08-20,2004-09-07,Mrs.,BH,1.0
