# <font color='green'> ALS Recommender - Matrix Factorization Algorithm </font>

#### Import common libraries

In [1]:
import time
import numpy as np

## <font color='green'> 1. Data Preparation</font>

#### Download and unzip data

In [2]:
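# Uncomment the lines below to download and unzip the dataset.
# The target directory `datasets/` must already exist for the `mv` step.
# NOTE(review): the archive may extract to `ml-10M100K/` rather than `ml-10m/` --
# confirm and adjust the `mv` source path if so.
# !wget -N http://files.grouplens.org/datasets/movielens/ml-10m.zip
# !unzip -o ml-10m.zip
# !mv ml-10m/ratings.dat datasets/ratings_10M.dat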

#### Read the data

In [3]:
import pandas as pd

# The file is "::"-delimited. pandas only supports a multi-character separator
# through its Python parsing engine, so engine="python" is requested explicitly.
# Only the first three fields (columns 0-2) are loaded.
column_names = ["userId", "movieId", "rating"]
df = pd.read_csv(
    "./datasets/ratings_10M.dat",
    sep="::",
    engine="python",
    usecols=[0, 1, 2],
    names=column_names,
)
print("shape of loaded dataframe: {}".format(df.shape))

# Re-encode the raw ids as contiguous zero-based indices so they can be used
# directly as row/column positions of a sparse matrix.
# np.unique(..., return_inverse=True) returns the sorted distinct ids together
# with, for every row, the position of its id within that sorted array.
userId = df['userId'].values
movieId = df['movieId'].values
UID, unqUID = np.unique(userId, return_inverse=True)
MID, unqMID = np.unique(movieId, return_inverse=True)

# Overwrite the id columns with their zero-based codes.
df['userId'] = unqUID
df['movieId'] = unqMID

shape of loaded dataframe: (10000054, 3)


#### Check Data Types

In [4]:
# Display the dtype of each column of the re-encoded dataframe.
df.dtypes

userId       int64
movieId      int64
rating     float64
dtype: object

#### Check total number of users

In [5]:
# Distinct (zero-based) user ids, in order of first appearance, as a plain list.
all_users = pd.unique(df["userId"]).tolist()
len(all_users)

69878

#### Collect test data

In [6]:
import random

# Sample the evaluation users with a dedicated, seeded generator so that the
# chosen users (and therefore the MAP scores computed from them) are the same
# on every Restart & Run All. The global `random` state is left untouched.
TEST_SEED = 42
N_TEST_USERS = 100
# min(...) keeps random.sample from raising ValueError on a small dataset.
test_ids = random.Random(TEST_SEED).sample(all_users,
                                           min(N_TEST_USERS, len(all_users)))

# "Actual" items of a user = every movie the user rated 3 or higher.
# Filter the frame once and group by user, instead of building two full-length
# boolean masks over the whole dataframe for each test user.
liked = df.loc[df["rating"] >= 3, ["userId", "movieId"]]
liked = liked[liked["userId"].isin(test_ids)]
items_by_user = {user: movies.tolist()
                 for user, movies in liked.groupby("userId")["movieId"]}

# One list per test user, aligned with test_ids; a user with no rating >= 3
# gets an empty list. Within a user, movies keep their dataframe row order.
all_actual_items = [items_by_user.get(user, []) for user in test_ids]

#### Convert the data to COO and CSR format

In [7]:
from scipy.sparse import coo_matrix

# Extract the three columns as NumPy arrays: row index, column index, value.
userId, movieId, rating = (df[column].values
                           for column in ('userId', 'movieId', 'rating'))

# Build the user x movie rating matrix in COO form (no explicit shape is given,
# so scipy derives it from the index arrays), then convert it to CSR.
mat = coo_matrix((rating, (userId, movieId)), dtype=np.float64)
mat_csr = mat.tocsr()
print("shape of the input matrix is {}".format(mat.shape))

shape of the input matrix is (69878, 10677)


#### Helper Function

In [8]:
def get_recommendations(handle, algo, n, users=None):
    """Collect the recommended item ids for a list of users.

    Parameters
    ----------
    handle : callable
        Called as ``handle(user, n)``; must return an iterable of
        recommendations for ``user``.
    algo : str
        Selects how the item id is read from each recommendation:
        ``"frov"`` takes element ``0`` of the entry, ``"spark"`` reads its
        ``product`` attribute.
    n : int
        Number of recommendations requested per user (passed to ``handle``).
    users : iterable, optional
        Users to query. Defaults to the notebook-level ``test_ids``.

    Returns
    -------
    list of list
        One list of item ids per user, in the same order as ``users``.

    Raises
    ------
    ValueError
        If ``algo`` is neither ``"frov"`` nor ``"spark"``.
    """
    # Resolve the extractor once, up front. Previously an unknown `algo` left
    # `item` unbound (or silently reused a stale value from an earlier entry).
    if algo == "frov":
        def extract_item(rec):
            return rec[0]
    elif algo == "spark":
        def extract_item(rec):
            return rec.product
    else:
        raise ValueError(
            "unknown algo %r: expected 'frov' or 'spark'" % (algo,))

    if users is None:
        # Backward-compatible default: the globally sampled test users.
        users = test_ids

    return [[extract_item(rec) for rec in handle(user, n)] for user in users]

## <font color='green'> 2. Frovedis ALS</font>

In [9]:
# Initialize the Frovedis server. The launch command is "mpirun -np 8 "
# followed by the value of the FROVEDIS_SERVER environment variable;
# os.environ[...] raises KeyError if that variable is not set.
import os
from frovedis.exrpc.server import FrovedisServer
FrovedisServer.initialize("mpirun -np 8 " + os.environ["FROVEDIS_SERVER"])

# Fit a Frovedis ALS model on the CSR input matrix.
# The timed region covers constructing the ALS object and the fit() call.
from frovedis.mllib.recommendation import ALS as frovALS
start_time = time.time()
als = frovALS(rank=256, max_iter=15, reg_param=0.01).fit(mat_csr)
frov_train_time = time.time() - start_time
print("Frovedis ALS Train Time: %.3f sec" % frov_train_time)

# Recommend 10 items for every test user and time the whole loop.
# ml_metrics is imported here because mapk() is used just below.
import ml_metrics
start_time = time.time()
all_recommended_items = get_recommendations(als.recommend_products, "frov", 10)
frov_pred_time = time.time() - start_time
print("Frovedis ALS Recommendation Time: %.3f sec" % frov_pred_time)

# Compute the MAP@10 (Mean Average Precision at k=10) score of the
# recommendations against each user's actual items.
frov_score = ml_metrics.mapk(all_actual_items, all_recommended_items, 10)
print("Frovedis ALS MAP(10) Score: %.3f " % frov_score)

# Clean-up: release the model and shut the server down.
# NOTE: these lines are not reached if any statement above raises.
als.release()
FrovedisServer.shut_down()

Frovedis ALS Train Time: 366.292 sec
Frovedis ALS Recommendation Time: 0.947 sec
Frovedis ALS MAP(10) Score: 0.882 


## <font color='green'> 3. Pyspark ALS</font>

In [10]:
# Initialize the Spark context: driver memory 15g, executor memory 48g,
# master "local[12]", application name "als".
from pyspark import SparkConf, SparkContext
conf = SparkConf().set("spark.driver.memory", "15g")\
                  .set("spark.executor.memory", "48g")
sc = SparkContext(master="local[12]", appName="als", conf=conf)

# Build the training RDD from the COO matrix: one (row, col, value) triple,
# i.e. (userId, movieId, rating), per stored entry.
ratingsRDD = sc.parallelize(zip(mat.row, mat.col, mat.data))

# Train a PySpark ALS model with the same rank / iteration count /
# regularization values as the Frovedis run; only the train call is timed.
# NOTE(review): trainImplicit() is the implicit-feedback variant -- confirm the
# Frovedis ALS model above also treats the ratings as implicit feedback so the
# two runs are a like-for-like comparison.
from pyspark.mllib.recommendation import ALS as pyspark_als
start_time = time.time()
model = pyspark_als.trainImplicit(ratingsRDD, rank=256, iterations=15, lambda_=0.01)
sp_train_time = time.time() - start_time
print("Pyspark ALS Train Time: %.3f sec" % sp_train_time)

# Recommend 10 items for every test user and time the whole loop.
# ml_metrics is imported here because mapk() is used just below.
import ml_metrics
start_time = time.time()
all_recommended_items = get_recommendations(model.recommendProducts, "spark", 10)
sp_pred_time = time.time() - start_time
print("Pyspark ALS Recommendation Time: %.3f sec" % sp_pred_time)

# Compute the MAP@10 (Mean Average Precision at k=10) score of the
# recommendations against each user's actual items.
sp_score = ml_metrics.mapk(all_actual_items, all_recommended_items, 10)
print("Pyspark ALS MAP(10) Score: %.3f" % sp_score)

# Clean-up: stop the Spark context.
# NOTE: this line is not reached if any statement above raises.
sc.stop()

Pyspark ALS Train Time: 917.875 sec
Pyspark ALS Recommendation Time: 10.233 sec
Pyspark ALS MAP(10) Score: 0.892


## <font color='green'> 4. Performance Comparison </font>

In [11]:
import frovedis
import pyspark

# Header line: the library versions being compared.
library_versions = (frovedis.__version__, pyspark.__version__)
print("evaluation of ALS using frovedis_%s vs pyspark_%s: " \
      % library_versions)

# Speed-up = PySpark wall-clock time divided by Frovedis wall-clock time,
# so a value above 1 means the Frovedis run took less time.
train_speedup = sp_train_time / frov_train_time
print("frovedis train speed-up: %.3f" % train_speedup)

recommendation_speedup = sp_pred_time / frov_pred_time
print("frovedis recommendation speed-up: %.3f" % recommendation_speedup)

evaluation of ALS using frovedis_0.9.10 vs pyspark_3.0.1: 
frovedis train speed-up: 2.506
frovedis recommendation speed-up: 10.807
