In [None]:
# Shell environment setup for the Spark jobs in this notebook (presumably meant
# to be `source`d in the cluster shell before running spark-submit).
# NOTE(review): original note read "source shell code (for peel) for greene?" —
# confirm which cluster (peel or greene) this setup targets.
export HADOOP_EXE='/usr/bin/hadoop'

# Load the Python 3.7.9 environment module.
module load python/gcc/3.7.9

# `hfs <args>` runs `$HADOOP_EXE fs <args>`; the double quotes expand
# HADOOP_EXE when the alias is defined.
alias hfs="$HADOOP_EXE fs"
# Make PySpark use whichever `python` is on PATH when spark-submit is invoked
# (single quotes defer the $(which python) expansion until the alias is used).
alias spark-submit='PYSPARK_PYTHON=$(which python) spark-submit'

Hyperparameter tuning with an explicit for-loop over the parameter grid:

In [None]:
#Your recommendation model should use Spark's alternating least squares (ALS) method to learn latent factor representations for users and items.

#user_id  | count | track_id   

#Run final code in peel cluster
#Run numpy code in greene cluster
#**Set up source shell_setup.sh in repo**
#hdfs:/user/bm106/pub/MSD
#Go under correct directory
#spark-submit rec_sys.py --conf "spark.blacklist.enabled=false"
#spark-submit --driver-memory=4g --executor-memory=4g --executor-cores=50 'pyfile'
#conf.set("spark.blacklist.enabled","false") #if in code itself
#import umap #seems to be local 
import sys
import getpass 
import random
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import RankingMetrics #May want to use MAP
from pyspark.ml.evaluation import RegressionEvaluator #RMSE
from pyspark.ml.recommendation import ALS #Alternating least squares
from pyspark.sql import Row, Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import explode, col, expr
import pyspark.sql.functions as pyF

def main (spark, netID):
    '''
    Tune an ALS recommender with an explicit loop over a hyperparameter grid.

    Reads the MSD train/validation/test parquet files. Fits one ALS model per
    (rank, regParam, alpha) combination on a 1% random sample of the training
    data. Keeps the combination with the lowest validation RMSE and shows the
    top-500 track recommendations for the users present in the validation set.

    Parameters
    ----------
    spark : SparkSession object
    netID : string, netID of student to find files in HDFS
        (currently unused: every path below is fixed)
    '''
    # itertools.product builds the grid below; it was previously used without
    # being imported, which raised NameError.
    import itertools

    training = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_train_new.parquet')
    # Use a 1% sample of the training interactions to keep the grid search cheap.
    # NOTE(review): no seed is passed, so the sample differs on every run.
    (train, _) = training.randomSplit([0.01, 0.99])
    validation = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_validation.parquet')
    test = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_test.parquet')

    regParam_list = [0.01, 0.1, 1, 10.0, 100.0]  # L2 regularization parameter
    rank_list = [50, 100, 200, 300]  # number of latent factors in model
    #numBlocks_list=[10]    #adjust this to optimize parallel computation
    # NOTE: ALS only uses alpha when implicitPrefs=True. The ALS built below does
    # not set implicitPrefs, so alpha has no effect on the fitted model.
    alpha_list = [1.0]

    error_list = []
    model_list = []

    print('Pre string-indexing.')

    # ALS needs numeric user/item columns, so map the string ids to indices.
    # The indexers are fit on the training sample only. Validation/test rows
    # with ids unseen in that sample are skipped (handleInvalid="skip").
    user_indexer = StringIndexer(inputCol = "user_id", outputCol = "user_num", handleInvalid = "skip").fit(train)
    track_indexer = StringIndexer(inputCol = "track_id", outputCol = "track_num", handleInvalid = "skip").fit(train)

    train_idx = track_indexer.transform(user_indexer.transform(train))
    val_idx = track_indexer.transform(user_indexer.transform(validation))
    # Indexed test set; not used further in this cell.
    test_idx = track_indexer.transform(user_indexer.transform(test))

    print("Setup complete, fitting models.")

    # Every combination of hyperparameters, as (rank, regParam, alpha) tuples.
    grid = list(itertools.product(rank_list, regParam_list, alpha_list))

    # Distinct (indexed) users that appear in the validation set.
    unique_users = val_idx.select('user_num').distinct()

    # The evaluator does not depend on the hyperparameters, so build it once.
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction")

    for rank, reg, alph in grid:
        # Train a model with the current hyperparameters.
        als_model = ALS(regParam=reg, rank = rank, userCol="user_num", itemCol="track_num", ratingCol="count", coldStartStrategy="drop", alpha=alph)
        model = als_model.fit(train_idx)
        model_list.append(model)

        # RMSE of this model's predictions on the validation set.
        val_pred = model.transform(val_idx)
        rmse = evaluator.evaluate(val_pred)
        error_list.append(rmse)

        print ("---------Hyperparameter Tuning---------")
        print ("regParam = ", reg, "rank = ", rank)
        print("Root-mean-square error = " + str(rmse))

    # Index of the lowest validation RMSE (first one on ties).
    best_idx = min(range(len(error_list)), key=error_list.__getitem__)
    min_error = error_list[best_idx]
    print("Minimum error:", min_error)
    print("Hyperparameters:", grid[best_idx])
    best_model = model_list[best_idx]

    # Top 500 song recommendations for each validation user.
    userRecs = best_model.recommendForUserSubset(unique_users, 500)
    userRecs.limit(10).show()

    # Flatten to one row per (user, recommended track, predicted rating).
    # recommendForUserSubset names its columns after userCol/itemCol, so they
    # are `user_num` and `recommendations[].track_num`. The old select of
    # user_id / track_id referenced columns that do not exist.
    userRecs = userRecs.withColumn("rec_exp", explode("recommendations"))\
        .select('user_num', col("rec_exp.track_num"), col("rec_exp.rating"))

    # TODO: evaluate these recommendations with MAP (pyspark RankingMetrics),
    # comparing against each user's actual top-500 tracks by play count.


if __name__ == "__main__":
    # Build (or reuse) the Spark session for this job.
    spark = (
        SparkSession.builder
        .appName('part1')
        .getOrCreate()
    )

    # The OS user name is passed to main() as the netID.
    # To take command-line arguments instead, use sys.argv
    # (https://docs.python.org/3/library/sys.html) and add them to main().
    netID = getpass.getuser()

    main(spark, netID)

Extension 1: popularity baseline (predict each track's mean play count)

In [None]:
#Run final code in peel cluster
#Run numpy code in greene cluster
#**Set up source shell_setup.sh in repo**
#hdfs:/user/bm106/pub/MSD
#Go under correct directory
#spark-submit rec_sys.py --conf "spark.blacklist.enabled=false"
#spark-submit --driver-memory=4g --executor-memory=4g --executor-cores=50 'pyfile'
#conf.set("spark.blacklist.enabled","false") #if in code itself
import sys
import getpass 
import random
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import RankingMetrics #May want to use MAP
from pyspark.ml.evaluation import RegressionEvaluator #RMSE
from pyspark.ml.recommendation import ALS #Alternating least squares
from pyspark.sql import Row, Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import explode, col, expr
import pyspark.sql.functions as pyF

def main (spark, netID):
    '''
    Popularity baseline: predict every play count with the track's mean count.

    Computes the mean `count` per `track_id` on a 1% random sample of the
    training data. Uses that mean as the prediction for every validation and
    test row, then prints the RMSE for each split.

    Parameters
    ----------
    spark : SparkSession object
    netID : string, netID of student (currently unused: all paths are fixed)

    Notes
    -----
    The joins are inner joins. Validation/test rows whose track_id does not
    occur in the training sample are dropped before RMSE is computed.
    '''
    training = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_train_new.parquet')
    # NOTE(review): unseeded split, so the 1% sample (and the RMSE) varies per run.
    (train, _) = training.randomSplit([0.01, 0.99])
    validation = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_validation.parquet')
    test = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_test.parquet')

    # Mean play count per track. The explicit alias avoids depending on Spark's
    # auto-generated "avg(count)" column name.
    prediction = train.groupBy('track_id').agg(pyF.mean('count').alias('prediction'))

    evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction")

    # Attach each row's track mean and score it; validation first, then test.
    for split_name, split_df in (('validation', validation), ('test', test)):
        split_pred = split_df.join(prediction, on='track_id')
        rmse = evaluator.evaluate(split_pred)
        print('RMSE on the ' + split_name + ' set ' + str(rmse))

if __name__ == "__main__":
    # Build (or reuse) the Spark session for this job.
    spark = (
        SparkSession.builder
        .appName('part1')
        .getOrCreate()
    )

    # The OS user name is passed to main() as the netID.
    # To take command-line arguments instead, use sys.argv
    # (https://docs.python.org/3/library/sys.html) and add them to main().
    netID = getpass.getuser()

    main(spark, netID)

Hyperparameter tuning in parallel with `ParamGridBuilder` and `CrossValidator`:

In [None]:
#Your recommendation model should use Spark's alternating least squares (ALS) method to learn latent factor representations for users and items.

#user_id  | count | track_id   

#Run final code in peel cluster
#Run numpy code in greene cluster
#**Set up source shell_setup.sh in repo**
#hdfs:/user/bm106/pub/MSD
#Go under correct directory
#spark-submit rec_sys.py --conf "spark.blacklist.enabled=false"
#spark-submit --driver-memory=4g --executor-memory=4g --executor-cores=50 'pyfile'
#conf.set("spark.blacklist.enabled","false") #if in code itself
import sys
import getpass 
import random
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import RankingMetrics #May want to use MAP
from pyspark.ml.evaluation import RegressionEvaluator #RMSE
from pyspark.ml.recommendation import ALS #Alternating least squares
from pyspark.sql import Row, Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import explode, col, expr
import pyspark.sql.functions as pyF

def main (spark, netID):
    '''
    Tune ALS with a cross-validated grid search, then evaluate the best model.

    Steps:
    - Read the MSD train/validation/test parquet files and index the string ids
      on a 25% sample of the training data.
    - Run a 5-fold CrossValidator over (rank, regParam).
    - Report RMSE of the selected model on the validation and test sets.
    - Compare each test user's top-500 recommendations with the tracks they
      actually played (ranked by count) and report MAP.
    - Write the per-user lists to CSV.

    Parameters
    ----------
    spark : SparkSession object
    netID : string, netID of student to find files in HDFS
        (currently unused: every path below is fixed)
    '''
    training = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_train_new.parquet')
    # NOTE(review): unseeded split, so the 25% sample changes on every run.
    (train, _) = training.randomSplit([0.25, 0.75])
    validation = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_validation.parquet')
    test = spark.read.parquet('hdfs:/user/bm106/pub/MSD/cf_test.parquet')

    print('Pre string-indexing.')

    # ALS needs numeric user/item columns. The indexers are fit on the training
    # sample only; rows with ids unseen there are skipped (handleInvalid="skip").
    user_indexer = StringIndexer(inputCol = "user_id", outputCol = "user_num", handleInvalid = "skip").fit(train)
    track_indexer = StringIndexer(inputCol = "track_id", outputCol = "track_num", handleInvalid = "skip").fit(train)

    train_idx = track_indexer.transform(user_indexer.transform(train))
    val_idx = track_indexer.transform(user_indexer.transform(validation))
    test_idx = track_indexer.transform(user_indexer.transform(test))

    print("Setup complete, fitting models.")

    # Hyperparameter tuning in parallel.
    als_model = ALS(userCol="user_num", itemCol="track_num", ratingCol="count", coldStartStrategy="drop")

    ranks = [200, 225, 250, 275, 300]
    regParams = [10, 1.0, 0.5, 0.1, 0.01]

    paramGrid = ParamGridBuilder() \
        .addGrid(als_model.rank, ranks) \
        .addGrid(als_model.regParam, regParams) \
        .build()

    print ("Num models tested: ", len(paramGrid))

    evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction")

    nF = 5  # number of CV folds
    parallelism = 6  # models fit concurrently... seemed like we get 6 cores

    cv = CrossValidator(estimator = als_model, estimatorParamMaps = paramGrid, evaluator = evaluator, numFolds = nF, parallelism = parallelism)
    cv_model = cv.fit(train_idx)

    # CrossValidator refits the winning ParamMap on all of train_idx. Use that
    # model directly instead of retraining with hard-coded rank/regParam, which
    # silently ignored the CV result.
    best_model = cv_model.bestModel

    # avgMetrics[i] is the fold-averaged RMSE of paramGrid[i]; lower is better.
    # This uses public API instead of best_model._java_obj.
    avg_rmse = cv_model.avgMetrics
    best_params = paramGrid[min(range(len(avg_rmse)), key=avg_rmse.__getitem__)]
    print("Best RegParam:", best_params[als_model.regParam])
    print("Best Rank:", best_params[als_model.rank])

    # Evaluate (not fit) the selected model on the held-out sets.
    val_pred = best_model.transform(val_idx)
    rmse_val = evaluator.evaluate(val_pred)
    print('RMSE on the validation set ' + str(rmse_val))

    test_pred = best_model.transform(test_idx)
    rmse_test = evaluator.evaluate(test_pred)
    print('RMSE on the test set ' + str(rmse_test))

    # Predicted: top 500 tracks for each distinct test user, ordered by
    # predicted rating. Columns are `user_num` and `recommendations` (an array
    # of (track_num, rating) structs).
    unique_users_test = test_idx.select('user_num').distinct()
    userRecs = best_model.recommendForUserSubset(unique_users_test, 500)

    # Actual: each test user's tracks ranked by play count, keeping rank <= 500
    # (pyF.rank can keep more than 500 on ties). track_num is a double from
    # StringIndexer; cast it to int so it compares equal to the int ids in the
    # recommendations.
    track_df = Window.partitionBy('user_num').orderBy(col('count').desc())
    RealTop500 = test_idx.withColumn('rank', pyF.rank().over(track_df))\
        .where(col('rank') <= 500)\
        .groupBy('user_num')\
        .agg(expr('collect_list(cast(track_num as int)) as actual_track_lists'))

    # Keep this as a DataFrame. The previous `.rdd.map(...)` produced an RDD,
    # which has no .limit()/.show()/.write and raised AttributeError.
    RealAndPredict = RealTop500.join(userRecs, 'user_num')\
        .select('user_num',
                col('recommendations.track_num').alias('predicted_track_lists'),
                'actual_track_lists')

    RealAndPredict.limit(10).show()

    # Mean average precision of the ranked predictions against the actual tracks.
    prediction_and_labels = RealAndPredict\
        .select('predicted_track_lists', 'actual_track_lists')\
        .rdd.map(tuple)
    map_test = RankingMetrics(prediction_and_labels).meanAveragePrecision
    print('MAP on the test set ' + str(map_test))

    # The CSV writer cannot store array columns, so serialize each list as a
    # space-separated string. Overwrite so the notebook can be re-run.
    RealAndPredict.select(
        'user_num',
        pyF.concat_ws(' ', col('predicted_track_lists').cast('array<string>')).alias('predicted_track_lists'),
        pyF.concat_ws(' ', col('actual_track_lists').cast('array<string>')).alias('actual_track_lists'),
    ).write.mode('overwrite').csv('RealAndPredict_outputs.csv')


if __name__ == "__main__":
    # Build (or reuse) the Spark session for this job.
    spark = (
        SparkSession.builder
        .appName('part1')
        .getOrCreate()
    )

    # The OS user name is passed to main() as the netID.
    # To take command-line arguments instead, use sys.argv
    # (https://docs.python.org/3/library/sys.html) and add them to main().
    netID = getpass.getuser()

    main(spark, netID)

Extension 2: UMAP visualization

In [None]:
# Extension - UMAP Visualization

import umap
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Rows of (user, track, predicted rating); the CSV has no header row.
data = pd.read_csv('merged.csv', header = None)
data.columns = ['users', 'tracks', 'pred']

# One row per user, one column per track; missing predictions become 0.
pivot = data.pivot_table(index=['users'],
                         columns=['tracks'], values='pred').fillna(0)

# pivot_table moves `users` into the index, so to_numpy() holds ONLY track
# columns. The old code sliced off column 0 (a track, not the user id) and
# took np.unique of that track's predictions as "users".
users = pivot.index.to_numpy()
features = pivot.to_numpy()

# Draw each projection on its own axes; previously both scatters were drawn
# onto the same figure.
# NOTE(review): UMAP is unseeded here, so layouts vary between runs.
fig, (ax_all, ax_first10) = plt.subplots(1, 2, figsize=(12, 5))

# UMAP of every user's full predicted-rating vector.
u = umap.UMAP().fit_transform(features)
ax_all.scatter(u[:, 0], u[:, 1], c='black')
ax_all.set(title='UMAP of users (all tracks)', xlabel='UMAP 1', ylabel='UMAP 2')

# UMAP using only the first 10 track columns (old slice 1:10 gave just 9).
u_first10 = umap.UMAP().fit_transform(features[:, :10])
ax_first10.scatter(u_first10[:, 0], u_first10[:, 1], c='black')
ax_first10.set(title='UMAP of users (first 10 tracks)', xlabel='UMAP 1', ylabel='UMAP 2')

plt.show()

ModuleNotFoundError: No module named 'umap'