In [3]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt 



import pyspark
from pyspark.sql.types import *
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import countDistinct, col

In [4]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext
spark, sc

(<pyspark.sql.session.SparkSession at 0x11bf4e0f0>,
 <SparkContext master=local[*] appName=pyspark-shell>)

In [5]:
df=pd.read_csv('data/training.csv')

In [6]:
df.head()

Unnamed: 0,user,movie,rating,timestamp
0,6040,858,4,956703932
1,6040,593,5,956703954
2,6040,2384,4,956703954
3,6040,1961,4,956703977
4,6040,2019,5,956703977


In [7]:
del df['timestamp']

In [8]:
df.head()

Unnamed: 0,user,movie,rating
0,6040,858,4
1,6040,593,5
2,6040,2384,4
3,6040,1961,4
4,6040,2019,5


In [9]:
sdf=spark.createDataFrame(df)

In [10]:
sdf.show()

+----+-----+------+
|user|movie|rating|
+----+-----+------+
|6040|  858|     4|
|6040|  593|     5|
|6040| 2384|     4|
|6040| 1961|     4|
|6040| 2019|     5|
|6040| 1419|     3|
|6040|  573|     4|
|6040| 3111|     5|
|6040|  213|     5|
|6040| 3505|     4|
|6040| 1734|     2|
|6040|  912|     5|
|6040|  919|     5|
|6040| 2503|     5|
|6040|  527|     5|
|6040|  318|     4|
|6040| 1252|     5|
|6040|  649|     5|
|6040| 3289|     5|
|6040|  759|     5|
+----+-----+------+
only showing top 20 rows



In [11]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [12]:
train, validate = sdf.randomSplit([0.8, 0.2])

In [95]:
als= ALS(itemCol='movie',userCol='user',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")

In [96]:
model_als=als.fit(train)

In [97]:
pred=model_als.transform(validate)

In [98]:
pred_als=pred.toPandas()

In [99]:
pred_als.head()

Unnamed: 0,user,movie,rating,prediction
0,1605,148,2,2.391161
1,3841,463,3,2.585815
2,2051,463,1,2.103802
3,2210,463,3,3.363463
4,2777,463,3,3.107105


In [100]:
del pred_als['rating']

In [101]:
pred_als

Unnamed: 0,user,movie,prediction
0,1605,148,2.391161
1,3841,463,2.585815
2,2051,463,2.103802
3,2210,463,3.363463
4,2777,463,3.107105
5,5511,463,3.329710
6,5300,471,4.045573
7,1884,471,3.343160
8,5847,471,3.955298
9,3211,471,3.515817


In [14]:
#tuning the model
param_grid=ParamGridBuilder().addGrid(als.rank,[12,13,14]).addGrid(als.maxIter,[20,21,22]).addGrid(als.regParam,[.13,.14,.15]).build()

#Define evaluator as RMSE
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

#Build CrossValidation
tvs=TrainValidationSplit(estimator=als,estimatorParamMaps=param_grid,evaluator=evaluator)

In [15]:
model=tvs.fit(train)

In [16]:
best_model=model.bestModel

In [17]:
predictions=best_model.transform(validate)

In [18]:
rmse=evaluator.evaluate(predictions)

In [19]:
rmse

0.879101938344237

In [20]:
print("RMSE",rmse)
print("Best Model\n")
print("Best Rank:", best_model.rank)
print("maxIter:", best_model._java_obj.parent().getMaxIter())
print("RegParam", best_model._java_obj.parent().getRegParam())

RMSE 0.879101938344237
Best Model

Best Rank: 14
maxIter: 22
RegParam 0.13


In [21]:
df_pred=predictions.toPandas()

In [22]:
df_pred.head()

Unnamed: 0,user,movie,rating,prediction
0,1605,148,2,2.267845
1,3841,463,3,2.615297
2,2051,463,1,2.195916
3,2210,463,3,3.213562
4,2777,463,3,3.019728


In [23]:
del df_pred['rating']

In [24]:
df_pred.head()

Unnamed: 0,user,movie,prediction
0,1605,148,2.267845
1,3841,463,2.615297
2,2051,463,2.195916
3,2210,463,3.213562
4,2777,463,3.019728


In [100]:
df_pred.to_csv('/Users/nivzz/Downloads/recommend_matrix_fact_2.csv')

In [25]:
#for us to use to test our Predictions against a Validation Set
#format should has to be exactly:
'''
df_validation = ['user','movie','predicted_rating']
df_actual = ['user','movie','actual_rating']
Example on how you can rename cols
df_preds.columns= ['user', 'movie' ,'predicted_rating'] 
df_actual.columns = ['user', 'movie' ,'actual_rating'] 
'''
def compute_score(df_validation, df_actual):
    """Look at 5% of most highly predicted movies for each user.
    Return the average actual rating of those movies.
    """
    df = pd.merge(df_validation, df_actual, on=['user','movie']).fillna(1.0)
    #df = pd.concat([predictions.fillna(1.0), actual.actualrating], axis=1)
    # for each user
    g = df.groupby('user')
    # detect the top_5 movies as predicted by your algorithm
    top_5 = g.prediction.transform(
        lambda x: x >= x.quantile(.95)
    )
    # return the mean of the actual score on those
    return df.rating[top_5==1].mean()

In [26]:
compute_score(df_pred,df)

4.43147583402643

In [87]:
testing=pd.read_csv("data/fake_testing.csv")

In [88]:
test_fake=spark.createDataFrame(testing)

In [49]:
testing.head()

Unnamed: 0,user,movie,rating
0,4958,1924,4
1,4958,3264,4
2,4958,2634,2
3,4958,1407,4
4,4958,2399,2


In [39]:
#test_fake=spark.read.csv('data/fake_testing.csv')

In [107]:
train_fake, validate_fake = test_fake.randomSplit([1.0, 0.0])

In [108]:
prediction_fake_test=best_model.transform(train_fake)

In [109]:
df_pred_fake=prediction_fake_test.toPandas()

In [110]:
del df_pred_fake['rating']

In [106]:
data=df_pred_fake.groupby('prediction').size()
#data.sort_values(ascending=False)

In [111]:
compute_score(df_pred_fake,testing)

2.5029229711141676

In [55]:
request=pd.read_csv('data/requests.csv')

In [127]:
request.shape

(200209, 2)

In [57]:
#request_test=spark.read.csv('data/requests.csv')

In [113]:
request_test=spark.createDataFrame(request)

In [71]:
len(request)

200209

In [114]:
train_request, validate_request = request_test.randomSplit([1.0, 0.0])

In [115]:
request_p=best_model.transform(train_request)

In [122]:
check=best_model.transform(request_test)

In [123]:
final=check.toPandas()

In [130]:
final.head()

Unnamed: 0,user,movie,prediction
0,4169,148,2.985415
1,5333,148,2.458104
2,4387,148,2.360365
3,840,148,2.543673
4,752,148,2.795214


In [128]:
final.shape

(104380, 3)

In [74]:
request_p

DataFrame[user: bigint, movie: bigint, rating: bigint, prediction: float]

In [131]:
result=request_p.toPandas()

In [132]:
result.head()

Unnamed: 0,user,movie,prediction
0,4169,148,2.985415
1,5333,148,2.458104
2,4387,148,2.360365
3,840,148,2.543673
4,752,148,2.795214


In [129]:
result.shape

(104380, 3)

In [133]:
result['rating']=result['prediction']

In [134]:
del result['prediction']

In [135]:
result.head()

Unnamed: 0,user,movie,rating
0,4169,148,2.985415
1,5333,148,2.458104
2,4387,148,2.360365
3,840,148,2.543673
4,752,148,2.795214


In [136]:
result.to_csv('/Users/nivzz/Downloads/request_final.csv',index=False)

In [138]:
requestdata=pd.read_csv('request_final.csv')

In [139]:
requestdata.head()

Unnamed: 0,user,movie,rating
0,4169,148,2.985415
1,5333,148,2.458104
2,4387,148,2.360365
3,840,148,2.543673
4,752,148,2.795214


In [7]:
from pyspark.sql import SparkSession

# Setup a SparkSession
spark = SparkSession.builder.getOrCreate()
...

# Convert a Pandas DF to a Spark DF
sdf = spark.createDataFrame(training) 

# Convert a Spark DF to a Pandas DF
df = sdf.toPandas()

In [62]:
df.head()

Unnamed: 0,user,movie,rating
0,6040,858,4
1,6040,593,5
2,6040,2384,4
3,6040,1961,4
4,6040,2019,5


In [140]:
session.close()

NameError: name 'session' is not defined