# Session 19 🐍

☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️☀️

***

# 150. Dask 
Dask is a flexible parallel computing library in Python that integrates with the PyData ecosystem (NumPy, Pandas, Scikit-learn) to enable scalable and efficient computation on datasets that don't fit into memory. It provides dynamic task scheduling and parallel execution, making it well suited to big data processing, machine learning, and scientific computing.

Dask is designed to:
- Scale Python workflows from a single machine to distributed clusters.
- Mimic familiar APIs (like Pandas, NumPy) for seamless adoption.
- Handle larger-than-memory datasets efficiently using lazy evaluation and chunking.
- Work with existing tools (e.g., NumPy, Pandas, Scikit-learn, XGBoost).

***

# 151. Important Features
- Parallel Computing: Automatically splits tasks across CPU cores.
- Lazy Evaluation: Computations are deferred until explicitly requested.
- Out-of-Core Processing: Works with datasets larger than RAM.
- Distributed Computing: Scales to clusters (using dask.distributed).
- Integration: Works with Pandas, NumPy, Scikit-learn, and more.
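The lazy-evaluation bullet can be made concrete with a minimal sketch using `dask.delayed`. The function `square` and the `calls` list are hypothetical names. Their only job is to record when the wrapped function actually runs, which shows that building the graph does no work until `.compute()` is called.

```python
from dask import delayed

calls = []  # hypothetical log: records every time square() really executes

@delayed
def square(x):
    calls.append(x)
    return x * x

# Calling a delayed function only records the task; nothing runs yet.
lazy = square(4)
assert calls == []  # no execution has happened so far

result = lazy.compute()  # execution happens here
print(result, calls)  # prints: 16 [4]
```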

***

# 152. Dask Components
Dask provides different high-level collections and low-level schedulers.

***

## 152-1. High-Level Collections

- `dask.array`: Parallel NumPy arrays
- `dask.dataframe`: Parallel Pandas DataFrames
- `dask.bag`: Parallel list-like operations
- `dask.delayed`: Custom task parallelism
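All of these collections are lazy and build on the same task-graph machinery, so results from different collections can be evaluated in one `dask.compute` call. The sketch below uses arbitrary toy inputs. It passes `scheduler="threads"` explicitly because bags and arrays have different default schedulers.

```python
import dask
import dask.array as da
import dask.bag as db
from dask import delayed

arr = da.arange(10, chunks=5)                           # dask.array: 0..9 in two chunks
bag = db.from_sequence(["a", "b", "a"], npartitions=2)  # dask.bag of three strings
total = delayed(sum)([1, 2, 3])                         # dask.delayed call of built-in sum

# All three expressions are lazy; dask.compute evaluates them together.
arr_sum, n_items, delayed_sum = dask.compute(
    arr.sum(), bag.count(), total, scheduler="threads"
)
print(arr_sum, n_items, delayed_sum)  # prints: 45 3 6
```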

***

## 152-2. Low-Level Schedulers
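- Single-machine scheduler (default): runs tasks on a local thread pool or process pool (threads by default for arrays, DataFrames, and delayed; processes for bags).
- Distributed scheduler (`dask.distributed`): runs on clusters, and can also run on a single machine through `Client()`/`LocalCluster`.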
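The single-machine scheduler can be chosen per call with the `scheduler=` keyword of `.compute()`, or for a whole block of code with `dask.config.set`. The sketch uses toy sizes. `"processes"` is another valid choice, but it is left out here because process pools usually need an `if __name__ == "__main__":` guard when run from a script.

```python
import dask
import dask.array as da

x = da.ones((1000, 1000), chunks=(250, 250))  # 16 chunks of ones
total = x.sum()                               # lazy reduction over all chunks

# The same graph executed by different single-machine schedulers.
threaded = total.compute(scheduler="threads")          # thread pool
synchronous = total.compute(scheduler="synchronous")   # one thread, handy for debugging

# A scheduler can also be set for a block of code.
with dask.config.set(scheduler="synchronous"):
    in_block = total.compute()

print(threaded, synchronous, in_block)  # prints: 1000000.0 1000000.0 1000000.0
```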

***

# 153. Dask in Action

***

## 153-1. Dask Arrays (dask.array)
Chunked NumPy arrays processed in parallel.

In [None]:
import dask.array as da

# Create a large random array (split into chunks)
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Build the mean as a lazy task graph (nothing is computed yet)
mean = x.mean()
mean.compute()  # Triggers actual computation
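To see what chunking means in practice, the sketch below wraps a small NumPy array with `da.from_array` so the Dask result can be checked against NumPy. The shape and chunk sizes are illustrative only. Dask computes partial results per chunk and then combines them.

```python
import numpy as np
import dask.array as da

data = np.arange(12, dtype=float).reshape(3, 4)  # small NumPy array for illustration
x = da.from_array(data, chunks=(3, 2))           # split into two column blocks

print(x.numblocks)  # (1, 2): one block down, two across
print(x.chunks)     # ((3,), (2, 2))

lazy_mean = x.mean()         # still a dask Array; nothing computed yet
value = lazy_mean.compute()  # combines the per-chunk partial results
assert np.isclose(value, data.mean())
print(value)  # prints: 5.5
```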

***

## 153-2. Dask DataFrames (dask.dataframe)
Larger-than-memory DataFrames made of many Pandas DataFrames (partitions), with a largely Pandas-compatible API.

In [None]:
import dask.dataframe as dd

# Read a large CSV in chunks
df = dd.read_csv("large_dataset.csv", blocksize="25MB")  # ~25 MB of CSV per partition

# Groupby and aggregate (lazy)
result = df.groupby("category").price.mean()
result.compute()  # Executes the computation

***

## 153-3. Dask Bags (dask.bag)
Parallel processing of semi-structured data (JSON, logs).

In [None]:
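import json

import dask.bag as db

# Process JSON lines in parallel (logs.jsonl: one JSON object per line)
bag = db.read_text("logs.jsonl").map(json.loads)
# Keep records flagged as errors (a missing "error" key counts as False), then extract "message"
filtered = bag.filter(lambda x: x.get("error", False)).pluck("message")
filtered.compute()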
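The pipeline can also be tried without a file. In this sketch, a few hypothetical log lines stand in for `logs.jsonl`, and `db.from_sequence` replaces `read_text`. The single-threaded `"sync"` scheduler is used because it is enough for a tiny demo. Bags otherwise default to a process pool.

```python
import json
import dask.bag as db

# Hypothetical log lines standing in for logs.jsonl
lines = [
    '{"error": true, "message": "disk full"}',
    '{"error": false, "message": "ok"}',
    '{"error": true, "message": "timeout"}',
]
bag = db.from_sequence(lines, npartitions=2).map(json.loads)  # parse each line

messages = bag.filter(lambda rec: rec.get("error", False)).pluck("message")
out = messages.compute(scheduler="sync")
print(out)  # prints: ['disk full', 'timeout']
```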

***

## 153-4. Dask Delayed (dask.delayed)
Parallelize custom functions with minimal changes.

In [None]:
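import time

import dask
from dask import delayed

@delayed
def slow_function(x):
    time.sleep(1)  # simulate a slow task
    return x * 2

# Calling a delayed function returns a lazy Delayed object, not a result
results = [slow_function(i) for i in range(10)]
dask.compute(*results)  # Runs the 10 calls in parallel on a thread pool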
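Delayed functions can also depend on each other. Dask then runs independent branches in parallel and waits for all inputs before running a dependent task. In this sketch, `load`, `process`, and `combine` are hypothetical functions, and the short sleeps only simulate work. Passing a list of Delayed objects to `combine` works because Dask substitutes their computed values.

```python
import time
from dask import delayed

@delayed
def load(i):
    time.sleep(0.1)  # simulate slow I/O
    return i

@delayed
def process(x):
    time.sleep(0.1)  # simulate slow computation
    return x * 2

@delayed
def combine(values):
    return sum(values)

# Each load -> process chain is independent, so the chains can run in parallel;
# combine depends on all of them, so it runs last.
processed = [process(load(i)) for i in range(4)]
total = combine(processed)

start = time.perf_counter()
answer = total.compute()
print(answer)  # prints: 12
print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

With at least four threads available, the four chains overlap. The elapsed time should therefore be closer to 0.2 s than to the 0.8 s of a sequential run, though exact timings depend on the machine.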

***

# 154. Dask Distributed (Scaling to Clusters)
Dask can run on distributed systems (e.g., Kubernetes, HPC, cloud clusters).

***

## 154-1. Local Cluster (Multi-core)

In [None]:
from dask.distributed import Client

client = Client()  # Starts a local multi-worker cluster
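Once a `Client` exists, work can also be submitted directly as futures. The sketch below follows the pattern from the `dask.distributed` quickstart. The worker settings are illustrative assumptions: `processes=False` keeps the workers as threads inside the current process, which is lighter for a demo. `square` is a hypothetical helper.

```python
from dask.distributed import Client

def square(x):
    return x ** 2

with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
    futures = client.map(square, range(5))  # submit tasks; get futures back
    total = client.submit(sum, futures)     # sum runs once all inputs are ready
    result = total.result()                 # block until the value arrives
    print(result)  # prints: 30
```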

***

## 154-2. Scaling to a Cluster

In [None]:
from dask.distributed import Client

# Connect to a remote cluster (e.g., Kubernetes, SLURM, YARN)
client = Client("scheduler-address:8786")

***

## 154-3. Dask Dashboard
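By default, `Client()` serves a real-time monitoring dashboard at http://localhost:8787. If that port is busy, another port is used, and `client.dashboard_link` shows the actual address. The dashboard requires the `bokeh` package.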

***

# 155. When to Use Dask?
- Your data is too big for memory with Pandas/NumPy but still fits on a single machine's disk.
- You need parallel processing without rewriting much of your code.
- You want to scale from a laptop to a cluster with largely the same code.
- You work with machine learning pipelines (Dask-ML).

***

# 156. Limitations
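- Can be slower than Spark on some very large distributed workloads, though it is usually easier to adopt from Python.
- Adds scheduling overhead, so small datasets are usually faster with plain Pandas/NumPy (use them if the data fits comfortably in RAM).
- Not every Pandas operation is implemented. Operations that shuffle data between partitions (e.g., some joins, `set_index` on an unsorted column) can be slow.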

***


# Some Exercises

**1.** Create a large random dask.array (shape (5000, 5000), chunked into (1000, 1000)).

Compute the mean, standard deviation, and sum.

Compare execution time with NumPy (%timeit in Jupyter).

___

**2.** Load a CSV file (e.g., sample data) using dask.dataframe.

Filter rows on a column condition (e.g., value > 0).

Group by a categorical column and compute aggregations (mean, max).

---

**3.** Read a JSON Lines file (one JSON record per line) using dask.bag.

Extract a field (e.g., user_id).

Count occurrences of a value (e.g., error_type).

---

**4.** Create 3 functions (@delayed) that simulate slow tasks (e.g., time.sleep(1)).

Chain them in a workflow (e.g., task3(task2(task1(x)))) for several independent inputs x.

Compute in parallel and compare runtime vs. sequential execution.

***

**5.** Initialize a local Client() with 4 workers.

Run a Dask DataFrame operation (e.g., groupby().mean()).

Monitor progress in the Dask Dashboard (http://localhost:8787).

***

**6.** Deploy a temporary Dask cluster (e.g., using dask-cloudprovider or LocalCluster).

Submit a parallel job (e.g., processing multiple files).

Verify worker logs in the dashboard.

***

**7.** Train a Scikit-learn model (e.g., RandomForestRegressor) on a large dataset with Dask (e.g., through dask_ml or the Dask joblib backend).

Compare training time with/without Dask.

***

**8.** Benchmark Dask vs. Pandas on a small dataset (<1GB).

Identify operations where Pandas outperforms Dask.

***

#                                                        🌞 https://github.com/AI-Planet 🌞