In [1]:
import numpy as np
import pandas as pd
from surprise.prediction_algorithms import knns
from surprise.similarities import cosine, msd, pearson
from surprise import accuracy
from surprise.model_selection import cross_validate, GridSearchCV
from surprise.prediction_algorithms import SVD, KNNWithMeans, KNNBasic, KNNBaseline
from sklearn.preprocessing import OneHotEncoder

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

In [2]:
spark = SparkSession.builder.master('local').getOrCreate() 

In [3]:
ratings = spark.read.json('data/ratings.json')#, schema=schema)

In [4]:
ratings.persist()
ratings.dtypes

[('movie_id', 'bigint'),
 ('rating', 'bigint'),
 ('timestamp', 'double'),
 ('user_id', 'bigint')]

In [5]:
als = ALS(maxIter=10,
          rank=10,
          userCol="user_id", 
          itemCol="movie_id", 
          ratingCol="rating")

model = als.fit(ratings)

In [6]:
predictions = model.transform(ratings)
predictions.persist()
evaluator = RegressionEvaluator(metricName='rmse',
                               labelCol='rating',
                               predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.812343252261968


In [7]:
predictions.persist()
predictions.show(5)

+--------+------+------------+-------+----------+
|movie_id|rating|   timestamp|user_id|prediction|
+--------+------+------------+-------+----------+
|     148|     5|9.75592024E8|    673| 4.1599784|
|     148|     2|9.65634524E8|   4227| 1.9720497|
|     148|     4|9.68683753E8|   3184| 3.4084134|
|     148|     3| 9.6997537E8|   4784| 2.9059458|
|     148|     2|9.74388854E8|   2383| 2.3460627|
+--------+------+------------+-------+----------+
only showing top 5 rows



In [8]:
requests = spark.read.json('data/requests.json')
requests.dtypes

[('movie_id', 'bigint'),
 ('rating', 'double'),
 ('timestamp', 'double'),
 ('user_id', 'bigint')]

In [9]:
requests.show(5)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|    2019|   NaN|9.56678777E8|   6040|
|     759|   NaN|9.56679248E8|   6040|
|    2858|   NaN|9.56679275E8|   6040|
|     246|   NaN|9.56679413E8|   6040|
|    1617|   NaN|9.56679473E8|   6040|
+--------+------+------------+-------+
only showing top 5 rows



In [10]:
req_predict = model.transform(requests)

In [11]:
req_predict.show(5)

+--------+------+------------+-------+----------+
|movie_id|rating|   timestamp|user_id|prediction|
+--------+------+------------+-------+----------+
|     148|   NaN|9.77959026E8|     53|       NaN|
|     148|   NaN|9.76559602E8|   4169| 3.1403506|
|     148|   NaN|9.89024856E8|   5333|  2.441079|
|     148|   NaN|9.77005381E8|   4387|   2.39551|
|     148|   NaN|9.66907208E8|   3539| 2.8318474|
+--------+------+------------+-------+----------+
only showing top 5 rows



In [12]:
final_requests = req_predict.drop('rating')
final_requests = final_requests.withColumnRenamed('prediction','rating')
final_requests.show(5)

+--------+------------+-------+---------+
|movie_id|   timestamp|user_id|   rating|
+--------+------------+-------+---------+
|     148|9.77959026E8|     53|      NaN|
|     148|9.76559602E8|   4169|3.1403506|
|     148|9.89024856E8|   5333| 2.441079|
|     148|9.77005381E8|   4387|  2.39551|
|     148|9.66907208E8|   3539|2.8318474|
+--------+------------+-------+---------+
only showing top 5 rows



In [13]:
movie_titles = spark.read.csv('data/movies.dat', sep=":")
movie_titles = movie_titles.drop('_c1','_c3')
movie_titles.persist()
movie_titles.head(5)

[Row(_c0='1', _c2='Toy Story (1995)', _c4="Animation|Children's|Comedy"),
 Row(_c0='2', _c2='Jumanji (1995)', _c4="Adventure|Children's|Fantasy"),
 Row(_c0='3', _c2='Grumpier Old Men (1995)', _c4='Comedy|Romance'),
 Row(_c0='4', _c2='Waiting to Exhale (1995)', _c4='Comedy|Drama'),
 Row(_c0='5', _c2='Father of the Bride Part II (1995)', _c4='Comedy')]

In [14]:
users = spark.read.csv('data/users.dat', sep=':')
users = users.drop('_c1', '_c3', '_c5', '_c7')
users.persist()
users.show(5)

+---+---+---+---+-----+
|_c0|_c2|_c4|_c6|  _c8|
+---+---+---+---+-----+
|  1|  F|  1| 10|48067|
|  2|  M| 56| 16|70072|
|  3|  M| 25| 15|55117|
|  4|  M| 45|  7|02460|
|  5|  M| 25| 20|55455|
+---+---+---+---+-----+
only showing top 5 rows



In [15]:
old_cols = ['_c0','_c2','_c4', '_c6', '_c8']
new_cols = ['id','gender','age_group','occupation','zipcode']

def rename_cols(new_cols,old_cols,data):
    for i in range(len(old_cols)):
        data = data.withColumnRenamed(old_cols[i],new_cols[i])
    return data

users = rename_cols(new_cols,old_cols,users)
users.show(5)

+---+------+---------+----------+-------+
| id|gender|age_group|occupation|zipcode|
+---+------+---------+----------+-------+
|  1|     F|        1|        10|  48067|
|  2|     M|       56|        16|  70072|
|  3|     M|       25|        15|  55117|
|  4|     M|       45|         7|  02460|
|  5|     M|       25|        20|  55455|
+---+------+---------+----------+-------+
only showing top 5 rows



In [16]:
new = ['id','title','genre']
old = ['_c0', '_c2','_c4']

movie_titles = rename_cols(new,old,movie_titles)
movie_titles.show(5)

+---+--------------------+--------------------+
| id|               title|               genre|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Animation|Childre...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
|  4|Waiting to Exhale...|        Comedy|Drama|
|  5|Father of the Bri...|              Comedy|
+---+--------------------+--------------------+
only showing top 5 rows



In [17]:
fulldata = ratings.join(movie_titles, ratings.movie_id == movie_titles.id)
fulldata = fulldata.join(users, fulldata.user_id == users.id)
fulldata.show(5)

+--------+------+------------+-------+----+--------------------+------------------+----+------+---------+----------+-------+
|movie_id|rating|   timestamp|user_id|  id|               title|             genre|  id|gender|age_group|occupation|zipcode|
+--------+------+------------+-------+----+--------------------+------------------+----+------+---------+----------+-------+
|     858|     4|9.56678732E8|   6040| 858|Godfather, The (1...|Action|Crime|Drama|6040|     M|       25|         6|  11106|
|    2384|     4|9.56678754E8|   6040|2384|                Babe|              null|6040|     M|       25|         6|  11106|
|     593|     5|9.56678754E8|   6040| 593|Silence of the La...|    Drama|Thriller|6040|     M|       25|         6|  11106|
|    1961|     4|9.56678777E8|   6040|1961|     Rain Man (1988)|             Drama|6040|     M|       25|         6|  11106|
|    1419|     3|9.56678856E8|   6040|1419|    Walkabout (1971)|             Drama|6040|     M|       25|         6|  11106|


In [18]:
fulldata = fulldata.drop('id')
fulldata.show(5)

+--------+------+------------+-------+--------------------+------------------+------+---------+----------+-------+
|movie_id|rating|   timestamp|user_id|               title|             genre|gender|age_group|occupation|zipcode|
+--------+------+------------+-------+--------------------+------------------+------+---------+----------+-------+
|     858|     4|9.56678732E8|   6040|Godfather, The (1...|Action|Crime|Drama|     M|       25|         6|  11106|
|    2384|     4|9.56678754E8|   6040|                Babe|              null|     M|       25|         6|  11106|
|     593|     5|9.56678754E8|   6040|Silence of the La...|    Drama|Thriller|     M|       25|         6|  11106|
|    1961|     4|9.56678777E8|   6040|     Rain Man (1988)|             Drama|     M|       25|         6|  11106|
|    1419|     3|9.56678856E8|   6040|    Walkabout (1971)|             Drama|     M|       25|         6|  11106|
+--------+------+------------+-------+--------------------+------------------+--

In [19]:
fulldata.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 719949 entries, 0 to 719948
Data columns (total 10 columns):
movie_id      719949 non-null int64
rating        719949 non-null int64
timestamp     719949 non-null float64
user_id       719949 non-null int64
title         719949 non-null object
genre         673236 non-null object
gender        719949 non-null object
age_group     719949 non-null object
occupation    719949 non-null object
zipcode       719949 non-null object
dtypes: float64(1), int64(3), object(6)
memory usage: 54.9+ MB


In [20]:
fulldata = fulldata.dropna()
fulldata.show(5)

+--------+------+------------+-------+--------------------+------------------+------+---------+----------+-------+
|movie_id|rating|   timestamp|user_id|               title|             genre|gender|age_group|occupation|zipcode|
+--------+------+------------+-------+--------------------+------------------+------+---------+----------+-------+
|     858|     4|9.56678732E8|   6040|Godfather, The (1...|Action|Crime|Drama|     M|       25|         6|  11106|
|     593|     5|9.56678754E8|   6040|Silence of the La...|    Drama|Thriller|     M|       25|         6|  11106|
|    1961|     4|9.56678777E8|   6040|     Rain Man (1988)|             Drama|     M|       25|         6|  11106|
|    1419|     3|9.56678856E8|   6040|    Walkabout (1971)|             Drama|     M|       25|         6|  11106|
|     213|     5|9.56678856E8|   6040|Burnt By the Sun ...|             Drama|     M|       25|         6|  11106|
+--------+------+------------+-------+--------------------+------------------+--

In [21]:
newdata = fulldata[['rating','timestamp','genre','gender','age_group','occupation']]
newdata.show(5)

+------+------------+------------------+------+---------+----------+
|rating|   timestamp|             genre|gender|age_group|occupation|
+------+------------+------------------+------+---------+----------+
|     4|9.56678732E8|Action|Crime|Drama|     M|       25|         6|
|     5|9.56678754E8|    Drama|Thriller|     M|       25|         6|
|     4|9.56678777E8|             Drama|     M|       25|         6|
|     3|9.56678856E8|             Drama|     M|       25|         6|
|     5|9.56678856E8|             Drama|     M|       25|         6|
+------+------------+------------------+------+---------+----------+
only showing top 5 rows



In [22]:
onehot = OneHotEncoder()
cols = ['genre','gender','age_group','occupation']
to_encode = newdata[cols].toPandas()

In [23]:
onehot.fit(to_encode)
new_cols = onehot.get_feature_names(cols)
ohe_data = onehot.transform(to_encode).toarray()

In [24]:
col_names = ['timestamp'] + list(new_cols)
ohe_data = np.concatenate((newdata[['timestamp']].toPandas(),ohe_data),axis=1)
X = pd.DataFrame(ohe_data,columns=col_names)
X.head()

Unnamed: 0,timestamp,genre_ Miami Beach (1988),genre_Action,genre_Action|Adventure,genre_Action|Adventure|Animation,genre_Action|Adventure|Animation|Children's|Fantasy,genre_Action|Adventure|Animation|Horror|Sci-Fi,genre_Action|Adventure|Children's|Comedy,genre_Action|Adventure|Children's|Fantasy,genre_Action|Adventure|Children's|Sci-Fi,...,occupation_19,occupation_2,occupation_20,occupation_3,occupation_4,occupation_5,occupation_6,occupation_7,occupation_8,occupation_9
0,956678732.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
1,956678754.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
2,956678777.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
3,956678856.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
4,956678856.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0


In [25]:
col_names[:10]

['timestamp',
 'genre_ Miami Beach (1988)',
 'genre_Action',
 'genre_Action|Adventure',
 'genre_Action|Adventure|Animation',
 "genre_Action|Adventure|Animation|Children's|Fantasy",
 'genre_Action|Adventure|Animation|Horror|Sci-Fi',
 "genre_Action|Adventure|Children's|Comedy",
 "genre_Action|Adventure|Children's|Fantasy",
 "genre_Action|Adventure|Children's|Sci-Fi"]

# KNN with Movie Data

In [26]:
y = newdata['rating']

targets = newdata.toPandas()

y = targets['rating']

In [27]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y)

from sklearn.neighbors import KNeighborsClassifier
clf = KNeighborsClassifier()
clf.fit(X_train, y_train)
test_preds = clf.predict(X_test)

print(test_preds)

from sklearn.metrics import precision_score, recall_score, accuracy_score, f1_score


def print_metrics(labels, preds):
    print("Precision Score: {}".format(precision_score(labels, preds, average=None)))
    print("Recall Score: {}".format(recall_score(labels, preds, average=None)))
    print("Accuracy Score: {}".format(accuracy_score(labels, preds)))
    print("F1 Score: {}".format(f1_score(labels, preds, average=None)))

print_metrics(y_test, test_preds) 



[1 4 4 ... 5 4 4]
Precision Score: [0.2299225  0.20371573 0.32100098 0.40674046 0.39316887]
Recall Score: [0.22479684 0.17958997 0.38126861 0.44300352 0.28003699]
Accuracy Score: 0.3501595280109798
F1 Score: [0.22733078 0.1908936  0.34854876 0.42409822 0.32709705]
