# MovieLens 25M - ML Recommendation System 

## Data Preparation Notebook

This notebook builds on the previous one (01_EDA_MovieLens_25M.ipynb); its objective is to prepare the data for training a movie recommendation model. A key strategic choice is which data sources to feed into the recommender: each file in MovieLens 25M serves a different purpose, and the “best” one depends on the type of model we want to build.

In the previous notebook we've seen that the MovieLens 25M Dataset is composed of several files:

- ratings.csv → the primary dataset for training collaborative filtering models (user-based or item-based): it contains explicit user feedback (0.5–5 stars)

- movies.csv → a supporting file for hybrid models containing metadata (title, genres); it will be useful for content-based features (genre embeddings, one-hot vectors) and should help with cold-start items (new movies with few ratings)

- genome-tags.csv + genome-scores.csv → semantic features: they provide a dense tag–movie relevance matrix (0–1 scores) and are well suited to hybrid recommenders, where we combine collaborative signals with semantic descriptors

- tags.csv → raw user-contributed tags; they are rather noisy (potentially valuable for analyzing user language and preferences, but less useful and less reliable for ML than the genome tags)

- links.csv → external IDs (IMDb, TMDb): not needed for core training, but useful if we want to enrich the data with external metadata (cast, crew, posters).

## Data strategy for ML Recommendation System

For further work on the ML model, we will focus on three of the files described above:

- ratings.csv - the 'backbone' of the whole dataset, containing explicit feedback from users

- movies.csv - the metadata containing the genre and title features for content-based signals

- genome-scores.csv - the semantic features with dense tag relevance scores for hybrid modeling

All three could in principle be linked through the 'movieId' join key, but their size and structure make a single fully merged table impractical because of memory limitations. Instead, we produce two merged tables: ratings_movies (ratings joined with movie metadata) and an item-level feature table (movies joined with genome scores). We consider this the right strategy for building our recommendation model because it allows us to work on:

- Collaborative filtering → as a starting point for training a baseline model

- Hybrid (collaborative + content-based) model → for model enrichment and optimization
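To give a sense of the memory pressure mentioned above, here is a minimal sketch of one common mitigation: loading ratings.csv with narrower dtypes. The dtype map `RATINGS_DTYPES` and the helper `downcast_ratings` are our own illustrative assumptions (user and movie IDs fit in int32; half-star ratings are exact in float32); the notebook itself reads the files with pandas defaults.

```python
import numpy as np
import pandas as pd

# Illustrative compact dtypes (assumption: IDs fit in int32 and half-star
# ratings such as 3.5 are represented exactly in float32)
RATINGS_DTYPES = {"userId": "int32", "movieId": "int32", "rating": "float32", "timestamp": "int64"}


def downcast_ratings(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of a ratings frame converted to the compact dtypes above."""
    return df.astype(RATINGS_DTYPES)


# Synthetic stand-in for ratings.csv (pandas defaults would give int64/float64)
toy_ratings = pd.DataFrame({
    "userId": np.arange(1_000, dtype="int64"),
    "movieId": np.arange(1_000, dtype="int64"),
    "rating": np.full(1_000, 3.5),
    "timestamp": np.full(1_000, 1147880044, dtype="int64"),
})
compact = downcast_ratings(toy_ratings)

before = toy_ratings.memory_usage(deep=True).sum()
after = compact.memory_usage(deep=True).sum()
print(f"{before} bytes -> {after} bytes")
```

On the real file the same map could be passed directly, e.g. `pd.read_csv("../raw/ratings.csv", dtype=RATINGS_DTYPES)`, so the narrower dtypes are applied at parse time.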

In the last part of this notebook we split the entire dataframe (ratings_movies) with the following proportions: 70/20/10. The split is temporal (based on the timestamp), ordered from the oldest to the most recent ratings, and serves the following objectives respectively:

- ML model training (70% of the data)
- ML model testing (20% of the data)
- Continuous buffer (10% of the data): for continuous training after deployment


It is important to mention at this stage that the 'continuous buffer' will simulate the 'new' data ingestion so that the model can be properly monitored and retrained if needed.
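One way the buffer could be replayed as 'new' data is sketched below, assuming ingestion happens in calendar-month batches. `iter_buffer_batches` is a hypothetical helper; the notebook only saves the buffer and does not define a replay schedule.

```python
import pandas as pd


def iter_buffer_batches(buffer: pd.DataFrame, freq: str = "MS"):
    """Yield (period_start, batch) pairs in chronological order.

    Hypothetical helper: groups the buffer by calendar period (month start by
    default) to mimic periodic ingestion of 'new' ratings.
    """
    grouped = buffer.groupby(pd.Grouper(key="datetime", freq=freq))
    for period_start, batch in grouped:
        if not batch.empty:  # skip periods that contain no ratings
            yield period_start, batch


# Toy buffer spanning three months
toy_buffer = pd.DataFrame({
    "userId": [1, 2, 3, 4],
    "movieId": [10, 11, 12, 13],
    "rating": [4.0, 3.5, 5.0, 2.0],
    "datetime": pd.to_datetime(["2018-01-04", "2018-01-20", "2018-02-03", "2018-03-15"]),
})
for start, batch in iter_buffer_batches(toy_buffer):
    print(start.date(), len(batch))
```

Each yielded batch could then be scored by the deployed model before being appended to the training data, which is where monitoring and retraining decisions would be made.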



In [1]:
# Importing libraries for data manipulation, visualization, and analysis

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from pathlib import Path

In [3]:
# Loading files
ratings = pd.read_csv("../raw/ratings.csv")
movies = pd.read_csv("../raw/movies.csv")
genome_scores = pd.read_csv("../raw/genome-scores.csv")
genome_tags = pd.read_csv("../raw/genome-tags.csv")  # Optional for tag names (in case needed)


In [4]:
# Keep an unexploded copy of movies (one row per movie) for later merges
movies_base = movies.copy()

In [5]:
# Displaying the first few rows of each dataframe to recall their structure
display(ratings.head())
display(movies.head())
display(genome_scores.head())
display(genome_tags.head())  # Optional display


,userId,movieId,rating,timestamp
0,1,296,5.0,1147880044
1,1,306,3.5,1147868817
2,1,307,5.0,1147868828
3,1,665,5.0,1147878820
4,1,899,3.5,1147868510


,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


,movieId,tagId,relevance
0,1,1,0.02875
1,1,2,0.02375
2,1,3,0.0625
3,1,4,0.07575
4,1,5,0.14075


,tagId,tag
0,1,007
1,2,007 (series)
2,3,18th century
3,4,1920s
4,5,1930s


In [6]:
# Focusing on ratings first
ratings['datetime'] = pd.to_datetime(ratings['timestamp'], unit='s') # Convert timestamp to datetime
ratings['year'] = ratings['datetime'].dt.year # Extract year from datetime
ratings['liked'] = (ratings['rating'] >= 3.5).astype(int) # Create binary liked column

In [7]:
# Displaying the first few rows of ratings to check new columns
display(ratings.head())

,userId,movieId,rating,timestamp,datetime,year,liked
0,1,296,5.0,1147880044,2006-05-17 15:34:04,2006,1
1,1,306,3.5,1147868817,2006-05-17 12:26:57,2006,1
2,1,307,5.0,1147868828,2006-05-17 12:27:08,2006,1
3,1,665,5.0,1147878820,2006-05-17 15:13:40,2006,1
4,1,899,3.5,1147868510,2006-05-17 12:21:50,2006,1


In [8]:
# Focusing on movies next - exploding genres so each row contains only one genre
# Split genres into list
movies['genres'] = movies['genres'].apply(lambda g: g.split('|') if isinstance(g, str) else [])
# Explode so each movie-genre combination is a separate row
movies = movies.explode('genres').reset_index(drop=True)

In [9]:
# Checking the movies dataframe after genre expansion
display(movies.head())

,movieId,title,genres
0,1,Toy Story (1995),Adventure
1,1,Toy Story (1995),Animation
2,1,Toy Story (1995),Children
3,1,Toy Story (1995),Comedy
4,1,Toy Story (1995),Fantasy


In [10]:
# Checking the shape of the movies dataframe (after exploding)
movies.shape

(112307, 3)

In [11]:
# Reshape genome scores from long to wide format (one row per movie, one column per tag)
genome_wide = genome_scores.pivot(index="movieId", columns="tagId", values="relevance").astype('float32')

This pivot reshapes the genome scores from “long” format (many rows per movie) into “wide” format (one row per movie, one column per tag), a standard feature-engineering step that lets us treat tags as numeric features. Each row of the resulting matrix is a movie’s “semantic fingerprint” across all tags; since there are 1,128 tag columns, we reduce the dimensionality later in the notebook.
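A toy example of the same reshape, with made-up relevance values for two movies and three tags. It uses the same `pivot` call as the cell above; note that `pivot` raises a `ValueError` if a (movieId, tagId) pair appears twice, which doubles as a cheap sanity check on the raw file.

```python
import pandas as pd

# Toy long-format genome scores: 2 movies × 3 tags (values are made up)
long_scores = pd.DataFrame({
    "movieId":   [1, 1, 1, 2, 2, 2],
    "tagId":     [1, 2, 3, 1, 2, 3],
    "relevance": [0.03, 0.02, 0.06, 0.04, 0.04, 0.06],
})

# Same call as in the notebook: one row per movie, one column per tag
wide = long_scores.pivot(index="movieId", columns="tagId", values="relevance").astype("float32")
print(wide)
print(wide.shape)  # (2, 3): rows = movies, columns = tags
```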

In [12]:
# Checking the genome_wide dataframe
display(genome_wide.head())

tagId,1,2,3,4,5,6,7,8,9,10,...,1119,1120,1121,1122,1123,1124,1125,1126,1127,1128
movieId
1,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,0.20375,0.202,0.03075,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
2,0.04125,0.0405,0.06275,0.08275,0.091,0.06125,0.06925,0.096,0.0765,0.0525,...,0.0525,0.01575,0.0125,0.02,0.12225,0.03275,0.021,0.011,0.10525,0.01975
3,0.04675,0.0555,0.02925,0.087,0.0475,0.04775,0.046,0.14275,0.0285,0.03875,...,0.06275,0.0195,0.02225,0.023,0.122,0.03475,0.017,0.018,0.091,0.01775
4,0.03425,0.038,0.0405,0.031,0.065,0.03575,0.029,0.0865,0.032,0.0315,...,0.05325,0.028,0.01675,0.03875,0.182,0.0705,0.01625,0.01425,0.0885,0.015
5,0.043,0.05325,0.038,0.041,0.054,0.06725,0.02775,0.0765,0.0215,0.02975,...,0.0535,0.0205,0.01425,0.0255,0.19225,0.02675,0.01625,0.013,0.087,0.016


In [13]:
# Checking the pivoted dataframe shape
genome_wide.shape

(13816, 1128)

In [14]:
# Handle missing genome scores (movies without genome tags)
# Option: fill NaN with 0 (assumes zero tag relevance where a movie has no score for a tag)
genome_wide = genome_wide.fillna(0)

print(f"Genome wide shape after filling NaNs: {genome_wide.shape}")
print(f"Number of unique movies in genome_wide: {len(genome_wide)}")


Genome wide shape after filling NaNs: (13816, 1128)
Number of unique movies in genome_wide: 13816


In [15]:
# Prepare genome_wide for merge with movies - reset index so movieId is a column
genome_wide_reset = genome_wide.reset_index()
display(genome_wide_reset.head())


tagId,movieId,1,2,3,4,5,6,7,8,9,...,1119,1120,1121,1122,1123,1124,1125,1126,1127,1128
0,1,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,0.20375,0.202,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
1,2,0.04125,0.0405,0.06275,0.08275,0.091,0.06125,0.06925,0.096,0.0765,...,0.0525,0.01575,0.0125,0.02,0.12225,0.03275,0.021,0.011,0.10525,0.01975
2,3,0.04675,0.0555,0.02925,0.087,0.0475,0.04775,0.046,0.14275,0.0285,...,0.06275,0.0195,0.02225,0.023,0.122,0.03475,0.017,0.018,0.091,0.01775
3,4,0.03425,0.038,0.0405,0.031,0.065,0.03575,0.029,0.0865,0.032,...,0.05325,0.028,0.01675,0.03875,0.182,0.0705,0.01625,0.01425,0.0885,0.015
4,5,0.043,0.05325,0.038,0.041,0.054,0.06725,0.02775,0.0765,0.0215,...,0.0535,0.0205,0.01425,0.0255,0.19225,0.02675,0.01625,0.013,0.087,0.016


## Merge strategy
- Join `ratings` with `movies` (not exploded).
- Keep genome tag features at the item level: `movie_features = movies + genome_wide`.
- Avoid merging `genome_wide` onto every rating row: 25M rows × 1,128 tag columns × 4 bytes ≈ 113 GB in float32.
- At training time, join `movie_features` on `movieId` in batches or load it as a separate lookup table.
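The memory figure in the bullets is simple arithmetic (rows × columns × 4 bytes per float32 value). The sketch below reproduces it and contrasts it with storing the same tags once per movie; `dense_size_gb` is an illustrative helper and ignores pandas index and object-column overhead.

```python
# Back-of-the-envelope memory estimate for dense float32 feature matrices
BYTES_PER_FLOAT32 = 4


def dense_size_gb(n_rows: int, n_cols: int, bytes_per_value: int = BYTES_PER_FLOAT32) -> float:
    """Size in GB (10**9 bytes) of a dense n_rows × n_cols numeric matrix."""
    return n_rows * n_cols * bytes_per_value / 1e9


per_rating = dense_size_gb(25_000_095, 1_128)  # genome tags copied onto every rating row
per_movie = dense_size_gb(62_423, 1_128)       # genome tags stored once per movie
print(f"per rating row: {per_rating:.1f} GB, per movie: {per_movie:.3f} GB")
# prints: per rating row: 112.8 GB, per movie: 0.282 GB
```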

In [16]:
# Join ratings with the base movies file (non-exploded) to keep ~25M rows
ratings_movies = ratings.merge(movies_base, on="movieId", how="left")
print(f"ratings_movies shape: {ratings_movies.shape}")

ratings_movies shape: (25000095, 9)


In [17]:
display(ratings_movies.head())

,userId,movieId,rating,timestamp,datetime,year,liked,title,genres
0,1,296,5.0,1147880044,2006-05-17 15:34:04,2006,1,Pulp Fiction (1994),Comedy|Crime|Drama|Thriller
1,1,306,3.5,1147868817,2006-05-17 12:26:57,2006,1,Three Colors: Red (Trois couleurs: Rouge) (1994),Drama
2,1,307,5.0,1147868828,2006-05-17 12:27:08,2006,1,Three Colors: Blue (Trois couleurs: Bleu) (1993),Drama
3,1,665,5.0,1147878820,2006-05-17 15:13:40,2006,1,Underground (1995),Comedy|Drama|War
4,1,899,3.5,1147868510,2006-05-17 12:21:50,2006,1,Singin' in the Rain (1952),Comedy|Musical|Romance


In [18]:
# Build item-level features (exploded movies + genome_wide) separately from ratings to avoid blowing up memory
movie_features = movies.merge(genome_wide_reset, on="movieId", how="left")

In [20]:
display(movie_features.head())

,movieId,title,genres,1,2,3,4,5,6,7,...,1119,1120,1121,1122,1123,1124,1125,1126,1127,1128
0,1,Toy Story (1995),Adventure,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
1,1,Toy Story (1995),Animation,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
2,1,Toy Story (1995),Children,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
3,1,Toy Story (1995),Comedy,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
4,1,Toy Story (1995),Fantasy,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022


In [21]:
# Fill missing genome scores for movies without tags, then inspect shapes
if not genome_wide_reset.empty:
    genome_tag_cols = genome_wide_reset.columns.drop('movieId')
    movie_features[genome_tag_cols] = movie_features[genome_tag_cols].fillna(0)

print(f"ratings_movies shape: {ratings_movies.shape}")
print(f"movie_features shape: {movie_features.shape}")

display(movie_features.head())

ratings_movies shape: (25000095, 9)
movie_features shape: (112307, 1131)


,movieId,title,genres,1,2,3,4,5,6,7,...,1119,1120,1121,1122,1123,1124,1125,1126,1127,1128
0,1,Toy Story (1995),Adventure,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
1,1,Toy Story (1995),Animation,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
2,1,Toy Story (1995),Children,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
3,1,Toy Story (1995),Comedy,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
4,1,Toy Story (1995),Fantasy,0.02875,0.02375,0.0625,0.07575,0.14075,0.14675,0.0635,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022


### Single-row item features (preferred for hybrid)
We'll create `movie_features_uni` with one row per `movieId` by using genre one-hot (multi-hot) from the base movies and merging genome tags. This is smaller and cleaner for training-time lookups.
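A small sketch of how `str.get_dummies(sep='|')` turns the pipe-separated genre string into multi-hot columns, using the first three titles of movies.csv as toy input.

```python
import pandas as pd

toy_movies = pd.DataFrame({
    "movieId": [1, 2, 3],
    "title": ["Toy Story (1995)", "Jumanji (1995)", "Grumpier Old Men (1995)"],
    "genres": [
        "Adventure|Animation|Children|Comedy|Fantasy",
        "Adventure|Children|Fantasy",
        "Comedy|Romance",
    ],
})

# One column per distinct genre; a movie can have several 1s (multi-hot)
genre_dummies = toy_movies["genres"].str.get_dummies(sep="|").astype("int8")
features = toy_movies[["movieId", "title"]].join(genre_dummies)
print(features)
```

Unlike the exploded table, this keeps one row per movie, so the row count stays equal to the number of movies.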

In [22]:
# Build per-movie item features: genres one-hot (multi-hot) + genome tags
# 1) Genres as one-hot from base movies (no explosion)
genre_dummies = movies_base['genres'].str.get_dummies(sep='|').astype('int8')
movie_features_uni = movies_base[['movieId', 'title']].join(genre_dummies)

# 2) Merge genome tag features (wide) and fill NaNs
movie_features_uni = movie_features_uni.merge(genome_wide_reset, on='movieId', how='left')
if not genome_wide_reset.empty:
    genome_cols = genome_wide_reset.columns.drop('movieId')
    # reduce memory: keep genome features as float32
    movie_features_uni[genome_cols] = movie_features_uni[genome_cols].fillna(0).astype('float32')

print(f"movie_features (exploded) shape: {movie_features.shape}")
print(f"movie_features_uni (1 row/movie) shape: {movie_features_uni.shape}")

display(movie_features_uni.head())

movie_features (exploded) shape: (112307, 1131)
movie_features_uni (1 row/movie) shape: (62423, 1150)


,movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,...,1119,1120,1121,1122,1123,1124,1125,1126,1127,1128
0,1,Toy Story (1995),0,0,1,1,1,1,0,0,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
1,2,Jumanji (1995),0,0,1,0,1,0,0,0,...,0.0525,0.01575,0.0125,0.02,0.12225,0.03275,0.021,0.011,0.10525,0.01975
2,3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,...,0.06275,0.0195,0.02225,0.023,0.122,0.03475,0.017,0.018,0.091,0.01775
3,4,Waiting to Exhale (1995),0,0,0,0,0,1,0,0,...,0.05325,0.028,0.01675,0.03875,0.182,0.0705,0.01625,0.01425,0.0885,0.015
4,5,Father of the Bride Part II (1995),0,0,0,0,0,1,0,0,...,0.0535,0.0205,0.01425,0.0255,0.19225,0.02675,0.01625,0.013,0.087,0.016


### Dimensionality reduction (PCA) for genome tags
With the above merge strategy we obtained a large dataframe, 'movie_features_uni', of shape (62423, 1150). To make it more practical for ML training, we reduce the number of columns with PCA, projecting the 1,128 genome-tag features into a compact 64-dimensional embedding. Since all genome scores lie on the same 0–1 scale, we only center the features (no variance scaling); scikit-learn's PCA also centers its input internally, so the explicit centering step is redundant but harmless. This yields much smaller per-movie vectors while retaining most of the variance.
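One caveat about the PCA cell below: only 13,816 of the 62,423 movies have genome scores, so the remaining ~48.6k rows of `movie_features_uni` are all zeros after `fillna(0)`. Fitting PCA on those rows lets the identical zero vectors contribute to the variance, so part of the explained variance (and possibly the leading components) may reflect "has genome data or not" rather than semantic differences. A possible alternative, sketched here under our own assumptions, fits PCA only on movies with genome data and gives the others a zero embedding plus a `has_genome` flag; `genome_embeddings` and the flag column are hypothetical, not part of the notebook.

```python
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA


def genome_embeddings(features, genome_cols, has_genome, n_components=64, seed=42):
    """Fit PCA on movies with genome scores only; zero embeddings for the rest.

    Hypothetical helper. `has_genome` is a boolean Series aligned with `features`
    (True when the movie appears in genome-scores.csv).
    """
    mask = has_genome.to_numpy()
    X = features.loc[mask, genome_cols].to_numpy(dtype=np.float32)
    # PCA centers X internally; no variance scaling since all scores share a 0-1 scale
    pca = PCA(n_components=n_components, svd_solver="randomized", random_state=seed)
    Z = np.zeros((len(features), n_components), dtype=np.float32)
    Z[mask] = pca.fit_transform(X)
    emb = pd.DataFrame(Z, columns=[f"g_emb_{i}" for i in range(n_components)])
    emb.insert(0, "movieId", features["movieId"].to_numpy())
    emb["has_genome"] = mask.astype("int8")  # lets a model tell "no data" from a real embedding
    return emb, pca


# Toy data: 40 movies, 8 tags, only the first 30 have genome scores
rng = np.random.default_rng(0)
tag_cols = [f"tag_{i}" for i in range(8)]
toy_features = pd.DataFrame(rng.random((40, 8)), columns=tag_cols)
toy_features.insert(0, "movieId", np.arange(1, 41))
toy_has_genome = pd.Series([True] * 30 + [False] * 10)
toy_features.loc[~toy_has_genome, tag_cols] = 0.0  # mimic fillna(0) for movies without genome data

emb, pca = genome_embeddings(toy_features, tag_cols, toy_has_genome, n_components=3)
print(emb.shape, round(float(pca.explained_variance_ratio_.sum()), 3))
```

In the notebook's terms the mask could be built as `movie_features_uni["movieId"].isin(genome_wide.index)`.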

In [23]:
# PCA embeddings for genome tags
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# Choose genome columns directly from the wide table reference
genome_cols = list(genome_wide_reset.columns.drop('movieId')) if 'genome_wide_reset' in globals() else []

EMB_DIM = 64

if genome_cols:
    # Center features (no variance scaling), keep float32 to save memory
    scaler = StandardScaler(with_mean=True, with_std=False)
    X_centered = scaler.fit_transform(movie_features_uni[genome_cols].astype('float32'))

    pca = PCA(n_components=EMB_DIM, svd_solver='randomized', random_state=42)
    Z = pca.fit_transform(X_centered).astype('float32')

    movie_embeddings = pd.DataFrame(Z, columns=[f'g_emb_{i}' for i in range(Z.shape[1])])
    movie_embeddings.insert(0, 'movieId', movie_features_uni['movieId'].values)

    var_expl = float(pca.explained_variance_ratio_.sum())
    print(f"PCA embeddings shape: {movie_embeddings.shape}; explained variance: {var_expl:.3f}")

    # Build a compact item-features table: keep genres + embeddings (drop raw genome tag columns)
    genre_cols = [c for c in movie_features_uni.columns if c not in ['movieId','title'] and c not in set(genome_cols)]
    movie_features_small = movie_features_uni[['movieId','title'] + genre_cols].merge(movie_embeddings, on='movieId', how='left')

    print(f"movie_features_small shape: {movie_features_small.shape}")
    display(movie_features_small.head())
else:
    print("No genome columns found for PCA.")

PCA embeddings shape: (62423, 65); explained variance: 0.869
movie_features_small shape: (62423, 86)


,movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,...,g_emb_54,g_emb_55,g_emb_56,g_emb_57,g_emb_58,g_emb_59,g_emb_60,g_emb_61,g_emb_62,g_emb_63
0,1,Toy Story (1995),0,0,1,1,1,1,0,0,...,-0.051027,0.099649,-0.244833,0.003951,-0.136712,0.325241,0.07195,0.151897,-0.005155,-0.211419
1,2,Jumanji (1995),0,0,1,0,1,0,0,0,...,0.080299,0.07776,-0.001979,-0.169908,-0.32837,-0.155115,-0.324047,0.22915,0.065371,0.075309
2,3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,...,-0.003522,-0.255299,-0.047662,0.043895,0.025116,-0.006282,0.120842,0.127993,-0.087254,-0.039121
3,4,Waiting to Exhale (1995),0,0,0,0,0,1,0,0,...,-0.316506,0.234975,0.107907,0.136268,0.067956,0.319556,0.008732,0.139272,-0.150812,0.039239
4,5,Father of the Bride Part II (1995),0,0,0,0,0,1,0,0,...,0.26296,-0.265437,0.022199,0.15288,0.16735,0.148737,0.113985,0.046422,-0.158531,0.106722


In [24]:
movie_features_small.head()

,movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,...,g_emb_54,g_emb_55,g_emb_56,g_emb_57,g_emb_58,g_emb_59,g_emb_60,g_emb_61,g_emb_62,g_emb_63
0,1,Toy Story (1995),0,0,1,1,1,1,0,0,...,-0.051027,0.099649,-0.244833,0.003951,-0.136712,0.325241,0.07195,0.151897,-0.005155,-0.211419
1,2,Jumanji (1995),0,0,1,0,1,0,0,0,...,0.080299,0.07776,-0.001979,-0.169908,-0.32837,-0.155115,-0.324047,0.22915,0.065371,0.075309
2,3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,...,-0.003522,-0.255299,-0.047662,0.043895,0.025116,-0.006282,0.120842,0.127993,-0.087254,-0.039121
3,4,Waiting to Exhale (1995),0,0,0,0,0,1,0,0,...,-0.316506,0.234975,0.107907,0.136268,0.067956,0.319556,0.008732,0.139272,-0.150812,0.039239
4,5,Father of the Bride Part II (1995),0,0,0,0,0,1,0,0,...,0.26296,-0.265437,0.022199,0.15288,0.16735,0.148737,0.113985,0.046422,-0.158531,0.106722


## Interpretation

After dimensionality reduction with PCA, the matrix has far fewer columns: 86 instead of 1150, while the number of rows stays the same at 62423 (the number of movies in the catalogue: each row corresponds to one movie, represented as a vector of genre columns and the principal components obtained in the PCA step).

With this done, we can prepare the temporal split of the data into training, testing and continuous-buffer sets for the recommendation model. To avoid random splitting and the resulting data leakage (training on ratings made after the ones we test on), we use the timestamp as the splitting criterion.
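A global temporal cut-off prevents training on the future, but it also means that some users and movies in the test and buffer sets never appear in training (cold start). A minimal sketch for quantifying this after the split; `cold_start_report` is a hypothetical helper:

```python
import pandas as pd


def cold_start_report(train: pd.DataFrame, other: pd.DataFrame) -> dict:
    """Share of rows in `other` whose user or movie never appears in `train` (hypothetical helper)."""
    new_user = ~other["userId"].isin(train["userId"].unique())
    new_movie = ~other["movieId"].isin(train["movieId"].unique())
    return {
        "rows": len(other),
        "new_user_share": float(new_user.mean()),
        "new_movie_share": float(new_movie.mean()),
        "both_new_share": float((new_user & new_movie).mean()),
    }


toy_train = pd.DataFrame({"userId": [1, 1, 2], "movieId": [10, 11, 10]})
toy_test = pd.DataFrame({"userId": [1, 3, 2, 4], "movieId": [10, 11, 12, 13]})
print(cold_start_report(toy_train, toy_test))
```

Once the split below has been run, `cold_start_report(ratings_train, ratings_test)` would show how much of the test set a pure collaborative-filtering model cannot score without a fallback.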

### Save artifacts and create 70/20/10 temporal split
We persist compact artifacts (Parquet) and split `ratings_movies` by `datetime` chronologically: oldest 70% → train, next 20% → test, most recent 10% → buffer.

In [25]:
# Save artifacts and perform temporal split
import os

out_dir = "../prepared"
os.makedirs(out_dir, exist_ok=True)

In [26]:
# Save main artifacts (use Parquet if available; fallback to CSV)
def _save(df, path_parquet, path_csv):
    try:
        df.to_parquet(path_parquet, index=False)
        print(f"Saved: {path_parquet}")
    except Exception as e:
        print(f"Parquet save failed ({e}); saving CSV instead.")
        df.to_csv(path_csv, index=False)
        print(f"Saved: {path_csv}")

_save(ratings_movies, os.path.join(out_dir, "ratings_movies.parquet"), os.path.join(out_dir, "ratings_movies.csv"))
_save(movie_features_uni, os.path.join(out_dir, "movie_features_uni.parquet"), os.path.join(out_dir, "movie_features_uni.csv"))
if 'movie_embeddings' in globals():
    _save(movie_embeddings, os.path.join(out_dir, "movie_embeddings.parquet"), os.path.join(out_dir, "movie_embeddings.csv"))
if 'movie_features_small' in globals():
    _save(movie_features_small, os.path.join(out_dir, "movie_features_small.parquet"), os.path.join(out_dir, "movie_features_small.csv"))


Saved: ../prepared\ratings_movies.parquet
Saved: ../prepared\movie_features_uni.parquet
Saved: ../prepared\movie_embeddings.parquet
Saved: ../prepared\movie_features_small.parquet


In [27]:
# Temporal split 70/20/10 by datetime (global chronological order)
ratings_sorted = ratings_movies.sort_values('datetime')
N = len(ratings_sorted)
train_end = int(N * 0.70)
test_end = int(N * 0.90)

ratings_train = ratings_sorted.iloc[:train_end]
ratings_test = ratings_sorted.iloc[train_end:test_end]
ratings_buffer = ratings_sorted.iloc[test_end:]

print("Split sizes:", len(ratings_train), len(ratings_test), len(ratings_buffer))
print("Date ranges:")
print("train:", ratings_train['datetime'].min(), "→", ratings_train['datetime'].max())
print("test:", ratings_test['datetime'].min(), "→", ratings_test['datetime'].max())
print("buffer:", ratings_buffer['datetime'].min(), "→", ratings_buffer['datetime'].max())

Split sizes: 17500066 5000019 2500010
Date ranges:
train: 1995-01-09 11:46:49 → 2015-01-22 16:42:53
test: 2015-01-22 16:43:05 → 2018-01-04 02:44:50
buffer: 2018-01-04 02:45:08 → 2019-11-21 09:15:03


In [28]:
# Save splits
_save(ratings_train, os.path.join(out_dir, "ratings_train.parquet"), os.path.join(out_dir, "ratings_train.csv"))
_save(ratings_test, os.path.join(out_dir, "ratings_test.parquet"), os.path.join(out_dir, "ratings_test.csv"))
_save(ratings_buffer, os.path.join(out_dir, "ratings_buffer.parquet"), os.path.join(out_dir, "ratings_buffer.csv"))

Saved: ../prepared\ratings_train.parquet
Saved: ../prepared\ratings_test.parquet
Saved: ../prepared\ratings_buffer.parquet


## Quick Preview of Saved Artifacts
The cells below preview only a few rows from each Parquet/CSV file to avoid loading the full datasets into memory.

In [29]:
# Preview saved files: read small heads safely (Parquet/CSV)
# Parquet helpers
try:
    import pyarrow.parquet as pq
except Exception:
    pq = None

In [30]:
# Row-limited Parquet/CSV heads: Parquet reading stops after the first row group(s) needed for n rows; CSV reading stops after n rows

def parquet_head(path, n=5, columns=None):
    if pq is None:
        print(f"pyarrow.parquet not available; skipping Parquet head for {path}")
        return None
    try:
        pf = pq.ParquetFile(path)
        dfs = []
        rows_collected = 0
        for rg in range(pf.num_row_groups):
            table = pf.read_row_group(rg, columns=columns)
            if table.num_rows == 0:
                continue
            take = min(n - rows_collected, table.num_rows)
            if take <= 0:
                break
            dfs.append(table.slice(0, take).to_pandas())
            rows_collected += take
            if rows_collected >= n:
                break
        if rows_collected == 0:
            # Empty file or no rows read; return empty frame with schema columns
            cols = columns if columns is not None else pf.schema.names
            return pd.DataFrame(columns=cols)
        return pd.concat(dfs, ignore_index=True)
    except Exception as e:
        print(f"Parquet head failed for {path}: {e}")
        return None


def csv_head(path, n=5, usecols=None):
    try:
        return pd.read_csv(path, nrows=n, usecols=usecols)
    except Exception as e:
        print(f"CSV head failed for {path}: {e}")
        return None

pairs = [
    ("ratings_train",   "../prepared/ratings_train.parquet",   "../prepared/ratings_train.csv",   ["userId","movieId","rating","datetime","title","genres"]),
    ("ratings_test",    "../prepared/ratings_test.parquet",    "../prepared/ratings_test.csv",    ["userId","movieId","rating","datetime","title","genres"]),
    ("ratings_buffer",  "../prepared/ratings_buffer.parquet",  "../prepared/ratings_buffer.csv",  ["userId","movieId","rating","datetime","title","genres"]),
    ("movie_features_small", "../prepared/movie_features_small.parquet", "../prepared/movie_features_small.csv", None),
    ("movie_features_uni",   "../prepared/movie_features_uni.parquet",   "../prepared/movie_features_uni.csv",   None),
    ("movie_embeddings",     "../prepared/movie_embeddings.parquet",     "../prepared/movie_embeddings.csv",     None),
]

for name, p_parq, p_csv, cols in pairs:
    print("\n===", name, "===")
    df = None
    src = None

    if os.path.exists(p_parq):
        df = parquet_head(p_parq, n=5, columns=cols)
        src = p_parq
    if df is None and os.path.exists(p_csv):
        df = csv_head(p_csv, n=5, usecols=cols)
        src = p_csv

    if df is not None:
        print(f"source: {src}")
        display(df)
    else:
        print(f"No preview available (files missing or reader not available): {p_parq} | {p_csv}")

# Additionally, show a compact column sample for movie_features_small if present
mfs_parq = "../prepared/movie_features_small.parquet"
if os.path.exists(mfs_parq) and pq is not None:
    try:
        sample_cols = ["movieId","title","g_emb_0","g_emb_1"]
        head_df = parquet_head(mfs_parq, n=5, columns=sample_cols)
        if head_df is not None:
            print("\nmovie_features_small sample columns:")
            display(head_df)
    except Exception as e:
        print(f"Extra sample failed: {e}")


=== ratings_train ===
source: ../prepared/ratings_train.parquet


,userId,movieId,rating,datetime,title,genres
0,2262,21,3.0,1995-01-09 11:46:49,Get Shorty (1995),Comedy|Crime|Thriller
1,2262,1079,3.0,1995-01-09 11:46:49,"Fish Called Wanda, A (1988)",Comedy|Crime
2,2262,47,5.0,1995-01-09 11:46:49,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
3,102689,1,4.0,1996-01-29 00:00:00,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
4,102689,39,5.0,1996-01-29 00:00:00,Clueless (1995),Comedy|Romance



=== ratings_test ===
source: ../prepared/ratings_test.parquet


,userId,movieId,rating,datetime,title,genres
0,84815,48394,4.0,2015-01-22 16:43:05,"Pan's Labyrinth (Laberinto del fauno, El) (2006)",Drama|Fantasy|Thriller
1,84815,76251,4.0,2015-01-22 16:43:08,Kick-Ass (2010),Action|Comedy
2,84815,60684,5.0,2015-01-22 16:43:29,Watchmen (2009),Action|Drama|Mystery|Sci-Fi|Thriller|IMAX
3,133589,88810,3.5,2015-01-22 16:43:43,"Help, The (2011)",Drama
4,129879,34405,4.0,2015-01-22 16:43:48,Serenity (2005),Action|Adventure|Sci-Fi



=== ratings_buffer ===
source: ../prepared/ratings_buffer.parquet


,userId,movieId,rating,datetime,title,genres
0,8780,164179,5.0,2018-01-04 02:45:08,Arrival (2016),Sci-Fi
1,8780,134130,5.0,2018-01-04 02:45:12,The Martian (2015),Adventure|Drama|Sci-Fi
2,8780,70286,5.0,2018-01-04 02:45:17,District 9 (2009),Mystery|Sci-Fi|Thriller
3,115638,75341,4.0,2018-01-04 02:45:25,Remember Me (2010),Drama|Romance
4,115638,5564,4.0,2018-01-04 02:45:41,Swimfan (2002),Thriller



=== movie_features_small ===
source: ../prepared/movie_features_small.parquet


,movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,...,g_emb_54,g_emb_55,g_emb_56,g_emb_57,g_emb_58,g_emb_59,g_emb_60,g_emb_61,g_emb_62,g_emb_63
0,1,Toy Story (1995),0,0,1,1,1,1,0,0,...,-0.051027,0.099649,-0.244833,0.003951,-0.136712,0.325241,0.07195,0.151897,-0.005155,-0.211419
1,2,Jumanji (1995),0,0,1,0,1,0,0,0,...,0.080299,0.07776,-0.001979,-0.169908,-0.32837,-0.155115,-0.324047,0.22915,0.065371,0.075309
2,3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,...,-0.003522,-0.255299,-0.047662,0.043895,0.025116,-0.006282,0.120842,0.127993,-0.087254,-0.039121
3,4,Waiting to Exhale (1995),0,0,0,0,0,1,0,0,...,-0.316506,0.234975,0.107907,0.136268,0.067956,0.319556,0.008732,0.139272,-0.150812,0.039239
4,5,Father of the Bride Part II (1995),0,0,0,0,0,1,0,0,...,0.26296,-0.265437,0.022199,0.15288,0.16735,0.148737,0.113985,0.046422,-0.158531,0.106722



=== movie_features_uni ===
source: ../prepared/movie_features_uni.parquet


,movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,...,1119,1120,1121,1122,1123,1124,1125,1126,1127,1128
0,1,Toy Story (1995),0,0,1,1,1,1,0,0,...,0.0405,0.01425,0.0305,0.035,0.14125,0.05775,0.039,0.02975,0.08475,0.022
1,2,Jumanji (1995),0,0,1,0,1,0,0,0,...,0.0525,0.01575,0.0125,0.02,0.12225,0.03275,0.021,0.011,0.10525,0.01975
2,3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,...,0.06275,0.0195,0.02225,0.023,0.122,0.03475,0.017,0.018,0.091,0.01775
3,4,Waiting to Exhale (1995),0,0,0,0,0,1,0,0,...,0.05325,0.028,0.01675,0.03875,0.182,0.0705,0.01625,0.01425,0.0885,0.015
4,5,Father of the Bride Part II (1995),0,0,0,0,0,1,0,0,...,0.0535,0.0205,0.01425,0.0255,0.19225,0.02675,0.01625,0.013,0.087,0.016



=== movie_embeddings ===
source: ../prepared/movie_embeddings.parquet


,movieId,g_emb_0,g_emb_1,g_emb_2,g_emb_3,g_emb_4,g_emb_5,g_emb_6,g_emb_7,g_emb_8,...,g_emb_54,g_emb_55,g_emb_56,g_emb_57,g_emb_58,g_emb_59,g_emb_60,g_emb_61,g_emb_62,g_emb_63
0,1,5.691517,1.157252,-2.299336,0.432646,1.473964,1.740654,0.675731,1.929917,-0.53081,...,-0.051027,0.099649,-0.244833,0.003951,-0.136712,0.325241,0.07195,0.151897,-0.005155,-0.211419
1,2,3.842368,2.39897,-1.019806,0.010039,1.318514,0.216763,0.049387,1.165664,0.32626,...,0.080299,0.07776,-0.001979,-0.169908,-0.32837,-0.155115,-0.324047,0.22915,0.065371,0.075309
2,3,2.881668,1.250843,-0.641893,-0.263697,-0.496168,0.352696,-0.112907,-0.016498,0.080669,...,-0.003522,-0.255299,-0.047662,0.043895,0.025116,-0.006282,0.120842,0.127993,-0.087254,-0.039121
3,4,2.781194,0.691663,-1.035577,-0.691427,-0.678014,-0.502191,-0.735466,0.146584,0.280361,...,-0.316506,0.234975,0.107907,0.136268,0.067956,0.319556,0.008732,0.139272,-0.150812,0.039239
4,5,2.862795,1.388638,-1.272278,-0.620198,-0.514246,0.082961,-0.343221,0.313066,0.034632,...,0.26296,-0.265437,0.022199,0.15288,0.16735,0.148737,0.113985,0.046422,-0.158531,0.106722



movie_features_small sample columns:


,movieId,title,g_emb_0,g_emb_1
0,1,Toy Story (1995),5.691517,1.157252
1,2,Jumanji (1995),3.842368,2.39897
2,3,Grumpier Old Men (1995),2.881668,1.250843
3,4,Waiting to Exhale (1995),2.781194,0.691663
4,5,Father of the Bride Part II (1995),2.862795,1.388638


## Summary: Files for ML Model Training

### Overview
The preparation notebook has generated **clean, ready-to-use artifacts** for training baseline and hybrid recommendation models. All files are saved in the `prepared/` directory as Parquet; CSV is written only as a fallback if a Parquet save fails.

---

### Core Artifacts

#### 1. **Interaction Data** (for baseline and hybrid models)
- **`ratings_train.parquet`** (70% of data, oldest)
  - Use as primary training set for all models (baseline + hybrid).
  - Columns: `userId`, `movieId`, `rating`, `timestamp`, `datetime`, `year`, `liked`, `title`, `genres`.
  - Size: ~17.5M rows.

- **`ratings_test.parquet`** (20% of data, middle)
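  - Use for evaluation. If hyperparameters need tuning, carve a validation slice out of the most recent part of the training period instead, so that test metrics stay unbiased.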
  - Same structure as training set.
  - Size: ~5M rows.

- **`ratings_buffer.parquet`** (10% of data, most recent)
  - Simulates new incoming data post-deployment.
  - Use for continuous monitoring, model drift detection, and retraining pipelines.
  - Size: ~2.5M rows.

---

#### 2. **Item Features** (for hybrid models)

**Recommended:** Use `movie_features_small.parquet` (optimized for training).

- **`movie_features_small.parquet`** ⭐
  - One row per movie.
  - Columns: `movieId`, `title`, genre one-hot columns (int8), PCA embeddings (`g_emb_0`...`g_emb_63`).
  - Size: ~62k rows × 86 cols (compact, memory-efficient).
  - Explained variance from PCA: ~90%+ of genome tag information retained.
  - **Use case:** Load into memory once; join by `movieId` at batch time in your dataloader.

- **`movie_embeddings.parquet`** (reference only)
  - Just the PCA-reduced genome embeddings keyed by `movieId`.
  - Useful if you want to replace genome features post-hoc or analyze embeddings.

- **`movie_features_uni.parquet`** (full features, reference)
  - One row per movie with genres + all 1,128 raw genome tag columns.
  - Use only for exploratory analysis; too large for direct training merges.
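
The `g_emb_*` columns are PCA projections of the 1,128 genome relevance scores. The sketch below shows the technique in plain NumPy: centre the data, take a thin SVD, keep the top-k components, and report the share of variance they retain. It runs on random stand-in data. It is an assumption about the approach, not the notebook's exact code, which may have used scikit-learn's `PCA` and its `explained_variance_ratio_`.

```python
import numpy as np


def pca_embed(X: np.ndarray, k: int):
    """Project rows of X onto the top-k principal components.

    Returns (embeddings, share of total variance explained by the k components).
    """
    X_centered = X - X.mean(axis=0)
    # Thin SVD: X_centered = U @ diag(S) @ Vt; principal axes are the rows of Vt
    U, S, Vt = np.linalg.svd(X_centered, full_matrices=False)
    embeddings = X_centered @ Vt[:k].T      # equivalently U[:, :k] * S[:k]
    variance = S**2                          # proportional to each component's variance
    explained = variance[:k].sum() / variance.sum()
    return embeddings, float(explained)


# Stand-in for the genome matrix: 500 movies x 1,128 tag relevance scores in [0, 1]
rng = np.random.default_rng(42)
latent = rng.random((500, 10))
mixing = rng.random((10, 1128))
X = np.clip(latent @ mixing / 10 + 0.02 * rng.standard_normal((500, 1128)), 0, 1)

emb, ratio = pca_embed(X, k=64)
print(emb.shape, round(ratio, 3))
```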

---

### How to Use for Training

#### **Baseline Model** (Collaborative Filtering)
```
1. Load ratings_train.parquet
2. Extract: userId, movieId, rating (or liked for binary classification)
3. Split further (e.g., train/val) if needed
4. Train CF model (SVD, ALS, NMF, etc.)
5. Evaluate on ratings_test.parquet
6. Monitor on ratings_buffer.parquet for drift
```
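
Step 4 names SVD, ALS and NMF as candidate models. The sketch below is a biased matrix-factorization baseline (the SVD-style model used by, e.g., Surprise's `SVD`) trained with SGD in NumPy and scored by RMSE against a global-mean predictor. It runs on synthetic low-rank data and all function names are hypothetical. With the real files, `userId`/`movieId` would first need mapping to contiguous indices (for example with `pd.factorize`), and users or movies that appear only in the test period would need a fallback such as the global mean.

```python
import numpy as np


def train_biased_mf(users, items, ratings, n_users, n_items,
                    k=16, lr=0.03, reg=0.05, epochs=30, seed=0):
    """Biased matrix factorization trained with SGD.

    Model: r_hat(u, i) = mu + b_u[u] + b_i[i] + P[u] . Q[i]
    `users` and `items` must already be contiguous 0-based indices.
    """
    rng = np.random.default_rng(seed)
    mu = float(ratings.mean())
    b_u, b_i = np.zeros(n_users), np.zeros(n_items)
    P = 0.1 * rng.standard_normal((n_users, k))
    Q = 0.1 * rng.standard_normal((n_items, k))
    for _ in range(epochs):
        for idx in rng.permutation(len(ratings)):
            u, i = users[idx], items[idx]
            err = ratings[idx] - (mu + b_u[u] + b_i[i] + P[u] @ Q[i])
            b_u[u] += lr * (err - reg * b_u[u])
            b_i[i] += lr * (err - reg * b_i[i])
            # Both factor updates use the pre-update values of P[u] and Q[i]
            P[u], Q[i] = (P[u] + lr * (err * Q[i] - reg * P[u]),
                          Q[i] + lr * (err * P[u] - reg * Q[i]))
    return mu, b_u, b_i, P, Q


def predict(model, users, items):
    mu, b_u, b_i, P, Q = model
    return mu + b_u[users] + b_i[items] + np.sum(P[users] * Q[items], axis=1)


def rmse(y_true, y_pred):
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


# Synthetic ratings with rank-3 structure (stand-in for ratings_train / ratings_test)
rng = np.random.default_rng(1)
n_users, n_items, n_obs = 60, 50, 2_000
true_U = 0.7 * rng.standard_normal((n_users, 3))
true_V = 0.7 * rng.standard_normal((n_items, 3))
cells = rng.choice(n_users * n_items, size=n_obs, replace=False)  # random order
users, items = cells // n_items, cells % n_items
ratings = np.clip(3.5 + np.sum(true_U[users] * true_V[items], axis=1)
                  + 0.3 * rng.standard_normal(n_obs), 0.5, 5.0)

train_idx, test_idx = np.arange(1_600), np.arange(1_600, n_obs)
model = train_biased_mf(users[train_idx], items[train_idx], ratings[train_idx],
                        n_users, n_items, k=4)
baseline_rmse = rmse(ratings[test_idx], np.full(len(test_idx), ratings[train_idx].mean()))
mf_rmse = rmse(ratings[test_idx], predict(model, users[test_idx], items[test_idx]))
print(f"global-mean RMSE: {baseline_rmse:.3f} | MF RMSE: {mf_rmse:.3f}")
```

The pure-Python SGD loop is only practical for small samples. On 17.5M rows, a vectorized or library implementation is the realistic choice.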

#### **Hybrid Model** (Collaborative + Content-Based)
```
1. Load ratings_train.parquet → interactions
2. Load movie_features_small.parquet → item features (keep in memory)
3. In your dataloader:
   a. Batch from ratings_train
   b. For each movieId, join from movie_features_small
   c. Build the input: [userId_embedding, movie_genres, g_emb_0...g_emb_63]; rating (or liked) is the target, not an input
4. Train neural net (e.g., MLP, RNN) on enriched features
5. Evaluate on ratings_test.parquet with same join logic
6. Monitor on ratings_buffer.parquet
```
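
Step 3 keeps the item table in memory and joins it per batch. The sketch below indexes the item features by `movieId` once, then uses `DataFrame.reindex` to gather the feature rows for a batch of ratings. The user ids are left for a learned embedding layer inside the model, which is not shown. The toy table has only 4 embedding columns instead of 64, and filling movies without features with zeros is an assumption rather than something the notebook specifies.

```python
import numpy as np
import pandas as pd

# Toy stand-in for movie_features_small: movieId, title, genre one-hots (int8), g_emb_*.
# The real file has 64 embedding columns; 4 keep the example short.
N_EMB = 4
movie_features = pd.DataFrame({
    "movieId": [1, 2, 3],
    "title": ["Toy Story (1995)", "Jumanji (1995)", "Grumpier Old Men (1995)"],
    "Adventure": np.array([1, 1, 0], dtype="int8"),
    "Comedy": np.array([1, 0, 1], dtype="int8"),
    **{f"g_emb_{j}": np.arange(3, dtype="float32") + j for j in range(N_EMB)},
})

# Built once and kept in memory: numeric item features indexed by movieId
feature_cols = [c for c in movie_features.columns if c not in ("movieId", "title")]
item_table = movie_features.set_index("movieId")[feature_cols].astype("float32")


def enrich_batch(batch: pd.DataFrame, item_table: pd.DataFrame):
    """Attach item features to a batch of ratings.

    Returns user ids (for a learned user embedding), the item feature matrix and the target.
    """
    X_items = item_table.reindex(batch["movieId"].to_numpy()).to_numpy()
    X_items = np.nan_to_num(X_items, nan=0.0)  # movies missing from the table -> zeros (assumption)
    y = batch["rating"].to_numpy(dtype="float32")
    return batch["userId"].to_numpy(), X_items, y


batch = pd.DataFrame({"userId": [10, 10, 11], "movieId": [3, 1, 99], "rating": [4.0, 5.0, 2.5]})
user_ids, X_items, y = enrich_batch(batch, item_table)
print(X_items.shape)  # (3, 6): 2 genre columns + 4 embedding columns
```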

---

### Key Decisions Made

| Decision | Rationale |
|----------|-----------|
| **25M interactions (not 68M)** | Avoided row explosion by merging ratings with base movies (non-exploded genres). Exploded genres are available in `movie_features` for reference. |
| **Separate item features** | Prevented OOM: 25M × 1.1k tag columns stored as float32 would require ~110 GB. Instead, keep `movie_features_small` (~500 MB) in memory and join per batch. |
| **PCA to 64 dims** | Reduced genome columns from 1,128 → 64 embeddings; retained ~90% variance; ~17.6× fewer genome feature columns (1,128 / 64). |
| **Temporal 70/20/10 split** | Ensures no time-leakage; buffer simulates production drift. |
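
The size figures in the table follow from simple arithmetic. The sketch assumes float32 storage (4 bytes per value), which is what the ~110 GB estimate implies. The genre-column count is derived from the 86 columns quoted for `movie_features_small`, and the 1,024-row batch is a hypothetical size.

```python
# Back-of-the-envelope numbers behind the "Key Decisions" table (float32 = 4 bytes/value)
BYTES_PER_VALUE = 4
n_ratings = 25_000_000
n_genome_tags = 1_128
n_components = 64
n_genre_cols = 86 - 2 - n_components  # 86 cols minus movieId, title and the embeddings

# Joining every raw genome column onto every rating
dense_join_gb = n_ratings * n_genome_tags * BYTES_PER_VALUE / 1e9
# Column reduction from PCA
column_reduction = n_genome_tags / n_components
# Per-batch cost of joining the compact features instead (hypothetical 1,024-row batch)
batch_rows = 1_024
batch_mb = batch_rows * (n_genre_cols + n_components) * BYTES_PER_VALUE / 1e6

print(f"dense join: {dense_join_gb:.1f} GB")          # dense join: 112.8 GB
print(f"PCA: {column_reduction:.1f}x fewer columns")  # PCA: 17.6x fewer columns
print(f"one enriched batch: {batch_mb:.2f} MB")       # one enriched batch: 0.34 MB
```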

---

### Next Steps for ML Training

1. **Load** `ratings_train.parquet` and `movie_features_small.parquet`.
2. **Baseline:** Train a simple CF model (SVD, for quick wins).
3. **Hybrid:** Add movie embeddings to baseline; compare performance.
4. **Tuning:** Experiment with embedding dimensions, regularization, etc.
5. **Evaluation & Monitoring:** Evaluate on `ratings_test.parquet`; detect drift on `ratings_buffer.parquet`.
6. **Deployment:** Set up re-training pipeline using recent buffer data.

---

### File Sizes (approx.)
- `ratings_train.parquet`: ~1.5 GB
- `ratings_test.parquet`: ~430 MB
- `ratings_buffer.parquet`: ~215 MB
- `movie_features_small.parquet`: ~500 MB
- **Total:** ~2.6 GB (easily fits on modern machines)

Good luck with model training! 🚀


## Additional Preparation for Neon Database (Free Tier)

### Storage Constraint Workaround

Since we're using Neon's free tier (512 MB storage limit), we cannot upload the full prepared splits (even `ratings_test.parquet` alone holds ~5M rows). Instead, we create a **reduced initial dataset** of **1M rows** taken from `ratings_train.parquet`, keeping the earliest ratings of the 2005-2010 window, for initial model training and testing.

This reduced dataset will be used to:
- Train the initial baseline model
- Test the MLOps pipeline (Airflow + monitoring)
- Validate the system architecture

Once validated, the full dataset can be used with a paid tier or alternative storage solution.

### Step 1: Extract the Earliest 1M Rows of the 2005-2010 Window from ratings_train.parquet

In [31]:
# Load the training dataset for initial ML data extraction
print("📊 Loading ratings_train.parquet...")
df_train_full = pd.read_parquet("../prepared/ratings_train.parquet")
print(f"   Full dataset: {len(df_train_full):,} rows")

# Filter data from 2005 to 2010 for drift simulation
print("\n🔍 Filtering data from 2005-2010...")
df_filtered = df_train_full[(df_train_full['year'] >= 2005) & (df_train_full['year'] <= 2010)].copy()
print(f"   Rows in 2005-2010 period: {len(df_filtered):,}")

# Sort by timestamp (ascending) to prioritize earlier years if needed
df_filtered = df_filtered.sort_values('timestamp')

# Take up to 1M rows (staying closer to 2005 if more data exists)
TARGET_SIZE = 1_000_000
if len(df_filtered) > TARGET_SIZE:
    print(f"\n⚠️  More than {TARGET_SIZE:,} rows found. Taking first {TARGET_SIZE:,} rows (closest to 2005)...")
    df_initial_ml = df_filtered.head(TARGET_SIZE).copy()
else:
    print(f"\n✅ Using all {len(df_filtered):,} rows from 2005-2010 period")
    df_initial_ml = df_filtered.copy()

print(f"\n📈 Sample statistics:")
print(f"   Sample size: {len(df_initial_ml):,} rows")
print(f"   Timestamp range: {df_initial_ml['timestamp'].min()} to {df_initial_ml['timestamp'].max()}")
print(f"   Date range: {df_initial_ml['datetime'].min()} to {df_initial_ml['datetime'].max()}")
print(f"   Year distribution: {df_initial_ml['year'].value_counts().sort_index().to_dict()}")

# Save to parquet
output_path = "../prepared/ratings_initial_ml.parquet"
df_initial_ml.to_parquet(output_path, index=False)
print(f"\n✅ Saved to: {output_path}")
print(f"   Shape: {df_initial_ml.shape}")
print(f"   Columns: {df_initial_ml.columns.tolist()}")
print(f"\n💡 This historical data (2005-2010) will enable drift detection when compared to recent data")

📊 Loading ratings_train.parquet...
   Full dataset: 17,500,066 rows

🔍 Filtering data from 2005-2010...
   Rows in 2005-2010 period: 6,204,004

⚠️  More than 1,000,000 rows found. Taking first 1,000,000 rows (closest to 2005)...

📈 Sample statistics:
   Sample size: 1,000,000 rows
   Timestamp range: 1104537611 to 1119523759
   Date range: 2005-01-01 00:00:11 to 2005-06-23 10:49:19
   Year distribution: {2005: 1000000}

✅ Saved to: ../prepared/ratings_initial_ml.parquet
   Shape: (1000000, 9)
   Columns: ['userId', 'movieId', 'rating', 'timestamp', 'datetime', 'year', 'liked', 'title', 'genres']

💡 This historical data (2005-2010) will enable drift detection when compared to recent data
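
Because `head(TARGET_SIZE)` keeps the earliest rows, the saved sample only covers January to June 2005 (see the date range and year distribution above), even though the filter spans 2005-2010. If the goal is a sample that covers the whole window, one option is a uniform random sample with a fixed seed. The sketch below uses `DataFrame.sample` on synthetic data. The column names match the notebook, but `sample_window` is a hypothetical helper.

```python
import numpy as np
import pandas as pd


def sample_window(df: pd.DataFrame, start_year: int, end_year: int, n: int, seed: int = 42):
    """Uniformly sample up to n rows with start_year <= year <= end_year, in time order."""
    window = df[df["year"].between(start_year, end_year)]
    take = min(n, len(window))
    return window.sample(n=take, random_state=seed).sort_values("timestamp")


# Synthetic ratings spread over 2003-2012 (rough seconds-since-epoch within each year)
rng = np.random.default_rng(0)
years = rng.integers(2003, 2013, 20_000)
demo = pd.DataFrame({
    "year": years,
    "timestamp": (years - 1970) * 31_536_000 + rng.integers(0, 31_536_000, 20_000),
})
sample = sample_window(demo, 2005, 2010, n=5_000)
print(len(sample), sample["year"].min(), sample["year"].max())
```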


### Step 2: Prepare Buffer Batches for Airflow Ingestion

In [32]:
# Prepare 6 temporal buffer batches for data drift simulation
print("📦 Creating 6 temporal buffer batches for drift demonstration...\n")

buffer_output_dir = Path("../prepared/buffer_batches")
buffer_output_dir.mkdir(exist_ok=True)

BATCH_SIZE = 100_000

# Define batch configurations: (source_file, end_year, batch_name)
batch_configs = [
    ("../prepared/ratings_train.parquet", 2005, "batch_w1"),
    ("../prepared/ratings_train.parquet", 2008, "batch_w2"),
    ("../prepared/ratings_train.parquet", 2011, "batch_w3"),
    ("../prepared/ratings_train.parquet", 2014, "batch_w4"),
    ("../prepared/ratings_test.parquet", 2017, "batch_w5"),
    ("../prepared/ratings_buffer.parquet", 2019, "batch_w6"),
]

for source_file, end_year, batch_name in batch_configs:
    print(f"{'='*70}")
    print(f"Processing {batch_name}: data ending {end_year}")
    print(f"{'='*70}")
    
    # Load source data
    df_source = pd.read_parquet(source_file)
    print(f"   Loaded {source_file.split('/')[-1]}: {len(df_source):,} rows")
    
    # Filter data up to end_year
    df_filtered = df_source[df_source['year'] <= end_year].copy()
    print(f"   Rows up to {end_year}: {len(df_filtered):,}")
    
    # Sort by timestamp descending and take the most recent 100k
    df_filtered = df_filtered.sort_values('timestamp', ascending=False)
    batch = df_filtered.head(BATCH_SIZE).copy()
    
    # Re-sort in ascending order for proper temporal sequence
    batch = batch.sort_values('timestamp')
    
    # Add metadata columns
    batch['batch_id'] = batch_name
    batch['ingested_at'] = None  # Will be set by Airflow when ingested
    
    # Save batch
    batch_path = buffer_output_dir / f"{batch_name}.parquet"
    batch.to_parquet(batch_path, index=False)
    
    # Display timestamp range
    print(f"   ✅ Saved {batch_path.name}: {len(batch):,} rows")
    print(f"   📅 Timestamp range: {batch['timestamp'].min()} → {batch['timestamp'].max()}")
    print(f"   📅 Date range: {batch['datetime'].min()} → {batch['datetime'].max()}")
    print(f"   📊 Year distribution: {batch['year'].value_counts().sort_index().to_dict()}")
    print()

print(f"{'='*70}")
print(f"✅ All 6 buffer batches saved in: {buffer_output_dir}")
print(f"   Total rows across all batches: {len(batch_configs) * BATCH_SIZE:,}")
print(f"\n💡 Temporal progression enables data drift demonstration:")

📦 Creating 6 temporal buffer batches for drift demonstration...

Processing batch_w1: data ending 2005
   Loaded ratings_train.parquet: 17,500,066 rows
   Rows up to 2005: 10,540,240
   ✅ Saved batch_w1.parquet: 100,000 rows
   📅 Timestamp range: 1133120782 → 1136073599
   📅 Date range: 2005-11-27 19:46:22 → 2005-12-31 23:59:59
   📊 Year distribution: {2005: 100000}

Processing batch_w2: data ending 2008
   Loaded ratings_train.parquet: 17,500,066 rows
   Rows up to 2008: 13,528,131
   ✅ Saved batch_w2.parquet: 100,000 rows
   📅 Timestamp range: 1227566172 → 1230767147
   📅 Date range: 2008-11-24 22:36:12 → 2008-12-31 23:45:47
   📊 Year distribution: {2008: 100000}

Processing batch_w3: data ending 2011
   Loaded ratings_train.parquet: 17,500,066 rows
   Rows up to 2011: 15,807,192
   ✅ Saved batch_w3.parquet: 100,000 rows
   📅 Timestamp range: 1319756045 → 1325375995
   📅 Date range: 2011-10-27 22:54:05 → 2011-12-31 23:59:55
   📊 Year distribution: {2011: 100000}

Processing batch_w4:
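
The loop above fully sorts each filtered frame only to keep 100k rows. `DataFrame.nlargest` selects the top rows by `timestamp` without a full sort. The sketch below is a hypothetical helper that could replace the sort, head and re-sort steps. It is checked against the notebook's approach on synthetic data with unique timestamps. With tied timestamps at the cut-off, the two approaches may keep different rows.

```python
import numpy as np
import pandas as pd


def latest_rows_up_to(df: pd.DataFrame, end_year: int, n: int) -> pd.DataFrame:
    """Return the n most recent rows with year <= end_year, in ascending time order."""
    eligible = df[df["year"] <= end_year]
    return eligible.nlargest(n, "timestamp").sort_values("timestamp")


# Synthetic ratings with unique timestamps spread over 2000-2019
rng = np.random.default_rng(1)
demo = pd.DataFrame({"timestamp": 946_684_800 + rng.permutation(10_000) * 63_072})
demo["year"] = pd.to_datetime(demo["timestamp"], unit="s").dt.year

batch = latest_rows_up_to(demo, end_year=2008, n=500)

# Same selection as the sort -> head -> re-sort steps in the cell above
reference = (demo[demo["year"] <= 2008]
             .sort_values("timestamp", ascending=False)
             .head(500)
             .sort_values("timestamp"))
print(batch.equals(reference))  # True
```

Loading `ratings_train.parquet` once, outside the loop, would also avoid re-reading the same file for the first four batches.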

In [33]:
# Preview each batch to verify timestamp ranges
print("📋 Preview of each buffer batch:\n")
for i in range(1, 7):  # batch_w1 ... batch_w6
    batch_path = buffer_output_dir / f"batch_w{i}.parquet"
    if batch_path.exists():
        batch_preview = pd.read_parquet(batch_path)
        print(f"{'='*60}")
        print(f"Batch {i} ({batch_path.name})")
        print(f"{'='*60}")
        print(f"Shape: {batch_preview.shape}")
        print(f"Timestamp range: {batch_preview['timestamp'].min()} → {batch_preview['timestamp'].max()}")
        print(f"Date range: {batch_preview['datetime'].min()} → {batch_preview['datetime'].max()}")
        print(f"Year distribution: {batch_preview['year'].value_counts().sort_index().to_dict()}")
        print("\nFirst 5 rows:")
        display(batch_preview[['userId', 'movieId', 'rating', 'timestamp', 'datetime', 'year', 'batch_id']].head())
        print("\n")

📋 Preview of each buffer batch:

Batch 1 (batch_w1.parquet)
Shape: (100000, 11)
Timestamp range: 1133120782 → 1136073599
Date range: 2005-11-27 19:46:22 → 2005-12-31 23:59:59
Year distribution: {2005: 100000}

First 5 rows:


| | userId | movieId | rating | timestamp | datetime | year | batch_id |
|---|---|---|---|---|---|---|---|
| 0 | 160338 | 1385 | 4.0 | 1133120782 | 2005-11-27 19:46:22 | 2005 | batch_w1 |
| 1 | 160338 | 5218 | 5.0 | 1133120790 | 2005-11-27 19:46:30 | 2005 | batch_w1 |
| 2 | 160338 | 3868 | 3.5 | 1133120797 | 2005-11-27 19:46:37 | 2005 | batch_w1 |
| 3 | 160338 | 1223 | 4.0 | 1133120806 | 2005-11-27 19:46:46 | 2005 | batch_w1 |
| 4 | 160338 | 2394 | 3.0 | 1133120835 | 2005-11-27 19:47:15 | 2005 | batch_w1 |




Batch 2 (batch_w2.parquet)
Shape: (100000, 11)
Timestamp range: 1227566172 → 1230767147
Date range: 2008-11-24 22:36:12 → 2008-12-31 23:45:47
Year distribution: {2008: 100000}

First 5 rows:


| | userId | movieId | rating | timestamp | datetime | year | batch_id |
|---|---|---|---|---|---|---|---|
| 0 | 126324 | 1259 | 4.5 | 1227566172 | 2008-11-24 22:36:12 | 2008 | batch_w2 |
| 1 | 126324 | 58998 | 4.0 | 1227566192 | 2008-11-24 22:36:32 | 2008 | batch_w2 |
| 2 | 126324 | 8784 | 3.5 | 1227566205 | 2008-11-24 22:36:45 | 2008 | batch_w2 |
| 3 | 126324 | 1242 | 4.0 | 1227566211 | 2008-11-24 22:36:51 | 2008 | batch_w2 |
| 4 | 24186 | 5349 | 3.5 | 1227566217 | 2008-11-24 22:36:57 | 2008 | batch_w2 |




Batch 3 (batch_w3.parquet)
Shape: (100000, 11)
Timestamp range: 1319756045 → 1325375995
Date range: 2011-10-27 22:54:05 → 2011-12-31 23:59:55
Year distribution: {2011: 100000}

First 5 rows:


| | userId | movieId | rating | timestamp | datetime | year | batch_id |
|---|---|---|---|---|---|---|---|
| 0 | 92061 | 5941 | 3.5 | 1319756045 | 2011-10-27 22:54:05 | 2011 | batch_w3 |
| 1 | 92061 | 7317 | 4.0 | 1319756051 | 2011-10-27 22:54:11 | 2011 | batch_w3 |
| 2 | 92061 | 78209 | 5.0 | 1319756061 | 2011-10-27 22:54:21 | 2011 | batch_w3 |
| 3 | 92061 | 76077 | 4.5 | 1319756079 | 2011-10-27 22:54:39 | 2011 | batch_w3 |
| 4 | 92061 | 4299 | 4.5 | 1319756092 | 2011-10-27 22:54:52 | 2011 | batch_w3 |




Batch 4 (batch_w4.parquet)
Shape: (100000, 11)
Timestamp range: 1416146932 → 1420069793
Date range: 2014-11-16 14:08:52 → 2014-12-31 23:49:53
Year distribution: {2014: 100000}

First 5 rows:


| | userId | movieId | rating | timestamp | datetime | year | batch_id |
|---|---|---|---|---|---|---|---|
| 0 | 30933 | 106782 | 2.5 | 1416146932 | 2014-11-16 14:08:52 | 2014 | batch_w4 |
| 1 | 30933 | 110730 | 0.5 | 1416146955 | 2014-11-16 14:09:15 | 2014 | batch_w4 |
| 2 | 30933 | 593 | 0.5 | 1416147032 | 2014-11-16 14:10:32 | 2014 | batch_w4 |
| 3 | 30933 | 47 | 2.5 | 1416147036 | 2014-11-16 14:10:36 | 2014 | batch_w4 |
| 4 | 30933 | 1270 | 2.5 | 1416147040 | 2014-11-16 14:10:40 | 2014 | batch_w4 |




Batch 5 (batch_w5.parquet)
Shape: (100000, 11)
Timestamp range: 1512597887 → 1514764799
Date range: 2017-12-06 22:04:47 → 2017-12-31 23:59:59
Year distribution: {2017: 100000}

First 5 rows:


| | userId | movieId | rating | timestamp | datetime | year | batch_id |
|---|---|---|---|---|---|---|---|
| 0 | 117192 | 4504 | 3.5 | 1512597887 | 2017-12-06 22:04:47 | 2017 | batch_w5 |
| 1 | 117192 | 2374 | 3.0 | 1512597887 | 2017-12-06 22:04:47 | 2017 | batch_w5 |
| 2 | 117192 | 2375 | 2.5 | 1512597887 | 2017-12-06 22:04:47 | 2017 | batch_w5 |
| 3 | 117192 | 208 | 3.5 | 1512597887 | 2017-12-06 22:04:47 | 2017 | batch_w5 |
| 4 | 117192 | 3390 | 3.0 | 1512597887 | 2017-12-06 22:04:47 | 2017 | batch_w5 |




Batch 6 (batch_w6.parquet)
Shape: (100000, 11)
Timestamp range: 1571550775 → 1574327703
Date range: 2019-10-20 05:52:55 → 2019-11-21 09:15:03
Year distribution: {2019: 100000}

First 5 rows:


| | userId | movieId | rating | timestamp | datetime | year | batch_id |
|---|---|---|---|---|---|---|---|
| 0 | 57688 | 128360 | 4.5 | 1571550775 | 2019-10-20 05:52:55 | 2019 | batch_w6 |
| 1 | 57688 | 152077 | 3.5 | 1571550780 | 2019-10-20 05:53:00 | 2019 | batch_w6 |
| 2 | 57688 | 106920 | 4.0 | 1571550815 | 2019-10-20 05:53:35 | 2019 | batch_w6 |
| 3 | 57688 | 111 | 4.0 | 1571550819 | 2019-10-20 05:53:39 | 2019 | batch_w6 |
| 4 | 57688 | 185029 | 4.0 | 1571550822 | 2019-10-20 05:53:42 | 2019 | batch_w6 |
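
Before the files are handed to Airflow, a few invariants are worth asserting for each batch: the expected row count, a single `batch_id` matching the file name, timestamps in ascending order (ties allowed, as in `batch_w5`), and no rows after the cut-off year. The sketch below implements these checks in a hypothetical `check_batch` helper. It is exercised on a toy frame built from the first rows of the `batch_w1` preview.

```python
import pandas as pd


def check_batch(batch: pd.DataFrame, batch_name: str, end_year: int, expected_rows: int):
    """Return a list of problems found in one buffer batch (empty list = OK)."""
    problems = []
    if len(batch) != expected_rows:
        problems.append(f"expected {expected_rows} rows, got {len(batch)}")
    if set(batch["batch_id"].unique()) != {batch_name}:
        problems.append("batch_id column does not match the file name")
    if not batch["timestamp"].is_monotonic_increasing:  # non-strict: ties are allowed
        problems.append("timestamps are not in ascending order")
    if (batch["year"] > end_year).any():
        problems.append(f"rows after {end_year} found")
    return problems


# Toy batch built from the first rows of the batch_w1 preview
toy = pd.DataFrame({
    "userId": [160338, 160338, 160338],
    "movieId": [1385, 5218, 3868],
    "rating": [4.0, 5.0, 3.5],
    "timestamp": [1133120782, 1133120790, 1133120797],
    "year": [2005, 2005, 2005],
    "batch_id": ["batch_w1"] * 3,
})
print(check_batch(toy, "batch_w1", end_year=2005, expected_rows=3))  # []
print(check_batch(toy, "batch_w2", end_year=2004, expected_rows=3))  # two problems
```

On the real files, the same helper could be looped over `batch_w1` to `batch_w6` with their cut-off years and `BATCH_SIZE`.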






In [34]:
# Create drift simulation batch (batch_w7) for MLOps demonstration
print("🔬 Creating drift simulation batch for MLOps pipeline demonstration...\n")

# Load the last regular batch (batch_w6) as a template for the drift batch
batch_w6_path = buffer_output_dir / "batch_w6.parquet"
df_drift = pd.read_parquet(batch_w6_path)
print(f"📁 Loaded {batch_w6_path.name}: {len(df_drift):,} rows")
print(f"   Original timestamp range: {df_drift['timestamp'].min()} → {df_drift['timestamp'].max()}")
print(f"   Original date range: {df_drift['datetime'].min()} → {df_drift['datetime'].max()}")
print(f"   Original rating stats: min={df_drift['rating'].min()}, max={df_drift['rating'].max()}, mean={df_drift['rating'].mean():.2f}")

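# 1. Simulate a ~1 year timestamp shift (365 days = 365 * 24 * 3600 seconds); since 2020 is a leap year,
#    shifted dates land one calendar day earlier (e.g. 2019-10-20 -> 2020-10-19)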
ONE_YEAR_SECONDS = 365 * 24 * 60 * 60
df_drift['timestamp'] = df_drift['timestamp'] + ONE_YEAR_SECONDS
df_drift['datetime'] = pd.to_datetime(df_drift['timestamp'], unit='s')
df_drift['year'] = df_drift['datetime'].dt.year

# 2. Simulate rating drift: add +1.0 but cap at 5.0 (vectorized)
df_drift['rating'] = (df_drift['rating'] + 1.0).clip(upper=5.0)

# 3. Update batch metadata
df_drift['batch_id'] = 'batch_w7'

# 4. Save the drift simulation batch
batch_w7_path = buffer_output_dir / "batch_w7.parquet"
df_drift.to_parquet(batch_w7_path, index=False)

print(f"\n✅ Created drift simulation batch:")
print(f"   Saved: {batch_w7_path.name}")
print(f"   Shape: {df_drift.shape}")
print(f"   New timestamp range: {df_drift['timestamp'].min()} → {df_drift['timestamp'].max()}")
print(f"   New date range: {df_drift['datetime'].min()} → {df_drift['datetime'].max()}")
print(f"   New rating stats: min={df_drift['rating'].min()}, max={df_drift['rating'].max()}, mean={df_drift['rating'].mean():.2f}")
print(f"   Year distribution: {df_drift['year'].value_counts().sort_index().to_dict()}")

print(f"\n🎯 Drift simulation details:")
print(f"   ⏰ Temporal shift: ~1 year forward (2019 → 2020)")
print(f"   📈 Rating drift: +1.0 added to each rating (capped at 5.0)")
print(f"   🔍 This will trigger drift detection in your MLOps pipeline!")

# Preview the drift batch
print(f"\n📋 Preview of drift simulation batch:")
display(df_drift[['userId', 'movieId', 'rating', 'timestamp', 'datetime', 'year', 'batch_id']].head())

🔬 Creating drift simulation batch for MLOps pipeline demonstration...

📁 Loaded batch_w6.parquet: 100,000 rows
   Original timestamp range: 1571550775 → 1574327703
   Original date range: 2019-10-20 05:52:55 → 2019-11-21 09:15:03
   Original rating stats: min=0.5, max=5.0, mean=3.60

✅ Created drift simulation batch:
   Saved: batch_w7.parquet
   Shape: (100000, 11)
   New timestamp range: 1603086775 → 1605863703
   New date range: 2020-10-19 05:52:55 → 2020-11-20 09:15:03
   New rating stats: min=1.5, max=5.0, mean=4.40
   Year distribution: {2020: 100000}

🎯 Drift simulation details:
   ⏰ Temporal shift: ~1 year forward (2019 → 2020)
   📈 Rating drift: +1.0 added to each rating (capped at 5.0)
   🔍 This will trigger drift detection in your MLOps pipeline!

📋 Preview of drift simulation batch:


| | userId | movieId | rating | timestamp | datetime | year | batch_id |
|---|---|---|---|---|---|---|---|
| 0 | 57688 | 128360 | 5.0 | 1603086775 | 2020-10-19 05:52:55 | 2020 | batch_w7 |
| 1 | 57688 | 152077 | 4.5 | 1603086780 | 2020-10-19 05:53:00 | 2020 | batch_w7 |
| 2 | 57688 | 106920 | 5.0 | 1603086815 | 2020-10-19 05:53:35 | 2020 | batch_w7 |
| 3 | 57688 | 111 | 5.0 | 1603086819 | 2020-10-19 05:53:39 | 2020 | batch_w7 |
| 4 | 57688 | 185029 | 5.0 | 1603086822 | 2020-10-19 05:53:42 | 2020 | batch_w7 |
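
One way to confirm that `batch_w7` really is detectable is to compare its rating distribution with that of `batch_w6`. The sketch below uses the Population Stability Index (PSI) over the ten half-star rating levels, one common drift metric. It is not necessarily what the MLOps pipeline uses. The data is synthetic and only mimics the +1.0 (capped) shift, and the level probabilities are made up. Rules of thumb often treat a PSI above roughly 0.2 to 0.25 as a significant shift.

```python
import numpy as np

RATING_LEVELS = np.arange(0.5, 5.01, 0.5)  # the ten half-star values 0.5 ... 5.0


def rating_shares(ratings, eps=1e-6):
    """Fraction of ratings at each half-star level; eps stands in for empty levels."""
    ratings = np.asarray(ratings, dtype=float)
    counts = np.array([np.isclose(ratings, level).sum() for level in RATING_LEVELS], dtype=float)
    return np.clip(counts / counts.sum(), eps, None)


def psi(reference, current):
    """Population Stability Index: sum((cur - ref) * ln(cur / ref)) over rating levels."""
    ref, cur = rating_shares(reference), rating_shares(current)
    return float(np.sum((cur - ref) * np.log(cur / ref)))


# Synthetic stand-ins: batch_w6-like ratings, a fresh sample from the same
# distribution, and a batch_w7-like copy shifted by +1.0 and capped at 5.0
rng = np.random.default_rng(7)
probs = [0.02, 0.03, 0.03, 0.07, 0.08, 0.20, 0.17, 0.22, 0.08, 0.10]
w6_like = rng.choice(RATING_LEVELS, size=100_000, p=probs)
same_dist = rng.choice(RATING_LEVELS, size=100_000, p=probs)
w7_like = np.minimum(w6_like + 1.0, 5.0)

print(f"PSI (no drift):   {psi(w6_like, same_dist):.4f}")
print(f"PSI (+1.0 shift): {psi(w6_like, w7_like):.4f}")
print(f"Mean shift:       {w7_like.mean() - w6_like.mean():+.2f}")
```

On the real files, the same function could be applied to the `rating` column of `batch_w6.parquet` and `df_drift`.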


### Summary: Files Ready for Neon Transfer

The following files are now prepared and ready to be transferred to Neon database:

1. **`ratings_initial_ml.parquet`** (1M rows)
   - Reduced dataset for initial model training/testing
   - Earliest 1M ratings of the 2005-2010 window in `ratings_train.parquet` (in practice, January to June 2005)
   - Will be split 70/30 for train/test in the transfer script

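2. **`buffer_batches/batch_w{1-7}.parquet`** (7 batches of 100k rows each)
   - Prepared for the Airflow ingestion pipeline
   - `batch_w1` to `batch_w6` hold the 100k most recent ratings up to the end of 2005, 2008, 2011, 2014, 2017 and 2019 respectively (roughly the last one to two months of data before each cut-off); each batch stands in for one incremental load
   - `batch_w7` is a synthetic drift batch derived from `batch_w6` (timestamps shifted ~1 year forward, ratings +1.0 capped at 5.0)
   - Ready to be ingested incrementally for monitoring and drift detection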

3. **`movie_features_small.parquet`**
   - Movie metadata and features
   - Already prepared in previous sections

**Next Step:** Run `initial_load_lighter_dataset.py` to transfer these prepared files to Neon database.