# Dask Delayed

Materials:
* S.V. Makrushin. Lecture 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* Jesse C. Daniel. Data Science with Python and Dask.


## Problems to work through together

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Write 2 functions that simulate a CPU-bound task and an IO-bound task:

`cpu_task()`: generates 100,000 random numbers and returns their sum (without using `numpy`)

`io_task()`: "sleeps" for 0.1 s, then generates a random number and returns it

Measure the execution time of 100 sequential calls of each function. Parallelize the computation with `dask.delayed` to reduce the execution time. Investigate how the computation time depends on the chosen scheduler (`scheduler`).
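One compact way to run the scheduler comparison is to rebuild the task list for each scheduler and pass the scheduler name straight to `dask.compute`, which applies only to that call instead of changing the global `dask.config.set` value. A minimal sketch, assuming a shortened IO-bound task (0.01 s sleep); the helper name `time_schedulers` is hypothetical. The calls are left impure (`pure=False`, the default): with `pure=True`, dask would give identical argument-less calls the same key and compute them only once.

```python
import random
import time

import dask
from dask import delayed


def io_task():
    # Shortened IO-bound stand-in: wait 0.01 s, then return a random number
    time.sleep(0.01)
    return random.randint(1, 100)


def time_schedulers(func, n_tasks, schedulers):
    """Hypothetical helper: wall time of n_tasks calls of func for each scheduler."""
    timings = {}
    for name in schedulers:
        # pure=False (the default) keeps every call a separate task
        tasks = [delayed(func, pure=False)() for _ in range(n_tasks)]
        start = time.perf_counter()
        # scheduler= applies to this compute call only
        results = dask.compute(*tasks, scheduler=name)
        timings[name] = time.perf_counter() - start
        assert len(results) == n_tasks
    return timings
```

For example, `time_schedulers(io_task, 100, ["synchronous", "threads", "processes"])` repeats the comparison from the cells below. With `"processes"` in the list, run it from a notebook or from a script guarded by `if __name__ == "__main__":`, since worker processes may be started with the `spawn` method.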

In [None]:
import random
import time

import dask
from dask import delayed

In [None]:
# CPU-bound task: pure-Python arithmetic, holds the GIL the whole time
def cpu_task():
    numbers = [random.randint(1, 100) for _ in range(100000)]
    return sum(numbers)

# IO-bound task: mostly waiting; time.sleep releases the GIL
def io_task():
    time.sleep(0.1)
    return random.randint(1, 100)

In [None]:
%%time
for i in range(100):
    result = cpu_task()

Wall time: 25.2 s


In [None]:
%%time
for i in range(100):
    result = io_task()

Wall time: 10.7 s


In [None]:
%%time
dask.config.set(scheduler='threads')  # choose the scheduler (set globally)
lazy_results = [delayed(cpu_task)() for i in range(100)]
results = dask.compute(*lazy_results)

Wall time: 28.5 s


In [None]:
%%time
dask.config.set(scheduler='threads')  # choose the scheduler (set globally)
lazy_results = [delayed(io_task)() for i in range(100)]
results = dask.compute(*lazy_results)

Wall time: 1.49 s


In [None]:
%%time
dask.config.set(scheduler='multiprocessing')  # choose the scheduler (set globally)
lazy_results = [delayed(cpu_task)() for i in range(100)]
results = dask.compute(*lazy_results)

Wall time: 7.55 s


In [None]:
%%time
dask.config.set(scheduler='multiprocessing')  # choose the scheduler (set globally)
lazy_results = [delayed(io_task)() for i in range(100)]
results = dask.compute(*lazy_results)

Wall time: 2.81 s


## Lab 14

1. Write a function that reads an XML file from the `reviewers_full.zip` archive and uses its data to build a list of dictionaries with the following keys: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. In the source file some of these values are stored as tags and some as tag attributes. For a particular person, some of these keys may be absent.
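The tag-versus-attribute distinction can be illustrated with the standard-library `xml.etree.ElementTree` instead of BeautifulSoup (a swapped-in parser, not the one used in the solution below). The XML layout is an assumption inferred from that solution: each `<user>` may carry a `prefix` attribute, `<country>` may carry a `code` attribute, and the other fields are child tags. Like the solution, the sketch also keeps `id`; the sample document is made up.

```python
import xml.etree.ElementTree as ET

# Child tags copied as text (assumed layout)
TEXT_FIELDS = ["id", "username", "name", "sex", "country",
               "mail", "registered", "birthdate"]


def parse_users(xml_text):
    """Turn <user> elements into dicts; absent tags/attributes map to None."""
    root = ET.fromstring(xml_text)
    people = []
    for user in root.iter("user"):
        record = {}
        for field in TEXT_FIELDS:
            # findtext returns None when the child tag is missing
            record[field] = user.findtext(field)
        record["name_prefix"] = user.get("prefix")  # attribute of <user>
        country = user.find("country")
        # Element truthiness reflects child count, so compare with None explicitly
        record["country_code"] = country.get("code") if country is not None else None
        people.append(record)
    return people


sample = """<users>
  <user prefix="Mrs."><id>1</id><username>anna</username>
    <country code="NO">Norway</country></user>
  <user><id>2</id><username>bob</username><birthdate>1985-01-19</birthdate></user>
</users>"""
```

Calling `parse_users(sample)` gives two dicts with all keys present; the second one has `None` for `country`, `country_code` and `name_prefix`.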



In [None]:
import random
import time

import dask
from dask import delayed

In [None]:
import zipfile
from bs4 import BeautifulSoup  # the 'xml' parser below requires lxml

def parse_reviewers_zip(zip_path):
    # Collect people from all files: the list is created once, before the loop
    reviewers = []
    with zipfile.ZipFile(zip_path) as z:
        xml_files = [f for f in z.namelist() if 'reviewers_full/reviewers_full_' in f]
        for xml_file in xml_files:
            with z.open(xml_file) as f:
                soup = BeautifulSoup(f, 'xml')
            for reviewer in soup.find_all("user"):
                reviewer_dict = {}
                reviewer_dict['id'] = reviewer.find('id').text
                reviewer_dict['username'] = reviewer.find('username').text if reviewer.find('username') else None
                reviewer_dict['name'] = reviewer.find('name').text if reviewer.find('name') else None
                reviewer_dict['sex'] = reviewer.find('sex').text if reviewer.find('sex') else None
                reviewer_dict['country'] = reviewer.find('country').text if reviewer.find('country') else None
                reviewer_dict['mail'] = reviewer.find('mail').text if reviewer.find('mail') else None
                reviewer_dict['registered'] = reviewer.find('registered').text if reviewer.find('registered') else None
                reviewer_dict['birthdate'] = reviewer.find('birthdate').text if reviewer.find('birthdate') else None
                # prefix is an attribute of <user>, code is an attribute of <country>
                reviewer_dict['name_prefix'] = reviewer.get('prefix')
                reviewer_dict['country_code'] = reviewer.find('country').get('code') if reviewer.find('country') else None
                reviewers.append(reviewer_dict)
    return reviewers

parse_reviewers_zip("reviewers_full.zip")

[{'id': '2214951',
  'username': 'nancyharrison',
  'name': None,
  'sex': 'M',
  'country': 'Mali',
  'mail': None,
  'registered': '2017-05-24',
  'birthdate': None,
  'name_prefix': None,
  'country_code': 'ML'},
 {'id': '133503',
  'username': 'patrickbird',
  'name': None,
  'sex': None,
  'country': 'Venezuela',
  'mail': 'matthew29@yahoo.com',
  'registered': '2005-03-22',
  'birthdate': '1958-10-07',
  'name_prefix': None,
  'country_code': 'VE'},
 {'id': '465229',
  'username': 'xcamacho',
  'name': 'Lisa Anderson',
  'sex': 'F',
  'country': 'Suriname',
  'mail': 'david13@yahoo.com',
  'registered': None,
  'birthdate': None,
  'name_prefix': None,
  'country_code': 'SR'},
 {'id': '2000340227',
  'username': 'andersonshane',
  'name': None,
  'sex': None,
  'country': None,
  'mail': 'shermanterri@hotmail.com',
  'registered': '2014-05-15',
  'birthdate': '1982-12-26',
  'name_prefix': None,
  'country_code': None},
 {'id': '2000070758',
  'username': 'williamdelgado',
  'nam

2. Measure the execution time of the function from task 1 on all files in the archive. Speed it up using `dask.delayed`.
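With one delayed object per file, `dask.compute` returns a tuple of per-file lists. A minimal sketch of a variant that also flattens inside the task graph, so a single flat list comes back; `parse_chunk` is a hypothetical stand-in for the per-file parser and returns made-up records. It relies on delayed functions traversing list arguments, so `flatten` receives the already computed per-file lists.

```python
import dask
from dask import delayed


@delayed
def parse_chunk(chunk_id):
    # Hypothetical stand-in for parsing one file: returns a list of records
    return [{"file": chunk_id, "row": i} for i in range(3)]


@delayed
def flatten(parts):
    # `parts` arrives as a list of computed lists, not of Delayed objects
    return [record for part in parts for record in part]


def parse_all(chunk_ids, scheduler="threads"):
    parts = [parse_chunk(c) for c in chunk_ids]  # one task per file
    return flatten(parts).compute(scheduler=scheduler)
```

For CPU-heavy parsing the same graph can be run with `scheduler="processes"`, as in the solution below.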

In [None]:
%%time
parse_reviewers_zip("reviewers_full.zip")

Wall time: 5min 3s



In [None]:
import zipfile

import dask
from bs4 import BeautifulSoup
from dask import delayed

@delayed
def parse_reviewers_file(zip_path, xml_file):
    # Each task opens the archive itself: an open ZipFile object cannot be
    # pickled and sent to a worker process, but a path and a member name can
    with zipfile.ZipFile(zip_path) as z, z.open(xml_file) as f:
        soup = BeautifulSoup(f, 'xml')
    reviewers = []
    for reviewer in soup.find_all("user"):
        reviewer_dict = {}
        reviewer_dict['id'] = reviewer.find('id').text
        reviewer_dict['username'] = reviewer.find('username').text if reviewer.find('username') else None
        reviewer_dict['name'] = reviewer.find('name').text if reviewer.find('name') else None
        reviewer_dict['sex'] = reviewer.find('sex').text if reviewer.find('sex') else None
        reviewer_dict['country'] = reviewer.find('country').text if reviewer.find('country') else None
        reviewer_dict['mail'] = reviewer.find('mail').text if reviewer.find('mail') else None
        reviewer_dict['registered'] = reviewer.find('registered').text if reviewer.find('registered') else None
        reviewer_dict['birthdate'] = reviewer.find('birthdate').text if reviewer.find('birthdate') else None
        reviewer_dict['name_prefix'] = reviewer.get('prefix')
        reviewer_dict['country_code'] = reviewer.find('country').get('code') if reviewer.find('country') else None
        reviewers.append(reviewer_dict)
    return reviewers

def parse_reviewers_zip(zip_path):
    with zipfile.ZipFile(zip_path) as z:
        xml_files = [f for f in z.namelist() if 'reviewers_full/reviewers_full_' in f]
    # One delayed task per file, run in separate processes (parsing is CPU-bound)
    tasks = [parse_reviewers_file(zip_path, f) for f in xml_files]
    # Returns a tuple with one list of reviewers per file
    return dask.compute(*tasks, scheduler='multiprocessing')

In [None]:
%%time
parse_reviewers_zip("reviewers_full.zip")

Wall time: 4min 21s


([{'username': 'gabrielacalhoun',
   'name_prefix': 'Mrs.',
   'sex': 'F',
   'birthdate': '1988-01-25'},
  {'username': 'qbaxter',
   'country': 'Norway',
   'country_code': 'NO',
   'mail': 'qware@gmail.com',
   'birthdate': '1985-01-19'},
  {'username': 'crosschristopher',
   'name': 'Dana Moore',
   'mail': 'stephaniestrong@yahoo.com',
   'registered': '2018-11-21',
   'birthdate': '1955-07-03'},
  {'username': 'karen27',
   'name': 'Jennifer Horne',
   'country': 'Cuba',
   'country_code': 'CU',
   'mail': 'wjarvis@yahoo.com',
   'registered': '2013-11-20',
   'birthdate': '2007-04-30'},
  {'username': 'gambledanielle',
   'name': 'Henry Harris',
   'country': 'Serbia',
   'country_code': 'RS',
   'registered': '2011-04-08'},
  {'username': 'smullen',
   'name_prefix': 'Miss',
   'name': 'Cynthia Johnson',
   'sex': 'F',
   'birthdate': '2005-03-29'},
  {'username': 'barrettmichael',
   'name': 'Margaret Banks',
   'mail': 'christinaday@hotmail.com',
   'birthdate': '1971-11-15'},

3. Decorate the function from task 1 with `dask.delayed` and create a list `reviewers` of 5 `delayed` objects (one object per file). From this list of `delayed` objects, create a `dask.bag` using `db.from_delayed`. Add a `birth_year` key holding the person's year of birth. Keep only the people who were __definitely__ born in 1980 or later (anyone with an unknown birth date is excluded). Convert the `id` field to an integer type.
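In the solution below, `birth_year` and the integer `id` are produced inside the parser. The same steps can also be done on the bag itself with `Bag.map` and `Bag.filter`, keeping the parser unchanged. A minimal sketch on in-memory records; `load_part`, `add_birth_year`, `young_people` and the sample data are hypothetical.

```python
import dask.bag as db
from dask import delayed


@delayed
def load_part(records):
    # Hypothetical stand-in for the per-file parser
    return records


def add_birth_year(person):
    # New dict with an integer id and the year taken from 'YYYY-MM-DD' (None if unknown)
    birthdate = person.get("birthdate")
    year = int(birthdate[:4]) if birthdate else None
    return {**person, "id": int(person["id"]), "birth_year": year}


def young_people(parts, scheduler="synchronous"):
    bag = db.from_delayed([load_part(p) for p in parts])  # one partition per part
    bag = (bag.map(add_birth_year)
              # "definitely": people with an unknown birth year are dropped
              .filter(lambda p: p["birth_year"] is not None and p["birth_year"] >= 1980))
    return bag.compute(scheduler=scheduler)
```

For example, with parts `[{"id": "1", "birthdate": "1985-01-19"}, {"id": "2", "birthdate": None}]` and `[{"id": "3", "birthdate": "1975-03-01"}]`, only the person with `id` 1 remains.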

In [None]:
import zipfile

import dask.bag as db
from bs4 import BeautifulSoup
from dask import delayed

@delayed
def parse_reviewers_file(zip_path, xml_file):
    # Read one XML file straight from the archive inside the task
    with zipfile.ZipFile(zip_path) as z, z.open(xml_file) as f:
        soup = BeautifulSoup(f, 'xml')
    reviewers = []
    for reviewer in soup.find_all("user"):
        reviewer_dict = {}
        reviewer_dict['id'] = int(reviewer.find('id').text)  # id as an integer
        reviewer_dict['username'] = reviewer.find('username').text if reviewer.find('username') else None
        reviewer_dict['name'] = reviewer.find('name').text if reviewer.find('name') else None
        reviewer_dict['sex'] = reviewer.find('sex').text if reviewer.find('sex') else None
        reviewer_dict['country'] = reviewer.find('country').text if reviewer.find('country') else None
        reviewer_dict['mail'] = reviewer.find('mail').text if reviewer.find('mail') else None
        reviewer_dict['registered'] = reviewer.find('registered').text if reviewer.find('registered') else None
        reviewer_dict['birthdate'] = reviewer.find('birthdate').text if reviewer.find('birthdate') else None
        # Year of birth from 'YYYY-MM-DD', or None if the date is missing
        reviewer_dict['birth_year'] = int(reviewer.find('birthdate').text.split('-')[0]) if reviewer.find('birthdate') else None
        reviewer_dict['name_prefix'] = reviewer.get('prefix')
        reviewer_dict['country_code'] = reviewer.find('country').get('code') if reviewer.find('country') else None
        reviewers.append(reviewer_dict)
    return reviewers


def parse_reviewers_zip(zip_path):
    with zipfile.ZipFile(zip_path) as z:
        xml_files = [f for f in z.namelist() if 'reviewers_full/reviewers_full_' in f]
    # One delayed object per file (5 in this archive)
    reviewers = [parse_reviewers_file(zip_path, f) for f in xml_files]
    b = db.from_delayed(reviewers)
    # Keep only people whose birth year is known and is 1980 or later
    b = b.filter(lambda x: x['birth_year'] is not None and x['birth_year'] >= 1980)
    return b

In [None]:
%%time
revs = parse_reviewers_zip('reviewers_full.zip')

Wall time: 772 ms


4. From the `dask.bag` obtained in task 3, create a `dask.dataframe` using `bag.to_dataframe`. Use the `id` column as the index.

In [None]:
df = revs.to_dataframe()
df = df.set_index('id')
df

| npartitions=5 | username | name | sex | country | mail | registered | birthdate | birth_year | name_prefix | country_code |
|---|---|---|---|---|---|---|---|---|---|---|
| 1676 | object | object | object | object | object | object | object | int64 | object | object |
| 367646 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2000355109 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2002372706 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


In [None]:
df.compute()

| id | username | name | sex | country | mail | registered | birthdate | birth_year | name_prefix | country_code |
|---|---|---|---|---|---|---|---|---|---|---|
| 1676 | lgeorge |  | M |  |  |  | 1983-06-24 | 1983 |  |  |
| 1792 | qbeard |  | F | Guinea | rachel20@hotmail.com |  | 1986-03-12 | 1986 |  | GN |
| 1938 | adambrown | William Fisher |  | New Caledonia |  | 2019-05-03 | 1991-11-11 | 1991 |  | NC |
| 2046 | vthompson | Emily Sanford | F | United Arab Emirates | omelendez@yahoo.com | 2001-10-30 | 1981-11-27 | 1981 |  | AE |
| 2095 | djohnson | Jennifer Hawkins | F | Jamaica |  |  | 1984-09-23 | 1984 | Mrs. | JM |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2002370648 | wbradford |  |  |  |  |  | 2010-07-14 | 2010 |  |  |
| 2002371420 | walkershirley | Travis Harris | M | Vietnam |  | 2009-08-15 | 1994-02-16 | 1994 | Mr. | VN |
| 2002371627 | travisbrown |  |  |  | kdoyle@gmail.com | 2005-03-10 | 2000-06-11 | 2000 |  |  |
| 2002371716 | greid | Jose Mata |  | Lithuania |  | 2002-01-06 | 2007-07-01 | 2007 |  | LT |


5. A review is called negative if its rating is 0, 1, or 2. Load the negative reviews from the files of the `reviews_full` archive (__Lab 12__) as a `dask.DataFrame`. Count the reviews grouped by the user who left them. Join the result with the table obtained in task 4.

In [None]:
import json
import glob
import re

def parse_review_json_with_rating(json_string, include_path):
    json_obj = json.loads(json_string)
    match = re.search(r'reviews_(\d+)\.json', include_path)
    if match:
        rating = int(match.group(1))
        json_obj['rating'] = rating
    return json_obj

In [None]:
# Collect the paths of all review files
files = sorted(glob.glob('reviews_full/reviews_*.json'))
print(files)
# One bag per file: the rating is taken from the file name,
# and only negative reviews (rating < 3) are kept
bags = [
    db.read_text(file)
      .map(parse_review_json_with_rating, include_path=file)
      .filter(lambda x: x['rating'] < 3)
    for file in files
]
# Concatenate the per-file bags into a single bag
b_ratings = db.concat(bags)

['reviews_full\\reviews_0.json', 'reviews_full\\reviews_1.json', 'reviews_full\\reviews_2.json', 'reviews_full\\reviews_3.json', 'reviews_full\\reviews_4.json', 'reviews_full\\reviews_5.json']
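The per-file loop can also be replaced by a single `db.read_text` call: with `include_path=True`, each element becomes a `(line, path)` pair, so the rating can still be read from the file name. This `include_path` is a boolean flag of `read_text`, unrelated to the `include_path` argument of `parse_review_json_with_rating` above. A minimal sketch that assumes the `reviews_<rating>.json` naming used above; `parse_line` and `negative_reviews` are hypothetical helper names.

```python
import json
import re

import dask.bag as db


def parse_line(line_and_path):
    # read_text(..., include_path=True) yields (line, path) tuples
    line, path = line_and_path
    record = json.loads(line)
    match = re.search(r"reviews_(\d+)\.json", path)
    record["rating"] = int(match.group(1)) if match else None
    return record


def negative_reviews(pattern):
    # One call reads every file matching the glob pattern; no db.concat needed
    bag = db.read_text(pattern, include_path=True).map(parse_line)
    return bag.filter(lambda r: r["rating"] is not None and r["rating"] < 3)
```

For this notebook, `negative_reviews('reviews_full/reviews_*.json')` would play the role of `b_ratings`.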


In [None]:
dask_dataframe_revs = b_ratings.to_dataframe()

In [None]:
dask_dataframe_revs.compute()

|  | user_id | recipe_id | date | review | rating |
|---|---|---|---|---|---|
| 0 | 452355 | 292657 | 2016-05-08 | WOW!!! This is the best. I have never been abl... | 0 |
| 1 | 329304 | 433404 | 2006-06-14 | This was good but the dressing needed somethin... | 0 |
| 2 | 227932 | 2008187 | 1985-11-19 | Very good,it was a hit for my family. I used 6... | 0 |
| 3 | 171468 | 270716 | 2019-05-21 | Made for ZWT-8 Family Picks after I saw these ... | 0 |
| 4 | 91392 | 1159916 | 1972-09-18 | Very nice slaw. I especially like that it does... | 0 |
| ... | ... | ... | ... | ... | ... |
| 112556 | 383346 | 45919 | 2016-08-21 | This made a wonderful and delicious breakfast!... | 2 |
| 112557 | 353659 | 253948 | 2015-04-24 | I made this over the weekend, I filled it with... | 2 |
| 112558 | 50969 | 304989 | 2017-12-20 | A solid introduction to a grand Mediterranean ... | 2 |
| 112559 | 666723 | 1792397 | 2021-06-08 | I absolutely LOVE your recipe and I have made ... | 2 |


In [None]:
revs_by_user = dask_dataframe_revs.groupby('user_id')['review'].count()
revs_by_user.compute()

user_id
1533           64
1535          441
1634           36
1676           29
1755            4
             ... 
2002366476      1
2002368940      1
2002369279      1
2002369650      1
2002372706      1
Name: review, Length: 145069, dtype: int64

In [None]:
dask_dataframe_revs_join = df.join(revs_by_user.compute())

In [None]:
dask_dataframe_revs_join.compute()

| id | username | name | sex | country | mail | registered | birthdate | birth_year | name_prefix | country_code | review |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 1676 | lgeorge |  | M |  |  |  | 1983-06-24 | 1983 |  |  | 29.0 |
| 1792 | qbeard |  | F | Guinea | rachel20@hotmail.com |  | 1986-03-12 | 1986 |  | GN | 14.0 |
| 1938 | adambrown | William Fisher |  | New Caledonia |  | 2019-05-03 | 1991-11-11 | 1991 |  | NC | 3.0 |
| 2046 | vthompson | Emily Sanford | F | United Arab Emirates | omelendez@yahoo.com | 2001-10-30 | 1981-11-27 | 1981 |  | AE | 3.0 |
| 2095 | djohnson | Jennifer Hawkins | F | Jamaica |  |  | 1984-09-23 | 1984 | Mrs. | JM |  |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2002370648 | wbradford |  |  |  |  |  | 2010-07-14 | 2010 |  |  | 1.0 |
| 2002371420 | walkershirley | Travis Harris | M | Vietnam |  | 2009-08-15 | 1994-02-16 | 1994 | Mr. | VN | 1.0 |
| 2002371627 | travisbrown |  |  |  | kdoyle@gmail.com | 2005-03-10 | 2000-06-11 | 2000 |  |  | 2.0 |
| 2002371716 | greid | Jose Mata |  | Lithuania |  | 2002-01-06 | 2007-07-01 | 2007 |  | LT | 1.0 |
