In [None]:
#tensorflow.__version__

# LSTM

import time
# Registrar o momento de início da execução
start_time = time.time()

# Suprimir os warnings
from warnings import simplefilter
simplefilter(action = 'ignore', category = FutureWarning)

import keras
import pandas as pd
import numpy as np

from tensorflow import keras
from tensorflow.keras import layers

import matplotlib.pyplot as plt
import seaborn as sns
import scikitplot as skplt

from keras.models import Sequential
from keras.layers import Dense
from keras.utils import np_utils

from sklearn.metrics import classification_report

from sklearn.metrics import confusion_matrix
from sklearn.metrics import plot_confusion_matrix

from sklearn.model_selection import train_test_split
from keras.callbacks import LearningRateScheduler

from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

from keras.preprocessing import sequence

# configurar o estilo dos gráficos com o Seaborn
sns.set_style('dark')

In [None]:
# Importando o dataset

# Região Amazônica
df = pd.read_csv('../df_tran/estados_df/regiao_amazonica_csv/datatran_amazonica_2007_2020_processado.csv')

df_teste = pd.read_csv('../df_tran/estados_df/regiao_amazonica_csv/datatran_amazonica_2021_processado.csv')

In [None]:
# Ver o balanceamento das classes
print(df.classificacao_acidente.value_counts())
porc = (df[df.classificacao_acidente == 'Não Grave'].shape[0] / df.shape[0]) * 100

#print("\nOs acidentes não graves representam {:.2f}% do dataset.\n".format(porc))

# Plotar gráfico de barras para as Classes
sns.countplot('classificacao_acidente', data = df, order = ['Não Grave', 'Grave']);

# Nomear os eixos X e Y
plt.xlabel('Classificação do acidente', fontsize = 12)
plt.ylabel('Total de acidentes', fontsize = 12)

plt.text(0.5, 1.05, "Os acidentes não graves representam {:.2f}% do dataset\n\n".format(porc), 
         ha='center', 
         va='center', 
         transform = plt.gca().transAxes, 
         fontsize = 14)

# Exibir o gráfico
plt.show()

In [None]:
df.drop('ufbr', axis = 1, inplace = True)
df.drop('km', axis = 1, inplace = True)

In [None]:
# Criando uma coluna para representar a ordem original das linhas
df['ordem_original'] = range(len(df))

# Linhas que contém a variável "Não Grave"
indices_classificacao_acidente = df[df['classificacao_acidente'] == 'Não Grave'].index

# Selecionando aleatoriamente as linhas que serão excluídas
indices_excluir = np.random.choice(indices_classificacao_acidente, size = int(len(indices_classificacao_acidente) * 0.95), replace = False)

# Excluindo as linhas do df
df = df.drop(indices_excluir)

# Reordenando o DataFrame com base na coluna 'ordem_original'
df = df.sort_values('ordem_original').reset_index(drop = True)

In [None]:
# Dropando algumas colunas não relevantes
df.drop('feridos_graves', axis = 1, inplace = True)
df.drop('uf', axis = 1, inplace = True)
df.drop('ordem_original', axis = 1, inplace = True)

df_teste.drop('ufbr', axis = 1, inplace = True)
df_teste.drop('km', axis = 1, inplace = True)
df_teste.drop('feridos_graves', axis = 1, inplace = True)
df_teste.drop('uf', axis = 1, inplace = True)

# Dropando as linhas duplicadas do dataset
df = df.drop_duplicates(ignore_index = True)

df_teste = df_teste.drop_duplicates(ignore_index = True)

In [None]:
# Substituindo as observações da classificacao_acidente por 0 e 1

df['classificacao_acidente'].replace('Não Grave', int(0), inplace = True)

df['classificacao_acidente'].replace('Grave', int(1), inplace = True)


df_teste['classificacao_acidente'].replace('Não Grave', int(0), inplace = True)

df_teste['classificacao_acidente'].replace('Grave', int(1), inplace = True)

In [None]:
# Dropando algumas features do df de 2021 para que o número de features em ambos os df sejam equivalentes

while df.nunique().sum() != df_teste.nunique().sum():
    
    # Obtém o índice da última linha do DataFrame
    indices = df_teste['causa_acidente'].value_counts().tail(1).index

    # Exclui as linhas com os índices obtidos
    df_teste.drop(df_teste[df_teste['causa_acidente'].isin(indices)].index, inplace = True)

In [None]:
#df
#df_teste

In [None]:
# Ver o balanceamento das classes
print(df.classificacao_acidente.value_counts())
porc = (df[df.classificacao_acidente == 1].shape[0] / df.shape[0]) * 100

#print("\nAcidentes Graves representam {:.2f}% do dataset.\n".format((df[df.classificacao_acidente == 1].shape[0] / df.shape[0]) * 100))

# Plotar gráfico de barras para as Classes
sns.countplot('classificacao_acidente', data = df, order = [1, 0]);

# Adicionar nomes aos valores 0 e 1 no eixo X
plt.xticks(ticks=[0, 1], labels=['Grave', 'Não Grave'])

# Nomear os eixos X e Y
plt.xlabel('Classificação do acidente', fontsize = 12)
plt.ylabel('Total de acidentes', fontsize = 12)

plt.text(0.5, 1.05, "Os acidentes graves representam {:.2f}% do dataset\n".format(porc), 
         ha='center', 
         va='center', 
         transform = plt.gca().transAxes, 
         fontsize = 14)

# Exibir o gráfico
plt.show()

In [None]:
# Ver o balanceamento das classes no conjunto de teste
print(df_teste.classificacao_acidente.value_counts())
print("\nAcidentes Graves representam {:.2f}% do dataset.\n".format((df_teste[df_teste.classificacao_acidente == 1].shape[0] / df_teste.shape[0]) * 100))

# Plotar gráfico de barras para as Classes
sns.countplot('classificacao_acidente', data = df_teste);

In [None]:
# Transformando os dados, concatenando as colunas correspondentes e armazenado isso em um df

from sklearn.preprocessing import OneHotEncoder
onehotencoder = OneHotEncoder()

# Fase dia
fd = ['fase_dia']

onehotencoder.fit_transform(df[fd])
columns_fd = [fd[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
fdDF = pd.DataFrame(onehotencoder.fit_transform(df[fd]).toarray(), columns = columns_fd)

# Condição Meteorológica
cm = ['condicao_metereologica']

onehotencoder.fit_transform(df[cm])
columns_cm = [cm[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
cmDF = pd.DataFrame(onehotencoder.fit_transform(df[cm]).toarray(), columns = columns_cm)

# Tipo Acidente
ta = ['tipo_acidente']

onehotencoder.fit_transform(df[ta])
columns_ta = [ta[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
taDF = pd.DataFrame(onehotencoder.fit_transform(df[ta]).toarray(), columns = columns_ta)

# Causa Acidente
ca = ['causa_acidente']

onehotencoder.fit_transform(df[ca])
columns_ca = [ca[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
caDF = pd.DataFrame(onehotencoder.fit_transform(df[ca]).toarray(), columns = columns_ca)

# Dia Semana
ds = ['dia_semana']

onehotencoder.fit_transform(df[ds])
columns_ds = [ds[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
dsDF = pd.DataFrame(onehotencoder.fit_transform(df[ds]).toarray(), columns = columns_ds)

# Sentido Via
sv = ['sentido_via']

onehotencoder.fit_transform(df[sv])
columns_sv = [sv[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
svDF = pd.DataFrame(onehotencoder.fit_transform(df[sv]).toarray(), columns = columns_sv)

# Tipo Pista
tp = ['tipo_pista']

onehotencoder.fit_transform(df[tp])
columns_tp = [tp[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
tpDF = pd.DataFrame(onehotencoder.fit_transform(df[tp]).toarray(), columns = columns_tp)

# Traçado Via
tv = ['tracado_via']

onehotencoder.fit_transform(df[tv])
columns_tv = [tv[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
tvDF = pd.DataFrame(onehotencoder.fit_transform(df[tv]).toarray(), columns = columns_tv)

basePAInputs = pd.concat([fdDF, cmDF, taDF, caDF, dsDF, svDF, tpDF, tvDF], axis = 1)

#Outputs

# Classificação Acidente
cla_acid = ['classificacao_acidente']

basePAOutputs = pd.DataFrame(df[cla_acid])

In [None]:
# Transformando os dados, concatenando as colunas correspondentes e armazenado isso em um df do teste

from sklearn.preprocessing import OneHotEncoder
onehotencoder = OneHotEncoder()

# Fase dia
fd = ['fase_dia']

onehotencoder.fit_transform(df_teste[fd])
columns_fd = [fd[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
fdDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[fd]).toarray(), columns = columns_fd)

# Condição Meteorológica
cm = ['condicao_metereologica']

onehotencoder.fit_transform(df_teste[cm])
columns_cm = [cm[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
cmDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[cm]).toarray(), columns = columns_cm)

# Tipo Acidente
ta = ['tipo_acidente']

onehotencoder.fit_transform(df_teste[ta])
columns_ta = [ta[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
taDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[ta]).toarray(), columns = columns_ta)

# Causa Acidente
ca = ['causa_acidente']

onehotencoder.fit_transform(df_teste[ca])
columns_ca = [ca[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
caDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[ca]).toarray(), columns = columns_ca)

# Dia Semana
ds = ['dia_semana']

onehotencoder.fit_transform(df_teste[ds])
columns_ds = [ds[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
dsDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[ds]).toarray(), columns = columns_ds)

# Sentido Via
sv = ['sentido_via']

onehotencoder.fit_transform(df_teste[sv])
columns_sv = [sv[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
svDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[sv]).toarray(), columns = columns_sv)

# Tipo Pista
tp = ['tipo_pista']

onehotencoder.fit_transform(df_teste[tp])
columns_tp = [tp[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
tpDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[tp]).toarray(), columns = columns_tp)

# Traçado Via
tv = ['tracado_via']

onehotencoder.fit_transform(df_teste[tv])
columns_tv = [tv[0] + ' - ' + cat_name for cat_name in onehotencoder.categories_][0]
tvDF_teste = pd.DataFrame(onehotencoder.fit_transform(df_teste[tv]).toarray(), columns = columns_tv)

basePAInputs_teste = pd.concat([fdDF_teste, cmDF_teste, taDF_teste, caDF_teste, dsDF_teste, svDF_teste, tpDF_teste, tvDF_teste], axis = 1)

#Outputs

# Classificação Acidente
cla_acid = ['classificacao_acidente']

basePAOutputs_teste = pd.DataFrame(df_teste[cla_acid])

In [None]:
# Separando os dados de treino e teste

# input_training -> treinamento_entrada | input_test -> teste_entrada
# output_training -> treinamento_saida | output_test -> teste_saida

from sklearn.model_selection import train_test_split

input_training, input_test, output_training, output_test = train_test_split(basePAInputs, basePAOutputs, test_size = 0.01)

input_training_t, input_test_t, output_training_t, output_test_t = train_test_split(basePAInputs_teste, basePAOutputs_teste, test_size = 0.99)

In [None]:
# Armazenando os dados de teste e treino em arrays

input_training_array = np.array(input_training)
input_test_array = np.array(input_test)
output_training_array = np.array(output_training)
output_test_array = np.array(output_test)

input_training_array_t = np.array(input_training_t)
input_test_array_t = np.array(input_test_t)
output_training_array_t = np.array(output_training_t)
output_test_array_t = np.array(output_test_t)

In [None]:
# usar técnica under-sampling
rus = RandomUnderSampler(sampling_strategy = 'not minority')
x_res_test, y_res_test = rus.fit_resample(input_test_array_t, output_test_array_t)

# ver o balanceamento das classes
print(pd.Series(y_res_test).value_counts())

# plotar a nova distribuição de classes
sns.countplot(y_res_test);

In [None]:
input_training_array = input_training_array.reshape(-1, 1, len(basePAInputs.columns))
input_test_array = input_test_array.reshape(-1, 1, len(basePAInputs.columns))
output_training_array = output_training_array.reshape(-1, 1, 1)
output_test_array = output_test_array.reshape(-1, 1, 1)

input_training_array_t = input_training_array_t.reshape(-1, 1, len(basePAInputs_teste.columns))
input_test_array_t = input_test_array_t.reshape(-1, 1, len(basePAInputs_teste.columns))
output_training_array_t = output_training_array_t.reshape(-1, 1, 1)
output_test_array_t = output_test_array_t.reshape(-1, 1, 1)

x_res_test = x_res_test.reshape(-1, 1, len(basePAInputs_teste.columns))
y_res_test = y_res_test.reshape(-1, 1, 1)

In [None]:
# Trazendo manualmente algumas métricas

from keras import backend as K

def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2 * ((precision * recall) / (precision + recall + K.epsilon()))

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM

model = Sequential()

model.add(LSTM(80, return_sequences = True, kernel_initializer = 'random_normal', input_shape = (1, len(basePAInputs.columns))))
model.add(LSTM(80, return_sequences = False, kernel_initializer = 'random_normal'))
model.add(Dense(1, activation = 'sigmoid', kernel_initializer = 'random_normal'))

model.compile(optimizer = 'Adam', loss = 'binary_crossentropy', metrics = ['accuracy', precision_m, recall_m, f1_m])

In [None]:
r = model.fit(input_training_array, output_training_array, batch_size = 250, epochs = 5)

metrics_test = model.evaluate(input_test_array_t, output_test_array_t)

In [None]:
plt.plot(r.history['loss'], label = 'loss')

plt.legend()

In [None]:
y_pred = model.predict(input_test_array_t)

y_pred = (y_pred > 0.5)

In [None]:
output_test_array_t = output_test_array_t.reshape(-1, 1)

y_res_test = y_res_test.reshape(-1, 1)

In [None]:
# VN, FN
# FP, VP
# 0 == Não Grave / 1 == Grave

labels = ['Não Grave', 'Grave']

skplt.metrics.plot_confusion_matrix(output_test_array_t, y_pred)

plt.xticks(ticks = [0, 1], labels = labels, size = 12)
plt.yticks(ticks = [0, 1], labels = labels, size = 12)

plt.ylabel('Label Verdadeira', size = 13, labelpad = 15)
plt.xlabel('Label Predita', size = 13, labelpad = 15)

In [None]:
# Registrar o momento de término da execução
end_time = time.time()

# Calcular o tempo de execução
tempo_execucao = end_time - start_time

# Exibir o tempo de execução em segundos
print("Tempo de execução:", tempo_execucao, "segundos")