<a href="https://colab.research.google.com/github/CristValen/Acciones-RNR/blob/main/Untitled10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

df8 = df_2.withColumnRenamed('Malo_Dias_tot', 'label')
df8 = df8.withColumn("label", col("label").cast(DoubleType()))

# One seed shared by the split, the forest and the CV fold assignment so that
# re-running the cell reproduces the same numbers.
seed = 12345

# Split the data into training (70%) and test (30%) sets
train, test = df8.randomSplit([0.7, 0.3], seed=seed)

# Every column other than the label is used as a feature
features = [c for c in df8.columns if c != 'label']

assembler = VectorAssembler(inputCols=features, outputCol="features")

# Random Forest fed by the assembled feature vector
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=seed)

# Assemble features, then classify
pipeline = Pipeline(stages=[assembler, rf])

# Only the number of trees is tuned
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]).build()

# Model selection by ROC-AUC computed from rawPrediction
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")

# CrossValidator assigns rows to folds randomly; without `seed` the selected
# model (and every metric below) can change between runs.
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, seed=seed)

# Fit the model on the training data
model = cv.fit(train)

# Make predictions on the training data
predictions_train = model.transform(train)

# ROC-AUC reuses the CV evaluator (identical configuration); accuracy uses the hard predictions
evaluator_train_roc_auc = evaluator
roc_auc_train = evaluator_train_roc_auc.evaluate(predictions_train)
evaluator_train_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_train = evaluator_train_accuracy.evaluate(predictions_train)

print(f"Training ROC-AUC: {roc_auc_train:.3f}")
print(f"Training Accuracy: {accuracy_train:.3f}")

# Confusion matrix for training data (rows: true label, columns: predicted label)
predictionAndLabels_train = predictions_train.select("prediction", "label").rdd
metrics_train = MulticlassMetrics(predictionAndLabels_train)
confusion_matrix_train = metrics_train.confusionMatrix().toArray()
print(f"Training Confusion matrix:\n{confusion_matrix_train}")

# Precision / recall / F1 for the positive class (label 1). An empty
# denominator (e.g. the model never predicts 1) yields 0.0 instead of NaN.
TP_train = confusion_matrix_train[1, 1]
FP_train = confusion_matrix_train[0, 1]
FN_train = confusion_matrix_train[1, 0]
precision_manual_train = TP_train / (TP_train + FP_train) if (TP_train + FP_train) else 0.0
recall_manual_train = TP_train / (TP_train + FN_train) if (TP_train + FN_train) else 0.0
f1_manual_train = (
    2 * (precision_manual_train * recall_manual_train) / (precision_manual_train + recall_manual_train)
    if (precision_manual_train + recall_manual_train) else 0.0
)
print(f"Training Precision (manually calculated): {precision_manual_train:.3f}")
print(f"Training Recall (manually calculated): {recall_manual_train:.3f}")
print(f"Training F1 (manually calculated): {f1_manual_train:.3f}")

def calc_ks(data):
    """Compute the Kolmogorov-Smirnov (KS) statistic of a binary scorer over score deciles.

    Rows are ranked by ``score`` and split into ten buckets by percentile rank.
    KS is the largest absolute gap between the cumulative share of bads
    (``label == 1``) and the cumulative share of goods (``label == 0``),
    accumulated bucket by bucket in ascending score order.

    Args:
        data: A Spark DataFrame (anything exposing ``select``/``toPandas``) or a
            pandas DataFrame, with numeric ``label`` and ``score`` columns.

    Returns:
        The KS value as a float in [0, 1]; NaN when there are no bads or no goods.
    """
    import pandas as pd

    if isinstance(data, pd.DataFrame):
        data_pd = data[['label', 'score']].copy()
    else:
        # Only bring the two needed columns to the driver: the vector columns
        # (features / rawPrediction / probability) are large and cannot be
        # min/max/sum-aggregated by pandas.
        data_pd = data.select('label', 'score').toPandas()

    data_pd['good'] = (data_pd['label'] == 0).astype(int)
    data_pd['bad'] = (data_pd['label'] == 1).astype(int)
    # rank(pct=True) lies in (0, 1]; truncating *10 gives 0..10, where 10 would
    # hold only the top-ranked row(s). Cap at 9 so there are exactly ten deciles.
    data_pd['bucket'] = (data_pd['score'].rank(pct=True) * 10).astype(int).clip(upper=9)

    # groupby sorts by bucket, i.e. ascending score
    kstable = (
        data_pd.groupby('bucket')
        .agg(
            min_score=('score', 'min'),
            max_score=('score', 'max'),
            bads=('bad', 'sum'),
            goods=('good', 'sum'),
        )
        .reset_index()
    )
    kstable['bad_rate'] = kstable['bads'] / (kstable['bads'] + kstable['goods'])
    cum_bads = (kstable['bads'] / kstable['bads'].sum()).cumsum()
    cum_goods = (kstable['goods'] / kstable['goods'].sum()).cumsum()
    kstable['ks'] = cum_bads - cum_goods
    return float(kstable['ks'].abs().max())

# Named function (instead of a lambda) wrapped as a UDF: takes the Spark ML
# 'probability' vector and returns its entry at index 1 as a float.
def _positive_class_probability(prob_vector):
    return float(prob_vector[1])


extract_probability = udf(_positive_class_probability, DoubleType())

# Attach the class-1 probability as 'score', then compute and report KS on the training split
predictions_train = predictions_train.withColumn(
    'score', extract_probability('probability')
)
ks_value = calc_ks(predictions_train)
print(f"Training KS: {ks_value:.3f}")


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.sql.functions import col, udf, when, percent_rank
from pyspark.sql.types import DoubleType

df8 = df_2.withColumnRenamed('Malo_Dias_tot', 'label')
df8 = df8.withColumn("label", col("label").cast(DoubleType()))

# One seed shared by the split, the forest and the CV fold assignment so that
# re-running the cell reproduces the same numbers.
seed = 12345

# Split the data into training (70%) and test (30%) sets
train, test = df8.randomSplit([0.7, 0.3], seed=seed)

# Every column other than the label is used as a feature
features = [c for c in df8.columns if c != 'label']

assembler = VectorAssembler(inputCols=features, outputCol="features")

# Random Forest fed by the assembled feature vector
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=seed)

# Assemble features, then classify
pipeline = Pipeline(stages=[assembler, rf])

# Only the number of trees is tuned
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]).build()

# Model selection by ROC-AUC computed from rawPrediction
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")

# CrossValidator assigns rows to folds randomly; without `seed` the selected
# model can change between runs.
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, seed=seed)

# Fit on the training split, then score both splits
model = cv.fit(train)
predictions_train = model.transform(train)
predictions_test = model.transform(test)

# Create a UDF to extract the score from the probability column
def extract_score(vector):
    """Return element 1 of ``vector`` (the class-1 probability) converted to float."""
    positive_class_probability = vector[1]
    return float(positive_class_probability)

extract_score_udf = udf(extract_score, DoubleType())

# Create the score column (class-1 probability) in both predictions DataFrames
predictions_train = predictions_train.withColumn('score', extract_score_udf('probability'))
predictions_test = predictions_test.withColumn('score', extract_score_udf('probability'))

# Confusion matrices (rows: true label, columns: predicted label)
predictionAndLabels_train = predictions_train.select("prediction", "label").rdd
metrics_train = MulticlassMetrics(predictionAndLabels_train)
confusion_matrix_train = metrics_train.confusionMatrix().toArray()
print(f"Training Confusion matrix:\n{confusion_matrix_train}")

predictionAndLabels_test = predictions_test.select("prediction", "label").rdd
metrics_test = MulticlassMetrics(predictionAndLabels_test)
confusion_matrix_test = metrics_test.confusionMatrix().toArray()
print(f"Test Confusion matrix:\n{confusion_matrix_test}")

# Precision / recall / F1 for the positive class (label 1) on training data.
# An empty denominator (e.g. the model never predicts 1) yields 0.0 instead of NaN.
TP_train = confusion_matrix_train[1, 1]
FP_train = confusion_matrix_train[0, 1]
FN_train = confusion_matrix_train[1, 0]

precision_train = TP_train / (TP_train + FP_train) if (TP_train + FP_train) else 0.0
recall_train = TP_train / (TP_train + FN_train) if (TP_train + FN_train) else 0.0
f1_score_train = (
    2 * (precision_train * recall_train) / (precision_train + recall_train)
    if (precision_train + recall_train) else 0.0
)

print(f"Training Precision: {precision_train}")
print(f"Training Recall: {recall_train}")
print(f"Training F1 Score: {f1_score_train}")

# Same metrics on the test data
TP_test = confusion_matrix_test[1, 1]
FP_test = confusion_matrix_test[0, 1]
FN_test = confusion_matrix_test[1, 0]

precision_test = TP_test / (TP_test + FP_test) if (TP_test + FP_test) else 0.0
recall_test = TP_test / (TP_test + FN_test) if (TP_test + FN_test) else 0.0
f1_score_test = (
    2 * (precision_test * recall_test) / (precision_test + recall_test)
    if (precision_test + recall_test) else 0.0
)

print(f"Test Precision: {precision_test}")
print(f"Test Recall: {recall_test}")
print(f"Test F1 Score: {f1_score_test}")

def calc_ks(data):
    """Compute the Kolmogorov-Smirnov (KS) statistic of a binary scorer in Spark.

    Rows are ranked by ``score`` and assigned to ten buckets by percentile
    rank. KS is the largest absolute gap between the cumulative share of bads
    (``label == 1``) and of goods (``label == 0``), accumulated bucket by
    bucket in ascending score order.

    Args:
        data: Spark DataFrame with numeric ``label`` and ``score`` columns.

    Returns:
        KS as a float in [0, 1], or NaN when the data has no bads or no goods.
    """
    # Spark aggregate functions (F.*) are required: Python's builtin
    # min/max/sum/abs cannot operate on Spark Columns.
    from pyspark.sql import Window
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    # percent_rank() is in [0, 1]; truncating *10 gives 0..10, where 10 is hit
    # only by the top-ranked row(s). Cap at 9 so there are exactly ten deciles.
    # NOTE: an un-partitioned window moves all rows to a single partition.
    bucket = F.least(
        (F.percent_rank().over(Window.orderBy('score')) * 10).cast(IntegerType()),
        F.lit(9),
    )
    scored = (
        data.withColumn('good', F.when(F.col('label') == 0, 1).otherwise(0))
        .withColumn('bad', F.when(F.col('label') == 1, 1).otherwise(0))
        .withColumn('bucket', bucket)
    )
    kstable = scored.groupBy('bucket').agg(
        F.min('score').alias('min_score'),
        F.max('score').alias('max_score'),
        F.sum('bad').alias('bads'),
        F.sum('good').alias('goods'),
    )
    kstable = kstable.withColumn('bad_rate', F.col('bads') / (F.col('bads') + F.col('goods')))

    # Class totals in a single pass over the (at most ten-row) table;
    # sums over an empty input come back as null, hence `or 0`.
    totals = kstable.agg(F.sum('bads').alias('bads'), F.sum('goods').alias('goods')).first()
    total_bads = totals['bads'] or 0
    total_goods = totals['goods'] or 0
    if total_bads == 0 or total_goods == 0:
        # KS is undefined unless both classes are present.
        return float('nan')

    # Explicit running-total frame over buckets in ascending order
    cumulative = Window.orderBy('bucket').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    kstable = kstable.withColumn(
        'ks',
        F.sum('bads').over(cumulative) / total_bads
        - F.sum('goods').over(cumulative) / total_goods,
    )
    return float(kstable.agg(F.max(F.abs(F.col('ks')))).first()[0])

# Score both splits (training first), then report each with one print loop.
ks_value_train, ks_value_test = calc_ks(predictions_train), calc_ks(predictions_test)

for split_label, split_ks in (("Training", ks_value_train), ("Test", ks_value_test)):
    print(f"{split_label} KS: {split_ks}")



In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from imblearn.under_sampling import NearMiss
import pandas as pd

# These names are used below but not imported by this cell's import list;
# importing them here keeps the cell runnable on its own.
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

df8 = df_2.withColumnRenamed('Malo_Dias_tot', 'label')
df8 = df8.withColumn("label", col("label").cast(DoubleType()))

# One seed shared by the split, the forest and the CV fold assignment so that
# re-running the cell reproduces the same numbers.
seed = 12345

# Split the data into training (70%) and test (30%) sets
train, test = df8.randomSplit([0.7, 0.3], seed=seed)

# imblearn works on in-memory data, so the training split is collected to pandas
train_pd = train.toPandas()

# Every column other than the label is used as a feature
features = [c for c in df8.columns if c != 'label']

# Separate the features and label
X = train_pd[features]
y = train_pd['label']

# NearMiss undersampling, applied to the TRAINING split only
nm = NearMiss()
X_resampled, y_resampled = nm.fit_resample(X, y)

# Rebuild the frame positionally: fit_resample may return arrays or frames with
# a fresh index, and pd.concat(axis=1) would otherwise align on index labels.
train_undersampled_pd = pd.DataFrame(X_resampled, columns=features).reset_index(drop=True)
train_undersampled_pd['label'] = pd.Series(y_resampled).to_numpy()
train_undersampled = spark.createDataFrame(train_undersampled_pd)

assembler = VectorAssembler(inputCols=features, outputCol="features")

# Random Forest fed by the assembled feature vector
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=seed)

# Assemble features, then classify
pipeline = Pipeline(stages=[assembler, rf])

# Only the number of trees is tuned
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]).build()

# Model selection by ROC-AUC computed from rawPrediction
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")

# CrossValidator assigns rows to folds randomly; without `seed` the selected
# model can change between runs.
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, seed=seed)

# Fit the model on the undersampled training data
model = cv.fit(train_undersampled)

# Evaluate on the original (non-undersampled) training data
predictions_train_original = model.transform(train)

evaluator_train_original_roc_auc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
roc_auc_train_original = evaluator_train_original_roc_auc.evaluate(predictions_train_original)
evaluator_train_original_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_train_original = evaluator_train_original_accuracy.evaluate(predictions_train_original)

print(f"Training (original) ROC-AUC: {roc_auc_train_original:.3f}")
print(f"Training (original) Accuracy: {accuracy_train_original:.3f}")

# Confusion matrix (rows: true label, columns: predicted label)
predictionAndLabels_train_original = predictions_train_original.select("prediction", "label").rdd
metrics_train_original = MulticlassMetrics(predictionAndLabels_train_original)
confusion_matrix_train_original = metrics_train_original.confusionMatrix().toArray()
print(f"Training (original) Confusion matrix:\n{confusion_matrix_train_original}")

# Precision / recall / F1 for the positive class (label 1). An empty
# denominator yields 0.0 instead of NaN.
TP_train_original = confusion_matrix_train_original[1, 1]
FP_train_original = confusion_matrix_train_original[0, 1]
FN_train_original = confusion_matrix_train_original[1, 0]
precision_manual_train_original = (
    TP_train_original / (TP_train_original + FP_train_original)
    if (TP_train_original + FP_train_original) else 0.0
)
recall_manual_train_original = (
    TP_train_original / (TP_train_original + FN_train_original)
    if (TP_train_original + FN_train_original) else 0.0
)
f1_manual_train_original = (
    2 * (precision_manual_train_original * recall_manual_train_original)
    / (precision_manual_train_original + recall_manual_train_original)
    if (precision_manual_train_original + recall_manual_train_original) else 0.0
)
print(f"Training (original) Precision (manually calculated): {precision_manual_train_original:.3f}")
print(f"Training (original) Recall (manually calculated): {recall_manual_train_original:.3f}")
print(f"Training (original) F1 (manually calculated): {f1_manual_train_original:.3f}")

def calc_ks(data):
    """Compute the Kolmogorov-Smirnov (KS) statistic of a binary scorer in Spark.

    Rows are ranked by ``score`` and assigned to ten buckets by percentile
    rank. KS is the largest absolute gap between the cumulative share of bads
    (``label == 1``) and of goods (``label == 0``), accumulated bucket by
    bucket in ascending score order.

    Args:
        data: Spark DataFrame with numeric ``label`` and ``score`` columns.

    Returns:
        KS as a float in [0, 1], or NaN when the data has no bads or no goods.
    """
    # Spark aggregate functions (F.*) are required: Python's builtin
    # min/max/sum/abs cannot operate on Spark Columns.
    from pyspark.sql import Window
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    # percent_rank() is in [0, 1]; truncating *10 gives 0..10, where 10 is hit
    # only by the top-ranked row(s). Cap at 9 so there are exactly ten deciles.
    # NOTE: an un-partitioned window moves all rows to a single partition.
    bucket = F.least(
        (F.percent_rank().over(Window.orderBy('score')) * 10).cast(IntegerType()),
        F.lit(9),
    )
    scored = (
        data.withColumn('good', F.when(F.col('label') == 0, 1).otherwise(0))
        .withColumn('bad', F.when(F.col('label') == 1, 1).otherwise(0))
        .withColumn('bucket', bucket)
    )
    kstable = scored.groupBy('bucket').agg(
        F.min('score').alias('min_score'),
        F.max('score').alias('max_score'),
        F.sum('bad').alias('bads'),
        F.sum('good').alias('goods'),
    )
    kstable = kstable.withColumn('bad_rate', F.col('bads') / (F.col('bads') + F.col('goods')))

    # Class totals in a single pass over the (at most ten-row) table;
    # sums over an empty input come back as null, hence `or 0`.
    totals = kstable.agg(F.sum('bads').alias('bads'), F.sum('goods').alias('goods')).first()
    total_bads = totals['bads'] or 0
    total_goods = totals['goods'] or 0
    if total_bads == 0 or total_goods == 0:
        # KS is undefined unless both classes are present.
        return float('nan')

    # Explicit running-total frame over buckets in ascending order
    cumulative = Window.orderBy('bucket').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    kstable = kstable.withColumn(
        'ks',
        F.sum('bads').over(cumulative) / total_bads
        - F.sum('goods').over(cumulative) / total_goods,
    )
    return float(kstable.agg(F.max(F.abs(F.col('ks')))).first()[0])

# calc_ks needs a scalar 'score' column, which model.transform() does not
# produce; these imports keep the cell self-contained.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF returning the class-1 probability (index 1 of the 'probability' vector)
positive_probability_udf = udf(lambda v: float(v[1]), DoubleType())

# KS on the original (non-undersampled) training data
predictions_train_original = predictions_train_original.withColumn(
    'score', positive_probability_udf('probability')
)
ks_value_train_original = calc_ks(predictions_train_original)
print(f"Training (original) KS: {ks_value_train_original:.3f}")

# Evaluate on the untouched test split. Only the training data is undersampled;
# undersampling the test set would change its class balance and inflate
# precision/F1, making the test metrics unrepresentative.
predictions_test = model.transform(test).withColumn('score', positive_probability_udf('probability'))

# ROC-AUC and accuracy for the test data
evaluator_test_roc_auc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
roc_auc_test = evaluator_test_roc_auc.evaluate(predictions_test)
evaluator_test_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_test = evaluator_test_accuracy.evaluate(predictions_test)

print(f"Test ROC-AUC: {roc_auc_test:.3f}")
print(f"Test Accuracy: {accuracy_test:.3f}")

# Confusion matrix for test data (rows: true label, columns: predicted label)
predictionAndLabels_test = predictions_test.select("prediction", "label").rdd
metrics_test = MulticlassMetrics(predictionAndLabels_test)
confusion_matrix_test = metrics_test.confusionMatrix().toArray()
print(f"Test Confusion matrix:\n{confusion_matrix_test}")

# Precision / recall / F1 computed from the TEST counts. An empty denominator
# yields 0.0 instead of NaN.
TP_test = confusion_matrix_test[1, 1]
FP_test = confusion_matrix_test[0, 1]
FN_test = confusion_matrix_test[1, 0]
precision_manual_test = TP_test / (TP_test + FP_test) if (TP_test + FP_test) else 0.0
recall_manual_test = TP_test / (TP_test + FN_test) if (TP_test + FN_test) else 0.0
f1_manual_test = (
    2 * (precision_manual_test * recall_manual_test) / (precision_manual_test + recall_manual_test)
    if (precision_manual_test + recall_manual_test) else 0.0
)
print(f"Test Precision (manually calculated): {precision_manual_test:.3f}")
print(f"Test Recall (manually calculated): {recall_manual_test:.3f}")
print(f"Test F1 (manually calculated): {f1_manual_test:.3f}")

ks_value_test = calc_ks(predictions_test)
print(f"Test KS: {ks_value_test:.3f}")

# Feature importances of the best Random Forest, highest first
importances = model.bestModel.stages[-1].featureImportances
important_features = sorted(zip(importances.toArray(), features), reverse=True)
print("Most important features:")
for importance, feature in important_features:
    print(f"{feature}: {importance:.3f}")
