[View in Colaboratory](https://colab.research.google.com/github/SwapnilSParkhe/Big_Data_Analytics/blob/master/MusicArtist_RecommendationEngine_LastFM360k.ipynb)

# Recommendation Engine (Last.fm 360k data)

!["Image"](https://gigaom2.files.wordpress.com/2012/06/lastfmlogo.png)

## Implementation structure
- Model-based collaborative filtering using Alternating Least Squares (ALS) matrix factorization
- Entries in the user-item association matrix are treated as implicit feedback (listen counts, stored in the `plays` column)
- The DataFrame-based API (`pyspark.sql.DataFrame`) is used so that the machine learning pipelines in `pyspark.ml` are available
- Model evaluation is based on the root-mean-square error (RMSE) of the predictions against the play counts
- The hyperparameters involved are tuned
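
As a rough illustration of the implicit-feedback idea in the list above: in the formulation that Spark's ALS documentation cites (Hu, Koren and Volinsky), a raw count is not predicted directly. It is turned into a binary preference plus a confidence weight that grows with the count. The sketch below shows only that transformation in plain Python. The function name `implicit_targets` and the sample counts are assumptions made for illustration, and Spark's internal implementation may differ in detail.

```python
def implicit_targets(play_count, alpha=0.01):
    """Return (preference, confidence) for one user-artist play count.

    preference: 1.0 if the user played the artist at all, else 0.0
    confidence: 1 + alpha * play_count, so heavier listening weighs more
    """
    preference = 1.0 if play_count > 0 else 0.0
    confidence = 1.0 + alpha * play_count
    return preference, confidence


# Hypothetical play counts, not taken from the Last.fm data
for count in (0, 5, 200):
    print(count, implicit_targets(count))
```

Under this view, a larger `alpha` makes frequently played artists dominate the fit, while a smaller one treats all observed user-artist pairs more evenly.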

!["Image"](https://i.pinimg.com/originals/ba/f0/c8/baf0c80a9fea91e79365630709a1fa5c.png)

### 0. Setting Up Google Colab related dependencies

In [24]:
#Installing libraries
#This installs Apache Spark 2.3.0, Java 8, and Findspark, a library that makes it easy for Python to find Spark.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
!tar xf spark-2.3.0-bin-hadoop2.7.tgz
!pip install -q findspark

#Getting the data from the web
!wget http://mtg.upf.edu/static/datasets/last.fm/lastfm-dataset-360K.tar.gz -P drive/MSBA/Big_Data_Analytics
  
#Untarring the data
import tarfile
tar_ref = tarfile.open('drive/MSBA/Big_Data_Analytics/lastfm-dataset-360K.tar.gz', "r:gz")
tar_ref.extractall()
tar_ref.close()

#Setting up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.0-bin-hadoop2.7"

--2018-04-30 18:32:46--  http://mtg.upf.edu/static/datasets/last.fm/lastfm-dataset-360K.tar.gz
Resolving mtg.upf.edu (mtg.upf.edu)... 84.89.139.55
Connecting to mtg.upf.edu (mtg.upf.edu)|84.89.139.55|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 569202935 (543M) [application/x-gzip]
Saving to: ‘drive/MSBA/Big_Data_Analytics/lastfm-dataset-360K.tar.gz.1’

2018-04-30 18:38:03 (1.72 MB/s) - ‘drive/MSBA/Big_Data_Analytics/lastfm-dataset-360K.tar.gz.1’ saved [569202935/569202935]



### 1. Importing relevant libraries

In [0]:
#Library to help Python find spark easily
import findspark
findspark.init()

#Importing libraries
from pyspark.sql import SparkSession #to connect to spark cluster/core
from pyspark import SparkContext  #to read the raw file as an RDD
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd

### 2. Setting up "SparkSession" (and "SparkContext" to read the raw file as an RDD)
**Note**: `SparkSession` provides a single point of entry to the underlying Spark functionality
and allows programming Spark with the DataFrame and Dataset APIs.

In [0]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("RecommendationSystems") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
        
sc=SparkContext.getOrCreate()

#sqlContext=SQLContext(sc)

### 3. Importing file as RDD; Cleaning, Manipulating and Handling data; Making ADS_SDF
**Note**: SDF is a Spark DataFrame (created here to make further processing more convenient)

In [0]:
#Importing file from the current directory into an RDD object
lastfm_RDD = sc.textFile("lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv")

#Cleaning up (the lines of the lastfm file are tab-separated)
#Note: RDD schema ~ "user_id" "artist_id" "artist-name" "plays"
lastfm_RDD_clean=lastfm_RDD.map(lambda line:line.split('\t'))
lastfm_RDD_clean_1=lastfm_RDD_clean.map(lambda x: (x[0],x[1],x[3]))

#Converting string IDs into integers (pyspark.ml's ALS needs numeric user and item IDs)
users = lastfm_RDD_clean_1.map(lambda x: x[0]).distinct().zipWithIndex()
artists = lastfm_RDD_clean_1.map(lambda x: x[1]).distinct().zipWithIndex()

#Substituting the string IDs in the RDD with the corresponding integer indices
lastfm_RDD_clean_1 = lastfm_RDD_clean_1.map(lambda r: (r[0], (r[1], r[2]))).join(users).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))
lastfm_RDD_clean_1 = lastfm_RDD_clean_1.map(lambda r: (r[1], (r[0], r[2]))).join(artists).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))

#Collecting all plays at user-artist level
#Note: collect() pulls every value to the driver; `plays` is not used in the steps shown here
plays = lastfm_RDD_clean_1.map(lambda x: x[2]).collect()

#Converting each record to a typed Row (int, int, float) and building the Spark DataFrame
#Note: ADS RDD schema ~ "user_id" "artist_id" "plays"
ADS_RDD=lastfm_RDD_clean_1.map(lambda x: Row(int(x[0]), int(x[1]), float(x[2])))
ADS_SDF=spark.createDataFrame(ADS_RDD, ['userId', 'artistId', 'plays'] )
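
The ID substitution in the cell above can be sketched without Spark. The example below is a minimal plain-Python illustration, assuming a handful of made-up records. The helper `build_index` and the sample IDs are hypothetical, and unlike `zipWithIndex`, which numbers elements by partition order, this helper numbers values in first-seen order.

```python
def build_index(values):
    """Map each distinct value to a 0-based integer, in first-seen order."""
    index = {}
    for value in values:
        if value not in index:
            index[value] = len(index)
    return index


# Hypothetical (user_id, artist_id, plays) records, all read as strings
rows = [
    ("u_a", "art_x", "120"),
    ("u_b", "art_x", "7"),
    ("u_a", "art_y", "33"),
]

user_index = build_index(r[0] for r in rows)
artist_index = build_index(r[1] for r in rows)

# Replace string IDs with integers and cast plays to float
encoded = [(user_index[u], artist_index[a], float(p)) for u, a, p in rows]
print(encoded)
```

Keeping the two index dictionaries around also makes it possible to translate integer IDs back to the original strings after recommendations are generated.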

### 4. Training Model (using pyspark.ml's ALS model) - "BasicVersion"

In [0]:
#Setting up the parameters for ALS
rank_=7   #Number of latent factors
maxIter_=10   #Maximum number of ALS iterations
regParam_= 0.1   #Regularization parameter in ALS

#Instantiating ALS and fitting the model to the whole data
ALS_obj = ALS(rank=rank_, maxIter=maxIter_, regParam=regParam_, 
              coldStartStrategy="drop", implicitPrefs=True,
              userCol="userId", itemCol="artistId", ratingCol="plays")
ALS_model = ALS_obj.fit(ADS_SDF)

### 5. Model Evaluations (based on RMSE) - "BasicVersion"

In [30]:
#Evaluating the model by computing the RMSE on the whole data
def compute_RMSE(model,data):
    
    """Takes a fitted ALS model and evaluation data (as an SDF)
    and returns the RMSE value."""
    
    predictions = model.transform(data)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="plays",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    
    return rmse
 
compute_RMSE(ALS_model,ADS_SDF)

650.7785559354666
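
For reference, the RMSE reported by `RegressionEvaluator` is the square root of the mean squared difference between label and prediction. The sketch below reproduces that calculation in plain Python; the function name `rmse` and the sample numbers are illustrative assumptions, not values from the dataset. With `implicitPrefs=True` the predictions approximate a preference score rather than a play count, so comparing them against raw `plays` tends to give a large value that mostly reflects the scale of the counts.

```python
import math


def rmse(labels, predictions):
    """Root-mean-square error between two equally long, non-empty sequences."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and equally long")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical play counts versus preference-like scores
plays = [120.0, 7.0, 33.0]
scores = [0.9, 0.2, 0.6]
print(rmse(plays, scores))
```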

### 6. Top 10 artist recommendations for all users - "BasicVersion"

In [0]:
#Generate top 10 artist recommendations for each user
userRecs_SDF = ALS_model.recommendForAllUsers(10)

#Manipulating SDF to get info in required form
user_artistRecs_SDF=userRecs_SDF.select("userId","recommendations.artistId")
user_artistRecs_DF=user_artistRecs_SDF.toPandas().sort_values("userId")
user_artistRecs_DF['artistId(s)']=[str(i)[1:-1] for i in user_artistRecs_DF.artistId]
user_artistRecs_DF.drop('artistId',axis=1,inplace=True)

#Exporting file to tab separated text file
user_artistRecs_txt=user_artistRecs_DF.to_csv(sep='\t', index=False)
with open("Music_Reco_360k.txt",'w') as file_obj:
    file_obj.write(user_artistRecs_txt)
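
The `str(i)[1:-1]` step above turns each list of recommended IDs into a comma-separated string by trimming the brackets from its printed form. A more explicit way to get the same text for a plain Python list is `str.join`, sketched below. The helper `format_recommendations` and the sample IDs are hypothetical.

```python
def format_recommendations(artist_ids):
    """Join integer artist IDs into one comma-separated string."""
    return ", ".join(str(artist_id) for artist_id in artist_ids)


# Hypothetical recommendations keyed by integer user ID
recs = {1: [7, 40, 5], 0: [12, 7, 93]}

# Header plus one tab-separated line per user, sorted by user ID
lines = ["userId\tartistId(s)"]
for user_id, artist_ids in sorted(recs.items()):
    lines.append(f"{user_id}\t{format_recommendations(artist_ids)}")

print("\n".join(lines))
```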

### 7. Tuning and Evaluating tuned Models - "TunedVersion"

In [0]:
#Model Params
seed_ = 561
maxIter_ = 10
regParam_ = 0.1
alpha_ = 0.01
ranks = [10,25,50]

#Eval params
errors = [0, 0, 0] #list for storing errors of hyperParams
err_ID = 0 #iterating errorID
tolerance = 0.02

#Trackers for the best result so far
min_error = float('inf')
best_rank = -1
best_iteration = -1

#Hyperparameter tuning (5 repeated random hold-out splits for each rank)
for rank_ in ranks:
    rmse = 0
    for i in range(5):
        print("Running rank : iter ::", rank_,':',i+1)
        #Randomly Splitting the SDF data (train:valid:test :: 60%:30%:10%)
        train_SDF, valid_SDF, test_SDF = ADS_SDF.randomSplit([0.6, 0.3, 0.1])
        valid_predict_SDF = valid_SDF.select(['userId', 'artistId'])
        test_predict_SDF = test_SDF.select(['userId', 'artistId'])
        
        #Instantiating ALS and fitting the model to the train data
        ALS_obj = ALS(rank=rank_, maxIter=maxIter_, regParam=regParam_, alpha=alpha_,
                      coldStartStrategy="drop", implicitPrefs=True, seed=seed_,
                      userCol="userId", itemCol="artistId", ratingCol="plays")
        ALS_model = ALS_obj.fit(train_SDF)
    
        #Collecting rmse for this specific split (and then adding for same hyperParams)
        rmse += compute_RMSE(ALS_model,valid_SDF)
        
    
    #Summarizing the error for the given hyperParams (average RMSE over all split rounds)
    error = rmse/5
    errors[err_ID] = error
    err_ID += 1 #iterating err_ID with rank iterations
    print ('For hyperParam "rank" %s the RMSE is %s' % (rank_, error))
    
    #Tracking the best rank so far
    if error < min_error:
        min_error = error
        best_rank = rank_

print ('The best model was trained with hyperParam "rank": %s' % best_rank)
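
The selection logic of the loop above (several random hold-out splits per candidate, mean validation error, keep the minimum) can be isolated from Spark. The sketch below is a simplified plain-Python version under stated assumptions: `repeated_holdout`, `fit` and `score` are hypothetical names, the split is two-way (train/validation) rather than the 60/30/10 split used above, and the toy `fit`/`score` pair merely stands in for training and scoring an ALS model.

```python
import random


def repeated_holdout(rows, candidates, fit, score, rounds=5, train_frac=0.6, seed=561):
    """Return (best_candidate, mean_errors) over repeated random splits."""
    rng = random.Random(seed)
    mean_errors = {}
    for candidate in candidates:
        total = 0.0
        for _ in range(rounds):
            shuffled = list(rows)
            rng.shuffle(shuffled)
            cut = int(len(shuffled) * train_frac)
            train, valid = shuffled[:cut], shuffled[cut:]
            # Train on one part, measure the error on the held-out part
            total += score(fit(train, candidate), valid)
        mean_errors[candidate] = total / rounds
    best = min(mean_errors, key=mean_errors.get)
    return best, mean_errors


# Toy stand-ins: the "model" is just the candidate, the "error" its distance from 25
toy_fit = lambda train, candidate: candidate
toy_score = lambda model, valid: float(abs(model - 25))

best_rank_toy, toy_errors = repeated_holdout(list(range(100)), [10, 25, 50], toy_fit, toy_score)
print(best_rank_toy, toy_errors)
```

Storing the mean errors in a dictionary keyed by candidate avoids the separate index counter and makes it easy to inspect every candidate's score afterwards.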

### Final Model - "TunedVersion"