In [1]:
import os
import pandas as pd
import numpy as np
import gc

In [2]:
os.chdir('/project/ds5559/Group1_Netflix')

# Imports

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import substring, length, col, expr
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline  

# Start Spark

In [4]:
spark = SparkSession.builder.getOrCreate()

# Read in the formatted data as Spark Dataframes

In [5]:
reviews_df=spark.read.csv("formatted_data.csv",header=True,inferSchema=True)
movies_df=spark.read.csv("movie_titles.csv",header=False,inferSchema=True)

In [6]:
from pyspark.sql.types import IntegerType

movies_df=movies_df.select(col('_c0').alias("movie_id"), \
  col('_c1').alias("movie_year").cast(IntegerType()), \
  col('_c2').alias("movie_name")) ##renaming so that we can run a simpler "join on (name)" later

### Drop Null Movies and recompute stats

In [7]:
null_movie_ids=movies_df.filter((col("movie_year").isNull()) | (col("movie_name") == "NULL")).select("movie_id").rdd.map(lambda row: row[0]).collect()
print(null_movie_ids)

[4388, 4794, 7241, 10782, 15918, 16678, 17667]


In [8]:
movies_df=movies_df.filter(~movies_df.movie_id.isin(null_movie_ids))
reviews_df=reviews_df.filter(~reviews_df.movie_id.isin(null_movie_ids))

In [9]:
seed = 314
weights = [.0001, .0001,.9998]
train, test, waste = reviews_df.randomSplit(weights, seed)
waste = None #free up mem

In [10]:
user_agg_df = reviews_df.groupBy("user_id").agg(F.count("rating")).sort(F.desc("count(rating)"))

In [11]:
num_top_reviewers = 1000 #number of top reviewers to use, corresponds to num columns
valid_reviewers = user_agg_df.limit(num_top_reviewers).select("user_id").rdd.map(lambda row: int(row[0])).collect()
valid_reviews = reviews_df.filter(reviews_df.user_id.isin(valid_reviewers))
#valid_reviews.count() #uncomment to see how many reviews total we have
train= valid_reviews #TODO maybe split this up?

In [12]:
#new_train=train.limit(5)
new_train = train

## Simplifying Features

In the code below we do several tasks: 
* Join datasets to get year and movie id as features.
* Create a new feature 'movie_age'
* Drop columns that aren't going to add any information to the model.

In [13]:
#reviews_movies = train.join(movies_df, on=['movie_id'], how='outer') #joining frames
reviews_movies = movies_df.join(new_train, on=['movie_id'])
reviews_movies = reviews_movies.withColumn('year_trunc', expr("substring(date,0,4)"))
reviews_movies = reviews_movies.withColumn('movie_age',reviews_movies.year_trunc.cast(IntegerType())-reviews_movies.movie_year.cast(IntegerType())) #defining a new feature by calculating (very roughly) the age of the movie when it was reviewed
df_for_kmeans=reviews_movies.drop(col("date")).drop(col("year_trunc")).drop(col("movie_name")) #dropping the columns used in the calculation of movie_age
#df_for_kmeans.show(20)

In [14]:
#Free up some memory by killing unneeded dataframes
reviews_movies = None
movies_df = None
reviews_df = None
gc.collect()

7201

In [15]:
spark.conf.set('spark.sql.pivotMaxValues', u'100000')
pivoted=df_for_kmeans.groupBy("movie_id").pivot("user_id").avg("rating")

In [16]:
pivoted = pivoted.fillna(0) #this fill na can go in the above line, left separate to test memory reqs

## K-Means Clustering

Now we will try K-Means clustering on all of the data. We include the response variable in features, because the initial goal of this unsupervised learning method is just to find out how many clusters yield the strongest silhouette score and then discuss how that score holds up. But dropping features from the model may provide some information

In [17]:
assembler1 = VectorAssembler(inputCols=[col for col in pivoted.columns if col!= "movie_id"], outputCol="features",handleInvalid = "skip") #assembling the features
dataset=assembler1.transform(pivoted) #adding features using transform

In [18]:
dataset=dataset.select("movie_id","features").cache() #reduce what we need to store and cache

### Create a Scaled DataFrame

In [19]:
scale=StandardScaler(inputCol='features',outputCol='standardized')
data_scale=scale.fit(dataset)
data_scale_output=data_scale.transform(dataset)
data_scale_output.show(2)

+--------+--------------------+--------------------+
|movie_id|            features|        standardized|
+--------+--------------------+--------------------+
|     148|(1000,[0,1,2,4,6,...|(1000,[0,1,2,4,6,...|
|     463|(1000,[1,2,9,20,2...|(1000,[1,2,9,20,2...|
+--------+--------------------+--------------------+
only showing top 2 rows



### Evaluating Silhouette Scores

In [20]:
def kmeans_range(lower,upper,dat,features): #function for iterating through various Ks and computing silhouette scores
    scores=[]
    ks= [k for k in range(lower,upper+1)]
    evaluator = ClusteringEvaluator()
    for k in ks: #upper and lower bounds are the range; +1 is to include 10
        kmeans = KMeans(featuresCol=features).setK(k).setSeed(314).setMaxIter(3) #k is the iterator, could probably pass "features" in as an argument to the function
        model = kmeans.fit(dat) #run the model
        predictions = model.transform(dat) #predict (in cluster or out)
        # Evaluate clustering by computing Silhouette score
        silhouette = evaluator.evaluate(predictions)
        scores.append(silhouette)
        print(k)#give progress update
    df=pd.DataFrame({'k': pd.Series(ks, dtype='int'),
               'sil_score': pd.Series(scores, dtype='float')}).sort_values('sil_score',ascending=False)
    return df

In [21]:
def kmeans_range_tune(lower,upper,dat,features): #function for iterating through various Ks and computing silhouette scores
    iter_vals = [1,3,5,10,20,30,40,50,100] #configure this to test different values
    k_vals= [k for k in range(lower,upper+1)]
    scores=[]
    ks = []
    iters = []
    
    evaluator = ClusteringEvaluator()
    for num_iter in iter_vals:
        for k in k_vals: #upper and lower bounds are the range; +1 is to include 10
            kmeans = KMeans(featuresCol= features).setK(k).setSeed(314).setMaxIter(num_iter) #k is the iterator, could probably pass "features" in as an argument to the function
            model = kmeans.fit(dat) #run the model
            predictions = model.transform(dat) #predict (in cluster or out)
            # Evaluate clustering by computing Silhouette score
            silhouette = evaluator.evaluate(predictions)
            ks.append(k)
            iters.append(num_iter)
            scores.append(silhouette)
        print(num_iter)#give progress update
    df=pd.DataFrame({'max_iter': pd.Series(iters, dtype='int'),
                     'k': pd.Series(ks, dtype='int'),
               'sil_score': pd.Series(scores, dtype='float')}).sort_values('sil_score',ascending=False)
    return df

In [22]:
kmeans_frame=kmeans_range(2,10,data_scale_output, 'features') #print a dataframe with the silhouette scores - this is trying on the data before scaling - do we need to use standard scaler?
print(kmeans_frame.head(10))

2
3
4
5
6
7
8
9
10
    k  sil_score
0   2   0.686687
1   3   0.579240
2   4   0.479950
3   5   0.402497
4   6   0.253708
6   8   0.229335
5   7   0.148384
7   9   0.007956
8  10  -0.041877


#### Running $K$ Means with scaled data

In [23]:
kmeans_frame_std=kmeans_range(2,10,data_scale_output, 'standardized') #print a dataframe with the silhouette scores - this is trying on the data before scaling - do we need to use standard scaler?
print(kmeans_frame.head(10))

2
3
4
5
6
7
8
9
10
    k  sil_score
0   2   0.686687
1   3   0.579240
2   4   0.479950
3   5   0.402497
4   6   0.253708
6   8   0.229335
5   7   0.148384
7   9   0.007956
8  10  -0.041877


In [24]:
kmeans_tuned_frame=kmeans_range_tune(2,10,data_scale_output,'standardized') #print a dataframe with the silhouette scores
print(kmeans_tuned_frame.head(10))

1
3
5
10
20
30
40
50
100
    max_iter  k  sil_score
0          1  2   0.705261
9          3  2   0.690397
18         5  2   0.686225
54        40  2   0.685270
36        20  2   0.685270
45        30  2   0.685270
27        10  2   0.685270
72       100  2   0.685270
63        50  2   0.685270
38        20  4   0.556725


In [25]:
kmeans = KMeans(featuresCol='standardized').setK(4).setSeed(314).setMaxIter(3)
model = kmeans.fit(data_scale_output) #run the model
predictions = model.transform(data_scale_output)
predictions.select("movie_id","standardized","prediction").show(10)

+--------+--------------------+----------+
|movie_id|        standardized|prediction|
+--------+--------------------+----------+
|     148|(1000,[0,1,2,4,6,...|         2|
|     463|(1000,[1,2,9,20,2...|         0|
|     496|(1000,[116,157,20...|         1|
|     833|(1000,[0,4,5,6,7,...|         2|
|    1342|(1000,[116,136,15...|         1|
|    1580|(1000,[31,70,94,1...|         1|
|    1645|(1000,[0,3,5,6,7,...|         2|
|    1959|(1000,[0,5,6,9,10...|         3|
|    2122|[2.78362675444102...|         2|
|    2366|(1000,[1,3,5,7,8,...|         0|
+--------+--------------------+----------+
only showing top 10 rows



In [26]:
# Free up memory for Pipeline
predictions = None
kmeans = None
model = None
assembler1 = None
dataset = None
gc.collect()

858

#### Add steps into a model Pipeline

In [27]:
assembler = VectorAssembler(inputCols=[col for col in pivoted.columns if col!= "movie_id"], outputCol="features",handleInvalid = "skip") #assembling the features

scale = StandardScaler(inputCol='features',outputCol='standardized')
 
kmeans = KMeans(featuresCol='standardized').setK(4).setSeed(314).setMaxIter(20)

# Build the pipeline; this takes the objects as inputs
pipeline = Pipeline(stages=[assembler, scale, kmeans])

In [28]:
model = pipeline.fit(pivoted).transform(pivoted)

In [29]:
modelSave = pipeline.fit(pivoted)
modelSave.write().overwrite().save('kmeans_model')

In [30]:
!du -h kmeans_model

20K	kmeans_model/metadata
56K	kmeans_model/stages/2_KMeans_ec58aaca6b4d/data
20K	kmeans_model/stages/2_KMeans_ec58aaca6b4d/metadata
80K	kmeans_model/stages/2_KMeans_ec58aaca6b4d
32K	kmeans_model/stages/0_VectorAssembler_71bc2cda2c83/metadata
36K	kmeans_model/stages/0_VectorAssembler_71bc2cda2c83
40K	kmeans_model/stages/1_StandardScaler_7da191d12dfe/data
20K	kmeans_model/stages/1_StandardScaler_7da191d12dfe/metadata
64K	kmeans_model/stages/1_StandardScaler_7da191d12dfe
184K	kmeans_model/stages
208K	kmeans_model


In [31]:
model.select('movie_id','standardized','prediction').show(10)

+--------+--------------------+----------+
|movie_id|        standardized|prediction|
+--------+--------------------+----------+
|     148|(1000,[0,1,2,4,6,...|         0|
|     463|(1000,[1,2,9,20,2...|         0|
|     496|(1000,[116,157,20...|         1|
|     833|(1000,[0,4,5,6,7,...|         0|
|    1342|(1000,[116,136,15...|         1|
|    1580|(1000,[31,70,94,1...|         1|
|    1645|(1000,[0,3,5,6,7,...|         0|
|    1959|(1000,[0,5,6,9,10...|         3|
|    2122|[2.78362675444102...|         2|
|    2366|(1000,[1,3,5,7,8,...|         0|
+--------+--------------------+----------+
only showing top 10 rows



#### Year Averaging

In [32]:
from pyspark.sql.functions import desc
preds_data = model.select("movie_id","prediction")
preds_joined = preds_data.join(new_train, on=['movie_id']) #joining frames
preds_joined.show()
preds_joined=preds_joined.withColumn('year_trunc', expr("substring(date,0,4)"))
rating_preds = preds_joined.groupBy("prediction").agg(F.avg("rating").alias('avg_rating'))
rating_preds.show()

+--------+----------+-------+------+----------+
|movie_id|prediction|user_id|rating|      date|
+--------+----------+-------+------+----------+
|     148|         0|1907667|   3.0|2005-12-15|
|     148|         0|1744889|   1.0|2002-09-04|
|     148|         0|1294425|   4.0|2004-04-08|
|     148|         0|1174811|   3.0|2002-06-14|
|     148|         0| 461110|   4.0|2003-10-20|
|     148|         0| 838130|   4.0|2003-05-14|
|     148|         0| 327122|   1.0|2003-07-07|
|     148|         0| 603277|   1.0|2001-12-24|
|     148|         0|  76196|   1.0|2001-11-29|
|     148|         0|2161899|   2.0|2002-06-26|
|     148|         0| 844049|   3.0|2004-06-07|
|     148|         0|2503887|   3.0|2004-08-04|
|     148|         0| 908205|   3.0|2005-03-29|
|     148|         0|1650301|   2.0|2005-06-05|
|     148|         0|2312349|   3.0|2004-08-18|
|     148|         0|2551641|   3.0|2003-08-01|
|     148|         0|1984086|   4.0|2004-08-18|
|     148|         0|   1333|   2.0|2004

In [33]:
!jupyter nbconvert --to pdf /sfs/qumulo/qhome/ja6mw/ds5110/DS5110-Netflix-Rating-Predictions-PySpark/KMeans.ipynb

[NbConvertApp] Converting notebook /sfs/qumulo/qhome/ja6mw/ds5110/DS5110-Netflix-Rating-Predictions-PySpark/KMeans.ipynb to pdf
[NbConvertApp] Writing 57595 bytes to notebook.tex
[NbConvertApp] Building PDF
[NbConvertApp] Running xelatex 3 times: ['xelatex', 'notebook.tex', '-quiet']
[NbConvertApp] Running bibtex 1 time: ['bibtex', 'notebook']
[NbConvertApp] PDF successfully created
[NbConvertApp] Writing 66265 bytes to /sfs/qumulo/qhome/ja6mw/ds5110/DS5110-Netflix-Rating-Predictions-PySpark/KMeans.pdf
