In [190]:
"""@misc{catalyst,
    author = {Kolesnikov, Sergey},
    title = {Catalyst - Accelerated deep learning R&D},
    year = {2018},
    publisher = {GitHub},
    journal = {GitHub repository},
    howpublished = {\url{https://github.com/catalyst-team/catalyst}},
}"""

In [2]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

In [3]:
from os import path
import sys, os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
import numpy as np
import pandas as pd
# ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 tunnel@128.2.24.106 -NTf
from kafka import KafkaConsumer, KafkaProducer
from sklearn.model_selection import train_test_split

# Update this for your demo otherwise you'll see my data :)
topic = 'movielog4'

In [6]:
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    # Read from the start of the topic; Default is latest
    auto_offset_reset='earliest',
    # auto_offset_reset='latest',
    # group_id='team13',
    # Commit that an offset has been read
    enable_auto_commit=True,
    # How often to tell Kafka, an offset has been read
    auto_commit_interval_ms=1000
)

index = 0

print('Reading Kafka Broker...')
for message in consumer:

    message = message.value.decode()
    # Default message.value type is bytes!
    
    # print(message)
    
#     if "recommendation request 17645-team04" in message:
#         print(message)
    
    if "/rate/" in message:
#         print(message)
        os.system(f"echo {message} >> kafka_online_ratings_1APRIL23_log.csv")
        index += 1
        if index >= 1000:
            print("Breaking loop.")
            break

Reading Kafka Broker


In [188]:
""" READING DATA """

# Define column names
col_names = ["DTG", "UserID", "ratings"]

# Change to the appropriate directory
os.chdir("C:/Users/timjn/OneDrive/Documents/Class, 11695 ML For Production/Assignment_4")

# Read the acquired data using Pandas
vanilla_data = pd.read_csv("kafka_online_ratings_1APRIL23_log.csv", names=col_names, header=None)

# Check our data
print(vanilla_data.head())

                   DTG  UserID                                     ratings
0  2023-03-28T15:04:43  415014       GET /rate/simon+of+the+desert+1965=4 
1  2023-03-28T15:04:44  815048        GET /rate/the+wrong+trousers+1993=4 
2  2023-03-28T15:04:45  745126                 GET /rate/alexander+2004=1 
3  2023-03-28T15:04:46  204326             GET /rate/the+godfather+1972=5 
4  2023-03-28T15:04:46  370012  GET /rate/the+shawshank+redemption+1994=5 


In [189]:
# Split the data for the mvie name
movies = vanilla_data["ratings"].apply(lambda x: x.split("/rate/")[1].split("=")[0])

# get unique movie names
unique_movies = movies.unique()

# create new dataframe with unique movie IDs
movie_ID_dataframe = pd.DataFrame({'Movie_Name': unique_movies,
                       'Movie_ID': np.arange(len(unique_movies))})

# Append the movie data to the vanilla dataframe
vanilla_data = pd.concat([vanilla_data, movie_ID_dataframe], axis=1)
    
# Split the ratings line for the numer
vanilla_data["ratings"] = vanilla_data["ratings"].apply(lambda x: x.split("=")[1])

# Drop bad data; i.e. a letter being used for a rating [A, F, C, ...] in place a 1-5 numerical rating
vanilla_data["ratings"] = vanilla_data["ratings"][pd.to_numeric(vanilla_data["ratings"], errors='coerce').notnull()]

# Drop the Date Time Group "DTG" column
vanilla_data = vanilla_data.drop("DTG", axis=1)

# Check our data
print(vanilla_data.head())

   UserID ratings                     Movie_Name  Movie_ID
0  415014      4        simon+of+the+desert+1965       0.0
1  815048      4         the+wrong+trousers+1993       1.0
2  745126      1                  alexander+2004       2.0
3  204326      5              the+godfather+1972       3.0
4  370012      5   the+shawshank+redemption+1994       4.0


In [187]:
# Function to create the Sparse Matrix from the Pandas Dataframe
def load_review_data(dataframe: pd.DataFrame):
    """load_review_data
    Load movie review data.
    Input has format MovieID1,UserID11,rating_score_for_UserID11_to_MovieID1.

    :param path:
    returns final _rray
    """

    user_count = max(dataframe['UserID'].unique())
    movie_count = max(dataframe['Movie_ID'].unique())
    
    final_array = np.zeros((int(user_count),int(movie_count)))
    
    data, row, col = [], [], []
    for row in dataframe.iterrows():
        
        user_index = int(row[1].iloc[0])
        score = float(row[1].iloc[1])
        try:
            movie_index = int(row[1].iloc[3])
            
        except ValueError:  # Used to navigate an error with movieID 345
            final_array[user_index-1, movie_index-1] = score    
                
        final_array[user_index-1, movie_index-1] = score   

    return final_array

# Define the object to be sent into the Catalyst functions
data_for_Catalyst = load_review_data(vanilla_data)

In [144]:
""" IMPLEMENTING CATALYST RECOMMENDATION SYSTEM """

# prepare data
num_items = data_for_Catalyst.shape[1]  # --> my data set has this numer as 345
num_users = data_for_Catalyst.shape[0]  # --> my data set has this numer as 947686
num_features = data_for_Catalyst.shape[1]

# Define usable tensor objects
X = torch.tensor(data_for_Catalyst).float()  # --> torch.Size([num_users, num_features])
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)  # --> torch.Size([numn_users, num_items])

In [145]:
# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

In [146]:
# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(input_key="logits", target_key="targets", metric_key="loss"),
        dl.BackwardCallback(metric_key="loss"),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

1/3 * Epoch (train):   0%|          | 0/29616 [00:00<?, ?it/s]

train (1/3) loss: 0.6931734325195709 | loss/mean: 0.6931734325195709 | loss/std: 7.324247938279405e-05 | lr: 0.001 | momentum: 0.9


1/3 * Epoch (valid):   0%|          | 0/29616 [00:00<?, ?it/s]

valid (1/3) loss: 0.69314670917849 | loss/mean: 0.69314670917849 | loss/std: 0.0002584077909227108 | lr: 0.001 | momentum: 0.9
* Epoch (1/3) lr: 0.001 | momentum: 0.9


2/3 * Epoch (train):   0%|          | 0/29616 [00:00<?, ?it/s]

train (2/3) loss: 0.6931509151532923 | loss/mean: 0.6931509151532923 | loss/std: 0.0002578164998813689 | lr: 0.001 | momentum: 0.9


2/3 * Epoch (valid):   0%|          | 0/29616 [00:00<?, ?it/s]

valid (2/3) loss: 0.6931267836700175 | loss/mean: 0.6931267836700175 | loss/std: 0.0004866400981969976 | lr: 0.001 | momentum: 0.9
* Epoch (2/3) lr: 0.0001 | momentum: 0.9


3/3 * Epoch (train):   0%|          | 0/29616 [00:00<?, ?it/s]

train (3/3) loss: 0.693110140502279 | loss/mean: 0.693110140502279 | loss/std: 0.00048415166084610694 | lr: 0.0001 | momentum: 0.9


3/3 * Epoch (valid):   0%|          | 0/29616 [00:00<?, ?it/s]

valid (3/3) loss: 0.6931051888136126 | loss/mean: 0.6931051888136126 | loss/std: 0.0005066142662548302 | lr: 0.0001 | momentum: 0.9
* Epoch (3/3) lr: 0.0001 | momentum: 0.9
Top models:
./logs/model.0003.pth	0.6931


In [147]:
# Use any unique User ID to get preditions for that user
arbitrary_user_ID = 20345

 # Need to ensure the tensor passed into the runner model is 2D
user_information = torch.unsqueeze(X[arbitrary_user_ID, :], 0)    # --> will be torch.Size([1, ###])

In [156]:
# Produce the predictions for items for the input user ID
preds = runner.model(user_information).detach().numpy().flatten()
print(preds.shape)  # --> will be an array of shape == [num_items, 0]

(345,)


In [200]:
# Get the indices of the top recommended items with 20 == number recommended items
top_indices = np.argsort(preds)[::-1][:20]

# Map the indices back to the original item names using the combined movie_ID_dataframe object
top_items = [movie_ID_dataframe["Movie_Name"][i] for i in top_indices]

# Print them to ensure we have reasonable indices
print(top_indices)

[ 24 306  36 229  60 318 308 256 331  25 102 209 245  62 243 132  18  90
 184 142]


In [208]:
# Print the top items
recommendation_df = pd.DataFrame({'Movie_Name': top_items, 'Ranking_Position': range(1, len(top_items)+1)})
print(recommendation_df)


                                          Movie_Name  Ranking_Position
0                            the+usual+suspects+1995                 1
1                                   point+blank+2010                 2
2                             l.a.+confidential+1997                 3
3                               men+in+black+ii+2002                 4
4                                anchors+aweigh+1945                 5
5                        terror+beneath+the+sea+1966                 6
6                        love+valour+compassion+1997                 7
7                                        porkys+1981                 8
8                                excess+baggage+1997                 9
9   harry+potter+and+the+deathly+hallows+part+1+2010                10
10                                    twin+town+1997                11
11                         bloody+pit+of+horror+1965                12
12                                   fight+club+1999                13
13    

In [None]:
# for index, movie in enumerate(top_items):
    
#     print(f"The {index} recommended movie for user_ID << {arbitrary_user_ID} >> is movie_ID: << {movie} >> .")