# Dask Delayed

## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1\. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [1]:
import numpy as np
import dask.delayed as delayed
import dask.bag as db
from time import sleep
import random

In [2]:
def cpu_task():
    numbers = [random.randint(0, 1000) for _ in range(100_000)]
    return sum(numbers)

In [3]:
def io_task():
    sleep(0.1)
    return random.randint(0, 1000)

In [4]:
%%time
r = [cpu_task() for _ in range(100)]

Wall time: 7.91 s


In [5]:
%%time
r = [io_task() for _ in range(100)]

Wall time: 11.2 s


In [6]:
cpu_task_delayed = dask.delayed(cpu_task)

In [7]:
%%time
r = [cpu_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="threading")

Wall time: 7.91 s


In [8]:
%%time
r = [cpu_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="multiprocessing")

Wall time: 1.81 s


In [9]:
io_task_delayed = dask.delayed(io_task)

In [10]:
%%time
r = [io_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="threading")

Wall time: 790 ms


In [11]:
%%time
r = [io_task_delayed() for _ in range(100)]
r = dask.compute(r, scheduler="multiprocessing")

Wall time: 1.65 s


## Лабораторная работа 14

In [1]:
import dask
import numpy as np
import dask.bag as db
from time import sleep
from bs4 import BeautifulSoup
import dask.delayed as delayed

1\. Напишите функцию, которая считывает файл формата xml из каталога `reviewers_full` и по данным этого файла формирует список словарей, содержащих следующие ключи: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [2]:
def read_xml_2(filename):
    with open(
        filename,
        "r",
        encoding="utf-8"
    ) as fp:
        xml = BeautifulSoup(fp)
    
    
    
    lst = []
    lst_of_user_xml = xml.find_all('user')

    for user_ in range(len(lst_of_user_xml)):
        lst_of_names_xml = list(filter(lambda i: i is not None, [i.name for i in lst_of_user_xml[user_]]))

        try:
            if lst_of_user_xml[user_]['prefix'] is not None:
                lst_of_names_xml.append('name_prefix')
        except:
            pass

        try:
            if lst_of_user_xml[user_].find('country')['code'] is not None:
                lst_of_names_xml.append('country_code')
        except:
            pass



        lst_of_texts = lst_of_user_xml[user_].get_text().split('\n')[1:-1]

        try:
            if lst_of_user_xml[user_]['prefix'] is not None:
                lst_of_texts.append(lst_of_user_xml[user_]['prefix'])
        except:
            pass

        try:
            if lst_of_user_xml[user_].find('country')['code'] is not None:
                lst_of_texts.append(lst_of_user_xml[user_].find('country')['code'])
        except:
            pass



        dct = dict(zip(lst_of_names_xml, lst_of_texts))

        lst.append(dct)

    return lst

In [3]:
%%time

read_xml_2("reviewers_full_0.xml")

Wall time: 10.7 s


[{'id': '88005',
  'mail': 'morenocharlotte@yahoo.com',
  'name': 'Michele Lewis',
  'username': 'jacqueline00'},
 {'birthdate': '2005-03-06',
  'country': 'Germany',
  'id': '68591',
  'sex': 'F',
  'username': 'daniellegomez',
  'name_prefix': 'Dr.',
  'country_code': 'DE'},
 {'id': '81003',
  'mail': 'larsenrobert@gmail.com',
  'name': 'Tammy Patton',
  'username': 'alucero'},
 {'id': '61509',
  'name': 'Jeremy Schmidt',
  'registered': '2017-09-21',
  'sex': 'M',
  'username': 'mcleanjacqueline',
  'name_prefix': 'Dr.'},
 {'id': '74667', 'registered': '2006-12-29', 'username': 'wesley96'},
 {'birthdate': '1994-10-10',
  'country': 'Aruba',
  'id': '43614',
  'mail': 'laura77@hotmail.com',
  'registered': '2019-11-19',
  'sex': 'F',
  'username': 'jeffreynelson',
  'country_code': 'AW'},
 {'id': '68142',
  'name': 'Suzanne Ramirez',
  'registered': '2001-12-14',
  'sex': 'F',
  'username': 'sarahgarcia'},
 {'country': 'American Samoa',
  'id': '46182',
  'mail': 'hardyanthony@gmail.

2\. Измерьте время выполнения функции из задания 1 на всех файлах из каталога `reviewers_full`. Ускорьте время выполнения, используя `dask.delayed`.

In [3]:
read_xml_delayed = dask.delayed(read_xml_2)

In [229]:
%%time 

amount = 20
for n_file in range(amount):
    read_xml(f"reviewers_full_{n_file}.xml")

Wall time: 4min 6s


In [228]:
%%time

amount = 20
task = [read_xml_delayed(f"reviewers_full_{n_file}.xml") for n_file in range(amount)]
result = dask.compute(task, scheduler="multiprocessing")

Wall time: 1min 13s


3\. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года.

In [4]:
task_3 = [read_xml_delayed(f"reviewers_full_{n_file}.xml") for n_file in range(5)]
bag = db.from_delayed(task_3)

In [10]:
def upd_bag(bag):
    try:
        bag["birth_year"] = int(bag['birthdate'][:4])
        return bag
    except:
        return bag

In [11]:
updated_bag = bag.map(upd_bag)
updated_bag.compute()

[{'id': '88005',
  'mail': 'morenocharlotte@yahoo.com',
  'name': 'Michele Lewis',
  'username': 'jacqueline00'},
 {'birthdate': '2005-03-06',
  'country': 'Germany',
  'id': 68591,
  'sex': 'F',
  'username': 'daniellegomez',
  'name_prefix': 'Dr.',
  'country_code': 'DE',
  'birth_year': 2005},
 {'id': '81003',
  'mail': 'larsenrobert@gmail.com',
  'name': 'Tammy Patton',
  'username': 'alucero'},
 {'id': '61509',
  'name': 'Jeremy Schmidt',
  'registered': '2017-09-21',
  'sex': 'M',
  'username': 'mcleanjacqueline',
  'name_prefix': 'Dr.'},
 {'id': '74667', 'registered': '2006-12-29', 'username': 'wesley96'},
 {'birthdate': '1994-10-10',
  'country': 'Aruba',
  'id': 43614,
  'mail': 'laura77@hotmail.com',
  'registered': '2019-11-19',
  'sex': 'F',
  'username': 'jeffreynelson',
  'country_code': 'AW',
  'birth_year': 1994},
 {'id': '68142',
  'name': 'Suzanne Ramirez',
  'registered': '2001-12-14',
  'sex': 'F',
  'username': 'sarahgarcia'},
 {'country': 'American Samoa',
  'id':

In [12]:
def filter_more_1980(us):
    return us.get('birth_year') is not None and us['birth_year'] > 1980

In [13]:
result = updated_bag.filter(filter_more_1980)

In [14]:
result.take(5)

({'birthdate': '2005-03-06',
  'country': 'Germany',
  'id': 68591,
  'sex': 'F',
  'username': 'daniellegomez',
  'name_prefix': 'Dr.',
  'country_code': 'DE',
  'birth_year': 2005},
 {'birthdate': '1994-10-10',
  'country': 'Aruba',
  'id': 43614,
  'mail': 'laura77@hotmail.com',
  'registered': '2019-11-19',
  'sex': 'F',
  'username': 'jeffreynelson',
  'country_code': 'AW',
  'birth_year': 1994},
 {'birthdate': '1996-09-07',
  'id': 25362,
  'mail': 'steven53@gmail.com',
  'registered': '2005-11-28',
  'username': 'davidmaxwell',
  'birth_year': 1996},
 {'birthdate': '1997-03-29',
  'country': 'Tajikistan',
  'id': 1509,
  'registered': '2016-09-07',
  'sex': 'F',
  'username': 'thomas07',
  'name_prefix': 'Dr.',
  'country_code': 'TJ',
  'birth_year': 1997},
 {'birthdate': '1990-04-22',
  'id': 94076,
  'mail': 'adamrobinson@hotmail.com',
  'registered': '2017-02-25',
  'username': 'hannahturner',
  'birth_year': 1990})