In [None]:
!pip install gdown

In [None]:
import pandas as pd
import numpy as np
import gdown

In [None]:
# Variavel para escolher o escopo completo 2017-2024 Km 100 a 239 com Km da PRF ou escopo completo 2017-2024 Km 100 a 239 com Km da ANTT
escopo = 'completo_antt'  # 'completo' ou 'completo_antt'

if escopo == 'completo':
    df_train = pd.read_csv('data/escopo_completo/df_train_antt.csv')
    df_test = pd.read_csv('data/escopo_completo/df_test_antt.csv')
elif escopo == 'completo_antt':
    df_train = pd.read_csv('data/escopo_completo_antt/df_train_dnit.parquet')
    df_test = pd.read_csv('data/escopo_completo_antt/df_test_dnit.csv')

In [None]:
traducao = {
    "temperature_2m": "Temperatura",
    "relative_humidity_2m": "Umidade Relativa",
    "dew_point_2m": "Ponto de Orvalho",
    "apparent_temperature": "Temperatura Aparente",
    "rain": "Precipitação",
    "wind_speed_10m": "Velocidade do Vento",
    "cloud_cover": "Cobertura de Nuvens",
    "wind_gusts_10m": "Rajadas de Vento"
}

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Definir o tema personalizado
sns.set_theme(        
    palette="dark",              
    font='sans-serif',                
    rc={"axes.spines.right": False,  
        "axes.spines.top": False,    
        "figure.figsize": (10, 6),   
        "axes.facecolor": "#ede9e5",
        "figure.facecolor": "#ede9e5"
    }
)

# Aumentar a resolução das figuras
%config InlineBackend.figure_format = 'retina'

In [None]:
numericas = {"temperature_2m": "°C", 
             "relative_humidity_2m": "%",
             "dew_point_2m": "°C",
             "apparent_temperature": "°C",
             "rain": "mm",
             "wind_speed_10m": "m/s",
             "wind_gusts_10m": "m/s",
             "cloud_cover": "%"}

if escopo != 'completo':
    for coluna, unidade in numericas.items():
        sns.regplot(x=coluna, y='acidente', data=df_train, scatter=False)
        plt.title(f'Relação entre {traducao[coluna]} e acidentes')
        plt.xlabel(f'{traducao[coluna]} ({unidade})')
        plt.ylabel('Proporção de acidentes')
        plt.yticks([0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1])
        plt.show()

In [None]:
# Plotar um histograma para a variável 'km'
numericas = ['km']
for coluna in numericas: 
    acidente_0 = df_train[df_train['acidente'] == 0][coluna].value_counts()
    acidente_1 = df_train[df_train['acidente'] == 1][coluna].value_counts()
    # Junta em um DataFrame
    df_plot = pd.DataFrame({
        'Não Acidente': acidente_0,
        'Acidente': acidente_1
    })

    df_plot.plot(kind='area')

    plt.title(f'Histograma da variável {coluna}')
    plt.xlabel(coluna)
    plt.ylabel('Frequêcia')
    plt.legend()
    plt.show()

In [None]:
import matplotlib.pyplot as plt

# Lista de variáveis categóricas
if escopo != 'completo':
    categoricas = ['ano','mes', 'dia', 'hora', 'fim_semana', 'dia_da_semana', 'feriado_br', 
                   'municipio', 'sentido_via', 'tipo_tracado', 'tipo_de_uso_do_solo',
                   'tipo_pavimento', 'tipo_perfil_de_terreno', 'marginal', 'iluminacao', 
                   'velocidade_regulamentada_veiculos_leves','velocidade_regulamentada_veiculos_pesados', 
                   'numero_de_faixas','weather_code']
else:
    categoricas = ['ano','mes', 'dia', 'hora', 'fim_semana', 'dia_da_semana', 'feriado_br', 
                   'municipio', 'sentido_via', 'tipo_tracado', 'tipo_de_uso_do_solo',
                   'tipo_pavimento', 'tipo_perfil_de_terreno', 'marginal', 'iluminacao', 
                   'velocidade_regulamentada_veiculos_leves','velocidade_regulamentada_veiculos_pesados', 
                   'numero_de_faixas']

for coluna in categoricas:
    if coluna not in ["municipio", 'weather_code']:
        acidente_0 = df_train[df_train['acidente'] == 0][coluna].value_counts().sort_index()
        acidente_1 = df_train[df_train['acidente'] == 1][coluna].value_counts().sort_index()
    acidente_0 = df_train[df_train['acidente'] == 0][coluna].value_counts()
    acidente_1 = df_train[df_train['acidente'] == 1][coluna].value_counts()
    # Junta em um DataFrame
    df_plot = pd.DataFrame({
        'Não Acidente': acidente_0,
        'Acidente': acidente_1
    })

    # Faz o gráfico de barras lado a lado
    df_plot.plot(kind='bar', width=0.8)

    plt.title(f'Frêquencia de Acidente vs Não Acidente para {coluna}')
    plt.xlabel(coluna)
    plt.ylabel('Frequêcia')
    plt.legend()
    plt.show()

In [None]:
from sklearn.preprocessing import StandardScaler

# Preparar os dados para treinamento
if escopo == 'completo':
    categoricas = ['km', 'sentido_via', 'tipo_tracado', 'tipo_perfil_de_terreno', 'tipo_pavimento',
       'tipo_de_uso_do_solo', 'numero_de_faixas', 'municipio','velocidade_regulamentada_veiculos_leves',
       'velocidade_regulamentada_veiculos_pesados', 'marginal', 'iluminacao',
       'mes', 'dia', 'hora', 'dia_da_semana', 'fim_semana',
       'feriado_br']
    
    features = ['km', 'sentido_via', 'tipo_tracado', 'tipo_perfil_de_terreno', 'tipo_pavimento',
       'tipo_de_uso_do_solo', 'numero_de_faixas', 'municipio','velocidade_regulamentada_veiculos_leves',
       'velocidade_regulamentada_veiculos_pesados', 'marginal', 'iluminacao',
       'mes', 'dia', 'hora', 'ano', 'dia_da_semana', 'fim_semana',
       'feriado_br']
elif escopo == 'completo_antt':
    categoricas = ['km', 'sentido_via', 'tipo_tracado', 'tipo_perfil_de_terreno', 'tipo_pavimento',
       'tipo_de_uso_do_solo', 'numero_de_faixas', 'municipio','velocidade_regulamentada_veiculos_leves',
       'velocidade_regulamentada_veiculos_pesados', 'marginal', 'iluminacao',
       'mes', 'dia', 'hora', 'dia_da_semana', 'fim_semana', 'feriado_br']
    
    features = ['km', 'sentido_via', 'tipo_tracado', 'tipo_perfil_de_terreno', 'tipo_pavimento',
       'tipo_de_uso_do_solo', 'numero_de_faixas', 'municipio','velocidade_regulamentada_veiculos_leves',
       'velocidade_regulamentada_veiculos_pesados', 'marginal', 'iluminacao',
       'mes', 'dia', 'hora', 'ano', 'dia_da_semana', 'fim_semana',
       'feriado_br', 'temperature_2m', 'relative_humidity_2m',
       'dew_point_2m', 'apparent_temperature', 'rain', 'wind_speed_10m',
       'weather_code', 'cloud_cover', 'wind_gusts_10m', 'A','B','C','D','E','F','G','H','I','J']


# Concatenar X_train e X_test para garantir que ambas as matrizes de características tenham as mesmas colunas
X = pd.concat([df_train[features], df_test[features]])
X = pd.get_dummies(X, columns=categoricas, drop_first=True)

# Identificar colunas numéricas
colunas_numericas = list(set(features) - set(categoricas))

# Padronizar os dados numéricos
scaler = StandardScaler()
X[colunas_numericas] = scaler.fit_transform(X[colunas_numericas])

# Separar novamente em X_train e X_test
X_train = X.iloc[:len(df_train)]
X_test = X.iloc[len(df_train):]

y_train = df_train['acidente']
y_test = df_test['acidente']

In [None]:
import cupy as cp
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from cuml.svm import SVC  # SVM da cuML (GPU)
from cuml.ensemble import RandomForestClassifier  # Random Forest da cuML
from sklearn.model_selection import KFold, ParameterGrid
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, roc_curve
from sklearn.svm import LinearSVC


# Definir o número de folds
kf = KFold(n_splits=5, shuffle=True, random_state=42)


param_grids = {
   "SVM": {
        "C": [1, 2, 5],
        "kernel": ['linear', 'poly', 'rbf', 'sigmoid'],
        "gamma": ['scale', 'auto']
    }
}
params_grids = {
    "MLP": {
        "hidden_layer_sizes": [
            (512, 256, 128, 64),
            (256, 128, 64),
            (128, 64),
        ],
        "activation": ["relu", "tanh"],
        "lr": [0.0001],
        "optimizer": ["adam"],
        "dropout_rate": [0.0, 0.1],
        "weight_decay": [0.0, 1e-4],
    },
    "RF": {
        "n_estimators": [100, 200, 500],
        "split_criterion": ['gini', 'entropy'],
        'bootstrap': [True, False],
        "max_depth": [32, 64]
    },
   "SVM": {
        "C": [1, 2, 5],
        "kernel": ['linear', 'poly', 'rbf', 'sigmoid'],
        "gamma": ['scale', 'auto']
    }
}


# Criar dicionário para armazenar resultados
results = {"SVM": [], "RF": [], "MLP": []}

# Converter os dados para GPU
X_train_gpu = cp.array(X_train.values.astype('float32'))
y_train_gpu = cp.array(y_train.values.astype('int32'))


# Definição do modelo MLP usando PyTorch (GPU)
import torch.nn as nn

class MLPModel(nn.Module):
    def __init__(self, input_size, hidden_layers, activation="relu", dropout_rate=0.0):
        super(MLPModel, self).__init__()
        layers = []
        prev_size = input_size

        # Seleciona a função de ativação
        if activation == "relu":
            act_fn = nn.ReLU()
        elif activation == "tanh":
            act_fn = nn.Tanh()
        elif activation == "leaky_relu":
            act_fn = nn.LeakyReLU()
        else:
            raise ValueError(f"A função de ativação '{activation}' não é suportada.")

        # Camadas ocultas com dropout
        for size in hidden_layers:
            layers.append(nn.Linear(prev_size, size))
            layers.append(act_fn)
            if dropout_rate > 0:
                layers.append(nn.Dropout(dropout_rate))
            prev_size = size

        # Camada de saída
        layers.append(nn.Linear(prev_size, 1))
        layers.append(nn.Sigmoid())

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

round_ = 0
# Loop pelos modelos
for model_name, param_grid in param_grids.items():
    # Loop pelos hiperparâmetros
    for params in ParameterGrid(param_grid):
        print(f"\nTreinando {model_name} {round_}")
        round_ += 1
        recall_list, specificity_list, auc_list = [], [], []

        # Loop pelos folds
        for train_idx, val_idx in kf.split(X_train):
            X_tr, X_val = X_train_gpu[train_idx], X_train_gpu[val_idx]
            y_tr, y_val = y_train_gpu[train_idx], y_train_gpu[val_idx]

            # Criar e treinar o modelo na GPU
            if model_name == "SVM":
                if params["kernel"] == 'linear':
                    model = LinearSVC(C=params["C"], max_iter=10000)
                    model.fit(X_train, y_train)

                    y_pred = model.predict(X_val)
                else:
                    model = SVC(C=params["C"], kernel=params["kernel"], gamma=params["gamma"])
                    model.fit(X_tr, y_tr)
                
                    # Obter as distâncias da margem de decisão
                    decision_scores = model.decision_function(X_val)
                
                    # Aplicar o threshold personalizado
                    y_pred = (decision_scores >  0.5).astype(int)


            elif model_name == "RF":
                model = RandomForestClassifier(
                    n_estimators=params["n_estimators"], 
                    max_depth=params["max_depth"],
                    split_criterion=params['split_criterion'],
                    bootstrap=params['bootstrap']
                )
                model.fit(X_tr, y_tr)
            
                # Obter probabilidades e aplicar threshold
                y_prob = model.predict_proba(X_val)[:, 1]
                y_pred = (y_prob > 0.5).astype(int)


            elif model_name == "MLP":
                input_size = X_train.shape[1]
                model = MLPModel(input_size, params['hidden_layer_sizes'], params['activation'], params['dropout_rate']).cuda()
                if params['optimizer'] == 'adam':
                    optimizer = optim.Adam(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
                else:
                    optimizer = optim.SGD(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
                    
                criterion = nn.BCELoss()  
                
                # Converter dados para PyTorch
                X_tr_torch = torch.tensor(X_tr.get(), dtype=torch.float32).cuda()
                y_tr_torch = torch.tensor(y_tr.get(), dtype=torch.float32).cuda().unsqueeze(1)
            
                X_val_torch = torch.tensor(X_val.get(), dtype=torch.float32).cuda()
            
                # Treinamento
                model.train()
                for _ in range(400):  
                    optimizer.zero_grad()
                    outputs = model(X_tr_torch)
                    loss = criterion(outputs, y_tr_torch)
                    loss.backward()
                    optimizer.step()
            
                # Previsões com threshold personalizado
                model.eval()
                with torch.no_grad():
                    y_prob = model(X_val_torch).cpu().numpy()
                    y_pred = (y_prob >  0.5).astype(int)

                del X_tr_torch, y_tr_torch, X_val_torch

            # Calcular métricas
            y_val_cpu = y_val.get() if hasattr(y_val, "get") else (y_val.cpu().numpy() if hasattr(y_val, "cpu") else y_val)
            y_pred_cpu = y_pred.get() if model_name != "MLP" else y_pred
            tn, fp, fn, tp = confusion_matrix(y_val_cpu, y_pred_cpu).ravel()

            sensitivity = tp / (tp + fn + 1e-8)
            specificity = tn / (tn + fp + 1e-8)
            auc = roc_auc_score(y_val_cpu, y_pred_cpu)

            recall_list.append(sensitivity)
            specificity_list.append(specificity)
            auc_list.append(auc)


        # Salvar resultados médios para essa configuração de hiperparâmetros
        results[model_name].append({
            **params,
            'specificity': specificity_list,
            'recall': recall_list,
            'auc': auc_list,
        })

In [None]:
def calc_score(x):
    return np.mean(x['auc']) + np.mean(x['specificity']) + np.mean(x['recall']) * 1.5

for model_name, hyperparams_list in results.items():
    if hyperparams_list:
        scored_models = [(model, calc_score(model)) for model in hyperparams_list]
        best_model, _ = max(scored_models, key=lambda t: t[1])
        
        print(f"\nMelhores Hiperparâmetros para {model_name}:")
        print(best_model)
        for metric in ['specificity', 'recall', 'auc']:
            media = np.mean(best_model[metric])
            desvio = np.std(best_model[metric])
            print(f"{metric}: média = {media:.4f} ± {desvio:.4f}")
    else:
        print(f"\nNenhum resultado para {model_name}.")


In [None]:
import torch.nn as nn

class MLPModel(nn.Module):
    def __init__(self, input_size, hidden_layers, activation="relu", dropout_rate=0.0):
        super(MLPModel, self).__init__()
        layers = []
        prev_size = input_size

        # Seleciona a função de ativação
        if activation == "relu":
            act_fn = nn.ReLU()
        elif activation == "tanh":
            act_fn = nn.Tanh()
        elif activation == "leaky_relu":
            act_fn = nn.LeakyReLU()
        else:
            raise ValueError(f"A função de ativação '{activation}' não é suportada.")

        # Camadas ocultas com dropout
        for size in hidden_layers:
            layers.append(nn.Linear(prev_size, size))
            layers.append(act_fn)
            if dropout_rate > 0:
                layers.append(nn.Dropout(dropout_rate))
            prev_size = size

        # Camada de saída
        layers.append(nn.Linear(prev_size, 1))
        layers.append(nn.Sigmoid())

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

In [None]:
best_params = {
    "MLP": {'activation': 'relu', 'dropout_rate': 0.1, 'hidden_layer_sizes': (256, 128, 64), 'lr': 0.0001, 'optimizer': 'adam', 'weight_decay': 0.0
    },
}

import numpy as np
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, roc_curve
import torch
import torch.nn as nn

# Converta os dados para tensores e mande para o dispositivo correto
X_train_tensor = torch.tensor(X_train.astype('float32').values, dtype=torch.float32).to('cpu')
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1).to('cpu')

X_test_tensor = torch.tensor(X_test.astype('float32').values, dtype=torch.float32).to('cpu')
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).view(-1, 1).to('cpu')

# ---------------------- MLP Teste (já existente) ----------------------
input_size = X_train.shape[1]
model = MLPModel(
    input_size, 
    best_params['MLP']['hidden_layer_sizes'], 
    best_params['MLP']['activation'], 
    best_params['MLP']['dropout_rate']
)

# Enviar modelo para GPU se disponível
model = model.to('cpu')

if best_params["MLP"]['optimizer'] == 'adam':
    optimizer = optim.Adam(model.parameters(), lr=best_params["MLP"]['lr'], weight_decay=best_params["MLP"]['weight_decay'])
else:
    optimizer = optim.SGD(model.parameters(), lr=best_params["MLP"]['lr'], weight_decay=best_params["MLP"]['weight_decay'])

criterion = nn.BCELoss()

model.train()
for epochs in range(500):
    optimizer.zero_grad()
    output = model(X_train_tensor)
    loss = criterion(output, y_train_tensor)
    loss.backward()
    optimizer.step()

model.eval()
with torch.no_grad():
  y_prob = model(X_test_tensor).numpy()
  y_pred = (y_prob >  0.5).astype(int)

tn, fp, fn, tp = confusion_matrix(y_test_tensor, y_pred).ravel()
accuracy = (tn + tp) / (tn + fp + fn + tp)
sensitivity = tp / (tp + fn + 1e-8)
specificity = tn / (tn + fp + 1e-8)
auc = roc_auc_score(y_test, y_prob)
    
print("\nMLP - Métricas no conjunto de teste:")
print(f"Acuracia: {accuracy:.4f}")
print(f"Especificidade: {specificity:.4f}")
print(f"Sensibilidade (Recall): {sensitivity:.4f}")
print(f"AUC: {auc:.4f}")

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, roc_curve

print("\nAvaliando SVM nos dados de teste...")

from sklearn.svm import LinearSVC

model = LinearSVC(C=1.0, max_iter=10000)

model.fit(X_train, y_train)

y_pred = model.predict(X_test)

tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
sensitivity = tp / (tp + fn + 1e-8)
specificity = tn / (tn + fp + 1e-8)
auc = roc_auc_score(y_test, y_pred)

print("\nSVM (cuML) - Métricas no conjunto de teste:")
print(f"Especificidade: {specificity:.4f}")
print(f"Sensibilidade (Recall): {sensitivity:.4f}")
print(f"AUC: {auc:.4f}")

In [None]:
#import cupy as cp
from sklearn.ensemble import RandomForestClassifier  # Random Forest da cuML
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, roc_curve

# ---------------------- Random Forest com GPU (cuML) ----------------------
print("\nAvaliando Random Forest nos dados de teste...")


rf_best = RandomForestClassifier(
    n_estimators=500,
    criterion='entropy',
    bootstrap=False,
    max_depth=64,
)

rf_best.fit(X_train, y_train)

#y_prob_rf = predict_in_batches(model, X_test.values, batch_size=4096)
y_prob = rf_best.predict_proba(X_test)[:, 1]
y_pred_rf = (y_prob > 0.5).astype(int)

tn, fp, fn, tp = confusion_matrix(y_test, y_pred_rf).ravel()
sensitivity = tp / (tp + fn + 1e-8)
specificity = tn / (tn + fp + 1e-8)
auc = roc_auc_score(y_test, y_pred_rf)

print("\nRandom Forest (cuML) - Métricas no conjunto de teste:")
print(f"Especificidade: {specificity:.4f}")
print(f"Sensibilidade (Recall): {sensitivity:.4f}")
print(f"AUC: {auc:.4f}")

In [None]:
tn, fp, fn, tp