# Workflow automation for a recommender engine using ALS Model in PySpark
### This notebook demonstrates the automation of a product recommendation engine given we have user-item interaction data in terms of the frequency of purchase for each unique user-item pair
### The automation is implemented by deploying the notebook as a job which reads the input data from a remote database, trains and runs the model and writes the output back to the designated database


## Importing the libraries and starting the Spark Session

In [1]:
#168de5db-7967-4549-8b28-2ae60765f842
#import os
#print ( os.environ['KERNEL_ID'])


Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200924123619-0000
KERNEL_ID = e44a0d31-1eca-4d86-b860-cf00d23243c0
e44a0d31-1eca-4d86-b860-cf00d23243c0


In [2]:
#System.getenv("KERNEL_ID")

NameError: name 'System' is not defined

In [1]:
### Load packages 
import time
t0 = time.time()
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel # factorizacion de matrices
from pyspark.context import SparkContext 
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator
import jaydebeapi, pandas as pd

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200924065743-0002
KERNEL_ID = c5f13348-c8c7-440f-bcf2-ec6529f2f966


In [2]:
### set up spark session and context 
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## Remote connection to the database ( Any supported database like DB2, Netezza, PDA )
### To enable the connection, add your database under the Data Sources tab in your project. You would need information about your database like JDBC URL, type, username/password

### Adding the asset (for example- the dataset which has the user-item interaction information) from the remote database after setting up the connection

In [3]:
%%time
# @hidden_cell
# This connection object is used to access your data and contains your credentials.
# You might want to remove those credentials before you share your notebook.

#PARA LEER REGISTROS DE PDA CON ALTO VOLUMEN (Ej.: 300 MILLONES ) USAR SPARK SQL

from project_lib import Project
project = Project.access()
PDA_Analytics_credentials = project.get_connection(name="PDA_Analytics")

from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()

query = '''
SELECT CLIENTECODIGO as ID_CTE, CAST( ADCLAFAM as integer) as ID_CLAS1, 
		FRECUENCIA as FREQUENCY  
	FROM(	SELECT *, TRIM(TO_CHAR(FAMILIA,'000')) AS FAM, TRIM(TO_CHAR(CLASE,'00')) AS CLAS, 
	                  TRIM(TO_CHAR(AREA,'0')) AS AREA, (AREA||DEPARTAMENTO||CLAS||FAM) AS ADCLAFAM 
				FROM(    SELECT  CLIENTECODIGO, CLASE, FAMILIA, DEPARTAMENTO, 
				                 SUM(CASE WHEN CLASE>'0' THEN 1 ELSE 0 END) AS FRECUENCIA,
				                 MAX(CASE WHEN CARTERA='Ropa' THEN 1  
								          WHEN CARTERA='Muebles' THEN 2
										  WHEN CARTERA='Prestamos' THEN 3 ELSE 0 END) AS AREA
							FROM( SELECT *, CASE WHEN CLASE>'0' THEN 1 ELSE 0 END AS T_CLASE, 
							               CASE WHEN FAMILIA>'0' THEN 1 ELSE 0 END AS T_FAMILIA
									FROM DIRECCIONRIESGOS.ADMIN.TRANSACCIONESCARTERAS 
									where FECHACORTE between '2017-01-31' and '2019-12-31' 
									and CLIENTECODIGO not in (9001,9000) AND CLIENTECODIGO >10000) E 
				GROUP BY CLIENTECODIGO, CLASE, FAMILIA, DEPARTAMENTO 
				ORDER BY CLIENTECODIGO) T 
	WHERE CLASE NOT IN (-99)) P 
	ORDER BY CLIENTECODIGO,ADCLAFAM --limit 30000000
'''

dbTableOrQuery = "(" + query + ") TBL"

t1 = time.time()

spark_df = sparkSession.read.format('jdbc') \
    .option('url', 'jdbc:netezza://{}:{}/{}'.format(PDA_Analytics_credentials['host'],PDA_Analytics_credentials['port'],PDA_Analytics_credentials['database'])) \
    .option('dbtable', dbTableOrQuery) \
    .option("numPartitions", 18) \
    .option("lowerBound", 1) \
    .option("upperBound", 1000000) \
    .option("partitionColumn", "ID_CTE") \
    .option('user', PDA_Analytics_credentials['username']) \
    .option('password', PDA_Analytics_credentials['password']).load()

spark_df.show(5)


t2 = time.time()
seconds_get_data = t2 - t1 
print(str(seconds_get_data / 60 ) + ' minutos en transferir tabla PDA - USANDO SPARK SQL')


# PARA 300mil registros 0.23564455509185792 minutos en transferir tabla PDA - USANDO SPARK SQL - Usando 8 Executors 2VCPU 8GBRAM
# PARA 3MILLONES registros 2.376683322588603 minutos en transferir tabla PDA - USANDO SPARK SQL - Usando 8 Executors 2VCPU 8GBRAM
# PARA 10MILLONES registros 2.254576830069224 minutos en transferir tabla PDA - USANDO SPARK SQL
# PARA 30MILLONES registros 2.587259344259898 minutos en transferir tabla PDA - USANDO SPARK SQL
# PARA 50 MILLONES registros 2.624022964636485 minutos en transferir tabla PDA - USANDO SPARK SQL
# PARA 60 MILLONES registros 3.0992782632509868 minutos en transferir tabla PDA - USANDO SPARK SQL
# PARA 70 MILLONES registros 7.071753338972727 minutos en transferir tabla PDA - USANDO SPARK SQL
# PARA 70 MILLONES registros parallel 2.1106858770052592 minutos en transferir tabla PDA - USANDO SPARK SQL
# PARA 100 MILLONES registros parallel 2.2880197008450827 minutos en transferir tabla PDA - USANDO SPARK SQL
# PARA TODA EL QUERY (318MILLONES) registros parallel 0.5605056325594584 minutos en transferir tabla PDA - USANDO SPARK SQL

+------+--------+---------+
|ID_CTE|ID_CLAS1|FREQUENCY|
+------+--------+---------+
| 10002| 1313064|        1|
| 10002| 1314064|        3|
| 10002| 1862008|        4|
| 10002| 1867048|        2|
| 10002| 2210070|        1|
+------+--------+---------+
only showing top 5 rows

2.335151453812917 minutos en transferir tabla PDA - USANDO SPARK SQL
CPU times: user 231 ms, sys: 73.1 ms, total: 304 ms
Wall time: 2min 20s


In [None]:
print(spark_df.count())

In [None]:
#SKIP THIS CODE
# Load JDBC data to Spark dataframe
# ESTE ES SOLAMENTE CODIGO DE EJEMPLO PARA HACER UN QUERY A UNA CONEXION DE BD CON SPARK SQL
dbTableOrQuery = '"' + (dataSet['schema'] + '"."' if(len(dataSet['schema'].strip()) != 0) else '') + dataSet['table'] + '"'
if (dataSet['query']):
    dbTableOrQuery = "(" + dataSet['query'] + ") TBL"
final_stat = sparkSession.read.format("jdbc").option("url", dataSource['URL']).option("dbtable", dbTableOrQuery).option("user",dataSource['user']).option("password",dataSource['password']).load()
final_stat.show(5)

In [None]:
#SKIP THIS CODE
# Access to Porject's conexion
# and get the data 
# NO RECOMENDABLE PARA DATASETS DE GRAN VOLUMEN USAR PANDAS

from project_lib import Project
project = Project.access()
PDA_COPPEL_2020_credentials = project.get_connection(name="PDA_Analytics")

PDA_COPPEL_2020_connection = jaydebeapi.connect('org.netezza.Driver',
    '{}://{}:{}/{}'.format('jdbc:netezza',
    PDA_COPPEL_2020_credentials['host'],
    PDA_COPPEL_2020_credentials['port'],
    PDA_COPPEL_2020_credentials['database']),
    [PDA_COPPEL_2020_credentials['username'],
    PDA_COPPEL_2020_credentials['password']])

query = '''
SELECT CLIENTECODIGO as ID_CTE, CAST( ADCLAFAM as integer) as ID_CLAS1, 
		FRECUENCIA as FREQUENCY  
	FROM(	SELECT *, TRIM(TO_CHAR(FAMILIA,'000')) AS FAM, TRIM(TO_CHAR(CLASE,'00')) AS CLAS, 
	                  TRIM(TO_CHAR(AREA,'0')) AS AREA, (AREA||DEPARTAMENTO||CLAS||FAM) AS ADCLAFAM 
				FROM(    SELECT  CLIENTECODIGO, CLASE, FAMILIA, DEPARTAMENTO, 
				                 SUM(CASE WHEN CLASE>'0' THEN 1 ELSE 0 END) AS FRECUENCIA,
				                 MAX(CASE WHEN CARTERA='Ropa' THEN 1  
								          WHEN CARTERA='Muebles' THEN 2
										  WHEN CARTERA='Prestamos' THEN 3 ELSE 0 END) AS AREA
							FROM( SELECT *, CASE WHEN CLASE>'0' THEN 1 ELSE 0 END AS T_CLASE, 
							               CASE WHEN FAMILIA>'0' THEN 1 ELSE 0 END AS T_FAMILIA
									FROM DIRECCIONRIESGOS.ADMIN.TRANSACCIONESCARTERAS 
									where FECHACORTE between '2017-01-31' and '2019-12-31' 
									and CLIENTECODIGO not in (9001,9000) AND CLIENTECODIGO >10000) E 
				GROUP BY CLIENTECODIGO, CLASE, FAMILIA, DEPARTAMENTO 
				ORDER BY CLIENTECODIGO) T 
	WHERE CLASE NOT IN (-99)) P 
	ORDER BY CLIENTECODIGO,ADCLAFAM 
'''
# 1% ~ 40 minutos 
# 0.1% ~ 6 minutos 
# medimos el tiempo en transferir los datos 
t1 = time.time()
data_df_1 = pd.read_sql(query, con=PDA_COPPEL_2020_connection)
t2 = time.time()
seconds_get_data = t2 - t1 
print(str(seconds_get_data / 60 ) + ' minutos en transferir tabla')
data_df_1.head(10)

In [None]:
#SKIP THIS CODE
print(data_df_1.dtypes) # #  validamos el tipo de dato y numero de registros
print(data_df_1.shape)

In [None]:
#SKIP THIS CODE
spark_df = spark.createDataFrame(data_df_1) # distribuimos el pandas dataframe a spark 
# hacer el get desde spark 

In [None]:
# spark_df.show() #  no es necesario verlo 

### Preparing data for the model

In [4]:

ratings = (spark_df
    .select(
        'ID_CTE',
        'ID_CLAS1',
        'FREQUENCY',
    )
).cache() #  lo cargamos a memoria ram 

### Make sure your data is 'integer' type 

### Spliting the data set to test and train for measuring the performance of the ALS Model

In [5]:
(training, test) = ratings.randomSplit([0.8, 0.2]) #  debemos validar esto 

## Build the recommendation model using ALS on the training data
### Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors. 

### Parameters of ALS Model in PySpark realization are following:

##### NumBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation.
##### rank is the number of latent factors in the model.
##### maxIter is the maximum number of iterations to run.
##### regParam specifies the regularization parameter in ALS.
##### implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
##### alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0)

### Explicit vs. implicit feedback
#### The standard approach to matrix factorization based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item, for example, users giving ratings to products.
#### It is common in many real-world use cases to only have access to implicit feedback (e.g. views, clicks, purchases, likes, shares etc.). The approach used in spark.ml to deal with such data is taken from Collaborative Filtering for Implicit Feedback Datasets. Essentially, instead of trying to model the matrix of ratings directly, this approach treats the data as numbers representing the strength in observations of user actions (such as the number of clicks). Those numbers are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.

In [6]:
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
t2 = time.time()
als = ALS(maxIter=10, regParam=0.01, rank = 100, 
          userCol="ID_CTE", itemCol="ID_CLAS1", ratingCol="FREQUENCY",
          coldStartStrategy="drop",
          implicitPrefs=True)

model = als.fit(ratings)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="FREQUENCY",
                                predictionCol="prediction")
t3 = time.time()
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
print('Tiempo de entranar y predecir    ' + str( (t3- t2)/60) + '    minutos') 

# rbarran con 300mil registros Tiempo de entranar y predecir    2.1862693349520366    minuto
# rbarran con 3millones Tiempo de entranar y predecir    9.564896901448568    minutos
# rbarran con 10millones Tiempo de entranar y predecir    14.262361097335816    minutos
#rbarran con 30millones Tiempo de entranar y predecir    21.207575702667235    minutos

Root-mean-square error = 1.2610886477567078
Tiempo de entranar y predecir    39.46960059007009    minutos


###  Generate top k Item recommendations for each user
#### The value of 'k' here is 10 and can be changed by passing the desired value to the function



In [None]:
userRecs = model.recommendForAllUsers(10)
#userRecs.count()

#### Display the results : Each row represents the 'k' recommendations for each User

In [None]:
#userRecs.take(10)

#### For getting each recommendation as a row in the final csv, we break down the result generated above using the explode function

In [None]:
from pyspark.sql.functions import explode
userRecs1=userRecs.withColumn("recommendations", explode(userRecs.recommendations))
userRecs1.show()

####  Breaking down reach recommendation to separate columns

In [None]:
#import select as s   PDA_Analytics


In [None]:
userRecs1= userRecs1 \
  .select('ID_CTE', 'recommendations.*')       

### Writing the Output back to the Remote Datasource
#### Thereby the output or resulting csv can be consumed directly by anyone who has the access to the remote database 

In [None]:
'''new_table_name = 'ANALITICAAFORE.ADMIN.RECOMMENDATIONSRESULT_TEST_CENIC'

userRecs1.coalesce(2).write.format('jdbc') \
    .mode('overwrite') \
    .option('url', 'jdbc:netezza://{}:{}/{}'.format(PDA_COPPEL_2020_credentials['host'],PDA_COPPEL_2020_credentials['port'], PDA_COPPEL_2020_credentials['database'])) \
    .option('dbtable', new_table_name) \
    .option('user', PDA_COPPEL_2020_credentials['username']).option('driver','org.netezza.Driver').option('password', PDA_COPPEL_2020_credentials['password']).save()
'''    

In [None]:
userRecs1.show(5)

In [None]:
new_table_name = 'RecommendationsResult_test_cenic'

userRecs1.coalesce(1).write.format('jdbc') \
    .mode('overwrite') \
    .option('url', 'jdbc:netezza://{}:{}/{}'.format(PDA_COPPEL_2020_credentials['host'],PDA_COPPEL_2020_credentials['port'],'CENIC')) \
    .option('dbtable', new_table_name) \
    .option('user', PDA_COPPEL_2020_credentials['username']).option('driver','org.netezza.Driver').option('password', PDA_COPPEL_2020_credentials['password']).save()

## Saving the model for deployment in WML

In [None]:
#!pip install  --proxy=https://10.33.128.80:8080 dsx proxy para usar pip 

### Pack the model inside a pipeline 
#### Since the WML deployments allow saving Spark Pipelines directly, put the ALS model inside a Pipeline for direct deployment stage
#### Typically, A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For this case, since the pipeline is bought in to action just for the sole cause of deployment, we do not use any transformers and such

In [None]:
from pyspark.ml import Pipeline

In [None]:
pipeline = Pipeline(stages=[model])

In [None]:
model_alsWML = pipeline.fit(ratings)

In [None]:
# model_alsWML.save('/temp/')

In [None]:
t_final = time.time()

In [None]:
print(t_final- t0)

In [None]:
print((t_final- t0) / 60 )