# Importação de Dependências

In [None]:
!pip install numpy matplotlib tensorflow

In [None]:
# =============================
# Built-in Libraries
# =============================
import datetime
import os
import string
import sys
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed

# =============================
# Third-Party Libraries
# =============================

# Timezone
import pytz

# NumPy & Visualization
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# Scikit-learn
from sklearn.metrics import classification_report, confusion_matrix, f1_score

# TensorFlow / Keras
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from tensorflow.keras.layers import Dense, Dropout, LSTM, LayerNormalization, TimeDistributed
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model, to_categorical

# Progress Bar
from tqdm.notebook import tqdm

# Variáveis e Configurações

In [None]:
# Caminho dos diretorios das amostras
lstm_path_data_train = '/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/train/'  # Alterar para o caminho correto do seu dataset
lstm_path_data_test = '/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/test/'    #Alterar para o caminho correto do seu dataset

# Configurações globais
actions = np.array(sorted(os.listdir(lstm_path_data_train)))
EPOCHS = 300
SEQUENCE_LENGTH = 30
FEATURES_PER_FRAME = 126
N = 100
FILE_NAME = 'lstm_model_'
LABEL_MAP = {label: num for num, label in enumerate(actions)}

print(LABEL_MAP)

In [None]:
print("Letras detectadas em train:")
print(actions)
print(f"Total de letras: {len(actions)}")

# Declaração de Funções

In [None]:
def get_log_filename():
    """
    Gera um nome de arquivo de log baseado na data e hora atuais.

    Returns:
        str: Nome do arquivo no formato 'lstm_model_DD_MM_YYYY_HH_MM_log.log'.
    """
    tz = pytz.timezone('America/Sao_Paulo')
    now = datetime.datetime.now(tz)
    return f'lstm_model_{now:%d_%m_%Y_%H_%M}_log.log'


def get_date_str():
    """
    Gera uma string de data e hora atual com fuso horário de São Paulo.

    Returns:
        str: Data formatada como 'DD_MM_YYYY_HH_MM'.
    """
    tz = pytz.timezone('America/Sao_Paulo')
    now = datetime.datetime.now(tz)
    return now.strftime('%d_%m_%Y_%H_%M')


def get_time_min(start, end):
    """
    Calcula a diferença entre dois instantes de tempo em minutos.

    Args:
        start (float): Tempo inicial (em segundos, geralmente de time.time()).
        end (float): Tempo final (em segundos, geralmente de time.time()).

    Returns:
        float: Diferença de tempo em minutos.
    """
    return (end - start) / 60

def load_sequence(seq_path, sequence_length):
    """
    Carrega uma sequência de frames a partir de arquivos `.npy`.

    Args:
        seq_path (str): Caminho do diretório contendo os arquivos da sequência.
        sequence_length (int): Número de frames (arquivos `.npy`) a serem carregados.

    Returns:
        list[np.ndarray] | None: Lista contendo os arrays carregados, ou `None` em caso de erro.
    """
    try:
        return [np.load(os.path.join(seq_path, f"{i}.npy")) for i in range(sequence_length)]
    except Exception as e:
        print(f"[WARN] Falha ao carregar {seq_path}: {e}")
        return None

def process_data(data_path, actions, label_map, sequence_length, max_workers=8, max_sequences_per_action=None):
    """
    Carrega os dados em formato de sequência de frames para cada gesto/ação.

    Args:
        data_path (str): Caminho base dos dados.
        actions (list): Lista de ações/gestos.
        label_map (dict): Mapeamento de ação para rótulo.
        sequence_length (int): Número de frames por sequência.
        max_sequences_per_action (int, opcional): Quantidade máxima de sequências por ação.

    Returns:
        sequences (np.ndarray): Array com shape (num_sequencias, sequence_length, features).
        labels (np.ndarray): Array de rótulos (num_sequencias,).
    """
    sequences, labels = [], []
    tasks = []

    for action in actions:
        action_dir = os.path.join(data_path, action)
        sequence_names = [s for s in os.listdir(action_dir) if s.isdigit()]
        sequence_names = sorted(sequence_names, key=int)
        if max_sequences_per_action:
            sequence_names = sequence_names[:max_sequences_per_action]
        for seq in sequence_names:
            seq_path = os.path.join(action_dir, seq)
            tasks.append((action, seq_path, label_map[action]))

    sequences_tmp = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for action, seq_path, label in tasks:
            futures.append((executor.submit(load_sequence, seq_path, sequence_length), label))

        for (future, label) in tqdm(futures, desc="Carregando sequências", total=len(futures)):
            window = future.result()
            if window is not None:
                sequences.append(window)
                labels.append(label)

    sequences = np.array(sequences)
    labels = to_categorical(labels, num_classes=len(label_map)).astype(int)
    return sequences, labels


def augment_sequence(sequence, noise_std=0.01, shuffle_prob=0.1):
    """
    Aplica data augmentation leve a uma sequência: ruído gaussiano e shuffle suave.

    Args:
        sequence (np.ndarray): Sequência (sequence_length, features)
        noise_std (float): Desvio padrão do ruído.
        shuffle_prob (float): Probabilidade de trocar dois frames.

    Returns:
        np.ndarray: Sequência aumentada.
    """
    # Aplica ruído
    noisy = sequence + np.random.normal(0, noise_std, size=sequence.shape)

    # Shuffle suave
    if shuffle_prob > 0:
        new_seq = noisy.copy()
        for i in range(len(new_seq) - 1):
            if np.random.rand() < shuffle_prob:
                new_seq[i], new_seq[i+1] = new_seq[i+1], new_seq[i]
        return new_seq
    else:
        return noisy


def augment_and_balance_data(X, y, augment_per_class=100, noise_std=0.01, shuffle_prob=0.1):
    """
    Aumenta e balanceia o dataset de treino aplicando oversampling com data augmentation.

    Args:
        X (np.ndarray): Dados de entrada (num_samples, seq_len, features).
        y (np.ndarray): Labels one-hot (num_samples, num_classes).
        augment_per_class (int): Número alvo mínimo de amostras por classe.
        noise_std (float): Intensidade do ruído aplicado.
        shuffle_prob (float): Probabilidade de trocar dois frames adjacentes.

    Returns:
        tuple: (X_aug, y_aug) balanceados e aumentados.
    """
    X_aug, y_aug = list(X), list(y)
    y_int = np.argmax(y, axis=1)
    class_counts = Counter(y_int)

    num_classes = y.shape[1]

    for class_idx in range(num_classes):
        current_samples = [i for i in range(len(y_int)) if y_int[i] == class_idx]
        needed = augment_per_class - len(current_samples)

        if needed > 0:
            for _ in range(needed):
                idx = np.random.choice(current_samples)
                new_seq = augment_sequence(X[idx], noise_std=noise_std, shuffle_prob=shuffle_prob)
                X_aug.append(new_seq)
                y_aug.append(y[idx])

    X_aug = np.array(X_aug)
    y_aug = np.array(y_aug)
    return X_aug, y_aug

# Organização da Saída de Log

In [None]:
# Caminho para o diretório de logs
log_dir = '/content/drive/MyDrive/Academico/TCC/Projeto_alterado/logs'
os.makedirs(log_dir, exist_ok=True)

# Caminho completo para o arquivo de log
log_file_path = os.path.join(log_dir, get_log_filename())

# Redirecionar a saída padrão para o arquivo de log
sys.stdout = open(log_file_path, 'w')

# Configuração de callbacks
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
print("\n\n ----------------------INICIO --------------------------\n")
print('[INFO] [INICIO]: ' + get_date_str())

start = time.time()

# Processamento 1

Caso os dados já tenham sido processados, ignorem a célula abaixo e vão diretamente para a célula que carrega os dados nas suas respectivas variáveis com:

```
X_train = np.load('caminho_do_diretório')
y_train = np.load('caminho_do_diretório')
X_test  = np.load('caminho_do_diretório')
y_test  = np.load('caminho_do_diretório')
```



In [None]:
X_train, y_train = process_data(lstm_path_data_train, actions, LABEL_MAP, SEQUENCE_LENGTH, max_sequences_per_action=N)
X_test, y_test = process_data(lstm_path_data_test, actions, LABEL_MAP, SEQUENCE_LENGTH, max_sequences_per_action=N)

In [None]:
os.makedirs('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data', exist_ok=True)

np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/X_train.npy', X_train)
np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/y_train.npy', y_train)
np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/X_test.npy', X_test)
np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/y_test.npy', y_test)

In [None]:
X_train = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/X_train.npy')
y_train = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/y_train.npy')
X_test  = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/X_test.npy')
y_test  = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data/y_test.npy')

In [None]:
print("Formato de X_train:", X_train.shape)
print("Formato de y_train:", y_train.shape)
print("Formato de X_test:", X_test.shape)
print("Formato de y_test:", y_test.shape)

In [None]:
y_true = np.argmax(y_test, axis=1)
unique, counts = np.unique(y_true, return_counts=True)

print("Distribuição de classes no conjunto de teste:")
for idx, count in zip(unique, counts):
    print(f"{actions[idx]}: {count} amostras")

In [None]:
model = Sequential([
    LSTM(128, return_sequences=True, input_shape=(SEQUENCE_LENGTH, FEATURES_PER_FRAME)),
    LayerNormalization(),
    Dropout(0.3),

    LSTM(128),
    LayerNormalization(),
    Dropout(0.4),

    Dense(64, activation='relu'),
    Dropout(0.3),

    Dense(len(actions), activation='softmax')
])

# Compilação
model.compile(optimizer=Adam(learning_rate=0.0003),
              loss='categorical_crossentropy',
              metrics=['categorical_accuracy'])

# Callbacks
early_stop = EarlyStopping(monitor='val_loss', patience=30, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-5)
checkpoint = ModelCheckpoint('best_lstm_model.h5', monitor='val_categorical_accuracy', save_best_only=True, mode='max')

# Treinamento
print("\n[INFO] Treinando o modelo LSTM otimizado...")
history = model.fit(
    X_train, y_train,
    epochs=EPOCHS,
    batch_size=32,
    validation_data=(X_test, y_test),
    callbacks=[early_stop, reduce_lr, checkpoint]
)

# Processamento 2 (Uso de Data Augmentation)

In [None]:
X_train, y_train = process_data(lstm_path_data_train, actions, LABEL_MAP, SEQUENCE_LENGTH, max_sequences_per_action=N)
X_test, y_test = process_data(lstm_path_data_test, actions, LABEL_MAP, SEQUENCE_LENGTH, max_sequences_per_action=N)

In [None]:
X_train_aug, y_train_aug = augment_and_balance_data(X_train, y_train, augment_per_class=100)

In [None]:
os.makedirs('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1', exist_ok=True)

np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/X_train.npy', X_train)
np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/y_train.npy', y_train)
np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/X_test.npy', X_test)
np.save('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/y_test.npy', y_test)

In [None]:
X_train_aug = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/X_train.npy')
y_train_aug = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/y_train.npy')
X_test  = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/X_test.npy')
y_test  = np.load('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/dataset/lstm/processed_data1/y_test.npy')

In [None]:
model = Sequential([
    # Extração temporal de padrões com LSTM
    LSTM(128, return_sequences=True, input_shape=(SEQUENCE_LENGTH, FEATURES_PER_FRAME)),
    LayerNormalization(),
    Dropout(0.3),

    LSTM(256, return_sequences=True),
    LayerNormalization(),
    Dropout(0.3),

    LSTM(128),
    LayerNormalization(),
    Dropout(0.4),

    Dense(128, activation='relu'),
    LayerNormalization(),
    Dropout(0.4),

    Dense(64, activation='relu'),
    Dropout(0.3),

    Dense(len(actions), activation='softmax')
])

# Compilação
model.compile(optimizer=Adam(learning_rate=0.0003),
              loss='categorical_crossentropy',
              metrics=['categorical_accuracy'])

# Callbacks
early_stop = EarlyStopping(monitor='val_loss', patience=60, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-5)
checkpoint = ModelCheckpoint('best_lstm_model.h5', monitor='val_categorical_accuracy', save_best_only=True, mode='max')

# Treinamento
print("\n[INFO] Treinando o modelo LSTM com data aug...")
history = model.fit(
    X_train_aug, y_train_aug,  # <-- dados aumentados
    epochs=EPOCHS,
    batch_size=32,
    validation_data=(X_test, y_test),
    callbacks=[early_stop, reduce_lr, checkpoint]
)

# Plotagem dos dados

In [None]:
# Registrar métricas de cada época
for epoch in range(len(history.history['loss'])):
    print(f"Epoch {epoch + 1}/{EPOCHS} - loss: {history.history['loss'][epoch]:.4f} - acc: {history.history['categorical_accuracy'][epoch]:.4f} - val_loss: {history.history['val_loss'][epoch]:.4f} - val_acc: {history.history['val_categorical_accuracy'][epoch]:.4f}")

In [None]:
# Avaliação
print("\n[INFO] Avaliando a LSTM...")
score = model.evaluate(X_test, y_test, verbose=1)
print(f"[INFO] Accuracy: {score[1] * 100:.2f}% | Loss: {score[0]:.5f}")

In [None]:
# Salvamento do modelo
file_date = get_date_str()
model_path = f'/content/drive/MyDrive/Academico/TCC/Projeto_alterado/models/{FILE_NAME}{file_date}.h5'
model.save(model_path)
print(f'[INFO] Modelo salvo em: {model_path}')

end = time.time()
print("[INFO] LSTM Runtime: %.1f min" % (get_time_min(start, end)))



In [None]:
# Carregar o melhor modelo salvo
model = load_model(model_path)
print("[INFO] Modelo carregado com sucesso.")



In [None]:
# Resumo do modelo
print('[INFO] Summary: ')
model.summary()

In [None]:
# Criação de diretórios para gráficos
os.makedirs('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/models/graphics', exist_ok=True)
os.makedirs('/content/drive/MyDrive/Academico/TCC/Projeto_alterado/models/image', exist_ok=True)

In [None]:
# Gráficos de resultados
print("[INFO] Generating loss and accuracy graph")
plt.style.use("ggplot")
plt.figure()

num_epochs_trained = len(history.history["loss"])

plt.plot(np.arange(0, num_epochs_trained), history.history["loss"], label="train_loss")
plt.plot(np.arange(0, num_epochs_trained), history.history["val_loss"], label="val_loss")
plt.plot(np.arange(0, num_epochs_trained), history.history["categorical_accuracy"], label="train_acc")
plt.plot(np.arange(0, num_epochs_trained), history.history["val_categorical_accuracy"], label="val_acc")

plt.title("Training Loss and Accuracy")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend()
plt.savefig(f'/content/drive/MyDrive/Academico/TCC/Projeto_alterado/models/graphics/{FILE_NAME}{file_date}.png', bbox_inches='tight')

In [None]:
# Gerar previsões para o conjunto de teste
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

# Calcular F1-score
f1 = f1_score(y_true, y_pred_classes, average='weighted')

print(f"F1-score: {f1:.4f}")

# Gerar relatório completo
print(classification_report(y_true, y_pred_classes))

# Calcular a matriz de confusão
labels = np.arange(len(actions))
conf_matrix = confusion_matrix(y_true, y_pred_classes)

# Plotar a matriz de confusão
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=actions, yticklabels=actions, vmin=0, vmax=20)
plt.title('Matriz de Confusão')
plt.ylabel('Rótulo Verdadeiro')
plt.xlabel('Rótulo Predito')
plt.xticks(rotation=45)
plt.yticks(rotation=0)
plt.tight_layout()

# Salvar a imagem
confusion_matrix_path = f'/content/drive/MyDrive/Academico/TCC/Projeto_alterado/models/graphics/{FILE_NAME}{file_date}_confusion_matrix.png'
plt.savefig(confusion_matrix_path, bbox_inches='tight')
plt.show()

print(f"[INFO] Matriz de confusão salva em: {confusion_matrix_path}")

In [None]:
# Plot da arquitetura do modelo
print('[INFO] Generating image of model architecture')
plot_model(model, to_file=f'/content/drive/MyDrive/Academico/TCC/Projeto_alterado/models/image/{FILE_NAME}{file_date}.png', show_shapes=True)

print('\n[INFO] [FIM]: ' + get_date_str())

# Fechar o arquivo de log
sys.stdout.close()
sys.stdout = sys.__stdout__  # Restaurar a saída padrão

print(f"[INFO] Log salvo em: {log_file_path}")