# Introduction To Dask

In [1]:
# Built-in library
from pathlib import Path
import re
import json
from typing import Any, Optional, Union
import logging
import warnings

# Standard imports
import numpy as np
import numpy.typing as npt
from pprint import pprint
import pandas as pd
import polars as pl
from rich.console import Console
from rich.theme import Theme

# Custom named styles ("info", "warning", "error") attached to the shared
# console object that the rest of the notebook prints through.
custom_theme = Theme(dict(info="#76FF7B", warning="#FBDDFE", error="#FF0000"))
console = Console(theme=custom_theme)

# Visualization
import matplotlib.pyplot as plt


# Pandas display limits: up to 1,000 rows/columns and 600 characters per cell.
pd.set_option("display.max_rows", 1_000)
pd.set_option("display.max_columns", 1_000)
pd.set_option("display.max_colwidth", 600)

# Suppress every warning category for the rest of the session.
warnings.filterwarnings(action="ignore")

# Seed NumPy's global RNG so random draws are repeatable.
np.random.seed(seed=0)

# Black code formatter (Optional)
%load_ext lab_black

# auto reload imports
%load_ext autoreload
%autoreload 2

In [2]:
# Import Dask
import dask
import dask.dataframe as dd

## Dask DataFrames

- Dask Dataframes parallelize the popular pandas library, providing:
  - Larger-than-memory execution for single machines, allowing you to process data that is larger than your available RAM.
  - Parallel execution for faster processing.
  - Distributed computation for terabyte-sized datasets.
- It can be thought of as a `collection of many Pandas DataFrames`.

<img src="./images/dask_dfs.png" alt="Dask DataFrame" width="400">

In [3]:
# Set seed for reproducibility
np.random.seed(seed=123)


def main() -> None:
    """Build a 5,000-row pandas frame, wrap it in a Dask DataFrame, and print both.

    Prints, in order: the head of the pandas frame, the (lazy) repr of the
    Dask DataFrame, the head of the Dask DataFrame, and the fully computed
    result of the Dask DataFrame.
    """
    n_rows: int = 5_000

    # Columns are generated in the order a, b, c so the random draws are
    # consumed from NumPy's global RNG in a fixed sequence.
    col_a = np.arange(n_rows)
    col_b = np.random.randn(n_rows)
    col_c = np.random.choice(["a", "b", "c"], size=n_rows)
    columns: dict[str, Any] = {"a": col_a, "b": col_b, "c": col_c}

    pandas_frame: pd.DataFrame = pd.DataFrame(columns)
    console.print(f"df_pandas: \n{pandas_frame.head()}")

    # Split the pandas frame into 10 partitions (each one a pandas DataFrame).
    dask_frame: dd.DataFrame = dd.from_pandas(data=pandas_frame, npartitions=10)

    # Printing the Dask frame itself shows its lazy representation; .head()
    # and .compute() return concrete results.
    console.print(f"Lazy Evaluation [df_dask]: {dask_frame}")
    console.print(f"df_dask: \n{dask_frame.head()}")
    console.print(f"Convert to Pandas [df_dask]: {dask_frame.compute()}")

In [4]:
# Run the demo: prints the pandas frame head and the Dask DataFrame views.
main()

In [5]:
# Same construction as in `main()`, but at notebook scope so that `df_dask`
# stays available to the cells that follow.
N: int = 5_000
data_dict: dict[str, Any] = dict(
    a=np.arange(N),
    b=np.random.randn(N),
    c=np.random.choice(["a", "b", "c"], size=N),
)

# Pandas frame built from the three columns
df_pandas: pd.DataFrame = pd.DataFrame(data=data_dict)
console.print(f"df_pandas: \n{df_pandas.head()}")

# Dask frame backed by 10 partitions, each of which is a pandas DataFrame
df_dask: dd.DataFrame = dd.from_pandas(df_pandas, npartitions=10)

In [6]:
# Convert to Pandas DF
# .compute() evaluates the Dask DataFrame and returns the concrete result
# that is displayed below.
df_dask.compute()

Unnamed: 0,a,b,c
0,0,1.102583,c
1,1,-1.071606,b
2,2,-0.971151,a
3,3,-0.092405,a
4,4,-1.809795,c
...,...,...,...
4995,4995,-0.199352,b
4996,4996,-0.914287,b
4997,4997,1.047449,c
4998,4998,-0.877910,c


In [7]:
# Access Partitions
# Select the partition at position 0, compute it, and show its first rows.
df_dask.partitions[0].compute().head()

Unnamed: 0,a,b,c
0,0,1.102583,c
1,1,-1.071606,b
2,2,-0.971151,a
3,3,-0.092405,a
4,4,-1.809795,c


In [8]:
# Select the partition at position 5, compute it, and show its first rows
# (in the recorded output its index starts at 2500).
df_dask.partitions[5].compute().head()

Unnamed: 0,a,b,c
2500,2500,-1.951697,a
2501,2501,0.460704,b
2502,2502,-1.379373,c
2503,2503,-0.68187,c
2504,2504,1.304617,c


In [9]:
# Aggregate: per-group sum, keyed by column "c".
# The group-by only describes the work; .compute() below runs it.
grouped_for_sum = df_dask.groupby("c")
sum_lazy_df = grouped_for_sum.sum()

sum_df: pd.DataFrame = sum_lazy_df.compute()

sum_df

Unnamed: 0_level_0,a,b
c,Unnamed: 1_level_1,Unnamed: 2_level_1
a,4120832,40.057335
b,4184150,71.243072
c,4192518,-10.268417


In [10]:
# Aggregate: per-group mean, keyed by column "c".
# The group-by only describes the work; .compute() below runs it.
grouped_for_mean = df_dask.groupby("c")
mean_lazy_df = grouped_for_mean.mean()

mean_df: pd.DataFrame = mean_lazy_df.compute()

mean_df

Unnamed: 0_level_0,a,b
c,Unnamed: 1_level_1,Unnamed: 2_level_1
a,2514.235509,0.02444
b,2499.492234,0.042559
c,2485.191464,-0.006087


### Visualize The Constructed Graph

```sh
pip install ipycytoscape

# OR
pip install graphviz
```


In [11]:
## Dask Delayed


@dask.delayed
def increment(x: int) -> int:
    """Return ``x`` plus one.

    Decorated with ``dask.delayed``: calling it yields a ``Delayed`` object
    rather than the number itself.
    """
    incremented = x + 1
    return incremented


@dask.delayed
def add(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``.

    Decorated with ``dask.delayed``: calling it yields a ``Delayed`` object
    rather than the number itself.
    """
    summed = a + b
    return summed

In [12]:
# Build the computation graph: two independent `increment` tasks feed `add`.
# Nothing is evaluated here; `a`, `b` and `c` are Delayed objects.
a, b = increment(x=1), increment(x=2)
c = add(a=a, b=b)

In [13]:
# Display the two `increment` results: they are Delayed objects, not numbers.
a, b

(Delayed('increment-255ee635-24b9-48bc-a2de-cf07dbd6593e'),
 Delayed('increment-7d50c429-4930-4a0d-b4e4-385de018af3f'))

In [14]:
# Print the object returned by `add(a=a, b=b)` before it has been computed.
console.print(c)

In [15]:
# Render the task graph behind `c` (needs ipycytoscape or graphviz, per the
# install note above).
c.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

### Explanation

- The figure above shows that the two `increment` calls do not depend on each other (or on any other task), so they can run in parallel.
- The `add` task, on the other hand, depends on the results of both `increment` calls.

In [16]:
# Execute the graph: increment(1) + increment(2) -> 2 + 3.
c.compute()

5

### [Reading And Saving Data](https://docs.dask.org/en/stable/dataframe-create.html)

```py
# Read CSV files into a Dask.DataFrame
read_csv(urlpath[, blocksize, ...])


# Read a Parquet file into a Dask DataFrame
read_parquet(path[, columns, filters, ...])


# Read HDF files into a Dask DataFrame
read_hdf(pattern, key[, start, stop, ...])


# Read dataframe from ORC file(s)
read_orc(path[, engine, columns, index, ...])


# Create a dataframe from a set of JSON files
read_json(url_path[, orient, lines, ...])


# Read SQL database table into a DataFrame.
read_sql_table(table_name, con, index_col[, ...])


# Read SQL query into a DataFrame.
read_sql_query(sql, con, index_col[, ...])


# Read SQL query or database table into a DataFrame.
read_sql(sql, con, index_col, **kwargs)


# Read delimited files into a Dask.DataFrame
read_table(urlpath[, blocksize, ...])


# Read fixed-width files into a Dask.DataFrame
read_fwf(urlpath[, blocksize, ...])


# Read any sliceable array into a Dask Dataframe
from_array(x[, chunksize, columns, meta])


# Store Dask DataFrame to CSV files
to_csv(df, filename[, single_file, ...])


# Store Dask.dataframe to Parquet files
to_parquet(df, path[, engine, compression, ...])


# Store Dask Dataframe to Hierarchical Data Format (HDF) files
to_hdf(df, path, key[, mode, append, ...])


# Store Dask Dataframe to a SQL table
to_sql(df, name, uri[, schema, if_exists, ...])
```

<br><hr>

### Dask Collections:

```py
# Create Dask DataFrame from many Dask Delayed objects
from_delayed(dfs[, meta, divisions, prefix, ...])


# Create a Dask DataFrame from a Dask Array.
from_dask_array(x[, columns, index, meta])


# Create a DataFrame collection from a custom function map
from_map(func, *iterables[, args, meta, ...])


# Create Dask Dataframe from a Dask Bag.
dask.bag.core.Bag.to_dataframe([meta, ...])


# Convert into a list of dask.delayed objects, one per partition.
DataFrame.to_delayed([optimize_graph])


# Create Dask Array from a Dask Dataframe
to_records(df)


# Create Dask Bag from a Dask DataFrame
to_bag(df[, index, format])
```

<br><hr>

### Pandas

```py
# Construct a Dask DataFrame from a Pandas DataFrame
from_pandas()


# Construct a Dask DataFrame from a Python Dictionary
DataFrame.from_dict(data, *, npartitions[, ...])
```

### Dask Delayed

In [17]:
## Without Dask Delayed
import time


def increment(x: int | float) -> int | float:
    time.sleep(1)
    return x + 1


def double(x: int | float) -> int | float:
    time.sleep(1)
    return x * 2


def add(a: int | float, b: int | float) -> int | float:
    time.sleep(1)
    return a + b

In [18]:
# Sequential baseline: every input goes through three 1-second steps in turn
# (increment, double, add), so the loop pays for each sleep back to back.
start_time = time.time()
data: list[int] = [1, 2, 3, 4, 5]
output: list[int | Any] = []

for x in data:
    a, b = increment(x=x), double(x=x)
    c = add(a=a, b=b)
    output += [c]

total: int = sum(output)
stop_time = time.time()

# Report elapsed wall-clock time and the summed result.
console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
console.print(f"{total=}")

In [19]:
## Dask Delayed version


@dask.delayed
def increment_delayed(x: int | float) -> int | float:
    """Delayed task: sleep for one second, then return ``x + 1``."""
    time.sleep(1)  # stand-in for a slow operation
    incremented = x + 1
    return incremented


@dask.delayed
def double_delayed(x: int | float) -> int | float:
    """Delayed task: sleep for one second, then return ``x * 2``."""
    time.sleep(1)  # stand-in for a slow operation
    doubled = x * 2
    return doubled


@dask.delayed
def add_delayed(a: int | float, b: int | float) -> int | float:
    """Delayed task: sleep for one second, then return ``a + b``."""
    time.sleep(1)  # stand-in for a slow operation
    summed = a + b
    return summed

In [20]:
# Delayed version of the same pipeline: the loop only builds the task graph;
# the work itself runs when `.compute()` is called on the final sum.
start_time = time.time()
data: list[int] = [1, 2, 3, 4, 5]
output: list[int | Any] = []

for x in data:
    a, b = increment_delayed(x=x), double_delayed(x=x)
    c = add_delayed(a=a, b=b)
    output += [c]

# Wrap the built-in `sum` so the reduction is part of the graph as well.
delayed_sum = dask.delayed(sum)
total_delayed: Any = delayed_sum(output)
total: int = total_delayed.compute()
stop_time = time.time()

# Report elapsed wall-clock time and the summed result.
console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
console.print(f"{total=}")

In [21]:
# Render the task graph behind the delayed sum built in the previous cell.
total_delayed.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

### Futures

- `Futures` help manage asynchronous programming in Python, allowing you to run tasks concurrently and handle their results effectively.

#### Concurrent Futures (i.e. concurrent.futures)

- `concurrent.futures` module: This is a standard library module that provides a high-level interface for running asynchronous tasks concurrently.
- It offers two ways to achieve this:
  - Using threads with `ThreadPoolExecutor`
  - Using separate processes with `ProcessPoolExecutor`
- Both utilize the Future class, which represents the eventual result of an asynchronous operation.
- You can't directly access the return value; instead, you use methods like `.result()` to wait and retrieve it once the task finishes.

#### ThreadPoolExecutor

```py
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


start_time = time.time()
outputs: list[Any] = []
futures: list[Any] = []

# Submit all ten calls first, then wait on the futures in submission order;
# at most four calls run at the same time.
with ThreadPoolExecutor(max_workers=4) as executor:
    for inp in range(10):
        future: Any = executor.submit(double, inp)
        futures.append(future)
    outputs = [future.result() for future in futures]

stop_time = time.time()

# ".2f" = two decimal places (a bare ".2" means two significant digits and
# would print e.g. 10.03 as "1e+01").
console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
console.print(f"{outputs=}")
```

<hr>

#### ProcessPoolExecutor

```py
# Process Pool requires the code to be in a different module.
from utils import double

# The guard keeps the pool from being created when this module is imported
# rather than run as the main program.
if __name__ == "__main__":

    start_time = time.time()
    outputs: list[Any] = []
    futures: list[Any] = []

    # Submit all ten calls first, then wait on the futures in submission
    # order; at most four worker processes are used.
    with ProcessPoolExecutor(max_workers=4) as executor:  # updated line!
        for inp in range(10):
            future: Any = executor.submit(double, inp)
            futures.append(future)
        outputs = [future.result() for future in futures]

    stop_time = time.time()

    # ".2f" = two decimal places (a bare ".2" means two significant digits
    # and would print e.g. 10.03 as "1e+01").
    console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
    console.print(f"{outputs=}")
```


In [22]:
def double(x: int | float) -> int | float:
    time.sleep(1)
    return x * 2

In [23]:
# Without Concurrency
# Baseline: call `double` ten times in a row, each call sleeping for 1 second.

start_time = time.time()
outputs: list[Any] = []
for inp in range(10):
    output: int | float = double(x=inp)
    outputs.append(output)

stop_time = time.time()

# ".2f" = two decimal places (a bare ".2" means two significant digits and
# would print a ~10 second run as "1e+01").
console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
console.print(f"{outputs=}")

#### Thread Pool

In [24]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


start_time = time.time()
outputs: list[Any] = []
futures: list[Any] = []

# Submit all ten calls first, then wait on the futures in submission order;
# at most four calls run at the same time.
with ThreadPoolExecutor(max_workers=4) as executor:
    for inp in range(10):
        future: Any = executor.submit(double, inp)
        futures.append(future)
    outputs = [future.result() for future in futures]

stop_time = time.time()

# ".2f" = two decimal places (a bare ".2" means two significant digits and
# would print e.g. 10.03 as "1e+01").
console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
console.print(f"{outputs=}")

#### Process Pool

In [25]:
# Process Pool requires the code to be in a different module.
from utils import double

# The guard keeps the pool from being created when this code is imported
# rather than run as the main program.
if __name__ == "__main__":

    start_time = time.time()
    outputs: list[Any] = []
    futures: list[Any] = []

    # Submit all ten calls first, then wait on the futures in submission
    # order; at most four worker processes are used.
    with ProcessPoolExecutor(max_workers=4) as executor:  # updated line!
        for inp in range(10):
            future: Any = executor.submit(double, inp)
            futures.append(future)
        outputs = [future.result() for future in futures]

    stop_time = time.time()

    # ".2f" = two decimal places (a bare ".2" means two significant digits
    # and would print e.g. 10.03 as "1e+01").
    console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
    console.print(f"{outputs=}")

### Dask Implementation of Futures

In [26]:
from dask.distributed import Client


if __name__ == "__main__":
    # Use the client as a context manager so it is closed when the block
    # ends; otherwise each re-run of this cell leaves another client open.
    with Client() as client:
        # Timing starts after the client exists, so start-up is not counted.
        start_time = time.time()
        futures: list[Any] = []

        # Submit all ten calls first, then wait on them in submission order.
        for inp in range(10):
            future: Any = client.submit(double, inp)
            futures.append(future)
        outputs: list[Any] = [future.result() for future in futures]

        stop_time = time.time()

    # ".2f" = two decimal places (a bare ".2" means two significant digits
    # and would print e.g. 10.03 as "1e+01").
    console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
    console.print(f"{outputs=}")

In [28]:
## Another Dask Future Implementation
if __name__ == "__main__":
    # Use the client as a context manager so it is closed when the block
    # ends; otherwise each re-run of this cell leaves another client open.
    with Client() as client:
        # Timing starts after the client exists, so start-up is not counted.
        start_time = time.time()

        # `map` submits one task per input; `gather` collects the results.
        inputs: list[Any] = list(range(10))
        futures: list[Any] = client.map(double, inputs)
        outputs = client.gather(futures)

        stop_time = time.time()

    # ".2f" = two decimal places (a bare ".2" means two significant digits
    # and would print e.g. 10.03 as "1e+01").
    console.print(f"It took {(stop_time - start_time):.2f}s to process the data")
    console.print(f"{outputs=}")

In [None]:
from dask.distributed import Client

# Create a distributed client with default arguments and keep it bound to
# `client`. It is not closed in this cell.
# NOTE(review): earlier cells also call Client(); confirm that another
# client is intended here.
client = Client()