# Тема блока: Dask. Распределённая обработка данных

>__Dask__ - это инновационная библиотека решает критические проблемы современного анализа данных, позволяя эффективно работать с массивами данных, которые превышают объем оперативной памяти, и выполнять задачи распределенно на многопроцессорных машинах или кластерах. Для установки полной версии выполните: !pip install "dask[complete]"
>
>>Dask строит граф вычислений (DAG), который выполняется только при явном вызове метода compute(). Это обеспечивает возможность оптимизации и планирования вычислений;
>
>>Dask разбивает большие массивы данных на управляемые фрагменты (chunks), что позволяет эффективно использовать доступную память и распределять нагрузку;
>
>>Dask эффективен только при обработке больших данных, которые не помещаются в памяти, иногда его использование может быть неэффективно.

## Содержание

1. [Что такое NumPy в Python](#Что-такое-NumPy-в-Python-1)
    - [Для чего используют NumPy 1.1](#Для-чего-используют-NumPy-1.1)
    - [Сравнение Pandas и NumPy 1.2](#Сравнение-Pandas-и-NumPy-1.2)
2. [Установка NumPy](#Установка-NumPy-2)    
3. [Основы синтаксиса](#Основы-синтаксиса-3)
    - [Создание массивов 3.1](#Создание-массивов-3.1)
    - [Операции с использованием NumPy 3.2](#Операции-с-использованием-NumPy-3.2)
    - [Типы данных NumPy 3.3](#Типы-данных-NumPy-3.3)
    - [Объекты типов данных 3.4](#Объекты-типов-данных-3.4)
        - [numpy.dtype 3.4.1](#numpy.dtype-3.4.1)
        - [numpy.shape 3.4.2](#numpy.shape-3.4.2)
        - [numpy.ndim 3.4.3](#numpy.ndim-3.4.3)
        - [numpy.itemsize 3.4.4](#numpy.itemsize-3.4.4)  

## Инициализация библиотек

In [5]:
# для декораторов и подсчёта времени
import time 
from functools import wraps

# всё что касается dask
import dask
import dask.dataframe as dd
import dask.array as da
from dask import delayed
from dask.distributed import Client

# сторонние фреймворке которые пригодятся в работе
import clickhouse_connect
import pandas as pd
from sqlalchemy import create_engine
import gc
import warnings
import numpy

warnings.filterwarnings('ignore')

## Тут выведу все версии фреймворков, которые использую в рамках работы

## Объявление декораторов для удобства

In [11]:
# декоратор для посчёта времени выполнения функции
def timeit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f'{func.__name__} выполнилась за {end - start:.6f} секунд')
        return result
    return wrapper

## C РАЗВЁРТКОЙ КЛАСТЕРА

|Функция/Метод|Описание|Пример использования
|:-----|:-----|:-----|
|__Client()__|Создание клиента|Client('localhost:8786')
|__client.submit()__|Отправка задачи|client.submit(func, args)
|__client.map()__|Параллельная обработка|client.map(func, iterable)
|__client.gather()__|Сбор результатов|client.gather(futures)
|__client.scatter()__|Распределение данных|client.scatter(data)
|__future.result()__|Получение результата|future.result()
|__client.compute()__|Распределенные вычисления|client.compute(dask_object)
|__client.persist()__|Сохранение в памяти кластера|client.persist(dask_object)

In [1]:
from dask.distributed import Client
import dask.dataframe as dd

# Это создаёт планировщик в локальном процессе, а также количество воркеров и потоков на воркера, связанные с количеством ядер на машине.
client = Client()

# ИЛИ создаём 4 воркера
# client = Client(n_workers=4)

# ИЛИ создаём 4 потока на воркера
# client = Client(threads_per_worker=4)

# ИЛИ подключение к кластеру c конкретным localhost
# client = Client('localhost:8787')

# ИЛИ настройка ограничений памяти для воркеров — с помощью параметра memory_limit
# client = Client(memory_limit='2GB')

# ПРИМЕР
# df = dd.read_csv(r'C:\Users\kopyl\Тестовое задание СПУТНИК\store_checkout_queues.csv', sep=',')
# df.compute()

# client.close()

In [3]:
client.profile()

{'description': {'filename': '', 'name': '', 'line_number': 0, 'line': ''},
 'children': {},
 'count': 0,
 'identifier': 'root'}

In [4]:
client.scatter(df)

In [5]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 15.80 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:55699,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 15.80 GiB

0,1
Comm: tcp://127.0.0.1:55751,Total threads: 2
Dashboard: http://127.0.0.1:55755/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:55702,
Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-5kpg0b5q,Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-5kpg0b5q

0,1
Comm: tcp://127.0.0.1:55753,Total threads: 2
Dashboard: http://127.0.0.1:55960/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:55704,
Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-3zb14v_3,Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-3zb14v_3

0,1
Comm: tcp://127.0.0.1:55749,Total threads: 2
Dashboard: http://127.0.0.1:55754/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:55706,
Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-l0bybeay,Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-l0bybeay

0,1
Comm: tcp://127.0.0.1:55750,Total threads: 2
Dashboard: http://127.0.0.1:55756/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:55708,
Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-z_sfku1c,Local directory: C:\Users\kopyl\AppData\Local\Temp\dask-scratch-space\worker-z_sfku1c


![image.png](attachment:403a1913-6469-4720-a3e4-e5a73cea4539.png)

In [17]:
# ВЫЧИСЛЕНИЯ НЕ ВЫПОЛНЯЮТСЯ! Тут происходит формирование графа вычислений 
# ВЫЧИСЛЕНИЯ НЕ ВЫПОЛНЯЮТСЯ! Тут происходит создание массива
df = dd.read_csv(r'C:\Users\\kopyl\Тестовое задание СПУТНИК\store_checkout_queues.csv', 
                  parse_dates=['start_operation_dt', 'end_operation_dt'],
                  sep=',',
                  encoding = 'utf-8')

# df = df.persist()  # если необходимо сохранить результат в память

# ЗАПУСК ВЫЧИСЛЕНИЙ

# используйте compute, когда результат небольшой и его нужно обработать только на одном компьютере
client.compute(df)

# используйте persist, когда результат большой и его необходимо распределить между множеством компьютеров
client.persist(df)

# Без развёртки кластера. Dask DataFrame

>Dask превосходно работает на одной машине, автоматически используя все доступные ядра процессора для параллельных вычислений

## Чтение данных из файлов 

In [11]:
import dask.dataframe as dd
import dask

# ВЫЧИСЛЕНИЯ НЕ ВЫПОЛНЯЮТСЯ! Тут происходит создание массива
df = dd.read_csv(r'C:\Users\\kopyl\Тестовое задание СПУТНИК\store_checkout_queues.csv',
                  # blocksize=25e6                 
                  parse_dates=['start_operation_dt', 'end_operation_dt'],
                  sep=',',
                  encoding = 'utf-8')

# тут уже происходят вычисления
df = df.compute()

df.head(3)

Unnamed: 0,checks_number,store_uuid,employees_id,quantity,selling_price,checkout_id1,start_operation_dt,end_operation_dt
0,787514,tdel8oni-egjd-ddij-u7k4-ccxxh7ho12,121140,4,271,6,02.12.2024 10:13:23,02.12.2024 10:13:31
1,19753765,tdel8oni-egjd-ddij-u7k4-ccxxh7ho12,121140,4,271,6,22.12.2024 14:12:31,22.12.2024 14:12:46
2,24941583,tdel8oni-egjd-ddij-u7k4-ccxxh7ho12,121140,4,271,6,27.11.2024 12:57:10,27.11.2024 12:57:40


## Чтение данных из Postgres посредством Dask

In [5]:
from sqlalchemy import create_engine
import dask.dataframe as dd

# читаем данные с помощью DASK
# DASK использует индес для разделени
df = dd.read_sql(sql = "amocrm_leads",
                 con = "postgresql://test_user:test_password@localhost:5432/test_db", 
                 index_col = "id",
                 schema = "analytical",
                 npartitions = 10
                )

df.head(3)

Unnamed: 0_level_0,hash_id,system_created_at,system_updated_at,name,price,status_id,responsible_user_id
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
10001,89690e919b15de3ead32ffe405b4b415,2025-10-28 17:51:02.820966,2025-11-27 11:01:02.820966,"""Лид #1 - Кондиционер""",50000.0,123,456
10002,91eb152f241e46fc8addcf90eb964754,2025-10-28 18:51:02.820966,2025-11-27 11:11:02.820966,"""Лид #2 - Монтаж""",75000.0,124,457
10003,14adae003b59b5421d1a2d6dc86a84ee,2025-10-28 19:51:02.820966,2025-11-27 11:21:02.820966,"""Лид #3 - Сервис""",30000.0,125,458


## Параллельное выполнение функций с помощью Dask

In [15]:
import dask
import time

start_time = time.time()

# наша функция которая имитирует какой-то action
def dask_func(id):
    print(f"Обрабатывается элемент {id}...")
    time.sleep(1)
    return id

# оборачиваем нашу функцию в "ленивую" версию функции
lazy_dask_func = dask.delayed(dask_func)

# ВАРИАНТ 1 (оптимизированный)
list_results = [lazy_dask_func(i) for i in range(10)]

# ВАРИАНТ 2 (НЕоптимизированный)
# list_results = []

# for i in range(10):
#     list_results.append(lazy_dask_func(i))

# запуск графа, где * - распаковка спика в отдельные аргументы
dask.compute(*list_results)

print(f"Параллельно с Dask: {time.time() - start_time:.2f} сек.")

Параллельно с Dask: 2.01 сек.


## Гибкое управлене задачами (пример с ETL)

In [22]:
from dask import delayed
import time

# чтение данных
@delayed
def extract_data():
    time.sleep(1)
    print('Данные прочитаны')
    
# обработка данных
@delayed
def transform_data():
    time.sleep(1)
    print('Данные обработаны')

# загрузка данных
@delayed
def load_data():
    time.sleep(1)
    print('Данные загружены') 


final_result = load_data()

# Выполнение всего графа
result = final_result.compute()