<a href="https://colab.research.google.com/github/Gabrieldpll/Databricks-HotelReservations/blob/main/Treinamento_xgboost_optuna_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ambiente



In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
!pip install xgboost



In [3]:
!pip install optuna

Collecting optuna
  Downloading optuna-4.1.0-py3-none-any.whl.metadata (16 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.14.0-py3-none-any.whl.metadata (7.4 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Collecting Mako (from alembic>=1.5.0->optuna)
  Downloading Mako-1.3.8-py3-none-any.whl.metadata (2.9 kB)
Downloading optuna-4.1.0-py3-none-any.whl (364 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m364.4/364.4 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading alembic-1.14.0-py3-none-any.whl (233 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.5/233.5 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Downloading Mako-1.3.8-py3-none-any.whl (78 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.6/78.6 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Ma

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [6]:
from pyspark.sql import functions as F
import xgboost

In [7]:
from  pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorAssembler, OneHotEncoder
from pyspark.sql.types import  BooleanType, DateType, DoubleType, IntegerType, StringType
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from xgboost.spark import SparkXGBClassifier


# processamento


In [48]:
dados = (spark.read
         .format('csv')
         .option('header', 'true')
         .option('sep' , ',')

         .option('inferSchema', 'true')  # Para inferir os tipos de dados automaticamente
         .load('/content/hotel_dataframe.csv')
        )


In [49]:
dados.columns

['no_of_adults',
 'no_of_children',
 'no_of_weekend_nights',
 'no_of_week_nights',
 'type_of_meal_plan',
 'required_car_parking_space',
 'room_type_reserved',
 'lead_time',
 'arrival_year',
 'arrival_month',
 'arrival_date',
 'market_segment_type',
 'repeated_guest',
 'no_of_previous_cancellations',
 'no_of_previous_bookings_not_canceled',
 'avg_price_per_room',
 'no_of_special_requests',
 'booking_status',
 'is_duplicated',
 'duplicated_count',
 'data',
 'Trimestre',
 'feriado']

In [50]:
train, test = dados.randomSplit([0.8, 0.2], seed= 13)

In [51]:
#Remover target e atributos relacionados a data da matriz de atribuos
cols_to_use = train.columns
cols_to_use.remove('booking_status')
cols_to_use.remove('data') # Remover data (deve ser usada so para fins de validação/exploratório)
cols_to_use.remove('arrival_date')

In [52]:
# verificar a cardinalidade de cada coluna categorica
cat_cols_df = [col.name for col in train.schema if col.dataType == StringType()]

train.select(*[F.countDistinct(col).alias(col) for col in cat_cols_df]).show()





+-----------------+------------------+-------------------+--------------+---------+
|type_of_meal_plan|room_type_reserved|market_segment_type|booking_status|Trimestre|
+-----------------+------------------+-------------------+--------------+---------+
|                3|                 7|                  5|             2|        3|
+-----------------+------------------+-------------------+--------------+---------+



Como há pouca cardinalidade, vamos utilizar o one hot encoding

In [53]:
stages = []

In [54]:
label_transform  = StringIndexer(inputCol="booking_status", outputCol="label")
stages.append(label_transform)

In [55]:
# colunas categoricas do pipeline , isso é que são codificadas como StringType() e estão na lista de colunas permitidas
cat_cols_pipe = [col.name for col in train.schema if (col.dataType ==  StringType() and col.name in cols_to_use)]
ohe_cat_cols = []

for col in cat_cols_pipe:
    indexer = StringIndexer(inputCol=col , outputCol= f'{col}_idx', handleInvalid = 'keep')
    stages.append(indexer)
    one_hot = OneHotEncoder(inputCol=f'{col}_idx' , outputCol= f'{col}_ohe')
    stages.append(one_hot)
    ohe_cat_cols.append(f'{col}_ohe')

In [56]:
cat_cols_pipe

['type_of_meal_plan', 'room_type_reserved', 'market_segment_type', 'Trimestre']

In [57]:
# verficiar as colunas restantes
not_cat_cols = list(set(cols_to_use) - set(cat_cols_pipe))
train.select(not_cat_cols).printSchema()

root
 |-- required_car_parking_space: integer (nullable = true)
 |-- is_duplicated: boolean (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- feriado: integer (nullable = true)
 |-- no_of_special_requests: integer (nullable = true)
 |-- avg_price_per_room: double (nullable = true)
 |-- no_of_previous_bookings_not_canceled: integer (nullable = true)
 |-- duplicated_count: integer (nullable = true)
 |-- no_of_week_nights: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- no_of_children: integer (nullable = true)
 |-- no_of_previous_cancellations: integer (nullable = true)
 |-- no_of_weekend_nights: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- no_of_adults: integer (nullable = true)



In [58]:
# vamos codificá-las tods como númericas
num_proces = VectorAssembler(inputCols = not_cat_cols , outputCol = 'num_features',handleInvalid = 'keep')
stages.append(num_proces)

In [59]:
# juntar tudo em um vetor de features
columns_pipe = ohe_cat_cols + ['num_features']
final_process =  num_proces = VectorAssembler(inputCols = columns_pipe , outputCol = 'features')
stages.append(final_process)

In [60]:
num_cluster = spark.sparkContext.defaultParallelism
print(f"numero de clusters xgb {num_cluster}")

#xgb = SparkXGBClassifier(num_workers= num_cluster, label_col="label", featuresCol = 'features')

numero de clusters xgb 2


In [61]:
# xgb_model = SparkXGBClassifier(num_workers= num_cluster, label_col="label", features_col  = 'features')
# stages.append(xgb_model)


# Ajuste optuna

In [62]:
import optuna
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder


In [63]:
pipeline = Pipeline(stages= stages)
pipelineModel = pipeline.fit(train)

In [64]:
train_transformed=  pipelineModel.transform(train)
test_transformed =  pipelineModel.transform(test)

In [65]:
# Para otimizar performance na busca dos hiperperâmetros, colocamos as transformações no cache

train_transformed.cache()
train_transformed.take(1)

[Row(no_of_adults=0, no_of_children=1, no_of_weekend_nights=2, no_of_week_nights=4, type_of_meal_plan='Meal Plan 1', required_car_parking_space=0, room_type_reserved='Room_Type 2', lead_time=109, arrival_year=2018, arrival_month=4, arrival_date=1, market_segment_type='Online', repeated_guest=0, no_of_previous_cancellations=0, no_of_previous_bookings_not_canceled=0, avg_price_per_room=73.74, no_of_special_requests=1, booking_status='Not_Canceled', is_duplicated=False, duplicated_count=0, data=datetime.date(2018, 4, 1), Trimestre='terceiro_trimestre', feriado=1, label=0.0, type_of_meal_plan_idx=0.0, type_of_meal_plan_ohe=SparseVector(3, {0: 1.0}), room_type_reserved_idx=3.0, room_type_reserved_ohe=SparseVector(7, {3: 1.0}), market_segment_type_idx=0.0, market_segment_type_ohe=SparseVector(5, {0: 1.0}), Trimestre_idx=0.0, Trimestre_ohe=SparseVector(3, {0: 1.0}), num_features=SparseVector(16, {2: 4.0, 3: 1.0, 4: 1.0, 5: 73.74, 8: 4.0, 9: 109.0, 10: 1.0, 12: 2.0, 13: 2018.0}), features=Spar

In [66]:
# define a forma como as predições do nosso modelo serão avaliadas (area sobre a curva roc)
evaluator = BinaryClassificationEvaluator( rawPredictionCol = 'probability' , labelCol = 'label', metricName ='areaUnderROC' )

In [67]:
params_grid = ParamGridBuilder().build()

In [68]:
def objective(trial):
    param = {
        "max_depth" : trial.suggest_int("max_depth",3,10),
        "learning_rate"  : trial.suggest_float('learning_rate', 1e-3, 1e-1, log = True),
        "n_estimators"  : trial.suggest_categorical('n_estimators', [50,100,200,300]),
        "lambda": trial.suggest_float("lambda", 1e-8, 1.0, log=True),
        "alpha": trial.suggest_float("alpha", 1e-8, 1.0, log=True),
        "subsample": trial.suggest_float("subsample", 0.2, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
    }

    xgb = SparkXGBClassifier(num_workers= num_cluster,
                                   label_col="label",
                                   features_col  = 'features',
                                   verbose = False ,
                                  seed = 13,
                                   **param)


    cv = CrossValidator(estimator=xgb, estimatorParamMaps=params_grid, evaluator=evaluator, numFolds=5)

    xgb_model = cv.fit(train_transformed)

    pred = xgb_model.transform(train_transformed)

    auc = evaluator.evaluate(pred)

    return auc

In [None]:
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

[I 2024-12-16 01:21:21,864] A new study created in memory with name: no-name-4b83e4c6-d5dd-40e7-954e-a18fd2aca100
INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 2 workers with
	booster params: {'colsample_bytree': 0.4002676144586085, 'device': 'cpu', 'learning_rate': 0.03859858780901437, 'max_depth': 5, 'objective': 'binary:logistic', 'subsample': 0.5290374909007032, 'seed': 13, 'lambda': 0.003665444552709094, 'alpha': 0.037086590975754896, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': False, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 2 workers with
	booster params: {'colsample_bytree': 0.4002676144586085, 'device': 'cpu', 'learning_rate': 0.03859858780901437, 'max_depth': 5, 'objective': 'binary:logistic', 'subsample': 0.5290374909007032, 'seed': 13, 'lambda': 0.003665444552709094, 'alpha': 0.037086590975754896, 'nthread': 1}
	train_call_kwargs_params: 

In [42]:
# ajustar xgboost com melhores hyperparametros

best_param = study.best_params

best_xgb = SparkXGBClassifier(num_workers= 2,
                                   label_col="label",
                                   features_col  = 'features',
                                  seed = 13,
                                   **best_param)


xgb_model_best = best_xgb.fit(train_transformed)


INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.7584291569453302, 'device': 'cpu', 'learning_rate': 0.09877004287725989, 'max_depth': 10, 'subsample': 0.5820599533678451, 'lambda': 2.9317489853263454e-08, 'alpha': 0.05348336685788292, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 300}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


In [43]:
print(best_param)

{'max_depth': 10, 'learning_rate': 0.09877004287725989, 'n_estimators': 300, 'lambda': 2.9317489853263454e-08, 'alpha': 0.05348336685788292, 'subsample': 0.5820599533678451, 'colsample_bytree': 0.7584291569453302}


In [44]:
train_pred_best = xgb_model_best.transform(train_transformed)
test_pred_best = xgb_model_best.transform(test_transformed)

auc_train_best = evaluator.evaluate(train_pred_best)
auc_test_best = evaluator.evaluate(test_pred_best)

In [46]:
xgb = SparkXGBClassifier(num_workers= 2,
                                   label_col="label",
                                   features_col  = 'features',
                                  seed = 13
                                   )

#force_repartition=true
xgb_model = xgb.fit(train_transformed)



train_pred = xgb_model.transform(train_transformed)
test_pred= xgb_model.transform(test_transformed)

auc_train = evaluator.evaluate(train_pred)
auc_test = evaluator.evaluate(test_pred)

INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


In [47]:
print(f'\n  auc train padrao : {auc_train:.3f} auc train optuna {auc_train_best:.3f}  \n auc test padrao {auc_test:.3f} auc test optuna {auc_test_best:.3f}')



  auc train padrao : 0.962 auc train optuna 0.998  
 auc test padrao 0.916 auc test optuna 0.912
