# Dask Delayed

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Макрушин С.В. Лекция "Dask Delayed"
* https://docs.dask.org/en/latest/delayed.html
* Jesse C. Daniel. Data Science with Python and Dask.
* https://saturncloud.io/blog/a-data-scientist-s-guide-to-lazy-evaluation-with-dask/
* https://www.coiled.io/blog/how-to-learn-dask-in-2021

## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1\. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [42]:
import numpy as np
import dask.delayed as delayed
import dask.bag as db
import dask
from time import sleep
import random

In [2]:
def cpu_task():
    numbers = [random.randint(0, 1000) for _ in range(100_000)]
    return sum(numbers)

In [3]:
def io_task():
    sleep(0.1)
    return random.randint(0, 1000)

In [4]:
%%time
r = [cpu_task() for _ in range(100)]

CPU times: total: 7.31 s
Wall time: 7.31 s


In [5]:
%%time
r = [io_task() for _ in range(100)]

CPU times: total: 0 ns
Wall time: 10.9 s


In [6]:
cpu_task_delayed = dask.delayed(cpu_task)

In [7]:
%%time
r = [cpu_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="threading")

CPU times: total: 7.36 s
Wall time: 7.35 s


In [8]:
%%time
r = [cpu_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="multiprocessing")

CPU times: total: 78.1 ms
Wall time: 2.51 s


In [9]:
io_task_delayed = dask.delayed(io_task)

In [10]:
%%time
r = [io_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="threading")

CPU times: total: 15.6 ms
Wall time: 762 ms


In [11]:
%%time
r = [io_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="multiprocessing")

CPU times: total: 78.1 ms
Wall time: 1.89 s


## Лабораторная работа 14

1\. Напишите функцию, которая считывает файл формата xml из каталога `reviewers_full` и по данным этого файла формирует список словарей, содержащих следующие ключи: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [12]:
from bs4 import BeautifulSoup

In [16]:
def read_xml(file):
    with open(
        file,
        "r",
        encoding="utf-8"
    ) as fp:
        xml = BeautifulSoup(fp, 'xml')
        
    reviewers = xml.find_all('user')

    users = []

    for user in reviewers:
        new_user = {}
        new_user['id'] = user.find('id').text
        new_user['username'] = user.find('username').text
        if user.find('name'):
            new_user['name'] = user.find('name').text
        if user.find('sex'):
            new_user['sex'] = user.find('sex').text
        if user.find('country'):
            new_user['country'] = user.find('country').text
        if user.find('mail'):
            new_user['mail'] = user.find('mail').text
        if user.find('registered'):
            new_user['registered'] = user.find('registered').text
        if user.find('birthdate'):
            new_user['birthdate'] = user.find('birthdate').text
        if user.get('prefix'):
            new_user['name_prefix'] = user.get('prefix')
        if user.find('country'):
            if user.find('country').get('code'):
                new_user['country_code'] = user.find('country').get('code')  

        users.append(new_user)
        
    return users


In [17]:
read_xml("reviewers_full/reviewers_full_0.xml")

[{'id': '88005',
  'username': 'jacqueline00',
  'name': 'Michele Lewis',
  'mail': 'morenocharlotte@yahoo.com'},
 {'id': '68591',
  'username': 'daniellegomez',
  'sex': 'F',
  'country': 'Germany',
  'birthdate': '2005-03-06',
  'name_prefix': 'Dr.',
  'country_code': 'DE'},
 {'id': '81003',
  'username': 'alucero',
  'name': 'Tammy Patton',
  'mail': 'larsenrobert@gmail.com'},
 {'id': '61509',
  'username': 'mcleanjacqueline',
  'name': 'Jeremy Schmidt',
  'sex': 'M',
  'registered': '2017-09-21',
  'name_prefix': 'Dr.'},
 {'id': '74667', 'username': 'wesley96', 'registered': '2006-12-29'},
 {'id': '43614',
  'username': 'jeffreynelson',
  'sex': 'F',
  'country': 'Aruba',
  'mail': 'laura77@hotmail.com',
  'registered': '2019-11-19',
  'birthdate': '1994-10-10',
  'country_code': 'AW'},
 {'id': '68142',
  'username': 'sarahgarcia',
  'name': 'Suzanne Ramirez',
  'sex': 'F',
  'registered': '2001-12-14'},
 {'id': '46182',
  'username': 'brucekennedy',
  'sex': 'F',
  'country': 'Ame

2\. Измерьте время выполнения функции из задания 1 на всех файлах из каталога `reviewers_full`. Ускорьте время выполнения, используя `dask.delayed`.

In [32]:
%%time
files = [f"reviewers_full/reviewers_full_{i}.xml" for i in np.arange(0, 20)]

for file in files:
    read_xml(file)

CPU times: total: 5min 38s
Wall time: 5min 38s


In [34]:
read_xml_delayed = dask.delayed(read_xml)

In [35]:
r1 = [read_xml_delayed(file) for file in files]

In [36]:
%time res = dask.compute(r1, scheduler="multiprocessing")

CPU times: total: 719 ms
Wall time: 1min 42s


3\. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [44]:
def add_birth_year(x):
    if x.get('birthdate'):
        x['birth_year'] = int(x['birthdate'][:4])
    return x

def make_id_int(x):
    x['id'] = int(x['id'])
    return x

read_xml_delayed = dask.delayed(read_xml)
reviewers = [read_xml_delayed(file) for file in files[:5]]
bag = db.from_delayed(reviewers)
bag = bag.map(add_birth_year)
bag = bag.filter(lambda x: x.get('birth_year')).filter(lambda x: x.get('birth_year') > 1980)
bag = bag.map(make_id_int)
bag.take(2)

({'id': 68591,
  'username': 'daniellegomez',
  'sex': 'F',
  'country': 'Germany',
  'birthdate': '2005-03-06',
  'name_prefix': 'Dr.',
  'country_code': 'DE',
  'birth_year': 2005},
 {'id': 43614,
  'username': 'jeffreynelson',
  'sex': 'F',
  'country': 'Aruba',
  'mail': 'laura77@hotmail.com',
  'registered': '2019-11-19',
  'birthdate': '1994-10-10',
  'country_code': 'AW',
  'birth_year': 1994})