<a href="https://colab.research.google.com/github/harnalashok/hadoop/blob/main/recommenderSystem_giftCards.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Last amended: 15th October, 2021
# Myfolder: github/hadoop
# Objectives:
#             i)  Develop ALS based recommender system
#             ii) Tune grid parameters
#
# Refer:
# https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html
# https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2799933550853697/2823893187441173/2202577924924539/latest.html
#
# Data Source(s):
# https://nijianmo.github.io/amazon/index.html


# Spark Reference API
a. [Quickstart](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart.html) <br>
b. Dataframe [APIs list](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#dataframe-apis) at a glance<br>
c. Also look at [this source code](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html) of the functions module, which includes usage examples



# A. Full spark install

### 1.0 Libraries

In [1]:
# 1.0 Modules used to set environment variables and to time the install
import os  
import time  

## 2.0 Define some functions

#### ssh_install()

In [2]:
# 2.0 Function to install ssh client and sshd (Server)
def ssh_install():
  print("\n--1. Download and install ssh server----\n")
  ! sudo apt-get remove openssh-client openssh-server
  ! sudo apt install openssh-client openssh-server
  
  print("\n--2. Restart ssh server----\n")
  ! service ssh restart

#### Java install

In [3]:
# 3.0 Function to download and install java 8
def install_java():
  ! rm -rf /usr/java

  print("\n--Download and install Java 8----\n")
  !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null        # install openjdk
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     # set environment variable

  !update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
  !update-alternatives --set javac /usr/lib/jvm/java-8-openjdk-amd64/bin/javac
  
  !mkdir -p /usr/java
  ! ln -s "/usr/lib/jvm/java-8-openjdk-amd64"  "/usr/java"
  ! mv "/usr/java/java-8-openjdk-amd64"  "/usr/java/latest"
  
  !java -version       #check java version
  !javac -version

#### setup ssh passphrase

In [4]:
# 6.0 Function to set up ssh keys (with an empty passphrase)
def set_keys():
  print("\n---22. Generate SSH keys----\n")
  ! cd ~ ; pwd 
  ! cd ~ ; ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
  ! cd ~ ; cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
  ! cd ~ ; chmod 0600 ~/.ssh/authorized_keys


#### Set environment

In [5]:
# 7.0 Function to set up environmental variables
def set_env():
  print("\n---23. Set Environment variables----\n")
  # 'export' command does not work in colab
  # https://stackoverflow.com/a/57240319
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     #set environment variable
  os.environ["JRE_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/jre"   
  

#### function to install prerequisites
java and ssh<br>


In [6]:
# 8.0 Function to call all functions
def install_components():
  print("\n--Install ssh and java----\n")
  ssh_install()
  install_java()  
  #set_keys()
  set_env()


## 3.0 Install components
Download, install and configure the components. This takes around 2 minutes.<br>
Your <u>input *'y'* is required</u> at one point, when earlier ssh keys are overwritten

In [7]:
# 9.0 Start installation
start = time.time()
install_components()
end = time.time()
print("\n---Time taken----\n")
print((end- start)/60)


--Install java----


--1. Download and install ssh server----

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following packages were automatically installed and are no longer required:
  ncurses-term python3-certifi python3-chardet python3-idna
  python3-pkg-resources python3-requests python3-six python3-urllib3
Use 'sudo apt autoremove' to remove them.
The following packages will be REMOVED:
  openssh-client openssh-server openssh-sftp-server ssh-import-id
0 upgraded, 0 newly installed, 4 to remove and 37 not upgraded.
After this operation, 5,240 kB disk space will be freed.
(Reading database ... 158402 files and directories currently installed.)
Removing openssh-server (1:7.6p1-4ubuntu0.5) ...
invoke-rc.d: could not determine current runlevel
invoke-rc.d: policy-rc.d denied execution of stop.
Removing ssh-import-id (5.7-0ubuntu1.1) ...
Removing openssh-sftp-server (1:7.6p1-4ubuntu0.5) ...
Removing openssh-client (1:7.6p1-4ubuntu0

## 4.0 Install spark
koalas will also be installed

### Define functions

`findspark`: PySpark isn't on `sys.path` by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding `pyspark` to `sys.path` at runtime. `findspark` does the latter.
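
As a rough illustration of the runtime approach, the sketch below adds a Spark distribution's Python sources to `sys.path`. It is a simplified stand-in for what `findspark.init()` does, not its actual implementation. The helper name `add_pyspark_to_path` is hypothetical, and the code assumes the usual layout of a Spark distribution (`python/` and `python/lib/py4j-*.zip` under `SPARK_HOME`).

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Put <spark_home>/python and the bundled py4j zip on sys.path.

    Hypothetical helper; a simplified stand-in for findspark.init().
    Returns the list of paths that were newly added.
    """
    python_dir = os.path.join(spark_home, "python")
    # Spark ships py4j as a zip archive whose name includes its version
    py4j_zips = glob.glob(os.path.join(python_dir, "lib", "py4j-*.zip"))
    added = []
    for path in [python_dir] + sorted(py4j_zips):
        if path not in sys.path:
            sys.path.insert(0, path)
            added.append(path)
    return added
```

For the install path used in this notebook, the call would be `add_pyspark_to_path(os.environ["SPARK_HOME"])` once `SPARK_HOME` has been set.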

In [8]:
# 1.0 Function to download and unzip spark
def spark_koalas_install():
  print("\n--1.1 Install findspark----\n")
  !pip install -q findspark

  print("\n--1.2 Install databricks Koalas----\n")
  !pip install koalas
  
  # This download link NEEDS TO BE CHECKED AGAIN
  print("\n--1.3 Download Apache tar.gz----\n")
  ! wget -c https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

  print("\n--1.4 Transfer downloaded content and unzip tar.gz----\n")
  !  mv /content/spark*   /opt/
  ! tar -xzf /opt/spark-3.1.2-bin-hadoop3.2.tgz  --directory /opt/

  print("\n--1.5 Check folder for files----\n")
  ! ls -la /opt


In [9]:
# 1.1 Function to set environment
def set_spark_env():
  print("\n---2. Set Environment variables----\n")
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
  os.environ["JRE_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/jre" 
  os.environ["SPARK_HOME"] = "/opt/spark-3.1.2-bin-hadoop3.2" 
  os.environ["SPARK_CONF_DIR"] = "/opt/spark-3.1.2-bin-hadoop3.2/conf"     
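  # Use .get() so that this does not raise KeyError when LD_LIBRARY_PATH is unset
  os.environ["LD_LIBRARY_PATH"] = os.environ.get("LD_LIBRARY_PATH", "") + ":/opt/spark-3.1.2-bin-hadoop3.2/lib/native"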
  os.environ["PATH"] += ":/opt/spark-3.1.2-bin-hadoop3.2/bin:/opt/spark-3.1.2-bin-hadoop3.2/sbin"
  print("\n---2.1. Check Environment variables----\n")
  # Check
  ! echo $PATH
  ! echo $LD_LIBRARY_PATH

In [10]:
# 1.2 Function to configure spark 
def spark_conf():
  print("\n---3. Configure spark to access hadoop----\n")
  !mv /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-env.sh.template  /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-env.sh
  #!echo "HADOOP_CONF_DIR=/opt/hadoop-3.2.2/etc/hadoop/" >> /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-env.sh
  print("\n---3.1 Check ----\n")
  #!cat /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-env.sh

### Install spark


In [11]:
# 2.0 Call all the three functions
def install_spark():
  spark_koalas_install()
  set_spark_env()
  spark_conf()


In [12]:
# 2.1 
install_spark()


--1.1 Install findspark----


--1.2 Install databricks Koalas----


--1.3 Download Apache tar.gz----

--2021-10-15 13:04:01--  https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 228834641 (218M) [application/x-gzip]
Saving to: ‘spark-3.1.2-bin-hadoop3.2.tgz’


2021-10-15 13:04:03 (142 MB/s) - ‘spark-3.1.2-bin-hadoop3.2.tgz’ saved [228834641/228834641]


--1.4 Transfer downloaded content and unzip tar.gz----


--1.5 Check folder for files----

total 223492
drwxr-xr-x  1 root root      4096 Oct 15 13:04 .
drwxr-xr-x  1 root root      4096 Oct 15 09:20 ..
drwxr-xr-x  1 root root      4096 Oct  8 13:39 google
drwxr-xr-x  4 root root      4096 Oct  8 13:31 nvidia
drwxr-xr-x 13 1000 1000      4096 May 24 04:45 spark-3.1.2-bin-hadoop3.2
-rw-r--r--  1 root r

# B. Call libraries
We import some essential libraries

In [13]:
# 3.0 Just call some libraries to test
import pandas as pd
import numpy as np
import os
import time 

# 3.1 Get spark in sys.path
import findspark
findspark.init()

# 3.2 Call other spark libraries
#     Just to test
from pyspark.sql import SparkSession
import databricks.koalas as ks



In [14]:
# 3.3
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [15]:
# 3.4 Increase cell width to display wide columnar output
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# C. Build spark session
You can modify spark driver/executor memory here<br>
SparkSession name is 'spark'


## Modifying spark configuration
Increase driver and executor memory

In [16]:
# 4.0 Check template file
! cat /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf.template

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enable

In [17]:
# 4.1 Create spark-defaults.conf 
! cp /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf.template  /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf


In [18]:
# 4.2 Amend properties
! echo "spark.driver.memory 6g" >> /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf
! echo "spark.executor.memory 3g" >> /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf
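
Because `>>` appends, re-running the cell above adds duplicate lines to `spark-defaults.conf`. A minimal sketch of an idempotent alternative follows. The helper name `set_spark_property` is hypothetical (it is not part of Spark), and the code assumes the file uses the whitespace-separated `key value` format shown in the template.

```python
def set_spark_property(conf_path, key, value):
    """Set `key value` in a spark-defaults.conf style file, replacing any earlier entry.

    Hypothetical helper, not part of Spark.
    """
    try:
        with open(conf_path) as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        lines = []
    kept = []
    for line in lines:
        stripped = line.strip()
        # Keep comments and blank lines; drop earlier settings of the same key
        is_setting = stripped and not stripped.startswith("#")
        if is_setting and stripped.split()[0] == key:
            continue
        kept.append(line)
    kept.append(f"{key} {value}")
    with open(conf_path, "w") as f:
        f.write("\n".join(kept) + "\n")
```

Calling `set_spark_property(path, "spark.driver.memory", "6g")` any number of times leaves exactly one `spark.driver.memory` line in the file.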

In [19]:
# 4.3 Check now
! cat /opt/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enable

## Stop and start SparkSession

In [26]:
# 5.0 Build spark session:
#    Stop spark, if started

if 'spark' in locals():
  spark.stop()

# 5.1 Now start spark
spark = SparkSession. \
                    builder. \
                    master("local[*]"). \
                    appName("myexpt"). \
                    getOrCreate()

In [27]:
sc = spark.sparkContext
spark.sparkContext.getConf().getAll()

[('spark.executor.memory', '3g'),
 ('spark.app.name', 'myexpt'),
 ('spark.executor.id', 'driver'),
 ('spark.sql.warehouse.dir', 'file:/content/spark-warehouse'),
 ('spark.app.startTime', '1634303391906'),
 ('spark.driver.memory', '6g'),
 ('spark.driver.port', '33101'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '90202ab9013d'),
 ('spark.app.id', 'local-1634303392022'),
 ('spark.ui.showConsoleProgress', 'true')]

In [28]:
# 5.2
print(spark.sparkContext._conf.getAll())

[('spark.executor.memory', '3g'), ('spark.app.name', 'myexpt'), ('spark.executor.id', 'driver'), ('spark.sql.warehouse.dir', 'file:/content/spark-warehouse'), ('spark.app.startTime', '1634303391906'), ('spark.driver.memory', '6g'), ('spark.driver.port', '33101'), ('spark.rdd.compress', 'True'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.submit.pyFiles', ''), ('spark.submit.deployMode', 'client'), ('spark.driver.host', '90202ab9013d'), ('spark.app.id', 'local-1634303392022'), ('spark.ui.showConsoleProgress', 'true')]


# D. Test spark
Use existing *spark* session to test if spark is working


In [None]:
# 6.0 Pandas DataFrame
pdf = pd.DataFrame({
        'x1': ['a','a','b','b','', 'c', 'd','d'],
        'x2': ['apple','', 'orange','orange', 'peach','','apple','orange'],
        'x3': [1, 1, 2, 2, 2, 4, 1, 2],
        'x4': [2.4, 2.5, 3.5, 1.4, 2.1,1.5, 3.0, 2.0],
        'y1': [1, 0, 1, 0, 0, 1, 1, 0],
        'y2': ['yes', 'no', 'no','','', 'yes','', 'yes']
    })

# 6.1
pdf

In [None]:
# 6.2 Transform to Spark DataFrame
#     and print
df = spark.createDataFrame(pdf)
df.show()


# E. Some useful functions
Run these cells only if you need the helpers; otherwise skip this section

In [None]:
# 7.0 Per column how many null values:
from pyspark.sql.functions import isnan, when, count, col
def null_values(data):
  data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

In [None]:
# 8.0 Finding mode of a column
# Refer StackOverflow: https://stackoverflow.com/a/58279672
def mode(df, colname):
  # Return the most frequent value of column 'colname'
  return df.groupby(colname).count().orderBy("count", ascending=False).first()[0]

# 9.0 Find mode of all columns
def mode_cols(df):
  return [[i, df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]

In [None]:
# 10.0 Value counts
def value_counts(df):
    for colm in df.columns:
        df.groupby(colm).count().show()

In [None]:
# 11.0 Map a string to another
# See here: https://stackoverflow.com/a/55026324/3282777
# Code not tested
# Map a column 'colname' in dataframe 'df'
# as follows:
#
# map_dict= {
#            'A': '1',
#            'B': '2'
#           }

def mapping(df, map_dict, colname):
  # Return a new DataFrame with the values of 'colname' replaced as per map_dict
  return df.replace(to_replace=map_dict, subset=[colname])

# F. Your experiments
SparkSession is <i>'spark'</i>. Call all needed libraries

In [29]:
# 1.0 Mount gdrive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [30]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, explode
from pyspark.ml.feature import StringIndexer
from pyspark import SparkContext

In [31]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [43]:
ratings = spark.read.csv(
                         "/content/drive/MyDrive/Colab_data_files/recommenderSystems/gift_cards.csv",
                         header=True
                         )

In [44]:
ratings.show(3)
ratings.printSchema()

+----------+--------------+------+----------+
|      item|        userid|rating| timestamp|
+----------+--------------+------+----------+
|B001GXRQW0| APV13CM0919JD|   1.0|1229644800|
|B001GXRQW0|A3G8U1G1V082SN|   5.0|1229472000|
|B001GXRQW0| A11T2Q0EVTUWP|   5.0|1229472000|
+----------+--------------+------+----------+
only showing top 3 rows

root
 |-- item: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [45]:
st = StringIndexer(
                    inputCols = ['item', 'userid'],
                    outputCols = ['card', 'user']
                   )
model = st.fit(ratings)
ratings = model.transform(ratings)
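
ALS needs numeric user and item ids, which is why the string columns are indexed here. By default `StringIndexer` orders labels by descending frequency, so the most frequent value gets index 0. The plain-Python sketch below imitates that ordering on a toy list. The function name `frequency_index` and the sample ids are made up for illustration, and the alphabetical tie-break is an assumption.

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; alphabetical order for ties is an assumption
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}


# Made-up item ids, for illustration only
items = ["card_b", "card_a", "card_b", "card_c", "card_b", "card_a"]
mapping = frequency_index(items)
print(mapping)
```

`StringIndexer` emits its indices as doubles, which is why the notebook later casts `card` and `user` to integer.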

In [50]:
ratings.show()

+------+----+------+
|rating|card|  user|
+------+----+------+
|   1.0|  39|120184|
|   5.0|  39| 87652|
|   5.0|  39| 13165|
|   5.0|  39|106567|
|   1.0|  39| 77751|
|   3.0|  39| 44380|
|   1.0|  39| 42655|
|   1.0|  39| 96529|
|   5.0|  39| 21991|
|   1.0|  39|114512|
|   5.0|  39| 79408|
|   5.0|  39|102618|
|   4.0|  39| 15821|
|   5.0|  39| 69061|
|   4.0|  39|  6167|
|   5.0|  39| 44270|
|   5.0|  39| 38757|
|   5.0|  39|113343|
|   1.0|  39| 53351|
|   5.0|  39|104423|
+------+----+------+
only showing top 20 rows



In [None]:
ratings = ratings.\
                   withColumn('card', col('card').cast('integer')).\
                   withColumn('user', col('user').cast('integer')).\
                   withColumn('rating', col('rating').cast('float')).\
                   drop('timestamp').drop('item').drop('userid')
ratings.show()

In [48]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [71]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

print(train.count())
print(test.count())

117935
29259


In [72]:
train = train.cache()

In [None]:
assert train.is_cached

## Modeling

In [76]:
# Create ALS model
als = ALS(
            userCol="user",
            itemCol="card",
            ratingCol="rating",
            maxIter=5, 
            #regParam=0.01,
            nonnegative = True,
            implicitPrefs = False,
            coldStartStrategy="drop"
          )

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS
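
Conceptually, ALS learns one latent-factor vector per user and per card, each of length `rank`, and predicts a rating as their dot product. A user or card absent from the training split has no vector, so its prediction is NaN; `coldStartStrategy="drop"` removes such rows so that the evaluation metric is not NaN. The sketch below illustrates the idea in plain Python with made-up factor values. It is not how Spark implements it.

```python
import math

# Made-up rank-2 factors, for illustration only
user_factors = {0: [1.0, 0.5], 1: [0.2, 1.5]}
card_factors = {10: [2.0, 1.0], 11: [0.5, 2.0]}


def predict(user, card):
    """Dot product of the two factor vectors, or NaN if either id is unseen."""
    u = user_factors.get(user)
    c = card_factors.get(card)
    if u is None or c is None:
        return float("nan")
    return sum(a * b for a, b in zip(u, c))


pairs = [(0, 10), (1, 11), (2, 10)]   # user 2 never appeared in training
predictions = [(u, c, predict(u, c)) for u, c in pairs]
# Equivalent of coldStartStrategy="drop": discard rows with NaN predictions
kept = [row for row in predictions if not math.isnan(row[2])]
print(kept)
```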

In [99]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [100]:
# Add hyperparameters and their respective values to param_grid
# May try:  .addGrid(als.regParam, [.01, .1, .15]) \
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [4, 8, 12]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  3


In [101]:
# Define an evaluator that uses RMSE as its metric
evaluator = RegressionEvaluator(
                                 metricName="rmse",
                                 labelCol="rating",
                                 predictionCol="prediction"
                                ) 
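
With `metricName="rmse"` the evaluator reports the root mean squared error between the `rating` and `prediction` columns. A minimal plain-Python equivalent is sketched below on made-up pairs; the function name `rmse` is only illustrative.

```python
import math


def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up (rating, prediction) pairs, for illustration only
sample = [(5.0, 4.0), (3.0, 5.0), (1.0, 1.0)]
print(rmse(sample))
```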


In [102]:
# Build cross validation using CrossValidator
cv = CrossValidator(
                     estimator=als,
                     estimatorParamMaps=param_grid,
                     evaluator=evaluator,
                     numFolds=3
                    )

# Confirm cv was built
print(cv)

CrossValidator_4c319b63ea00
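
`ParamGridBuilder` takes the Cartesian product of the listed values, and `CrossValidator` fits every combination once per fold before refitting the best combination on the full training set. The sketch below counts the resulting ALS fits. The helper `count_fits` is hypothetical, and the second grid (with `regParam`) is the variant suggested in the comment above, not the one actually run.

```python
from itertools import product


def count_fits(grid, num_folds):
    """Return (number of parameter combinations, total model fits).

    Total fits = combinations x folds, plus one final refit of the best
    combination on the whole training set.
    """
    combos = list(product(*grid.values()))
    return len(combos), len(combos) * num_folds + 1


print(count_fits({"rank": [4, 8, 12]}, num_folds=3))
print(count_fits({"rank": [4, 8, 12], "regParam": [0.01, 0.1, 0.15]}, num_folds=3))
```

For the grid used here that is 3 combinations and 10 fits; adding the three `regParam` values would raise it to 9 combinations and 28 fits.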


In [103]:
%%time

#Fit cross validator to the 'train' dataset

model = cv.fit(train)
#model = als.fit(train)     # 30seconds / 10 minutes

CPU times: user 3.72 s, sys: 571 ms, total: 4.29 s
Wall time: 7min 10s


In [104]:
%%time
# Evaluate the cross-validated model on the test set
test_predictions = model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.5538336352667395
CPU times: user 274 ms, sys: 43.1 ms, total: 317 ms
Wall time: 32.5 s


In [105]:
#Extract best model from the cv model above
best_model = model.bestModel

In [106]:

# Print best_model
print(type(best_model))

# Extract the ALS model parameters
print("**Best Model**")

# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 4
  MaxIter: 5
  RegParam: 0.1


In [107]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.5538336352667395


In [108]:
test_predictions.show()

+------+----+-----+----------+
|rating|card| user|prediction|
+------+----+-----+----------+
|   5.0| 148| 6732| 1.9062009|
|   5.0| 148| 5585| 3.1085262|
|   5.0| 148|10053|  5.093481|
|   5.0| 148| 6618| 1.9062009|
|   5.0| 148| 9197| 3.8480022|
|   5.0| 148|   23| 3.3468761|
|   5.0| 148|    7|  5.146885|
|   4.0| 148|  277| 5.1043305|
|   5.0| 148|  524| 4.6295757|
|   4.0| 148| 9201| 3.3762894|
|   5.0| 148|  730|  4.504654|
|   5.0| 148| 2981| 4.9238777|
|   5.0| 148| 7544| 1.9062009|
|   5.0| 148|10717|0.47338432|
|   5.0| 148| 1309|  4.302889|
|   1.0| 148|   89| 2.6841044|
|   4.0| 463|10145| 10.234779|
|   5.0| 496| 3631| 3.3246105|
|   5.0| 243| 7361| 2.3075113|
|   5.0| 392| 5327| 3.8420656|
+------+----+-----+----------+
only showing top 20 rows



In [110]:
# Generate top 10 gift card recommendations for each user
userRecs = best_model.recommendForAllUsers(10)
#userRecs = model.recommendForAllUsers(10)

userRecs.show()

+----+--------------------+
|user|     recommendations|
+----+--------------------+
| 148|[{949, 15.823175}...|
| 463|[{1141, 16.12802}...|
| 471|[{1175, 15.156533...|
| 496|[{949, 14.877338}...|
| 833|[{1141, 15.091605...|
|1088|[{1141, 15.598734...|
|1238|[{1141, 16.194313...|
|1342|[{1292, 15.278643...|
|1580|[{1253, 14.972759...|
|1591|[{1353, 15.475293...|
|1645|[{1141, 15.24032}...|
|1829|[{1224, 14.170139...|
|1959|[{1175, 15.554655...|
|2122|[{1141, 15.24032}...|
|2142|[{1259, 15.30538}...|
|2366|[{1175, 15.224998...|
|2659|[{1141, 15.612871...|
|2866|[{1029, 12.96638}...|
|3175|[{1141, 15.773278...|
|3749|[{1224, 17.205673...|
+----+--------------------+
only showing top 20 rows
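
Conceptually, `recommendForAllUsers(10)` scores every card for every user and keeps the ten highest scores per user. A plain-Python sketch of that top-N step follows, using made-up factor vectors. The name `top_n_cards` is hypothetical, and Spark's distributed implementation is more involved.

```python
import heapq

# Made-up rank-2 factors, for illustration only
user_factors = {0: [1.0, 0.5], 1: [0.2, 1.5]}
card_factors = {10: [2.0, 1.0], 11: [0.5, 2.0], 12: [1.5, 1.0]}


def top_n_cards(user, n):
    """Return the n (card, score) pairs with the highest dot-product score."""
    u = user_factors[user]
    scores = [
        (card, sum(a * b for a, b in zip(u, c)))
        for card, c in card_factors.items()
    ]
    return heapq.nlargest(n, scores, key=lambda pair: pair[1])


for user in user_factors:
    print(user, top_n_cards(user, 2))
```

Nothing in a dot product bounds the score to the 1 to 5 rating scale, which is why the recommendations above carry scores such as 15.8.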

