In [8]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt 



import pyspark
from pyspark.sql.types import *
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


# Build our Spark Session and Context
# getOrCreate() hands back the session; `sc` is the SparkContext attached to it.
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# NOTE(review): this bare tuple is not the last expression of the cell (imports follow it),
# so the notebook does not display it.
spark, sc


from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import countDistinct, col

In [9]:
# Movie metadata as a pandas DataFrame. The file has no header row, so the column
# names are supplied here. The separator is a regular expression (tab OR "::"),
# which is why the Python parsing engine is requested.
movie_data = pd.read_csv(
    'data/movies.dat',
    sep="\t|::",
    names=['movie_id', 'title', 'genres'],
    header=None,
    engine="python",
)

In [14]:
def load_data_to_spark(path='data/training.csv'):
    """Read a ratings CSV and return it as both a Spark and a pandas DataFrame.

    Parameters
    ----------
    path : str, optional
        CSV file to read. Defaults to the training ratings file, so existing
        zero-argument calls behave as before.

    Returns
    -------
    tuple
        ``(s_df, df)``: the Spark DataFrame created from the pandas frame,
        followed by the pandas DataFrame itself.
    """
    df = pd.read_csv(path)
    # `SparkSession` is not imported by name in the visible cells (only `import pyspark`
    # and `from pyspark.sql.types import *`), so a bare `SparkSession` raises NameError
    # on a fresh kernel. Reach it through the package, as the session-setup cell does.
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    s_df = spark.createDataFrame(df)
    return s_df, df
 

In [15]:
# s_df is the Spark DataFrame, p_df the pandas DataFrame built from the same CSV.
s_df, p_df = load_data_to_spark()

In [16]:
# Preview the Spark ratings DataFrame (columns: user, movie, rating, timestamp).
s_df.show()

+----+-----+------+---------+
|user|movie|rating|timestamp|
+----+-----+------+---------+
|6040|  858|     4|956703932|
|6040|  593|     5|956703954|
|6040| 2384|     4|956703954|
|6040| 1961|     4|956703977|
|6040| 2019|     5|956703977|
|6040| 1419|     3|956704056|
|6040|  573|     4|956704056|
|6040| 3111|     5|956704056|
|6040|  213|     5|956704056|
|6040| 3505|     4|956704056|
|6040| 1734|     2|956704081|
|6040|  912|     5|956704191|
|6040|  919|     5|956704191|
|6040| 2503|     5|956704191|
|6040|  527|     5|956704219|
|6040|  318|     4|956704257|
|6040| 1252|     5|956704257|
|6040|  649|     5|956704257|
|6040| 3289|     5|956704305|
|6040|  759|     5|956704448|
+----+-----+------+---------+
only showing top 20 rows



In [17]:
def get_density():
    """Print and return the density of the full ratings dataset.

    Density is the number of ratings divided by
    (distinct users * distinct movies), i.e. the filled fraction of the
    user-by-movie matrix.

    Returns
    -------
    float
        The density of the freshly loaded ratings data.
    """
    # Use the frame loaded here rather than a notebook-level `s_df`, so the
    # function does not depend on which cells happened to run before it.
    ratings, _ = load_data_to_spark()
    n_ratings = ratings.count()
    n_users = ratings.select('user').distinct().count()
    n_movies = ratings.select('movie').distinct().count()
    density = n_ratings / (n_users * n_movies)
    print('The original density is: {} '.format(density))
    return density

In [18]:
def traintestsplit():
    """Reload the ratings and split them 80/20 into train and test Spark DataFrames.

    The split uses a fixed seed, and the data is reloaded on every call.

    Returns
    -------
    tuple
        ``(train, test)`` Spark DataFrames.
    """
    # Only the Spark frame (first element) is needed; the pandas copy is ignored.
    spark_ratings = load_data_to_spark()[0]
    train_part, test_part = spark_ratings.randomSplit([0.8, 0.2], seed=427471138)
    return train_part, test_part

In [19]:
def get_train_density():
    """Print and return the density of the training split.

    Density is the number of training ratings divided by
    (distinct users * distinct movies) counted within the training split only.

    Returns
    -------
    float
        The density of the training split.
    """
    # Density of the training split (not the original data).
    train, _ = traintestsplit()
    n_ratings = train.count()
    n_users = train.select('user').distinct().count()
    n_movies = train.select('movie').distinct().count()
    density = n_ratings / (n_users * n_movies)
    # Message typo fixed: "desnsity" -> "density".
    print('The train density is: {} '.format(density))
    return density

In [20]:
def get_test_density():
    """Print and return the density of the test split.

    Density is the number of test ratings divided by
    (distinct users * distinct movies) counted within the test split only.

    Returns
    -------
    float
        The density of the test split.
    """
    # Density of the test split (not the original data).
    _, test = traintestsplit()
    n_ratings = test.count()
    n_users = test.select('user').distinct().count()
    n_movies = test.select('movie').distinct().count()
    density = n_ratings / (n_users * n_movies)
    # Message typo fixed: "desnsity" -> "density".
    print('The test density is: {} '.format(density))
    return density

In [21]:
# Density of the full ratings data: ratings / (distinct users * distinct movies).
get_density()

The original density is: 0.04046302241176001 


0.04046302241176001

In [22]:
# Density of the 80% training split, using users/movies counted within that split.
get_train_density()

The train desnsity is: 0.03257245318242673 


0.03257245318242673

In [23]:
# Density of the 20% test split, using users/movies counted within that split.
get_test_density()

The test desnsity is: 0.008776990345157408 


0.008776990345157408

In [24]:
# Instantiate the ALS recommender and set its parameters.
# Column names match the ratings DataFrame (user, movie, rating).
als_model = ALS(
    itemCol='movie',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,
    regParam=0.1,
    rank=10,
    # Pin the seed so repeated fits start from the same random factor
    # initialisation; same value the train/test split uses.
    seed=427471138)

In [26]:
# Fitting: split the ratings 80/20, then fit the ALS model on the training part only.
# `test` is kept at notebook level; it is not used in this cell.
train, test = traintestsplit()
recommender = als_model.fit(train)

In [27]:
# Goal of the next cells: find movies that are rated somewhere in the full dataset
# but do not appear in the training split.
# s_df is the dataset before the split; `train` is the training split previewed here.
train.show()


+----+-----+------+---------+
|user|movie|rating|timestamp|
+----+-----+------+---------+
|3375|  265|     1|967595224|
|3375|  293|     3|967595382|
|3375|  434|     4|967595560|
|3375| 1090|     5|967595382|
|3375| 1562|     3|967595188|
|3375| 1587|     3|967595560|
|3375| 1625|     4|967595451|
|3375| 2000|     5|967595474|
|3375| 2001|     4|967595513|
|3375| 2302|     5|967595474|
|3375| 2431|     3|967595637|
|3375| 2529|     4|967595432|
|3375| 2959|     4|967595407|
|3375| 3176|     1|967595612|
|3375| 3177|     4|967595535|
|3375| 3263|     3|967595612|
|3375| 3452|     5|967595535|
|3375| 3753|     5|967595261|
|3375| 3755|     5|967595286|
|3375| 3879|     5|967595286|
+----+-----+------+---------+
only showing top 20 rows



In [37]:
# Bring the training split to the driver as a pandas DataFrame so its movie ids can be inspected.
# (Alternatives tried earlier: s_df.select("movie").distinct().show() and p_df.movie.unique().)
train_pd = train.toPandas()
# Array of the distinct movie ids present in the training split.
distinct = train_pd.movie.unique()

In [38]:
# Movie ids of ratings in the full dataset (s_df) whose movie never appears in the
# training split. One entry per rating row, so a movie can be listed more than once.
# `distinct` is a NumPy array, so `movie not in distinct` rescans the whole array for
# every row; a set of plain ints makes each membership test constant-time.
train_movie_ids = set(distinct.tolist())
# Only the `movie` column is needed on the driver, so avoid collecting whole rows.
not_in_train = [
    row['movie']
    for row in s_df.select('movie').collect()
    if row['movie'] not in train_movie_ids
]
not_in_train
    
    

[3522,
 530,
 2685,
 1842,
 887,
 1709,
 3228,
 3337,
 1685,
 2685,
 1470,
 655,
 139,
 1316,
 989,
 2685,
 2217,
 2484,
 579,
 601,
 3220,
 1118,
 3377,
 641,
 3126,
 1471,
 2556]

In [39]:
# idea for cold start - if a movie doesn't show up in test, don't recommend it in the top 5
# item-to-item similarity
# neighborhoods - the size of the neighborhoods that you use
# use combinations of different models
# types of similarity - e.g. cosine similarity
# cold start - a user shows up who hasn't rated any movies, or a movie that's never been reviewed
#

In [None]:
# neighborhoods 
# similarity based recommender 
# 