# Introduction to Dask and Parquet Workshop


**What you will learn:**
- What is Dask and why use it for large datasets?
- Core techniques including lazy evaluation and using `.compute()`
- How to work with CSV and Parquet files for efficient data handling
- Visualizing Dask’s computation graph with `.visualize()`
- The "start small, go big" approach for scaling analyses
- Hands-on challenges to solidify your understanding


## What is Dask?

**Dask** is a Python library that enables parallel computing and out-of-core processing by splitting large datasets into smaller partitions. Its API mirrors that of Pandas, which makes it easier to scale your workflows without learning a completely new system.

The example below converts a small Pandas DataFrame into a Dask DataFrame and uses it to illustrate lazy evaluation.


### Lazy Evaluation and `.compute()`

Dask employs **lazy evaluation**. This means that when you perform operations on a Dask DataFrame, it builds a task graph of computations without immediately executing them. Actual computation only happens when you call `.compute()`.

For example, consider calculating the mean of a column:



```python
import pandas as pd
import dask.dataframe as dd

# Create a small Pandas DataFrame
small_df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [10, 20, 30]})
# Convert it to a Dask DataFrame with 1 partition (since it's tiny)
ddf = dd.from_pandas(small_df, npartitions=1)

# Define an operation on the Dask DataFrame (lazy: nothing is computed yet)
result = ddf['col2'].mean()
print("Dask result (lazy):", result)  # a lazy Dask Scalar, not a real number yet

# Now actually compute the result
real_result = result.compute()
print("Computed result:", real_result)
```


## What Are Partitions in Dask?

In Dask, **partitions** are smaller chunks of your larger dataset. Think of them as pieces of a large puzzle:

- **Division of Data:** When you load a dataset (for example, from a CSV or a Parquet file) into a Dask DataFrame, Dask splits the data into multiple partitions. Each partition is essentially a smaller Pandas DataFrame that holds just a subset of the data.
- **Parallel Processing:** Because each partition is independent, Dask can process them in parallel. This means that operations on your DataFrame (like filtering, aggregating, etc.) can be executed on each partition at the same time, making the process faster.
- **Memory Efficiency:** Dask only needs the partitions it is currently working on to be in memory, so you can process datasets that are larger than your machine's RAM without overloading it.
- **Lazy Evaluation:** Dask builds a task graph based on these partitions. For instance, when you call `.mean()`, Dask plans to compute partial results (a sum and a count) on each partition and then combine them into the overall mean. Averaging the per-partition means directly would be wrong whenever partitions have different sizes. The whole process is lazy: nothing runs until you explicitly trigger it with `.compute()` or similar.

**In summary:**  
Partitions are Dask's way of breaking down a large dataset into smaller, manageable chunks that can be processed in parallel. This approach is what makes Dask so powerful for handling big data with a familiar, Pandas-like syntax.

### In this example, we:
- Create a Dask DataFrame with **10 partitions** from a Pandas DataFrame containing 100,000 rows.
- Apply a filter to keep only rows where `value1` is positive.
- Group the filtered data by the `group` column and calculate the mean of `value1` for each group.
- Visualize the computation graph using `.visualize()`, which will show a more complex graph because it processes multiple partitions in parallel.


```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

# Create a sample Pandas DataFrame with 100,000 rows
N = 100_000
df = pd.DataFrame({
    'id': np.arange(1, N + 1),
    'value1': np.random.randn(N),                   # simulated continuous measurement
    'group': np.random.choice(['A', 'B', 'C'], N),  # simulated categorical groups
})

# Convert the Pandas DataFrame to a Dask DataFrame with 10 partitions
ddf = dd.from_pandas(df, npartitions=10)
print("Number of partitions:", ddf.npartitions)

# Filter the DataFrame to keep only rows where 'value1' is positive (lazy)
filtered_ddf = ddf[ddf['value1'] > 0]

# Group the filtered data by 'group' and define the mean of 'value1' per group (lazy)
grouped = filtered_ddf.groupby('group')['value1'].mean()

# Visualize the computation graph (requires the optional graphviz dependency)
grouped.visualize(filename='grouped_graph.png')

# Compute the final result (this triggers the execution of the task graph)
result = grouped.compute()
print("Mean of 'value1' for each group (after filtering):")
print(result)
```




### Simulating a Larger Dataset

In scientific workflows, you often work with datasets that are too large to handle comfortably in memory. Here we simulate one by writing four CSV files of 250,000 rows each (1,000,000 rows in total). These files mimic experimental batches or repeated measurements.


```python
import numpy as np
import pandas as pd

n = 250_000  # number of rows per file
for i in range(4):
    start_id = i * n + 1
    end_id = (i + 1) * n
    # Generate data for this chunk
    ids = np.arange(start_id, end_id + 1)          # IDs start_id..end_id inclusive (n values)
    value1 = np.random.randn(n)                    # n random floats (standard normal)
    value2 = np.random.randint(0, 100, size=n)     # n random ints between 0 and 99
    groups = np.random.choice(['A', 'B'], size=n)  # n random choices of 'A' or 'B'
    chunk_df = pd.DataFrame({
        'id': ids,
        'value1': value1,
        'value2': value2,
        'group': groups,
    })
    filename = f"data_part{i}.csv"
    chunk_df.to_csv(filename, index=False)
    print(f"Wrote {filename} with {len(chunk_df)} rows")
```


## Quick Challenge: Summarize a Large CSV with Dask

**Challenge:** Load your simulated CSV files into a Dask DataFrame and compute summary statistics, such as:
- The mean of `value1`
- The total row count

**Steps:**
1. Use `dd.read_csv("data_part*.csv")` to load all CSV files.
2. Preview the data using `.head()`.
3. Compute the mean of `value1` and the count of `id` using `.mean()` and `.count()`, followed by `.compute()`.


```python
import dask.dataframe as dd

# Load all CSV parts into one Dask DataFrame (the glob matches every data_part*.csv file)
ddf = dd.read_csv('data_part*.csv')

# Repartition into 8 partitions; this also determines how many Parquet files are written later
ddf = ddf.repartition(npartitions=8)

# Peek at the data (this will print a few rows from the first partition)
print("Preview of data:\n", ddf.head())

# Define the summary statistics lazily: mean of value1 and total row count
mean_val = ddf["value1"].mean()  # lazy Dask scalar (operation defined, not computed yet)
row_count = ddf["id"].count()    # lazy Dask scalar (counts non-null 'id' values)

# Now trigger the computation to obtain the actual results
mean_val_result = mean_val.compute()
row_count_result = row_count.compute()

print("Mean of value1:", mean_val_result)
print("Total rows:", row_count_result)
```

## Converting CSV to Parquet

While CSV is a common format, **Parquet** is a more efficient columnar data storage format that is well suited for large datasets. Parquet files are typically faster to read and write, support compression, and store schema information.

In this example, we convert our CSV dataset (loaded into a Dask DataFrame) to Parquet files:


```python
import os

# Write the Dask DataFrame to Parquet files (one file per partition) in data_parquet/
ddf.to_parquet('data_parquet', write_index=False)

# Check the output directory
print("Parquet files:", os.listdir('data_parquet'))
```


## Reading Parquet Files with Dask

After converting CSV files to Parquet, loading the data back is very efficient. Parquet’s embedded schema helps Dask quickly reconstruct the DataFrame.


```python
import dask.dataframe as dd

# Read the Parquet files as a Dask DataFrame
ddf2 = dd.read_parquet('data_parquet')
print(type(ddf2))  # should be a Dask DataFrame

# Check the schema and the first few rows
print("Data types:", ddf2.dtypes)
print("Preview of Parquet data:\n", ddf2.head())
```


### Visualizing the Dask Computation Graph

One of Dask’s powerful features is the ability to **visualize its task graph**. This helps you understand how Dask schedules and performs computations in parallel.

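For example, calling `.visualize()` on the lazy mean of the `value1` column renders its graph, which typically shows separate work for each partition feeding a final step that combines the partial results.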


### Start Small, Go Big

A common strategy in data processing is to **start small**:
- Test your code on a subset of your data (using `.head()` or reading a single file) to catch errors quickly.
- Once verified, scale your analysis up to the full dataset.

This approach minimizes wasted time and avoids long computations during development. Dask’s lazy evaluation allows you to work on a small sample and later execute the entire pipeline seamlessly.

Remember: Always validate your code on a small sample before scaling up.


Putting the pieces together, the cell below converts the CSV files to Parquet in a new directory and checks that the row count and column names survive the round trip.

```python
import dask.dataframe as dd

# 1. Read the CSV files into a Dask DataFrame
ddf = dd.read_csv('data_part*.csv')

# 2. Write the Dask DataFrame to Parquet files (in a new directory)
ddf.to_parquet('challenge_parquet', write_index=False)

# 3. Read back the Parquet files into a new Dask DataFrame
ddf_parquet = dd.read_parquet('challenge_parquet')

# 4. Verify that the number of rows and the columns match
rows_csv = ddf["id"].count().compute()
rows_parquet = ddf_parquet["id"].count().compute()
print("Rows in original CSV data:", rows_csv)
print("Rows in Parquet data:     ", rows_parquet)

cols_csv = set(ddf.columns)
cols_parquet = set(ddf_parquet.columns)
print("Columns in original CSV data:", cols_csv)
print("Columns in Parquet data:     ", cols_parquet)
print("Same number of rows in both? ", rows_csv == rows_parquet)
print("Same columns in both?        ", cols_csv == cols_parquet)
```
