In [None]:
# Load the data

import zipfile
import pandas as pd

def load_data(zip_path):
    """Read the CSV file stored inside a ZIP archive into a DataFrame.

    The first non-hidden archive member whose name ends in ``.csv`` is
    loaded, so directory entries or metadata files that happen to precede
    the data file do not break the load.

    Args:
        zip_path: Path to the ``.zip`` archive.

    Returns:
        pandas.DataFrame parsed from the CSV member.

    Raises:
        ValueError: If the archive contains no ``.csv`` member.
        Exception: Any other failure while opening the archive or parsing
            the CSV. The error is printed and then re-raised, so a failed
            load stops here instead of handing ``None`` to later cells.
    """
    try:
        with zipfile.ZipFile(zip_path, "r") as z:
            # ZIP member names always use "/" as separator. Hidden members
            # (basename starting with ".") are skipped as metadata.
            csv_members = [
                member for member in z.namelist()
                if member.lower().endswith(".csv")
                and not member.rsplit("/", 1)[-1].startswith(".")
            ]
            if not csv_members:
                raise ValueError(f"no CSV file found inside {zip_path}")
            csv_filename = csv_members[0]

            with z.open(csv_filename) as f:
                df = pd.read_csv(f)
    except Exception as e:
        print(f" Error loading the file {e}")
        raise

    print(f"File {csv_filename} uploaded successfully")
    return df




In [None]:
# Load the test split first, then the training split, from their zipped CSVs.
test_data, train_data = (
    load_data(archive_path)
    for archive_path in (
        "../files/input/test_data.csv.zip",
        "../files/input/train_data.csv.zip",
    )
)

In [None]:
# A notebook cell renders only its final expression, so both shapes are
# returned together as a (test, train) pair.
test_data.shape, train_data.shape


In [None]:
# Step 1
def data_cleaning(dataset, name="dataset"):
    """Return a cleaned copy of ``dataset``; the input frame is not modified.

    Cleaning steps, in order:
      1. Rename the column "default payment next month" to "default".
      2. Drop the "ID" column if it is present.
      3. Drop every row that contains a missing value.
      4. Collapse EDUCATION values greater than 4 into 4.

    Calling the function on an already-cleaned frame is a no-op, so the
    notebook cell that reassigns the result can be re-run safely.

    Args:
        dataset: pandas.DataFrame containing at least an "EDUCATION" column.
        name: Label used only in the confirmation message.

    Returns:
        A new pandas.DataFrame with the steps above applied.
    """
    cleaned = (
        dataset
        .rename(columns={"default payment next month": "default"})
        # errors="ignore": "ID" is already gone when the input was cleaned before.
        .drop(columns="ID", errors="ignore")
        .dropna()
        # Explicit copy so the assignment below never targets a view.
        .copy()
    )
    cleaned.loc[cleaned["EDUCATION"] > 4, "EDUCATION"] = 4
    print(f"Data {name} cleaned successfully")
    return cleaned


In [None]:
# Clean the training split first, then the test split; each name is
# rebound to the value returned by data_cleaning.
train_data = data_cleaning(train_data, name="train_data")
test_data = data_cleaning(test_data, name="test_data")

In [None]:
# Preview the first five rows of the cleaned training data.
train_data.head(n=5)

In [None]:
# Step 2
def split_data(train_data,test_data):
    """Separate features from the "default" target for both splits.

    Args:
        train_data: Training DataFrame that includes a "default" column.
        test_data: Test DataFrame that includes a "default" column.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)``: each ``X`` is the
        frame without "default", each ``y`` is the "default" column.
    """
    target = "default"

    def _features_and_target(frame):
        # Dropping first means a missing target column fails on the drop.
        return frame.drop(columns=target), frame[target]

    X_train, y_train = _features_and_target(train_data)
    X_test, y_test = _features_and_target(test_data)

    print("Correctly split dataset")

    return X_train, y_train, X_test, y_test


In [None]:
# Feature matrices and target vectors for both splits.
(X_train, y_train, X_test, y_test) = split_data(
    train_data=train_data,
    test_data=test_data,
)

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.neural_network import MLPClassifier

def classification_pipeline(X_train, y_train, k_best_features=10):
    """Build and fit the preprocessing + MLP classification pipeline.

    Pipeline steps, in order:
      1. ``preprocessor``: one-hot encode SEX, EDUCATION and MARRIAGE
         (categories unseen at fit time are ignored) and standardize every
         other column.
      2. ``feature_selection``: keep the ``k_best_features`` best features
         according to the ANOVA F-score (``f_classif``).
      3. ``pca``: PCA with its default number of components.
      4. ``mlp``: MLPClassifier with ``max_iter=15000`` and a fixed seed.

    Args:
        X_train: Feature DataFrame, expected to contain the SEX, EDUCATION
            and MARRIAGE columns.
        y_train: Target labels aligned with ``X_train``.
        k_best_features: Number of features kept by the selection step.

    Returns:
        The fitted ``Pipeline``.
    """
    categorical_cols = ['SEX', 'EDUCATION', 'MARRIAGE']
    numeric_cols = [c for c in X_train.columns if c not in categorical_cols]

    preprocessor = ColumnTransformer(
        transformers=[
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols),
            ('scaler', StandardScaler(), numeric_cols),
        ]
    )

    model = Pipeline([
        ('preprocessor', preprocessor),
        # Pass k through; otherwise the argument is silently ignored.
        ('feature_selection', SelectKBest(score_func=f_classif, k=k_best_features)),
        ('pca', PCA()),
        ('mlp', MLPClassifier(max_iter=15000, random_state=42)),  # MLP neural network
    ])

    model.fit(X_train, y_train)

    return model




In [None]:
# Build and fit the pipeline with its default number of selected features.
model = classification_pipeline(X_train=X_train, y_train=y_train)

In [None]:
# Step 4

from sklearn.model_selection import GridSearchCV, KFold
import numpy as np

def Hyperparameter_optimization(model, X_train, y_train, param_grid=None,
                                cv=10, scoring='balanced_accuracy'):
    """Tune the pipeline with a cross-validated grid search.

    Args:
        model: Pipeline whose steps are named ``feature_selection``,
            ``pca`` and ``mlp`` (the default grid addresses those names).
        X_train: Training features.
        y_train: Training labels.
        param_grid: Optional GridSearchCV parameter grid. When omitted, the
            single-point grid below is used.
        cv: Cross-validation setting forwarded to GridSearchCV.
        scoring: Scoring metric forwarded to GridSearchCV.

    Returns:
        The fitted ``GridSearchCV``. ``refit=True`` means it can be used
        directly for prediction with the best configuration found.
    """
    if param_grid is None:
        param_grid = {
            'feature_selection__k': [20],
            'pca__n_components': [None],
            'mlp__hidden_layer_sizes': [(50, 30, 40)],
            'mlp__alpha': [0.26],
            'mlp__learning_rate_init': [0.001],
        }

    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        scoring=scoring,
        cv=cv,
        n_jobs=-1,
        refit=True
    )

    # Fit the search on the training data.
    grid_search.fit(X_train, y_train)

    return grid_search


In [None]:
# `model` is rebound to the fitted GridSearchCV below. If this cell runs
# again, search over the wrapped pipeline (`model.estimator`) instead of
# nesting one GridSearchCV inside another, whose parameter names would not
# match the grid.
base_pipeline = model.estimator if isinstance(model, GridSearchCV) else model
model = Hyperparameter_optimization(base_pipeline, X_train, y_train)

In [None]:
from sklearn.metrics import precision_score, balanced_accuracy_score, recall_score, f1_score

# Predictions for each split
y_pred_train, y_pred_test = model.predict(X_train), model.predict(X_test)

# Precision per split (zero_division=0: an undefined score is reported as 0)
precision_train = precision_score(y_train, y_pred_train, zero_division=0)
precision_test = precision_score(y_test, y_pred_test, zero_division=0)

# Balanced accuracy per split
balanced_acc_train = balanced_accuracy_score(y_train, y_pred_train)
balanced_acc_test = balanced_accuracy_score(y_test, y_pred_test)

# Recall per split
recall_train = recall_score(y_train, y_pred_train, zero_division=0)
recall_test = recall_score(y_test, y_pred_test, zero_division=0)

# F1 per split
f1_train = f1_score(y_train, y_pred_train, zero_division=0)
f1_test = f1_score(y_test, y_pred_test, zero_division=0)

# Results: print one rounded summary dict per split, train first
for split_label, split_scores in (
    ('train', (precision_train, balanced_acc_train, recall_train, f1_train)),
    ('test', (precision_test, balanced_acc_test, recall_test, f1_test)),
):
    split_precision, split_balanced_acc, split_recall, split_f1 = split_scores
    print({'type': 'metrics', 'dataset': split_label,
           'precision': round(split_precision, 4),
           'balanced_accuracy': round(split_balanced_acc, 4),
           'recall': round(split_recall, 4),
           'f1_score': round(split_f1, 4)})


In [None]:
# Step 5
import gzip 
import pickle
import os

def save_model(model,path="file path.gzip"):
    """Pickle ``model`` into a gzip-compressed file at ``path``.

    Missing parent directories are created first.

    Args:
        model: Any picklable object.
        path: Destination file. May be a bare file name, in which case the
            file is written relative to the current working directory.
    """
    parent_dir = os.path.dirname(path)
    # os.makedirs("") raises FileNotFoundError, so skip directory creation
    # for a bare file name (which the default path is).
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with gzip.open(path, "wb") as f:
        pickle.dump(model, f)
    print("Model saved!")



In [None]:
# Persist the tuned model as a gzip-compressed pickle.
save_model(model, path="../files/models/model.pkl.gz")

In [None]:
# Step 6

from sklearn.metrics import precision_score, recall_score, f1_score, balanced_accuracy_score
import os
import json

def metrics_evaluation(model,X_train,y_train,X_test,y_test,data_set_name="train or test"):
    """Compute classification metrics for one split as a JSON-ready dict.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_train, y_train: Training features and labels.
        X_test, y_test: Test features and labels.
        data_set_name: Which split to score, either "train" or "test". The
            default is a placeholder, not a valid choice, so callers must
            pass one of the two values.

    Returns:
        Dict with keys ``type``, ``dataset``, ``precision``,
        ``balanced_accuracy``, ``recall`` and ``f1_score``. The scores are
        plain Python floats.

    Raises:
        ValueError: If ``data_set_name`` is neither "train" nor "test".
    """
    if data_set_name == "train":
        y_true = y_train
        y_pred = model.predict(X_train)
    elif data_set_name == "test":
        y_true = y_test
        y_pred = model.predict(X_test)
    else:
        raise ValueError("data_set_name must be 'train' or 'test'")

    # zero_division=0 matches the metrics cell earlier in the notebook: an
    # undefined score is reported as 0 without emitting a warning.
    precision = precision_score(y_true, y_pred, zero_division=0)
    balanced_accuracy = balanced_accuracy_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)

    return {
        "type": "metrics",
        'dataset': data_set_name,
        'precision': float(precision),
        'balanced_accuracy': float(balanced_accuracy),
        'recall': float(recall),
        'f1_score': float(f1)
    }

def load_metrics(path,metrics_train,metrics_test):
    """Write the train and test metric dicts to ``path`` as JSON lines.

    The file is opened in write mode, so existing content is replaced. The
    train record goes on the first line and the test record on the second.

    Args:
        path: Destination file. Missing parent directories are created; a
            bare file name is written relative to the working directory.
        metrics_train: JSON-serializable record for the training split.
        metrics_test: JSON-serializable record for the test split.
    """
    parent_dir = os.path.dirname(path)
    # os.makedirs("") raises FileNotFoundError, so skip it for bare names.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(path, 'w', encoding='utf-8') as f:
        for record in (metrics_train, metrics_test):
            json.dump(record, f)
            f.write('\n')
    print("Metrics Saved!!")



In [None]:
# Score the training split first, then the test split, and write both
# records to the metrics file.
metrics_train = metrics_evaluation(
    model, X_train, y_train, X_test, y_test, data_set_name="train"
)
metrics_test = metrics_evaluation(
    model, X_train, y_train, X_test, y_test, data_set_name="test"
)

load_metrics(
    "../files/output/metrics.json",
    metrics_train=metrics_train,
    metrics_test=metrics_test,
)

In [None]:
# Step 7 
from sklearn.metrics import confusion_matrix

def cm_metrics(model,X_train,y_train,X_test,y_test,data_set_name="train or test"):
    """Build a JSON-ready binary confusion-matrix record for one split.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_train, y_train: Training features and labels.
        X_test, y_test: Test features and labels.
        data_set_name: Which split to evaluate, either "train" or "test".
            The default is a placeholder, not a valid choice.

    Returns:
        Dict with keys ``type``, ``dataset``, ``true_0`` and ``true_1``.
        Each ``true_*`` entry maps ``predicted_0`` / ``predicted_1`` to an
        integer count.

    Raises:
        ValueError: If ``data_set_name`` is neither "train" nor "test".
    """
    if data_set_name == "train":
        y_true = y_train
        y_pred = model.predict(X_train)
    elif data_set_name == "test":
        y_true = y_test
        y_pred = model.predict(X_test)
    else:
        raise ValueError("data_set_name must be 'train' or 'test'")

    # Pin the label order to [0, 1] so the matrix is always 2x2. Without
    # it, data containing a single class yields a 1x1 matrix and the
    # four-way unpack below fails.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    # Unpack the binary matrix row by row: true 0 first, then true 1.
    tn, fp, fn, tp = cm.ravel()

    return {
        "type": "cm_matrix",
        "dataset": data_set_name,
        "true_0": {"predicted_0": int(tn), "predicted_1": int(fp)},
        "true_1": {"predicted_0": int(fn), "predicted_1": int(tp)}
    }

def load_cm(path,cm_train,cm_test):
    """Append the train and test confusion-matrix records to ``path``.

    The file is opened in append mode, so existing lines are kept and the
    two records are added after them, one JSON document per line. Each call
    therefore adds two more lines.

    Args:
        path: Destination file. Missing parent directories are created; a
            bare file name is written relative to the working directory.
        cm_train: JSON-serializable record for the training split.
        cm_test: JSON-serializable record for the test split.
    """
    parent_dir = os.path.dirname(path)
    # os.makedirs("") raises FileNotFoundError, so skip it for bare names.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(path, 'a', encoding='utf-8') as f:
        for record in (cm_train, cm_test):
            json.dump(record, f)
            f.write("\n")
    print("Metrics saved!")

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion-matrix counts for the test split, flattened row by row.
y_pred = model.predict(X_test)
test_confusion = confusion_matrix(y_test, y_pred)
tn, fp, fn, tp = test_confusion.ravel()
print("TN:", tn, "FP:", fp, "FN:", fn, "TP:", tp)

In [None]:
# Build the confusion-matrix record for the training split, then for the
# test split, and append both to the metrics file.
cm_train, cm_test = (
    cm_metrics(model, X_train, y_train, X_test, y_test, split_name)
    for split_name in ("train", "test")
)

load_cm("../files/output/metrics.json", cm_train=cm_train, cm_test=cm_test)

In [None]:
# Show the training-split confusion-matrix record.
print(cm_train)