# KK-Box's Music Recommendation System
>___Team members___ <br />
*Anushi Doshi <br />
*Mahesh kumar Badam venkata <br />
*Manideep Kannaiah <br />
*Vidushi Mishra <br />

In [2]:
#imports
from pyspark.sql import SparkSession
from pyspark.sql import Row ,functions
from pyspark.sql.types import StringType, DateType
from pyspark.sql.dataframe import DataFrame
from pyspark.ml import feature, Pipeline
from pyspark.ml.classification import LogisticRegression
import numpy as np
import pandas as pd

# Set up the Spark session: reuse the active session if one already exists,
# otherwise create a new one with default settings.
spark = SparkSession.builder.getOrCreate()
# Keep a handle on the session's underlying SparkContext.
sc = spark.sparkContext

In [3]:
# read only cell

import os

# Databricks runtime version, read once from the environment when this cell
# runs. It is None when DATABRICKS_RUNTIME_VERSION is not set, which is taken
# to mean the notebook is running outside Databricks.
db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")


def get_training_filename(data_file_name):
    """Return the path a data file should be loaded from in this runtime.

    The location depends on whether the notebook runs on Databricks (detected
    through the module-level ``db_env``) or on a personal computer.

    Params
      data_file_name: The base name of the data file to load.

    Returns
      ``/FileStore/tables/<data_file_name>`` when the Databricks environment
      variable was present, otherwise ``data_file_name`` unchanged (the file
      is then assumed to sit in the same directory as this notebook).
    """
    # Compare against None by identity: an empty-string value still counts as
    # "variable is set".
    if db_env is not None:
        return "/FileStore/tables/%s" % data_file_name

    # Not on Databricks: load the file relative to the notebook's directory.
    return data_file_name

#Pandas-like shape helper for a spark dataframe
def spark_df_shape(self):
    """Return ``(row_count, column_count)`` for the dataframe ``self``.

    The row count comes from ``self.count()`` and the column count from the
    length of ``self.columns``.
    """
    n_rows = self.count()
    n_cols = len(self.columns)
    return (n_rows, n_cols)
  
# Plug the function into pyspark: attach it to the DataFrame class so every
# dataframe can call it as a method, i.e. df.shape() (a call, not a
# pandas-style attribute).
DataFrame.shape = spark_df_shape

In [4]:
#Importing Data
# Every CSV is read with its first row as the header and with column types
# inferred by Spark; the options are shared so the four reads stay in sync.
csv_options = {'header': 'true', 'inferSchema': 'true'}
members = spark.read.csv(get_training_filename('members.csv'), **csv_options)
songs = spark.read.csv(get_training_filename('songs.csv'), **csv_options)
songs_extra_info = spark.read.csv(get_training_filename('song_extra_info.csv'), **csv_options)
logs = spark.read.csv(get_training_filename('train.csv'), **csv_options)

## Basic Data Cleaning and Analysis

In [6]:
print("Data Dimensions:")
# Report (row count, column count) for each loaded table, in load order.
for table_label, table_df in (
    ("members:", members),
    ("songs:", songs),
    ("songs_extra_info:", songs_extra_info),
    ("logs:", logs),
):
    print(table_label, table_df.shape())

In [7]:
# Preview the first 5 rows of the members table.
members.show(5)

In [8]:
# Preview the first 5 rows of the songs table.
songs.show(5)

In [9]:
# Preview the first 5 rows of the extra song information table.
songs_extra_info.show(5)

In [10]:
# Preview the first 5 rows of the training logs (loaded from train.csv).
logs.show(5)

In [11]:
#Merging songs and song_extra_info
# Left outer join keeps every song even when it has no extra-info row; the
# duplicate song_id column coming from songs_extra_info is dropped afterwards.
song_id_match = songs.song_id == songs_extra_info.song_id
songs_with_extra = songs.join(songs_extra_info, song_id_match, 'left_outer')\
  .drop(songs_extra_info.song_id)
# Store language as a string column.
songs = songs_with_extra.withColumn('language', songs.language.cast(StringType()))

#Adjusting data types
# The two date columns are cast to strings and parsed with the 'yyyyMMdd'
# pattern into timestamps.
registration_ts = functions.unix_timestamp(
    members.registration_init_time.cast(StringType()), 'yyyyMMdd').cast('timestamp')
expiration_ts = functions.unix_timestamp(
    members.expiration_date.cast(StringType()), 'yyyyMMdd').cast('timestamp')

# Add the parsed dates, expose bd under the name Age, then drop the raw columns.
members = (
    members
    .withColumn('registrationDate', registration_ts)
    .withColumn('expirationDate', expiration_ts)
    .withColumn('Age', members.bd)
    .drop('registration_init_time', 'expiration_date', 'bd')
)
                               

# Display the first 5 rows of the merged songs table and of the members table
# after the type adjustments above.
songs.show(5)
members.show(5)

## Data Visualization

In [13]:
#display(songs.select('language').orderBy('language').groupBy('language').agg(functions.count('language')).dropna())
#display(songs.select('genre_ids').groupBy('genre_ids').agg(functions.count('genre_ids')))

# Model Building

___DataSet Preparation___

In [16]:
#Combining data: attach member and song attributes to every log row
# Inner joins keep only log rows whose member and song are both known. The
# two joins stay separate statements because the second condition refers to
# the already-joined logs dataframe.
member_match = members.msno == logs.msno
logs = logs.join(members, member_match, 'inner').drop(members.msno)

song_match = logs.song_id == songs.song_id
logs = logs.join(songs, song_match, 'inner').drop(songs.song_id)

#String indexers turning the msno and song_id strings into numeric ids
userIdStringIndexer = feature.StringIndexer(inputCol='msno', outputCol='userId')
songIdStringIndexer = feature.StringIndexer(inputCol='song_id', outputCol='songId')

#Data cleaning pipeline that applies both indexers in order
dcPipeline = Pipeline(stages=[userIdStringIndexer, songIdStringIndexer])

#Fit the pipeline on logs, transform logs with it, then drop the raw id columns
dcModel = dcPipeline.fit(logs)
logsDf = dcModel.transform(logs).drop('msno', 'song_id')

In [17]:
#logsDF = logsDf.drop('msno','song_id')
# Display the prepared dataset (show() with no argument prints the first 20 rows).
logsDf.show()

## Logistic Regression

In [19]:
#Categorical columns that are string-indexed and then one-hot encoded
encodeCol = ['source_system_tab','source_screen_name','source_type','city','gender','registered_via','language','genre_ids']

#String Indexers for all categorical variables
# NOTE: the column suffixes are kept for compatibility but read backwards:
# '<col>_vec' holds the StringIndexer output (a numeric index) and
# '<col>_indexed' holds the one-hot encoded vector.
# handleInvalid='skip' makes the fitted indexers drop rows they cannot index.
pipeline_stages = [
    feature.StringIndexer().setInputCol(attr).setOutputCol(attr + '_vec').setHandleInvalid('skip')
    for attr in encodeCol
]

#One hot encoding of above String indexers
# Spark 2.3/2.4 expose the multi-column encoder as OneHotEncoderEstimator;
# Spark 3.0 renamed it to OneHotEncoder and removed the old name. Use
# whichever class this runtime provides so the cell runs on both.
ohe_class = getattr(feature, 'OneHotEncoderEstimator', None) or feature.OneHotEncoder
ohe = ohe_class()\
    .setInputCols([attr + '_vec' for attr in encodeCol])\
    .setOutputCols([attr + '_indexed' for attr in encodeCol])

#feature assembler
# The model features are the one-hot vectors plus the two numeric columns.
features_indexed = [attr + '_indexed' for attr in encodeCol]
features_indexed.extend(['Age', 'song_length'])
feature_assembler = feature.VectorAssembler().setInputCols(features_indexed).setOutputCol('features')

#Appending one hot encoder and feature assembler pipeline stages
# Order matters: indexers first, then the encoder, then the assembler.
pipeline_stages.append(ohe)
pipeline_stages.append(feature_assembler)


In [20]:
# Logistic regression classifier reading the assembled 'features' vector and
# predicting the 'target' label; all other parameters keep their defaults.
Logit = LogisticRegression(featuresCol='features', labelCol='target')

In [21]:
# Add the classifier as the final stage and build the end-to-end pipeline.
# NOTE: append mutates pipeline_stages in place, so re-running this cell
# appends the classifier a second time.
pipeline_stages.append(Logit)
Encode_pipeline = Pipeline(stages=pipeline_stages)

In [22]:
# Fit every stage of the pipeline (indexers, encoder, assembler, classifier)
# on the full prepared dataset; the result is the fitted pipeline model.
fit = Encode_pipeline.fit(logsDf)

In [23]:
# Area under the ROC curve from the summary of the last fitted stage (the
# logistic regression model). NOTE(review): this summary appears to describe
# the data passed to fit(), so it is a training metric rather than a held-out
# evaluation - confirm before reporting it as model quality.
fit.stages[-1].summary.areaUnderROC