In [None]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

In [None]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session that the rest of the notebook reads from.
spark = SparkSession.builder.appName('Recommendation').getOrCreate()
# Bind the session's live SparkContext instance. The bare `SparkContext` name
# is the class itself, so instance methods called on it would fail.
sc = spark.sparkContext
# Optional: uncomment to set a checkpoint directory on the context.
# sc.setCheckpointDir('checkpoint')

In [None]:
# Load the attraction catalogue and the user ratings; the first row of each
# CSV file supplies the column names. No schema or inferSchema option is
# passed here; the numeric columns of `ratings` are cast in a later cell.
objekWisata = spark.read.option("header", True).csv("data_objekwisata.csv")
ratings = spark.read.option("header", True).csv("data_rating.csv")

In [None]:
# Preview the ratings as loaded (up to 20 rows, long cell values truncated).
ratings.show(n=20, truncate=True)

In [None]:
# Print the column names and types of the ratings DataFrame as loaded.
ratings.printSchema()

In [None]:
# Cast the three columns the ALS model below is configured with
# (user id, attraction id, rating) to integers, then preview the result.
for column_name in ('id_user', 'id_objek', 'rating'):
    ratings = ratings.withColumn(column_name, col(column_name).cast('integer'))
ratings.show()

In [None]:
# Sparsity estimate of the user x attraction matrix, as a percentage:
#   1 - (rating rows) / (distinct users * distinct attractions)
num_users = ratings.select("id_user").distinct().count()
num_objekWisata = ratings.select("id_objek").distinct().count()

# Rating rows present versus every possible (user, attraction) pair.
numerator = ratings.select("rating").count()
denominator = num_users * num_objekWisata

sparsity = (1.0 - float(numerator) / denominator) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

In [None]:
# Number of rating rows per user, largest counts first.
username_ratings = (
    ratings
    .groupBy("id_user")
    .count()
    .orderBy(col("count").desc())
)
username_ratings.show()

In [None]:
# Number of rating rows per attraction, largest counts first.
attraction_ratings = (
    ratings
    .groupBy("id_objek")
    .count()
    .orderBy(col("count").desc())
)
attraction_ratings.show()

In [None]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Create test and train sets: an 80/20 random split, seeded for repeatability.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=1234)

# Create the ALS estimator for explicit ratings with non-negative factors.
# coldStartStrategy="drop" is set so the RMSE evaluation is not fed
# predictions that could not be computed.
als = ALS(
    userCol="id_user",
    itemCol="id_objek",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    # Pin the factor initialisation as well as the split; without an explicit
    # seed the fitted factors are not guaranteed to repeat between sessions.
    seed=1234,
)

# Confirm that an estimator called "als" was created
type(als)

In [None]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter grid: every listed rank combined with every listed regParam.
# maxIter is not part of the grid; to tune it too, add
# `.addGrid(als.maxIter, [5, 50, 100, 200])` before `.build()`.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

# Evaluate with RMSE between the "rating" and "prediction" columns, then
# report how many parameter combinations the grid contains.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("Num models to be tested: ", len(param_grid))

In [None]:
# Build 5-fold cross validation over the ALS estimator and the parameter grid,
# scored with the RMSE evaluator. The seed pins how rows are assigned to
# folds, so the selected best model does not drift between runs.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=1234,
)

# Confirm cv was built
print(cv)

In [None]:
# Fit the cross validator to the 'train' dataset; this runs the search over
# the parameter grid the validator was built with.
model = cv.fit(train)

# Extract the best model from the fitted cross-validator model above.
best_model = model.bestModel

In [None]:
# Show which model class cross validation produced.
print(type(best_model))

# The tuned hyperparameters are read from the estimator that produced the
# best model, reached through the model's underlying Java object
# (`_java_obj` is a private attribute of the PySpark wrapper).
best_estimator = best_model._java_obj.parent()

print("**Best Model**")
print("  Rank:", best_estimator.getRank())
print("  MaxIter:", best_estimator.getMaxIter())
print("  RegParam:", best_estimator.getRegParam())

In [None]:
# Score the held-out test split with the best model, which adds the
# prediction column that the evaluator compares against "rating".
test_predictions = best_model.transform(test)
# RMSE of the best model on the test split (lower is better).
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

In [None]:
# Preview test rows next to their predicted rating (up to 20 rows).
test_predictions.show(n=20, truncate=True)

In [None]:
# Generate the top 10 recommendations for every user, then preview the
# result for 10 of them.
nrecommendations = best_model.recommendForAllUsers(numItems=10)
preview = nrecommendations.limit(10)
preview.show()

In [None]:
# Flatten the per-user "recommendations" array into one row per
# (id_user, id_objek, rating).
# Note: this rebinds `nrecommendations` to a frame without the
# "recommendations" column, so the cell that creates it must be re-run
# before this one can be run a second time.
nrecommendations = (
    nrecommendations
    .select('id_user', explode('recommendations').alias('rec_exp'))
    .select('id_user', 'rec_exp.id_objek', 'rec_exp.rating')
)

nrecommendations.limit(10).show()

In [None]:
# Recommendations for user 6658, joined with the attraction catalogue so the
# attraction details appear next to each recommended id.
(
    nrecommendations
    .join(objekWisata, on='id_objek')
    .where('id_user = 6658')
    .show()
)

In [None]:
# For comparison: the 10 attractions user 6658 actually rated highest,
# joined with the attraction catalogue.
(
    ratings
    .join(objekWisata, on='id_objek')
    .where('id_user = 6658')
    .orderBy('rating', ascending=False)
    .limit(10)
    .show()
)