In [44]:
import modin.pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.recommendation import ALS as newALS
from sklearn.metrics import mean_squared_error
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
from sortedcontainers import SortedList, SortedDict
import time

import pandas as pd, numpy as np, matplotlib.pyplot as plt

import sys 
sys.path.insert(1, "../")
from workloads.util import use_results, use_dataset, read_config, log_dataset

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [5]:
num_features = 50
learning_rate = .02

In [8]:
dataset_dir = use_dataset("ml-latest-small", download=True)
ratings_path = f"{dataset_dir}/ratings.csv"
tags_path = f"{dataset_dir}/tags.csv" 
movies_path = f"{dataset_dir}/movies.csv"

/data/wooders/ralf-vldb//datasets/ml-latest-small
Downloading from aws: vldb


In [53]:
np.random.seed(100)
initial_features = np.random.rand(num_features)

In [14]:
#dataset_dir = "../../Downloads/ml-1m"

def split_data(df):  
    start_ts = df['timestamp'].min()
    med_ts = df['timestamp'].quantile(.5)
    end_ts = df['timestamp'].max()
    train_df = df[df['timestamp'] <= med_ts]
    stream_df = df[df['timestamp'] > med_ts]
    seen_movies = set(train_df['movie_id'])
    print(len(seen_movies), len(set(stream_df['movie_id'])), len(stream_df))
    stream_df = stream_df.drop(stream_df[stream_df['movie_id'].map(lambda x: x not in seen_movies)].index)
    '''
    print(len(set(stream_df['movie_id'])), len(stream_df))
    seen_users = set(train_df['user_id'])
    num_stream_users = set(stream_df['user_id'])
    stream_df = stream_df.drop(stream_df[stream_df['user_id'].map(lambda x: x not in seen_users)].index)
    print(len(seen_users), len(num_stream_users), len(set(stream_df['user_id'])), len(stream_df))
    '''
    train_df.to_csv(f'{dataset_dir}/train.csv', header=True, index = False)
    stream_df.to_csv(f'{dataset_dir}/stream.csv', header=True, index = False)
    return start_ts, med_ts, end_ts

In [9]:
tags = pd.read_csv(tags_path)
tags.columns = ['user_id', 'movie_id', 'tag', 'timestamp']
ratings = pd.read_csv(ratings_path)
ratings.columns = ['user_id', 'movie_id', 'rating', 'timestamp']
movies = pd.read_csv(movies_path)
movies.columns = ['movie_id', 'title', 'genres']

In [15]:
#df = pd.read_csv(f'{path}/ratings.dat', sep = "::", names=['user_id', 'movie_id', 'rating', 'timestamp']).sort_values('timestamp')
start_ts, med_ts, end_ts = split_data(ratings)   

5567 7720 50418


In [6]:
split_data(path)



3551 3643 500104


(956703932, 973018006.0, 1046454590)

In [16]:
spark = SparkSession.builder.master('local').appName('als').getOrCreate()

22/03/25 16:38:46 WARN Utils: Your hostname, flaminio resolves to a loopback address: 127.0.1.1; using 169.229.48.114 instead (on interface enp1s0f0)
22/03/25 16:38:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/25 16:38:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [18]:
spark_df = spark.read.options(delimiter=",", inferSchema=True, header=True).csv(f'{dataset_dir}/train.csv')

In [19]:
spark_df = spark_df.drop('timestamp')
spark_df.show()
new_model = newALS.train(ratings = spark_df.rdd, rank = num_features, iterations=15, lambda_=learning_rate)

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|      1|       1|   4.0|
|      1|       3|   4.0|
|      1|       6|   4.0|
|      1|      47|   5.0|
|      1|      50|   5.0|
|      1|      70|   3.0|
|      1|     101|   5.0|
|      1|     110|   4.0|
|      1|     151|   5.0|
|      1|     157|   5.0|
|      1|     163|   5.0|
|      1|     216|   5.0|
|      1|     223|   3.0|
|      1|     231|   5.0|
|      1|     235|   4.0|
|      1|     260|   5.0|
|      1|     296|   3.0|
|      1|     316|   3.0|
|      1|     333|   5.0|
|      1|     349|   4.0|
+-------+--------+------+
only showing top 20 rows



                                                                                

In [20]:
user_features = new_model.userFeatures().toDF().toPandas()
user_features = user_features.rename(columns={'_1': 'id', '_2': 'features'})
product_features = new_model.productFeatures().toDF().toPandas()
product_features = product_features.rename(columns={'_1': 'id', '_2': 'features'})

22/03/25 16:39:18 WARN BlockManager: Task 49 already completed, not releasing lock for rdd_318_0
                                                                                

In [23]:
product_features.sort_values('id').to_csv(f'{dataset_dir}/movie_features.csv', index = False)
user_features.sort_values('id').to_csv(f'{dataset_dir}/user_features.csv', index = False)

In [24]:
def get_feature_dict(features):
    feature_dict = dict()
    for row in features.itertuples():
        feature_dict[int(row.id)] = np.array(row.features)
    return feature_dict

In [25]:
class FIFOScheduler:
    
    def __init__(self):
        self.queue = []
        self.name = "fifo"
        
    def push(self, row):
        self.queue.append(row)
    
    def pop(self):
        if len(self.queue) == 0:
            return None
        return self.queue.pop()

In [26]:
class FewestUpdateScheduler:
    
    def __init__(self):
        self.updated_list = SortedList(key = lambda x: x[1])
        self.row_dict = dict()
        self.name = "fewest"

    def push(self, row):
        user = row.user_id
        if user not in self.row_dict:
            self.updated_list.add([user, 0])
            self.row_dict[user] = [row]
        else:
            self.row_dict[user].append(row)
          
    def pop(self):
        user_data = self.updated_list.pop(0)
        user = user_data[0]
        row = self.row_dict[user].pop()
        if len(self.row_dict[user]) != 0:
            user_data[1] = user_data[1] + 1
            self.updated_list.add(user_data)
        #self.updated_list[0][1] = self.updated_list[0][1] + 1
        return row

In [62]:
class MaxPendingScheduler:
    
    def __init__(self):
        # items = [key, num_pending]
        self.updated_list = SortedList(key = lambda x: x[1])
        self.row_dict = dict()
        self.name = "pending"

    def push(self, row):
        user = row.user_id
        if user not in self.row_dict:
            self.updated_list.add([user, 1])
            self.row_dict[user] = [row]
        else:
            pending_updates = len(self.row_dict[user])
            self.updated_list.remove([user, pending_updates])
            self.updated_list.add([user, pending_updates + 1])
            self.row_dict[user].append(row)
        #print(self.row_dict, self.updated_list)
        
    def pop(self):
        user = self.updated_list[-1][0]
        row = self.row_dict[user].pop()
        if len(self.row_dict[user]) == 0:
            self.updated_list.pop(-1)
            del self.row_dict[user]
        else:
            self.updated_list[-1][1] = self.updated_list[-1][1] - 1
        return row

In [28]:
class SGDUpdater:
    
    def __init__(self, user_features, movie_features, learning_rate, u_reg, m_reg, update_movies = True):
        self.user_dict = get_feature_dict(user_features)
        self.movie_dict = get_feature_dict(movie_features) 
        self.l = learning_rate
        self.u_reg = u_reg
        self.m_reg = m_reg
        self.update_movies = update_movies
        self.name = "sgd"
    
    def predict(self, row):
        movie_id = row.movie_id
        user_id = row.user_id
         
        if user_id not in self.user_dict:
            self.user_dict[user_id] = np.copy(initial_features)
        
        return self.user_dict[user_id].dot(self.movie_dict[movie_id].T)
    
    def update(self, rows):
        for row in rows:
            movie_id = row.movie_id
            user_id = row.user_id
            rating = row.rating

            prediction = self.predict(row)
            error = rating - prediction

            self.user_dict[user_id] = self.user_dict[user_id] + self.l * (error * self.movie_dict[movie_id] - self.u_reg  * self.user_dict[user_id])
            if self.update_movies:
                self.movie_dict[movie_id] = self.movie_dict[movie_id] + self.l * (error * self.user_dict[user_id] - self.m_reg * self.movie_dict[movie_id])

In [51]:
def get_feature_matrix(num_rows, features):
    matrix = np.zeros((int(num_rows), int(num_features)))
    feature_dict = dict()
    for row in features.itertuples():
        feature_dict[int(row.id)] = np.array(row.features)
    for i in range(int(num_rows)):
        if i in feature_dict:
            matrix[i] = feature_dict[i]
        else:
            matrix[i] = np.copy(initial_features)
    return matrix

In [30]:
class ALSUpdater:
    
    def __init__(self, user_features, movie_features, rating_matrix, confidence_matrix, max_user, max_movie, u_reg, m_reg):
        self.user_matrix = get_feature_matrix(max_user, user_features)
        self.movie_matrix = get_feature_matrix(max_movie, movie_features)
        self.rating_matrix = rating_matrix
        self.confidence_matrix = confidence_matrix
        self.u_reg = u_reg
        self.m_reg = m_reg
        self.name = "als"
        
    def predict(self, row):
        movie_id = row.movie_id
        user_id = row.user_id
        prediction = np.dot(self.movie_matrix[movie_id], self.user_matrix[user_id])
        return prediction
    
    def update(self, rows):
        for row in rows:
            movie_id = row.movie_id
            user_id = row.user_id
            rating = row.rating
            self.confidence_matrix[user_id][movie_id] = 1
            self.rating_matrix[user_id][movie_id] = rating
            user_confidence_row = self.confidence_matrix[user_id]
            left = np.dot(self.movie_matrix.T, np.dot(np.diag(user_confidence_row), self.movie_matrix)) + self.u_reg * np.eye(num_features)
            right = np.dot(self.movie_matrix.T, np.dot(np.diag(user_confidence_row), self.rating_matrix[user_id].T))
            print(self.movie_matrix.T.shape, np.dot(np.diag(user_confidence_row), self.rating_matrix[user_id].T).shape, left.shape, right.shape)
            self.user_matrix[user_id] = np.linalg.solve(
                    left,
                    right
                ).T       
        

In [31]:
stream_df = pd.read_csv(f'{dataset_dir}/stream.csv')
data_to_send = dict()
for ts in stream_df['timestamp'].unique():
    curr_data = stream_df[stream_df['timestamp'] == ts]
    data_to_send[ts] = curr_data

In [32]:
sorted(list(data_to_send.keys()))

[1186086685,
 1186086690,
 1186086693,
 1186086696,
 1186086746,
 1186086761,
 1186086798,
 1186086892,
 1186086895,
 1186086929,
 1186087030,
 1186087037,
 1186087063,
 1186087085,
 1186087091,
 1186087094,
 1186087158,
 1186087165,
 1186087180,
 1186087183,
 1186087220,
 1186087228,
 1186087236,
 1186087237,
 1186087251,
 1186087261,
 1186087284,
 1186087291,
 1186087300,
 1186087317,
 1186087346,
 1186087350,
 1186087392,
 1186087440,
 1186087471,
 1186087473,
 1186087476,
 1186087487,
 1186087517,
 1186087520,
 1186087545,
 1186087548,
 1186087552,
 1186087570,
 1186087576,
 1186087590,
 1186087614,
 1186087665,
 1186087669,
 1186087676,
 1186087693,
 1186087704,
 1186087713,
 1186087717,
 1186087735,
 1186087745,
 1186087747,
 1186087752,
 1186087762,
 1186087795,
 1186087800,
 1186087802,
 1186087816,
 1186087834,
 1186087840,
 1186087848,
 1186087850,
 1186087858,
 1186087866,
 1186087867,
 1186087888,
 1186087932,
 1186087939,
 1186088034,
 1186088040,
 1186088046,
 1186088053,

In [35]:
stream_df.min()

user_id      2.000000e+00
movie_id     1.000000e+00
rating       5.000000e-01
timestamp    1.186087e+09
dtype: float64

In [36]:
def simulate(scheduler, updater, stream, df, update_cadence, num_update):
    ts = df['timestamp'].min()
    end_ts = df['timestamp'].max()
    
    predictions = []
    next_feature_update_time = ts + update_cadence
    num_updates = 0
    num_processed = 0
    
    while ts <= end_ts:
        if ts in stream:
            curr_data = stream[ts]
            for row in curr_data.itertuples():
                scheduler.push(row)
                predictions.append(updater.predict(row))
            num_processed += len(curr_data)
        if ts >= next_feature_update_time:
            # how many records to process
            rows = []
            for i in range(num_update):
                row = scheduler.pop()
                if row:
                    rows.append(row)
            updater.update(rows)
            num_updates += len(rows)
            next_feature_update_time += update_cadence
        ts += 1
    return predictions

In [37]:
def run_simulations(scheduler_list, updater_list, stream, df, update_cadences, num_updates):
    experiments = []
    for scheduler in scheduler_list:
        for updater in updater_list:
            print("Scheduler")
            for update_cadence in update_cadences:
                for num_update in num_updates:
                    experiment_name = f'{scheduler.name}_{updater.name}_{update_cadence}_{num_update}_predictions'
                    start = time.time()
                    predictions = simulate(scheduler, updater, stream, df, update_cadence, num_update)
                    elapsed = time.time() - start
                    stream[experiment_name] = predictions
                    mse = mean_squared_error(stream['rating'], stream[experiment_name])
                    experiments.append([experiment_name, elapsed, mse])
    return stream, experiments

In [38]:
train_df = spark_df.toPandas()

In [39]:
max_user = train_df.max()[0]
if stream_df.max()[0] > max_user:
    max_user = stream_df.max()[0]
max_user += 1

max_movie = train_df.max()[1]
if stream_df.max()[1] > max_movie:
    max_movie = stream_df.max()[1]
max_movie += 1

max_user, max_movie

(611.0, 54273.0)

In [40]:
stream_df.max(), train_df.max()

(user_id      6.100000e+02
 movie_id     5.427200e+04
 rating       5.000000e+00
 timestamp    1.537799e+09
 dtype: float64,
 user_id       609.0
 movie_id    54272.0
 rating          5.0
 dtype: float64)

In [46]:
def construct_matrices(df, max_user, max_movie):
    confidence_matrix = np.zeros((int(max_user), int(max_movie)))
    ratings_matrix = np.copy(confidence_matrix)
    for row in df.itertuples():
        confidence_matrix[row.user_id][row.movie_id] = 1
        ratings_matrix[row.user_id][row.movie_id] = row.rating
    return ratings_matrix, confidence_matrix
    
rating_matrix, confidence_matrix = construct_matrices(train_df, max_user, max_movie)

In [65]:
#scheduler_list = [MaxPendingScheduler(), FewestUpdateScheduler(), FIFOScheduler()]
scheduler_list = [FewestUpdateScheduler(), FIFOScheduler()]
sgd_updater = SGDUpdater(user_features, product_features, .02, .01, .01)
als_updater = ALSUpdater(user_features, product_features, rating_matrix, confidence_matrix, max_user, max_movie, 0, 0)
updater_list = [als_updater, sgd_updater]

In [66]:
stream_prediction_df, experiments = run_simulations(scheduler_list, updater_list, data_to_send, stream_df, [1000], [1])

Scheduler
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273

(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50) (50,)
(50, 54273) (54273,) (50, 50

KeyboardInterrupt: 