In [1]:
import os
import math
import copy
from itertools import zip_longest

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch import optim

In [2]:
def set_random_seed(state=1):
    gens = (np.random.seed, torch.manual_seed, torch.cuda.manual_seed)
    for set_state in gens:
        set_state(state)

In [3]:
RANDOM_STATE = 42
set_random_seed(RANDOM_STATE)

In [4]:
DATASET_LINK='https://files.grouplens.org/datasets/movielens/ml-32m.zip'

In [5]:
import urllib.request
import zipfile

zip_filename = DATASET_LINK.split('/')[-1]
'''This script downloads the MovieLens 32M dataset if it is not already present, and extracts it
from the zip file. The dataset is used for building recommendation systems.

# Download the dataset if not already present
if not os.path.exists(zip_filename):
	urllib.request.urlretrieve(DATASET_LINK, zip_filename)

# Unzip the file if not already extracted
with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
	zip_ref.extractall()'''

"This script downloads the MovieLens 32M dataset if it is not already present, and extracts it\nfrom the zip file. The dataset is used for building recommendation systems.\n\n# Download the dataset if not already present\nif not os.path.exists(zip_filename):\n\turllib.request.urlretrieve(DATASET_LINK, zip_filename)\n\n# Unzip the file if not already extracted\nwith zipfile.ZipFile(zip_filename, 'r') as zip_ref:\n\tzip_ref.extractall()"

In [6]:
def read_data(path):
    files = {}
    for filename in os.listdir(path):
        stem, suffix =  os.path.splitext(filename)
        file_path = os.path.join(path,filename)
        print(filename)
        if suffix == '.csv':
            files[stem] = pd.read_csv(file_path)
        elif suffix == '.dat':
            if stem == 'ratings':
                columns = ['userId', 'movieId', 'rating', 'timestamp']
            else:
                columns = ['movieId', 'title', 'genres']
            data = pd.read_csv(file_path, sep='::', names=columns, engine='python')
            files[stem] = data
    return files['ratings'], files['movies']

In [7]:
ratings, movies = read_data('ml-32m')

checksums.txt
links.csv
movies.csv
ratings.csv
README.txt
tags.csv


In [8]:
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,17,4.0,944249077
1,1,25,1.0,944250228
2,1,29,2.0,943230976
3,1,30,5.0,944249077
4,1,32,5.0,943228858


In [9]:
minmax = ratings.rating.min(), ratings.rating.max()
print(f"Rating range: {minmax}")

Rating range: (0.5, 5.0)


In [10]:
movies.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [11]:
movies.drop_duplicates().shape

(87585, 3)

In [12]:
ratings = ratings.merge(movies[["movieId", "title"]], on="movieId")

In [13]:
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp,title
0,1,17,4.0,944249077,Sense and Sensibility (1995)
1,3,17,5.0,1084485217,Sense and Sensibility (1995)
2,15,17,4.5,1289858271,Sense and Sensibility (1995)
3,28,17,4.0,961513829,Sense and Sensibility (1995)
4,29,17,4.0,845056111,Sense and Sensibility (1995)


In [14]:
def tabular_preview(ratings, n=15):
    """Creates a cross-tabular view of users vs movies."""
    
    user_groups = ratings.groupby('userId')['rating'].count()
    top_users = user_groups.sort_values(ascending=False)[:n]

    movie_groups = ratings.groupby('movieId')['rating'].count()
    top_movies = movie_groups.sort_values(ascending=False)[:n]

    top = (
        ratings.
        join(top_users, rsuffix='_r', how='inner', on='userId').
        join(top_movies, rsuffix='_r', how='inner', on='movieId'))

    return pd.crosstab(top.userId, top.movieId, top.rating, aggfunc=np.sum)

In [15]:
tabular_preview(ratings, n=15)

movieId,1,50,110,260,296,318,356,480,527,589,593,1196,2571,2959,4993
userId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
7858,5.0,5.0,3.5,4.5,5.0,5.0,2.0,3.0,5.0,3.5,2.5,5.0,5.0,4.0,5.0
10202,5.0,5.0,1.0,5.0,5.0,5.0,4.0,4.0,5.0,4.0,5.0,5.0,5.0,5.0,5.0
14674,3.0,4.5,5.0,5.0,2.0,2.5,5.0,2.0,3.5,5.0,3.0,5.0,4.0,3.5,5.0
17035,3.5,5.0,4.0,0.5,5.0,5.0,3.5,3.0,4.5,3.5,5.0,0.5,1.5,4.5,3.0
22744,5.0,4.0,4.0,5.0,5.0,3.0,4.0,5.0,0.5,5.0,5.0,5.0,5.0,5.0,5.0
49305,5.0,4.5,4.0,5.0,5.0,4.5,4.5,4.0,5.0,5.0,5.0,5.0,5.0,4.0,4.5
53192,,4.0,3.5,5.0,3.5,4.5,4.0,3.0,4.0,3.0,4.0,4.5,2.5,3.0,3.0
55653,4.0,4.5,5.0,4.0,4.5,4.0,4.0,3.0,4.0,4.0,4.0,4.0,4.0,3.0,4.0
57304,4.0,3.0,,2.5,4.0,2.5,,2.0,5.0,3.0,4.0,3.0,4.0,3.5,3.0
123465,5.0,2.5,2.0,3.5,5.0,5.0,4.0,2.0,4.5,3.0,4.0,4.5,2.0,4.5,5.0


In [16]:
def create_dataset(ratings, top=None):
    if top is not None:
        ratings.groupby('userId')['rating'].count()
    
    unique_users = ratings.userId.unique()
    user_to_index = {old: new for new, old in enumerate(unique_users)}
    new_users = ratings.userId.map(user_to_index)
    
    unique_movies = ratings.movieId.unique()
    movie_to_index = {old: new for new, old in enumerate(unique_movies)}
    new_movies = ratings.movieId.map(movie_to_index)
        
    n_users = unique_users.shape[0]
    n_movies = unique_movies.shape[0]
    
    X = pd.DataFrame({'user_id': new_users, 'movie_id': new_movies})
    y = ratings['rating'].astype(np.float32)
    return (n_users, n_movies), (X, y), (user_to_index, movie_to_index)

In [17]:
(n, m), (X, y), (user_to_index, movie_to_index) = create_dataset(ratings)
print(f'Embeddings: {n} users, {m} movies')
print(f'Dataset shape: {X.shape}')
print(f'Target shape: {y.shape}')

Embeddings: 200948 users, 84432 movies
Dataset shape: (32000204, 2)
Target shape: (32000204,)


# Creating the Dataloader

In [18]:
class ReviewsIterator:
    
    def __init__(self, X, y, batch_size=32, shuffle=True):
        X, y = np.asarray(X), np.asarray(y)
        
        if shuffle:
            index = np.random.permutation(X.shape[0])
            X, y = X[index], y[index]
            
        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.n_batches = int(math.ceil(X.shape[0] // batch_size))
        self._current = 0
        
    def __iter__(self):
        return self
    
    def __next__(self):
        return self.next()
    
    def next(self):
        if self._current >= self.n_batches:
            raise StopIteration()
        k = self._current
        self._current += 1
        bs = self.batch_size
        return self.X[k*bs:(k + 1)*bs], self.y[k*bs:(k + 1)*bs]

In [19]:
def batches(X, y, bs=32, shuffle=True):
    for xb, yb in ReviewsIterator(X, y, bs, shuffle):
        xb = torch.LongTensor(xb)
        yb = torch.FloatTensor(yb)
        yield xb, yb.view(-1, 1) 

In [None]:
for x_batch, y_batch in batches(X, y, bs=4):
    print(x_batch)
    print(y_batch)
    break

tensor([[42214,   556],
        [30845,    65],
        [60687,   292],
        [ 2000,  1206]])
tensor([[3.5000],
        [4.0000],
        [5.0000],
        [2.0000]])


In [21]:
# Time-based split: 20% newest data for ETL pipeline testing
timestamp_threshold = ratings['timestamp'].quantile(0.8)
print(f"Timestamp threshold (80th percentile): {timestamp_threshold}")

# Split by timestamp
older_data_mask = ratings['timestamp'] <= timestamp_threshold
newer_data_mask = ratings['timestamp'] > timestamp_threshold

# Get indices for X and y
older_indices = X.index[older_data_mask]
newer_indices = X.index[newer_data_mask]

# Create newer dataset (20% most recent)
X_newer = X.loc[newer_indices].reset_index(drop=True)
y_newer = y.loc[newer_indices].reset_index(drop=True)

# Create older dataset (80% older data)
X_older = X.loc[older_indices].reset_index(drop=True)
y_older = y.loc[older_indices].reset_index(drop=True)

print(f"Newer data (for ETL pipeline): {len(X_newer):,} samples")
print(f"Older data (for train/val split): {len(X_older):,} samples")

# Now do normal train/test split on the older data
X_train, X_valid, y_train, y_valid = train_test_split(
   X_older, y_older, test_size=0.2, random_state=RANDOM_STATE
)

# Organize datasets
datasets = {
   'train': (X_train, y_train),
   'val': (X_valid, y_valid),
   'newer': (X_newer, y_newer)  # For ETL pipeline testing
}

dataset_sizes = {
'train': len(X_train),
   'val': len(X_valid),
   'newer': len(X_newer)
}

print(f"\nFinal split sizes:")
print(f"Training: {dataset_sizes['train']:,}")
print(f"Validation: {dataset_sizes['val']:,}")
print(f"Newer (ETL): {dataset_sizes['newer']:,}")
print(f"Total: {sum(dataset_sizes.values()):,}")

Timestamp threshold (80th percentile): 1538551302.0
Newer data (for ETL pipeline): 6,400,041 samples
Older data (for train/val split): 25,600,163 samples

Final split sizes:
Training: 20,480,130
Validation: 5,120,033
Newer (ETL): 6,400,041
Total: 32,000,204


# Building our RecSys Model

In [None]:
      class HybridVAE(nn.Module):
          """
          Hybrid Variational 
      Autoencoder + Embedding Network
       for Recommendation Systems
          
          Combines collaborative 
      filtering embeddings with 
      variational autoencoder
          to capture both user-item 
      interactions and generate new 
      recommendations.
          
          Args:
              n_users: Number of 
      unique users
              n_movies: Number of 
      unique movies  
              n_factors: Embedding 
      dimension
              hidden_dims: List of 
      hidden layer dimensions for 
      encoder/decoder
              latent_dim: Latent 
      space dimension
              dropout_rate: Dropout 
      rate for regularization
          """

          def __init__(self, n_users,
       n_movies, n_factors=150, 
      
      hidden_dims=[512, 256], 
      latent_dim=64, 
      
      dropout_rate=0.3):
              super(HybridVAE,
      self).__init__()

              # Embedding layers 
      (from original EmbeddingNet)
              self.user_embedding =
      nn.Embedding(n_users,
      n_factors)
              self.movie_embedding =
      nn.Embedding(n_movies,
      n_factors)
              self.embedding_dropout
      = nn.Dropout(dropout_rate *
      0.5)  # Lower dropout for 
      embeddings

              # Encoder network
              encoder_layers = []
              input_dim = n_factors *
       2  # Concatenated user + movie
       embeddings

              for hidden_dim in
      hidden_dims:

      encoder_layers.extend([

      nn.Linear(input_dim,
      hidden_dim),

      nn.BatchNorm1d(hidden_dim),
                      nn.ReLU(),

      nn.Dropout(dropout_rate)
                  ])
                  input_dim =
      hidden_dim

              self.encoder =
      nn.Sequential(*encoder_layers)

              # VAE latent space
              self.mu_layer =
      nn.Linear(hidden_dims[-1],
      latent_dim)
              self.logvar_layer =
      nn.Linear(hidden_dims[-1],
      latent_dim)

              # Decoder network  
              decoder_layers = []
              input_dim = latent_dim

              for hidden_dim in
      reversed(hidden_dims):

      decoder_layers.extend([

      nn.Linear(input_dim,
      hidden_dim),

      nn.BatchNorm1d(hidden_dim),
                      nn.ReLU(),

      nn.Dropout(dropout_rate)
                  ])
                  input_dim =
      hidden_dim

              # Final output layer
              decoder_layers.append(n
      n.Linear(input_dim, 1))
              self.decoder =
      nn.Sequential(*decoder_layers)

              # Initialize weights
              self._init_weights()

          def _init_weights(self):
              """Initialize weights 
      with proper scaling"""
              # Embedding 
      initialization
              nn.init.uniform_(self.u
      ser_embedding.weight, -0.05,
      0.05)
              nn.init.uniform_(self.m
      ovie_embedding.weight, -0.05,
      0.05)

              # Xavier initialization
       for linear layers
              for module in
      self.modules():
                  if
      isinstance(module, nn.Linear):
                      nn.init.xavier_
      uniform_(module.weight)
                      if module.bias
      is not None:

      nn.init.constant_(module.bias,
      0.01)

          def encode(self, users, 
      movies):
              """Encode user-movie 
      pairs to latent space"""
              # Get embeddings
              user_emb =
      self.user_embedding(users)
              movie_emb =
      self.movie_embedding(movies)

              # Concatenate and apply
       dropout
              features =
      torch.cat([user_emb,
      movie_emb], dim=1)
              features = self.embeddi
      ng_dropout(features)

              # Pass through encoder
              encoded =
      self.encoder(features)

              # Get mean and log 
      variance for latent space
              mu =
      self.mu_layer(encoded)
              logvar =
      self.logvar_layer(encoded)

              return mu, logvar

          def reparameterize(self, 
      mu, logvar):
              """Reparameterization 
      trick for VAE"""
              if self.training:
                  std = torch.exp(0.5
       * logvar)
                  eps =
      torch.randn_like(std)
                  return mu + eps *
      std
              else:
                  return mu  # Use 
      mean during inference

          def decode(self, z):
              """Decode latent 
      representation to rating 
      prediction"""
              decoded =
      self.decoder(z)
              return
      torch.sigmoid(decoded)  # 
      Sigmoid for rating 
      normalization

          def forward(self, users, 
      movies, minmax=None):
              """Forward pass through
       hybrid VAE"""
              # Encode to latent 
      space
              mu, logvar =
      self.encode(users, movies)

              # Reparameterize
              z =
      self.reparameterize(mu, logvar)

              # Decode to rating 
      prediction
              rating_pred =
      self.decode(z)

              # Scale to rating range
       if provided
              if minmax is not None:
                  min_rating,
      max_rating = minmax
                  rating_pred =
      rating_pred * (max_rating -
      min_rating) + min_rating

              return rating_pred, mu,
       logvar

          def 
      generate_recommendations(self, 
      user_ids, n_recommendations=10,
       
      
      movie_candidates=None, 
      minmax=None):
              """Generate 
      recommendations for given 
      users"""
              self.eval()
              recommendations = []

              with torch.no_grad():
                  for user_id in
      user_ids:
                      if
      movie_candidates is None:
                          # Use all 
      movies if not specified

      movie_candidates =
      torch.arange(self.movie_embeddi
      ng.num_embeddings)

                      # Create 
      user-movie pairs
                      users = torch.f
      ull((len(movie_candidates),),
      user_id, dtype=torch.long)
                      movies =
      movie_candidates

                      # Get 
      predictions
                      ratings, _, _ =
       self.forward(users, movies,
      minmax)

                      # Get top 
      recommendations
                      _, top_indices
      = torch.topk(ratings.squeeze(),
       n_recommendations)
                      top_movies =
      movie_candidates[top_indices]
                      top_ratings =
      ratings.squeeze()[top_indices]


      recommendations.append({
                          'user_id':
      user_id,

      'recommended_movies':
      top_movies.cpu().numpy(),

      'predicted_ratings':
      top_ratings.cpu().numpy()
                      })

              return recommendations


In [None]:
def 
      vae_loss_function(predictions, 
      targets, mu, logvar, 
      kl_weight=1.0):
          """
          VAE loss combining 
      reconstruction loss and KL 
      divergence
          
          Args:
              predictions: Model 
      predictions
              targets: Ground truth 
      ratings
              mu: Mean of latent 
      distribution
              logvar: Log variance of
       latent distribution
              kl_weight: Weight for 
      KL divergence term (beta-VAE)
          """
          # Reconstruction loss (MSE 
      for ratings)
          recon_loss = F.mse_loss(pre
      dictions.squeeze(), targets,
      reduction='sum')

          # KL divergence loss
          kl_loss = -0.5 *
      torch.sum(1 + logvar -
      mu.pow(2) - logvar.exp())

          # Total loss
          total_loss = recon_loss +
      kl_weight * kl_loss

          return total_loss,
      recon_loss, kl_loss

      def train_hybrid_vae(model, 
      train_loader, val_loader, 
      n_epochs=100, 
                           lr=1e-4, 
      weight_decay=1e-5, 
      device='cuda',
      
      patience=15, kl_weight=1.0, 
      minmax=(0.5, 5.0)):
          """
          Training loop for Hybrid 
      VAE model
          """
          model = model.to(device)
          optimizer =
      optim.Adam(model.parameters(),
      lr=lr,
      weight_decay=weight_decay)
          scheduler =
      optim.lr_scheduler.ReduceLROnPl
      ateau(optimizer, 'min',
      patience=5, factor=0.5)

          best_val_loss =
      float('inf')
          patience_counter = 0
          best_weights = None

          history = {
              'train_loss': [],
      'val_loss': [], 'train_recon':
      [],
              'val_recon': [],
      'train_kl': [], 'val_kl': []
          }

          for epoch in
      range(n_epochs):
              # Training phase
              model.train()
              train_total_loss = 0.0
              train_recon_loss = 0.0
              train_kl_loss = 0.0
              train_batches = 0

              for batch_x, batch_y in
       train_loader:
                  batch_x, batch_y =
      batch_x.to(device),
      batch_y.to(device)

                  # Forward pass
                  predictions, mu,
      logvar = model(batch_x[:, 0],
      batch_x[:, 1], minmax)

                  # Calculate loss
                  total_loss,
      recon_loss, kl_loss =
      vae_loss_function(
                      predictions,
      batch_y.squeeze(), mu, logvar,
      kl_weight
                  )

                  # Backward pass

      optimizer.zero_grad()

      total_loss.backward()
                  torch.nn.utils.clip
      _grad_norm_(model.parameters(),
       max_norm=1.0)
                  optimizer.step()

                  # Accumulate losses
                  batch_size =
      batch_x.size(0)
                  train_total_loss +=
       total_loss.item()
                  train_recon_loss +=
       recon_loss.item()
                  train_kl_loss +=
      kl_loss.item()
                  train_batches += 1

              # Validation phase
              model.eval()
              val_total_loss = 0.0
              val_recon_loss = 0.0
              val_kl_loss = 0.0
              val_batches = 0

              with torch.no_grad():
                  for batch_x,
      batch_y in val_loader:
                      batch_x,
      batch_y = batch_x.to(device),
      batch_y.to(device)

                      predictions,
      mu, logvar = model(batch_x[:,
      0], batch_x[:, 1], minmax)
                      total_loss,
      recon_loss, kl_loss =
      vae_loss_function(

      predictions, batch_y.squeeze(),
       mu, logvar, kl_weight
                      )

                      val_total_loss
      += total_loss.item()
                      val_recon_loss
      += recon_loss.item()
                      val_kl_loss +=
      kl_loss.item()
                      val_batches +=
      1

              # Average losses
              avg_train_loss =
      train_total_loss /
      train_batches
              avg_val_loss =
      val_total_loss / val_batches
              avg_train_recon =
      train_recon_loss /
      train_batches
              avg_val_recon =
      val_recon_loss / val_batches
              avg_train_kl =
      train_kl_loss / train_batches
              avg_val_kl =
      val_kl_loss / val_batches

              # Update history
              history['train_loss'].a
      ppend(avg_train_loss)
              history['val_loss'].app
      end(avg_val_loss)
              history['train_recon'].
      append(avg_train_recon)
              history['val_recon'].ap
      pend(avg_val_recon)
              history['train_kl'].app
      end(avg_train_kl)
              history['val_kl'].appen
      d(avg_val_kl)

              # Learning rate 
      scheduling

      scheduler.step(avg_val_loss)

              # Early stopping
              if avg_val_loss <
      best_val_loss:
                  best_val_loss =
      avg_val_loss
                  best_weights = copy
      .deepcopy(model.state_dict())
                  patience_counter =
      0
                  print(f'✓ Epoch 
      {epoch+1:03d}: Val loss 
      improved to 
      {avg_val_loss:.4f}')
              else:
                  patience_counter +=
       1

              # Print progress
              if (epoch + 1) % 10 ==
      0 or epoch == 0:
                  print(f'Epoch 
      {epoch+1:03d}/{n_epochs:03d} | 
      '
                        f'Train: 
      {avg_train_loss:.4f} | Val: 
      {avg_val_loss:.4f} | '
                        f'Recon: 
      {avg_val_recon:.4f} | KL: 
      {avg_val_kl:.4f} | '
                        f'LR: 
      {optimizer.param_groups[0]["lr"
      ]:.2e}')

              # Early stopping check
              if patience_counter >=
      patience:
                  print(f'Early 
      stopping at epoch {epoch+1}')
                  break

          # Load best weights
          if best_weights is not
      None:
              model.load_state_dict(b
      est_weights)

          return model, history