In [11]:
import datetime
import json
import time

import pandas as pd

## Processing of Movie Data

In [12]:
# Segment the data into the three different records
# 1. <time>,<userid>,recommendation request <server>, status <200 for success>, result: <recommendations>, <responsetime>
# 2. <time>,<userid>,GET /data/m/<movieid>/<minute>.mpg
# 3. <time>,<userid>,GET /rate/<movieid>=<rating>

def sort_records(records):
    """
    Sorts records into the three different entry types
    --------------------------------------------------
    Args
    -----
    1. records: list
        records obtained from kafka
            
    Returns
    --------
    1. recommendation_records: list
        records of movie recommendations to users
    2. watching_records: list
        records of what users watched
    3. rating records: list
        records of user movie ratings
    """
    recommendation_records = []
    watching_records = []
    rating_records = []
    for record in records:
        if "recommendation" in record:
            recommendation_records.append(record)
        elif record[-4:] == ".mpg":
            watching_records.append(record)
        elif "rate" in record:
            rating_records.append(record)
        else:
            print(f"Unknown record: {record}")
            
    return recommendation_records, watching_records, rating_records

data_files = ['data.txt']
records = []
for data_file in data_files:
    with open(data_file) as f:
        lines = f.readlines()
    records.extend([line[2:-2] for line in lines])

recommendation_records, watching_records, rating_records = sort_records(records)

In [13]:
def datetime_to_unix(timestamp):
    """
    Converts normal timestamp to unix time
    ----------------------------------------
    Args
    -----
    1. timestamp: str
        format - YYYY-MM-DDTHH:MM:SS (SS is optional)
        
    Returns
    --------
    1. unix_timestamp: float
        The equivalent unix epoch time
    """
    date, day_time = timestamp.split("T")
    year, month, day = date.split("-")
    times = day_time.split(":")
    hour, minute = times[0], times[1]

    date_time_obj = datetime.datetime(int(year), int(month),
                                      int(day), int(hour),
                                      int(minute)
                                      )
    unix_timestamp = time.mktime(date_time_obj.timetuple())
    return unix_timestamp

def rating_records_to_df(rating_records):
    """
    Converts the rating records into a dataframe
    Also converts timestamp to unix time
    ---------------------------------------------
    Args
    -----
    1. rating_records: list
        records of user movie ratings
    
    Returns
    ---------
    1. rating_records_df: DataFrame
        cols - userId, movieId, rating, timestamp
    """
    ratings_dict = {
        'userId': [],
        'movieId': [],
        'rating': [],
        'timestamp': []
    }
    
    for record in rating_records:
        timestamp, userId, get_request = record.split(",")
        movieId_rating = get_request.split("/")[-1]
        movieId, rating = movieId_rating.split("=")
        unix_timestamp = datetime_to_unix(timestamp)
        
        ratings_dict['userId'].append(userId)
        ratings_dict['movieId'].append(movieId)
        ratings_dict['rating'].append(rating)
        ratings_dict['timestamp'].append(unix_timestamp)
        
    ratings_df = pd.DataFrame.from_dict(ratings_dict)
    return ratings_df

def encode_movieIds(ratings_df):
    """
    Converts movie names to a numerical Id
    ---------------------------------------
    Args
    -----
    1. ratings_df: DataFrame
        records of user movie ratings (with movie names)
    
    Returns
    --------
    1. ratings_df: DataFrame
        records of user movie ratings (with numerical movie Ids)
    2. numerical_movie_id_encoding: dict
        maps movie name to a numerical id
    """
    unique_movies = list(set(ratings_df.loc[:, 'movieId']))
    numerical_movie_id_encoding = {unique_movies[i]: i for i in range(len(unique_movies))}
    ratings_df['movieId'] = ratings_df['movieId'].apply(lambda x: numerical_movie_id_encoding[x])
    
    return ratings_df, numerical_movie_id_encoding

ratings_df = rating_records_to_df(rating_records)
ratings_df, numerical_movie_id_encoding = encode_movieIds(ratings_df)
ratings_df = ratings_df.astype('int64')
data = ratings_df

In [14]:
with open('movie_name_to_id.json', 'w') as f:
    json.dump(numerical_movie_id_encoding, f, indent=4)

In [15]:
def create_train_test_split(data, test_ratio=0.2):
    """
    Splits the data of each user 80-20 (Not the entire set 80-20)
    The split is done such that the test set are the 20% most recent entries
    -------------------------------------------------------------------------
    Args
    -----
    1. data: DataFrame
        Contains user movie ratings
    
    Returns
    --------
    1. train: DataFrame
        Train set of user movie ratings
    2. test: DataFrame
        Test set of user movie ratings
    Note: Both share the same format as data
    """
    data['userId'] = data['userId'].astype('str')
    data['movieId'] = data['movieId'].astype('str')
    users = data['userId'].unique() # list of all users
    movies = data['movieId'].unique() # list of all movies
    test = pd.DataFrame(columns=data.columns)
    train = pd.DataFrame(columns=data.columns)
    
    for u in users:
        temp = data[data['userId'] == u]
        n = len(temp)
        test_size = int(test_ratio*n)

        temp = temp.sort_values('timestamp').reset_index()
        temp.drop('index', axis=1, inplace=True)
            
        # If there is only one entry for the user, it goes to test
        dummy_test = temp.loc[n-1-test_size:]
        dummy_train = temp.loc[:n-2-test_size]

        test = pd.concat([test, dummy_test])
        train = pd.concat([train, dummy_train])
        
    return train, test

def check_correct_sorting(data, train, test):
    """
    Confirms that the data was split properly (more recent entries in test)
    ------------------------------------------------------------------------
    Args
    -----
    1. data: DataFrame
        Full dataframe of user movie ratings (train + test)
    2. train: DataFrame
        Train set
    3. test: DataFrame
        Test set
        
    Returns
    --------
    1. is_sorted_correctly: boolean
        Indicates whether all users were split correctly
    """
    users = set(data['userId'])
    is_sorted_correctly = True
    for user in users:
        num_entries = data[data['userId'] == user].shape[0]
        if num_entries < 2:
            continue

        latest_train_timestamp = train[train['userId'] == user].iloc[-1]['timestamp']
        earliest_test_timestamp = test[test['userId'] == user].iloc[0]['timestamp']
        if latest_train_timestamp > earliest_test_timestamp:
            is_sorted_correctly = False
            break

    return is_sorted_correctly

train, test = create_train_test_split(data)
assert True == check_correct_sorting(data, train, test)

In [16]:
data = data.astype('int64')

In [17]:
train = train.astype('int64')
test = test.astype('int64')

In [18]:
train.reset_index(drop=True, inplace=True)
test.reset_index(drop=True, inplace=True)
data.reset_index(drop=True, inplace=True)

In [30]:
train.to_csv("train_df.csv", index=False)
test.to_csv("test_df.csv", index=False)
data.to_csv("data.csv", index=False)

In [19]:
data.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,268254,211,4,1641723840
1,740774,238,3,1641724320
2,834892,1084,2,1641724560
3,337247,1043,4,1643005200
4,179302,333,1,1643005560


## Setup PyTorch Lightning Recommendation System

In [7]:
import torch
import torchmetrics

import pytorch_lightning as pl
import torch.nn as nn
import torch.nn.functional as F

from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import DeviceStatsMonitor
from torch.utils.data import random_split, DataLoader, Dataset

In [8]:
class RecommendationDataSet(Dataset):
    def __init__(self, df):
        self.df = df
        self.data_indices = list(df.index)
        
    def __len__(self):
        return len(self.data_indices)
    
    def __getitem__(self, idx):
        entry = self.df.iloc[idx]
        x = torch.LongTensor([entry['userId'], entry['movieId']])
        y = torch.Tensor([entry['rating']])
        return x, y

In [9]:
class MF(pl.LightningModule):
    def __init__(self, num_users, num_items, train_df, val_df, emb_size=100):
        super(MF, self).__init__()
        self.user_emb = nn.Embedding(num_users, emb_size)
        self.item_emb = nn.Embedding(num_items, emb_size)
        # initializing our matrices with a positive number generally will yield better results
        self.user_emb.weight.data.uniform_(0, 0.5)
        self.item_emb.weight.data.uniform_(0, 0.5)
        
        self.train_df = train_df
        self.val_df = val_df
        
    def forward(self, u, v):
        u = self.user_emb(u)
        v = self.item_emb(v)
        return (u*v).sum(1)  # taking the dot product
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.01, weight_decay=0.0)
        return optimizer
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        usernames = x[:, 0]
        movies = x[:, 1]
        ratings = y[:, 0]
        y_hat = self(usernames, movies)
        loss = F.mse_loss(y_hat, ratings)
        
        mse = torchmetrics.MeanSquaredError(ratings, y_hat)
        
        # Each training step is one epoch
        self.logger.experiment.add_scalar("mse_loss/train", loss, self.current_epoch)
        
        return {'loss': loss}

    def validation_step(self, batch, batch_idx):
        results = self.training_step(batch, batch_idx)
        return results
    
    def validation_epoch_end(self, val_outputs):
        val_loss = val_outputs[0]['loss']
        
        progress_bar = {'val_loss': val_loss}
        
        self.logger.experiment.add_scalar("mse_loss/val", val_loss, self.current_epoch)
        
        return {'val_loss': val_loss, 'progress_bar': progress_bar}

    def train_dataloader(self):
        train_data = RecommendationDataSet(self.train_df)
        train_loader = DataLoader(train_data, batch_size=len(train_data))
        return train_loader
    
    def val_dataloader(self):
        val_data = RecommendationDataSet(self.val_df)
        val_loader = DataLoader(val_data, batch_size=len(val_data))
        return val_loader

In [20]:
print("num users: ", max(data['userId'].unique()))
print("num items: ", max(data['movieId'].unique()))
num_users = max(data['userId'].unique())
num_items = max(data['movieId'].unique())

num users:  999627
num items:  1882


In [21]:
model = MF(num_users+1, num_items+1, train, test)

In [23]:
logger = TensorBoardLogger('lightning_logs', name='lr_01')

In [24]:
trainer = pl.Trainer(max_epochs=10, log_every_n_steps=1, logger=logger, profiler="simple")
trainer.fit(model)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
Missing logger folder: lightning_logs\device_stats

  | Name           | Type             | Params 
-----------------------------------------------------
0 | user_emb       | Embedding        | 100.0 M
1 | item_emb       | Embedding        | 188 K  
2 | mse_calculator | MeanSquaredError | 0      
-----------------------------------------------------
100 M     Trainable params
0         Non-trainable params
100 M     Total params
400.604   Total estimated model params size (MB)


Validation sanity check: 0it [00:00, ?it/s]

  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

FIT Profiler Report

Action                             	|  Mean duration (s)	|Num calls      	|  Total time (s) 	|  Percentage %   	|
----------------------------------------------------------------------------------------------------------------------------------------
Total                              	|  -              	|_              	|  53.672         	|  100 %          	|
----------------------------------------------------------------------------------------------------------------------------------------
run_training_epoch                 	|  5.339          	|10             	|  53.39          	|  99.475         	|
on_train_epoch_end                 	|  3.7841         	|10             	|  37.841         	|  70.504         	|
optimizer_step_with_closure_0      	|  0.7328         	|10             	|  7.328          	|  13.653         	|
run_training_batch                 	|  0.7328         	|10             	|  7.328          	|  13.653         	|
fetch_next_train_batc

## Logging

In [4]:
!Taskkill /IM "tensorboard.exe" /F

ERROR: The process "tensorboard.exe" not found.


In [26]:
%reload_ext tensorboard
%tensorboard --logdir=lightning_logs/

Reusing TensorBoard on port 6006 (pid 11788), started 0:05:04 ago. (Use '!kill 11788' to kill it.)

## Pre PyTorch Lightning Code

In [None]:
class MF(nn.Module):
    def __init__(self, num_users, num_items, emb_size=100):
        super(MF, self).__init__()
        self.user_emb = nn.Embedding(num_users, emb_size)
        self.item_emb = nn.Embedding(num_items, emb_size)
        # initializing our matrices with a positive number generally will yield better results
        self.user_emb.weight.data.uniform_(0, 0.5)
        self.item_emb.weight.data.uniform_(0, 0.5)
        
        self.mse_calculator = torchmetrics.MeanSquaredError()
        
    def forward(self, u, v):
        u = self.user_emb(u)
        v = self.item_emb(v)
        return (u*v).sum(1)  # taking the dot product

In [None]:
def train_epochs(model, train_df, epochs=10, lr=0.01, wd=0.0):
    
    model.train()
    for i in range(epochs):
        usernames = torch.LongTensor(train_df.userId.values)
        game_titles = torch.LongTensor(train_df.movieId.values)
        ratings = torch.FloatTensor(train_df.rating.values)
        y_hat = model(usernames, game_titles)
        loss = F.mse_loss(y_hat, ratings)
        optimizer.zero_grad()  # reset gradient
        loss.backward()
        optimizer.step()
        print(loss.item())

In [None]:
def test_model(model, test_df):
    model.eval()
    usernames = torch.LongTensor(test_df.userId.values)
    game_titles = torch.LongTensor(test_df.movieId.values)
    ratings = torch.FloatTensor(test_df.rating.values)
    y_hat = model(usernames, game_titles)
    loss = F.mse_loss(y_hat, ratings)
    print("test loss %.3f " % loss.item())