In [1]:
# Cell 1: Project Setup and Imports
# Description: Import necessary libraries for data manipulation, Dask, machine learning, and utilities.

import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression as DaskLinearRegression
from dask_ml.preprocessing import StandardScaler as DaskStandardScaler

import pandas as pd
import numpy as np
import os
import shutil
import time
import matplotlib.pyplot as plt
import seaborn as sns
import logging  # <<< Add this import

# Seed NumPy's global RNG so every np.random.* draw made while generating the
# dataset is reproducible across kernel restarts.
np.random.seed(seed=42)

print("Dask version:", dask.__version__)

Dask version: 2025.5.1


In [2]:
# Cell 2: Dask Configuration and Client Initialization
# Description: Set up a Dask LocalCluster and Client for distributed computation.
# This allows you to monitor computations via the Dask dashboard.

# Close existing clusters/clients if any, to ensure a fresh start
# Close a client/cluster left over from an earlier run of this cell so the notebook
# starts from a fresh LocalCluster. Each object is closed independently, so an error
# while closing the client does not prevent the cluster from being shut down, and the
# status message reports what was actually closed.
_closed_names = []
for _name in ("client", "cluster"):
    _obj = globals().get(_name)
    if _obj is None:
        continue
    try:
        _obj.close()
        _closed_names.append(_name)
    except Exception as e:
        print(f"An error occurred while closing existing Dask {_name}: {e}")
if _closed_names:
    print(f"Closed existing Dask {' and '.join(_closed_names)}.")
else:
    print("No existing Dask client/cluster to close.")

# Option 1: let Dask choose the number of workers and threads (typically based on CPU cores).
cluster = LocalCluster(silence_logs=logging.ERROR)
# Option 2: set workers and threads explicitly (example: 4 workers, 2 threads each).
# cluster = LocalCluster(n_workers=4, threads_per_worker=2, silence_logs=logging.ERROR)

client = Client(cluster)
print(f"Dask Client created: {client}")
print(f"Dask Dashboard link: {client.dashboard_link}")

# Open the dashboard link printed above in a web browser to monitor computations.

Closed existing Dask client and cluster.
Dask Client created: <Client: 'tcp://127.0.0.1:58355' processes=7 threads=14, memory=48.00 GiB>
Dask Dashboard link: http://127.0.0.1:8787/status


In [3]:
# Synthetic-dataset configuration (the data-generation cell re-declares the same values).
DATA_DIR = "dask_project_data"  # directory that receives the CSV part files
N_FILES = 5  # how many CSV part files to write
N_ROWS_PER_FILE = 6 * 10**6  # rows written to each part file
N_FEATURES = 10  # feature columns, not counting "target"
TARGET_NOISE_LEVEL = 0.1  # multiplier applied to standard-normal noise on the target

In [None]:
# Cell 3: Data Generation
# Description: Generate a moderately large dataset (aiming for ~5GB).
# The data will be tabular, with numerical features and a target variable for regression.
# It will be saved as multiple CSV files, which Dask handles well.

# Parameters for the synthetic dataset written by this cell.
DATA_DIR = "dask_project_data"          # where the CSV part files are written
N_FILES = 5                             # number of CSV part files
N_ROWS_PER_FILE = 6_000 * 1_000         # rows written per part file
N_FEATURES = 10                         # feature columns; "target" is an extra column
TARGET_NOISE_LEVEL = 0.1                # scale of the standard-normal noise on the target

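# Total rows = N_FILES * N_ROWS_PER_FILE = 5 * 6,000,000 = 30,000,000
# In-memory size (float64): (N_FEATURES + 1 target) * 8 bytes * Total_rows
#   = 11 * 8 * 30,000,000 bytes = 2,640,000,000 bytes ~= 2.5 GiB
# On-disk size is larger, because CSV stores each float as decimal text (roughly
# 18-20 characters plus a separator). That gives about 11 * 20 * 30,000,000 bytes,
# i.e. ~6 GB. Check the real figure with `du -sh dask_project_data` after generation.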


def generate_data_chunk(
    num_rows,
    num_features,
    file_idx,
    coeffs=None,
    noise_level=None,
    data_dir=None,
    coeff_seed=0,
):
    """Generate one synthetic regression chunk and write it to a CSV file.

    Features are uniform on [0, 1) and are drawn from NumPy's global RNG, so they
    follow the notebook-level ``np.random.seed``. The target is a linear
    combination of the features plus Gaussian noise::

        target = features @ coeffs + noise_level * N(0, 1)

    Parameters
    ----------
    num_rows : int
        Number of rows to generate.
    num_features : int
        Number of feature columns (``feature_0`` ... ``feature_{num_features-1}``).
    file_idx : int
        Index used in the output file name ``data_part_{file_idx:02d}.csv``.
    coeffs : array-like of shape (num_features,), optional
        True regression coefficients. When omitted, they are drawn uniformly
        from [-1, 1) using a dedicated RNG seeded with ``coeff_seed``. Every file
        produced with the same ``num_features``/``coeff_seed`` therefore shares
        one ground-truth linear relationship.
    noise_level : float, optional
        Scale of the Gaussian noise; defaults to the global ``TARGET_NOISE_LEVEL``.
    data_dir : str, optional
        Output directory (created if missing); defaults to the global ``DATA_DIR``.
    coeff_seed : int, default 0
        Seed for the default coefficients.

    Returns
    -------
    str
        Path of the written CSV file.

    Raises
    ------
    ValueError
        If ``coeffs`` does not have shape ``(num_features,)``.
    """
    if noise_level is None:
        noise_level = TARGET_NOISE_LEVEL
    if data_dir is None:
        data_dir = DATA_DIR

    if coeffs is None:
        # Use a dedicated fixed-seed RNG instead of the global one. Drawing new
        # coefficients on every call would give each file a different target
        # function, which a single regression model cannot fit.
        coeffs = np.random.default_rng(coeff_seed).uniform(-1.0, 1.0, num_features)
    coeffs = np.asarray(coeffs, dtype=float)
    if coeffs.shape != (num_features,):
        raise ValueError(
            f"coeffs must have shape ({num_features},), got {coeffs.shape}"
        )

    columns = [f"feature_{i}" for i in range(num_features)]
    features = np.random.rand(num_rows, num_features)
    noise = noise_level * np.random.randn(num_rows)

    data = pd.DataFrame(features, columns=columns)
    data["target"] = features @ coeffs + noise  # vectorized linear combination

    os.makedirs(data_dir, exist_ok=True)
    filepath = os.path.join(data_dir, f"data_part_{file_idx:02d}.csv")
    # Write under a temporary name and then rename. An interrupted run therefore
    # never leaves a truncated file under the final name; the generation cell
    # treats any existing data_part_XX.csv as complete.
    tmp_path = filepath + ".tmp"
    data.to_csv(tmp_path, index=False)
    os.replace(tmp_path, filepath)
    return filepath


# Generate the dataset unless every expected part file is already present.
expected_files = [
    os.path.join(DATA_DIR, f"data_part_{i:02d}.csv") for i in range(N_FILES)
]
data_dir_exists = os.path.exists(DATA_DIR)
if data_dir_exists:
    print(
        f"Data directory '{DATA_DIR}' already exists. Skipping generation if files are present."
    )

if data_dir_exists and all(os.path.exists(f) for f in expected_files):
    print(f"{N_FILES} data files found. Using existing data.")
else:
    if data_dir_exists:
        # Incomplete dataset (e.g. from an interrupted earlier run): discard it and
        # regenerate everything, so all parts come from one generation pass.
        shutil.rmtree(DATA_DIR)
    os.makedirs(DATA_DIR, exist_ok=True)
    print(f"Generating {N_FILES} data files in '{DATA_DIR}'...")
    for i in range(N_FILES):
        path = generate_data_chunk(N_ROWS_PER_FILE, N_FEATURES, i)
        print(f"Generated {path}")
    print("Data generation complete.")

Generating 5 data files in 'dask_project_data'...
Generated dask_project_data/data_part_00.csv


In [5]:
# Glob over the generated CSV parts (data_part_00.csv, data_part_01.csv, ...);
# it is passed to dd.read_csv as a wildcard pattern.
_PART_FILE_GLOB = "data_part_*.csv"
file_pattern = os.path.join(DATA_DIR, _PART_FILE_GLOB)

In [6]:
# Cell 4: Define Data Loading and Preprocessing Pipeline
# Description: Load data using Dask DataFrame and apply transformations.
# Transformations:
# 1. Create an interaction feature: feature_A = feature_0 * feature_1
# 2. Create a polynomial feature: feature_B = feature_2 ** 2
# 3. Scale all features using DaskStandardScaler.

import dask.array as da  # Ensure dask.array is imported


def load_and_preprocess_data(file_pattern):
    """Load the CSV parts with Dask and build the scaled feature matrix and target.

    Steps:
      1. ``dd.read_csv(file_pattern)`` (lazy).
      2. Add ``interaction_feature = feature_0 * feature_1``.
      3. Add ``polynomial_feature = feature_2 ** 2``.
      4. Standardize every non-target column with ``DaskStandardScaler``.

    Parameters
    ----------
    file_pattern : str
        Path or glob pattern passed to ``dd.read_csv``.

    Returns
    -------
    X : dask.array.Array
        Scaled features, 2-D, with known row chunks.
    y : dask.array.Array
        Target, 1-D, with the same row chunks as ``X``.
    feature_columns : list of str
        Column order of ``X``.

    Notes
    -----
    This function triggers computation: one pass to count rows per partition,
    plus whatever the scaler's ``fit`` computes for its column statistics.
    """
    ddf = dd.read_csv(file_pattern)

    print(f"Number of partitions in loaded ddf: {ddf.npartitions}")

    # Transformation 1: interaction feature.
    ddf["interaction_feature"] = ddf["feature_0"] * ddf["feature_1"]

    # Transformation 2: polynomial feature.
    ddf["polynomial_feature"] = ddf["feature_2"] ** 2

    feature_columns = [col for col in ddf.columns if col != "target"]
    X_df = ddf[feature_columns]  # Dask DataFrame
    y_series = ddf["target"]  # Dask Series

    # Count rows per partition once and reuse the counts for X and y. Both arrays
    # then get identical, known row chunks, which dask_ml's train_test_split needs
    # to split them consistently. Calling to_dask_array(lengths=True) on each
    # collection separately would scan the CSVs once per call.
    partition_lengths = tuple(int(n) for n in ddf.map_partitions(len).compute())

    # Transformation 3: standardize the features.
    scaler = DaskStandardScaler()
    X_scaled = scaler.fit_transform(X_df)
    # NOTE(review): for a Dask DataFrame input, dask_ml's scaler appears to return a
    # Dask DataFrame rather than an array. Handle both cases so X is always an
    # array whose chunks line up with y.
    if isinstance(X_scaled, dd.DataFrame):
        X_scaled_da = X_scaled.to_dask_array(lengths=partition_lengths)
    else:
        X_scaled_da = X_scaled
        if np.isnan(X_scaled_da.shape[0]):
            X_scaled_da = X_scaled_da.compute_chunk_sizes()

    # A Series converts to a 1-D array, so no reshaping is needed.
    y_da = y_series.to_dask_array(lengths=partition_lengths)

    print(f"Type of X returned by load_and_preprocess_data: {type(X_scaled_da)}")
    if isinstance(X_scaled_da, da.Array):
        print(f"   X dtype: {X_scaled_da.dtype}, X chunks: {X_scaled_da.chunks}")
    print(f"Type of y returned by load_and_preprocess_data: {type(y_da)}")
    if isinstance(y_da, da.Array):
        print(f"   y dtype: {y_da.dtype}, y chunks: {y_da.chunks}")

    return X_scaled_da, y_da, feature_columns

In [7]:
# Cell 5: Define Machine Learning Task (Regression)
# Description: Perform a regression task using Dask-ML.
# This includes splitting data, training a model, making predictions, and evaluating.


def run_ml_task(X, y):
    """Split the data, fit a Dask-ML LinearRegression, and evaluate it on a test split.

    Parameters
    ----------
    X : dask.array.Array
        2-D feature matrix (as returned by ``load_and_preprocess_data``).
    y : dask.array.Array
        1-D target aligned row-wise with ``X``.

    Returns
    -------
    DaskLinearRegression
        The fitted model. The test-set mean squared error is printed and also
        stored on the model as ``test_mse_``.
    """
    # 80/20 split with a fixed seed for reproducibility.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, shuffle=True, convert_mixed_types=True
    )

    model = DaskLinearRegression()

    print("Starting model training...")
    print(f"Type of X_train before fit: {type(X_train)}")
    if isinstance(X_train, da.Array):
        print(f"   X_train dtype: {X_train.dtype}, X_train chunks: {X_train.chunks}")
    elif isinstance(X_train, dd.DataFrame):
        print("   X_train is unexpectedly a Dask DataFrame!")

    print(f"Type of y_train before fit: {type(y_train)}")
    if isinstance(y_train, da.Array):
        print(f"   y_train dtype: {y_train.dtype}, y_train chunks: {y_train.chunks}")
    elif isinstance(y_train, dd.Series):
        print("   y_train is unexpectedly a Dask Series!")

    # .fit() on Dask collections triggers computation.
    model.fit(X_train, y_train)
    print("Model training complete.")

    print("Making predictions...")
    # predict() on Dask arrays returns a lazy Dask array; the original discarded it,
    # so prediction never ran. Reduce it to a test-set MSE and compute, so the
    # prediction step is actually executed (and included in benchmark timings).
    y_pred = model.predict(X_test)
    test_mse = float(((y_pred - y_test) ** 2).mean().compute())
    print(f"Test mean squared error: {test_mse:.6f}")
    # Keep the score on the model so callers can read it without recomputing.
    model.test_mse_ = test_mse

    return model

In [None]:
# Cell 6: Sequential Benchmark (Dask with Single Worker/Thread for Dashboard)
# Description: Run the full pipeline using Dask's distributed scheduler
# configured with a single worker and single thread to establish a baseline
# while still allowing dashboard access.

print("\n--- Starting Sequential Benchmark (Single Worker/Thread for Dashboard) ---")

# Shut down the global client/cluster from earlier cells so they do not compete for
# CPU with the single-worker cluster below. `status` may be a plain string or an
# Enum member depending on the object (and distributed version); compare on the
# member name when there is one.
for _name in ("client", "cluster"):
    _obj = globals().get(_name)
    if _obj is None:
        continue
    _status = getattr(_obj, "status", None)
    if getattr(_status, "name", _status) == "closed":
        continue  # already shut down
    try:
        _obj.close()
        print(f"Closed existing global Dask {_name}.")
    except Exception as e:
        print(f"Error closing pre-existing Dask {_name}: {e}")

# Start a LocalCluster with 1 worker x 1 thread for sequential-like execution while
# keeping the dashboard available.
sequential_cluster = None
sequential_client = None
# -1 means "no valid measurement"; Cell 7 only compares durations > 0.
sequential_duration = -1
try:
    sequential_cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=1,
        silence_logs=logging.ERROR,
    )
    sequential_client = Client(sequential_cluster)
    print(f"Sequential Dask Client (1 worker, 1 thread) created: {sequential_client}")
    print(f"Dashboard for sequential run: {sequential_client.dashboard_link}")
    print("You can open this dashboard link to observe the 'sequential' execution.")

    # perf_counter is a high-resolution clock intended for measuring elapsed time.
    start_time_seq = time.perf_counter()

    # The pipeline functions use the active default client (sequential_client here).
    print("Loading and preprocessing data (sequentially via single worker)...")
    X_seq, y_seq, feature_cols_seq = load_and_preprocess_data(file_pattern)

    print("Running ML task (sequentially via single worker)...")
    model_seq = run_ml_task(X_seq, y_seq)

    end_time_seq = time.perf_counter()
    sequential_duration = end_time_seq - start_time_seq

    print("\nSequential Benchmark Complete.")
    print(
        f"Total time taken (Dask single worker/thread): {sequential_duration:.2f} seconds"
    )

except Exception as e:
    # The elapsed time of a failed run is not a benchmark result. Keep the -1
    # sentinel so Cell 7 does not compute a speedup from a partial (or stale) run.
    print(f"An error occurred during the sequential benchmark: {e}")

finally:
    # Always shut down the benchmark client and cluster, even after an error.
    if sequential_client:
        try:
            sequential_client.close()
            print("Sequential Dask client closed.")
        except Exception as e:
            print(f"Error closing sequential Dask client: {e}")
    if sequential_cluster:
        try:
            sequential_cluster.close()
            print("Sequential Dask cluster closed.")
        except Exception as e:
            print(f"Error closing sequential Dask cluster: {e}")

# Restore a general default scheduler for any Dask work done without a client.
# Cell 7 (Parallel Benchmark) creates its own cluster and client.
dask.config.set(scheduler="threads")
print("Dask config reset to 'threads' scheduler as a general default.")

In [None]:
# Cell 7: Parallel Benchmark (Dask with LocalCluster)
# Description: Run the full pipeline using a Dask LocalCluster for parallel execution.
# Compare its performance against the sequential benchmark.

print("\n--- Starting Parallel Benchmark ---")

# Close any client/cluster still bound in the notebook before starting fresh ones.
# Each is closed independently, so one failure does not skip the other.
for _name in ("client", "cluster"):
    _obj = globals().get(_name)
    if _obj is None:
        continue
    try:
        _obj.close()
    except Exception as e:
        print(f"Error closing Dask {_name}: {e}")

# Same settings as Cell 2: let Dask size the cluster for this machine.
# Explicit alternative:
# cluster = LocalCluster(n_workers=4, threads_per_worker=2, silence_logs=logging.ERROR)
cluster = LocalCluster(silence_logs=logging.ERROR)
client = Client(cluster)
print(f"Dask Client for parallel run: {client}")
print(f"Dashboard link for parallel run: {client.dashboard_link}")
print("Please open the Dask Dashboard to observe the parallel execution.")

# perf_counter is a high-resolution clock intended for measuring elapsed time.
start_time_parallel = time.perf_counter()

print("Loading and preprocessing data (in parallel)...")
X_parallel, y_parallel, feature_cols_parallel = load_and_preprocess_data(file_pattern)

print("Running ML task (in parallel)...")
model_parallel = run_ml_task(X_parallel, y_parallel)

end_time_parallel = time.perf_counter()
parallel_duration = end_time_parallel - start_time_parallel

print("\nParallel Benchmark Complete.")
print(f"Total time taken (parallel Dask): {parallel_duration:.2f} seconds")

# Cell 6 may not have been run. Fall back to the -1 "no valid measurement"
# sentinel rather than failing here with a NameError.
sequential_duration = globals().get("sequential_duration", -1)
durations_valid = sequential_duration > 0 and parallel_duration > 0

print("\n--- Performance Comparison ---")
print(f"Sequential execution time: {sequential_duration:.2f} seconds")
print(f"Parallel execution time: {parallel_duration:.2f} seconds")
if durations_valid:
    speedup = sequential_duration / parallel_duration
    print(f"Speedup: {speedup:.2f}x")
else:
    print(
        "Parallel or sequential execution was too fast, zero, or an error occurred, cannot calculate speedup."
    )

# Plot the two timings side by side.
times = [sequential_duration, parallel_duration]
labels = ["Sequential (Dask Single-Threaded)", "Parallel (Dask LocalCluster)"]

if durations_valid:
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.barplot(x=labels, y=times, ax=ax)
    ax.set_title("Benchmark: Sequential vs. Parallel Execution Time")
    ax.set_ylabel("Time (seconds)")
    plt.setp(ax.get_xticklabels(), rotation=15, ha="right")
    fig.tight_layout()
    plt.show()
else:
    print("Skipping plot due to invalid execution times.")

In [None]:
# Cell 8: Dask Dashboard Analysis and Task Graph Visualization (Guidance)
# Description: This cell provides guidance on what to analyze using the Dask dashboard
# and how to interpret task graphs. You should take screenshots from your dashboard
# during the parallel benchmark run for your report.

# --- Guidance for your Report ---

# 1. Task Execution Timeline (Stream Plot on Dashboard):
#    - Observe how tasks are distributed across workers and threads.
#    - Look for periods of high activity (many tasks running in parallel) vs. idle time.
#    - Identify which stages of your pipeline (e.g., read_csv, feature engineering, scaling, model fitting)
#      correspond to different patterns on the timeline.
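#    - Colors on the task-stream plot distinguish task types (one color per task prefix, i.e. the function being run); red bars mark data transfer between workers, and gaps (white space) are idle time.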

# 2. Worker Load Balance (Workers Tab -> Memory, CPU, Network):
#    - Check if memory usage is evenly distributed or if some workers are overburdened (potential for OOM errors).
#    - Monitor CPU utilization across workers. Are all workers actively contributing?
#    - Observe network traffic, especially during data loading and shuffling phases.
#    - Imbalances can indicate poor data partitioning or tasks that are hard to parallelize.

# 3. Graph Structure (Graph Tab or from `.visualize()`):
#    - Examine the Dask task graph. It shows the dependencies between tasks.
#    - Large, wide graphs indicate high potential for parallelism.
#    - Long, narrow "bottleneck" chains in the graph limit parallelism. These are critical paths.
#    - The complexity of the graph (number of nodes and edges) gives an idea of Dask's overhead in managing tasks.
#    - Example: (You can run this for specific Dask collections to see their graph)
#      if 'X_parallel' in locals(): # check if X_parallel exists
#          X_parallel.visualize(filename='X_parallel_graph.png') # Requires graphviz
#          print("Task graph for X_parallel saved to 'X_parallel_graph.png'")
#          # Or view a small part if the graph is too big:
#          # (X_parallel.feature_0 + X_parallel.feature_1).visualize()

# To take screenshots:
# - Run the parallel benchmark (Cell 7).
# - While it's running, open the Dask dashboard (link provided when client starts).
# - Navigate to relevant tabs (Status, Workers, Graph, Profile) and capture images.
# - Focus on periods where significant computation is happening.

# You would include these screenshots in your report and discuss your observations.

In [None]:
# Cell 9: Experimenting with Dask Task Graph Design (Guidance & Example)
# (Ensure client and cluster from Cell 7 are active and healthy, or restart them)

# ... (ensure client is set up as in Cell 7) ...
# if 'client' not in locals() or not client or client.status == 'closed':
#     print("Client not running or closed. Restarting client and cluster for experiment.")
#     if 'cluster' in locals() and cluster:
#         try: cluster.close()
#         except Exception: pass
#     cluster = LocalCluster(silence_logs=logging.ERROR)
#     client = Client(cluster)
#     print(f"Re-initialized Dask Client: {client}")
#     print(f"Dashboard: {client.dashboard_link}")


def pipeline_with_strategic_persist(file_pattern):
    """Run the full pipeline, persisting the preprocessed X and y before the ML stage.

    The pipeline is: load -> feature engineering -> scaling -> persist(X, y) ->
    train/test split -> fit -> predict + test MSE.

    Parameters
    ----------
    file_pattern : str
        Path or glob pattern passed to ``dd.read_csv``.

    Returns
    -------
    float
        Wall-clock duration of the whole pipeline, in seconds.
    """
    print("Running pipeline with strategic .persist()...")
    start_time = time.perf_counter()

    ddf_loaded = dd.read_csv(file_pattern)  # Load fresh (lazy)

    # Feature engineering (lazy, partition-wise).
    ddf_transformed = ddf_loaded.assign(
        interaction_feature=ddf_loaded["feature_0"] * ddf_loaded["feature_1"],
        polynomial_feature=ddf_loaded["feature_2"] ** 2,
    )

    # Every non-target column: the original features, followed by the two new ones
    # (assign() appends new columns at the end).
    feature_columns = [col for col in ddf_transformed.columns if col != "target"]

    X_for_scaling = ddf_transformed[feature_columns]
    y_target = ddf_transformed["target"]  # Dask Series

    # Count rows per partition once and share the counts between X and y. Both
    # arrays then get identical, known row chunks, which train_test_split needs.
    # Calling to_dask_array(lengths=True) would scan the CSVs once per collection.
    partition_lengths = tuple(
        int(n) for n in ddf_transformed.map_partitions(len).compute()
    )

    scaler = DaskStandardScaler()
    X_scaled = scaler.fit_transform(X_for_scaling)
    # NOTE(review): for a Dask DataFrame input, dask_ml's scaler appears to return a
    # Dask DataFrame rather than an array; convert it using the shared lengths.
    if isinstance(X_scaled, dd.DataFrame):
        X_scaled_lazy = X_scaled.to_dask_array(lengths=partition_lengths)
    else:
        X_scaled_lazy = X_scaled
        if np.isnan(X_scaled_lazy.shape[0]):
            X_scaled_lazy = X_scaled_lazy.compute_chunk_sizes()
    y_target_lazy_array = y_target.to_dask_array(lengths=partition_lengths)  # 1-D

    # --- Strategic persist ---
    # Persist only the fully preprocessed X and y that feed into ML. Doing it in a
    # single dask.persist call lets the CSV-reading tasks they share be scheduled
    # together, instead of once per collection.
    print("Persisting scaled X and target y array before ML...")
    X_persisted, y_persisted = dask.persist(X_scaled_lazy, y_target_lazy_array)
    # persist() hands back collections backed by data being computed on the
    # workers; downstream tasks use those in-memory results.
    print("X_scaled and y_target_array scheduled for persistence.")

    # Proceed with ML using the persisted collections.
    X_train, X_test, y_train, y_test = train_test_split(
        X_persisted,
        y_persisted,
        test_size=0.2,
        random_state=42,
        shuffle=True,
        convert_mixed_types=True,
    )

    model = DaskLinearRegression()
    model.fit(X_train, y_train)
    # predict() is lazy. Reduce it to a test MSE and compute, so the prediction
    # work is actually executed and included in the measured duration.
    predictions = model.predict(X_test)
    test_mse = float(((predictions - y_test) ** 2).mean().compute())
    print(f"Test mean squared error: {test_mse:.6f}")

    duration = time.perf_counter() - start_time
    print(f"Pipeline with strategic persist finished in: {duration:.2f} seconds")
    return duration


# Run the persist experiment on the client created in Cell 7. If that client is gone,
# re-initialize one first (see the commented-out code at the top of this cell).
client_is_usable = "client" in locals() and client.status != "closed"

if not client_is_usable:
    print(
        "Dask client not active. Please ensure Cell 7's client is running or re-initialize."
    )
else:
    duration_persist_exp = pipeline_with_strategic_persist(file_pattern)

    baseline_available = "parallel_duration" in locals() and parallel_duration > 0
    if not baseline_available:
        print("Original parallel duration not available for comparison.")
    else:
        print(f"Original parallel duration (from Cell 7): {parallel_duration:.2f}s")
    print(f"Pipeline with strategic persist duration: {duration_persist_exp:.2f}s")

In [None]:
# Cell 10: Report Guidance (Placeholders for Your Text)
# Description: Key points to cover in your 1-2 page report based on the project requirements.
# Fill this out in a separate Markdown document or text file.

# --- Report Structure Guidance ---

# ## 1. Processing Task and Dataset
#    - **Dataset Description:**
#      - Type: Generated tabular data.
#      - Size: Approx. 5 GB (N_FILES files, N_ROWS_PER_FILE rows/file, N_FEATURES features + 1 target).
#      - Features: Numerical, randomly generated (`feature_0` to `feature_N-1`).
#      - Target: Numerical, derived from features with added noise (`target`).
#      - Generation method: `numpy.random` for features, linear combination + noise for target, saved as multiple CSVs.
#    - **Processing Task:**
#      - Goal: Perform a regression analysis to predict the target variable.
#      - Transformations:
#        1. Creation of an interaction feature (`feature_0 * feature_1`).
#        2. Creation of a polynomial feature (`feature_2 ** 2`).
#        3. Feature scaling using `DaskStandardScaler`.
#      - Machine Learning:
#        - `dask_ml.model_selection.train_test_split` for splitting data.
#        - `dask_ml.linear_model.LinearRegression` for model training.
#        - Evaluation using Mean Squared Error (`dask_ml.metrics.mean_squared_error`).

# ## 2. Parallelism Implementation
#    - **Where:**
#      - Data Loading: `dask.dataframe.read_csv` reads multiple CSVs in parallel (one task per file/block).
#      - Transformations: Element-wise operations on Dask DataFrames (e.g., `ddf['col1'] * ddf['col2']`) are parallelized per partition.
#      - Feature Scaling: `DaskStandardScaler` computes statistics (mean, std) in parallel across partitions and then applies scaling per partition.
#      - Train/Test Split: `dask_ml.model_selection.train_test_split` operates on Dask collections, preserving parallelism.
#      - Model Training (`LinearRegression`): Dask-ML's `LinearRegression` is fit with dask-glm solvers (default `admm`; alternatives include `lbfgs`, `newton`, `gradient_descent`, and `proximal_grad`). These solvers process each chunk of the Dask arrays in parallel and combine the per-chunk results at every iteration.
#      - Prediction & Evaluation: Also performed in parallel on Dask collections.
#    - **How:**
#      - Dask DataFrames: Core data structure, automatically parallelizes operations across partitions.
#      - Dask Arrays: Output of scaler and input to ML model, also support parallel operations.
#      - Dask-ML: Provides ML algorithms that integrate with Dask collections.
#      - Dask Distributed Scheduler: `Client(LocalCluster(...))` used to manage and distribute tasks to multiple workers/threads on a local machine.
#      - Lazy Evaluation: Dask builds a task graph, and computations are only triggered by `.compute()`, `.persist()`, or when a Dask-ML model's `.fit()` or `.predict()` method requires results.

# ## 3. Sequential vs. Parallel Performance
#    - **Include Numbers/Graphs:**
#      - Present the table/bar chart comparing `sequential_duration` and `parallel_duration`.
#      - Report the speedup factor (`sequential_duration / parallel_duration`).
#    - **Analysis:**
#      - Discuss why the parallel version is faster (or if not, why not).
#      - Relate to the Dask dashboard observations: e.g., "The dashboard showed tasks for reading CSVs and applying transformations running concurrently across N workers, leading to the observed speedup in the data preprocessing phase."

# ## 4. Amdahl's Law Discussion
#    - **Concept:** Amdahl's Law states that the maximum speedup achievable by parallelizing a task is limited by its sequential portion. Speedup = 1 / ((1-P) + P/S), where P is the proportion of the program that can be parallelized, and S is the speedup of that portion.
#    - **Application to Your Pipeline:**
#      - **Inherently Sequential Parts (Bottlenecks):**
#        - Initial script setup, Python interpreter overhead (minor).
#        - Certain operations in `StandardScaler` (e.g., combining statistics from all partitions might have a sequential component).
#        - Some parts of specific ML algorithms might not be perfectly parallelizable (e.g., final aggregation steps).
#        - Communication overhead: Moving data between workers (serialization, network if not local cluster). This is not computation but adds to time.
#        - Dask scheduler overhead: Managing the task graph.
#        - Final result gathering: If a single result (like MSE) is computed and brought back to the client, this is a point of synchronization.
#        - Reading metadata or very small files can sometimes be dominated by overhead rather than parallel I/O benefits.
#      - **How these limit scalability:** Even if 95% of your workload is perfectly parallelizable, the 5% sequential part means you can't get more than a 20x speedup, regardless of how many cores you add.
#      - "In my pipeline, while data loading and most transformations are highly parallel, the `StandardScaler.fit()` involves computing global statistics (mean, std dev) which requires an aggregation step. This aggregation, while parallelized by Dask, still forms a synchronization point that could be a bottleneck according to Amdahl's Law. Similarly, the final steps of the linear regression solver might have components that are not perfectly scalable."

# ## 5. Task Graph Analysis and Reflection
#    - **Structure Observations:**
#      - Describe the general shape of your task graphs (e.g., from `ddf.visualize()` or dashboard).
#        "The graph for data loading and initial transformations showed a wide structure, with parallel chains of tasks for each partition/file, followed by merging points for operations like `ddf.merge` (if used) or aggregations."
#      - "The `StandardScaler().fit_transform()` likely showed a graph where initial partition-wise computations fed into an aggregation phase for statistics, then back to partition-wise scaling."
#      - "The ML fit graph would be more complex, representing the algorithm's steps."
#    - **Influence of Changes (e.g., from Cell 9 experiments):**
#      - **`.persist()`:** "Using `.persist()` on intermediate Dask DataFrames/Arrays altered the graph execution. It added explicit tasks for computing and storing results in worker memory. On the dashboard, this appeared as a distinct computation phase. Subsequent operations on persisted data might show simpler upstream graphs as they read directly from memory, potentially speeding them up if the persisted data is accessed multiple times or if recomputing it is expensive. However, the act of persisting itself takes time and memory."
#      - **Repartitioning (if experimented with):** "Changing the number of partitions (e.g., `ddf.repartition()`) would directly alter the width of the graph. More partitions could increase parallelism but also scheduler overhead and task size granularity."
#      - "Breaking fusion by inserting `.compute()` calls would make the graph more fragmented, with Dask unable to optimize operations as effectively."

# ## 6. Development Process and Key Challenges
#    - **Your Process:**
#      - "I started by defining the data generation, then implemented the pipeline steps sequentially in concept. Then translated these to Dask API calls."
#      - "Benchmarking was done iteratively: first sequential, then parallel, then experiments."
#    - **Key Challenges:**
#      - **Understanding Lazy Evaluation:** "Initially, it was tricky to remember that Dask operations don't execute immediately, and results need a `.compute()` or similar trigger. This affects debugging and reasoning about execution flow."
#      - **Memory Management:** "With a 5-10GB dataset, ensuring that intermediate computations or persisted data fit into worker memory was a concern. The Dask dashboard was crucial for monitoring memory usage. Naively persisting too much data could lead to 'spilling to disk' or out-of-memory errors."
#      - **Interpreting the Dashboard:** "While powerful, correlating dashboard visualizations precisely with specific lines of code or pipeline stages took some practice."
#      - **Debugging:** "Debugging errors in a distributed Dask computation can be more complex than in standard Python, as stack traces might originate from workers and involve Dask's internal scheduling."
#      - **Choosing Optimal Partitions:** "Deciding on the right number of partitions can be non-trivial. Too few limits parallelism, too many increases overhead."
#      - **Data Generation Time:** "Generating the ~5GB dataset itself took a noticeable amount of time, which is a practical consideration for pipeline development."
#      - (If applicable) "Setting up the Dask environment or specific library versions."

In [None]:
# Cell 11: Cleanup (Optional)
# Description: Delete the generated data directory to free up space.


def cleanup_generated_data():
    """Delete DATA_DIR and everything under it, printing the outcome.

    Errors raised by shutil.rmtree are caught and reported rather than propagated.
    """
    if not os.path.exists(DATA_DIR):
        print(f"Data directory {DATA_DIR} not found. No cleanup needed.")
        return

    try:
        shutil.rmtree(DATA_DIR)
    except Exception as e:
        print(f"Error removing data directory {DATA_DIR}: {e}")
    else:
        print(f"Successfully removed data directory: {DATA_DIR}")


# Uncomment the line below to run the cleanup function
# cleanup_generated_data()

# Final shutdown of the Dask client and cluster. Each is closed independently, so a
# failure closing the client still lets the cluster shut down, and the message
# reflects what was actually closed.
_shut_down = []
for _name in ("client", "cluster"):
    _obj = globals().get(_name)
    if _obj is None:
        continue
    try:
        _obj.close()
        _shut_down.append(_name)
    except Exception as e:
        print(f"Error during final Dask shutdown ({_name}): {e}")

if _shut_down:
    print(f"Dask {' and '.join(_shut_down)} shut down.")
else:
    print("No active Dask client/cluster to shut down.")