# MovieLens Data Pipeline with Apache Beam

This notebook demonstrates a complete ETL pipeline using Apache Beam to analyze movie data from the MovieLens dataset.

## Pipeline Overview
- **Input**: MovieLens CSV files (movies.csv, ratings.csv)
- **Processing**: Data cleaning, joining, aggregations
- **Output**: 5 analytical CSV reports

## Outputs Generated
1. Average rating by genre (each movie is counted once, under its primary — i.e. first-listed — genre)
2. Top-N highest rated movies per genre
3. Movie statistics by decade
4. Rating distribution analysis
5. Popularity analysis (Popular/Moderate/Niche)

## Setup and Imports

In [None]:
# Install Apache Beam (run once)
!pip install apache-beam

In [None]:
import csv
import io
from typing import Dict, Iterable, List

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

## Helper Functions

These functions handle:
- Safe type conversions (str → float, str → int)
- CSV formatting for output files

In [None]:
def to_float_safe(x: str) -> float:
    """Convert *x* to a float, falling back to NaN when conversion fails."""
    try:
        value = float(x)
    except Exception:
        # Best-effort parsing: any failure is reported as NaN, never raised
        value = float("nan")
    return value


def to_int_safe(x: str):
    """Convert *x* to an int, falling back to None when conversion fails."""
    try:
        value = int(x)
    except Exception:
        # Best-effort parsing: any failure is reported as None, never raised
        value = None
    return value


def format_csv_line(fields: Iterable) -> str:
    """Render *fields* as one CSV record without the trailing line terminator."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(list(fields))
    record = buffer.getvalue()
    # csv.writer always appends a line terminator; the caller adds its own newline
    return record.rstrip("\r\n")

## DoFn Classes

Apache Beam uses `DoFn` ("do function") classes to define the per-element processing logic that a `ParDo` transform applies to each record.

### 1. ParseCSV - Converts CSV lines to dictionaries

In [None]:
class ParseCSV(beam.DoFn):
    """
    Turns raw CSV text lines into dicts keyed by the header's column names.
    The header row itself and empty lines produce no output.
    """
    def __init__(self, header_line: str):
        self._fieldnames: List[str] = []
        self.header_line = header_line

    def setup(self):
        # Column names come from parsing the header text with the csv module
        header_reader = csv.reader([self.header_line])
        self._fieldnames = next(header_reader)

    def process(self, line: str) -> Iterable[Dict]:
        stripped = line.strip()
        if stripped and stripped != self.header_line:
            cells = next(csv.reader([stripped]))

            # Right-pad short rows with empty strings so every column is present
            shortfall = len(self._fieldnames) - len(cells)
            if shortfall > 0:
                cells = cells + [""] * shortfall

            # zip stops at the shorter input, so surplus cells are dropped
            yield dict(zip(self._fieldnames, cells))

### 2. PreprocessMovies - Cleans and enriches movie data

In [None]:
class PreprocessMovies(beam.DoFn):
    """
    Preprocesses movie data:
    - Extracts year from title (e.g., "Toy Story (1995)" → 1995)
    - Parses genres (e.g., "Action|Adventure|Sci-Fi" → ["Action", "Adventure", "Sci-Fi"])
    - Creates decade field (e.g., 1995 → 1990)
    - Filters out invalid entries (no parseable/in-range year, or no genres)

    The input row is never modified; each surviving movie is emitted as a new
    dict with the extra keys "year", "decade", "primaryGenre" and "allGenres".

    Args:
        min_year: Smallest release year accepted (inclusive).
        max_year: Largest release year accepted (inclusive).
    """
    def __init__(self, min_year: int = 1900, max_year: int = 2025):
        super().__init__()
        self.min_year = min_year
        self.max_year = max_year

    def _extract_year(self, title: str):
        """Return the year found in the last "(...)" group of *title*, or None."""
        if "(" not in title or ")" not in title:
            return None

        # Text between the last "(" and the first ")" that follows it
        year_str = title.split("(")[-1].split(")")[0]
        try:
            year = int(year_str)
        except ValueError:
            # Last parenthesised group is not a number (e.g. an alternate title)
            return None

        if year < self.min_year or year > self.max_year:
            return None
        return year

    def process(self, row: Dict) -> Iterable[Dict]:
        year = self._extract_year(row.get("title", ""))
        if year is None:
            return  # Skip movies without valid year

        # Parse genres (format: "Action|Adventure|Sci-Fi")
        genres = row.get("genres", "")
        if not genres or genres == "(no genres listed)":
            return  # Skip movies without genres
        genre_list = genres.split("|")

        # Beam requires DoFns to leave their input element untouched, so the
        # enrichment is applied to a shallow copy instead of to `row` itself.
        enriched = dict(row)
        enriched["year"] = year
        enriched["decade"] = (year // 10) * 10
        enriched["primaryGenre"] = genre_list[0]
        enriched["allGenres"] = genre_list

        yield enriched

### 3. JoinWithRatings - Joins movies with their ratings

In [None]:
class JoinWithRatings(beam.DoFn):
    """
    Enriches movie data with rating information:
    - Joins movies with their ratings using CoGroupByKey results
    - Calculates average rating from all user ratings
    - Counts number of ratings per movie
    - Filters movies with fewer than `min_ratings` ratings (quality threshold)

    Each element is expected to be ``(movie_id, {"movies": [...], "ratings": [...]})``.
    The grouped movie dict is not modified; the output is a new dict with the
    extra keys "averageRating" (rounded to 2 decimals) and "numRatings".

    Args:
        min_ratings: Minimum number of valid ratings a movie needs to be emitted.
    """
    def __init__(self, min_ratings: int = 10):
        super().__init__()
        self.min_ratings = min_ratings

    def process(self, element) -> Iterable[Dict]:
        _movie_id, data = element

        # Materialise the (small) movie side so emptiness and indexing do not
        # depend on the grouped values being a real list.
        movies_list = list(data.get("movies", []))
        if not movies_list:
            return

        # Single pass over the ratings; unparseable values come back as NaN
        # from to_float_safe and are ignored.
        total_rating = 0.0
        count = 0
        for rating_row in data.get("ratings", []):
            rating = to_float_safe(rating_row.get("rating", ""))
            if rating == rating:  # NaN is the only float not equal to itself
                total_rating += rating
                count += 1

        # count == 0 also covers "no ratings at all" and guards the division
        if count == 0 or count < self.min_ratings:
            return

        # If several movie rows share the ID, the first one is used.
        # Copy it: Beam requires DoFns to leave their input elements untouched.
        movie = dict(movies_list[0])
        movie["averageRating"] = round(total_rating / count, 2)
        movie["numRatings"] = count
        yield movie

## Pipeline Configuration

In [None]:
# Pipeline settings
MOVIES_FILE = "data/movies.csv"
RATINGS_FILE = "data/ratings.csv"
OUTPUT_DIR = "outputs"
TOP_N = 10  # how many movies to keep per genre in the Top-N report


def _read_header_line(path: str) -> str:
    """Return the first line of a UTF-8 text file, stripped of surrounding whitespace."""
    with open(path, "r", encoding="utf-8") as handle:
        return handle.readline().strip()


# The header lines are needed up front: ParseCSV uses them to name the
# columns and to recognise (and skip) the header row in the data.
movies_header = _read_header_line(MOVIES_FILE)
ratings_header = _read_header_line(RATINGS_FILE)

print(f"Movies header: {movies_header}")
print(f"Ratings header: {ratings_header}")

## Main Pipeline

The pipeline consists of:
1. **Data Ingestion**: Read CSV files
2. **Data Preprocessing**: Clean and enrich data
3. **Data Join**: Combine movies with ratings
4. **Analysis**: Generate 5 different analytical outputs

In [None]:
# Create pipeline options
opts = PipelineOptions()


def _write_csv_report(rows, label, file_name, columns):
    """Write pre-formatted CSV lines to a single-shard report in OUTPUT_DIR.

    The column header is handed to WriteToText through its `header` argument
    so it is always the first line of the file. Flattening a one-element
    header PCollection into the rows gives no ordering guarantee, so the
    header could otherwise land anywhere in the output.
    """
    return rows | label >> beam.io.WriteToText(
        file_path_prefix=f"{OUTPUT_DIR}/{file_name}",
        file_name_suffix=".csv",
        num_shards=1,
        header=format_csv_line(columns),
    )


def _count_and_mean_fn():
    """Build a CombineFn that turns the values of a key into (count, mean)."""
    return beam.combiners.TupleCombineFn(
        beam.combiners.CountCombineFn(),
        beam.combiners.MeanCombineFn(),
    ).with_common_input()


# Run the pipeline
with beam.Pipeline(options=opts) as p:

    # ========================================
    # STEP 1: Read and preprocess movies
    # ========================================
    movies_raw = (
        p
        | "ReadMovies" >> beam.io.ReadFromText(MOVIES_FILE)
        | "ParseMovies" >> beam.ParDo(ParseCSV(movies_header))
        | "PreprocessMovies" >> beam.ParDo(PreprocessMovies())
        | "KeyMoviesByID" >> beam.Map(lambda r: (r["movieId"], r))
    )

    # ========================================
    # STEP 2: Read ratings
    # ========================================
    ratings_raw = (
        p
        | "ReadRatings" >> beam.io.ReadFromText(RATINGS_FILE)
        | "ParseRatings" >> beam.ParDo(ParseCSV(ratings_header))
        | "KeyRatingsByID" >> beam.Map(lambda r: (r["movieId"], r))
    )

    # ========================================
    # STEP 3: Join movies with ratings
    # ========================================
    movies = (
        {"movies": movies_raw, "ratings": ratings_raw}
        | "CoGroupByKey" >> beam.CoGroupByKey()
        | "JoinWithRatings" >> beam.ParDo(JoinWithRatings())
    )

    # ========================================
    # OUTPUT 1: Average rating by genre
    # ========================================
    # Mean of the per-movie average ratings, keyed by primary genre.
    # A combiner aggregates incrementally instead of materialising each group.
    genre_rating = (
        movies
        | "KeyByGenre" >> beam.Map(lambda r: (r["primaryGenre"], r["averageRating"]))
        | "AvgRating" >> beam.combiners.Mean.PerKey()
    )

    rating_rows = (
        genre_rating
        | "FormatRatingRows" >> beam.Map(
            lambda kv: format_csv_line((kv[0], f"{kv[1]:.2f}"))
        )
    )

    _write_csv_report(
        rating_rows, "WriteRatingCSV", "genre_avg_rating", ("genre", "avg_rating")
    )

    # ========================================
    # OUTPUT 2: Top-N highest rated movies per genre
    # ========================================
    # Top.PerKey keeps only the TOP_N largest values per genre (ranked by
    # average rating) and emits them in descending order.
    genre_movie_ratings = (
        movies
        | "KeyByGenreMovie" >>
            beam.Map(lambda r: (r["primaryGenre"],
                               (r["title"], r["averageRating"], r["numRatings"])))
        | "TakeTopN" >> beam.combiners.Top.PerKey(TOP_N, key=lambda x: x[1])
        | "ExplodeTopN" >> beam.FlatMap(
            lambda kv: [(kv[0], title, rating, votes) for (title, rating, votes) in kv[1]]
        )
    )

    topn_rows = (
        genre_movie_ratings
        | "FormatTopNRows" >> beam.Map(
            lambda t: format_csv_line((t[0], t[1], f"{t[2]:.2f}", t[3]))
        )
    )

    _write_csv_report(
        topn_rows,
        "WriteTopNCSV",
        f"top{TOP_N}_movies_by_genre",
        ("genre", "title", "rating", "num_ratings"),
    )

    # ========================================
    # OUTPUT 3: Movie count and average rating by decade
    # ========================================
    # Elements after the combine are (decade, (movie_count, avg_rating)).
    decade_stats = (
        movies
        | "KeyByDecade" >> beam.Map(lambda r: (r["decade"], r["averageRating"]))
        | "ComputeDecadeStats" >> beam.CombinePerKey(_count_and_mean_fn())
    )

    decade_rows = (
        decade_stats
        | "FormatDecadeRows" >> beam.Map(
            lambda kv: format_csv_line((f"{kv[0]}s", kv[1][0], f"{kv[1][1]:.2f}"))
        )
    )

    _write_csv_report(
        decade_rows,
        "WriteDecadeCSV",
        "decade_statistics",
        ("decade", "movie_count", "avg_rating"),
    )

    # ========================================
    # OUTPUT 4: Rating distribution buckets
    # ========================================
    # Buckets are the integer part of the average rating, labelled "k-(k+1)".
    # NOTE: an average of exactly 5.0 therefore lands in a "5-6" bucket.
    rating_buckets = (
        movies
        | "CreateRatingBuckets" >> beam.Map(
            lambda r: (int(r["averageRating"]), 1)
        )
        | "SumRatingBuckets" >> beam.CombinePerKey(sum)
    )

    rating_bucket_rows = (
        rating_buckets
        | "FormatRatingBuckets" >> beam.Map(
            lambda kv: format_csv_line((f"{kv[0]}-{kv[0]+1}", kv[1]))
        )
    )

    _write_csv_report(
        rating_bucket_rows,
        "WriteRatingBucketCSV",
        "rating_distribution",
        ("rating_bucket", "movie_count"),
    )

    # ========================================
    # OUTPUT 5: Popular vs Niche movies analysis
    # ========================================
    # Elements after the combine are (category, (movie_count, avg_rating)).
    popularity_category = (
        movies
        | "CategorizePopularity" >> beam.Map(
            lambda r: (
                "Popular" if r["numRatings"] >= 100
                else "Moderate" if r["numRatings"] >= 50
                else "Niche",
                r["averageRating"]
            )
        )
        | "ComputePopularityStats" >> beam.CombinePerKey(_count_and_mean_fn())
    )

    popularity_rows = (
        popularity_category
        | "FormatPopularityRows" >> beam.Map(
            lambda kv: format_csv_line((kv[0], kv[1][0], f"{kv[1][1]:.2f}"))
        )
    )

    _write_csv_report(
        popularity_rows,
        "WritePopularityCSV",
        "popularity_analysis",
        ("popularity_category", "movie_count", "avg_rating"),
    )

print("\n✅ Pipeline completed successfully!")
print(f"\nOutput files created in '{OUTPUT_DIR}/' directory:")
print("  1. genre_avg_rating-00000-of-00001.csv")
print(f"  2. top{TOP_N}_movies_by_genre-00000-of-00001.csv")
print("  3. decade_statistics-00000-of-00001.csv")
print("  4. rating_distribution-00000-of-00001.csv")
print("  5. popularity_analysis-00000-of-00001.csv")

## View Results

Let's preview the generated output files.

In [None]:
import pandas as pd

# Preview every report written by the pipeline
output_files = [
    "genre_avg_rating-00000-of-00001.csv",
    f"top{TOP_N}_movies_by_genre-00000-of-00001.csv",
    "decade_statistics-00000-of-00001.csv",
    "rating_distribution-00000-of-00001.csv",
    "popularity_analysis-00000-of-00001.csv"
]

divider = '=' * 60
for report_name in output_files:
    # Banner with the file name, then the first ten rows and the row count
    print(f"\n{divider}")
    print(f"📊 {report_name}")
    print(divider)
    report = pd.read_csv(f"{OUTPUT_DIR}/{report_name}")
    print(report.head(10))
    print(f"\nTotal rows: {len(report)}")

## Summary Statistics

In [None]:
# Reload the reports needed for the headline numbers
genre_ratings = pd.read_csv(f"{OUTPUT_DIR}/genre_avg_rating-00000-of-00001.csv")
decade_stats = pd.read_csv(f"{OUTPUT_DIR}/decade_statistics-00000-of-00001.csv")
rating_dist = pd.read_csv(f"{OUTPUT_DIR}/rating_distribution-00000-of-00001.csv")
popularity = pd.read_csv(f"{OUTPUT_DIR}/popularity_analysis-00000-of-00001.csv")

# Row holding the maximum of the ranking column identifies each "winner"
best_genre_row = genre_ratings['avg_rating'].idxmax()
top_genre = genre_ratings.loc[best_genre_row, 'genre']
busiest_decade_row = decade_stats['movie_count'].idxmax()
busiest_decade = decade_stats.loc[busiest_decade_row, 'decade']

print("📈 PIPELINE SUMMARY")
print("="*60)
print(f"Total genres analyzed: {len(genre_ratings)}")
print(f"Decades covered: {len(decade_stats)}")
print(f"Rating buckets: {len(rating_dist)}")
print(f"\nTop rated genre: {top_genre}")
print(f"Most productive decade: {busiest_decade}")
print(f"\nPopularity breakdown:")
print(popularity)