In [1]:
# Cell 1: setup and verify MovieLens files
#
# Makes sure data/ml-latest-small/ holds the CSVs later cells read, extracting
# them from ml-latest-small.zip when movies.csv is absent, and stops with an
# error (instead of failing later inside the pipeline) if they are still missing.

import os
import zipfile
import shutil

data_folder = "data/ml-latest-small"
# Files later cells read; the cell raises at the end if any is missing.
required_files = ["movies.csv", "ratings.csv"]
# Places searched, in order, for the MovieLens archive.
zip_candidates = [
    "data/ml-latest-small.zip",
    os.path.join(data_folder, "ml-latest-small.zip"),
]

os.makedirs(data_folder, exist_ok=True)

if "movies.csv" not in os.listdir(data_folder):
    print("movies.csv not found, trying to extract from zip...")

    zip_path = next((z for z in zip_candidates if os.path.exists(z)), None)

    if zip_path is not None:
        print(f"Extracting from: {zip_path}")
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(data_folder)
    else:
        print("No zip file found, please check paths.")

# If the archive contains a top-level ml-latest-small/ directory, extraction
# lands in data_folder/ml-latest-small/; flatten it into data_folder.
nested = os.path.join(data_folder, "ml-latest-small")
if os.path.isdir(nested):
    for name in os.listdir(nested):
        # os.replace overwrites an existing file of the same name, whereas
        # shutil.move(src, directory) raises when the target already exists.
        os.replace(os.path.join(nested, name), os.path.join(data_folder, name))
    shutil.rmtree(nested)

print("Files in ml-latest-small:")
print(os.listdir(data_folder))

missing = [
    name for name in required_files
    if not os.path.exists(os.path.join(data_folder, name))
]
if missing:
    # Fail here with a clear message rather than deep inside the Beam pipeline.
    raise FileNotFoundError(
        f"Missing {missing} in {data_folder}; "
        f"place ml-latest-small.zip at one of {zip_candidates}"
    )


movies.csv not found, trying to extract from zip...
Extracting from: data/ml-latest-small.zip
Files in ml-latest-small:
['links.csv', 'movies.csv', 'ratings.csv', 'README.txt', 'tags.csv']


In [2]:
# Cell 2: install apache-beam, imports, helper functions

!pip install apache-beam

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import os

movies_path = "data/ml-latest-small/movies.csv"
ratings_path = "data/ml-latest-small/ratings.csv"
output_dir = "output"

def parse_csv_line(line):
    """Split one CSV-formatted line into a list of field strings.

    Delegates to csv.reader so quoted fields, which may contain commas, are
    kept whole. An empty line yields an empty list.
    """
    row_iter = csv.reader([line])
    first_row = next(row_iter)
    return first_row

def parse_movie(line):
    """Parse one line of movies.csv into a movie dict.

    Args:
        line: one raw text line whose first three CSV fields are taken as
            movieId, title and the ``|``-separated genres.

    Returns:
        ``{"movieId": str, "title": str, "genres": list[str]}``, or None when
        the line has fewer than three fields. "(no genres listed)" maps to an
        empty genre list, and empty genre names are dropped.
    """
    cols = parse_csv_line(line)
    if len(cols) < 3:
        return None

    # movieId stays a string so it joins with the string movieId from parse_rating.
    movie_id, title, genre_field = cols[0], cols[1], cols[2]

    if genre_field == "(no genres listed)":
        genres = []
    else:
        # Skip empty names so a blank field or a stray "||" does not create a "" genre.
        genres = [genre for genre in genre_field.split("|") if genre]

    return {
        "movieId": movie_id,
        "title": title,
        "genres": genres,
    }

def parse_rating(line):
    """Turn one ratings.csv line into a rating dict.

    Returns None when the line has fewer than four fields or its third field
    is not a number. Otherwise returns a dict with userId, movieId and
    timestamp kept as strings and rating converted to float.
    """
    fields = parse_csv_line(line)
    if len(fields) < 4:
        return None

    user_id, movie_id, raw_rating, timestamp = fields[:4]
    try:
        value = float(raw_rating)
    except ValueError:
        return None

    return {
        "userId": user_id,
        "movieId": movie_id,
        "rating": value,
        "timestamp": timestamp,
    }

def assign_rating_band(avg, high_threshold=4.0, medium_threshold=3.0):
    """Bucket an average rating into "high", "medium", "low" or "unknown".

    Args:
        avg: average rating, or None when no average is available.
        high_threshold: smallest average counted as "high" (inclusive).
        medium_threshold: smallest average counted as "medium" (inclusive).

    Returns:
        "unknown" for None, "high" if avg >= high_threshold, "medium" if
        avg >= medium_threshold, otherwise "low".
    """
    if avg is None:
        return "unknown"
    if avg >= high_threshold:
        return "high"
    if avg >= medium_threshold:
        return "medium"
    return "low"





[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
# Cell 3: Apache Beam pipeline (movie & genre stats + filters + top 10 genres)
#
# Stages:
#   1. Parse movies.csv and ratings.csv and key both by movieId.
#   2. Per movie: mean rating and number of ratings, joined with title/genres.
#   3. Keep movies with at least MIN_RATINGS ratings and tag each with a rating band.
#   4. Per genre: mean of the kept movies' average ratings and how many movies that is.
#   5. Write the movie summary, the genre summary and the top genres by rating.
#
# Averages stay at full precision through every aggregation and are rounded
# only when a record is written, so genre means are not averages of
# already-rounded movie means.

MIN_RATINGS = 50      # movies with fewer ratings are dropped from all outputs
TOP_N_GENRES = 10     # number of genres in the top-genres report
OUTPUT_DECIMALS = 2   # decimals shown for averages in the output files

os.makedirs(output_dir, exist_ok=True)


def _round_avg(value):
    """Round an average for presentation; None passes through unchanged."""
    return None if value is None else round(value, OUTPUT_DECIMALS)


def _format_record(record):
    """Render a summary dict as one output line, with avg_rating rounded."""
    return str({**record, "avg_rating": _round_avg(record["avg_rating"])})


def _format_ranked(records):
    """Join an already-ordered list of summary dicts into one multi-line string.

    The ranking is emitted as a single element because Beam does not preserve
    element order; this keeps the written lines in rank order. An empty list
    produces no element, so the file stays empty rather than holding a blank line.
    """
    if not records:
        return []
    return ["\n".join(_format_record(record) for record in records)]


def _build_movie_stats(kv):
    """(movieId, {"avg": [...], "count": [...]}) -> (movieId, unrounded stats dict)."""
    movie_id, grouped = kv
    return movie_id, {
        "avg_rating": grouped["avg"][0] if grouped["avg"] else None,
        "num_ratings": grouped["count"][0] if grouped["count"] else 0,
    }


def _build_movie_summary(kv):
    """Merge the movie info and rating stats grouped under one movieId."""
    movie_id, grouped = kv
    info = grouped["movie_info"][0] if grouped["movie_info"] else None
    stats = grouped["rating_stats"][0] if grouped["rating_stats"] else None
    return {
        "movieId": movie_id,
        "title": info["title"] if info is not None else None,
        "genres": info["genres"] if info is not None else [],
        "avg_rating": stats["avg_rating"] if stats is not None else None,
        "num_ratings": stats["num_ratings"] if stats is not None else 0,
    }


def _build_genre_summary(kv):
    """(genre, {"avg": [...], "count": [...]}) -> genre summary dict (unrounded avg)."""
    genre, grouped = kv
    return {
        "genre": genre,
        "avg_rating": grouped["avg"][0] if grouped["avg"] else None,
        "num_rated_movies": grouped["count"][0] if grouped["count"] else 0,
    }


def _write_lines(lines, label, file_stem):
    """Write a PCollection of strings to output_dir/<file_stem>.txt."""
    return lines | label >> beam.io.WriteToText(
        file_path_prefix=os.path.join(output_dir, file_stem),
        file_name_suffix=".txt",
        shard_name_template="",  # empty template: one output file, no shard suffix
    )


pipeline_options = PipelineOptions(flags=[], save_main_session=True)

with beam.Pipeline(options=pipeline_options) as p:

    movies = (
        p
        | "Read Movies CSV" >> beam.io.ReadFromText(movies_path, skip_header_lines=1)
        | "Parse Movies" >> beam.Map(parse_movie)
        | "Keep Valid Movies" >> beam.Filter(lambda m: m is not None)
        | "Movies as KV" >> beam.Map(lambda m: (m["movieId"], m))
    )

    ratings = (
        p
        | "Read Ratings CSV" >> beam.io.ReadFromText(ratings_path, skip_header_lines=1)
        | "Parse Ratings" >> beam.Map(parse_rating)
        | "Keep Valid Ratings" >> beam.Filter(lambda r: r is not None)
        | "MovieId -> Rating" >> beam.Map(lambda r: (r["movieId"], r["rating"]))
    )

    movie_avg = ratings | "Mean Rating per Movie" >> beam.CombinePerKey(
        beam.combiners.MeanCombineFn()
    )
    movie_count = ratings | "Count Ratings per Movie" >> beam.combiners.Count.PerKey()

    movie_stats = (
        {"avg": movie_avg, "count": movie_count}
        | "Join Avg & Count" >> beam.CoGroupByKey()
        | "Build Movie Stats Dict" >> beam.Map(_build_movie_stats)
    )

    movie_summary = (
        {"movie_info": movies, "rating_stats": movie_stats}
        | "Join Movies & Stats" >> beam.CoGroupByKey()
        | "Build Movie Summary Dict" >> beam.Map(_build_movie_summary)
        | "Keep Popular Movies" >> beam.Filter(lambda d: d["num_ratings"] >= MIN_RATINGS)
        # Band from the rounded average so it agrees with the value written out.
        | "Assign Rating Band" >> beam.Map(
            lambda d: {**d, "rating_band": assign_rating_band(_round_avg(d["avg_rating"]))}
        )
    )

    # One (genre, unrounded movie average) pair per genre of each kept movie.
    genre_stats = movie_summary | "Expand Genres" >> beam.FlatMap(
        lambda d: [
            (genre, d["avg_rating"])
            for genre in d["genres"]
            if d["avg_rating"] is not None
        ]
    )

    genre_avg = genre_stats | "Mean Rating per Genre" >> beam.CombinePerKey(
        beam.combiners.MeanCombineFn()
    )
    genre_count = genre_stats | "Count Movies per Genre" >> beam.combiners.Count.PerKey()

    genre_summary = (
        {"avg": genre_avg, "count": genre_count}
        | "Join Genre Avg & Count" >> beam.CoGroupByKey()
        | "Build Genre Summary Dict" >> beam.Map(_build_genre_summary)
    )

    # Both sides of the genre join come from genre_stats, so every genre has an
    # average and the key never sees None. Top.Of emits one list, largest first,
    # ranked on the unrounded average.
    top_genres = genre_summary | f"Top {TOP_N_GENRES} Genres" >> beam.combiners.Top.Of(
        TOP_N_GENRES, key=lambda d: d["avg_rating"]
    )

    _write_lines(
        movie_summary | "Movie Summary -> String" >> beam.Map(_format_record),
        "Write Movie Summary",
        "movie_rating_summary",
    )
    _write_lines(
        genre_summary | "Genre Summary -> String" >> beam.Map(_format_record),
        "Write Genre Summary",
        "genre_rating_summary",
    )
    # File name kept fixed because Cell 4 reads it.
    _write_lines(
        top_genres | "Top 10 -> String" >> beam.FlatMap(_format_ranked),
        "Write Top 10 Genres",
        "top10_genres_by_rating",
    )






In [4]:
# Cell 4: inspect output files


def print_head(filename, max_lines):
    """Print the first ``max_lines`` lines of ``output_dir/filename``, stripped.

    The file is opened as UTF-8 explicitly. Without ``encoding``, open() uses
    the platform default (e.g. cp1252 on Windows), which can garble or reject
    non-ASCII movie titles in the pipeline's UTF-8 text output.
    """
    from itertools import islice

    with open(os.path.join(output_dir, filename), encoding="utf-8") as f:
        for line in islice(f, max_lines):
            print(line.strip())


# Sorted because os.listdir order is arbitrary.
print("Output files:", sorted(os.listdir(output_dir)))

print("\nSample movie summary lines:")
print_head("movie_rating_summary.txt", 5)

print("\nSample genre summary lines:")
print_head("genre_rating_summary.txt", 5)

print("\nTop 10 genres by rating:")
print_head("top10_genres_by_rating.txt", 10)


Output files: ['genre_rating_summary.txt', 'movie_rating_summary.txt', 'top10_genres_by_rating.txt']

Sample movie summary lines:
{'movieId': '1', 'title': 'Toy Story (1995)', 'genres': ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy'], 'avg_rating': 3.92, 'num_ratings': 215, 'rating_band': 'medium'}
{'movieId': '2', 'title': 'Jumanji (1995)', 'genres': ['Adventure', 'Children', 'Fantasy'], 'avg_rating': 3.43, 'num_ratings': 110, 'rating_band': 'medium'}
{'movieId': '3', 'title': 'Grumpier Old Men (1995)', 'genres': ['Comedy', 'Romance'], 'avg_rating': 3.26, 'num_ratings': 52, 'rating_band': 'medium'}
{'movieId': '6', 'title': 'Heat (1995)', 'genres': ['Action', 'Crime', 'Thriller'], 'avg_rating': 3.95, 'num_ratings': 102, 'rating_band': 'medium'}
{'movieId': '7', 'title': 'Sabrina (1995)', 'genres': ['Comedy', 'Romance'], 'avg_rating': 3.19, 'num_ratings': 54, 'rating_band': 'medium'}

Sample genre summary lines:
{'genre': 'Adventure', 'avg_rating': 3.62, 'num_rated_movies':