In [7]:
import polars as pl
import os
import sys
import time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv

# Enable Polars' global string cache for the rest of the session, so that
# Categorical columns created in different places share one string mapping.
# This does not change the size of any cache.
# NOTE(review): newer Polars releases reworked Categoricals and this call may
# be a no-op there — confirm against the installed version.
pl.enable_string_cache()

# Record the Polars version in the notebook output for reproducibility.
print(f"Polars version: {pl.__version__}")

Polars version: 1.34.0


In [8]:
def analyze_and_clean_imdb(url: str, output_filename: str = "imdb_cleaned_ml_ready.parquet") -> bool:
    """
    Analyze and clean the large IMDB Parquet dataset using Polars' lazy API.

    The data is traversed twice, both times with the streaming engine:

    1. An analysis pass that collects the row count plus the null count and
       the unique-value count of every column.
    2. A cleaning pass that drops columns, imputes missing values, casts
       types, and sinks the result to ``output_filename``.

    Cleaning rules:
    - Missing numeric values are filled with -1 (temporary sentinel).
    - List-like string columns are split and kept as lists.
    - 'isAdult' is stored as UInt8 when it has no nulls; when it does have
      nulls it is stored as Int8 so the -1 sentinel is representable.

    Args:
        url: Path or URL of the source Parquet file (given to
            ``pl.scan_parquet``).
        output_filename: Destination Parquet file, written with zstd
            compression.

    Returns:
        True when the pipeline ran to completion. False when the dataset is
        empty or any exception was raised; the error is printed to stderr
        instead of being propagated.
    """

    try:
        # --- Step 1: Efficient Loading (Lazy Scan) ---
        print("1. Creating LazyFrame (scanning schema)...")
        lf = pl.scan_parquet(url)
        print("   ...Collecting schema.")
        all_columns = lf.collect_schema().names()
        print(f"   ...Schema loaded. Found {len(all_columns)} columns.")

        # --- Step 2: Automated Analysis ---
        print("\n2. Building analysis query (nulls, uniques, redundancy)...")

        # Build a single query so all metadata is gathered in one pass.
        analysis_lf = lf.select(
            pl.len().alias("total_rows"),
            *[pl.col(c).null_count().alias(f"{c}_null_count") for c in all_columns],
            *[pl.col(c).n_unique().alias(f"{c}_n_unique") for c in all_columns],
        )

        print("   ...Executing analysis (1 full pass). This is the slowest step.")
        start_analysis_time = time.time()
        # .collect(engine='streaming') runs the analysis in chunks
        analysis_dict = analysis_lf.collect(engine='streaming').to_dicts()[0]
        print(f"   ...Analysis complete in {time.time() - start_analysis_time:.2f} seconds.\n")

        # --- Process Analysis Results ---
        total_rows = analysis_dict["total_rows"]
        if total_rows == 0:
            # Without this guard the percentage computation below divides by
            # zero and the failure is misreported as a network problem.
            print("Error: the dataset contains no rows; nothing to clean.", file=sys.stderr)
            return False

        null_counts = {c: analysis_dict.get(f"{c}_null_count", 0) for c in all_columns}
        unique_counts = {c: analysis_dict.get(f"{c}_n_unique", 0) for c in all_columns}
        null_percentages = {c: (n / total_rows) * 100 for c, n in null_counts.items()}

        # --- Step 3: Build Lazy Cleaning Pipeline ---
        print("\n3. Building data cleaning and transformation pipeline...")

        # --- 3a. Column Dropping ---
        cols_to_drop = set()

        # Drop columns with only one unique value (no variance)
        low_variance_cols = {c for c, u in unique_counts.items() if u <= 1}
        print(f"Low variance (1 unique value) cols: {low_variance_cols or 'None'}")
        cols_to_drop.update(low_variance_cols)

        # Drop columns that are almost entirely null
        high_null_cols = {c for c, p in null_percentages.items() if p > 95.0}
        high_null_cols.discard('endYear')  # Specific to keep endYear
        print(f"High null (>95%) cols: {high_null_cols or 'None'}")
        cols_to_drop.update(high_null_cols)

        # User-specified domain drops (e.g., foreign keys, IDs)
        domain_drops = {
            'tconst', 'directors', 'writers', 'titleId',
            'tconst_1', 'nconst', 'nconst_1', 'parentTconst', 'knownForTitles'
        }
        print(f"User-specified domain-based drops: {domain_drops}")
        cols_to_drop.update(domain_drops)

        # Only drop what actually exists in the source schema.
        final_cols_to_drop = cols_to_drop.intersection(all_columns)
        lf_cleaned = lf.drop(list(final_cols_to_drop))
        cleaned_columns = [c for c in all_columns if c not in final_cols_to_drop]
        kept = set(cleaned_columns)
        print(f"   ...Dropping {len(final_cols_to_drop)} columns.")

        # --- 3b. Missing Value Imputation & Cleaning ---
        print("   ...Applying user-specified imputations and cleaning.")
        cleaning_and_imputation_steps = []

        # Numeric columns: fill missing with -1
        numeric_cols = [
            'startYear', 'runtimeMinutes', 'seasonNumber', 'episodeNumber',
            'birthYear', 'deathYear'
        ]
        cleaning_and_imputation_steps.extend(
            pl.col(col).fill_null(-1) for col in numeric_cols if col in kept
        )

        if 'averageRating' in kept:
            cleaning_and_imputation_steps.append(pl.col('averageRating').fill_null(-1.0))
        if 'numVotes' in kept:
            cleaning_and_imputation_steps.append(pl.col('numVotes').fill_null(-1))
        if 'isAdult' in kept:
            cleaning_and_imputation_steps.append(pl.col('isAdult').fill_null(-1))

        # List columns (keep as list, clean commas)
        list_cols = ['genres', 'primaryProfession']
        # This regex keeps letters, numbers, and commas, removing other junk.
        list_junk_removal_regex = r"[^\p{L}\p{N},]"
        for col in list_cols:
            if col in kept:
                cleaning_and_imputation_steps.append(
                    pl.col(col)
                      .str.replace_all(list_junk_removal_regex, "")
                      .str.split(',')
                      .fill_null([])  # Fill missing with an empty list
                )

        # Characters column: extract all quoted strings into a list.
        # extract_all returns the whole match (capture groups are ignored),
        # so the surrounding double quotes are stripped from every element.
        if 'characters' in kept:
            cleaning_and_imputation_steps.append(
                pl.col('characters')
                  .str.extract_all(r'"(.*?)"')
                  .list.eval(pl.element().str.strip_chars('"'))
                  .fill_null([])
            )

        # General strings: strip whitespace, fill nulls, keep symbols
        general_string_cols = [
            'titleType', 'primaryTitle', 'title',
            'region', 'language', 'types', 'category', 'job'
        ]
        for col in general_string_cols:
            if col in kept:
                cleaning_and_imputation_steps.append(
                    pl.col(col)
                      .str.strip_chars()
                      .fill_null('Unknown')
                )

        lf_cleaned = lf_cleaned.with_columns(cleaning_and_imputation_steps)

        # --- 3c. Data Type Optimization ---
        print("   ...Applying data type optimizations (casting).")
        # Keyed by column name so a column can never receive two casts, which
        # would make with_columns fail on a duplicate output name.
        type_casts = {}

        if 'isAdult' in kept:
            # UInt8 cannot hold the -1 sentinel; a strict cast would abort the
            # sink. Use a signed type whenever nulls were actually imputed.
            is_adult_dtype = pl.Int8 if null_counts.get('isAdult', 0) else pl.UInt8
            type_casts['isAdult'] = pl.col('isAdult').cast(is_adult_dtype)
        if 'isOriginalTitle' in kept:
            type_casts['isOriginalTitle'] = pl.col('isOriginalTitle').cast(pl.Boolean)

        for col in numeric_cols:
            if col in kept:
                type_casts[col] = pl.col(col).cast(pl.Int32)

        # Specific V14 logic for 'endYear'
        if 'endYear' in kept:
            # Fill nulls with -1 AND cast to Int32.
            # cast(strict=False) handles weird formats like "2007.0"
            type_casts['endYear'] = pl.col('endYear').fill_null(-1).cast(pl.Int32, strict=False)

        if 'numVotes' in kept:
            type_casts['numVotes'] = pl.col('numVotes').cast(pl.Int64)
        if 'averageRating' in kept:
            type_casts['averageRating'] = pl.col('averageRating').cast(pl.Float32)

        # Convert moderate-cardinality strings to Categorical for memory savings
        current_schema = lf_cleaned.collect_schema()
        for col in cleaned_columns:
            # Columns with an explicit cast above (including 'endYear') keep it.
            if col in type_casts:
                continue
            if col in current_schema and current_schema[col] == pl.String:
                # Use pre-calculated unique counts to make decisions
                if 1 < unique_counts.get(col, 10001) < 10000:
                    print(f"   ...Casting '{col}' to Categorical ({unique_counts[col]} unique).")
                    type_casts[col] = pl.col(col).cast(pl.Categorical)

        lf_cleaned = lf_cleaned.with_columns(list(type_casts.values()))

        # --- Print final schema before saving ---
        print("\nFinal schema that will be saved:")
        print(lf_cleaned.collect_schema())

        # --- Step 4: Execute Pipeline and Sink to Parquet ---
        print(f"\n4. Executing full pipeline and sinking to '{output_filename}' (streaming)...")
        start_sink_time = time.time()

        # This is the second and final full pass over the data.
        # It reads, cleans, and writes all in one streaming operation.
        lf_cleaned.sink_parquet(
            output_filename,
            compression="zstd",
            engine='streaming'
        )
        print(f"   ...Streaming sink complete in {time.time() - start_sink_time:.2f} seconds.")
        print("\n--- Pipeline Finished! ---")
        return True

    except Exception as e:
        # Notebook-level boundary: report the failure and signal it through
        # the return value instead of raising.
        print("\n--- An Error Occurred ---", file=sys.stderr)
        print(f"Error: {e}", file=sys.stderr)
        print("Please check the URL and your network connection.", file=sys.stderr)
        return False

## Data Pipeline Summary (for the code above)

### **Step 1: Lazy Scan (`pl.scan_parquet`)**
- **Instantaneous.**  
- Polars reads only the metadata (schema) of the Parquet file, not the data itself.  
- This allows us to plan operations on massive files (e.g., 400M or 40B rows) **without loading any data into RAM**.

---

### **Step 2: Automated Analysis (`analysis_lf.collect`)**
- **First full pass over the data.**  
- Builds a **single, complex query** to compute:
  - `total_rows`
  - `null_count` for every column  
  - `n_unique` for every column  
- Using `engine='streaming'`, Polars runs this query in **small chunks**, aggregating results efficiently.  
- The output is a small dictionary (`analysis_dict`) used to dynamically construct the cleaning plan.

---

### **Step 3: Build Lazy Cleaning Pipeline**
- **Instantaneous.** Only defines the plan — no data is processed yet.

#### **3a. Column Dropping**
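Removes columns based on:
- **Low Variance:** Columns with at most 1 unique value  
- **High Nulls:** Columns with >95% missing values (`endYear` is explicitly kept)  
- **Domain Knowledge:** A hand-picked list of ID / foreign-key columns (e.g., `tconst`, `nconst`, `titleId`)  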

#### **3b. Imputation & Cleaning**
Defines business logic:
- Filling nulls  
- Splitting strings into lists  
- Extracting regex patterns  

#### **3c. Type Optimization**
- Downcasts numeric types (e.g., `Int64 → Int32`, `UInt8`) to save memory.  
- Converts string columns with medium cardinality (2–10k unique values) to **Categorical**, improving memory efficiency.

---

### **Step 4: Execute and Sink (`lf_cleaned.sink_parquet`)**
- **Second and final full pass over the data.**  
- Executes the entire optimized plan (Steps 1, 3a, 3b, and 3c).  
- With `engine='streaming'`, the **read → transform → write** process runs in chunks, keeping RAM usage minimal from start to finish.


In [None]:
# --- Configuration ---

load_dotenv("config/.env")
# Location of the dataset, read from the environment (populated by the
# load_dotenv call above).
DATASET_URL = os.getenv("IMDB_RAW_MERGED_URL")

# The final, ML-ready file that will be created
OUTPUT_FILE = "imdb_cleaned_full.parquet"

# --- Run the Pipeline ---
start_total_time = time.time()
if DATASET_URL:
    success = analyze_and_clean_imdb(DATASET_URL, OUTPUT_FILE)
else:
    # os.getenv returns None when the variable is missing; fail fast with an
    # actionable message instead of handing None to the pipeline.
    print(
        "IMDB_RAW_MERGED_URL is not set. Define it in the environment or in 'config/.env'.",
        file=sys.stderr,
    )
    success = False

if success:
    print(f"\nSuccessfully created and saved the final, ML-ready DataFrame to '{OUTPUT_FILE}'.")
    print(f"Total pipeline time: {time.time() - start_total_time:.2f} seconds.")
else:
    print("\nPipeline failed. Please see error messages above.")

1. Creating LazyFrame (scanning schema)...
   ...Collecting schema.
   ...Schema loaded. Found 36 columns.

2. Building analysis query (nulls, uniques, redundancy)...
   ...Executing analysis (1 full pass). This is the slowest step.
   ...Analysis complete in 511.34 seconds.


3. Building data cleaning and transformation pipeline...
Low variance (1 unique value) cols: None
High null (>95%) cols: {'attributes'}
User-specified domain-based drops: {'tconst', 'nconst_1', 'knownForTitles', 'writers', 'directors', 'tconst_1', 'nconst', 'parentTconst', 'titleId'}
   ...Dropping 10 columns.
   ...Applying user-specified imputations and cleaning.
   ...Applying data type optimizations (casting).
   ...Casting 'titleType' to Categorical (11 unique).
   ...Casting 'region' to Categorical (250 unique).
   ...Casting 'language' to Categorical (111 unique).
   ...Casting 'types' to Categorical (24 unique).
   ...Casting 'category' to Categorical (14 unique).

Final schema that will be saved:
Schema(

In [None]:
# This script generates a .csv file from the .parquet file for preview purposes.
# Only the top rows are taken, not a random sample.

# --- Configuration ---
n_rows = 5000             # Number of leading rows to export.
CSV_FILE = "preview.csv"  # Output CSV file name.
# OUTPUT_FILE (the source Parquet file) is defined by the pipeline cell.

print(f"Taking first {n_rows} rows from '{OUTPUT_FILE}'...")

try:
    rows_collected = 0  # Rows gathered so far.
    dfs = []            # pandas DataFrame pieces, one per batch.

    # Never read batches larger than what is needed for the preview.
    batch_size = max(1, min(n_rows, 100_000))

    # The context manager closes the file handle even if a batch fails.
    with pq.ParquetFile(OUTPUT_FILE) as pq_file:
        for batch in pq_file.iter_batches(batch_size=batch_size):
            # Trim in Arrow first so only the needed rows are converted.
            remaining = n_rows - rows_collected
            table = pa.Table.from_batches([batch]).slice(0, remaining)

            # Cast dictionary types (Polars' 'Categorical') to strings for
            # Pandas compatibility.
            schema = table.schema
            for idx, (col_name, col_type) in enumerate(zip(schema.names, schema.types)):
                if pa.types.is_dictionary(col_type):
                    table = table.set_column(
                        idx,
                        col_name,
                        table.column(idx).cast(pa.string())
                    )

            dfs.append(table.to_pandas())
            rows_collected += table.num_rows

            # Exit the loop once enough rows were gathered.
            if rows_collected >= n_rows:
                break

    if dfs:
        print("\nConcatenating collected batches...")
        sample_df = pd.concat(dfs, ignore_index=True)

        # `index=False` avoids saving the Pandas index.
        sample_df.to_csv(CSV_FILE, index=False)

        # Report the real row count: the file may hold fewer than n_rows rows.
        print(f"Success! CSV '{CSV_FILE}' created with first {len(sample_df)} rows.")
    else:
        # pd.concat raises on an empty list, so handle an empty file here.
        print(f"'{OUTPUT_FILE}' contains no rows; no CSV was written.")

except Exception as e:
    print(f"An error occurred: {e}")