# Classificação da HbA1c com Modelos Supervisionados

Este notebook executa o pipeline completo de preparação de dados, otimização de hiperparâmetros e avaliação de modelos para predizer a classe de HbA1c (coluna `Classe A1c2`: 0 = não diabético, 1 = diabético) a partir de dados hematológicos.

## Visão Geral do Notebook

1. Importação de bibliotecas e definição de métricas
2. Funções utilitárias para carregar, limpar e transformar os dados
3. Construção do conjunto de treino/teste preservando o desbalanceamento
4. Definição do pré-processamento com `ColumnTransformer`
5. Busca de hiperparâmetros via `RandomizedSearchCV` (Stratified K-Fold)
6. Seleção do melhor modelo com base no F2-Score
7. Avaliação final no conjunto de teste usando F2-Score e AUC-PR

In [None]:
import os
import sys
import warnings
import numpy as np
import pandas as pd

from sklearn import set_config
from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.metrics import (
    fbeta_score,
    make_scorer,
    average_precision_score,
    confusion_matrix,
    classification_report,
    precision_recall_curve,
    auc,
    roc_auc_score
)

# Silence every warning category for the rest of the session. Note that this
# also hides scikit-learn deprecation and convergence warnings.
warnings.filterwarnings('ignore')
# Never truncate the column list when a DataFrame is displayed.
pd.set_option('display.max_columns', None)
# Render scikit-learn estimators as diagrams when they are displayed.
set_config(display="diagram")

# Optional dependency: without pymysql the name is bound to None, which
# fetch_data_in_batches checks before trying to connect.
try:
    import pymysql
except ImportError:
    pymysql = None
    print("Aviso: pymysql não está disponível; será usada apenas a simulação de dados.")

# Optional dependency: the LightGBM search space is only registered when
# this import succeeds (the name is None otherwise).
try:
    import lightgbm as lgb
except ImportError:
    lgb = None
    print("Aviso: lightgbm não está instalado. O modelo LightGBM será ignorado.")

# Optional dependency: the XGBoost search space is only registered when
# this import succeeds (the name is None otherwise).
try:
    from xgboost import XGBClassifier
except ImportError:
    XGBClassifier = None
    print("Aviso: xgboost não está instalado. O modelo XGBoost será ignorado.")


In [None]:
def fetch_data_in_batches(query, MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD, DB_NAME, batch_size=10000):
    """Run ``query`` against a MySQL database and return the rows as a DataFrame.

    Rows are read from the cursor ``batch_size`` at a time; each batch becomes
    a DataFrame and all batches are concatenated with a fresh index.

    Args:
        query: SQL statement to execute.
        MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD, DB_NAME:
            Connection settings forwarded to ``pymysql.connect``.
        batch_size: Number of rows requested per ``fetchmany`` call.

    Returns:
        The combined DataFrame (empty when the query yields no rows), or
        ``None`` when pymysql is unavailable or any error occurs, so that the
        caller can fall back to simulated data.
    """
    if pymysql is None:
        print("fetch_data_in_batches: pymysql indisponível. Retornando None para acionar simulação.")
        return None

    connection = None
    frames = []

    try:
        connection_settings = dict(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            user=MYSQL_USERNAME,
            password=MYSQL_PASSWORD,
            db=DB_NAME,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
        connection = pymysql.connect(**connection_settings)
        print("Conexão estabelecida com sucesso!")

        with connection.cursor() as cursor:
            cursor.execute(query)
            rows = cursor.fetchmany(batch_size)
            while rows:
                frames.append(pd.DataFrame(rows))
                rows = cursor.fetchmany(batch_size)

        return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    except Exception as e:
        # Deliberate best-effort: any failure is reported and turned into None.
        print(f"Erro durante a conexão/busca: {e}. Retornando None para usar dados simulados.")
        return None

    finally:
        # Runs on both the success and the error path once a connection exists.
        if connection:
            connection.close()
            print("Conexão fechada.")


def clean_data(df):
    """Return only the rows that have no missing value and no empty string.

    Rows containing any NaN/None are dropped first; from the remainder, rows
    where any cell equals ``''`` are dropped as well.
    """
    without_missing = df.dropna()
    no_blank_cells = without_missing.ne('').all(axis=1)
    return without_missing[no_blank_cells]


def fix_data_types(df):
    """Coerce the known numeric columns and drop rows that fail to parse.

    Measurement columns are converted to ``float`` and the class columns to
    ``int``. Only the columns actually present in ``df`` are touched. Any row
    with a value that cannot be parsed as a number in one of these columns is
    removed.

    Class labels are cast to ``int`` only after the invalid rows have been
    dropped. Filling unparsable labels with a ``-1`` placeholder first would
    let them pass the missing-value filter and reach the model as an extra
    class.

    Args:
        df: Input frame; it is not modified.

    Returns:
        A new DataFrame with the coerced columns and invalid rows removed.
    """
    float_cols = ['Leucócitos', 'Mielócitos', 'Metamielócitos', 'Bastões', 'Segmentados',
                  'Eosinófilos', 'Basófilos', 'Linfócitos', 'Linfócitos Atípicos',
                  'Monócitos', 'Plasmócitos', 'Blastos', 'Eritrócitos', 'Hemoglobina',
                  'Hematócrito', 'HCM', 'CHCM', 'RDW', 'Plaquetas', 'MVP',
                  'Promielócitos', 'A1C']
    int_cols = ['Classe A1c', 'Classe A1c2']

    present_float = [col for col in float_cols if col in df.columns]
    present_int = [col for col in int_cols if col in df.columns]
    numeric_cols = present_float + present_int

    # Work on a copy so the caller's frame (possibly a filtered view) is
    # never mutated.
    df = df.copy()
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    if numeric_cols:
        df = df.dropna(subset=numeric_cols)

    # Safe to cast now: no NaN is left in any of the targeted columns.
    target_dtypes = {col: float for col in present_float}
    target_dtypes.update({col: int for col in present_int})
    return df.astype(target_dtypes)


def remove_unusual_variables(df):
    """Return ``df`` without the columns that are not used as model inputs.

    The list covers identifier/date columns as well as 'Classe A1c' and
    'A1C'. Columns from the list that are absent in ``df`` are skipped.
    """
    unwanted = (
        'CodigoOs', 'Data Nascimento', 'Classe Idade', 'HCM_1', 'GME', 'G',
        'Paciente', 'Data Cadastro', 'Data Cadastro Date', 'Data Nascimento Data',
        'Classe A1c', 'A1C',
    )
    present = [name for name in unwanted if name in df.columns]
    return df.drop(columns=present, errors='ignore')


def remove_outliers(df):
    """Filter out rows whose values fall outside fixed per-column limits.

    Each entry of the table is either a single number, used as an inclusive
    upper bound, or a ``(lower, upper)`` tuple of inclusive bounds where
    ``None`` means "no bound on that side". Columns missing from ``df`` are
    skipped.
    """
    bounds_by_column = {
        'Leucócitos': 200000,
        'Mielócitos': 2000,
        'Metamielócitos': 4000,
        'Segmentados': 50000,
        'Eosinófilos': 25000,
        'Basófilos': 400,
        'Linfócitos': 100000,
        'Linfócitos Atípicos': 1700,
        'Monócitos': 10000,
        'Plasmócitos': 1000,
        'Blastos': 50000,
        'Eritrócitos': 8.8,
        'Hemoglobina': (3, None),
        'HCM': 140,
        'CHCM': 38,
        'MVP': (3, None),
        'Plaquetas': 1300
    }
    for column, bound in bounds_by_column.items():
        if column not in df.columns:
            continue
        # Normalise both spellings to a (lower, upper) pair.
        lower, upper = bound if isinstance(bound, tuple) else (None, bound)
        if lower is not None:
            df = df[df[column] >= lower]
        if upper is not None:
            df = df[df[column] <= upper]
    return df


def create_train_test_subsets(df):
    """Split ``df`` into stratified train/test sets (80/20, seed 42).

    The 'Classe A1c2' column is the target; every other column is a feature.

    Returns:
        The ``train_test_split`` result: ``X_train, X_test, y_train, y_test``.
    """
    target = df['Classe A1c2']
    features = df.drop(columns='Classe A1c2')
    return train_test_split(features, target, test_size=0.2, random_state=42, stratify=target)


def simulate_data():
    """Generate a seeded synthetic dataset and return its stratified split.

    Builds 11623 rows of uniform random values for the listed feature names,
    adds a random 'Sexo' column ('F'/'M'), overwrites 'NPxD' with random 0/1
    integers, and marks 16% of the rows as class 1 in the 'Classe A1c2' target.

    Returns:
        The ``train_test_split`` result: ``X_train, X_test, y_train, y_test``.
    """
    np.random.seed(42)
    n_rows = 11623
    feature_names = ['Idade', 'Leucócitos', 'Mielócitos', 'Metamielócitos', 'Bastões', 'Segmentados',
                     'Eosinófilos', 'Basófilos', 'Linfócitos', 'Linfócitos Atípicos', 'Monócitos',
                     'Plasmócitos', 'Blastos', 'Eritrócitos', 'Hemoglobina', 'Hematócrito', 'HCM',
                     'CHCM', 'RDW', 'Plaquetas', 'MVP', 'Promielócitos', 'NPxD']

    # The order of the random draws below is fixed so the seeded output
    # stays reproducible.
    features = pd.DataFrame(np.random.rand(n_rows, len(feature_names)), columns=feature_names)
    features['Sexo'] = np.random.choice(['F', 'M'], n_rows)
    features['NPxD'] = np.random.randint(0, 2, n_rows)

    n_positive = int(n_rows * 0.16)
    positive_rows = np.random.choice(n_rows, n_positive, replace=False)
    labels = np.zeros(n_rows, dtype=int)
    labels[positive_rows] = 1
    target = pd.Series(labels, name='Classe A1c2')

    print(f"Dados simulados gerados: {n_rows} amostras.")

    return train_test_split(features, target, test_size=0.2, random_state=42, stratify=target)


def get_train_test_data(query, MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD, DB_NAME):
    """Load, clean and split the dataset, falling back to simulated data.

    When the database fetch returns ``None`` or an empty frame, the simulated
    split is returned instead. Otherwise the fetched frame goes through
    cleaning, type fixing, column removal and outlier removal, in that order,
    before being split.

    Returns:
        ``X_train, X_test, y_train, y_test``.
    """
    fetched = fetch_data_in_batches(query, MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD, DB_NAME)

    if fetched is None or fetched.empty:
        print("Aviso: dados reais não disponíveis. Usando simulação.")
        return simulate_data()

    prepared = fetched
    for step in (clean_data, fix_data_types, remove_unusual_variables, remove_outliers):
        prepared = step(prepared)

    return create_train_test_subsets(prepared)


In [None]:
# Connection settings. Each value can be overridden through an environment
# variable of the same name, so real credentials never have to be written
# into the notebook; the literals are the placeholder defaults.
MYSQL_HOST = os.environ.get("MYSQL_HOST", "localhost")
MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306))
MYSQL_USERNAME = os.environ.get("MYSQL_USERNAME", "user")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "password")
DB_NAME = os.environ.get("DB_NAME", "db_name")
query = "SELECT * FROM dados_hematologicos"

# Falls back to simulated data when the database cannot be read.
X_train, X_test, y_train, y_test = get_train_test_data(
    query, MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD, DB_NAME
)

print(f"Shape X_train: {X_train.shape}")
print(f"Shape X_test: {X_test.shape}")
print("\nDistribuição da variável alvo (treino):")
print(y_train.value_counts(normalize=True))

# 'Sexo' is the only column treated as categorical; every numeric-dtype
# column is treated as a numerical feature.
categorical_features = ['Sexo'] if 'Sexo' in X_train.columns else []
numerical_features = X_train.select_dtypes(include=np.number).columns.tolist()


In [None]:
# Numeric columns: fill gaps with the column median, then standardise.
numerical_pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical columns: one-hot encode; categories unseen during fit are ignored.
categorical_pipeline = Pipeline(steps=[
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])

# Columns listed in neither feature group are dropped.
preprocessor = ColumnTransformer(
    [
        ('num', numerical_pipeline, numerical_features),
        ('cat', categorical_pipeline, categorical_features),
    ],
    remainder='drop',
)

# Class 1 is weighted by the ratio of class-0 to class-1 frequency in the
# training target; class 0 keeps weight 1.
class_weights = y_train.value_counts(normalize=True).to_dict()
minority_weight = class_weights[0] / class_weights[1]
class_weight_dict = {0: 1, 1: minority_weight}
print("Pesos de classe calculados:", class_weight_dict)


In [None]:
from sklearn.metrics import get_scorer

# F2 gives recall more weight than precision.
f2_scorer = make_scorer(fbeta_score, beta=2, average='binary')
# Use the built-in average-precision scorer instead of
# make_scorer(..., needs_proba=True): that keyword was deprecated in
# scikit-learn 1.4 and removed in 1.6, while the named scorer is available
# across versions and scores the same metric from continuous model outputs.
auc_pr_scorer = get_scorer('average_precision')
scoring = {'f2_score': f2_scorer, 'auc_pr': auc_pr_scorer}
# Stratified folds keep the class ratio in every split; fixed seed for
# reproducibility.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)


In [None]:
def _with_preprocessing(model):
    """Wrap ``model`` in a pipeline that first applies the shared preprocessor."""
    return Pipeline([
        ('preprocessor', preprocessor),
        ('model', model),
    ])


# Maps a model name to its pipeline ('estimator') and to the hyperparameter
# candidates sampled by the randomized search ('param_distributions').
model_search_space = {}

# liblinear supports both penalties listed below.
logistic_model = LogisticRegression(
    random_state=42,
    solver='liblinear',
    class_weight=class_weight_dict,
    max_iter=2000,
)
model_search_space['LogisticRegression'] = {
    'estimator': _with_preprocessing(logistic_model),
    'param_distributions': {
        'model__C': np.logspace(-3, 3, 20),
        'model__penalty': ['l1', 'l2'],
    },
}

forest_model = RandomForestClassifier(
    random_state=42,
    class_weight=class_weight_dict,
    n_jobs=-1,
)
model_search_space['RandomForest'] = {
    'estimator': _with_preprocessing(forest_model),
    'param_distributions': {
        'model__n_estimators': [200, 400, 600, 800],
        'model__max_depth': [10, 20, 30, None],
        'model__min_samples_split': [2, 5, 10],
        'model__min_samples_leaf': [1, 3, 5],
    },
}

hist_gb_model = HistGradientBoostingClassifier(
    random_state=42,
    class_weight=class_weight_dict,
)
model_search_space['HistGradientBoosting'] = {
    'estimator': _with_preprocessing(hist_gb_model),
    'param_distributions': {
        'model__learning_rate': np.linspace(0.01, 0.3, 20),
        'model__max_depth': [3, 5, 7, None],
        'model__max_leaf_nodes': [15, 31, 63],
        'model__min_samples_leaf': [10, 20, 30],
    },
}

# Registered only when lightgbm could be imported.
if lgb is not None:
    model_search_space['LightGBM'] = {
        'estimator': Pipeline([
            ('preprocessor', preprocessor),
            ('model', lgb.LGBMClassifier(
                random_state=42,
                class_weight=class_weight_dict,
                # LightGBM only applies row subsampling when the bagging
                # frequency is non-zero; with the default of 0 the
                # `subsample` values searched below would have no effect.
                subsample_freq=1,
                n_jobs=-1,
                verbose=-1
            ))
        ]),
        'param_distributions': {
            'model__n_estimators': [300, 600, 900],
            'model__learning_rate': np.linspace(0.01, 0.2, 10),
            'model__num_leaves': [31, 63, 127],
            'model__max_depth': [-1, 10, 20],
            'model__subsample': [0.7, 0.85, 1.0],
            'model__colsample_bytree': [0.6, 0.8, 1.0]
        }
    }

# Registered only when xgboost could be imported.
if XGBClassifier is not None:
    # XGBoost takes the positive-class weight directly instead of a
    # per-class weight dictionary.
    xgb_classifier = XGBClassifier(
        random_state=42,
        eval_metric='logloss',
        n_jobs=-1,
        tree_method='hist',
        scale_pos_weight=class_weight_dict[1],
    )
    xgb_param_distributions = {
        'model__n_estimators': [300, 600, 900],
        'model__max_depth': [3, 5, 7],
        'model__learning_rate': np.linspace(0.01, 0.3, 10),
        'model__subsample': [0.6, 0.8, 1.0],
        'model__colsample_bytree': [0.6, 0.8, 1.0],
    }
    model_search_space['XGBoost'] = {
        'estimator': Pipeline([
            ('preprocessor', preprocessor),
            ('model', xgb_classifier),
        ]),
        'param_distributions': xgb_param_distributions,
    }

print(f"Modelos configurados: {list(model_search_space.keys())}")


In [None]:
n_iter_search = 25
random_state = 42

# Track one summary row per model plus the best candidate seen so far.
search_results = []
best_model = None
best_model_name = None
best_score = -np.inf

for name, config in model_search_space.items():
    print(f"\n=== Otimizando {name} ===")
    # Both metrics are recorded; the F2 score decides which candidate is
    # refit on the whole training set.
    search = RandomizedSearchCV(
        config['estimator'],
        config['param_distributions'],
        n_iter=n_iter_search,
        scoring=scoring,
        refit='f2_score',
        cv=cv,
        n_jobs=-1,
        random_state=random_state,
        verbose=0,
    ).fit(X_train, y_train)

    # Cross-validated means of the candidate selected by the search.
    cv_results = search.cv_results_
    best_idx = search.best_index_
    mean_f2 = cv_results['mean_test_f2_score'][best_idx]
    mean_auc_pr = cv_results['mean_test_auc_pr'][best_idx]

    summary_row = {
        'model': name,
        'best_params': search.best_params_,
        'mean_f2': mean_f2,
        'mean_auc_pr': mean_auc_pr,
    }
    search_results.append(summary_row)

    print(f"Melhor F2-Score (CV): {mean_f2:.4f}")
    print(f"Melhor AUC-PR (CV): {mean_auc_pr:.4f}")

    # Keep the model with the highest cross-validated F2 across all searches.
    if mean_f2 > best_score:
        best_score, best_model_name = mean_f2, name
        best_model, best_search = search.best_estimator_, search

results_df = pd.DataFrame(search_results).sort_values('mean_f2', ascending=False)
print("\nResumo dos resultados (ordenado por F2-Score):")
results_df


In [None]:
print(f"\nModelo selecionado: {best_model_name}")
print(f"F2-Score médio (CV): {best_score:.4f}")

# `best_model` is the search's best_estimator_, which RandomizedSearchCV has
# already refit on the full training set (refit='f2_score'), so it is used
# as is instead of being fitted a second time.
y_pred = best_model.predict(X_test)
y_proba = best_model.predict_proba(X_test)[:, 1]

f2_test = fbeta_score(y_test, y_pred, beta=2)
# The curve points are kept for plotting.
precision, recall, _ = precision_recall_curve(y_test, y_proba)
# Report AUC-PR as average precision, the same metric used during
# cross-validation. Trapezoidal integration of the PR curve interpolates
# linearly between points and is not comparable with the CV figure.
auc_pr_test = average_precision_score(y_test, y_proba)
auc_roc_test = roc_auc_score(y_test, y_proba)

print("\nMatriz de Confusão:")
print(confusion_matrix(y_test, y_pred))

print("\nRelatório de Classificação:")
print(classification_report(y_test, y_pred))

print("\nMétricas no conjunto de teste:")
print(f"F2-Score: {f2_test:.4f}")
print(f"AUC-PR: {auc_pr_test:.4f}")
print(f"AUC-ROC: {auc_roc_test:.4f}")


In [None]:
import matplotlib.pyplot as plt

# Precision-recall curve of the selected model on the test set.
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(recall, precision, label=f'Curva PR (AUC = {auc_pr_test:.3f})')
ax.set_xlabel('Recall')
ax.set_ylabel('Precisão')
ax.set_title(f'Curva Precisão-Recall - {best_model_name}')
ax.grid(True, linestyle='--', alpha=0.5)
ax.legend(loc='lower left')
plt.show()
