In [77]:
import pandas as pd
import numpy as np
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    IntegerType, StringType, IntegerType, FloatType, 
    StructField, StructType, DoubleType
)
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

In [13]:
als_df = pd.read_pickle('als_df.pkl')

In [14]:
als_df.reset_index(drop=True, inplace=True)

In [18]:
als_df = als_df[['user_id', 'item_id', 'rating', 'date']]

In [50]:
als_df.shape

(5216, 4)

In [21]:
spark = SparkSession.builder.getOrCreate()


In [22]:
spark_als_df = spark.createDataFrame(als_df) 


In [27]:
spark_als_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- item_id: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- date: timestamp (nullable = true)



In [28]:
spark_als_df.limit(5).show()

+-------+-------+------+-------------------+
|user_id|item_id|rating|               date|
+-------+-------+------+-------------------+
|   1520|    596|   3.0|2005-08-02 00:00:00|
|   1520|    592|   4.0|2005-09-14 00:00:00|
|   1369|    480|   4.0|2006-05-13 00:00:00|
|   1369|    601|   5.0|2006-05-19 00:00:00|
|   1369|    488|   5.0|2006-05-22 00:00:00|
+-------+-------+------+-------------------+



In [31]:
pandas_als_df = spark_als_df.toPandas()

In [63]:
train = pandas_als_df[:int(len(pandas_als_df)*.8)]
test = pandas_als_df[int(len(pandas_als_df)*.8):]

In [64]:
test = test[['user_id', 'item_id', 'rating']]
test.head()

Unnamed: 0,user_id,item_id,rating
4172,115,478,4.0
4173,346,296,5.0
4174,149,78,4.0
4175,533,308,5.0
4176,59,39,4.0


In [65]:
train = train[['user_id', 'item_id', 'rating']]

In [66]:
train.head()

Unnamed: 0,user_id,item_id,rating
0,1520,596,3.0
1,1520,592,4.0
2,1369,480,4.0
3,1369,601,5.0
4,1369,488,5.0


In [105]:
als_model = ALS(
    itemCol='item_id',
    userCol='user_id',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    rank=10) 

In [106]:
spark_train = spark.createDataFrame(train) 

In [107]:
spark_test = spark.createDataFrame(test)

In [108]:
recommender = als_model.fit(spark_train)

In [109]:
train_preds = recommender.transform(spark_train)

In [110]:
test_preds = recommender.transform(spark_test)

In [111]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [112]:
rmse_train = evaluator.evaluate(train_preds)

In [113]:
rmse_train

0.1541621332564429

In [114]:
rmse_test = evaluator.evaluate(test_preds)

In [115]:
rmse_test

nan

In [99]:
restaurant_recs = recommender.recommendForAllUsers(10)


In [104]:
restaurant_recs.limit(5).toPandas()

Unnamed: 0,user_id,recommendations
0,471,"[(616, 5.617592811584473), (344, 5.07469081878..."
1,1342,"[(350, 3.8274924755096436), (162, 3.8184649944..."
2,463,"[(16, 5.386734962463379), (94, 5.3835062980651..."
3,833,"[(87, 5.153095245361328), (225, 5.072264671325..."
4,496,"[(16, 4.219986915588379), (64, 4.0130820274353..."
