In [1]:
# set the environment path to find Recommenders
import os
import sys


os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

#os.environ["PYSPARK_PYTHON"]=r"C:\Users\mirza\AppData\Local\Programs\Python\Python39"
import pickle
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))


System version: 3.9.7 (default, Sep 16 2021, 16:59:28) [MSC v.1916 64 bit (AMD64)]
Spark version: 3.1.2


In [2]:
TOP_K = 5
COL_USER = "userId"
COL_ITEM = "itemId"
COL_RATING = "rating"
COL_TIMESTAMP = "timestamp"

In [3]:
ratings = pickle.load(open("svd_ratings_data.pkl","rb" ))

In [4]:
ratings.head()

Unnamed: 0,userId,itemId,rating
0,1,110,1.0
1,11,110,3.5
2,22,110,5.0
3,24,110,5.0
4,29,110,3.0


In [5]:
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

In [6]:
ratings_data=spark.createDataFrame(ratings) 
#movie=spark.createDataFrame(movies) 

ratings_data =ratings_data.withColumn("userId", col("userId").cast("integer"))
ratings_data =ratings_data.withColumn("itemId", col("itemId").cast("integer"))
ratings_data =ratings_data.withColumn("rating", col("rating").cast("double"))
#ratings_data =ratings_data.withColumn("timestamp", col("timestamp").cast("integer"))

In [7]:
ratings_data.show(10)

+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     1|   110|   1.0|
|    11|   110|   3.5|
|    22|   110|   5.0|
|    24|   110|   5.0|
|    29|   110|   3.0|
|    30|   110|   5.0|
|    33|   110|   3.0|
|    34|   110|   5.0|
|    49|   110|   4.0|
|    56|   110|   4.0|
+------+------+------+
only showing top 10 rows



In [8]:
print((ratings_data.count(), len(ratings_data.columns)))

(1099679, 3)


In [9]:
for field in ratings_data.schema.fields:
    print(field.name +" , "+str(field.dataType))

userId , IntegerType
itemId , IntegerType
rating , DoubleType


In [10]:
train, test = spark_random_split(ratings_data, ratio=0.85, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 935070
N test 164609


## Find best model

In [11]:


header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=50,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [12]:


with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 12.154862599999994 seconds for training.


In [13]:
with Timer() as test_time:

    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

Took 525.3789886 seconds for prediction.


In [14]:
top_all.show(10)

+------+------+----------+
|userId|itemId|prediction|
+------+------+----------+
|     1|   587| 2.7120118|
|     1|   869| 2.7746842|
|     1|  1677|  2.219958|
|     1|  1702| 1.9299065|
|     1|  1720|  1.237754|
|     1|  1892| 3.3349123|
|     1|  2086| 2.3612878|
|     1|  2202| 2.9522777|
|     1|  2324| 3.9532568|
|     1|  2667| 2.4829562|
+------+------+----------+
only showing top 10 rows



In [15]:
prediction = model.transform(test)

In [16]:
rank_eval = SparkRankingEvaluation(test, prediction, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction", 
                                    relevancy_method="top_k")
rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                               predictionCol="prediction")
mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="rating",
                               predictionCol="prediction")

mae = mae_evaluator.evaluate(prediction)
rmse = rmse_evaluator.evaluate(prediction)

In [17]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "RMSE:\t%f" % rmse,
      "MAE:\t%f" % mae,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	5
RMSE:	0.861481
MAE:	0.663448
MAP:	0.792191
NDCG:	0.999950
Precision@K:	0.670452
Recall@K:	0.792187


In [18]:
test_results = prediction.toPandas()

In [20]:
model.write().overwrite().save(r"C:\Users\mirza\wzrdtls")