# 5. Parallel and Distributed Computing with Dask

## The Scaling Problem: When Data Exceeds Memory

Economists increasingly work with datasets that are too large to fit into a single computer's RAM. A pandas DataFrame resides entirely in memory, so trying to load a 50 GB file on a machine with 16 GB of RAM will typically fail with a `MemoryError` or get the process killed. Separately, many computations are CPU-bound and could run significantly faster if they used all the cores of a modern processor.

**Dask** is an open-source Python library designed to solve these scaling problems. It provides a flexible framework for parallel and distributed computing, letting you scale familiar Python tools such as NumPy, pandas, and scikit-learn to multi-core machines and large, distributed clusters.

Dask works by breaking a large computation into many smaller tasks, recording them and their dependencies in a task graph, and executing independent tasks in parallel. It does this lazily: results are computed only when you explicitly ask for them, which gives Dask the chance to optimize the whole execution plan first.
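As a minimal sketch of lazy evaluation and task graphs, the example below uses `dask.delayed`, which this notebook does not otherwise cover. The functions `inc` and `add` are hypothetical toy tasks chosen only for illustration. Calling them records nodes in a task graph, and nothing runs until `.compute()` is called.

```python
import dask


@dask.delayed
def inc(x):
    # Toy task: because of dask.delayed, calling it returns a Delayed object.
    return x + 1


@dask.delayed
def add(a, b):
    # Toy task that depends on the results of two other tasks.
    return a + b


# Building the graph: no function body has run yet.
total = add(inc(1), inc(2))
print(type(total).__name__)  # Delayed

# Executing the graph: the two inc tasks are independent, so they may run in parallel.
result = total.compute()
print(result)  # 5
```

The same build-then-compute pattern underlies the Dask DataFrame and Dask Array operations used later in this notebook.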

In this notebook, you will learn:
- The core concepts behind Dask: lazy evaluation and task graphs.
- How to use **Dask DataFrames** to work with larger-than-memory tabular data.
- How to use **Dask Arrays** for scalable numerical computing.
- How to set up a local Dask cluster and use its diagnostic dashboard to monitor computations.

### Getting Started: Installation

To get started, install Dask. Installing `dask[complete]` pulls in the optional dependencies as well, including those needed for the dashboard.

In [1]:
%pip install "dask[complete]"

Note: you may need to restart the kernel to use updated packages.


### Setting Up a Local Dask Cluster

To use Dask's distributed scheduler and its dashboard, you first set up a "cluster." On a single machine, this means a scheduler plus a set of "workers" that share the machine's CPU cores. The `dask.distributed` library makes this simple: when you create a `Client` with no arguments, it starts a local cluster for you.

In [2]:
from dask.distributed import Client

# This creates a local cluster with a scheduler and workers.
# The client provides a dashboard link to monitor your work.
client = Client()
client

Client and cluster:

| Field | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |
| Workers | 4 |
| Total threads | 4 |
| Total memory | 7.77 GiB |
| Status | running |
| Using processes | True |

Scheduler:

| Field | Value |
|---|---|
| Comm | tcp://127.0.0.1:45483 |
| Dashboard | http://127.0.0.1:8787/status |
| Started | Just now |
| Workers | 0 |
| Total threads | 0 |
| Total memory | 0 B |

Workers:

| Comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:41745 | http://127.0.0.1:35689/status | tcp://127.0.0.1:33723 | 1 | 1.94 GiB | /tmp/dask-scratch-space/worker-mja166wm |
| tcp://127.0.0.1:40245 | http://127.0.0.1:44091/status | tcp://127.0.0.1:46827 | 1 | 1.94 GiB | /tmp/dask-scratch-space/worker-t5echcse |
| tcp://127.0.0.1:45709 | http://127.0.0.1:42013/status | tcp://127.0.0.1:45117 | 1 | 1.94 GiB | /tmp/dask-scratch-space/worker-bhzojlhf |
| tcp://127.0.0.1:42589 | http://127.0.0.1:42005/status | tcp://127.0.0.1:46175 | 1 | 1.94 GiB | /tmp/dask-scratch-space/worker-5fukvoe6 |

**Important:** Click the link for the Dashboard. It is a powerful tool that provides a real-time visualization of your computations, showing you which tasks are running on which workers and how memory is being used. Keep it open in a separate browser tab as you go through this notebook.
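If the defaults do not suit your machine, the cluster can also be sized explicitly. The sketch below is one possible configuration, not a recommendation: the worker count, thread count, and memory limit are assumed values, and the names `demo_cluster` and `demo_client` are hypothetical so they do not clash with the `client` created above. It uses `processes=False` to keep the example lightweight, whereas the default cluster shown above used separate processes.

```python
from dask.distributed import Client, LocalCluster

# Assumed sizing for illustration: 2 workers, 1 thread each, 1 GiB per worker.
# processes=False runs the workers as threads inside the current process.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    memory_limit="1GiB",
    dashboard_address=":0",  # let the OS pick a free port for this extra dashboard
) as demo_cluster, Client(demo_cluster, set_as_default=False) as demo_client:
    # Block until both workers have registered with the scheduler.
    demo_client.wait_for_workers(2)
    n_workers = len(demo_client.scheduler_info()["workers"])
    print(n_workers)

    # Submit a trivial task to confirm the cluster executes work.
    future = demo_client.submit(sum, [1, 2, 3])
    total = future.result()
    print(total)
```

Using the context managers shuts down the extra cluster and client automatically when the block ends.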

### Dask DataFrames: Parallelizing Pandas

A Dask DataFrame is a large, parallel DataFrame composed of many smaller pandas DataFrames, split along the index. These smaller DataFrames are called partitions, and different workers can process different partitions at the same time.

The Dask DataFrame API mimics the pandas API, so you can perform many familiar operations. However, these operations are **lazy**—Dask builds a task graph of the planned computation but doesn't execute it until you explicitly request a result with a method like `.compute()`.

#### Example: Analyzing a Large Time-Series Dataset

Dask includes a generator for synthetic data. Let's use its `make_timeseries` function to create a large time-series dataset with one row per second.

In [3]:
import dask.dataframe as dd

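# Create a synthetic Dask DataFrame with one row per second.
# The data is generated lazily, partition by partition, so it never has to
# sit in memory all at once.
ddf = dd.demo.make_timeseries(
    '2000-01-01', '2000-12-31',
    freq='1s',
    partition_freq='1ME',  # month-end partitions; use '1M' on pandas < 2.2
    dtypes={'x': float, 'y': float, 'id': int}
)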

# Notice that printing the DataFrame doesn't show data.
# It shows the structure: columns, types, and number of partitions.
ddf

| npartitions=11 | x | y | id |
|---|---|---|---|
| 2000-01-31 | float64 | float64 | int64 |
| 2000-02-29 | ... | ... | ... |
| ... | ... | ... | ... |
| 2000-11-30 | ... | ... | ... |
| 2000-12-31 | ... | ... | ... |


Let's perform a standard pandas-like operation: group by an ID and find the mean of one of the columns. Notice that this cell returns almost immediately, because Dask only records the planned computation.

In [4]:
# This is a lazy operation. No computation is done yet.
mean_x_by_id = ddf.groupby('id').x.mean()
mean_x_by_id

Dask Series Structure:
npartitions=1
    float64
        ...
Dask Name: getitem, 5 expressions
Expr=((ArrowStringConversion(frame=Timeseries(75bb8ad))[['x', 'id']]).mean(observed=False, chunk_kwargs={'numeric_only': False}, aggregate_kwargs={'numeric_only': False}, _slice='x'))['x']

To trigger the computation, we call `.compute()`. Now, watch your Dask dashboard. You will see the task graph being executed in parallel across all your CPU cores.

In [5]:
# This triggers the actual computation.
result = mean_x_by_id.compute()

print(result.head())


id
852   -0.048936
853    0.038775
855   -0.123706
856    0.227323
859   -0.069386
Name: x, dtype: float64


### Dask Arrays: Parallelizing NumPy

Similarly, a Dask Array is composed of many smaller NumPy arrays, called "chunks." It supports a large subset of the NumPy API.
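A small sketch of chunking, using a 4x4 array that is tiny enough to check by hand. The array contents and the 2x2 chunk size are chosen only for illustration.

```python
import numpy as np
import dask.array as da

# A 4x4 array of the integers 0..15, split into four 2x2 chunks.
small = da.from_array(np.arange(16).reshape(4, 4), chunks=(2, 2))
print(small.chunks)     # ((2, 2), (2, 2))
print(small.numblocks)  # (2, 2)

# Column means: partial results are computed per chunk and then combined.
col_means = small.mean(axis=0).compute()
print(col_means)  # [6. 7. 8. 9.]
```

Each chunk is an ordinary NumPy array, so the chunk size controls how much data a single task has to hold in memory.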

Let's create a large random Dask array and perform some standard operations.

In [6]:
import dask.array as da

# Create a 20000x20000 array of random numbers.
# This would be ~3.2 GB, potentially too large for some machines.
# Dask handles it by chunking it into smaller NumPy arrays.
x = da.random.random((20000, 20000), chunks=(1000, 1000))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 2.98 GiB | 7.63 MiB |
| Shape | (20000, 20000) | (1000, 1000) |
| Dask graph | 400 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | |


Now, let's perform a computation, like taking the mean along an axis. Again, this is lazy.

In [7]:
# A lazy operation
y = x.mean(axis=0)
y

| | Array | Chunk |
|---|---|---|
| Bytes | 156.25 kiB | 7.81 kiB |
| Shape | (20000,) | (1000,) |
| Dask graph | 20 chunks in 5 graph layers | |
| Data type | float64 numpy.ndarray | |


And now, trigger the computation with `.compute()` and watch the dashboard.

In [8]:
result_array = y.compute()
print(result_array.shape)
print(result_array[:10])

(20000,)
[0.49586515 0.49565619 0.50097809 0.50395828 0.50030513 0.5033218
 0.50137784 0.50041652 0.49943043 0.49847764]


## Conclusion: When to Use Dask

Dask is the right tool when your data is large and your computations are parallelizable. It excels when:

- **Working with Large Datasets:** Your data does not fit in RAM, and you need to perform pandas- or NumPy-like operations on it.
- **Leveraging Multi-Core CPUs:** You have computationally intensive workflows that can be sped up by running them in parallel across multiple cores.
- **Scaling to a Cluster:** While we have used a local cluster, Dask is designed to scale seamlessly to large HPC or cloud-based clusters with minimal code changes.

By providing scalable versions of familiar tools, Dask empowers economists to tackle larger and more complex empirical problems without straying from the Python ecosystem. It is a foundational tool for modern, data-intensive research.

In [9]:
# Don't forget to close the client to release resources
client.close()