# KK-Box's Music Recommendation System

In [2]:
#imports
from pyspark.sql import SparkSession
from pyspark.sql import Row 
from pyspark.sql import functions as fn
from pyspark.sql.types import StringType, DateType
from pyspark.sql.dataframe import DataFrame
from pyspark.ml import feature, Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, GBTClassifier, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
#Setting up spark session and spark context
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [3]:
# read only cell

import os

# get the databricks runtime version
db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name):    
    # if the databricks env var exists
    if db_env != None:
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

#Function defining the shape of spark dataframe
def spark_df_shape(self):
    return (self.count(), len(self.columns))
  
#Plug the function into pyspark
DataFrame.shape = spark_df_shape

## Importing Data

#### Dataset Review
* The dataset has four different datadrames:
  1. **members**  contains information about the users of the platform
      * ***msno*** - User id 
      * ***City*** - city of the user
      * ***bd*** - Age of the user
      * ***gender*** - Gender
      * ***registered_via*** - Registration Channel
      * ***registration_init_time*** - Registration Date of membership
      * ***expiration_date*** - Expiration of membership
  2. **songs**  contains the details of the songs available in the platform.
      * ***song_id*** - Unique id of the song
      * ***song_length*** - Length of song
      * ***genre_id*** - genre of the song
      * ***artist_name*** - singer 
      * ***composer*** - Composer of the song
      * ***lyricist*** - Lyricist
      * ***Language*** - Language
  3. **songs_extra_info**  contains extra information about the songs
      * ***song_id*** - Unique id of the song
      * ***name*** - Title
      * ***isrc*** - International standard recording code
  4. **train(Renamed to logs)**  contains the logs of the listening history of the users.
      * ***msno*** - User id
      * ***song_id*** - Song id
      * ***source_system_tab*** - the name of the tab where the event was triggered. 
      * ***source_screen_name*** -  name of the layout a user sees.
      * ***source_type*** - an entry point a user first plays music on mobile apps.
      * ***target*** - a binary column specifying recursive listening event

In [6]:
#Importing members data frame which contains information about the users of the platform.
members = spark.read.csv(get_training_filename('members.csv'),header='true',inferSchema='true').na.drop('any')

#Shape and head of the dataframe
print("Shape of members:" , members.shape())
members.show(5)

In [7]:
#Importing the songs dataframe which contains the details of the songs avialable in the platform.
songs = spark.read.csv(get_training_filename('songs.csv'),header='true',inferSchema='true').na.drop('any')

#Head and Shape of dataframe
print("Shape of songs:" , songs.shape())
songs.show(5)

In [8]:
#Importing songs_extra_info dataframe that contains extra information about the songs
songs_extra_info = spark.read.csv(get_training_filename('song_extra_info.csv'),header='true',inferSchema='true').na.drop('any')

#Head and shape of dataFrame
print("Shape of songs_extra_info:" , songs_extra_info.shape())
songs_extra_info.show(5)

In [9]:
#Importing train data frame that contains the logs of the listening history of the users
logs = spark.read.csv(get_training_filename('train.csv'),header='true',inferSchema='true').na.drop('any')

#Head and shape of the dataframe.
print("Shape of logs:" , logs.shape())
logs.show(5)

## Data Pre-processing

In [11]:
#Merfing songs and song_extra_info using inner join technique.
songs = songs\
  .join(songs_extra_info, songs.song_id == songs_extra_info.song_id,'left_outer')\
  .drop(songs_extra_info.song_id)\
  .withColumn('language',songs.language.cast(StringType()))\
  .withColumnRenamed('song_id','songId')\
  .withColumn('year', songs_extra_info.isrc.substr(5,7))\
  .withColumn('country', songs_extra_info.isrc.substr(0,2))

#Adjusting data types of members dataframes.  Renaming few columns to more meaningful names
members = members.withColumn('registrationDate', fn.unix_timestamp(members.registration_init_time.cast(StringType()), 'yyyyMMdd').cast('timestamp'))\
                               .withColumn('expirationDate', fn.unix_timestamp(members.expiration_date.cast(StringType()),'yyyyMMdd').cast('timestamp'))\
                               .withColumnRenamed('bd','age')\
                               .withColumnRenamed('msno','userId')\
                               .withColumn('city',members.city.cast(StringType()))\
                               .withColumn('registered_via',members.registered_via.cast(StringType()))\
                               .drop('registration_init_time','expiration_date')

#Creating a new column 'daysLeft' calculated as the number of days between expiration and registration date
members = members.withColumn('daysLeft', fn.datediff(members.expirationDate,members.registrationDate))

In [12]:
#Summarizing Invalid entries for members DataFrame
print("1. Invalid registration and expiration dates: " , members.filter(fn.col('registrationDate') > fn.col('expirationDate')).count())
print("2. Age = 0 : " , members.filter(members.age<=0).count())
print("3.Invalid Age:" , members.filter(members.age>%0).count()+members.filter(members.age<0).count())
print("4. Invalid UserId(msno):" , members.filter(members.userId.isNull() | fn.isnan(members.userId)).count())

#### Removing dirty data from members dataframe
* After the inspection, we found some inconsistent entries in members data frame like Age being zero or negative or sometimes a very high value. Here we considered 50 as the threshold for Age.
* No invalid entries in User ID column 
* Registration data must always be greater than expiration date. This constraint is sometimes violated if data is properly validated before making an entry into the database. There are no records violating this constraint
* Since we have huge number of records, lets remove all invalid entries instead handling using appropriate interpolation techniques.
* Even after filtering we would be left huge number of records, which makes it difficult build the models. 
* Therefore, only the users registered in 2016 and 2017 are considered.

In [14]:
#Filtering out Invalid entries and outliers for members DataFrame
members = members.filter((members.age>0) &(members.age<50) & ((fn.year(members.registrationDate) == 2016) | (fn.year(members.registrationDate) == 2017)) & (fn.length(members.city)==1))
print("Shape of members dataframe after Cleaning:" , members.shape())

In [15]:
#Inspecting invalid entries in songs
print("Invalid song_id: ", songs.filter(songs.songId.isNull() | fn.isnan(songs.songId)).count())
print("Invalid song_lenght:", songs.filter(songs.song_length<=0).count())

* No Invalid entries in songs dataframes
* Possible invalid entries in logs dataframes would be null values which is already filtered out while importing the data
* Let's combine the logs dataframe with member and songs datframe to induce user and song specific attributes to listening history

In [17]:
#combining data using inner join
logs = logs.join(members,members.userId == logs.msno, 'inner').drop(logs.msno)
logs = logs.join(songs, logs.song_id == songs.songId, 'inner').drop(logs.song_id)

#Drop null values if any
logs = logs.na.drop()
print(logs.shape())

## Data Visualization

* To get insights about the data, a few visualizations are performed using matplotlib and seaborn libraries.
* The following few blocks of code convert the spark dataFrames to pandas dataFrames for visualization purpose.
* After that, we go ahead and generate some plots to get insights about data

In [20]:
#Converting to Pandas DF
members_pd = members.toPandas()
songs_pd = songs.toPandas() 
logs_pd = logs.toPandas()

In [21]:
%matplotlib inline

#Plots explaing the distribution of different attributes in members dataframe
fig , ax =plt.subplots(2,2,figsize = (15,6))
ax[0,0].hist(members_pd.age)
ax[0,0].set(xlabel = 'Age', ylabel = 'frequency' , title = 'Histogram of Age')
ax[0,1].bar(members_pd.registered_via.unique(), members_pd.registered_via.value_counts())
ax[0,1].set(xlabel = 'Registration Channel', ylabel = 'Count' , title = 'Bar Graph of Resgistration Channel')
ax[1,0].bar(members_pd.city.unique(), members_pd.city.value_counts())
ax[1,0].set(xlabel = 'City', ylabel = 'Count' , title = 'Bar Graph of city')
ax[1,1].bar(members_pd.gender.unique(), members_pd.gender.value_counts())
ax[1,1].set(xlabel = 'Gender', ylabel = 'Count' , title = 'Bar Graph of Gender')
plt.subplots_adjust(top =1.5)
plt.show()

These graphs are just a general distribution of users irrespective of the target values according to their age, registration channel, city and gender. 
* From the histogram of Age, it is clear that more users are in the age of 18-30 
* Most of the users are from the cities 4,5, and 8
* Data is balanced for female and male users
* Most of the registrations are happening from channels 3 and 4

In [23]:
#Plots for distribution of various attributes in logs dataframe
fig , axes = plt.subplots(2,2)
fig.set_figheight(13)
fig.set_figwidth(22)
sns.countplot(logs_pd['target'], ax = axes[0][0]).set_title('Distribution of target')
sns.countplot(logs_pd['source_system_tab'],hue=logs_pd['target'], ax = axes[0][1]).set_title('Bar Graph of Source System tab')
sns.countplot(logs_pd['language'],hue=logs_pd['target'], ax = axes[1][0]).set_title('Bar Graph of Language')
sns.countplot(logs_pd['source_type'],hue=logs_pd['target'], ax = axes[1][1]).set_title('Bar Graph of Source type')
locs, labels = plt.xticks()
plt.xticks(rotation=45)

**Insights from above Graphs**

*Distributio of target says that the number of 0s and 1s are almost equal. Hence, we can say that our dataset is balanced.*

*Bar graph of Source sytem tab specifies ‘my-library’ and the ‘discover’ features of the app have the highest count of users from where they play their music .*

*Bar graph of shows us that the 3 languages with codes 3.0 (Taiwanese), 52.0 (English) and 31.0 (Korean) make up for most of the data.*

*Bar graph of System type tells us that most of the users prefer playing from their local playlist or local library when they open their app.*

In [25]:
#plot to depict Genre_preference for males
plt.figure(figsize=(8,8))
logs_pd.query("gender =='male'")["genre_ids"].value_counts().head(15).plot.bar()
plt.title("Distribution of Genres across Males ")
plt.xlabel("Genre IDs")
plt.ylabel("Count")

The above graph is just a general distribution of Genre IDs across male. We can observe that  male user are more insterested in genre with id's 465 and 458.

In [27]:
#plot to depict Genre_preference for males
plt.figure(figsize=(8,8))
logs_pd.query("gender =='female'")["genre_ids"].value_counts().head(15).plot.bar()
plt.title("Distribution of Genres across Females ")
plt.xlabel("Genre IDs")
plt.ylabel("Count")

The above graph is just a general distribution of Genre IDs across females. We can observe that genre preference for female users follow almost the similar order that of male users

# Model Building

* Since we are going to try algorithms like Logistic Regression, we will have to convert the categorical variables in the dataset into numeric variables. There are 2 ways we can do this.

**String Indexing**

This is basically assigning a numeric value to each category from {0, 1, 2, ...numCategories-1}. This introduces an implicit ordering among your categories, and is more suitable for ordinal variables (eg: Poor: 0, Average: 1, Good: 2)

**One-Hot Encoding**

This converts categories into binary vectors with at most one nonzero value (eg: (Blue: [1, 0]), (Green: [0, 1]), (Red: [0, 0]))

* In this dataset, we have ordinal variables like city,Source Sytem tab, source type, Source screen name,Registrayion channel, gender, genre_ids, and Language. we will use One-Hot Encoding to convert all categorical variables into binary vectors. It is possible here to improve prediction accuracy by converting each categorical column with an appropriate method.

* Here, we will use a combination of StringIndexer and OneHotEncoderEstimator to convert the categorical variables. The OneHotEncoderEstimator will return a SparseVector. 

* Since we will have more than 1 stage of feature transformations, we use a Pipeline to tie the stages together. This simplifies our code.

In [31]:
#Categorical Columns
encodeCol = ['source_system_tab', 'source_screen_name', 'source_type', 'gender', 'registered_via', 'language', 'city', 'genre_ids']

#Numerical columns
numericCol = ['daysLeft', 'age', 'song_length']

pipeline_stages = []

#creating stringIndexers and one hot encoders for each categorical column and appending them to pipeline stages.
for col in encodeCol:
  si = feature.StringIndexer().setInputCol(col).setOutputCol(col+'_vec').setHandleInvalid('skip')
  ohe = feature.OneHotEncoderEstimator().setInputCols([si.getOutputCol()]).setOutputCols([col+'_indexed'])
  pipeline_stages += [si,ohe]

#Assembling String indexed features
feature_assembler_si = feature.VectorAssembler().setInputCols( [c+'_vec' for c in encodeCol] + numericCol ).setOutputCol('features_si')

#Assembling one hot encoded features
feature_assembler_ohe=feature.VectorAssembler().setInputCols( [c+'_indexed' for c in encodeCol] + numericCol ).setOutputCol('features_ohe')

#Appending feature assembler to pipeline stages
pipeline_stages += [feature_assembler_si, feature_assembler_ohe]

In [32]:
#Pipeline for encoding the features
dataEncodingPipe = Pipeline().setStages(pipeline_stages)

#Transform the data
encoded_data = dataEncodingPipe.fit(logs).transform(logs)

In [33]:
#Splitting data into training and testing sets
train, test = encoded_data.randomSplit([0.6,0.4], seed =1234)

## Logistic Regression

***Label Encoding***

In [35]:
#Binary evaluator to find model performance on test data
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="target")

In [36]:
#Scaler to standardise the Sting indexed features
scaler_le = feature.StandardScaler(withMean = True, withStd = False, inputCol = 'features_si', outputCol = 'features_si_scaled')

#Logistic regression on Sting indexed features
lr_le = LogisticRegression().setFeaturesCol('features_si_scaled').setLabelCol('target')

#Creating the Pipeline
pipe_le = Pipeline().setStages([scaler_le, lr_le])

#fit the pipe
pipe_le_fitted = pipe_le.fit(train)
print('Performance on training data set:',pipe_le_fitted.stages[-1].summary.areaUnderROC)

In [37]:
#ROC for Logistic regression of String indexed features
import matplotlib.pyplot as plt
%matplotlib inline

roc = pipe_le_fitted.stages[-1].summary.roc.toPandas()
plt.figure(figsize=(8,8))
plt.plot([0, 1], [0, 1], 'r--')
plt.scatter(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate',fontsize=15)
plt.ylabel('True Positive Rate',fontsize=15)
plt.title('ROC Scatter Plot(String Indexed)',fontsize=18)
plt.show()

In [38]:
#Evaluating model on test data
lr_le_predictions = pipe_le_fitted.transform(test)
print("Area Under Roc for test data:", evaluator.evaluate(lr_le_predictions))

***One hot Encoding***

In [40]:
#scaler to standardise one hot encoded features
scaler_ohe = feature.StandardScaler(withMean = True, withStd = False, inputCol = 'features_ohe', outputCol = 'features_ohe_sacled')

#Logistic regression
lr_ohe = LogisticRegression(elasticNetParam=0.1, maxIter=10, regParam = 0.001).setFeaturesCol('features_ohe_sacled').setLabelCol('target')

#Creating the pipeline
pipe_ohe = Pipeline().setStages([scaler_ohe, lr_ohe])

#fit the pipe
pipe_ohe_fitted = pipe_ohe.fit(train)
print('Performance on training data set:',pipe_ohe_fitted.stages[-1].summary.areaUnderROC)

In [41]:
#ROC for one hot encoded Logistic regression
import matplotlib.pyplot as plt
%matplotlib inline

roc = pipe_ohe_fitted.stages[-1].summary.roc.toPandas()
plt.figure(figsize=(8,8))
plt.plot([0, 1], [0, 1], 'r--')
plt.scatter(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate',fontsize=15)
plt.ylabel('True Positive Rate',fontsize=15)
plt.title('ROC Scatter Plot(One Hot Encoding)',fontsize=18)
plt.show()

In [42]:
#Evaluating model on test data
lr_ohe_predictions = pipe_ohe_fitted.transform(test)
print("Area Under Roc for test data:", evaluator.evaluate(lr_ohe_predictions))

In [43]:
#Coeffiecients of logistic regression
weights = pipe_ohe_fitted.stages[-1].coefficients

In [44]:
w_pd = pd.DataFrame(weights.tolist(), columns=['weights'])
c_pd = w_pd[w_pd.weights == 0]
c_pd.shape[0]

11 Features were elimanated by L1 reguralization

## Decision Trees

In [47]:
# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="target", featuresCol="features_ohe")

# Train model with Training Data
dtModel = dt.fit(train)

#Making prediction on test data
predictions = dtModel.transform(test)

#Evaluating the predictions
accuracy = evaluator.evaluate(predictions)

print("Accuracy od model: ", accuracy)

In [48]:
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
auprc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))

In [49]:
display(dtModel)

treeNode
"{""index"":27,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":8,""overflow"":false}"
"{""index"":13,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[1.0],""feature"":29,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":48.5,""categories"":null,""feature"":173,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":688.5,""categories"":null,""feature"":172,""overflow"":false}"
"{""index"":1,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[1.0],""feature"":4,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[1.0],""feature"":57,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


## Random Forest

In [51]:
# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="target", featuresCol="features_ohe", numTrees=60, maxDepth=8)

# Train model with Training Data
rfModel = rf.fit(train)

In [52]:
#performance on training set
predictions = rfModel.transform(train)

#Evaluating the model
print("Accuracy of Random Forest on training set:",evaluator.evaluate(predictions))

In [53]:
#Making thebprediction on test data
predictions = rfModel.transform(test)

#Evaluating the model
print("Accuracy of Random Forest on testing set:",evaluator.evaluate(predictions))

In [54]:
#feature importance from random Forest
feature_imp = pd.DataFrame(rfModel.featureImportances.toArray(), columns = ['weights'])
print("Number of features eliminated:", feature_imp[feature_imp.weights == 0].shape[0])

In [55]:
#Bar graph of feature importance
plt.figure(figsize=(12,8))
plt.bar(feature_imp.index,feature_imp.weights)
plt.title("Bar graph of feature importance")
plt.xlabel('feature number')
plt.ylabel('weights')
plt.show()

## Gradient Boosting

In [57]:
#Creating a intial model
gbt = GBTClassifier(labelCol = 'target', featuresCol = 'features_ohe', maxDepth = 6)

#Train the model
gbt_pipeline = gbt.fit(train)

#performance on training set
print("Accuracy of gbt on training data: ",evaluator.evaluate(gbt_pipeline.transform(train)))

#Making predictions on test data
gbt_prediction = gbt_pipeline.transform(test)

#Evaluating the predictions
print("Accuracy of gbt on test data: ",evaluator.evaluate(gbt_prediction))

In [58]:
#feature importance from random Forest
feature_imp2 = pd.DataFrame(gbt_pipeline.featureImportances.toArray(), columns = ['weights'])
print("Number of features eliminated:", feature_imp2[feature_imp2.weights == 0].shape[0])

In [59]:
#Bar graph of feature importance
plt.figure(figsize=(12,8))
plt.bar(feature_imp2.index,feature_imp2.weights)
plt.title("Bar graph of feature importance")
plt.xlabel('feature number')
plt.ylabel('weights')
plt.show()

## Principal Component Analysis (PCA)

In [61]:
#scaler to standardise features
scaler = feature.StandardScaler(withMean = True, withStd = False, inputCol = 'features_ohe',outputCol = 'features_ohe_zfeatures')

#Pca with maximumm number of components
pca = feature.PCA(k = 175, inputCol = 'features_ohe_zfeatures', outputCol = 'pca_scores')

#Pipeline
pipe_pca = Pipeline().setStages([scaler,pca])
pca_fit = pipe_pca.fit(encoded_data)

In [62]:
#Plot to explain cummulative variance
%matplotlib inline
explainedVariance = pca_fit.stages[-1].explainedVariance
cum_sum = np.cumsum(explainedVariance)
plt.figure(figsize=(8,8))
plt.plot(np.arange(1, len(explainedVariance)+1), cum_sum)
plt.title("Scree Plot with maximum number of principal components for genre_id", fontsize =18)
plt.xlabel("Principal Component",fontsize =15)
plt.ylabel("Proportion Variance Explained",fontsize =15)
plt.show()

* This is a little weird as only 3 components were able explain 100% variance of 175 dimensions
* let us try running a logistic regression model on this

In [64]:
#Scaler
scaler = feature.StandardScaler(withMean = True, withStd = False,inputCol = 'features_ohe',outputCol = 'features_ohe_zfeatures')

#Pca with three components
pca = feature.PCA(k = 3, inputCol = 'features_ohe_zfeatures', outputCol = 'pca_scores')

#logistic regression
lr = LogisticRegression().setFeaturesCol('pca_scores').setLabelCol('target')

#pipe line
pipe = Pipeline().setStages([scaler,pca,lr])

#Train the data
pipe_fitted = pipe.fit(train)

#performance on training set
pipe_fitted.stages[-1].summary.areaUnderROC

In [65]:
#Plotting the ROC
%matplotlib inline
roc = pipe_fitted.stages[-1].summary.roc.toPandas()
plt.figure(figsize=(8,8))
plt.plot([0, 1], [0, 1], 'r--')
plt.scatter(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate',fontsize=15)
plt.ylabel('True Positive Rate',fontsize=15)
plt.title('ROC Scatter Plot(PCA)',fontsize=18)
plt.show()

In [66]:
#prediction on test data
predictions = pipe_fitted.transform(test)
print("Performance of PCA" ,evaluator.evaluate(predictions))

* Using PCA to reduce dimesionality has resulted in decrease in accuracy.
* SO PCA is not used for analysis