<a href="https://colab.research.google.com/github/BALaka-18/Sample_Recommendation_Pyspark_ALS/blob/master/Pyspark_ALS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Installing Apache Spark 2.4.5, Java 8, and Hadoop 2.7

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://mirrors.viethosting.com/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
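
Before starting Spark, it can help to confirm that both variables point at directories that actually exist, since a wrong path usually only surfaces later as an opaque startup error. The helper below is a minimal sketch; `missing_dirs` is a hypothetical name introduced here for illustration, not part of the original notebook.

```python
import os

def missing_dirs(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point to a directory."""
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

# An empty list means both paths were found on disk.
print(missing_dirs(os.environ))
```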

#### Importing the other libraries and mounting GDrive.

In [0]:
import pandas as pd
from google.colab import drive

In [13]:
drive.mount('/content/gdrive')

Mounted at /content/gdrive


#### Initializing Spark, starting the session, and then reading the data

In [0]:
import findspark
findspark.init()     # Initializing
from pyspark.sql import SparkSession          # for running a session

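#### STEPS TO READ A CSV FILE IN SPARK

PART 1: creating the session

1. `SparkSession`: the entry point for working with DataFrames
2. `.builder`: starts configuring a session
3. `.appName(...)`: sets the name of the application
4. `.getOrCreate()`: returns the existing session, or creates one if none exists

PART 2: reading the file

1. `.format('csv')`: the format to read
2. `.option('delimiter', ...)`: the field delimiter, needed whenever it is something other than `,` (this file is tab-separated)
3. `.option('header', 'true')`: use the first row as the column headers
4. `.load(...)`: the path of the source file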

In [20]:
spark = SparkSession \
      .builder \
      .appName('ALS_Recommendation') \
      .getOrCreate()         

rawData = spark.read\
               .format('csv')\
               .option('delimiter', '\t')\
               .option('header', 'true')\
               .load('/content/gdrive/My Drive/user_artists.dat')

rawData.limit(90).toPandas()      # Collect only the first 90 rows into a pandas DataFrame

Unnamed: 0,userID,artistID,weight
0,2,51,13883
1,2,52,11690
2,2,53,11351
3,2,54,10300
4,2,55,8983
...,...,...,...
85,3,136,76
86,3,137,75
87,3,138,72
88,3,139,72
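
The reader options used above can be illustrated without Spark. The sketch below uses Python's standard `csv` module on three rows copied from the output, and it also shows why a cast is needed afterwards: a delimited text reader hands back strings unless told otherwise (Spark's CSV reader behaves the same way when schema inference is not enabled). The `sample` string is an illustrative stand-in for the real file.

```python
import csv
import io

# Three rows copied from the output above, tab-separated like the source file.
sample = "userID\tartistID\tweight\n2\t51\t13883\n2\t52\t11690\n2\t53\t11351\n"

# delimiter="\t" plays the role of .option('delimiter', '\t');
# DictReader takes field names from the first row, like .option('header', 'true').
rows = list(csv.DictReader(io.StringIO(sample), delimiter="\t"))

# Every value is still a string here, so numeric columns need an explicit cast.
first_weight = rows[0]["weight"]
weights = [int(row["weight"]) for row in rows]
print(type(first_weight).__name__, weights)
```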


In [23]:
# Change type to 'int'

from pyspark.sql.functions import col

data = rawData.select(col('userID').cast('int'),
                      col('artistID').cast('int'),
                      col('weight').cast('int')
                      )
data

DataFrame[userID: int, artistID: int, weight: int]

#### Normalizing the weights (how often a user played an artist)

In [25]:
from pyspark.sql.functions import col, mean, stddev

df = data.select(mean('weight').alias('mean_weight'),
                 stddev('weight').alias('stddev_weight'))\
          .crossJoin(data)\
          .withColumn('weight_scaled' , 
                      (col('weight') - col('mean_weight')) / col('stddev_weight'))
          
df.toPandas().head()

Unnamed: 0,mean_weight,stddev_weight,userID,artistID,weight,weight_scaled
0,745.24393,3751.32208,2,51,13883,3.502167
1,745.24393,3751.32208,2,52,11690,2.917573
2,745.24393,3751.32208,2,53,11351,2.827205
3,745.24393,3751.32208,2,54,10300,2.547037
4,745.24393,3751.32208,2,55,8983,2.195961
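
The scaled column is a plain z-score: subtract the mean, then divide by the standard deviation. The sketch below checks the first row of the table by hand and applies the same formula to a toy list. It assumes the sample standard deviation (n - 1 in the denominator), which is what Spark's `stddev` computes; the toy values are hypothetical.

```python
import statistics

def z_scores(values):
    """Standardize values with the mean and the sample standard deviation."""
    mu = statistics.mean(values)
    sigma = statistics.stdev(values)  # sample standard deviation (n - 1)
    return [(v - mu) / sigma for v in values]

# Recompute the first row of the table from the printed mean and stddev.
mean_weight, stddev_weight = 745.24393, 3751.32208
first_row = (13883 - mean_weight) / stddev_weight
print(round(first_row, 4))

# Toy input (hypothetical values, not taken from the dataset).
print(z_scores([1, 2, 3, 4, 5]))
```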


#### IMPLEMENTING THE PRE-BUILT ALS ALGORITHM FOR RECOMMENDATION

In [0]:
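from pyspark.ml.recommendation import ALS

# 80/20 train-test split; pass seed=... to randomSplit for a reproducible split
(train, test) = df.randomSplit([0.8, 0.2])

als = ALS(maxIter=10,                  # number of ALS iterations
          regParam=0.1,                # regularization strength
          userCol='userID',
          itemCol='artistID',
          implicitPrefs=True,          # treat the weights as implicit feedback
          ratingCol='weight_scaled',
          coldStartStrategy='drop')    # drop rows whose prediction would be NaN

model = als.fit(train)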
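
With `implicitPrefs=True`, the rating column is not fitted directly. The sketch below shows the usual implicit-feedback reading of a rating as a binary preference plus a confidence weight. It is an illustration, not Spark's code: it assumes preference is 1 only for strictly positive ratings, that confidence is `1 + alpha * |rating|`, and that `alpha` is left at 1.0. `implicit_view` is a hypothetical helper name.

```python
# Mean and standard deviation printed in the normalization output.
MEAN_WEIGHT, STDDEV_WEIGHT = 745.24393, 3751.32208

def implicit_view(rating, alpha=1.0):
    """Map a rating to (preference, confidence) under the stated assumptions."""
    preference = 1.0 if rating > 0 else 0.0     # assumed: positive means "liked"
    confidence = 1.0 + alpha * abs(rating)      # assumed: magnitude sets the weight
    return preference, confidence

# Raw play counts: two near or below the mean, one far above it.
for weight in (77, 825, 13883):
    scaled = (weight - MEAN_WEIGHT) / STDDEV_WEIGHT
    print(weight, round(scaled, 6), implicit_view(scaled))
```

If these assumptions hold, every user-artist pair with a below-average play count gets a negative z-score and is therefore read as "no preference". A non-negative input, such as the raw counts or a log transform of them, may suit implicit mode better than a z-score.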

In [29]:
pred = model.transform(test)
pred.toPandas().head()

Unnamed: 0,mean_weight,stddev_weight,userID,artistID,weight,weight_scaled,prediction
0,745.24393,3751.32208,1137,463,77,-0.178136,0.0
1,745.24393,3751.32208,306,463,452,-0.078171,0.0
2,745.24393,3751.32208,859,463,243,-0.133885,0.012615
3,745.24393,3751.32208,578,471,152,-0.158143,0.031211
4,745.24393,3751.32208,1456,471,825,0.021261,0.008705


In [30]:
pred_new = pred.select('weight_scaled', 'prediction').toPandas()
pred_new.describe()

Unnamed: 0,weight_scaled,prediction
count,16211.0,16211.0
mean,0.010592,0.043121
std,1.046301,0.101711
min,-0.198395,-0.196088
25%,-0.166673,0.0
50%,-0.123222,0.00246
75%,-0.025389,0.036711
max,85.297863,0.97279
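
The summary above compares the two distributions but not the per-row error. A root-mean-square error (RMSE) is one simple way to quantify that; the sketch below computes it in plain Python over the five prediction rows printed earlier. In Spark itself, `RegressionEvaluator` with `metricName='rmse'` would be the natural tool for the full test set. With implicit feedback the predictions behave like preference scores rather than reconstructed weights, so this number is best read as a rough check only.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

# (weight_scaled, prediction) for the five rows shown by pred.toPandas().head()
rows = [(-0.178136, 0.0), (-0.078171, 0.0), (-0.133885, 0.012615),
        (-0.158143, 0.031211), (0.021261, 0.008705)]
print(rmse(rows))
```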
