In [None]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PowerTransformer
from sklearn.metrics import (roc_auc_score, average_precision_score, 
                             f1_score, accuracy_score, precision_score, 
                             recall_score, make_scorer)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.base import clone
from xgboost import XGBClassifier
from scipy.stats import loguniform, randint
from typing import Dict, Any
from lightgbm import LGBMClassifier

import joblib
import json
import time
import os
import warnings
from joblib import Memory

warnings.filterwarnings('ignore')
memory = Memory(location='./.cache', verbose=0)


class CreditRiskModel:
    def __init__(self, model_type: str, metric: str = 'roc_auc'):
        self.model_type = model_type
        self.metric = metric
        self.best_model = None
        self.metrics_history = []
        self.scorer = self._get_scorer()
        self._setup_model_config()

    def _get_scorer(self):
        scoring = {
            'roc_auc': make_scorer(roc_auc_score, needs_proba=True),
            'precision': make_scorer(precision_score),
            'recall': make_scorer(recall_score),
            'f1': make_scorer(f1_score),
            'accuracy': make_scorer(accuracy_score),
            'average_precision': make_scorer(average_precision_score, needs_proba=True)
        }
        return scoring.get(self.metric, scoring['roc_auc'])

    def _setup_model_config(self):
        self.model_config = {
            'logistic': {
                'model': LogisticRegression(max_iter=1000, class_weight='balanced', solver='saga'),
                'params': {
                    'classifier__C': loguniform(1e-3, 1e2),
                    'classifier__penalty': ['l1', 'l2']
                }
            },
            'random_forest': {
                'model': RandomForestClassifier(n_jobs=-1, class_weight='balanced_subsample'),
                'params': {
                    'classifier__n_estimators': randint(100, 300),
                    'classifier__max_depth': randint(5, 30),
                    'classifier__min_samples_split': randint(2, 15)
                }
            },
            'xgboost': {
                'model': XGBClassifier(objective='binary:logistic', eval_metric='logloss', use_label_encoder=False, n_jobs=-1, tree_method='hist', scale_pos_weight=1),
                'params': {
                    'classifier__n_estimators': randint(100, 500),
                    'classifier__learning_rate': loguniform(0.01, 0.2),
                    'classifier__max_depth': randint(3, 10),
                    'classifier__subsample': [0.6, 0.8, 1.0],
                    'classifier__colsample_bytree': [0.6, 0.8, 1.0]
                }
            },
            'lightgbm': {
                'model': LGBMClassifier(objective='binary', class_weight='balanced', n_jobs=-1),
                'params': {
                    'classifier__n_estimators': randint(100, 500),
                    'classifier__learning_rate': loguniform(1e-3, 0.2),
                    'classifier__max_depth': randint(3, 12),
                    'classifier__num_leaves': randint(10, 100),
                    'classifier__subsample': [0.6, 0.8, 1.0]
                }
            }
        }

    @staticmethod
    def _feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
        df['PAYMENT_RATIO'] = df['PAY_AMT1'] / (df['BILL_AMT1'].abs() + 1)
        df['UTILIZATION'] = df['BILL_AMT1'] / (df['LIMIT_BAL'] + 1)
        df['AVG_BILL'] = df[[f'BILL_AMT{i}' for i in range(1, 7)]].mean(axis=1)
        df['AVG_PAY'] = df[[f'PAY_AMT{i}' for i in range(1, 7)]].mean(axis=1)
        return df

    def load_data(self, url: str) -> pd.DataFrame:
        df = pd.read_excel(url, header=1)
        df = df.rename(columns={'default payment next month': 'DEFAULT'})
        df = self._feature_engineering(df)
        return df

    def prepare_data(self, df: pd.DataFrame):
        numeric = df.select_dtypes(include=['number']).columns.drop(['ID', 'DEFAULT'], errors='ignore')
        X = df[numeric]
        y = df['DEFAULT'].astype('int8')
        return train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

    def build_pipeline(self) -> Pipeline:
        return Pipeline([
            ('scaler', PowerTransformer(method='yeo-johnson')),
            ('classifier', clone(self.model_config[self.model_type]['model']))
        ], memory=memory)

    def optimize_model(self, X_train, y_train):
        pipeline = self.build_pipeline()
        search = RandomizedSearchCV(
            pipeline,
            self.model_config[self.model_type]['params'],
            n_iter=30,
            cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
            scoring=self.scorer,
            n_jobs=-1,
            random_state=42,
            verbose=0
        )
        search.fit(X_train, y_train)
        return search.best_estimator_, search.best_params_

    def evaluate_model(self, model, X_test, y_test) -> Dict[str, float | str]:
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]

        return {
            'roc_auc': float(roc_auc_score(y_test, y_proba)),
            'average_precision': float(average_precision_score(y_test, y_proba)),
            'accuracy': float(accuracy_score(y_test, y_pred)),
            'precision': float(precision_score(y_test, y_pred)),
            'recall': float(recall_score(y_test, y_pred)),
            'f1': float(f1_score(y_test, y_pred)),
            'optimization_metric': self.metric
        }

    def train_and_evaluate(self, url: str) -> Dict[str, Any]:
        df = self.load_data(url)
        X_train, X_test, y_train, y_test = self.prepare_data(df)

        start = time.time()
        best_model, best_params = self.optimize_model(X_train, y_train)
        train_time = time.time() - start

        metrics = self.evaluate_model(best_model, X_test, y_test)
        results = {
            'model_type': self.model_type,
            'best_params': best_params,
            'metrics': metrics,
            'training_time': train_time,
            'features_used': X_train.columns.tolist()
        }

        self.best_model = best_model
        self.metrics_history.append(results)
        return results

    def save_results(self, results: Dict[str, Any], path: str = './models'):
        os.makedirs(path, exist_ok=True)
        joblib.dump(self.best_model, f'{path}/{self.model_type}_model.pkl')
        
        with open(f'{path}/{self.model_type}_metrics.json', 'w') as f:
            json.dump(results, f, indent=2)


# Main entry point: tune every model for every optimisation metric and
# store the fitted models and their metrics on disk.
if __name__ == "__main__":
    URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls"
    MODELOS = ['logistic', 'random_forest', 'xgboost', 'lightgbm']
    METRICAS = ['roc_auc', 'f1', 'average_precision']
    OUTPUT_DIR = './models'
    all_results = {}

    # Make sure the directory for the combined report exists regardless of
    # where the individual runs are saved.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # NOTE: every run passes the remote URL to train_and_evaluate, so the
    # dataset is read again for each model/metric combination.
    for modelo in MODELOS:
        for metrica in METRICAS:
            print(f"\n==> Treinando {modelo} com métrica {metrica}")
            cr = CreditRiskModel(modelo, metrica)
            res = cr.train_and_evaluate(URL)
            # save_results names its files after the model type only; a
            # per-metric sub-directory keeps the three runs of one model
            # from overwriting each other.
            cr.save_results(res, path=os.path.join(OUTPUT_DIR, metrica))
            all_results[f"{modelo}_{metrica}"] = res
            print(f"AUC: {res['metrics']['roc_auc']:.4f} | F1: {res['metrics']['f1']:.4f} | Tempo: {res['training_time']:.2f}s")

    with open(f'{OUTPUT_DIR}/all_results.json', 'w') as f:
        json.dump(all_results, f, indent=2)