Checking system version and used packages 

In [1]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import pandas as pd


import pyspark
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType

from spark.starter import start_spark
from validation.spark_validation import SparkValidation
from data.data_helper import data_split
from pyspark.sql import SparkSession

print(f"System version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"PySpark version: {pyspark.__version__}")

System version: 3.11.10 | packaged by conda-forge | (main, Oct 16 2024, 01:27:36) [GCC 13.3.0]
Pandas version: 2.2.3
PySpark version: 3.5.4


Data structure and used parameters for the ALS model

In [2]:
# Data structure
COL_CUS = "customer_id"
COL_GAME = "game_id"
COL_RATING = "rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "timestamp"
schema = StructType(
    (
        StructField(COL_CUS, IntegerType()),
        StructField(COL_GAME, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType()),
    )
)
# Model parameters
RANK = 10
MAX_ITER = 15
REG_PARAM = 0.05
K = 10 # number of recommendations for the Model

Starting Spark session

In [3]:
spark = start_spark("ALS Deep Dive", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/20 07:44:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Reading data from csv into a Spark DataFrame.

In [6]:
# Initialize SparkSession
spark = SparkSession.builder.appName("Load ALS Data").getOrCreate()

# Path to the CSV file 
file_path = "/root/Recommendation_system.csv"

# Load the CSV into a Spark DataFrame
dfs = spark.read.csv(file_path, schema=schema, header=True)

# Show the data
dfs.show()

25/02/19 14:03:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-----------+-------+------------+----------+
|customer_id|game_id|      rating| timestamp|
+-----------+-------+------------+----------+
|    8561354|    201| 0.020287959|1685009284|
|    8561354|    201| 0.020287959|1685009285|
|    8561354|    201| 0.020287959|1685009286|
|    8561354|    201| 0.020287959|1685009286|
|    8561354|    201| 0.020287959|1685009287|
|    8561354|    201| 0.020287959|1685009287|
|    8561354|    401|0.0019633507|1685009289|
|    8561354|    402|0.0019633507|1685009290|
|    8561354|    401|0.0019633507|1685009290|
|    8561354|    402|0.0019633507|1685009291|
|    8577554|1000001|  0.30104712|1685009981|
|    8577554|1000001|  0.30104712|1685009981|
|    8577554|1000000| 0.009816754|1685009982|
|    8577554|1000002|         1.0|1685009982|
|    8577554|1000002|         1.0|1685009982|
|    8577554|1000002|         1.0|1685009982|
|    8577554|1000002|         1.0|1685009983|
|    8577554|1000002|         1.0|1685009983|
|    8577554|1000002|         1.0|

Spliting the Data by 75-25 ratio for training and testing of the model

In [7]:
dfs_train, dfs_test = data_split(dfs, ratio=0.75, seed=42)

### Train the movielens model using pyspark

In [8]:
als = ALS(
    maxIter=MAX_ITER, 
    rank=RANK,
    regParam=REG_PARAM, 
    userCol=COL_CUS, 
    itemCol=COL_GAME, 
    ratingCol=COL_RATING, 
    coldStartStrategy="drop"
)

model = als.fit(dfs_train)

25/02/19 08:51:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/02/19 08:51:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/02/19 08:51:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


use the trained model to predict ratings with test data.

In [9]:
dfs_pred = model.transform(dfs_test).drop(COL_RATING)

With the prediction results, the model performance can be evaluated.

In [10]:
evaluations = SparkValidation(
    dfs_test,
    dfs_pred,
    col_user=COL_CUS,
    col_item=COL_GAME,
    col_rating=COL_RATING,
    col_prediction=COL_PREDICTION,
)

print(
    "RMSE score = {}".format(evaluations.rmse()),
    "MAE score = {}".format(evaluations.mae()),
    "R2 score = {}".format(evaluations.rsquared()),
    sep="\n"
)

[Stage 265:>                                                        (0 + 1) / 1]

RMSE score = 0.05680242215609236
MAE score = 0.05511392340144523
R2 score = 0.9805484556908253


                                                                                

#### Top k for all users (items)

In [11]:
dfs_rec = model.recommendForAllUsers(10)

In [12]:
dfs_rec.show(10)



+-----------+--------------------+
|customer_id|     recommendations|
+-----------+--------------------+
|    8450054|[{55104, 1.005364...|
|    8561354|[{1000002, 7.3052...|
|    8387754|[{1000004, 0.0269...|
|    8385254|[{1000052, 0.0308...|
|    8616454|[{1000004, 0.0720...|
|    8580454|[{1000004, 0.0125...|
|    8679454|[{55104, 0.186197...|
|    8488454|[{1000004, 2.2326...|
|    8509454|[{1000004, 0.0634...|
|    8680154|[{1000025, 0.0828...|
+-----------+--------------------+
only showing top 10 rows



                                                                                

#### Top k for a selected set of users (items)

In [13]:
users = dfs_train.select(als.getUserCol()).distinct().limit(3)

dfs_rec_subset = model.recommendForUserSubset(users, 10)

In [14]:
dfs_rec_subset.show(10, truncate=False)

+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customer_id|recommendations                                                                                                                                                                                                                                       |
+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|8450054    |[{55104, 1.0053643E-20}, {1000026, 8.001455E-21}, {57714, 6.961263E-21}, {30271, 4.707042E-21}, {1000004, 4.184424E-21}, {15064, 4.012089E-21}, {1000021, 3.6333666E-21}, {34204, 3.2639856E-21}, {10310, 3.

Stop the model

In [4]:
spark.stop()