# Training

In [1]:
from pyspark.sql import SparkSession

    # build our own SparkSession
spark = (SparkSession
        .builder
        .appName("BigData")
        .config("spark.sql.shuffle.partitions",6)
        .config("spark.sql.repl.eagereval.enabled",True)
        .getOrCreate()
        )

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/09 15:33:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/09 15:33:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/09 15:33:01 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
df_hotels = spark.read.parquet('small-hotels')
#df_hotels = spark.read.parquet('whole-hotels')

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import os
import sys

import numpy as np 
import pandas as pd  
import matplotlib.pyplot as plt
import seaborn as sns
# from ydata_profiling import ProfileReport
import warnings
warnings.filterwarnings("ignore")

In [4]:
df_hotels.show(1, vertical=True)

-RECORD 0------------------------------------
 date_time             | 2014-09-02 01:05:54 
 site_name             | 2                   
 posa_continent        | 3                   
 user_location_country | 75                  
 user_location_region  | 144                 
 user_location_city    | 52467               
 user_id               | 1145960             
 mobile                | 0                   
 package               | 0                   
 channel               | 9                   
 srch_ci               | 2014-09-03 00:00:00 
 srch_co               | 2014-09-08 00:00:00 
 num_adults            | 2                   
 num_children          | 0                   
 num_room              | 1                   
 id_destination        | 46375               
 type_destination      | 1                   
 booking               | 0                   
 similar_srch          | 1                   
 hotel_continent       | 5                   
 hotel_country         | 147      

In [5]:
df_hotel = df_hotels.select("num_nights", "similar_srch", "num_room", "num_adults", "num_children", "channel", "mobile", "booking", "package", "user_location_country", "user_location_city", "user_location_region", "Id_hotel", "user_id")

## Applying Smoothing

In [6]:
from pyspark.sql.functions import avg, when, col

# Calcular a média de "booking" quando igual a 1
booking_avg = df_hotels.select(avg(when(col("booking") == 1, 1))).collect()[0][0]

# Calcular a porcentagem de "booking" igual a 1
booking_percentage = df_hotels.filter(col("booking") == 1).count() / df_hotels.count()

# Adicionar a porcentagem à coluna "booking"
df_hotels = df_hotels.withColumn("booking", col("booking") + booking_percentage * booking_avg)

# Encoding

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Creating StringIndexer objects for each categorical variable
indexer_country = StringIndexer(inputCol="user_location_country", outputCol="country_index")
indexer_city = StringIndexer(inputCol="user_location_city", outputCol="city_index")
indexer_region = StringIndexer(inputCol="user_location_region", outputCol="region_index")
indexer_channel = StringIndexer(inputCol="channel", outputCol="channel_index")

# Fitting the StringIndexer objects on the data
indexer_model_country = indexer_country.fit(df_hotel)
indexer_model_city = indexer_city.fit(df_hotel)
indexer_model_region = indexer_region.fit(df_hotel)
indexer_model_channel = indexer_channel.fit(df_hotel)

# Transforming the data using the StringIndexer objects
df_hotel_indexed = indexer_model_country.transform(df_hotel)
df_hotel_indexed = indexer_model_city.transform(df_hotel_indexed)
df_hotel_indexed = indexer_model_region.transform(df_hotel_indexed)
df_hotel_indexed = indexer_model_channel.transform(df_hotel_indexed)

# Creating OneHotEncoder objects for each categorical variable
encoder_country = OneHotEncoder(inputCol="country_index", outputCol="country_encoded")
encoder_city = OneHotEncoder(inputCol="city_index", outputCol="city_encoded")
encoder_region = OneHotEncoder(inputCol="region_index", outputCol="region_encoded")
encoder_channel = OneHotEncoder(inputCol="channel_index", outputCol="channel_encoded")

# Fitting the OneHotEncoder objects on the data
encoder_model_country = encoder_country.fit(df_hotel_indexed)
encoder_model_city = encoder_city.fit(df_hotel_indexed)
encoder_model_region = encoder_region.fit(df_hotel_indexed)
encoder_model_channel = encoder_channel.fit(df_hotel_indexed)

# Transforming the data using the OneHotEncoder objects
df_hotel_encoded = encoder_model_country.transform(df_hotel_indexed)
df_hotel_encoded = encoder_model_city.transform(df_hotel_encoded)
df_hotel_encoded = encoder_model_region.transform(df_hotel_encoded)
df_hotel_encoded = encoder_model_channel.transform(df_hotel_encoded)

# Displaying the encoded data
df_hotel_encoded.select("user_location_country", "country_encoded", "user_location_city", "city_encoded", "user_location_region", "region_encoded", "channel", "channel_encoded").show()

+---------------------+----------------+------------------+--------------------+--------------------+-----------------+-------+---------------+
|user_location_country| country_encoded|user_location_city|        city_encoded|user_location_region|   region_encoded|channel|channel_encoded|
+---------------------+----------------+------------------+--------------------+--------------------+-----------------+-------+---------------+
|                   75|(212,[58],[1.0])|             52467| (17894,[287],[1.0])|                 144|(857,[156],[1.0])|      9| (10,[0],[1.0])|
|                  218|(212,[40],[1.0])|             39100| (17894,[217],[1.0])|                  17|(857,[127],[1.0])|      9| (10,[0],[1.0])|
|                  218|(212,[40],[1.0])|             39100| (17894,[217],[1.0])|                  17|(857,[127],[1.0])|      9| (10,[0],[1.0])|
|                   66| (212,[0],[1.0])|              4948| (17894,[150],[1.0])|                 220|  (857,[4],[1.0])|      9| (10,[0],

# Standard Scaler

In [8]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Creating a VectorAssembler to assemble the features into a single vector
vectorAssembler = VectorAssembler(inputCols=["num_nights", "similar_srch", "num_room", "num_adults", "num_children"], outputCol="features")
df_hotel = vectorAssembler.transform(df_hotel)

# Creating a StandardScaler object to scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Fitting the StandardScaler on the data
scaler_model = scaler.fit(df_hotel)

# Transforming the data using the StandardScaler
df_hotel_scaled = scaler_model.transform(df_hotel)

# Displaying the scaled data
df_hotel_scaled.select("features", "scaled_features").show()

+--------------------+--------------------+
|            features|     scaled_features|
+--------------------+--------------------+
|[5.0,1.0,1.0,2.0,...|[1.61125150707184...|
|[3.0,3.0,1.0,2.0,...|[0.96675090424310...|
|[5.0,1.0,1.0,2.0,...|[1.61125150707184...|
|[5.0,1.0,1.0,1.0,...|[1.61125150707184...|
|[1.0,1.0,1.0,2.0,...|[0.32225030141436...|
|[2.0,1.0,1.0,2.0,...|[0.64450060282873...|
|[2.0,1.0,1.0,2.0,...|[0.64450060282873...|
|[3.0,1.0,1.0,4.0,...|[0.96675090424310...|
|[2.0,2.0,2.0,2.0,...|[0.64450060282873...|
|[1.0,1.0,1.0,2.0,...|[0.32225030141436...|
|[2.0,2.0,1.0,1.0,...|[0.64450060282873...|
|[3.0,1.0,1.0,2.0,...|[0.96675090424310...|
|[3.0,1.0,1.0,2.0,...|[0.96675090424310...|
|[3.0,4.0,1.0,2.0,...|[0.96675090424310...|
|[1.0,1.0,1.0,2.0,...|[0.32225030141436...|
|[2.0,2.0,2.0,2.0,...|[0.64450060282873...|
|[9.0,3.0,2.0,3.0,...|[2.90025271272932...|
|[2.0,2.0,1.0,1.0,...|[0.64450060282873...|
|[5.0,1.0,1.0,4.0,...|[1.61125150707184...|
|[2.0,1.0,1.0,2.0,...|[0.6445006

## Doing Everything -> Creating a pipeline

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Creating StringIndexer objects for each categorical variable
indexer_country = StringIndexer(inputCol="user_location_country", outputCol="country_index")
indexer_city = StringIndexer(inputCol="user_location_city", outputCol="city_index")
indexer_region = StringIndexer(inputCol="user_location_region", outputCol="region_index")
indexer_channel = StringIndexer(inputCol="channel", outputCol="channel_index")

# Creating OneHotEncoder objects for each categorical variable
encoder_country = OneHotEncoder(inputCol="country_index", outputCol="country_encoded")
encoder_city = OneHotEncoder(inputCol="city_index", outputCol="city_encoded")
encoder_region = OneHotEncoder(inputCol="region_index", outputCol="region_encoded")
encoder_channel = OneHotEncoder(inputCol="channel_index", outputCol="channel_encoded")

# Creating a VectorAssembler to assemble the features into a single vector
vectorAssembler = VectorAssembler(inputCols=["num_nights", "similar_srch", "num_room", "num_adults", "num_children"], outputCol="assembled_features")
df_hotel_assembled = vectorAssembler.transform(df_hotel_encoded)

# Creating a StandardScaler object to scale the features
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")

# Fitting the StandardScaler on the data
scaler_model = scaler.fit(df_hotel_assembled)

# Transforming the data using the StandardScaler
df_hotel_scaled = scaler_model.transform(df_hotel_assembled)

# Displaying the scaled data
df_hotel_scaled.select("assembled_features", "scaled_features").show()

+--------------------+--------------------+
|  assembled_features|     scaled_features|
+--------------------+--------------------+
|[5.0,1.0,1.0,2.0,...|[1.61125150707184...|
|[3.0,3.0,1.0,2.0,...|[0.96675090424310...|
|[5.0,1.0,1.0,2.0,...|[1.61125150707184...|
|[5.0,1.0,1.0,1.0,...|[1.61125150707184...|
|[1.0,1.0,1.0,2.0,...|[0.32225030141436...|
|[2.0,1.0,1.0,2.0,...|[0.64450060282873...|
|[2.0,1.0,1.0,2.0,...|[0.64450060282873...|
|[3.0,1.0,1.0,4.0,...|[0.96675090424310...|
|[2.0,2.0,2.0,2.0,...|[0.64450060282873...|
|[1.0,1.0,1.0,2.0,...|[0.32225030141436...|
|[2.0,2.0,1.0,1.0,...|[0.64450060282873...|
|[3.0,1.0,1.0,2.0,...|[0.96675090424310...|
|[3.0,1.0,1.0,2.0,...|[0.96675090424310...|
|[3.0,4.0,1.0,2.0,...|[0.96675090424310...|
|[1.0,1.0,1.0,2.0,...|[0.32225030141436...|
|[2.0,2.0,2.0,2.0,...|[0.64450060282873...|
|[9.0,3.0,2.0,3.0,...|[2.90025271272932...|
|[2.0,2.0,1.0,1.0,...|[0.64450060282873...|
|[5.0,1.0,1.0,4.0,...|[1.61125150707184...|
|[2.0,1.0,1.0,2.0,...|[0.6445006

# Training 

In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.recommendation import ALS

# Criando um vetor com os atributos que serão usados para fazer as recomendações
vectorAssembler = VectorAssembler(inputCols=["num_nights", "similar_srch", "num_room", "num_adults", "num_children", "channel", "mobile", "booking", "package", "user_location_country", "user_location_city", "user_location_region", "scaled_features"], outputCol="features")
df_hotels = vectorAssembler.transform(df_hotel_scaled)

# Separando os dados em conjuntos de treinamento e teste
#(training, test) = df_hotels.randomSplit([0.8, 0.2])

training = df_hotels.sampleBy("booking", fractions={0: 0.8, 1: 0.8}, seed=42)
test = df_hotels.subtract(training)


# Configurando o modelo ALS (Alternating Least Squares) para fazer as recomendações
als = ALS(userCol="user_id", itemCol="Id_hotel", ratingCol="booking", coldStartStrategy="drop")
model = als.fit(training)

#recommendations = model.recommendForAllUsers(10)

user_subset = df_hotels.filter(col("booking") > 0).select('user_id')
recommendations = model.recommendForUserSubset(user_subset, 10)


23/04/09 15:33:11 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/04/09 15:33:11 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/04/09 15:33:11 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [11]:
df_hotels.show(2, vertical=True)

23/04/09 15:33:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
-RECORD 0-------------------------------------
 num_nights            | 5                    
 similar_srch          | 1                    
 num_room              | 1                    
 num_adults            | 2                    
 num_children          | 0                    
 channel               | 9                    
 mobile                | 0                    
 booking               | 0                    
 package               | 0                    
 user_location_country | 75                   
 user_location_city    | 52467                
 user_location_region  | 144                  
 Id_hotel              | 301431475            
 user_id               | 1145960              
 country_index         | 58.0                 
 city_index            | 287.0                
 region_index        

In [12]:
recommendations.filter(recommendations.user_id == 336709).show(truncate=False)



+-------+---------------+
|user_id|recommendations|
+-------+---------------+
+-------+---------------+





----------------------------

# Using the model with new user

In [13]:
from pyspark.sql import Row

# criar um novo DataFrame para um novo usuário
new_user = Row(user_id=999, num_nights=3, similar_srch=2,num_room=1,num_adult=2, num_children=0,
               channel=1, mobile=0, booking=0, package=1, user_location_country=50, user_location_city=1234,
               user_location_region=123)

new_user_df = spark.createDataFrame([new_user])

# gerar recomendações para o novo usuário
new_user_recs = model.recommendForUserSubset(new_user_df, 10)
new_user_recs.show()

+-------+---------------+
|user_id|recommendations|
+-------+---------------+
+-------+---------------+





# Validar o modelo

## Performance

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col


from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = model.transform(test)
predictions = predictions.withColumn("prediction", col("prediction").cast("double"))
# Avalia as previsões do modelo nos dados de teste
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="booking", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy: {:.2%}".format(accuracy))

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="booking", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Precision: {:.2%}".format(precision))

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="booking", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall: {:.2%}".format(recall))

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="booking", metricName="f1")
f1 = evaluator.evaluate(predictions)
print("F1: {:.2%}".format(f1))


23/04/09 15:33:18 WARN DAGScheduler: Broadcasting large task binary with size 1535.5 KiB
23/04/09 15:33:18 WARN DAGScheduler: Broadcasting large task binary with size 1556.9 KiB


                                                                                

23/04/09 15:33:21 WARN DAGScheduler: Broadcasting large task binary with size 1729.7 KiB


[Stage 216:>                                                        (0 + 6) / 6]

23/04/09 15:33:24 WARN DAGScheduler: Broadcasting large task binary with size 1605.5 KiB
23/04/09 15:33:24 WARN DAGScheduler: Broadcasting large task binary with size 1670.1 KiB


                                                                                

Accuracy: 85.21%
23/04/09 15:33:25 WARN DAGScheduler: Broadcasting large task binary with size 1535.5 KiB
23/04/09 15:33:25 WARN DAGScheduler: Broadcasting large task binary with size 1556.9 KiB


                                                                                

23/04/09 15:33:26 WARN DAGScheduler: Broadcasting large task binary with size 1729.7 KiB




23/04/09 15:33:28 WARN DAGScheduler: Broadcasting large task binary with size 1605.5 KiB
23/04/09 15:33:28 WARN DAGScheduler: Broadcasting large task binary with size 1670.1 KiB


                                                                                

Precision: 86.75%
23/04/09 15:33:28 WARN DAGScheduler: Broadcasting large task binary with size 1535.5 KiB
23/04/09 15:33:28 WARN DAGScheduler: Broadcasting large task binary with size 1556.9 KiB


                                                                                

23/04/09 15:33:29 WARN DAGScheduler: Broadcasting large task binary with size 1729.7 KiB


[Stage 392:>                                                        (0 + 6) / 6]

23/04/09 15:33:31 WARN DAGScheduler: Broadcasting large task binary with size 1605.5 KiB
23/04/09 15:33:31 WARN DAGScheduler: Broadcasting large task binary with size 1670.1 KiB


                                                                                

Recall: 85.21%
23/04/09 15:33:31 WARN DAGScheduler: Broadcasting large task binary with size 1535.5 KiB
23/04/09 15:33:31 WARN DAGScheduler: Broadcasting large task binary with size 1556.9 KiB


                                                                                

23/04/09 15:33:32 WARN DAGScheduler: Broadcasting large task binary with size 1729.7 KiB


[Stage 480:>                                                        (0 + 6) / 6]

23/04/09 15:33:33 WARN DAGScheduler: Broadcasting large task binary with size 1605.5 KiB
23/04/09 15:33:33 WARN DAGScheduler: Broadcasting large task binary with size 1670.1 KiB
F1: 85.97%


                                                                                

In [15]:
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("f1_score", f1)

Accuracy:  0.8521256931608133
Precision:  0.8674667449198459
Recall:  0.8521256931608133
f1_score 0.8597277877470005


# Saving the model

In [None]:
import pickle
from pyspark.serializers import PickleSerializer

with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)