In [1]:
# Cell 1
"""
    CS181DV Assignment 4: High-Performance Visualization with Big Data

    Author: AIKO KATO

    Date: 04/06/2025
    
"""

import os
from itertools import islice
from typing import List, Tuple, Dict

import pandas as pd

In [2]:
# Cell 2
# Function to read a specific chunk from a large tsv file into a dataframe
def read_chunk(filename, start_pos: int, size: int):
    """Read data rows ``[start_pos, start_pos + size)`` of a TSV file into a DataFrame.

    Row positions are 0-based and count data rows only; the header line is
    not a row. Every field is kept as the raw string from the file. No type
    conversion or missing-value handling happens here.

    Args:
        filename: Path to a tab-separated file whose first line is the header.
        start_pos: Index of the first data row to read (must be >= 0).
        size: Number of data rows to read. Fewer are returned if the file
            ends first.

    Returns:
        DataFrame with the header's column names and one row per data line.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        # Header and data are read from the same handle, so the file is opened once.
        header = f.readline().rstrip('\r\n').split('\t')
        # islice skips the leading rows without a Python-level counter loop.
        # The file is still scanned from the top, because lines have no fixed width.
        rows = [
            # Strip only the line terminator. A bare strip() would also drop
            # trailing empty fields (giving rows shorter than the header) and
            # remove leading/trailing spaces that belong to the values.
            line.rstrip('\r\n').split('\t')
            for line in islice(f, start_pos, start_pos + size)
        ]
    # Convert list of rows to a dataframe with the correct column names
    return pd.DataFrame(rows, columns=header)

# Function to divide the file into roughly equal-sized chunks
def distribute_workload(total_size: int, num_chunks: int) -> List[Tuple[int, int]]:
    """Split ``total_size`` rows into ``num_chunks`` contiguous ``(start, size)`` ranges.

    Sizes differ by at most one row. The first ``total_size % num_chunks``
    ranges each get one extra row. The ranges are back to back, starting at 0.
    """
    per_chunk, remainder = divmod(total_size, num_chunks)
    boundaries = []
    offset = 0
    for idx in range(num_chunks):
        length = per_chunk + 1 if idx < remainder else per_chunk
        boundaries.append((offset, length))
        offset += length
    return boundaries

In [3]:
# Cell 3
# Utility function to replace IMDB-style missing values ("\N") with proper nulls
def replace_missing(df):
    """Return a copy of ``df`` with IMDb's ``\\N`` null marker replaced by ``pd.NA``.

    Only cells whose whole value is exactly ``\\N`` are replaced. The input
    frame is not modified.
    """
    imdb_null = '\\N'
    return df.replace(to_replace=imdb_null, value=pd.NA)

# Cleaning function for title.basics.tsv
def clean_basics(df):
    """Clean one chunk of ``title.basics.tsv``.

    * IMDb ``\\N`` placeholders become ``pd.NA`` (via ``replace_missing``).
    * Rows whose ``startYear`` is missing or non-numeric are dropped. The
      remaining years are stored as ``int16``.
    * ``isAdult`` is coerced to ``int8``, with missing/non-numeric values set
      to 0. ``wasIsAdultMissing`` is True exactly for the rows that received
      that fill-in 0.
    * Missing/non-numeric ``runtimeMinutes`` are filled with the median
      runtime of this chunk (0 if the chunk has no valid runtime at all), and
      the column is stored as ``int32``.

    Args:
        df: Raw chunk with at least ``startYear``, ``isAdult`` and
            ``runtimeMinutes`` columns.

    Returns:
        A new cleaned DataFrame. The caller's frame is not modified, and the
        original row index is kept for the surviving rows.
    """
    df = replace_missing(df)

    # Convert 'startYear' to numeric and drop rows where it is missing
    df['startYear'] = pd.to_numeric(df['startYear'], errors='coerce')
    # .copy(): the filtered frame must own its data, otherwise the column
    # assignments below trigger SettingWithCopyWarning.
    df = df[df['startYear'].notnull()].copy()
    df['startYear'] = df['startYear'].astype('int16')  # I used ChatGPT for this line.

    # Convert 'isAdult' to int8 with fallback to 0. The "was missing" flag
    # must come from the values BEFORE the fill. Computing it after
    # fillna(0) made it always False.
    is_adult = pd.to_numeric(df['isAdult'], errors='coerce')
    df['isAdult'] = is_adult.fillna(0).astype('int8')
    df['wasIsAdultMissing'] = is_adult.isnull()

    # Handle missing runtimes using this chunk's median. The median is NaN
    # when the chunk has no valid runtime, and an int cast of NaN raises,
    # so fall back to 0 in that case.
    runtime = pd.to_numeric(df['runtimeMinutes'], errors='coerce')
    fill_value = runtime.median()
    if pd.isna(fill_value):
        fill_value = 0
    # int32 rather than int16: int16 tops out at 32767 minutes, and IMDb is
    # believed to list a few longer (experimental) titles that an int16 cast
    # would silently corrupt.
    df['runtimeMinutes'] = runtime.fillna(fill_value).astype('int32')  # I used ChatGPT for this line.

    return df

# Cleaning function for title.ratings.tsv
def clean_ratings(df):
    """Clean one chunk of ``title.ratings.tsv``.

    After ``\\N`` placeholders become ``pd.NA``, ``averageRating`` is coerced
    to ``float32`` and ``numVotes`` to ``int32``. Non-numeric values are
    coerced to NaN first. A NaN in ``numVotes`` makes the int cast raise.
    """
    cleaned = replace_missing(df)
    # Target dtype per column, converted in this order.
    # (The original per-column conversion lines were written with ChatGPT's help.)
    target_dtypes = {'averageRating': 'float32', 'numVotes': 'int32'}
    for column, dtype in target_dtypes.items():
        cleaned[column] = pd.to_numeric(cleaned[column], errors='coerce').astype(dtype)
    return cleaned

# Cleaning function for title.crew.tsv
def clean_crew(df):
    """Clean one chunk of ``title.crew.tsv``.

    After ``\\N`` placeholders become ``pd.NA``, nulls in ``directors`` and
    ``writers`` are replaced with ``""`` and both columns are cast to ``str``.
    """
    result = replace_missing(df)
    for col in ("directors", "writers"):
        result[col] = result[col].fillna("").astype(str)
    return result

# Cleaning function for name.basics.tsv
def clean_names(df):
    """Clean one chunk of ``name.basics.tsv``.

    Only IMDb's ``\\N`` placeholders are converted to ``pd.NA``. Every column
    is otherwise returned as-is so later cells can use any of them.
    """
    return df.replace(to_replace="\\N", value=pd.NA)

In [4]:
# Cell 4
# Core function to process large files in chunks and write them as parquet
def process_large_file(filename: str, output_path: str, clean_func, chunk_size: int = 1_000_000):
    """Clean a large TSV chunk by chunk and save the result as Parquet.

    The data rows are split into contiguous ranges of at most ``chunk_size``
    rows (``distribute_workload``). Each range is read with ``read_chunk`` and
    passed through ``clean_func``. The cleaned chunks are then concatenated
    and written to ``output_path``.

    If any chunk fails, the error is printed and NO file is written (an
    existing file at ``output_path`` is left untouched). Saving only the
    chunks that succeeded would produce a truncated table that looks complete.

    Args:
        filename: Path to the tab-separated input file (first line = header).
        output_path: Destination ``.parquet`` path.
        clean_func: Callable ``DataFrame -> DataFrame`` applied to each chunk.
        chunk_size: Maximum number of data rows per chunk (must be > 0).
    """
    print(f"Processing: {filename}")

    # Count the total number of rows (excluding header). max() keeps an empty
    # file at 0 rows instead of -1.
    with open(filename, 'r', encoding='utf-8') as f:  # I used ChatGPT for this line.
        total_lines = max(sum(1 for _ in f) - 1, 0)

    # Ceiling division gives the fewest chunks that keep each one <= chunk_size.
    # Use at least one chunk so an empty file still yields a header-only table.
    num_chunks = max(1, -(-total_lines // chunk_size))
    chunks = distribute_workload(total_lines, num_chunks)

    dfs = []
    for start, size in chunks:
        print(f"Chunk {start} → {start + size}")
        try:
            # Read and clean each chunk
            chunk_df = read_chunk(filename, start, size)
            dfs.append(clean_func(chunk_df))
        except Exception as e:
            print(f"Error in chunk {start}: {e}")
            print(f"Not saving {output_path}: output would be incomplete")
            return

    # Concatenate all chunks and save as a Parquet file
    final_df = pd.concat(dfs, ignore_index=True)
    final_df.to_parquet(output_path)
    print(f"Saved cleaned file to: {output_path}")

In [5]:
# Cell 5
# Pipeline registry: raw IMDb file name -> cleaning function applied to each chunk.
# Entries are kept in insertion order, so the driver loop visits them top to bottom.
files_to_clean = dict([
    ("title.basics.tsv", clean_basics),
    ("title.ratings.tsv", clean_ratings),
    ("title.crew.tsv", clean_crew),
    ("name.basics.tsv", clean_names),
])

In [6]:
# Cell 6
# Where the raw IMDb TSVs live and where the cleaned Parquet files go
data_path = "share/dataset/imdb/"
output_path = "share/processed_data/"

# Create the destination directory (no-op when it already exists)
os.makedirs(output_path, exist_ok=True)

# Clean each registered file: <data_path>/<name>.tsv -> <output_path>/processed_<name>.parquet
for fname, cleaner in files_to_clean.items():
    input_file = os.path.join(data_path, fname)
    out_file = os.path.join(
        output_path,
        "processed_{}.parquet".format(fname.replace('.tsv', '')),
    )
    process_large_file(input_file, out_file, cleaner)

Processing: share/dataset/imdb/title.basics.tsv
Chunk 0 → 941645
Chunk 941645 → 1883290
Chunk 1883290 → 2824934
Chunk 2824934 → 3766578
Chunk 3766578 → 4708222
Chunk 4708222 → 5649866
Chunk 5649866 → 6591510
Chunk 6591510 → 7533154
Chunk 7533154 → 8474798
Chunk 8474798 → 9416442
Chunk 9416442 → 10358086
Chunk 10358086 → 11299730
Saved cleaned file to: share/processed_data/processed_title.basics.parquet
Processing: share/dataset/imdb/title.ratings.tsv
Chunk 0 → 754818
Chunk 754818 → 1509635
Saved cleaned file to: share/processed_data/processed_title.ratings.parquet
Processing: share/dataset/imdb/title.crew.tsv
Chunk 0 → 966518
Chunk 966518 → 1933036
Chunk 1933036 → 2899554
Chunk 2899554 → 3866072
Chunk 3866072 → 4832590
Chunk 4832590 → 5799108
Chunk 5799108 → 6765626
Chunk 6765626 → 7732143
Chunk 7732143 → 8698660
Chunk 8698660 → 9665177
Chunk 9665177 → 10631694
Saved cleaned file to: share/processed_data/processed_title.crew.parquet
Processing: share/dataset/imdb/name.basics.tsv
Chunk 