In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

import pandas as pd
import logging
import json

def create_spark_configuration():
    spark_config = None
    try:
        spark_config = (SparkSession.builder
            .appName("ElasticsearchSparkIntegration")
            # .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-20_2.12:7.17.14,"
            #         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")
            .getOrCreate())
        
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return spark_config

spark = create_spark_configuration()

In [2]:
df = spark.read.csv('data/csv/movie_ratings.csv',inferSchema=True,header=True)
df.describe().show()

(train,test) = df.select('userId','movieId','rating').randomSplit([0.7,0.3],seed=42)

+-------+------------------+------------------+------------------+-----------------+
|summary|            userId|           movieId|            rating|        timestamp|
+-------+------------------+------------------+------------------+-----------------+
|  count|            100000|            100000|            100000|           100000|
|   mean|         462.48475|         425.53013|           3.52986|8.8352885148862E8|
| stddev|266.61442012750905|330.79835632558473|1.1256735991443214|5343856.189502848|
|    min|                 1|                 1|                 1|        874724710|
|    max|               943|              1682|                 5|        893286638|
+-------+------------------+------------------+------------------+-----------------+



In [3]:
als = ALS(maxIter=5,regParam=0.01,userCol="userId",itemCol="movieId",ratingCol="rating",coldStartStrategy="drop",nonnegative=True)

In [4]:
model = als.fit(train)

In [5]:
prediction = model.transform(test)

In [6]:
prediction.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   897|    496|     5|  4.539078|
|   251|    148|     2| 2.7995195|
|   580|    148|     4|  2.412327|
|   580|    471|     3| 3.6193416|
|    65|    471|     4| 3.7422976|
|   883|   1591|     3| 3.5973558|
|   588|    463|     4| 4.0468345|
|   588|    496|     3| 3.2339876|
|   472|    496|     4|  4.184733|
|   321|    496|     4| 3.7210383|
|   593|    471|     3| 3.4918327|
|   642|    148|     5|  3.255121|
|   731|    496|     5|  4.236382|
|   332|    148|     5| 4.7987585|
|   332|    471|     4| 4.6127596|
|   271|    496|     5| 4.2733626|
|   844|    471|     3| 3.7330875|
|   806|    496|     5|   4.11687|
|   103|    471|     4| 3.1587062|
|   236|    496|     3|  4.049107|
+------+-------+------+----------+
only showing top 20 rows



In [7]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [8]:
rmse = evaluator.evaluate(prediction)
print(rmse)

1.0375587077925283


In [9]:
user_1 = test.filter(test['userId'] > 940 ).select(['movieId','userId'])

In [10]:
user_1.show()

+-------+------+
|movieId|userId|
+-------+------+
|      7|   941|
|    147|   941|
|    181|   941|
|    257|   941|
|    258|   941|
|    273|   941|
|     95|   942|
|    117|   942|
|    210|   942|
|    234|   942|
|    272|   942|
|    300|   942|
|    304|   942|
|    328|   942|
|    347|   942|
|    357|   942|
|    498|   942|
|    511|   942|
|    539|   942|
|    607|   942|
+-------+------+
only showing top 20 rows



In [11]:
rec = model.transform(user_1)

In [12]:
rec.orderBy('prediction',ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    219|   943|  5.748473|
|    498|   942|  5.083549|
|    511|   942|  4.928707|
|    945|   942|  4.912862|
|    763|   943| 4.7392116|
|   1044|   943| 4.6208005|
|    210|   942| 4.6042194|
|    272|   942| 4.6037908|
|    117|   942|  4.567074|
|    357|   942| 4.4385138|
|    705|   942|  4.414038|
|    765|   943| 4.4095225|
|     95|   942|    4.3789|
|    184|   943|  4.359953|
|      7|   941|  4.325896|
|    216|   943|  4.318621|
|    257|   941| 4.3021526|
|      9|   943|  4.292218|
|    181|   941|  4.246449|
|     56|   943| 4.2433863|
+-------+------+----------+
only showing top 20 rows



In [13]:
model.save('model/als-rec')