In [1]:
import os
import numpy as np

import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import torch.nn.init as init

import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, confusion_matrix, classification_report, accuracy_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC

### Carregando Dataset do professor

In [2]:
# Load the instructor-provided HAR train / test / validation splits.
df_train = pd.read_csv('dataset/har-example-mo436/train.csv')
df_test = pd.read_csv('dataset/har-example-mo436/test.csv')
df_val = pd.read_csv('dataset/har-example-mo436/validation.csv')

# Keep only a fraction of the training rows.
# `percent` is passed as `test_size`, so the SECOND frame returned by
# train_test_split is the one that holds `percent` of the rows; that frame is
# the training subset used from here on.
percent = 0.6
porcentagem = f"{percent} dos dados de treino do professor - com backbone"
df_train_discarded, df_train_subset = train_test_split(df_train, test_size=percent, random_state=42)
df_train = df_train_subset

# Columns removed from the model input: the first column of the training frame,
# the bookkeeping columns, and the label itself. The same list is applied to
# every split so the three feature matrices are built identically.
LABEL_COLUMN = 'standard activity code'
NON_FEATURE_COLUMNS = [
    df_train.columns[0],
    'gyro-end-time', 'level_0', 'accel-end-time', 'gyro-start-time',
    'index', 'user', 'serial', 'accel-start-time', 'csv',
    'timestamp diff', 'activity code', 'window',
    LABEL_COLUMN,
]

X_train = df_train.drop(columns=NON_FEATURE_COLUMNS)
X_test = df_test.drop(columns=NON_FEATURE_COLUMNS)
X_val = df_val.drop(columns=NON_FEATURE_COLUMNS)

# The features are later consumed positionally (via `.values`), so the three
# splits must expose the same columns in the same order.
assert list(X_train.columns) == list(X_test.columns) == list(X_val.columns), \
    "train/test/validation feature columns differ"

# Labels are taken after the feature frames are built; `pop` also removes the
# label column from the source frames.
y_train = df_train.pop(LABEL_COLUMN)
y_test = df_test.pop(LABEL_COLUMN)
y_val = df_val.pop(LABEL_COLUMN)

print("shape train: ", X_train.shape)
print("shape test: ", X_test.shape)
print("shape valid: ", X_val.shape)

shape train:  (36, 360)
shape test:  (24, 360)
shape valid:  (24, 360)


### Dataset

In [3]:
class CSVLabeledDataset(Dataset):
    """Map-style dataset that pairs one feature row with one integer label.

    Parameters
    ----------
    X : array-like
        Feature rows. pandas objects are converted to NumPy arrays so that an
        integer ``idx`` always selects a row by position (indexing a DataFrame
        with ``[]`` would select a column instead).
    y : array-like
        One label per row of ``X``. pandas objects are converted to NumPy arrays
        so lookups are positional rather than by index label, which matters
        when the index is no longer ``0..n-1`` (e.g. after a shuffled split).
    transform : callable, optional
        Applied once, at construction time, to ``X`` exactly as it was passed in.
    """

    def __init__(self, X, y, transform=None):
        self.transform = transform

        features = self.transform(X) if self.transform else X
        self.features = self._positional(features)
        self.labels = self._positional(y)

    @staticmethod
    def _positional(data):
        """Return ``data`` as a NumPy array if it is a pandas-like object, else unchanged."""
        return data.to_numpy() if hasattr(data, "to_numpy") else data

    def __len__(self):
        """Number of feature rows."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return ``(sample, label)``: a float32 tensor with a leading axis of size 1, and a long tensor."""
        sample = torch.tensor(self.features[idx], dtype=torch.float32).unsqueeze(0)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample, label

### Carregando VAE

In [4]:
class TemporalVAE(nn.Module):
    """LSTM-based variational autoencoder for batch-first sequences.

    The encoder is a bidirectional LSTM whose two directions are summed; the
    last time step is projected to the latent mean and log-variance. The
    decoder repeats the projected latent vector along the time axis, runs it
    through a unidirectional LSTM and a final linear layer.

    Parameters
    ----------
    device : torch.device or str
        Stored on the instance as ``self.device``. It is not used to place
        tensors; noise is sampled on the device of the incoming statistics.
    input_dim : int
        Size of the last axis of the input (and of the reconstruction).
    hidden_dim : int
        Hidden size of the encoder LSTM (per direction) and input size of the
        decoder LSTM.
    latent_dim : int
        Size of the latent vector.
    num_layers : int, default 1
        Number of stacked layers in both LSTMs.
    """

    def __init__(self, device, input_dim, hidden_dim, latent_dim, num_layers=1):
        super().__init__()
        self.device = device

        # nn.LSTM applies dropout only between stacked layers; requesting it
        # with a single layer has no effect and only emits a warning.
        lstm_dropout = 0.2 if num_layers > 1 else 0.0

        self.encoder_lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True,
                                    bidirectional=True, dropout=lstm_dropout)
        self.hidden2mean = nn.Linear(hidden_dim, latent_dim)
        self.hidden2logvar = nn.Linear(hidden_dim, latent_dim)

        self.latent2hidden = nn.Linear(latent_dim, hidden_dim)
        self.decoder_lstm = nn.LSTM(hidden_dim, input_dim, num_layers, batch_first=True,
                                    bidirectional=False, dropout=lstm_dropout)
        self.output_layer = nn.Linear(input_dim, input_dim)
        self.dropout = nn.Dropout(0.2)

    def reparameterize(self, mean, logvar):
        """Sample ``mean + eps * std`` with ``eps ~ N(0, 1)`` and ``std = exp(0.5 * logvar)``."""
        std = torch.exp(0.5 * logvar)
        # randn_like already matches std's device and dtype, so the sample is
        # valid wherever the module's tensors actually live.
        eps = torch.randn_like(std)
        return mean + eps * std

    def encode(self, x):
        """Encode ``x`` and return ``(mean, logvar, residual)``, where ``residual`` is ``x`` itself."""
        output, _ = self.encoder_lstm(x)
        # The bidirectional output concatenates both directions on the last
        # axis; add the two halves to get back to hidden_dim features.
        half = output.size(2) // 2
        summed = output[:, :, :half] + output[:, :, half:]
        # Summarise the sequence by its last time step.
        last_step = self.dropout(summed[:, -1, :])
        mean = F.leaky_relu(self.hidden2mean(last_step), negative_slope=0.01)
        logvar = F.leaky_relu(self.hidden2logvar(last_step), negative_slope=0.01)
        return mean, logvar, x

    def decode(self, z, seq_len, residual):
        """Decode latent ``z`` into a sequence of length ``seq_len``.

        ``residual`` is accepted for interface compatibility and is not used.
        """
        # Repeat the projected latent vector once per time step: (batch, seq_len, hidden_dim).
        hidden = self.latent2hidden(z).unsqueeze(1).repeat(1, seq_len, 1)

        output, _ = self.decoder_lstm(hidden)
        output = self.dropout(output)

        output = self.output_layer(output)
        return output

    def forward(self, x):
        """Return ``(recon_x, mean, logvar)`` for a batch-first input ``x``."""
        mean, logvar, residual = self.encode(x)
        z = self.reparameterize(mean, logvar)
        seq_len = x.size(1)
        recon_x = self.decode(z, seq_len, residual)
        return recon_x, mean, logvar

In [5]:
# Pick the compute device: Apple MPS first, then CUDA, then CPU.
mps_ready = torch.backends.mps.is_available()
if not mps_ready:
    print("MPS device not found.")
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
else:
    device = torch.device("mps")
print(device)

# Model / training hyperparameters.
input_dim = 360    # alternatively: train_dataset.features.shape[1]
hidden_dim = 128   # previously tried: 400
latent_dim = 64    # previously tried: 20
num_layers = 2
epochs = 350

print(input_dim)

MPS device not found.
cuda
360


In [6]:
# Location of the saved VAE weights.
model_path = "model/vae.pth"

# Instantiate the model with the hyperparameters defined above and move it to the selected device.
vae = TemporalVAE(device, input_dim, hidden_dim, latent_dim, num_layers).to(device)

# Read the saved state. map_location remaps the stored tensors onto the
# current device, so a checkpoint written on a GPU still loads on a machine
# that only has CPU (or MPS).
# NOTE(review): torch.load unpickles the file; only open checkpoints from a
# trusted source (consider weights_only=True where the installed torch supports it).
state_dict = torch.load(model_path, map_location=device)

# Copy the saved parameters into the model.
vae.load_state_dict(state_dict)

print(vae)

TemporalVAE(
  (encoder_lstm): LSTM(360, 128, num_layers=2, batch_first=True, dropout=0.2, bidirectional=True)
  (hidden2mean): Linear(in_features=128, out_features=64, bias=True)
  (hidden2logvar): Linear(in_features=128, out_features=64, bias=True)
  (latent2hidden): Linear(in_features=64, out_features=128, bias=True)
  (decoder_lstm): LSTM(128, 360, num_layers=2, batch_first=True, dropout=0.2)
  (output_layer): Linear(in_features=360, out_features=360, bias=True)
  (dropout): Dropout(p=0.2, inplace=False)
)


### Extraindo representações latentes do VAE

In [7]:
def _as_sequence_batch(frame):
    """Convert a feature frame to a float32 tensor on `device`, shaped (rows, 1, columns)."""
    as_float = torch.tensor(frame.values, dtype=torch.float32).to(device)
    # Insert a length-1 sequence axis: [batch_size, sequence_length, input_dim].
    return as_float.unsqueeze(1)


# One tensor per split.
X_train_tensor = _as_sequence_batch(X_train)
X_test_tensor = _as_sequence_batch(X_test)
X_val_tensor = _as_sequence_batch(X_val)

# Switch dropout off for inference.
vae.eval()

# Use the encoder's mean output as the latent representation of each split.
with torch.no_grad():
    mean_train = vae.encode(X_train_tensor)[0]
    mean_test = vae.encode(X_test_tensor)[0]
    mean_val = vae.encode(X_val_tensor)[0]

# Bring the latent means back to host memory as NumPy arrays.
X_train_latent = mean_train.cpu().numpy()
X_test_latent = mean_test.cpu().numpy()
X_val_latent = mean_val.cpu().numpy()

In [13]:
# Standardize the latent representations.
# The scaler is fitted on the training split only, and every split is scaled
# from the raw encoder means rather than from X_*_latent itself, so re-running
# this cell never re-scales data that was already scaled.
scaler = StandardScaler()
X_train_latent = scaler.fit_transform(mean_train.cpu().numpy())
X_test_latent = scaler.transform(mean_test.cpu().numpy())
X_val_latent = scaler.transform(mean_val.cpu().numpy())

# Train the Random Forest on the scaled latent features.
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train_latent, y_train)

# Predict and evaluate on the validation split.
y_pred_val = rf_model.predict(X_val_latent)
accuracy_val = accuracy_score(y_val, y_pred_val)

# Predict and evaluate on the test split.
y_pred_test = rf_model.predict(X_test_latent)
accuracy_test = accuracy_score(y_test, y_pred_test)

print(f'Acurácia Validação: {accuracy_val * 100:.2f}%')
print(f'Acurácia Teste: {accuracy_test * 100:.2f}%')

Acurácia Validação: 37.50%
Acurácia Teste: 50.00%


In [9]:
# Train the GradientBoostingClassifier on the latent features.
# It is kept under its own name so the previously fitted Random Forest
# (`rf_model`) is not silently replaced by a different kind of model.
gb_model = GradientBoostingClassifier(n_estimators=250, learning_rate=0.1, max_depth=40, random_state=42)
gb_model.fit(X_train_latent, y_train)

# Predict and evaluate on the validation split.
y_pred_val = gb_model.predict(X_val_latent)
accuracy_val = accuracy_score(y_val, y_pred_val)

# Predict and evaluate on the test split.
y_pred_test = gb_model.predict(X_test_latent)
accuracy_test = accuracy_score(y_test, y_pred_test)

print(f'Acurácia Validação: {accuracy_val * 100:.2f}%')
print(f'Acurácia Teste: {accuracy_test * 100:.2f}%')

Acurácia Validação: 37.50%
Acurácia Teste: 45.83%


In [10]:
# Linear-kernel SVM on the latent features; other kernels such as 'rbf' or 'poly' can be tried as well.
svm_model = SVC(kernel='linear', random_state=42).fit(X_train_latent, y_train)

# Validation split: predict, then score.
y_pred_val = svm_model.predict(X_val_latent)
accuracy_val = accuracy_score(y_val, y_pred_val)

# Test split: predict, then score.
y_pred_test = svm_model.predict(X_test_latent)
accuracy_test = accuracy_score(y_test, y_pred_test)

print(f'Acurácia na Validação: {accuracy_val * 100:.2f}%')
print(f'Acurácia no Teste: {accuracy_test * 100:.2f}%')

Acurácia na Validação: 58.33%
Acurácia no Teste: 25.00%
