In [4]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer

from pyspark.sql.functions import col, explode
import pandas as pd
import pyspark.sql.functions as func
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark import SparkContext, SQLContext
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator

In [5]:
! java -version

java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)


In [6]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.driver.memory", "16G").getOrCreate()

sc = spark._sc

In [7]:
trainSample = spark.read.option("inferSchema", True).parquet('train_sample1.parquet')
testSample = spark.read.option("inferSchema", True).parquet('test_sample1.parquet')
trainSample.createOrReplaceTempView('trainSample')
testSample.createOrReplaceTempView('testSample')

In [8]:
valSample = spark.read.option("inferSchema", True).parquet('val_sample1.parquet')
valSample.createOrReplaceTempView('valSample')

In [9]:
indexer_obj_1 = StringIndexer(inputCol="user_id", outputCol="user_id_numer").setHandleInvalid("keep")
indexer_model_1 = indexer_obj_1.fit(trainSample)
indexer_df_1 = indexer_model_1.transform(trainSample)

indexer_obj_2 = StringIndexer(inputCol="track_id", outputCol="track_id_numer").setHandleInvalid("keep")
indexer_model_2 = indexer_obj_2.fit(indexer_df_1)
indexer_df_2 = indexer_model_2.transform(indexer_df_1)


train_df = indexer_df_2.drop('user_id')
train_df = train_df.drop('track_id')

In [10]:
val_df_1 = indexer_model_1.transform(valSample)
val_df_2 = indexer_model_2.transform(val_df_1)

val_df = val_df_2.drop('user_id')
val_df = val_df.drop('track_id')

test_df_1 = indexer_model_1.transform(testSample)
test_df_2 = indexer_model_2.transform(test_df_1)

test_df = test_df_2.drop('user_id')
test_df = test_df.drop('track_id')

In [None]:
#train_df.show()

In [None]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
# als = ALS(maxIter=5, regParam=0.01, userCol="user_id_numer", itemCol="track_id_numer", ratingCol= "count",
#           coldStartStrategy="drop", implicitPrefs = True)

In [8]:
#Hyperparam Tuning
from bayes_opt import BayesianOptimization
tuning_params = dict()
tuning_params = {"rank":(30,70),"maxIter":(8,16),"regParam":(.01,1),"alpha":(0.0,3.0)}
def BO_func(rank,maxIter,regParam,alpha):
    recommender = ALS(userCol="user_id_numer",itemCol="track_id_numer",ratingCol="count",
                     coldStartStrategy="drop",implicitPrefs=True,rank=int(rank),
                     maxIter=int(maxIter),regParam=int(regParam),alpha=int(alpha))
    model = recommender.fit(train_df)
    preds = model.transform(val_df)
    res_valid = RegressionEvaluator(metricName="rmse",labelCol="count",
                                   predictionCol="prediction")
    rmse=res_valid.evaluate(preds)
    return rmse

In [9]:
optimizer  = BayesianOptimization(
f=BO_func,
pbounds=tuning_params,
verbose=5,
random_state=5,
)
optimizer.maximize(
    init_points=2,
n_iter=5,
)
optimizer.max

|   iter    |  target   |   alpha   |  maxIter  |   rank    | regParam  |
-------------------------------------------------------------------------
| [0m 1       [0m | [0m 4.662   [0m | [0m 2.222   [0m | [0m 11.48   [0m | [0m 42.07   [0m | [0m 0.9194  [0m |
| [95m 2       [0m | [95m 4.666   [0m | [95m 2.488   [0m | [95m 10.45   [0m | [95m 47.66   [0m | [95m 0.5232  [0m |
| [0m 3       [0m | [0m 4.662   [0m | [0m 2.755   [0m | [0m 11.38   [0m | [0m 42.2    [0m | [0m 0.1176  [0m |
| [0m 4       [0m | [0m 4.663   [0m | [0m 2.0     [0m | [0m 8.0     [0m | [0m 50.0    [0m | [0m 1.0     [0m |


{'target': 4.665940743841398,
 'params': {'alpha': 2.4884111887948293,
  'maxIter': 10.446975451610584,
  'rank': 47.65907856480315,
  'regParam': 0.5232338079942138}}

In [2]:
#type(optimizer.max)
params = optimizer.max.get('params')
alpha = params.get("alpha")
rank = params.get("rank")
maxIter = params.get("maxIter")
regParam= params.get("regParam")
#regParam

NameError: name 'optimizer' is not defined

In [None]:
#implement with optimal hyperparameters
recommender = ALS(userCol="user_id_numer",itemCol="track_id_numer",ratingCol="count",
                     coldStartStrategy="drop",implicitPrefs=True,rank=int(rank),
                     maxIter=float(maxIter),regParam=float(regParam),alpha=float(alpha))
model = recommender.fit(train_df)
#change the val_df to test
val_transformed = model.transform(test_df)

In [None]:
#model = als.fit(train_df)

In [None]:
#val_transformed = model.transform(val_df)

In [None]:
val_transformed.show()

In [None]:
# Print best_model
#print(type(best_model))

# Complete the code below to extract the ALS model parameters
#print("**Best Model**")

# # Print "Rank"
#print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
#print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
#print("  RegParam:", best_model._java_obj.parent().getRegParam())

In [None]:
# for each user, sort track ids by count
val_true = val_df.orderBy('count')

# flatten to group by user id and get list of true track ids
val_true_flatten = val_true.groupby('user_id_numer').agg(func.collect_list('track_id_numer').alias("track_id_numer"))

# add to dictionary
val_true_dict = val_true_flatten.collect()
val_true_dict = [{r['user_id_numer']: r['track_id_numer']} for r in val_true_dict]
val_true_dict = dict((key,d[key]) for d in val_true_dict for key in d)

In [None]:
val_true_dict

In [None]:
#https://stackoverflow.com/questions/59390481/how-to-implement-ranking-metrics-of-pyspark
#https://stackoverflow.com/questions/67345691/apply-stringindexer-to-several-columns-in-multiple-dataset

In [None]:
### model transform before recommend for UserSubset
### recommend for distinct users in validation
### implicit prefs = true ???
users = val_transformed.select(als.getUserCol()).distinct()

In [None]:
val_preds = model.recommendForUserSubset(users, 10)
val_preds_explode = val_preds.select(val_preds.user_id_numer,explode(val_preds.recommendations.track_id_numer))

val_preds_flatten = val_preds_explode.groupby('user_id_numer').agg(func.collect_list('col').alias("col"))

val_preds_dict = val_preds_flatten.collect()
val_preds_dict = [{r['user_id_numer']: r['col']} for r in val_preds_dict]
val_preds_dict = dict((key,d[key]) for d in val_preds_dict for key in d)

In [None]:
#--spark.yarn.submit.file.replication=1  --> replication factor

dictcon= list(map(list, val_preds_dict.items()))
dfpreds = spark.createDataFrame(dictcon, ["user_id_numer", "tracks"])

dictcon2= list(map(list, val_true_dict.items()))
dftrue = spark.createDataFrame(dictcon2, ["user_id_numer", "tracks"])

rankingsRDD = (dfpreds.join(dftrue, 'user_id_numer')
               .rdd
               .map(lambda row: (row[1], row[2])))
rankingsRDD

In [None]:
metrics = RankingMetrics(rankingsRDD)

In [None]:
### OLD ### 

# labels_list = []

# for user in val_preds_dict.keys():
#     labels_list.append((val_preds_dict[user], [int(i) for i in val_true_dict[user]]))

# labels = sc.parallelize(labels_list)
metrics = RankingMetrics(rankingsRDD)
#print(metrics.meanAveragePrecision)# metrics = RankingMetrics(labels)
# #print(metrics.meanAveragePrecision)


In [None]:
metrics.meanAveragePrecision