# Module 6.3: Working with Larger Datasets & Basic Optimization

**Dataset for this module:**

We need a large dataset to demonstrate these concepts. **The NYC Taxi & Limousine Commission (TLC) dataset** is a perfect real-world example. We will download one month of Yellow Taxi trip data (January 2023: about 3 million rows, ~48 MB as Parquet). That is large enough to make memory usage and load time noticeable, yet still manageable for this project.

```python
# In 6.3.1_scaling_and_optimization.ipynb
import requests
import os

# URL for a single month of Yellow Taxi trip data (e.g., Jan 2023)
# File size is ~48 MB on disk (Parquet), about 3 million rows
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
filename = "nyc_taxi_jan_2023.parquet"

if not os.path.exists(filename):
    print(f"Downloading {filename}...")
    response = requests.get(url, timeout=120)  # avoid hanging forever on a stalled connection
    if response.status_code == 200:
        with open(filename, "wb") as f:
            f.write(response.content)
        print("Download complete.")
    else:
        print(f"Failed to download file. Status code: {response.status_code}")
else:
    print(f"'{filename}' already exists.")
```

```
Downloading nyc_taxi_jan_2023.parquet...
Download complete.
```


# Topic 1: Pandas Memory Optimization
Before switching to other libraries, let's see how much we can optimize Pandas.

```python
import pandas as pd
import numpy as np

# Load the data with Pandas to check initial memory usage
# Note: We are loading a Parquet file, which is already more efficient than CSV!
df_pandas = pd.read_parquet("nyc_taxi_jan_2023.parquet")

# Check initial memory usage
print("--- Initial Pandas Memory Usage ---")
df_pandas.info(memory_usage='deep')

# --- Strategy 1: Downcasting Numerical Columns ---
# Convert int64/float64 to smaller types if the value range allows.
# Integer and float columns are handled separately, so an integer column is never turned into a float.
df_optimized = df_pandas.copy()
for col in df_optimized.select_dtypes(include=np.integer).columns:
    df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='integer')
for col in df_optimized.select_dtypes(include=np.floating).columns:
    # float32 keeps ~7 significant digits, enough for fares and distances
    df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')

# --- Strategy 2: Converting Low-Cardinality Columns to Category ---
# Best for columns with few unique values. 'store_and_fwd_flag' is a string (object) column;
# the ID/code columns are numeric but categorical in meaning. The integer IDs are already
# small after downcasting, so for them the gain is mostly semantic rather than memory.
for col in ['VendorID', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type']:
    if col in df_optimized.columns:
        df_optimized[col] = df_optimized[col].astype('category')

print("\n--- Optimized Pandas Memory Usage ---")
df_optimized.info(memory_usage='deep')

# --- Strategy 3: Reading in Chunks (for files too big to load at all) ---
# This is how you would process a massive CSV file that doesn't fit in RAM.
# We'll simulate this conceptually.
# chunk_iter = pd.read_csv("massive_file.csv", chunksize=100000)
# for chunk in chunk_iter:
#     # Process each chunk here (e.g., calculate aggregations)
#     print(f"Processing a chunk of size {len(chunk)}")
```

```
--- Initial Pandas Memory Usage ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3066766 entries, 0 to 3066765
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int64         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int64         
 8   DOLocationID           int64         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64
```
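Strategy 3 above is only sketched in comments. The following is a minimal runnable version, assuming a groupby-mean is the aggregation you need. A mean cannot simply be averaged across chunks, so each chunk contributes partial sums and counts, and these are combined at the end. The in-memory CSV (built with `io.StringIO`) stands in for a file too big to load. `chunked_groupby_mean` is a hypothetical helper name, and the data is synthetic.

```python
import io

import numpy as np
import pandas as pd


def chunked_groupby_mean(csv_source, key, value, chunksize=100_000):
    """Groupby-mean over a CSV read in chunks, keeping only running totals in memory."""
    sums = None
    counts = None
    for chunk in pd.read_csv(csv_source, usecols=[key, value], chunksize=chunksize):
        grouped = chunk.groupby(key)[value]
        # Partial results for this chunk: sum and non-null count per group
        chunk_sums = grouped.sum()
        chunk_counts = grouped.count()
        if sums is None:
            sums, counts = chunk_sums, chunk_counts
        else:
            # add() aligns on the group key; fill_value=0 covers groups absent from one side
            sums = sums.add(chunk_sums, fill_value=0)
            counts = counts.add(chunk_counts, fill_value=0)
    return sums / counts


# Synthetic stand-in for a large file (hypothetical data, not the taxi dataset)
rng = np.random.default_rng(0)
demo = pd.DataFrame({
    "payment_type": rng.integers(1, 5, size=10_000),
    "tip_amount": rng.gamma(2.0, 2.0, size=10_000).round(2),
})
csv_text = demo.to_csv(index=False)

chunked = chunked_groupby_mean(io.StringIO(csv_text), "payment_type", "tip_amount", chunksize=1_000)
full = demo.groupby("payment_type")["tip_amount"].mean()
print(chunked)
```

With a real file, you would pass its path instead of the `StringIO` object. Only one chunk plus the small per-group totals are ever held in memory.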

# Topic 2: Efficient Data Storage Formats (Parquet, Feather)

The format you save your data in matters.

---

### CSV
- **Type**: Text-based  
- **Pros**: Simple, universally supported  
- **Cons**: Slow to parse, large file size  

---

### Parquet
- **Type**: Binary, columnar format  
- **Pros**:  
  - Highly compressed  
  - Preserves data types  
  - Much faster to read (especially when selecting only a subset of columns)  
- **Use Case**: Industry standard for **big data**  

---

### Feather
- **Type**: Binary format  
- **Pros**:  
  - Designed for maximum **read/write speed** between Python (Pandas) and R  
- **Cons**:  
  - Not as compressed as Parquet  


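```python
# We already loaded from Parquet. Let's save our optimized DataFrame to compare.
# Note: You need pyarrow installed for this.

# Save to Parquet
df_optimized.to_parquet("optimized_taxi_data.parquet")

# Save to Feather
df_optimized.to_feather("optimized_taxi_data.feather")

print("\n--- File Sizes on Disk ---")
print(f"Original Parquet: {os.path.getsize('nyc_taxi_jan_2023.parquet') / 1e6:.2f} MB")
print(f"Optimized Parquet: {os.path.getsize('optimized_taxi_data.parquet') / 1e6:.2f} MB")
print(f"Optimized Feather: {os.path.getsize('optimized_taxi_data.feather') / 1e6:.2f} MB")
# To see the real difference, let's save a CSV
# df_optimized.to_csv("optimized_taxi_data.csv", index=False)
# print(f"Optimized CSV: {os.path.getsize('optimized_taxi_data.csv') / 1e6:.2f} MB") # This will be much larger!
```

```
--- File Sizes on Disk ---
Original Parquet: 47.67 MB
Optimized Parquet: 63.07 MB
Optimized Feather: 86.43 MB
```

Note that the optimized Parquet file is *larger* than the original download. In-memory dtype optimizations do not carry over to disk in a simple way. Parquet already encodes and compresses every column, and the final size depends largely on the codec and encoding settings used when writing. pandas defaults to `snappy`, which favors speed over size, so a file re-written with different settings can easily come out larger. The Feather file is larger still, consistent with its lighter compression.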


# Topic 3: Dask - Parallel Computing with DataFrames

### Concept
Dask is a **parallel computing library** that scales your existing Python ecosystem.  
- A **Dask DataFrame** is a collection of smaller **Pandas DataFrames (partitions)**.  
- These partitions can be processed **in parallel** across your CPU cores.  
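To make the partition idea concrete, the following is a pure-Pandas sketch of how a mean over a partitioned DataFrame can be computed. It is not Dask's actual code. Each partition returns a small partial result (sum and count), and only those partial results are combined. `np.array_split` stands in for Dask's partitioning and `ThreadPoolExecutor` for its scheduler; both are illustrative choices.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

# Hypothetical data standing in for one large DataFrame
rng = np.random.default_rng(1)
df = pd.DataFrame({"trip_distance": rng.exponential(3.0, size=100_000)})

# Split into 4 smaller Pandas DataFrames ("partitions") by row position
partitions = [df.iloc[idx] for idx in np.array_split(np.arange(len(df)), 4)]


def partial_mean(part):
    """Per-partition work: return (sum, count) rather than a mean."""
    col = part["trip_distance"]
    return col.sum(), col.count()


# Process partitions concurrently, then combine the small partial results
with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(partial_mean, partitions))

total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
combined_mean = total / count
print(combined_mean, df["trip_distance"].mean())
```

Returning `(sum, count)` instead of a per-partition mean is what makes the combination exact, even when partitions have different sizes.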

---

### Lazy Evaluation
- Dask builds a **"task graph"** of your operations.  
- Nothing is actually computed until you explicitly call **`.compute()`**.  


```python
import dask.dataframe as dd

# Read the data with Dask. This is lazy: only the file's metadata is read at this point.
# dd.read_parquet has no npartitions argument; partitioning is derived from the file itself
# (its size and row groups), which here yields a single partition (see npartitions=1 below).
# To spread work across more cores, you could call ddf = ddf.repartition(npartitions=...).
ddf = dd.read_parquet("nyc_taxi_jan_2023.parquet")
print("\n--- Dask DataFrame ---")
print(ddf)  # Shows only the structure (columns, dtypes, partitions), not the data

# Operations look just like Pandas, but they are lazy
mean_trip_distance = ddf['trip_distance'].mean()
print(f"\nMean trip distance (lazy object): {mean_trip_distance}")

# To get the result, we call .compute()
print("Computing mean trip distance with Dask...")
result = mean_trip_distance.compute()
print(f"Result: {result:.2f}")
```

```
--- Dask DataFrame ---
Dask DataFrame Structure:
              VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount    extra  mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge airport_fee
npartitions=1                                                                                                                                                                                                                                                                                 
                 int64       datetime64[us]        datetime64[us]         float64       float64    float64             string        int64        int64        int64     float64  float64  float64    float64      float64               float64      float64              float64     float64
                   ...                  ...                   ...             ...        
```

# Topic 4: joblib - Parallelizing Scikit-learn

### Concept
Scikit-learn uses **joblib** under the hood for its **simplest form of parallelization**.  
Many Scikit-learn algorithms and utilities expose it through an **`n_jobs`** parameter.  

---

### n_jobs = -1
- Tells Scikit-learn to use **all available CPU cores** to parallelize the task.  

---

### How it works
- **RandomForestClassifier**:  
  Trains the different trees in the forest on **different CPU cores simultaneously**.  

- **GridSearchCV**:  
  Evaluates different hyperparameter combinations on **different cores in parallel**.  

👉 This can lead to **massive speedups** in training and model selection.  
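Because `n_jobs` is built on joblib, you can also call joblib directly for your own independent tasks. The following is a minimal sketch using joblib's `Parallel` and `delayed`; `slow_square` is a hypothetical stand-in for an expensive, independent task. `prefer="threads"` is used because `time.sleep` releases the GIL. For pure-Python CPU-bound work, you would drop it and use joblib's default process-based backend instead.

```python
import time

from joblib import Parallel, delayed


def slow_square(x):
    """Hypothetical expensive task: sleeps briefly, then returns x squared."""
    time.sleep(0.1)
    return x * x


# Sequential baseline
start = time.perf_counter()
sequential = [slow_square(i) for i in range(8)]
print(f"Sequential: {time.perf_counter() - start:.2f} s")

# n_jobs=-1 uses all available cores; results come back in input order
start = time.perf_counter()
parallel = Parallel(n_jobs=-1, prefer="threads")(delayed(slow_square)(i) for i in range(8))
print(f"Parallel:   {time.perf_counter() - start:.2f} s")
```

The `delayed(f)(x)` wrapper records the call without running it. `Parallel` then distributes those recorded calls across workers, which is the same pattern Scikit-learn applies to trees and CV fits.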


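```python
# Conceptual example: the objects are only constructed here, not fitted
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# When fitted, this would use all cores to build trees in parallel:
rf = RandomForestClassifier(n_estimators=500, n_jobs=-1)

# When tuning, this would fit the hyperparameter/CV-fold combinations in parallel.
# To avoid oversubscribing the CPU, parallelize at one level only (inner model keeps default n_jobs).
param_grid = {"max_depth": [5, 10, None], "min_samples_leaf": [1, 5]}  # example grid
grid_search = GridSearchCV(
    estimator=RandomForestClassifier(n_estimators=500),
    param_grid=param_grid,
    cv=3,
    n_jobs=-1,
)
# grid_search.fit(X_train, y_train)  # X_train / y_train: your own training data
print("\n--- joblib Example ---")
print("Using 'n_jobs=-1' in Scikit-learn parallelizes training and tuning.")
```

```
--- joblib Example ---
Using 'n_jobs=-1' in Scikit-learn parallelizes training and tuning.
```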


# Topic 5: Apache Spark (PySpark) - Conceptual Overview

### An Overview of Apache Spark (PySpark)

*   **What it is:** Apache Spark is a powerful, open-source, **distributed computing system** designed for big data processing and analytics. PySpark is the Python API for Spark.

*   **When it's needed:** Spark is the tool of choice when your data is truly **"Big Data"**—typically hundreds of gigabytes, terabytes, or even petabytes—and is stored across a **cluster of multiple machines**. It is not designed to be used on a single laptop for datasets that Dask can handle.

*   **Dask vs. Spark ("Amaze Factor" explanation):**
    *   **Scale:** Dask is designed to scale Python code (like Pandas and Scikit-learn) from a single core to multiple cores on **one machine**, or to a small/medium-sized cluster. Spark is designed from the ground up for massive, multi-machine clusters.
    *   **Ecosystem:** Dask is a lightweight, pure Python library that integrates directly with the existing Python data science ecosystem. Spark is a more heavyweight, self-contained ecosystem written in Scala (running on the JVM), which has its own libraries for dataframes, machine learning (MLlib), etc.
    *   **Use Case:** If you love the Pandas API and your data is in the 1-100 GB range, Dask is often the more natural and easier choice to scale your work on your local machine or a single server. If your organization's data lives in a massive, distributed data lake and you need to run complex ETL and ML jobs across a large cluster, Spark is the industry standard.

# Mini-Project: Dask on a Large Parquet File
Goal: Compare the performance of Pandas vs. Dask for a basic aggregation task on our large taxi dataset.

```python
import time
import pandas as pd
import dask.dataframe as dd

# --- Pandas Attempt ---
print("\n--- Timing Pandas ---")
start = time.time()
df_pandas = pd.read_parquet("nyc_taxi_jan_2023.parquet")
pandas_result = df_pandas.groupby("payment_type")["tip_amount"].mean()
end = time.time()
print("Pandas groupby result:")
print(pandas_result)
print(f"Pandas execution time: {end - start:.2f} seconds")

# --- Dask Attempt ---
print("\n--- Timing Dask ---")
start = time.time()
ddf = dd.read_parquet("nyc_taxi_jan_2023.parquet")
dask_aggregation = ddf.groupby("payment_type")["tip_amount"].mean()
dask_result = dask_aggregation.compute()
end = time.time()
print("Dask groupby result:")
print(dask_result)
print(f"Dask execution time: {end - start:.2f} seconds")
```

```
--- Timing Pandas ---
Pandas groupby result:
payment_type
0    3.733109
1    4.170799
2    0.001675
3    0.029469
4    0.051490
Name: tip_amount, dtype: float64
Pandas execution time: 0.85 seconds

--- Timing Dask ---
Dask groupby result:
payment_type
0    3.733109
1    4.170799
2    0.001675
3    0.029469
4    0.051490
Name: tip_amount, dtype: float64
Dask execution time: 0.38 seconds
```


### Mini-Project Summary: Dask vs. Pandas

In this project, we compared the performance of Pandas and Dask for a standard `groupby-mean` aggregation on the January 2023 NYC taxi Parquet file (~3.07 million rows, ~48 MB on disk).

**Observations:**

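*   **Pandas:** Using the `time` module, we measured the time taken to load the entire Parquet file (all 19 columns) into memory and then perform the aggregation. Pandas is straightforward and fast for medium-sized datasets that fit comfortably in RAM.  
*   **Dask:** The timing captures building the lazy task graph, reading the data, and the computation triggered by `.compute()`. This file loads as a single partition (`npartitions=1` in the Topic 3 output), so there was little room for parallelism. Dask's edge here most likely comes from its optimizer reading only the two columns the query uses (`payment_type` and `tip_amount`). The file may also have been cached by the operating system after the Pandas run. Dask will not always beat Pandas on files of this size, but it shines when scaling to larger datasets.

**Timings:**

*   Pandas execution time: 0.85 seconds
*   Dask execution time: 0.38 seconds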

**Key Takeaways & "Amaze Factor" Explanation:**

*   **Scalability:** Pandas handled this ~48 MB file easily, but it needs the whole dataset (plus intermediate copies) in memory. It would therefore fail with a `MemoryError` on a dataset larger than the available RAM. Dask processes the data partition by partition, so many operations, including this groupby-mean, can run on datasets much larger than RAM. **This is Dask’s biggest advantage: out-of-core computation.**
*   **Lazy Evaluation:** Dask builds a task graph and defers execution until `.compute()` is called. This lets it optimize the plan before running, for example by reading only the columns a query actually uses. The result is less wasted work, and independent partitions can be processed in parallel across cores.
*   **Trade-offs:** For smaller files, Pandas often outperforms Dask due to Dask’s task graph overhead. But as soon as datasets grow large relative to memory, Dask’s parallel and chunked computation provides a massive advantage in both speed and feasibility.  

👉 **Final Note:** This project shows not just raw performance differences, but more importantly, the *scalability mindset* — knowing when to switch from Pandas to Dask as data grows.
