# Import Packages and Data

In [1]:
# import necessary libraries
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.sql.functions import explode
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

In [2]:
# Start (or reuse) the SparkSession that backs every Spark step below. The driver
# host is pinned to localhost (presumably to sidestep local hostname resolution).
spark = (
    SparkSession.builder
    .appName("ALSExample")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
)

In [3]:
# Load the ratings CSV into a Spark DataFrame: column names come from the header
# row and Spark infers the column types.
movie_ratings = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('../data/test_data/ratings.csv')
)


In [4]:
# ALS only needs the user, item and rating columns, so discard the timestamp.
movie_ratings = movie_ratings.drop('timestamp')


# Grid Search Optimal Hyperparameters with Cross Validation

In [5]:
# ALS estimator over (userId, movieId, rating). coldStartStrategy="drop" discards
# NaN predictions for users/items unseen in a training fold so they cannot poison
# the RMSE.
als_model = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
)

# Single-point grid; the values were presumably chosen in an earlier, wider search.
params = (
    ParamGridBuilder()
    .addGrid(als_model.regParam, [0.15])
    .addGrid(als_model.rank, [42])
    .build()
)

# Candidates are scored by RMSE between observed and predicted ratings (lower is better).
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

In [6]:
# Cross-validate ALS over the parameter grid (Spark's default of 3 folds), evaluating
# up to four parameter maps concurrently. The fitted CrossValidatorModel exposes the
# winner as `bestModel`, refit by Spark on all of `movie_ratings`.
cv = CrossValidator(
    estimator=als_model,
    estimatorParamMaps=params,
    evaluator=evaluator,
    parallelism=4,
)
best_model = cv.fit(movie_ratings)

In [7]:
# NOTE: `best_model` was refit on all of `movie_ratings`, so this RMSE is measured on
# the data the model was trained on and is optimistic. The cross-validated estimate
# is `best_model.avgMetrics` (displayed further down).
predictions = best_model.transform(movie_ratings)
rmse = evaluator.evaluate(predictions)
print(f"Training (in-sample) root-mean-square error = {rmse}")

Root-mean-square error = 0.6326275155741841


In [8]:
# Latent dimensionality (number of factors) of the selected ALS model; reused below
# to reshape the exploded factor vectors.
rank = best_model.bestModel.rank
rank

42

In [9]:
# Parameter maps evaluated by the cross-validator (a single combination in this grid).
best_model.getEstimatorParamMaps()

[{Param(parent='ALS_4dce346a73fa', name='regParam', doc='regularization parameter (>= 0).'): 0.15,
  Param(parent='ALS_4dce346a73fa', name='rank', doc='rank of the factorization'): 42}]

In [10]:
# Mean cross-validated RMSE for each parameter map, in grid order.
best_model.avgMetrics

[0.8853054686041917]

# Extract Latent Factors from the Best Model (refit on the full data with the grid-search parameters)

In [11]:
# Flatten each user's latent-factor vector: `explode` turns the `features` array into
# one row per element, giving (id, features) rows where `features` is a scalar.
user_factors = best_model.bestModel.userFactors
recommendationsDF = user_factors.select(
    "id",
    explode("features").alias("features"),
)

In [12]:
# Collect the exploded user factors to the driver as a long pandas frame
# (one row per user id and factor).
rec_pdf = recommendationsDF.toPandas()
rec_pdf.head()

Unnamed: 0,id,features
0,10,-0.450676
1,10,0.422844
2,10,0.093504
3,10,-0.346517
4,10,-0.268203


In [None]:
multiplier = rec_pdf['id'].nunique()
# Number each exploded row with its 1-based position in the user's factor vector.
# A per-user running count relies only on each user's rows keeping array order
# (as `explode` emits them). It does not assume the frame is a perfect tiling of
# contiguous `rank`-row blocks.
rec_pdf['value'] = rec_pdf.groupby('id').cumcount() + 1
# Invariant: every user contributes exactly `rank` factors.
assert len(rec_pdf) == multiplier * rank, 'unexpected number of user-factor rows'

In [None]:
# Reshape the long (id, value, features) frame into a wide user x factor matrix
# (raises if an (id, value) pair is duplicated).
rec_pdf_unstacked = (
    rec_pdf
    .set_index(['id', 'value'])['features']
    .unstack('value')
)

In [None]:
# Wide user-factor matrix: one row per user id, one column per factor position.
rec_pdf_unstacked.head()

# Hierarchical Agglomerative Cluster Analysis

In [None]:
# Standardize every factor column to zero mean and unit variance so that all latent
# dimensions weigh equally in the Euclidean distances used for clustering.
scaler = StandardScaler().fit(rec_pdf_unstacked)
rec_pdf_scaled = scaler.transform(rec_pdf_unstacked)

In [None]:
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.cluster.hierarchy import cophenet
from scipy.spatial.distance import pdist
import matplotlib.pyplot as plt
%matplotlib inline

z = linkage(rec_pdf_scaled, 'ward', metric = 'euclidean') # Ward linkage used to produce tightly knit clusters
c, coph_dist = cophenet(z, pdist(rec_pdf_scaled))
dendrogram(z,
          leaf_rotation=90,
          leaf_font_size = 8.,
          truncate_mode='lastp',
          p=50)
plt.show()

In [None]:
# Disabled exploration: list the distinct cophenetic distances, presumably to help
# choose where to cut the dendrogram.
# coph_dist.sort()
# set(coph_dist)

In [None]:
from sklearn.cluster import AgglomerativeClustering
from sklearn.neighbors import KernelDensity
# Feature columns are the standardized latent factors, labelled 0..rank-1.
rec_pdf_clustered = pd.DataFrame(rec_pdf_scaled.copy())
# Cut the Ward tree into 9 clusters (count presumably read off the dendrogram above).
# Ward linkage requires Euclidean distance, which is already the default. No
# `affinity=` is passed: it was deprecated in scikit-learn 1.2 and removed in 1.4.
cluster = AgglomerativeClustering(n_clusters=9, linkage='ward')
rec_pdf_clustered['cluster'] = cluster.fit_predict(rec_pdf_clustered)

In [None]:
# Per-cluster mean, standard deviation and member count of every standardized factor.
rec_pdf_clustered.groupby(['cluster']).agg(['mean', 'std', 'count'])

In [None]:
# Disabled exploration: scatter of three factor columns coloured by cluster.
# NOTE(review): `plt.scatter` treats its third positional argument as marker size, so
# this would draw a 2-D plot; a 3-D scatter needs `ax.scatter(x, y, z, c=...)`.
# from mpl_toolkits.mplot3d import Axes3D
# fig = plt.figure(figsize=(20,10))
# ax = fig.add_subplot(111, projection='3d')
# plt.scatter(rec_pdf_clustered[1], rec_pdf_clustered[5], rec_pdf_clustered[3], c=rec_pdf_clustered.cluster)
# plt.show();

# Cluster Centroids and Distances

In [None]:
# Centroid of each cluster = mean standardized factor vector of its members
# (one row per cluster label, sorted).
cluster_centroids = rec_pdf_clustered.groupby('cluster').mean()

In [None]:
# Plain ndarray view of the centroids for SciPy.
cluster_centroids_array = np.asarray(cluster_centroids)

In [None]:
# Euclidean (p=2) distance between every pair of cluster centroids.
cluster_distance_matrix = distance_matrix(x=cluster_centroids_array,
                                          y=cluster_centroids_array,
                                          p=2)

In [None]:
# Pairwise centroid distances as a table; positions follow the (sorted) cluster
# labels of `cluster_centroids`.
cluster_distance_df = pd.DataFrame(cluster_distance_matrix)
cluster_distance_df

In [None]:
def nearest_clusters(cluster, num_nearest_clusters=2, distances=None):
    """Return the labels of the clusters whose centroids are closest to `cluster`.

    Parameters
    ----------
    cluster : int
        Label of the reference cluster (a column and index label of `distances`).
    num_nearest_clusters : int, default 2
        How many neighbouring clusters to return.
    distances : pandas.DataFrame, optional
        Centroid-distance table whose index and columns are cluster labels; column
        `cluster` holds the distances from that cluster. Defaults to the
        notebook-level `cluster_distance_df`.

    Returns
    -------
    numpy.ndarray of int
        Neighbouring cluster labels ordered from nearest to farthest. `cluster`
        itself is never included.
    """
    if distances is None:
        distances = cluster_distance_df
    # Drop the reference cluster by label instead of assuming it sorts first: with a
    # tie at distance 0, an unstable sort could put another cluster ahead of it.
    others = distances[cluster].drop(index=cluster)
    return others.nsmallest(num_nearest_clusters).index.values.astype(int)

In [None]:
# Example: the two clusters whose centroids are closest to cluster 0.
nearest_clusters(0, 2)

# Bootstrap Sample

In [None]:
# Bootstrap: draw 5000 user rows with replacement (seeded) to enlarge the training
# data for the cluster classifiers. Rows keep the original users' index labels.
bs_sample = rec_pdf_clustered.sample(5000, replace=True, random_state=42)

# KNN Model (for assigning new user to a cluster)

In [None]:
# Peek at the bootstrap sample: standardized factors plus the 'cluster' label.
bs_sample.head()

In [None]:
# Features are the standardized factors; the target is the agglomerative cluster
# label, which is the last column of the sample.
X = bs_sample.drop(columns='cluster')
y = bs_sample['cluster']

In [None]:
# Bootstrap rows are drawn with replacement, so one user usually appears several
# times. Split on the original user label (the index kept by `.sample`) rather than
# on bootstrap rows. Otherwise copies of the same user land in both train and test,
# which inflates test accuracy. random_state makes the split reproducible.
user_labels = X.index.unique().to_numpy()
train_users, test_users = train_test_split(user_labels, shuffle=True, random_state=42)
in_train = X.index.isin(train_users)
X_train, X_test = X[in_train], X[~in_train]
y_train, y_test = y[in_train], y[~in_train]

In [None]:
from sklearn.metrics import confusion_matrix
# k-nearest-neighbours classifier (k=7): maps a standardized factor vector to one
# of the agglomerative clusters.
knn = KNeighborsClassifier(n_neighbors=7).fit(X_train, y_train)
train_preds, test_preds = knn.predict(X_train), knn.predict(X_test)
# (test accuracy, train accuracy)
accuracy_score(y_test, test_preds), accuracy_score(y_train, train_preds)

In [None]:
# Confusion matrices (rows = true cluster, columns = predicted): test, then train.
print(confusion_matrix(y_test, test_preds),
      confusion_matrix(y_train, train_preds),
      sep='\n')

# Nearest Centroid

In [None]:
from sklearn.neighbors import NearestCentroid
# Baseline: assign each sample to the class whose mean training vector is closest.
nc = NearestCentroid().fit(X_train, y_train)
nc_train_preds, nc_test_preds = nc.predict(X_train), nc.predict(X_test)
# (test accuracy, train accuracy)
accuracy_score(y_test, nc_test_preds), accuracy_score(y_train, nc_train_preds)

In [None]:
# Confusion matrices (rows = true cluster, columns = predicted): test, then train.
print(confusion_matrix(y_test, nc_test_preds),
      confusion_matrix(y_train, nc_train_preds),
      sep='\n')

# Decision Tree

In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree
# Depth and leaf-size limits restrain overfitting. max_features=8 makes each split
# consider a random subset of features, so random_state is fixed for reproducibility.
dc = DecisionTreeClassifier(max_depth=8, max_features=8, criterion='gini',
                            min_samples_leaf=9, random_state=42)
dc.fit(X_train, y_train)
dc_train_preds = dc.predict(X_train)
dc_test_preds = dc.predict(X_test)
# (test accuracy, train accuracy)
accuracy_score(y_test, dc_test_preds), accuracy_score(y_train, dc_train_preds)

In [None]:
# Confusion matrices (rows = true cluster, columns = predicted): test, then train.
print(confusion_matrix(y_test, dc_test_preds),
      confusion_matrix(y_train, dc_train_preds),
      sep='\n')

# Random Forest

In [None]:
from sklearn.ensemble import BaggingClassifier, RandomForestClassifier
from sklearn.model_selection import GridSearchCV
# Bootstrap sampling and per-split feature subsampling make the forest random; fix
# random_state so the grid search and its predictions are reproducible.
rf = RandomForestClassifier(random_state=42)
param_grid = {'criterion': ['gini', 'entropy'],
              'max_depth': [15],
              'min_samples_split': [5],
              'n_estimators': [50],
              'max_features': [6]}
gs_forest = GridSearchCV(rf, param_grid, cv=5)
gs_forest.fit(X_train, y_train)
gs_forest.best_params_

In [None]:
# Predictions of the refit best forest on both splits.
gs_forest_train_preds, gs_forest_test_preds = gs_forest.predict(X_train), gs_forest.predict(X_test)
# (test accuracy, train accuracy)
accuracy_score(y_test, gs_forest_test_preds), accuracy_score(y_train, gs_forest_train_preds)

In [None]:
# Confusion matrices (rows = true cluster, columns = predicted): test, then train.
print(confusion_matrix(y_test, gs_forest_test_preds),
      confusion_matrix(y_train, gs_forest_train_preds),
      sep='\n')

# AdaBoost

In [None]:
from sklearn.ensemble import AdaBoostClassifier
# random_state controls the seed given to each boosted base estimator; fix it so
# the results are reproducible.
ab = AdaBoostClassifier(learning_rate=.4, random_state=42)
ab.fit(X_train, y_train)
ab_train_preds = ab.predict(X_train)
ab_test_preds = ab.predict(X_test)
# (test accuracy, train accuracy)
accuracy_score(y_test, ab_test_preds), accuracy_score(y_train, ab_train_preds)

In [None]:
# Confusion matrices (rows = true cluster, columns = predicted): test, then train.
print(confusion_matrix(y_test, ab_test_preds),
      confusion_matrix(y_train, ab_train_preds),
      sep='\n')

# Gradient Boosting Classifier

In [None]:
from sklearn.ensemble import GradientBoostingClassifier
# max_features=7 subsamples features at every split, so random_state is fixed for
# reproducible boosting and grid-search results.
gbc = GradientBoostingClassifier(random_state=42)
param_grid = {'learning_rate': [.15],
              'max_depth': [5],
              'min_samples_split': [25],
              'n_estimators': [41],
              'max_features': [7]}
gs_gbc = GridSearchCV(gbc, param_grid, cv=5)
gs_gbc.fit(X_train, y_train)
gbc_train_preds = gs_gbc.predict(X_train)
gbc_test_preds = gs_gbc.predict(X_test)
# (test accuracy, train accuracy)
accuracy_score(y_test, gbc_test_preds), accuracy_score(y_train, gbc_train_preds)

In [None]:
# Parameter combination selected by the gradient-boosting grid search.
gs_gbc.best_params_

In [None]:
# Confusion matrices (rows = true cluster, columns = predicted): test, then train.
print(confusion_matrix(y_test, gbc_test_preds),
      confusion_matrix(y_train, gbc_train_preds),
      sep='\n')

# Item Feature Extraction

In [None]:
# ALS item latent factors: one row per movie id with a `features` vector.
item_factors = best_model.bestModel.itemFactors

In [None]:
# Flatten each movie's factor vector into one row per element, mirroring the
# user-factor flattening above.
item_factors_df = item_factors.select(
    "id",
    explode("features").alias("features"),
)

In [None]:
# Collect the exploded item factors to pandas (one row per movie id and factor).
item_factors_pdf = item_factors_df.toPandas()
item_factors_pdf.head()

In [None]:
multiplier_factors = item_factors_pdf['id'].nunique()
# Number each exploded row with its 1-based position in the movie's factor vector.
# A per-movie running count relies only on each movie's rows keeping array order, not
# on the frame being a perfect tiling of contiguous `rank`-row blocks.
item_factors_pdf['value'] = item_factors_pdf.groupby('id').cumcount() + 1
# Invariant: every movie contributes exactly `rank` factors.
assert len(item_factors_pdf) == multiplier_factors * rank, 'unexpected number of item-factor rows'
# Wide movie x factor matrix (index: movie id, columns: factor position).
item_factors_unstacked = item_factors_pdf.pivot(index='id', columns='value', values='features')
item_factors_unstacked.head()

# Cluster Centroids

In [None]:
# Cluster centroids (in the standardized factor space) as a plain DataFrame for display.
cluster_centroids_df = pd.DataFrame(cluster_centroids_array)
cluster_centroids_df.head()

In [None]:
# (number of clusters, number of factor columns)
cluster_centroids.shape

In [None]:
# Factor x movie layout, ready for a centroid x factor dot product.
item_factors_unstacked_transposed = item_factors_unstacked.transpose()

In [None]:
# The centroids live in the StandardScaler space used for clustering, while the ALS
# item factors are unscaled. Map the centroids back to the original latent space
# first, otherwise the dot products are not predicted ratings. Because the scaler is
# affine, this equals the mean of each cluster's raw user-factor vectors.
centroids_latent = scaler.inverse_transform(cluster_centroids.to_numpy())
centroid_ratings = np.dot(centroids_latent, item_factors_unstacked_transposed)

In [None]:
# Predicted-rating matrix: one row per cluster, one column per movie
# (movie ids are attached as column labels in the next cell).
centroid_ratings_df = pd.DataFrame(centroid_ratings)
centroid_ratings_df.head(n=10)

In [None]:
# Label the columns with the movie ids, in `item_factors_unstacked` row order.
centroid_ratings_df.columns = item_factors_unstacked.index
centroid_ratings_df.head()

In [None]:
# Movie x cluster layout: each column holds one cluster's predicted ratings.
centroid_ratings_T_df = centroid_ratings_df.T
centroid_ratings_T_df.head()

# New User Data

In [None]:
movies_df = pd.read_csv('../data/test_data/movies.csv')
md = movie_ratings.toPandas()
# Mean rating per movie. A groupby avoids building a dense users x movies pivot.
# The previous `np.mean(pivot)` reduces over *both* axes in pandas >= 2.0 and returns
# a scalar, which made `pd.DataFrame(...)` fail. The column is named 0 because later
# cells refer to it by that label.
mean_ratings = md.groupby('movieId')['rating'].mean().rename(0).reset_index()
# Number of ratings per movie; keep only movies rated more than 50 times.
total_recs = md['movieId'].value_counts().reset_index()
total_recs.columns = ['movieId', 'total_recs']
total_recs = total_recs[total_recs['total_recs'] > 50]
most_rated = pd.merge(total_recs, mean_ratings, how='left', on='movieId')
most_rated_sorted = most_rated.sort_values(by=0, ascending=False)
# The 100 best-rated of the frequently rated movies, with their metadata attached.
top_100 = pd.merge(most_rated_sorted.head(100), movies_df, how='left', on='movieId')

In [None]:
# Ask the user to rate a random sample of popular movies. Blank input means "not
# seen" and skips the movie; anything else must be a number from 1 to 5.
n_movies_to_rate = 10
ranking_list = top_100.sample(n=n_movies_to_rate, axis=0)
user_ratings = []
for movieId, title in zip(ranking_list['movieId'], ranking_list['title']):
    while True:
        user_rating = input('Enter a ranking for {} from 1 (lowest) to 5 (highest). If you have not seen the movie, press enter.'.format(title)).strip()
        if user_rating == '':
            break  # not seen: skip this movie
        try:
            rating_value = float(user_rating)
        except ValueError:
            rating_value = None
        # Re-prompt on non-numeric or out-of-range input instead of failing later,
        # when the collected ratings are converted with float().
        if rating_value is not None and 1 <= rating_value <= 5:
            user_ratings.append((movieId, user_rating))
            break
        print('Please enter a number from 1 to 5, or press enter to skip.')
    

In [None]:
# The (movie id, rating string) pairs collected above.
user_ratings 

## ALS recommendations

In [None]:
# Fold the new user into the trained model: with the ALS item factors held fixed,
# the user's factor vector solves a ridge-regularized least-squares problem over the
# movies they rated.
known_item_ids = set(item_factors_pdf['id'])
# Ignore ratings for movies that have no ALS item factors.
rated_pairs = [(int(movie_id), float(rating)) for movie_id, rating in user_ratings
               if int(movie_id) in known_item_ids]
if not rated_pairs:
    raise ValueError('At least one rated movie with ALS item factors is required.')
rated_movies = [movie_id for movie_id, _ in rated_pairs]
# pivot() sorts rows by movie id; reindex back to the order of `rated_pairs` so that
# row i of M lines up with rating R[i].
rated_item_factor = (item_factors_pdf.loc[item_factors_pdf['id'].isin(rated_movies)]
                     .pivot(index='id', columns='value', values='features')
                     .reindex(rated_movies))
M = rated_item_factor.to_numpy()  # DataFrame.as_matrix() was removed in pandas 1.0
E = np.identity(rank)
nui = len(rated_movies)
# Use the regularization strength of the model the grid search actually selected,
# not a hand-copied constant.
best_index = int(np.argmax(best_model.avgMetrics) if evaluator.isLargerBetter()
                 else np.argmin(best_model.avgMetrics))
regParam = best_model.getEstimatorParamMaps()[best_index][als_model.regParam]
R = np.array([rating for _, rating in rated_pairs])

In [None]:
# Normal-equation matrix. As in Spark's ALS, the regularization term is scaled by the
# number of ratings the user supplied.
A = M.T @ M + regParam * nui * E

In [None]:
# Right-hand side: item factors weighted by the user's ratings (R is 1-D).
V = M.T @ R

In [None]:
# Solve A @ user_fac = V directly: cheaper and numerically more stable than
# forming the explicit inverse of A.
user_fac = np.linalg.solve(A, V)

In [None]:
# The new user's solved latent-factor vector.
user_fac

In [None]:
# Predicted rating of every movie for the new user, in item_factors_unstacked row order.
user_movie_ratings = np.dot(user_fac, item_factors_unstacked.T)

In [None]:
# One row per movie; column 0 holds the new user's predicted rating.
user_movie_ratings_df = pd.DataFrame(user_movie_ratings)

In [None]:
# Attach the movie id of each prediction (same order as the item-factor rows).
user_movie_ratings_df['movieId'] = item_factors_unstacked.index

In [None]:
# The 30 movies with the highest predicted rating for the new user.
user_movie_ratings_df.sort_values(0,ascending=False).head(30)

In [None]:
# Top 10 by predicted rating, skipping movies the user has just rated (otherwise
# they can reappear as recommendations).
already_rated = {int(movie_id) for movie_id, _ in user_ratings}
user_top_10 = (user_movie_ratings_df
               .loc[~user_movie_ratings_df['movieId'].isin(already_rated)]
               .sort_values(0, ascending=False)
               .head(10))

In [None]:
# Attach titles and genres to the recommended movie ids.
user_top_10 = pd.merge(user_top_10, movies_df, how='left', on='movieId')

In [None]:
# Keep only the human-readable columns (drops the score, id and genres).
user_top_10 = user_top_10.drop(columns=[0, 'movieId', 'genres'])

In [None]:
# The new user's top-10 ALS recommendations.
user_top_10

# Predict User's Cluster

In [None]:
# The cluster classifiers were trained on StandardScaler-transformed factors, so the
# new user's raw ALS factor vector must be scaled the same way before predicting.
user_cluster = gs_gbc.predict(scaler.transform(user_fac.reshape(1, -1)))[0]

# Augmented Recommendations (top movies from the clusters nearest to the user's cluster)

In [None]:
def top_rated_movies(cluster, n=10):
    """Return the titles of the `n` movies with the highest predicted rating for a cluster.

    Predicted ratings come from `centroid_ratings_T_df` (movies x clusters). Only
    movies listed in `most_rated` (the frequently rated movies) are considered.

    Parameters
    ----------
    cluster : int
        Cluster label (a column of `centroid_ratings_T_df`).
    n : int, default 10
        Number of titles to return.

    Returns
    -------
    pandas.Series
        Titles (Series named 'title'), ordered from highest to lowest predicted rating.
    """
    # Predicted rating of every movie for this cluster's centroid, keyed by movie id.
    cluster_ratings = (centroid_ratings_T_df[cluster]
                       .rename('rating')
                       .rename_axis('id')
                       .reset_index())
    # Frequently rated movies with their metadata. Selecting columns explicitly
    # replaces the old drop-by-label plus positional rename, which broke whenever
    # `most_rated` or `movies_df` had a different column set.
    candidates = most_rated[['movieId']].merge(movies_df, how='left', on='movieId')
    top_movies = cluster_ratings.merge(candidates, how='inner', left_on='id', right_on='movieId')
    return top_movies.sort_values(by='rating', ascending=False)['title'].head(n)

In [None]:
def get_recommendations(user_cluster, n_per_cluster=(6, 4), random_state=None):
    """Print movie recommendations drawn from the clusters nearest to `user_cluster`.

    For the i-th nearest cluster returned by `nearest_clusters`, up to
    `n_per_cluster[i]` titles are sampled without replacement from that cluster's
    top-rated movies (`top_rated_movies`). Titles already picked from a nearer
    cluster are skipped. The user's own cluster is not sampled.

    Parameters
    ----------
    user_cluster : int
        Cluster label predicted for the user.
    n_per_cluster : sequence of int, default (6, 4)
        Number of titles to draw from each successive nearest cluster. Its length
        is the number of neighbouring clusters consulted.
    random_state : int or None, default None
        Seed for reproducible sampling. With None, NumPy's global random state is
        used, as before (so `np.random.seed` still applies).

    Returns
    -------
    None
        The recommended titles are printed as a list.
    """
    # Global RandomState by default (matches np.random.choice); seeded Generator otherwise.
    rng = np.random if random_state is None else np.random.default_rng(random_state)
    recommendations = []
    near_clusters = nearest_clusters(user_cluster, len(n_per_cluster))
    for cluster, n_picks in zip(near_clusters, n_per_cluster):
        # Candidates: this cluster's top titles that were not recommended already.
        pool = [title for title in top_rated_movies(cluster) if title not in recommendations]
        # Sampling without replacement raises if asked for more items than exist.
        n_picks = min(n_picks, len(pool))
        if n_picks > 0:
            picks = rng.choice(len(pool), size=n_picks, replace=False)
            recommendations.extend(pool[i] for i in picks)
    print(recommendations)

In [None]:
# Print recommendations for the new user, drawn from the clusters nearest to theirs.
get_recommendations(user_cluster)