Dask – це відкрита бібліотека для Python, призначена для паралельних обчислень. Вона дозволяє ефективно обробляти великі набори даних, які не вміщаються в оперативну пам'ять однієї машини. Dask розбиває великі завдання на менші, які можуть оброблятися паралельно, а потім об'єднує результати.

Перевагами Dask є:
* Масштабування - Dask дозволяє легко масштабувати обчислення на великих кластерах, що робить його ідеальним для роботи з петабайтними даними.
* Швидкість - Паралельна обробка дозволяє значно прискорити виконання обчислень, особливо для задач, які потребують великої обчислювальної потужності.
* Зручність використання - Dask має простий і інтуїтивно зрозумілий інтерфейс, який нагадує такі популярні бібліотеки, як Pandas і NumPy. Це дозволяє швидко освоїти Dask навіть тим, хто вже працював з цими інструментами.
* Гнучкість - Dask можна використовувати для широкого спектру задач, включаючи аналіз даних, машинне навчання, наукові обчислення тощо.
Основні типи даних у Dask:

Dask працює з двома основними типами даних:

* Futures - Це об'єкти, які представляють результат майбутнього обчислення. Вони дозволяють запускати обчислення асинхронно і отримувати результати пізніше.
* Delayed - Це об'єкти, які відкладають виконання обчислень до того моменту, поки не буде викликаний метод compute(). Це дозволяє будувати складні обчислювальні графи.

In [1]:
import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd

In [2]:
pandas_df = pd.read_csv('ml-latest-small/combined_ratings_movies_data.csv')
pandas_df

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
0,1,1,4.0,964982703,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,5,1,4.0,847434962,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,7,1,4.5,1106635946,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
3,15,1,2.5,1510577970,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
4,17,1,4.5,1305696483,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
...,...,...,...,...,...,...
100831,610,160341,2.5,1479545749,Bloodmoon (1997),Action|Thriller
100832,610,160527,4.5,1479544998,Sympathy for the Underdog (1971),Action|Crime|Drama
100833,610,160836,3.0,1493844794,Hazard (2005),Action|Drama|Thriller
100834,610,163937,3.5,1493848789,Blair Witch (2016),Horror|Thriller


## Створення датафреймів

### З Pandas датафреймів

[from_pandas()](https://docs.dask.org/en/stable/generated/dask.dataframe.from_pandas.html)

In [3]:
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df

Unnamed: 0_level_0,userId,movieId,rating,timestamp,title,genres
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
0,int64,int64,float64,int64,object,object
50418,...,...,...,...,...,...
100835,...,...,...,...,...,...


### З CSV файлів

[read_csv()](https://docs.dask.org/en/stable/generated/dask.dataframe.read_csv.html).
Зауваження. Dask при завантажені даних з пропусками по замовчуванню видає помилку. Щоб обійти це обмеження використовується параметр `assume_missing=True`.

In [4]:
dask_df = dd.read_csv('ml-latest-small/combined_ratings_movies_data.csv')
dask_df

Unnamed: 0_level_0,userId,movieId,rating,timestamp,title,genres
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,int64,int64,float64,int64,object,object
,...,...,...,...,...,...


### З Parquet файлів

[read_parquet()](https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html)

In [5]:
dask_df = dd.read_parquet('sample1.parquet')
dask_df

Unnamed: 0_level_0,column0,column1
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,object,object
,...,...


## Основні операції.

Зауваження. Наступні операції для візуальізації результатів будуть використовувати метод head(). В загальному, операції, які будуть використовуватись, не будуть показувати кінцеві результати одразу. Це пов'язано з тим, що Dask працює з відкладеним обчисленням. Dask будує граф обчислень, описуючи послідовність операцій, які потрібно виконати. Фактичні обчислення відкладаються до моменту виклику методу `compute()`. Це дозволяє оптимізувати обчислення і уникати зайвих проміжних результатів.

Разом з тим, такі методи як `head()`, `tail()`, `describe()` тощо, обчислюють лише необхідну частину даних.

In [6]:
dask_df = dd.read_csv('ml-latest-small/combined_ratings_movies_data.csv')

### Перегляд даних

[head()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.head.html)

In [7]:
dask_df.head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
0,1,1,4.0,964982703,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,5,1,4.0,847434962,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,7,1,4.5,1106635946,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
3,15,1,2.5,1510577970,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
4,17,1,4.5,1305696483,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy


[tail()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.tail.html)

In [8]:
dask_df.tail()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
100831,610,160341,2.5,1479545749,Bloodmoon (1997),Action|Thriller
100832,610,160527,4.5,1479544998,Sympathy for the Underdog (1971),Action|Crime|Drama
100833,610,160836,3.0,1493844794,Hazard (2005),Action|Drama|Thriller
100834,610,163937,3.5,1493848789,Blair Witch (2016),Horror|Thriller
100835,610,163981,3.5,1493850155,31 (2016),Horror


### Отримання інформації про DataFrame:

[info()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.info.html)

In [9]:
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 6 entries, userId to genres
dtypes: object(2), float64(1), int64(3)

[describe()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.describe.html)

In [10]:
dask_df.describe()

Unnamed: 0_level_0,userId,movieId,rating,timestamp
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,float64,float64,float64,float64
,...,...,...,...


### Фільтрація.

Фільтрація за умовою:

In [11]:
dask_df[dask_df['rating'] == 0.5].head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
26,76,1,0.5,1439165548,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
245,308,3,0.5,1421374465,Grumpier Old Men (1995),Comedy|Romance
403,104,47,0.5,1053336550,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
517,426,47,0.5,1451081886,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
865,34,110,0.5,1162049056,Braveheart (1995),Action|Drama|War


Фільтрація за допомогою [query](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.query.html):

In [12]:
dask_df.query('rating == 0.5').head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
26,76,1,0.5,1439165548,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
245,308,3,0.5,1421374465,Grumpier Old Men (1995),Comedy|Romance
403,104,47,0.5,1053336550,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
517,426,47,0.5,1451081886,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
865,34,110,0.5,1162049056,Braveheart (1995),Action|Drama|War


### Сортування.

Для сортування викорисовується метод [sort_values](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.sort_values.html).

Сортування за одни стовпцем:

In [13]:
dask_df.sort_values('rating').head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
22891,116,2683,0.5,1337199960,Austin Powers: The Spy Who Shagged Me (1999),Action|Adventure|Comedy
100117,599,6557,0.5,1498502617,Born to Be Wild (1995),Adventure|Children|Comedy|Drama
86341,89,946,0.5,1520408337,To Be or Not to Be (1942),Comedy|Drama|War
91521,298,1107,0.5,1479065495,Loser (1991),Comedy
18311,3,1124,0.5,1306464216,On Golden Pond (1981),Drama


In [14]:
dask_df.sort_values('rating', ascending=False).head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
77065,42,4102,5.0,996258715,Eddie Murphy Raw (1987),Comedy|Documentary
31492,58,377,5.0,847718657,Speed (1994),Action|Romance|Thriller
31485,31,377,5.0,850467368,Speed (1994),Action|Romance|Thriller
14890,380,3033,5.0,1494803646,Spaceballs (1987),Comedy|Sci-Fi
74460,456,1393,5.0,856883540,Jerry Maguire (1996),Drama|Romance


Сортування за кількома стовпцями:

In [15]:
dask_df.sort_values(['rating', 'timestamp']).head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
99187,474,5771,0.5,1053021898,My Bloody Valentine (1981),Drama|Horror|Thriller
88080,474,4676,0.5,1053021903,Troop Beverly Hills (1989),Comedy
96212,474,1526,0.5,1053021910,Fathers' Day (1997),Comedy
66707,474,4040,0.5,1053021938,Don't Tell Mom the Babysitter's Dead (1991),Comedy
76652,474,2796,0.5,1053021959,Funny Farm (1988),Comedy


In [16]:
dask_df.sort_values(['rating', 'timestamp'], ascending=False).head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
86989,210,177765,5.0,1537632257,Coco (2017),Adventure|Animation|Children
96391,305,148671,5.0,1537354985,Saw (2003),Crime|Horror
55661,331,57669,5.0,1537235356,In Bruges (2008),Comedy|Crime|Drama|Thriller
69638,331,55820,5.0,1537235354,No Country for Old Men (2007),Crime|Drama
4757,331,608,5.0,1537235353,Fargo (1996),Comedy|Crime|Drama|Thriller


### Групування даних.

Групування даних відбувається методом [groupby()](https://examples.dask.org/dataframes/02-groupby.html).

In [17]:
grouped_df = dask_df.groupby('genres')
grouped_df

<dask.dataframe.groupby.DataFrameGroupBy at 0x1471ce510>

Аналогічно до Pandas/Polars, для роботи з об'єктом групування застосовують функції агрегації. Також, у випадку Dask функції агрегування є лінивими, тому, щоб отримати кінцеве значення потрібно викликати метод [compute()](https://docs.dask.org/en/stable/generated/dask.dataframe.compute.html).

In [18]:
grouped_df.size()

Dask Series Structure:
npartitions=1
    int64
      ...
dtype: int64
Dask Name: dataframe-groupby-size-agg, 3 graph layers

In [19]:
grouped_df.size().compute()

genres
(no genres listed)                      47
Action                                 186
Action|Adventure                       555
Action|Adventure|Animation              42
Action|Adventure|Animation|Children     39
                                      ... 
Sci-Fi|Thriller                        116
Sci-Fi|Thriller|IMAX                    12
Thriller                               628
War                                      9
Western                                151
Length: 951, dtype: int64

Далі застосуємо функції агрегування.

In [20]:
grouped_df['rating'].mean()

Dask Series Structure:
npartitions=1
    float64
        ...
Name: rating, dtype: float64
Dask Name: truediv, 7 graph layers

In [21]:
grouped_df['rating'].mean().compute()

genres
(no genres listed)                     3.489362
Action                                 2.935484
Action|Adventure                       3.706306
Action|Adventure|Animation             3.583333
Action|Adventure|Animation|Children    3.410256
                                         ...   
Sci-Fi|Thriller                        3.280172
Sci-Fi|Thriller|IMAX                   3.708333
Thriller                               3.426752
War                                    3.555556
Western                                3.658940
Name: rating, Length: 951, dtype: float64

In [22]:
grouped_df['rating'].sum().compute()

genres
(no genres listed)                      164.0
Action                                  546.0
Action|Adventure                       2057.0
Action|Adventure|Animation              150.5
Action|Adventure|Animation|Children     133.0
                                        ...  
Sci-Fi|Thriller                         380.5
Sci-Fi|Thriller|IMAX                     44.5
Thriller                               2152.0
War                                      32.0
Western                                 552.5
Name: rating, Length: 951, dtype: float64

In [23]:
grouped_df['rating'].max().compute()

genres
(no genres listed)                     5.0
Action                                 5.0
Action|Adventure                       5.0
Action|Adventure|Animation             5.0
Action|Adventure|Animation|Children    5.0
                                      ... 
Sci-Fi|Thriller                        5.0
Sci-Fi|Thriller|IMAX                   4.5
Thriller                               5.0
War                                    5.0
Western                                5.0
Name: rating, Length: 951, dtype: float64

In [24]:
grouped_df['rating'].min().compute()

genres
(no genres listed)                     0.5
Action                                 0.5
Action|Adventure                       0.5
Action|Adventure|Animation             2.0
Action|Adventure|Animation|Children    0.5
                                      ... 
Sci-Fi|Thriller                        0.5
Sci-Fi|Thriller|IMAX                   2.5
Thriller                               0.5
War                                    3.0
Western                                1.0
Name: rating, Length: 951, dtype: float64

In [25]:
grouped_df['rating'].count().compute()

genres
(no genres listed)                      47
Action                                 186
Action|Adventure                       555
Action|Adventure|Animation              42
Action|Adventure|Animation|Children     39
                                      ... 
Sci-Fi|Thriller                        116
Sci-Fi|Thriller|IMAX                    12
Thriller                               628
War                                      9
Western                                151
Name: rating, Length: 951, dtype: int64

### Складні агрегації.
У випадку, коли потрібно застосувати функції агрегації, складніші за ті, які надаються самою бібліотекою, можна використати метод [apply()](https://docs.dask.org/en/stable/generated/dask.dataframe.Series.apply.html):

In [26]:
def custom_agg(group):
    return group['rating'].sum() / group['rating'].mean()

grouped_df.apply(custom_agg, meta=('result', 'float64')).compute()

genres
(no genres listed)                      47.0
Action                                 186.0
Action|Adventure                       555.0
Action|Adventure|Animation              42.0
Action|Adventure|Animation|Children     39.0
                                       ...  
Sci-Fi|Thriller                        116.0
Sci-Fi|Thriller|IMAX                    12.0
Thriller                               628.0
War                                      9.0
Western                                151.0
Name: result, Length: 951, dtype: float64

### Робота з пропущеними даними.

Вибір методу обробки пропущених даних залежить від конкретної задачі і характеру даних.

* Заміна на константу: Підходить, якщо пропущені значення не несуть суттєвої інформації або якщо відомо, що вони повинні мати певне значення.
* Заміна на статистичні характеристики: Дозволяє зберегти загальну тенденцію даних.
* Видалення рядків: Використовується, якщо кількість пропущених даних невелика і їх видалення не призведе до суттєвої втрати інформації.
* Інші методи: Існують і інші складніші методи, такі як інтерполяція, використання моделей машинного навчання для заповнення пропусків тощо.

In [27]:
data = {'column1': [1, 2, np.nan, 4, 5, np.nan],
        'column2': ['A', np.nan, 'C', 'D', 'E', np.nan],
        'column3': [10.0, 20.0, 30.0, np.nan, 50.0, np.nan]}

# Створимо Pandas DataFrame
df_pandas = pd.DataFrame(data)

# Перетворимо Pandas DataFrame в Dask DataFrame
df = dd.from_pandas(df_pandas, npartitions=2)
df.compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
1,2.0,,20.0
2,,C,30.0
3,4.0,D,
4,5.0,E,50.0
5,,,


Для того, щоб перевірити, чи є порожні дані, використовується метод [isnull()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.isnull.html).

In [28]:
df.isnull().compute()

Unnamed: 0,column1,column2,column3
0,False,False,False
1,False,True,False
2,True,False,False
3,False,False,True
4,False,False,False
5,True,True,True


Заповнення пропущених даних методом [fillna()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.fillna.html):

константним значенням:

In [29]:
df.fillna(0).compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
1,2.0,0,20.0
2,0.0,C,30.0
3,4.0,D,0.0
4,5.0,E,50.0
5,0.0,0,0.0


Заміна на значення, яке обраховується:

In [30]:
df.fillna(df.max()).compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
1,2.0,,20.0
2,5.0,C,30.0
3,4.0,D,50.0
4,5.0,E,50.0
5,5.0,,50.0


In [31]:
df.fillna(df['column1'].mean()).compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
1,2.0,3.0,20.0
2,3.0,C,30.0
3,4.0,D,3.0
4,5.0,E,50.0
5,3.0,3.0,3.0


Заміна на значення з попереднього або наступного рядка:

In [32]:
df.fillna(method='ffill').compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
1,2.0,A,20.0
2,2.0,C,30.0
3,4.0,D,30.0
4,5.0,E,50.0
5,5.0,E,50.0


In [33]:
df.fillna(method='bfill').compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
1,2.0,C,20.0
2,4.0,C,30.0
3,4.0,D,50.0
4,5.0,E,50.0
5,,,


Видалення рядків, які містять пропущені значення виконується методом [dropna()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.dropna.html).

Видалення рядків з будь-яким пропущеним значенням:

In [34]:
df.dropna().compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
4,5.0,E,50.0


Видалення рядків, де пропущені всі значення:

In [35]:
df.dropna(how='all').compute()

Unnamed: 0,column1,column2,column3
0,1.0,A,10.0
1,2.0,,20.0
2,,C,30.0
3,4.0,D,
4,5.0,E,50.0


### Об’єднання та злиття даних.

Основні методи для злиття DataFrame в Dask:

* [merge()](https://docs.dask.org/en/latest/generated/dask.dataframe.merge.html): Цей метод використовується для злиття DataFrame за спільними стовпцями. Він аналогічний методу merge() в Pandas.
* [concat()](https://docs.dask.org/en/latest/generated/dask.dataframe.concat.html): Цей метод використовується для конкатенації DataFrame по рядках або стовпцях.

Типи злиття:
* Внутрішнє злиття (inner join): Зберігає тільки ті рядки, які мають відповідні значення в обох DataFrame за спільними стовпцями.
* Зовнішнє злиття (outer join): Зберігає всі рядки з обох DataFrame, заповнюючи пропущені значення.
* Ліве злиття (left join): Зберігає всі рядки з лівого DataFrame і додає відповідні рядки з правого DataFrame, якщо вони є.
* Праве злиття (right join): Аналогічно лівому, але зберігає всі рядки з правого DataFrame.

In [36]:
ratings_df = dd.read_csv('ml-latest-small/ratings.csv')
ratings_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [37]:
movies_df = dd.read_csv('ml-latest-small/movies.csv')
movies_df.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [38]:
ratings_df.merge(movies_df, on='movieId').compute()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
0,1,1,4.0,964982703,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,5,1,4.0,847434962,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,7,1,4.5,1106635946,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
3,15,1,2.5,1510577970,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
4,17,1,4.5,1305696483,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
...,...,...,...,...,...,...
100831,610,160341,2.5,1479545749,Bloodmoon (1997),Action|Thriller
100832,610,160527,4.5,1479544998,Sympathy for the Underdog (1971),Action|Crime|Drama
100833,610,160836,3.0,1493844794,Hazard (2005),Action|Drama|Thriller
100834,610,163937,3.5,1493848789,Blair Witch (2016),Horror|Thriller


In [39]:
ratings_df.merge(movies_df, on='movieId', how='outer').compute()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
0,1.0,1,4.0,9.649827e+08,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,5.0,1,4.0,8.474350e+08,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,7.0,1,4.5,1.106636e+09,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
3,15.0,1,2.5,1.510578e+09,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
4,17.0,1,4.5,1.305696e+09,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
...,...,...,...,...,...,...
100849,,30892,,,In the Realms of the Unreal (2004),Animation|Documentary
100850,,32160,,,Twentieth Century (1934),Comedy
100851,,32371,,,Call Northside 777 (1948),Crime|Drama|Film-Noir
100852,,34482,,,"Browning Version, The (1951)",Drama


In [40]:
ratings_df.merge(movies_df, on='movieId', how='left').compute()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
0,1,1,4.0,964982703,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,1,3,4.0,964981247,Grumpier Old Men (1995),Comedy|Romance
2,1,6,4.0,964982224,Heat (1995),Action|Crime|Thriller
3,1,47,5.0,964983815,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
4,1,50,5.0,964982931,"Usual Suspects, The (1995)",Crime|Mystery|Thriller
...,...,...,...,...,...,...
100831,610,166534,4.0,1493848402,Split (2017),Drama|Horror|Thriller
100832,610,168248,5.0,1493850091,John Wick: Chapter Two (2017),Action|Crime|Thriller
100833,610,168250,5.0,1494273047,Get Out (2017),Horror
100834,610,168252,5.0,1493846352,Logan (2017),Action|Sci-Fi


In [41]:
df1 = dd.from_pandas(pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']}), npartitions=1)
df2 = dd.from_pandas(pd.DataFrame({'col1': [4, 5, 6], 'col2': ['d', 'e', 'f']}), npartitions=1)
dd.concat([df1, df2]).compute()

Unnamed: 0,col1,col2
0,1,a
1,2,b
2,3,c
0,4,d
1,5,e
2,6,f


In [42]:
dd.concat([df1, df2]).compute()

Unnamed: 0,col1,col2
0,1,a
1,2,b
2,3,c
0,4,d
1,5,e
2,6,f


In [43]:
df3 = dd.from_pandas(pd.DataFrame({'col3': [7, 8, 9]}), npartitions=1)
df4 = dd.from_pandas(pd.DataFrame({'col4': ['g', 'h', 'i']}), npartitions=1)

dd.concat([df3, df4], axis=1).compute()

Unnamed: 0,col3,col4
0,7,g
1,8,h
2,9,i


## Робота з часовими рядами.
### Створення та маніпуляції з колонками типу `datetime`.
Створення:

In [44]:
dates = pd.date_range('2023-01-01', periods=1000)
df = dd.from_pandas(pd.DataFrame({'date': dates, 'value': np.random.randn(1000)}), npartitions=4)
df.compute()

Unnamed: 0,date,value
0,2023-01-01,-0.494722
1,2023-01-02,2.385669
2,2023-01-03,-1.183180
3,2023-01-04,-0.332248
4,2023-01-05,0.390986
...,...,...
995,2025-09-22,1.436101
996,2025-09-23,0.594444
997,2025-09-24,-0.431172
998,2025-09-25,0.295432


Виділення компонентів дати:

In [45]:
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df.compute()

Unnamed: 0,date,value,year,month,day
0,2023-01-01,-0.494722,2023,1,1
1,2023-01-02,2.385669,2023,1,2
2,2023-01-03,-1.183180,2023,1,3
3,2023-01-04,-0.332248,2023,1,4
4,2023-01-05,0.390986,2023,1,5
...,...,...,...,...,...
995,2025-09-22,1.436101,2025,9,22
996,2025-09-23,0.594444,2025,9,23
997,2025-09-24,-0.431172,2025,9,24
998,2025-09-25,0.295432,2025,9,25


### Часові зсуви та агрегування.
Часові зсуви.

In [46]:
df['date_shifted'] = df['date'] + pd.Timedelta(days=1)
df.compute()

Unnamed: 0,date,value,year,month,day,date_shifted
0,2023-01-01,-0.494722,2023,1,1,2023-01-02
1,2023-01-02,2.385669,2023,1,2,2023-01-03
2,2023-01-03,-1.183180,2023,1,3,2023-01-04
3,2023-01-04,-0.332248,2023,1,4,2023-01-05
4,2023-01-05,0.390986,2023,1,5,2023-01-06
...,...,...,...,...,...,...
995,2025-09-22,1.436101,2025,9,22,2025-09-23
996,2025-09-23,0.594444,2025,9,23,2025-09-24
997,2025-09-24,-0.431172,2025,9,24,2025-09-25
998,2025-09-25,0.295432,2025,9,25,2025-09-26


In [47]:
df['date_shifted'] = df['date'] + pd.Timedelta(days=1, weeks=1)
df.compute()

Unnamed: 0,date,value,year,month,day,date_shifted
0,2023-01-01,-0.494722,2023,1,1,2023-01-09
1,2023-01-02,2.385669,2023,1,2,2023-01-10
2,2023-01-03,-1.183180,2023,1,3,2023-01-11
3,2023-01-04,-0.332248,2023,1,4,2023-01-12
4,2023-01-05,0.390986,2023,1,5,2023-01-13
...,...,...,...,...,...,...
995,2025-09-22,1.436101,2025,9,22,2025-09-30
996,2025-09-23,0.594444,2025,9,23,2025-10-01
997,2025-09-24,-0.431172,2025,9,24,2025-10-02
998,2025-09-25,0.295432,2025,9,25,2025-10-03


Агрегування за часовими інтервалами.
* [resample()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.resample.html): Для агрегування за регулярними інтервалами (години, дні, місяці).
* [rolling()](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.rolling.html): Для створення ковзних вікон.

Зауваження. При використанні обох методів, необхідно, щоб колонка з датою була індексованою у DataFrame.

In [48]:
df = df.set_index('date')

Сумування по тижнях.

In [49]:
df.resample('W').sum().compute() 

  meta = getattr(meta_r, how)(*how_args, **how_kwargs)
  out = getattr(series.resample(rule, **resample_kwargs), how)(


Unnamed: 0_level_0,value,year,month,day
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-01-01,-0.494722,2023,1,1
2023-01-08,2.104194,14161,7,35
2023-01-15,-2.598580,14161,7,84
2023-01-22,-1.646891,14161,7,133
2023-01-29,-2.254376,14161,7,182
...,...,...,...,...
2025-08-31,-3.307244,14175,56,196
2025-09-07,6.945487,14175,63,28
2025-09-14,4.712706,14175,63,77
2025-09-21,2.302929,14175,63,126


Розрахунок середнього по місяцю.

In [50]:
df.resample('M').mean().compute() 

Unnamed: 0_level_0,value,year,month,day
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-01-31,-0.185625,2023.0,1.0,16.0
2023-02-28,-0.052822,2023.0,2.0,14.5
2023-03-31,0.091843,2023.0,3.0,16.0
2023-04-30,-0.008154,2023.0,4.0,15.5
2023-05-31,0.013634,2023.0,5.0,16.0
2023-06-30,0.158824,2023.0,6.0,15.5
2023-07-31,0.068046,2023.0,7.0,16.0
2023-08-31,-0.045711,2023.0,8.0,16.0
2023-09-30,0.174948,2023.0,9.0,15.5
2023-10-31,-0.154306,2023.0,10.0,16.0


In [51]:
df.rolling('7D').mean().compute()

Unnamed: 0_level_0,value,year,month,day
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-01-01,-0.494722,2023.0,1.0,1.0
2023-01-02,0.945474,2023.0,1.0,1.5
2023-01-03,0.235923,2023.0,1.0,2.0
2023-01-04,0.093880,2023.0,1.0,2.5
2023-01-05,0.153301,2023.0,1.0,3.0
...,...,...,...,...
2025-09-22,0.412616,2025.0,9.0,19.0
2025-09-23,0.279107,2025.0,9.0,20.0
2025-09-24,0.119726,2025.0,9.0,21.0
2025-09-25,0.334773,2025.0,9.0,22.0


## Ліниві обчислення у Dask.

Ліниві обчислення – це парадигма програмування, де обчислення відкладаються до останнього моменту, коли результат дійсно потрібен. Це дозволяє ефективно працювати з великими наборами даних, оскільки обчислення виконуються лише для необхідної частини даних.

Dask Delayed – це інструмент у Dask, який дозволяє створювати граф обчислень, де кожен вузол графа представляє деяку операцію над даними. Цей граф не виконується відразу, а лише тоді, коли ми явно викликаємо метод compute().

### Створення delayed об'єктів:
Нехай є якась функція, яка використовується для обробки даних:

In [52]:
def example_function(x):
    return x * 2

При обробці даних, ми викликаємо цю функцію, але оскільки результати не потрібні нам зразу, є можливість використати відкладене виконання:

In [53]:
lazy_result = dask.delayed(example_function)(10)
lazy_result

Delayed('example_function-3cf3c594-820e-4132-ba5b-f1347aad2fe2')

Десь у подальшому з'являється необхідність модифікувати ці дані ще раз певним чином, у цьому випадку додати 5:

In [54]:
lazy_result2 = lazy_result + 5
lazy_result2

Delayed('add-55043262d884c0f4b376da1bed101d51')

До моменту, поки не було викликано метод `compute()`, ніяких розрахунків виконано не було, усі дії були записані у граф, який і обробляється при виклику `compute()`:

In [55]:
result = lazy_result2.compute()
result

25

Це має рад переваг:
* Компіляція: Dask автоматично оптимізує граф обчислень, зменшуючи кількість проміжних результатів.
* Кешування: Dask кешує проміжні результати, щоб уникнути повторних обчислень.
* Паралелізм: Dask автоматично розподіляє обчислення по доступним ядрам процесора або навіть по кількох машинах.

### Паралельна обробка даних в Dask.

Dask чудово підходить для паралельної обробки великих наборів даних. Він розбиває дані на частини, які обробляються паралельно, а потім об'єднує результати.

* Dask DataFrame – це розподілений аналог Pandas DataFrame, який дозволяє працювати з великими табличними даними.
* Dask Bag – це розподілена колекція, яка дозволяє працювати з будь-якими ітерабельними об'єктами.

Переваги лінивих обчислень і Dask:
* Ефективне використання пам'яті: Обчислення виконуються лише для необхідних частин даних.
* Паралелізм: Можливість розподілити обчислення по кількох ядрах або машинах.
* Гнучкість: Підтримка різних типів даних і операцій.
* Інтеграція з іншими бібліотеками: Легко інтегрується з іншими інструментами для аналізу даних, такими як NumPy, SciPy, Pandas.

### Бази даних та Dask.
Dask може підключатися до різних баз даних, таких як PostgreSQL, MySQL, SQL Server та інших. Для цього використовуються відповідні драйвери баз даних.
Приклад (для PostgreSQL):
`df = dd.read_sql_table('my_table', 'postgresql://user:password@host:port/database')`

Dask підтримує виконання SQL-запитів до баз даних.

`query = "SELECT * FROM my_table WHERE column1 > 100"`
`df = dd.read_sql_query(query, 'postgresql://user:password@host:port/database')`

### Збереження даних після завершення роботи з ними. 
Dask пропонує ряд методів для збереження даних у різних форматах. Вони починаються частинкою `to_` і далі йде бажаний тип файлу, у якому воно буде зберігатись:

In [56]:
df.to_csv('output.csv')

['/Users/mykola/Documents/JupyterNotebooks/МТІД/MTID/Lab4_Dask_multiprocessing_joblib/output.csv/0.part',
 '/Users/mykola/Documents/JupyterNotebooks/МТІД/MTID/Lab4_Dask_multiprocessing_joblib/output.csv/1.part',
 '/Users/mykola/Documents/JupyterNotebooks/МТІД/MTID/Lab4_Dask_multiprocessing_joblib/output.csv/2.part',
 '/Users/mykola/Documents/JupyterNotebooks/МТІД/MTID/Lab4_Dask_multiprocessing_joblib/output.csv/3.part']

In [57]:
df.to_parquet('data.parquet')