## Projeto de Predição de doenças cardiacas

### Instalando Livrarias

In [4]:
!pip install -q pyspark
# !pip install -q handyspark

In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import urllib.request
from pyspark.sql.functions import col, count, isnan, when, round
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Imputer
from pyspark.ml import Pipeline

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler, UnivariateFeatureSelector
from pyspark.ml.evaluation import  BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

In [None]:
!pip list

### Criando a Sessão em Spark

In [7]:
#Create Session
conf = SparkConf() \
    .set("spark.executor.instances", "2") \
    .set("spark.executor.memory", "2g") \
    .set("spark.driver.memory", "2g") \
    .setAppName("MeuAPP")

spark = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# Obter o contexto Spark da sessão
# sc = spark.sparkContext

# # Parar a sessão Spark quando não for mais necessária
# spark.stop()

# # Parar o contexto Spark quando não for mais necessário
# sc.stop()

In [8]:
# Imprimir informações sobre a sessão Spark
print("Session Spark:")
print(" - ID da aplicação:", spark.sparkContext.applicationId)
print(" - Nome da aplicação:", spark.sparkContext.appName)

# Imprimir informações sobre o contexto Spark
# print("\nContexto Spark:")
# print(" - Versão do Spark:", sc.version)
# print(" - Modo de execução:", sc.master)

# # Print the Python version of SparkContext
# print("\nThe Python version of Spark Context in the PySpark shell is", sc.pythonVer)

Session Spark:
 - ID da aplicação: local-1711583145961
 - Nome da aplicação: MeuAPP


In [9]:
spark

### Importando os dados

In [10]:
#Link para o conjunto de dados Iris
# link_dados = "https://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.cleveland.data"
# local_path = "./data/raw/heard_disease_raw_data.csv"  # Caminho local onde o arquivo será salvo

# # Baixar o arquivo CSV do URL
# urllib.request.urlretrieve(link_dados, local_path)

# # Nomes colunas
# nomes_colunas=["age", "sex", "cp", "trestbps", "chol", "fbs", 
#                 "restecg", "thalach", "exang", "oldpeak",
#                 "slope","ca","thal","num"
#               ]

local_path = "./data/raw/hotel_booking.csv" 
# work/data/raw/hotel_booking.csv
# Carregar os dados como um DataFrame Spark
df = spark.read.csv(local_path, header=True, inferSchema=True)

# Atribuir nomes às colunas
# df = df.toDF()

# Exibir o esquema dos dados
print("Esquema dos Dados:")
df.printSchema()

# Exibir a quantidade de colunas e linhas
print("Shape of the dataset: ", (df.count(), len(df.columns)))

# Exibir as primeiras 5 linhas dos dados
print("\nPrimeiras 5 linhas dos Dados:")
df.show(5)

Esquema dos Dados:
root
 |-- hotel: string (nullable = true)
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date_year: integer (nullable = true)
 |-- arrival_date_month: string (nullable = true)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: double (nullable = true)
 |-- babies: integer (nullable = true)
 |-- meal: string (nullable = true)
 |-- country: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- distribution_channel: string (nullable = true)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- reserved_room_type: string (nullable = true)
 |-- assigned_room_t

In [11]:
type(df)

pyspark.sql.dataframe.DataFrame

### Processamento dos dados

In [12]:
df.groupBy('is_canceled').agg(count("*").alias("count")).show()

+-----------+-----+
|is_canceled|count|
+-----------+-----+
|          1|44224|
|          0|75166|
+-----------+-----+



In [13]:
df = df.drop('reservation_status_date')

In [18]:
null_df = df.select([count(when(
                                col(c).contains('None') | \
                                col(c).contains('NULL') | \
                                (col(c) == '') | \
                                col(c).isNull() | \
                                isnan(col(c)), c ))
           .alias(c)
           for c in df.columns])

# expresoes_condicionais = [when (col(coluna) >0,coluna).alias(coluna) for coluna in df.columns ]
colunas_com_soma_maior_que_1 = [coluna for coluna in null_df.columns if null_df.select(coluna).collect()[0][0] > 1]

null_df.select(*colunas_com_soma_maior_que_1).show()
# null_df.show()

+--------+-------+-----+-------+
|children|country|agent|company|
+--------+-------+-----+-------+
|       4|    488|16340| 112593|
+--------+-------+-----+-------+



In [19]:
colunas_com_soma_maior_que_1

['children', 'country', 'agent', 'company']

In [20]:
df=df.drop(*colunas_com_soma_maior_que_1)


In [21]:
df.printSchema()

root
 |-- hotel: string (nullable = true)
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date_year: integer (nullable = true)
 |-- arrival_date_month: string (nullable = true)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- babies: integer (nullable = true)
 |-- meal: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- distribution_channel: string (nullable = true)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- reserved_room_type: string (nullable = true)
 |-- assigned_room_type: string (nullable = true)
 |-- booking_changes: integer (nullable = true)
 |-- deposit_type: s

In [12]:
# Deletando linhas com erro
# df_clean = df.filter(col("ca")!="?").filter(col("thal")!='?')

In [24]:
print(f"Qtd registros: {df.count()}")
print(f"Qtd colunas: {len(df.columns)}")

Qtd registros: 119390
Qtd colunas: 31


In [25]:
df_uniques = spark.createDataFrame([(i, len(df.select(i).distinct().collect())) for i  in df.columns],
                      ['Nome_Column', 'NUnicos'])\
                        .orderBy(['NUnicos'],ascending=True)\
                        .withColumn('Repre_Total',round((col('NUnicos')/df.count())*100,2))
df_uniques.show(31)

+--------------------+-------+-----------+
|         Nome_Column|NUnicos|Repre_Total|
+--------------------+-------+-----------+
|   is_repeated_guest|      2|        0.0|
|               hotel|      2|        0.0|
|         is_canceled|      2|        0.0|
|   arrival_date_year|      3|        0.0|
|  reservation_status|      3|        0.0|
|        deposit_type|      3|        0.0|
|       customer_type|      4|        0.0|
|              babies|      5|        0.0|
|distribution_channel|      5|        0.0|
|                meal|      5|        0.0|
|required_car_park...|      5|        0.0|
|total_of_special_...|      6|       0.01|
|      market_segment|      8|       0.01|
|  reserved_room_type|     10|       0.01|
|  arrival_date_month|     12|       0.01|
|  assigned_room_type|     12|       0.01|
|              adults|     14|       0.01|
|previous_cancella...|     15|       0.01|
|stays_in_weekend_...|     17|       0.01|
|     booking_changes|     21|       0.02|
|arrival_da

In [26]:
# Colunas con valores distintos e valores constantes
col_valores_dispersos = ['credit_card' ,'name' ,'email' ,'phone-number', 'adr']
df_clean = df.drop(*col_valores_dispersos)

In [27]:
df.select(col('is_canceled')).distinct().show(3)

+-----------+
|is_canceled|
+-----------+
|          1|
|          0|
+-----------+



In [28]:
# TRocando o nom da label
df_clean=df_clean.withColumnRenamed('is_canceled', 'label')

In [29]:
df_clean.first()

Row(hotel='Resort Hotel', label=0, lead_time=342, arrival_date_year=2015, arrival_date_month='July', arrival_date_week_number=27, arrival_date_day_of_month=1, stays_in_weekend_nights=0, stays_in_week_nights=0, adults=2, babies=0, meal='BB', market_segment='Direct', distribution_channel='Direct', is_repeated_guest=0, previous_cancellations=0, previous_bookings_not_canceled=0, reserved_room_type='C', assigned_room_type='C', booking_changes=3, deposit_type='No Deposit', days_in_waiting_list=0, customer_type='Transient', required_car_parking_spaces=0, total_of_special_requests=0, reservation_status='Check-Out')

In [30]:
# Criando a label
df_clean = df_clean.withColumn('label', col('label').cast("double"))

### Feature enginnering

In [32]:
# Divida o DataFrame em 70% para treino e 30% para teste
train_df, test_df = df_clean.randomSplit([0.7, 0.3],seed=24)

# Imprima o tamanho de cada subconjunto
print("Tamanho do conjunto de treino:", train_df.count())
print("Tamanho do conjunto de teste:", test_df.count())

Tamanho do conjunto de treino: 83594
Tamanho do conjunto de teste: 35796


In [33]:
train_df.groupBy('label').count().withColumn('porcent', round(col('count')/train_df.count(),4)).show()
test_df.groupBy('label').count().withColumn('porcent', round(col('count')/test_df.count(),4)).show()

+-----+-----+-------+
|label|count|porcent|
+-----+-----+-------+
|  0.0|52695| 0.6304|
|  1.0|30899| 0.3696|
+-----+-----+-------+

+-----+-----+-------+
|label|count|porcent|
+-----+-----+-------+
|  0.0|22471| 0.6278|
|  1.0|13325| 0.3722|
+-----+-----+-------+



In [34]:
# Crianndo Class weight // Para os dados de treino
# Conta o número de ocorrências de cada classe
class_counts = train_df.groupBy('label').count().collect()
# Classes no dataframe
unique_classes = [row[0] for row in class_counts]
# qtd Clases 
qtd_classes = len(unique_classes)
# Qtd de registros por Classe
class_counts = [row[1] for row in class_counts]
# Calcula o número total de amostras no dataframe
total_samples = train_df.count()

# Calcula os pesos de classe
class_weights = {}
for class_label, class_count in zip(unique_classes, class_counts):
    class_weight = total_samples / (qtd_classes* class_count)
    class_weights[class_label] = class_weight

# Criando uma coluna ao Dataframe
train_df_w = train_df.withColumn('weight',when(col('label') == 0.0, class_weights[0.0])\
                    .when(col('label') == 1.0, class_weights[1.0])\
                    .otherwise(None))

### Criando modelo

In [23]:
# from pyspark.sql.functions import sample_by

# # Definindo a coluna de data
# data_column = "data"

# # Definindo a proporção de divisão (80% treino, 20% teste)
# train_ratio = 0.8

# # Amostragem estratificada por data
# df_train = df.sample_by(data_column, fractions={train_ratio: "train", 1-train_ratio: "test"})

# # Separando os conjuntos de treino e teste
# X_train = df_train.filter(df_train.split == "train")
# X_test = df_train.filter(df_train.split == "test")

# # Extraindo as labels
# y_train = X_train[target_column]
# y_test = X_test[target_column]

In [24]:
# pipeline = Pipeline(stages=stages)

In [25]:
# pipeline_trained = pipeline.fit(df_clean)
# df_trannsformed = pipeline_trained.transform(df_clean)

In [26]:
# colunas_trannsformadas = [encoder.getOutputCol() for encoder in encoders]+imputer.getOutputCols()
# df_trannsformed.select(*colunas_trannsformadas, col(assembler.getOutputCol()), col(label)).show(3)

In [35]:
label = 'label'
weight = 'weight'
colunas_categoricas = [coluna for coluna, tipo in df_clean.dtypes if tipo in ['string'] and coluna not in (label,weight)]
colunas_numericas = [coluna for coluna, tipo in df_clean.dtypes if tipo in ['int', 'double'] and coluna not in (label,weight)]

In [36]:
# qtd_colunas = 
# qtd_colunas_em listas=
print(f"""Verificação Colunas:
Colunas no dataset: {len(train_df_w.columns)}
Colunas nas listas: {len(colunas_categoricas) + len(colunas_numericas) + len([label,weight])}""")

Verificação Colunas:
Colunas no dataset: 27
Colunas nas listas: 27


In [37]:
stages = []
# Imputer for numeric columns
imputer = Imputer(inputCols=colunas_numericas,
                  outputCols=[f"{c}_imputed" for c in colunas_numericas],
                  strategy= 'mean',
                  missingValue= -666,
                 )
stages.append(imputer)

# StringIndexer for categorical columns
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_indexed", 
                          handleInvalid='keep',
                          stringOrderType='frequencyDesc'
                         ) for c in colunas_categoricas]
stages += indexers

# OneHotEncoder for indexed categorical columns
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), 
                          outputCol=f"{indexer.getOutputCol()}_encoded",
                          handleInvalid = 'keep',
                          dropLast=False
                         ) for indexer in indexers]
stages += encoders

In [38]:
# Criar VectorAssembler para combinar todas as variáveis
assembler = VectorAssembler(inputCols=imputer.getOutputCols()+[encoder.getOutputCol() for encoder in encoders]  #[f"{c}_encoded" for c in colunas_categoricas]
                            , outputCol="features_raw")
stages.append(assembler)

In [41]:
 # Criar UnivariateFeatureSelector para selecionar as melhores características
selector = UnivariateFeatureSelector()\
                  .setFeatureType("continuous")\
                  .setLabelType("categorical")\
                  .setSelectionMode("percentile")\
                  .setFeaturesCol("features_raw")\
                  .setLabelCol("label")\
                  .setOutputCol("features")
                  # .setSelectionThreshold(0.50)
                    # SelectorType (tipo de teste estatistico

stages.append(selector)
# .setSelectionThreshold(1)



# featuresCol="features_raw"
#                                      , labelCol="label"
#                                      # ,selectionMode='percentile'
#                                      # ,selectionThreshold=0.8
#                                      # , selectionMethod="chi2"
#                                      , outputCol="features")

In [57]:
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2, 5, 10]) \
    .addGrid(gbt.maxIter, [10, 20]) \
    .addGrid(gbt.stepSize, [0.1]) \
    .addGrid(selector.selectionThreshold, [0.70, 0.90]) \
    .build()
# .addGrid(gbt.weightCol, ['weight', "None"]) \
pipeline = Pipeline(stages= [imputer]+indexers+encoders+[assembler, selector, gbt])

evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')

# Criar CrossValidator
cv = CrossValidator(estimator=pipeline, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, 
                    numFolds=5,
                    parallelism=2)

In [59]:
# Treinar o modelo e avaliar o desempenho
cv_model = cv.fit(train_df_w)

In [45]:
# Lista de parametros para cada interação do grid
for params in paramGrid:
    print(params)

{Param(parent='GBTClassifier_d5714ef9e724', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2, Param(parent='GBTClassifier_d5714ef9e724', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='GBTClassifier_d5714ef9e724', name='stepSize', doc='Step size (a.k.a. learning rate) in interval (0, 1] for shrinking the contribution of each estimator.'): 0.1, Param(parent='UnivariateFeatureSelector_45833190c056', name='selectionThreshold', doc='The upper bound of the features that selector will select.'): 0.7}
{Param(parent='GBTClassifier_d5714ef9e724', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2, Param(parent='GBTClassifier_d5714ef9e724', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='GBTClassifier_d5714ef9e724'

In [None]:
# tentativa de utilização de tdqm para seguimento do processo 
from tqdm import tqdm

# Criando a barra de progresso
with tqdm(total=len(paramGrid)) as pbar:
  for params in paramGrid:
    # Treinamento e avaliação do pipeline
    cv = CrossValidator(estimator=pipeline, 
                        estimatorParamMaps=params, 
                        evaluator=evaluator, 
                        numFolds=5)
      
    cv_model = cv.fit(df_clean)

    # Atualizando a barra de progresso
    pbar.update()



In [None]:
# import mlflow

# # Registrando os parâmetros do pipeline
# with mlflow.start_run():
#   cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
#   ccv_model = cv.fit(df_clean)

# # Visualizando os parâmetros no dashboard
# mlflow.ui.launch_app()

In [60]:
## 2 maneiras de acessar os parametros:
# bestModel
best_model = cv_model.bestModel
best_model

PipelineModel_ab04e7d1ae1f

In [61]:
# Acessando o melhor modelo de regressão linear dentro do pipeline
melhor_modelo_gbt = best_model.stages[-1]

# Imprimindo os parâmetros do melhor modelo do GBT
print(melhor_modelo_gbt.extractParamMap())

{Param(parent='GBTClassifier_d5714ef9e724', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='GBTClassifier_d5714ef9e724', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='GBTClassifier_d5714ef9e724', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for reg

In [62]:
# Imprimindo os parametros do melhor  Seletor de variaveis
best_model.stages[-2].extractParamMap()

{Param(parent='UnivariateFeatureSelector_45833190c056', name='featuresCol', doc='features column name.'): 'features_raw',
 Param(parent='UnivariateFeatureSelector_45833190c056', name='labelCol', doc='label column name.'): 'label',
 Param(parent='UnivariateFeatureSelector_45833190c056', name='outputCol', doc='output column name.'): 'features',
 Param(parent='UnivariateFeatureSelector_45833190c056', name='selectionMode', doc='The selection mode. Supported options: numTopFeatures (default), percentile, fpr, fdr, fwe.'): 'percentile',
 Param(parent='UnivariateFeatureSelector_45833190c056', name='featureType', doc='The feature type. Supported options: categorical, continuous.'): 'continuous',
 Param(parent='UnivariateFeatureSelector_45833190c056', name='labelType', doc='The label type. Supported options: categorical, continuous.'): 'categorical',
 Param(parent='UnivariateFeatureSelector_45833190c056', name='selectionThreshold', doc='The upper bound of the features that selector will select.

In [63]:
# Predição original com o melhor modelo
predicao_teste = best_model.transform(test_df)
predicao_teste.first()

Row(hotel='Resort Hotel', label=0.0, lead_time=0, arrival_date_year=2015, arrival_date_month='August', arrival_date_week_number=32, arrival_date_day_of_month=4, stays_in_weekend_nights=0, stays_in_week_nights=1, adults=1, babies=0, meal='BB', market_segment='Direct', distribution_channel='Direct', is_repeated_guest=0, previous_cancellations=0, previous_bookings_not_canceled=0, reserved_room_type='F', assigned_room_type='F', booking_changes=0, deposit_type='No Deposit', days_in_waiting_list=0, customer_type='Transient', required_car_parking_spaces=0, total_of_special_requests=0, reservation_status='Check-Out', lead_time_imputed=0, arrival_date_year_imputed=2015, arrival_date_week_number_imputed=32, arrival_date_day_of_month_imputed=4, stays_in_weekend_nights_imputed=0, stays_in_week_nights_imputed=1, adults_imputed=1, babies_imputed=0, is_repeated_guest_imputed=0, previous_cancellations_imputed=0, previous_bookings_not_canceled_imputed=0, booking_changes_imputed=0, days_in_waiting_list_

In [65]:
dt_hyper_eval = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="label")
dt_hyper_AUC  = dt_hyper_eval.evaluate(predicao_teste)
print("AUC = %.2f" % dt_hyper_AUC)

AUC = 1.00


In [67]:
#preds_and_labels = predictionAndLabels.select(['predictions','d']).withColumn('label', F.col('d').cast(FloatType())).orderBy('prediction')
#important: need to cast to float type, and order by prediction, else it won't work

#select only prediction and label columns
# preds_and_labels = predicao_teste.select(['prediction','label']).withColumn('label', col('label').cast(FloatType())).orderBy('prediction')
# metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
# matriz_array = metrics.confusionMatrix().toArray()
# print(matriz_array)

# Criação da Matriz de confução
cm_dt_result = predicao_teste.crosstab("prediction", "label").orderBy(col('prediction_label').asc())
cm_dt_result.show()

#Convertir para listas
novas_colunas = [f"{c}_raname".replace(".",",") for c in cm_dt_result.columns]
cm_dt_result_t = cm_dt_result.toDF(*novas_colunas)
cm_dt_result_t

vals = [linha[:][:] for linha in cm_dt_result_t.select(*novas_colunas[1:]).collect()] 
# cm_dt_result_t.select(col(novas_colunas[1])).collect()

# [(22471, 0), (0, 13325)]
#calculate accuracy, sensitivity, specificity and precision
TP = vals[1][1]
FP = vals[1][0]
TN = vals[0][0]
FN = vals[0][1]

Accuracy = (TP+TN)/(TP+FP+TN+FN)
Sensitivity = TP/(TP+FN)
Specificity = TN/(TN+FP)
Precision = TP/(TP+FP)

print ("Accuracy = %0.2f" %Accuracy )
print ("Sensitivity = %0.2f" %Sensitivity )
print ("Specificity = %0.2f" %Specificity )
print ("Precision = %0.2f" %Precision )


+----------------+-----+-----+
|prediction_label|  0.0|  1.0|
+----------------+-----+-----+
|             0.0|22471|    0|
|             1.0|    0|13325|
+----------------+-----+-----+

Accuracy = 1.00
Sensitivity = 1.00
Specificity = 1.00
Precision = 1.00


In [69]:
df_teste_prob = predicao_teste.select('probability', 'prediction')

In [70]:
df_teste_prob.first()

Row(probability=DenseVector([0.9341, 0.0659]), prediction=0.0)

In [72]:
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import col, udf

def extract_second_value(probability_struct):
  # Assuming "values" is the field containing the probabilities
  return probability_struct.values[1]

# Register the UDF
extract_second_value_udf = spark.udf.register("extract_second_value", extract_second_value, DoubleType())

df_com_segundo_valor = df_teste_prob.withColumn("segundo_valor", extract_second_value_udf(col("probability")))

# Imprimindo o dataframe
print(df_com_segundo_valor.show())

Py4JJavaError: An error occurred while calling o238275.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14520.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14520.0 (TID 72180) (af9d95f152c7 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$2(BatchEvalPythonExec.scala:67)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$2(BatchEvalPythonExec.scala:67)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


### Predição com diferentes tresholds

In [85]:
# FAzendo predição com o modelo com trheshould modificado
treshold_value = [0.3,0.7]
best_model.stages[-1].setThresholds(treshold_value)
print(f"Threshold: {best_model.stages[-1].getThresholds()}")
# Predição original com o melhor modelo
predicao_teste_treshold = best_model.transform(test_df)

# predicao_teste_thesold = best_model_treshold.transform(test_df)
# predicao_teste_treshold.first()

Threshold: [0.3, 0.7]


In [86]:
dt_hyper_eval = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="label")
dt_hyper_AUC  = dt_hyper_eval.evaluate(predicao_teste_treshold)
print("AUC = %.2f" % dt_hyper_AUC)

AUC = 1.00


In [87]:
# Criação da Matriz de confução
cm_dt_result = predicao_teste_treshold.crosstab("prediction", "label").orderBy(col('prediction_label').asc())
cm_dt_result.show()

#Convertir para listas
novas_colunas = [f"{c}_raname".replace(".",",") for c in cm_dt_result.columns]
cm_dt_result_t = cm_dt_result.toDF(*novas_colunas)
cm_dt_result_t

vals = [linha[:][:] for linha in cm_dt_result_t.select(*novas_colunas[1:]).collect()] 
# cm_dt_result_t.select(col(novas_colunas[1])).collect()

# [(22471, 0), (0, 13325)]
#calculate accuracy, sensitivity, specificity and precision
TP = vals[1][1]
FP = vals[1][0]
TN = vals[0][0]
FN = vals[0][1]

Accuracy = (TP+TN)/(TP+FP+TN+FN)
Sensitivity = TP/(TP+FN)
Specificity = TN/(TN+FP)
Precision = TP/(TP+FP)

print ("Accuracy = %0.2f" %Accuracy )
print ("Sensitivity = %0.2f" %Sensitivity )
print ("Specificity = %0.2f" %Specificity )
print ("Precision = %0.2f" %Precision )


+----------------+-----+-----+
|prediction_label|  0.0|  1.0|
+----------------+-----+-----+
|             0.0|22471|    0|
|             1.0|    0|13325|
+----------------+-----+-----+

Accuracy = 1.00
Sensitivity = 1.00
Specificity = 1.00
Precision = 1.00


In [93]:
# Salvar a predição
caminho_parquet = 'work/data/Output/predictions_threshold.parquet'
predicao_teste_treshold.write.format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("path", caminho_parquet) \
    .save()
