In [None]:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.types import *
from scipy import sparse
from sklearn.metrics.pairwise import cosine_similarity

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
sc = spark.sparkContext

In [35]:
df = pd.read_csv('data/training.csv')

In [193]:
df.head()
df.shape

(800000, 4)

In [37]:
df.shape

(800000, 4)

In [38]:
df_training = spark.createDataFrame(df)

In [39]:
df_training.printSchema()

root
 |-- user: long (nullable = true)
 |-- movie: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: long (nullable = true)



In [40]:
df_training.summary().show()

+-------+------------------+------------------+------------------+-------------------+
|summary|              user|             movie|            rating|          timestamp|
+-------+------------------+------------------+------------------+-------------------+
|  count|            800000|            800000|            800000|             800000|
|   mean|      3403.0978375|     1849.25725625|        3.59047875|9.683921498700112E8|
| stddev|1546.5890280451977|1086.8524851599616|1.1203761265092076| 5820930.9564835895|
|    min|               636|                 1|                 1|          956703932|
|    25%|              2033|              1028|                 3|          964152059|
|    50%|              3507|              1788|                 4|          967587699|
|    75%|              4694|              2750|                 4|          974687743|
|    max|              6040|              3952|                 5|          975767289|
+-------+------------------+---------------

In [26]:
train, test = df_training.randomSplit([.8,.2], seed=427471138)

In [27]:
print(train.count())
print(test.count())

640028
159972


In [28]:
from pyspark.ml.recommendation import ALS

In [57]:
# Create an untrained ALS model.
als_model = pyspark.ml.recommendation.ALS(
    itemCol='movie',
    userCol='user',
    ratingCol='rating',
    coldStartStrategy="drop",
    nonnegative=True,    
    regParam=0.1,
    rank=10)

In [58]:
recommender = als_model.fit(train)

In [69]:
train_pd = recommender.transform(train).toPandas()
train_spark = recommender.transform(train)
test_spark = recommender.transform(test)

In [66]:
test_pd = preds.toPandas()
test_pd = test_pd.fillna(train_pd['rating'].mean())
#test_spark = spark.createDataFrame(test_pd)

In [62]:
from pyspark.ml.evaluation import RegressionEvaluator

In [70]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating",metricName="rmse")
rmse_train=evaluator.evaluate(train_spark)
rmse_test=evaluator.evaluate(test_spark)

In [68]:
print(rmse_train, rmse_test)

0.8175956655408757 0.8769549767465412


Initial ALS Model to get items and users matrices

In [50]:
users_df = pd.read_table('data/users.dat', sep='::', names=['userID', 'gender', 'age', 'occupation', 'Zip-code' ])
users_df.head()

  """Entry point for launching an IPython kernel.
  """Entry point for launching an IPython kernel.


Unnamed: 0,userID,gender,age,occupation,Zip-code
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [289]:
requests_df = pd.read_csv('data/requests.csv')  #requests as a pd df
requests_spark = spark.createDataFrame(requests_df) #changing to spark df

In [290]:
requests_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200209 entries, 0 to 200208
Data columns (total 2 columns):
user     200209 non-null int64
movie    200209 non-null int64
dtypes: int64(2)
memory usage: 3.1 MB


In [291]:
requests_preds = recommender.transform(requests_spark)
requests_preds.show(20)

+----+-----+----------+
|user|movie|prediction|
+----+-----+----------+
|4169|  148| 3.1455045|
|5333|  148|  2.533397|
|4387|  148| 2.0944188|
| 840|  148|  2.626107|
| 752|  148|  3.338347|
| 970|  463|  2.763186|
|4169|  463|  2.536682|
| 721|  463| 3.3177676|
|1962|  463| 2.0736015|
| 934|  463| 2.2156308|
|1088|  471| 3.4190986|
|4186|  471| 2.8205547|
| 796|  471|   3.09946|
|1500|  471|  3.946217|
|4958|  471| 3.2345638|
|1243|  471| 3.4194014|
| 743|  471|  3.451454|
|2168|  471| 4.0295696|
|5333|  471| 3.3854756|
|2127|  471|  3.483123|
+----+-----+----------+
only showing top 20 rows



In [292]:
requests_preds

104424

In [71]:
rmse_preds=evaluator.evaluate(requests_preds)
print(requests_preds)

IllegalArgumentException: 'Field "rating" does not exist.\nAvailable fields: user, movie, prediction'

In [122]:
movies_df = pd.read_table('data/movies.dat', sep='::', names=['MovieID', 'Titles', 'Genre'])
movies_df.head(10)

  """Entry point for launching an IPython kernel.
  """Entry point for launching an IPython kernel.


Unnamed: 0,MovieID,Titles,Genre
0,1,Toy Story (1995),Animation|Children's|Comedy
1,2,Jumanji (1995),Adventure|Children's|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama
4,5,Father of the Bride Part II (1995),Comedy
5,6,Heat (1995),Action|Crime|Thriller
6,7,Sabrina (1995),Comedy|Romance
7,8,Tom and Huck (1995),Adventure|Children's
8,9,Sudden Death (1995),Action
9,10,GoldenEye (1995),Action|Adventure|Thriller


In [123]:
titles = movies_df['Titles'].str.split('(',1, expand=True)
titles.rename(mapper={0:'Title', 1:'Year'}, axis=1, inplace=True)
titles.head()

Unnamed: 0,Title,Year
0,Toy Story,1995)
1,Jumanji,1995)
2,Grumpier Old Men,1995)
3,Waiting to Exhale,1995)
4,Father of the Bride Part II,1995)


In [124]:
genre = movies_df['Genre'].str.split('|', 2,expand=True)
genre.rename(mapper={0:'Genre1', 1:'Genre2', 2:'Genre3'}, axis=1, inplace=True)
genre.head()

Unnamed: 0,Genre1,Genre2,Genre3
0,Animation,Children's,Comedy
1,Adventure,Children's,Fantasy
2,Comedy,Romance,
3,Comedy,Drama,
4,Comedy,,


In [125]:
frames = [movies_df, genre, titles]
result = pd.concat(frames, axis=1)

In [126]:
result.head()

Unnamed: 0,MovieID,Titles,Genre,Genre1,Genre2,Genre3,Title,Year
0,1,Toy Story (1995),Animation|Children's|Comedy,Animation,Children's,Comedy,Toy Story,1995)
1,2,Jumanji (1995),Adventure|Children's|Fantasy,Adventure,Children's,Fantasy,Jumanji,1995)
2,3,Grumpier Old Men (1995),Comedy|Romance,Comedy,Romance,,Grumpier Old Men,1995)
3,4,Waiting to Exhale (1995),Comedy|Drama,Comedy,Drama,,Waiting to Exhale,1995)
4,5,Father of the Bride Part II (1995),Comedy,Comedy,,,Father of the Bride Part II,1995)


In [127]:
result.drop('Genre', axis=1, inplace=True)
result.drop('Titles', axis=1, inplace=True)

In [128]:
result.head()

Unnamed: 0,MovieID,Genre1,Genre2,Genre3,Title,Year
0,1,Animation,Children's,Comedy,Toy Story,1995)
1,2,Adventure,Children's,Fantasy,Jumanji,1995)
2,3,Comedy,Romance,,Grumpier Old Men,1995)
3,4,Comedy,Drama,,Waiting to Exhale,1995)
4,5,Comedy,,,Father of the Bride Part II,1995)


In [73]:
from pyspark.ml.feature import VectorAssembler

In [129]:
spark_movies = spark.createDataFrame(result)

In [130]:
spark_movies.printSchema()

root
 |-- MovieID: long (nullable = true)
 |-- Genre1: string (nullable = true)
 |-- Genre2: string (nullable = true)
 |-- Genre3: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Year: string (nullable = true)



In [131]:
vec_assembler = VectorAssembler(inputCols=['MovieID', 'Genre1', 'Genre2', 'Genre3', 'Title', 'Year'],
                                  outputCol="features")

In [133]:
vector = vec_assembler.transform(spark_movies)

IllegalArgumentException: 'Data type string of column Genre1 is not supported.\nData type string of column Genre2 is not supported.\nData type string of column Genre3 is not supported.\nData type string of column Title is not supported.\nData type string of column Year is not supported.'

In [134]:
from sklearn.feature_extraction.text import TfidfVectorizer as TFIDF

In [None]:
tfidf = TFIDF()

In [135]:
from sklearn.cluster import KMeans

In [160]:
users_df.head()

Unnamed: 0,userID,gender,age,occupation,Zip-code
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [163]:
users_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6040 entries, 0 to 6039
Data columns (total 5 columns):
userID        6040 non-null int64
gender        6040 non-null object
age           6040 non-null int64
occupation    6040 non-null int64
Zip-code      6040 non-null object
dtypes: int64(3), object(2)
memory usage: 236.0+ KB


In [164]:
users_df = users_df.iloc[:,:-1]
#drop zip code

In [165]:
users = pd.get_dummies(users_df, columns=['gender'], drop_first=True)
users.head()

Unnamed: 0,userID,age,occupation,gender_M
0,1,1,10,0
1,2,56,16,1
2,3,25,15,1
3,4,45,7,1
4,5,25,20,1


In [166]:
kmdata = users.iloc[:,1:]

In [168]:
km = KMeans(n_clusters=6, n_jobs=-1, random_state=1)

In [169]:
kmeans_output = km.fit(kmdata)

In [171]:
labels = kmeans_output.labels_

In [172]:
cc = kmeans_output.cluster_centers_
cc

array([[23.14347826, 15.32695652,  0.83826087],
       [49.29150824,  3.40937896,  0.61343473],
       [50.13971743, 15.00313972,  0.79277865],
       [ 1.        ,  9.00900901,  0.64864865],
       [35.        ,  8.80469405,  0.71668064],
       [22.27379209,  3.33333333,  0.67301122]])

In [176]:
labels

array([3, 2, 0, ..., 1, 1, 5], dtype=int32)

In [177]:
from sklearn import preprocessing

In [179]:
spark_movies.show()

+-------+---------+----------+--------+--------------------+-----+
|MovieID|   Genre1|    Genre2|  Genre3|               Title| Year|
+-------+---------+----------+--------+--------------------+-----+
|      1|Animation|Children's|  Comedy|          Toy Story |1995)|
|      2|Adventure|Children's| Fantasy|            Jumanji |1995)|
|      3|   Comedy|   Romance|    null|   Grumpier Old Men |1995)|
|      4|   Comedy|     Drama|    null|  Waiting to Exhale |1995)|
|      5|   Comedy|      null|    null|Father of the Bri...|1995)|
|      6|   Action|     Crime|Thriller|               Heat |1995)|
|      7|   Comedy|   Romance|    null|            Sabrina |1995)|
|      8|Adventure|Children's|    null|       Tom and Huck |1995)|
|      9|   Action|      null|    null|       Sudden Death |1995)|
|     10|   Action| Adventure|Thriller|          GoldenEye |1995)|
|     11|   Comedy|     Drama| Romance|American Presiden...|1995)|
|     12|   Comedy|    Horror|    null|Dracula: Dead and...|19

In [180]:
train.show()

+----+-----+------+
|user|movie|rating|
+----+-----+------+
|4615|   34|     4|
|4615|   47|     5|
|4615|   50|     5|
|4615|  293|     5|
|4615|  377|     3|
|4615|  454|     3|
|4615|  457|     3|
|4615|  492|     3|
|4615|  587|     3|
|4615|  593|     4|
|4615|  608|     5|
|4615|  733|     4|
|4615|  780|     1|
|4615|  924|     4|
|4615| 1036|     3|
|4615| 1089|     5|
|4615| 1210|     5|
|4615| 1214|     4|
|4615| 1249|     4|
|4615| 1296|     4|
+----+-----+------+
only showing top 20 rows



In [181]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [192]:
requests_spark.show()

+----+-----+
|user|movie|
+----+-----+
|4958| 1924|
|4958| 3264|
|4958| 2634|
|4958| 1407|
|4958| 2399|
|4958| 3489|
|4958| 2043|
|4958| 2453|
|5312| 3267|
|5948| 3098|
|5948| 1180|
|3158| 2648|
| 403| 1036|
|3693|  468|
|5950| 1262|
|5950| 3555|
|5950| 3793|
|5950| 3578|
|5950| 3948|
|5950| 3893|
+----+-----+
only showing top 20 rows



In [189]:
vec_assembler = VectorAssembler(inputCols=[],
                                  outputCol="features")

IllegalArgumentException: 'Field "MovieID" does not exist.\nAvailable fields: user, movie, rating'

In [187]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

In [188]:
scalerModel = scaler.fit(vec_assembler)

AttributeError: 'VectorAssembler' object has no attribute '_jdf'

In [220]:
x = pd.DataFrame(np.zeros([5,2]))
y= 'abcd'

In [228]:
from pyspark.sql.functions import isnan, col, count

In [269]:
def populator(df, recommender):
    uf = recommender.userFactors
    mf = recommender.itemFactors  
    for row in df.iterrows():
        user_feature = uf.filter(col('id')==row[1][0])
        movie_feature = mf.filter(col('id')==row[1][1])
#       df['rating'][row] = np.dot(user_feature.collect()[0][1], movie_feature.collect()[0][1])
    return df

In [270]:
populator(requests_df, recommender)

AttributeError: 'numpy.int64' object has no attribute '_get_object_id'

In [237]:
mf = recommender.itemFactors

In [238]:
mf

DataFrame[id: int, features: array<float>]

In [252]:
requests_df

Unnamed: 0,user,movie
0,4958,1924
1,4958,3264
2,4958,2634
3,4958,1407
4,4958,2399
5,4958,3489
6,4958,2043
7,4958,2453
8,5312,3267
9,5948,3098


In [268]:
for row in requests_df.iterrows():
    print(row)
    break

(0, user     4958
movie    1924
Name: 0, dtype: int64)


In [279]:
r = recommender.transform(requests_spark)

In [280]:
RP = r.toPandas()

In [281]:
RP.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 104424 entries, 0 to 104423
Data columns (total 3 columns):
user          104424 non-null int64
movie         104424 non-null int64
prediction    104424 non-null float32
dtypes: float32(1), int64(2)
memory usage: 2.0 MB


In [288]:
RP.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 104424 entries, 0 to 104423
Data columns (total 3 columns):
user          104424 non-null int64
movie         104424 non-null int64
prediction    104424 non-null float32
dtypes: float32(1), int64(2)
memory usage: 2.0 MB
