# HW 8-1 - Tim Demetriades
## Music Recommendation 2 - PySpark & ALS
4/17/2021

### Part 1 - Create output predictions file using PySpark and ALS

Here, we create `als_predictions.csv`, which holds the userID, itemID, and predicted rating for each user-item pair in the test file. It should be 120,000 entries long, with 6 entries for each user.

The input files will be:
- `trainItem.data` - the training file in the format `userID,itemID,rating`. It is 12,403,575 entries long.
- `testItem.data` - the testing file in the format `userID,itemID,rating` where rating is 0. It is 120,000 entries long.
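Before training, it can be worth checking these sizes. With 120,000 test entries and 6 entries per user, the test file should cover 120,000 / 6 = 20,000 users. The sketch below uses a hypothetical helper, `entries_per_user`, that counts rows per user in any file in the `userID,itemID,rating` format using only the standard library.

```python
from collections import Counter

def entries_per_user(lines):
    """Count how many 'userID,itemID,rating' rows each userID has.

    Hypothetical helper for sanity-checking the input files; blank lines are skipped.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        user_id, _item_id, _rating = line.split(',')
        counts[user_id] += 1
    return counts

# Usage with a few rows in the testItem.data format
sample = ["199810,208019,0", "199810,74139,0", "199810,9903,0"]
print(entries_per_user(sample))
```

On the real file, `counts = entries_per_user(open('testItem.data'))` would be expected to give `len(counts) == 20000` and `set(counts.values()) == {6}` if the description above holds.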

First, we import the needed modules.

In [1]:
from pyspark.ml.evaluation import RegressionEvaluator    # for regression evaluation on the output
from pyspark.ml.recommendation import ALS    # for alternating least squares algorithm for matrix factorization
from pyspark.sql.types import IntegerType    # for changing df column to int data type
from pyspark.sql import SparkSession    # main entry point for DataFrame and SQL functionality
from os import path   # for checking whether the predictions folder exists
import pandas as pd   # for data frames
import numpy          # for arrays
import time           # for timing cells          
import shutil         # for deleting predictions folder

We initialize a SparkSession, the entry point to the underlying Spark functionality, which lets us programmatically create RDDs and DataFrames.

Starting with Spark 2.0, SparkSession combines the functionality of SparkContext, SQLContext, and more.

In [2]:
spark = SparkSession.builder.appName('Music Recommender').getOrCreate()

Evaluating `spark` in a notebook cell displays information about the session, including the Spark version, the app name, and a link to the Spark UI.

In [3]:
spark

Now we load the training data file into a PySpark data frame.

In [4]:
training_df = spark.read.csv("trainItem.data", header = False)
training_df.show(5)

+------+------+---+
|   _c0|   _c1|_c2|
+------+------+---+
|199808|248969| 90|
|199808|  2663| 90|
|199808| 28341| 90|
|199808| 42563| 90|
|199808| 59092| 90|
+------+------+---+
only showing top 5 rows



In [5]:
training_df.count()

12403575

We rename the default columns (`_c0`, `_c1`, `_c2`) using the `.withColumnRenamed()` method.

In [6]:
training_df = training_df.withColumnRenamed('_c0', 'userID').withColumnRenamed('_c1', 'itemID').withColumnRenamed('_c2', 'rating')

The `.dtypes` attribute returns the data type of each column.

Since `spark.read.csv` reads every column as a string by default, we then use the `.withColumn()` method to cast each column to the correct type.

In [7]:
training_df.dtypes    # returns all column names and their data types

[('userID', 'string'), ('itemID', 'string'), ('rating', 'string')]

In [8]:
training_df = training_df.withColumn('userID', training_df['userID'].cast(IntegerType()))
training_df = training_df.withColumn('itemID', training_df['itemID'].cast(IntegerType()))
training_df = training_df.withColumn('rating', training_df['rating'].cast('float'))

In [9]:
training_df.dtypes    # returns all column names and their data types

[('userID', 'int'), ('itemID', 'int'), ('rating', 'float')]

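Now we configure the Alternating Least Squares (ALS) model. ALS is a matrix factorization algorithm for collaborative filtering: it approximates the user-item rating matrix as the product of a user factor matrix and an item factor matrix, each with `rank` latent features, by alternately fixing one matrix and solving a regularized least squares problem for the other.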
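To make the "alternating" part concrete, here is a small NumPy sketch of explicit-feedback ALS on a dense matrix. It is not Spark's distributed implementation. It assumes a boolean `mask` marking observed ratings and scales the regularization by each user's or item's number of ratings (the scaling Spark's documentation describes for `regParam`). Unlike `nonnegative=True`, it does not constrain the factors to be non-negative. The function name `als_sketch` and its parameters are hypothetical.

```python
import numpy as np

def als_sketch(ratings, mask, rank=2, reg=0.01, iters=50, seed=0):
    """Factor ratings ~= U @ V.T using only entries where mask is True.

    Each half-step fixes one factor matrix and solves a ridge-regression
    (regularized least squares) problem for every row of the other.
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = ratings.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(iters):
        # Fix item factors V and solve for each user's factor vector
        for u in range(n_users):
            obs = mask[u]
            if not obs.any():
                continue  # no ratings for this user: leave its factors unchanged
            Vo = V[obs]
            A = Vo.T @ Vo + reg * obs.sum() * eye
            U[u] = np.linalg.solve(A, Vo.T @ ratings[u, obs])
        # Fix user factors U and solve for each item's factor vector
        for i in range(n_items):
            obs = mask[:, i]
            if not obs.any():
                continue  # no ratings for this item
            Uo = U[obs]
            A = Uo.T @ Uo + reg * obs.sum() * eye
            V[i] = np.linalg.solve(A, Uo.T @ ratings[obs, i])
    return U, V

# Usage: a 4 x 5 matrix built from rank-2 factors, fully observed
true_U = np.array([[1, 2], [2, 1], [0, 1], [1, 0]], dtype=float)
true_V = np.array([[1, 0], [0, 1], [1, 1], [2, 1], [1, 3]], dtype=float)
R = true_U @ true_V.T
M = np.ones_like(R, dtype=bool)
U, V = als_sketch(R, M)
rmse = np.sqrt(np.mean((U @ V.T - R)[M] ** 2))
print(f"reconstruction RMSE: {rmse:.4f}")
```

Roughly speaking, `rank=15`, `regParam=1.0`, and `maxIter=20` in the Spark configuration play the roles of `rank`, `reg`, and `iters` here; Spark additionally partitions the factor matrices across the cluster.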

In [10]:
als = ALS(
    maxIter=20,              # max number of iterations of the model
    rank=15,                 # number of latent features
    regParam=1.0,            # regularization parameter
    userCol="userID",        # user column (ids must be ints)
    itemCol="itemID",        # item column (ids must be ints)
    ratingCol="rating",      # ratings column
    nonnegative=True,        # whether to use nonnegative constraint for least squares
    implicitPrefs=False,     # whether to use implicit preference (False: ratings are explicit)
    # coldStartStrategy="drop" # drop any rows in the df of predictions that contain NaN values
)

We then fit the ALS model to the training DataFrame and time how long the fit takes.

In [11]:
start_time = time.time()

model = als.fit(training_df)

end_time = time.time()
elapsed_time = end_time - start_time
print(f'Done! Time elapsed - {elapsed_time:.2f} seconds.')

Done! Time elapsed - 103.01 seconds.


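Next, we load the test data, which lists the (userID, itemID) pairs we need rating predictions for. Since every rating in `testItem.data` is 0, this file cannot be used to measure the model's accuracy; it only supplies the pairs to predict.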
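`RegressionEvaluator` is imported at the top but never used; its default metric is RMSE (root-mean-square error). Because the ratings in `testItem.data` are all 0, a meaningful RMSE would need held-out rows with real ratings, for example a split of `training_df` made with `randomSplit([0.8, 0.2])`. As a plain-Python sketch of what that metric computes (the function name `rmse` is our own):

```python
import math

def rmse(actual, predicted):
    """Root-mean-square error between true and predicted ratings."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Two held-out ratings of 90 and 50, predicted as 80 and 50: sqrt((10**2 + 0) / 2)
print(rmse([90, 50], [80, 50]))
```

If any prediction is NaN, the RMSE is NaN as well, which is why Spark's collaborative filtering guide pairs evaluation with `coldStartStrategy="drop"`.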

In [12]:
testing_df = spark.read.csv('testItem.data', header=False)
testing_df.show(5)

+------+------+---+
|   _c0|   _c1|_c2|
+------+------+---+
|199810|208019|  0|
|199810| 74139|  0|
|199810|  9903|  0|
|199810|242681|  0|
|199810| 18515|  0|
+------+------+---+
only showing top 5 rows



In [13]:
testing_df.count()

120000

As with `training_df`, we rename each column and cast it to the desired data type.

In [14]:
testing_df = testing_df.withColumnRenamed("_c0", "userID").withColumnRenamed("_c1", "itemID").withColumnRenamed("_c2", "rating")

In [15]:
testing_df.dtypes

[('userID', 'string'), ('itemID', 'string'), ('rating', 'string')]

In [16]:
testing_df = testing_df.withColumn("userID", testing_df["userID"].cast(IntegerType()))
testing_df = testing_df.withColumn("itemID", testing_df["itemID"].cast(IntegerType()))
testing_df = testing_df.withColumn("rating", testing_df["rating"].cast('float'))

In [17]:
testing_df.dtypes

[('userID', 'int'), ('itemID', 'int'), ('rating', 'float')]

Now we use the fitted model to predict a rating for each (userID, itemID) pair in the testing DataFrame.
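For a (user, item) pair, ALS predicts the rating as the dot product of the user's and the item's latent factor vectors (each of length `rank`). A user or item that never appeared in `trainItem.data` has no factor vector; with Spark's default cold-start strategy, such pairs get a NaN prediction. Part 2 handles these by replacing NaN predictions with 50.0, whereas enabling the commented-out `coldStartStrategy="drop"` would remove those rows and leave fewer than 120,000. A minimal sketch, using hypothetical dictionaries of factors and a hypothetical helper `predict_rating`:

```python
import math

def predict_rating(user_factors, item_factors, user_id, item_id):
    """Dot product of the user's and item's latent vectors, or NaN (cold start)."""
    u = user_factors.get(user_id)
    v = item_factors.get(item_id)
    if u is None or v is None:
        return math.nan  # ID unseen during training: no factors to combine
    return sum(a * b for a, b in zip(u, v))

# Hypothetical rank-2 factors
user_factors = {199810: [1.0, 2.0]}
item_factors = {463: [3.0, 0.5]}

known = predict_rating(user_factors, item_factors, 199810, 463)    # 1*3 + 2*0.5
unknown = predict_rating(user_factors, item_factors, 199810, 999)  # item never seen
filled = 50.0 if math.isnan(unknown) else unknown                  # mirrors fillna(50.0)
```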

In [18]:
predictions = model.transform(testing_df)    # adds a 'prediction' column to the df

In [19]:
predictions = predictions.drop('rating')    # remove ratings column (since it's all just 0s)

In [20]:
predictions.show(5)

+------+------+----------+
|userID|itemID|prediction|
+------+------+----------+
|230073|   463|  74.94046|
|230962|   471|  55.97319|
|218845|  1088|  32.30332|
|209697|  1088|  57.35444|
|224445|  2142| 18.731167|
+------+------+----------+
only showing top 5 rows



In [21]:
predictions.count()

120000

Now that we have the predictions, we can save the DataFrame to a CSV file.

In [22]:
# Check if this folder exists already and if so remove it
if path.exists('predictions'):
    shutil.rmtree('predictions') 

In [23]:
# Save the data frame into a folder called 'predictions' as a single part file (coalesce(1));
# Spark chooses the part file's name, so it cannot be set here
predictions.coalesce(numPartitions=1).write.csv("predictions")

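Finally, we save the predictions to a single file with the desired name. `toPandas()` collects all rows onto the driver, which is fine for 120,000 rows, and `to_csv` writes the pandas index as an extra, unnamed first column.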

In [24]:
# Save data frame to a single csv file 
predictions.toPandas().to_csv('als_predictions.csv')
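An alternative to `toPandas()` is to copy the single part file that `coalesce(1).write.csv(...)` already produced. The sketch below assumes Spark's usual `part-*.csv` naming inside the output folder and uses a hypothetical helper name. Spark writes no header row by default, so the write would need `header=True` for Part 2's `usecols` lookup by column name to work on the copied file.

```python
import glob
import os
import shutil

def copy_single_part_file(spark_output_dir, destination):
    """Copy the one part-*.csv file Spark wrote into spark_output_dir to destination.

    Hypothetical helper; assumes the DataFrame was coalesced to one partition.
    """
    parts = glob.glob(os.path.join(spark_output_dir, 'part-*.csv'))
    if len(parts) != 1:
        raise RuntimeError(f'expected exactly one part file, found {len(parts)}')
    shutil.copyfile(parts[0], destination)
    return destination

# Usage after predictions.coalesce(1).write.csv("predictions", header=True):
# copy_single_part_file('predictions', 'als_predictions.csv')
```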

### Part 2 - Use ratings predictions file to generate predictions
Here, we use the output file from Part 1, `als_predictions.csv`, to create `final_predictions.csv`, which holds one prediction per line in the format `userID_trackID,prediction`. Each prediction is either 1 (we predict the user will like the song) or 0 (we predict the user will not like the song). Each user has 6 songs, so each user gets three 1-predictions and three 0-predictions.

For each user, we take the 6 predicted ratings from Part 1, sort them from highest to lowest, and assign a 1 to the top 3 songs and a 0 to the bottom 3.
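The same top-3 rule can be written without relying on each user's 6 rows being consecutive in the file. This sketch (hypothetical function name `label_top_tracks`) groups rows by user, sorts each user's tracks by predicted rating, and emits lines in the `userID_trackID,prediction` format; `n_liked=3` matches the 3-of-6 split described above.

```python
from collections import defaultdict

def label_top_tracks(rows, n_liked=3):
    """rows: iterable of (user_id, track_id, predicted_rating) tuples.

    Returns 'userID_trackID,label' strings, where label is 1 for each user's
    n_liked highest-rated tracks and 0 for the rest.
    """
    by_user = defaultdict(list)
    for user_id, track_id, rating in rows:
        by_user[user_id].append((track_id, rating))

    lines = []
    for user_id in sorted(by_user):
        # Highest predicted rating first; ties keep their input order
        ranked = sorted(by_user[user_id], key=lambda t: t[1], reverse=True)
        for position, (track_id, _rating) in enumerate(ranked):
            label = 1 if position < n_liked else 0
            lines.append(f'{user_id}_{track_id},{label}')
    return lines

# Usage with one user's six predicted ratings, in arbitrary order
rows = [
    (199810, 74139, 45.075775),
    (199810, 105760, 66.2449),
    (199810, 9903, 39.30143),
    (199810, 18515, 59.09715),
    (199810, 242681, 47.325626),
    (199810, 208019, 59.15773),
]
for line in label_top_tracks(rows):
    print(line)
```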

In [25]:
# Open files
ratings_file = 'als_predictions.csv'
predictions_file = 'predictions.csv'
final_predictions_file = 'final_predictions.csv'

f_ratings = open(ratings_file)
f_predictions = open(predictions_file, 'w')
f_final_predictions = open(final_predictions_file, 'w')

In [26]:
# Write header
f_final_predictions.write('TrackID,Predictor\n')

18

In [27]:
# Read csv into df
column_list = ('userID', 'itemID', 'prediction')
ratings_df = pd.read_csv(ratings_file, usecols=column_list)

In [28]:
# Replace any NaN predictions (users or items unseen during training) with 50.0
ratings_df['prediction'] = ratings_df['prediction'].fillna(50.0)

In [29]:
ratings_df

        userID  itemID  prediction
0       230073     463   74.940460
1       230962     471   55.973190
2       218845    1088   32.303320
3       209697    1088   57.354440
4       224445    2142   18.731167
...        ...     ...         ...
119995  230215  294724   59.698616
119996  224603  294724   20.960081
119997  218132  294724   59.090970
119998  235806  295019   87.108110


In [30]:
# Sort by userID (asc) and then by prediction (desc)
ratings_df.sort_values(["userID", "prediction"], ascending=[True, False], inplace=True)

In [31]:
# Reset index to remove original index
ratings_df.reset_index(drop=True, inplace=True)

In [32]:
ratings_df.head(12)

   userID  itemID  prediction
0  199810  105760     66.2449
1  199810  208019    59.15773
2  199810   18515    59.09715
3  199810  242681   47.325626
4  199810   74139   45.075775
5  199810    9903    39.30143
6  199812  223706    96.74225
7  199812  211361   89.956055
8  199812  142408     86.1118
9  199812  130023   85.505325


In [33]:
# Save to csv without index column and header row
ratings_df.to_csv('predictions.csv', index=False, header=False)

In [34]:
f_predictions.close()    # close the file

In [35]:
f_predictions = open(predictions_file)    # now open it for reading

In [36]:
# Initialize some values
ratings_array = numpy.zeros(shape=(6))
last_user_id = -1
track_id_out_vec = [0] * 6

In [37]:
start_time = time.time()

# Go through each line of the predictions file
for line in f_predictions:
    arr_out = line.strip().split(',')    # remove any spaces/new lines and create list 
    user_id_out = arr_out[0]             # set user
    track_id_out = arr_out[1]            # set track
    rating = float(arr_out[2])           # set rating
    
    if user_id_out != last_user_id:             # if new user reached
        i = 0                                   # reset i
        ratings_array = numpy.zeros(shape=(6))  # reset this array
        
    ratings_array[i] = rating                   # add rating to ratings array
    track_id_out_vec[i] = track_id_out          # add trackID to trackID array
        
    i = i + 1                    # increment i
    last_user_id = user_id_out   # set last_user_id as current userID
    
    if i == 6:                               # if last entry for current user reached
        # Rows are sorted by prediction (desc), so the first 3 tracks get 1 and the last 3 get 0.
        # 'labels' avoids shadowing the Spark 'predictions' DataFrame from Part 1.
        labels = numpy.zeros(shape=(6), dtype=int)
        labels[:3] = 1

        # Write the 6 track predictions for the current user to the final predictions file
        for ii in range(0, 6):
            out_str = f'{user_id_out}_{track_id_out_vec[ii]},{labels[ii]}'
            f_final_predictions.write(out_str + '\n')

        
end_time = time.time()
elapsed_time = end_time - start_time
print(f'Done! Time elapsed - {elapsed_time:.2f} seconds.')

Done! Time elapsed - 0.39 seconds.


In [38]:
f_ratings.close()
f_predictions.close()
f_final_predictions.close()