# Dask Delayed

## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

In [3]:
def cpu_task():
    return sum([random.randint(0, 10) for _ in range(100_000)])
 
def io_task():
    time.sleep(0.1)
    return random.randint(0, 10)

In [4]:
%%time
res = [cpu_task() for _ in range(100)]

CPU times: user 14.1 s, sys: 57.3 ms, total: 14.1 s
Wall time: 14.2 s


In [5]:
%%time
res = [io_task() for _ in range(100)]

CPU times: user 62.8 ms, sys: 7.18 ms, total: 70 ms
Wall time: 10 s


In [6]:
cpu_task_delayed = dask.delayed(cpu_task)


In [7]:
 
%%time
res = [cpu_task_delayed() for _ in range(100)]

CPU times: user 4.09 ms, sys: 979 µs, total: 5.07 ms
Wall time: 5.26 ms


In [8]:
# так делать не надо
for r in res:
    r.compute()
 

In [9]:
%%time
# так надо
res1 = dask.compute(res, scheduler='multiprocessing')

CPU times: user 244 ms, sys: 39.6 ms, total: 284 ms
Wall time: 14.7 s


In [10]:
%%time
res1 = dask.compute(res, scheduler='threading')

CPU times: user 14.1 s, sys: 125 ms, total: 14.2 s
Wall time: 14.3 s


In [11]:
io_task_delayed = dask.delayed(io_task)


In [12]:
%%time
res = [io_task_delayed() for _ in range(100)]
res1 = dask.compute(res)

CPU times: user 71.8 ms, sys: 12.2 ms, total: 84 ms
Wall time: 5.04 s


In [13]:
%%time
res = [io_task_delayed() for _ in range(100)]
res1 = dask.compute(res, scheduler='multiprocessing')

CPU times: user 153 ms, sys: 36 ms, total: 189 ms
Wall time: 5.15 s


In [16]:
pip install xmltodict

Collecting xmltodict
  Downloading xmltodict-0.12.0-py2.py3-none-any.whl (9.2 kB)
Installing collected packages: xmltodict
Successfully installed xmltodict-0.12.0


In [15]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [25]:
import random
import time
import dask.delayed
import dask
import xmltodict 
import pprint 
import bs4
import json

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из архива `reviewers_full.zip` и по данным этого файла формирует список словарей, содержащих следующие ключи: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [None]:
with open ("/content/drive/MyDrive/Colab Notebooks/lab 14/reviewers_full/reviewers_full_0.xml", "r") as f:
  kal = f.readlines()
kal

In [33]:
from bs4 import BeautifulSoup as bs
content = []
# Read the XML file
with open("/content/drive/MyDrive/Colab Notebooks/lab 14/reviewers_full/reviewers_full_0.xml", "r") as file:
    # Read each line in the file, readlines() returns a list of lines
    content = file.readlines()
    # Combine the lines in the list into a string
    content = "".join(content)
    bs_content = bs(content, "lxml")
    soup= bs4.BeautifulSoup(file, "xml") 

bs_content

Output hidden; open in https://colab.research.google.com to view.

In [39]:
print (type(bs_content))

<class 'bs4.BeautifulSoup'>


In [34]:
symbol_id = soup.find_all("symbol")

# create dictionary
d = {}
for item in symbol_id:
    d[item['id']] = d.get(item['id'], 0) + 1

d

{}

2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.