In [None]:
!pip install tensorboard

In [None]:
import datetime
import os
from torch.utils.tensorboard import SummaryWriter

import torch
import torch.nn as nn
import torchvision
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt

import torchvision.datasets as datasets
import torchvision.transforms as transforms
import numpy as np

# Select CUDA when it is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
import seaborn as sns
import torch
import torch.nn as nn
import torch.utils.checkpoint as checkpoint
from torchvision import models
from timm.models.layers import DropPath, to_2tuple, trunc_normal_
import torch.nn.functional as F
#from einops import rearrange, repeat
#from einops.layers.torch import Rearrange
import math
import numpy as np
import time
from torch import einsum
import cv2
import scipy.misc
#import utils
import tqdm
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, f1_score, recall_score, precision_score
from sklearn.metrics import balanced_accuracy_score
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
import pandas as pd

# CSV file to read; the commented-out line is an alternative location.
#file_path = "../simulate_data/squat_data.csv"
file_path = "sim_squat_data.csv"

# Read the whole CSV into a DataFrame.
df = pd.read_csv(file_path)

# Show the first rows as a quick check of what was loaded.
print(df.head())

In [None]:
# Discard the three velocity columns from the frame.
velocity_columns = ['vel_x', 'vel_y', 'vel_z']
df = df.drop(labels=velocity_columns, axis=1)

In [None]:
# 'updown' value of the previous row (the first row gets 0 instead of NaN).
tmp = df['updown'].shift(1).fillna(0)
# Index labels of rows where 'updown' is 0 and the previous row's value was 1.
squat_ends = df[(tmp == 1) & (df['updown'] == 0)].index
print(f"Total squat ends: {len(squat_ends)}")

# One DataFrame slice per detected transition.
squat_sets = []

start_index = 0
for end_index in squat_ends:
    # Positional slice that stops just before the transition row.
    # assumes df has the default RangeIndex so index labels equal positions — TODO confirm
    one_set = df.iloc[start_index:end_index]

    squat_sets.append(one_set)

    # NOTE(review): the row at end_index is excluded from the slice above and skipped here,
    # so it ends up in no set; rows after the last transition are never collected either.
    start_index = end_index + 1

print(f"Total sets: {len(squat_sets)}")

In [None]:
# Each sample is a 15-frame window: 9 past frames, the current frame and 5 future frames.
# Consecutive windows inside a set start 5 frames apart.
PAST_FRAMES = 9
FUTURE_FRAMES = 5
WINDOW_STRIDE = 5
TAIL_MARGIN = 15  # frames at the end of every set that never become a window centre

X = []
y = []
for one_set in squat_sets:
    last_centre = len(one_set) - TAIL_MARGIN
    for i in range(PAST_FRAMES, last_centre, WINDOW_STRIDE):
        # NOTE(review): every column from position 1 onward is used as a feature; if
        # 'updown' is among them the label is part of the input — confirm the column order.
        window = one_set.iloc[i - PAST_FRAMES:i + FUTURE_FRAMES + 1, 1:]
        X.append(window.values)
        # The label is the 'updown' value of the current frame.
        y.append(one_set.iloc[i]['updown'])

X = np.array(X)
y = np.array(y)
print(X.shape, y.shape)

In [None]:
import torch

# Convert the window array and the label array to tensors, then pair them sample by sample.
feature_tensor = torch.tensor(X, dtype=torch.float32)
label_tensor = torch.tensor(y, dtype=torch.long)
full_dataset = torch.utils.data.TensorDataset(feature_tensor, label_tensor)

In [None]:
import json

# Read the stored index lists for the train / validation / test partitions.
with open("index_dict.json", "r") as f:
    index_dict = json.load(f)

train_idx, val_idx, test_idx = (
    index_dict[key] for key in ("train_idx", "val_idx", "test_idx")
)

In [None]:
from torch.utils.data import Subset

# Restrict the full dataset to each partition's index list.
train_dataset, val_dataset, test_dataset = (
    Subset(full_dataset, indices) for indices in (train_idx, val_idx, test_idx)
)

In [None]:
# All three loaders use the same batch size; only the training loader shuffles.
BATCH_SIZE = 32
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True
)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=BATCH_SIZE, shuffle=False
)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False
)

In [None]:
class biGRU(nn.Module):
    """Bidirectional GRU with two linear heads read from a single time step.

    The GRU output at sequence index 10 feeds ``fc_latent`` (``laten_size`` values per
    sample) and ``fc`` (one value per sample).
    """

    def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length, laten_size):
        super().__init__()
        # Constructor arguments kept as attributes; num_classes and seq_length are only stored.
        self.num_classes = num_classes
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.seq_length = seq_length

        # Both directions are concatenated, so each head sees 2 * hidden_size features.
        head_width = hidden_size * 2
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True,
        )
        self.fc = nn.Linear(head_width, 1)
        self.fc_latent = nn.Linear(head_width, laten_size)

    def forward(self, x):
        """Return ``(content_out, dis_out)`` for a batch-first input ``x``.

        ``x`` must have more than 10 steps along dimension 1, because step 10 is read.
        """
        # Zero initial hidden state: one slice per layer and direction, on x's device.
        h_0 = torch.zeros(
            self.num_layers * 2, x.size(0), self.hidden_size, device=x.device
        )
        gru_out, _ = self.gru(x, h_0)
        # Step 10 is the frame both heads read (original note: 10 past frames, 5 current).
        picked = gru_out[:, 10, :]
        content_out = self.fc_latent(picked)
        dis_out = self.fc(picked)
        return content_out, dis_out

In [None]:
# Hyper-parameters passed to the biGRU constructor further down.
num_classes = 2
input_size = 7
hidden_size = 128
num_layers = 2
seq_length = 15
latent_size = 4

# Use CUDA when available, otherwise the CPU (same expression as in the import cell).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
import torch.nn.init as init
import random

def set_seed(seed=42):
    """Seed Python, NumPy and PyTorch (CPU and CUDA) and make cuDNN deterministic.

    Args:
        seed: Integer seed handed to every random number generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    # Deterministic kernels on, autotuner off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

def initialize_weights(m):
    """Re-initialize one module in place; intended for ``nn.Module.apply``.

    ``nn.Linear``: Xavier-uniform weight, zero bias (when a bias exists).
    ``nn.BatchNorm1d`` / ``nn.BatchNorm2d``: weight set to 1, bias set to 0.
    Any other module type is left untouched.
    """
    if isinstance(m, nn.Linear):
        init.xavier_uniform_(m.weight)
        if m.bias is None:
            return
        init.constant_(m.bias, 0)
        return

    if isinstance(m, (nn.BatchNorm1d, nn.BatchNorm2d)):
        for tensor, value in ((m.weight, 1), (m.bias, 0)):
            init.constant_(tensor, value)

def initialize_gru_weights(gru):
    """Re-initialize a recurrent module's parameters in place.

    Parameters whose name contains ``'weight'`` get Xavier-uniform values; the remaining
    parameters whose name contains ``'bias'`` are set to zero. Others are left alone.
    """
    for param_name, tensor in gru.named_parameters():
        is_weight = 'weight' in param_name
        if not is_weight and 'bias' not in param_name:
            continue
        if is_weight:
            init.xavier_uniform_(tensor.data)
        else:
            init.constant_(tensor.data, 0)

# Run the initialization: seed the RNGs, build the classifier on `device`, then
# re-initialize its Linear layers and its GRU parameters.
set_seed(42)
model = biGRU(num_classes, input_size, hidden_size, num_layers, seq_length, latent_size).to(device)
model.apply(initialize_weights)
initialize_gru_weights(model.gru)

In [None]:
# Print the module structure of the classifier.
print(model)

In [None]:
# Same values as in the earlier hyper-parameter cell, assigned again here.
num_classes = 2
input_size = 7
hidden_size = 128
num_layers = 2
seq_length = 15

# Binary cross-entropy computed on logits, optimized with Adam over the classifier parameters.
criterion = nn.BCEWithLogitsLoss()
learning_rate = 0.0001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

num_epochs = 50

# Per-epoch history; train() and evaluate() append to these module-level lists.
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

In [None]:
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one epoch and record the epoch statistics.

    Args:
        model: Module whose forward pass returns ``(latent, logits)``; only the logits
            are used here.
        train_loader: Iterable of ``(inputs, labels)`` batches exposing ``.dataset``.
        criterion: Loss applied to ``(logits, labels)`` with labels reshaped to ``[B, 1]``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Side effects:
        Prints the epoch loss and accuracy and appends them to the module-level lists
        ``train_losses`` and ``train_accuracies``.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    # A sum-reduced criterion already returns a batch total; otherwise the batch mean is
    # scaled by the batch size so the epoch value is a true per-sample average.
    sum_reduced = getattr(criterion, "reduction", "mean") == "sum"

    for inputs, labels in tqdm(train_loader, desc="Training", leave=False):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        _, outputs = model(inputs)
        # The notebook pairs the un-activated fc output with BCEWithLogitsLoss, so the
        # outputs are logits: probability 0.5 corresponds to logit 0, not 0.5.
        preds = (outputs > 0).float()
        labels = labels.unsqueeze(1).float()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        running_loss += loss.item() if sum_reduced else loss.item() * batch_size
        total += batch_size
        correct += (preds == labels).sum().item()

    # An empty loader yields NaN statistics instead of a ZeroDivisionError.
    epoch_loss = running_loss / total if total else float("nan")
    accuracy = 100 * correct / total if total else float("nan")
    print(f"Train Loss: {epoch_loss:.4f}, Train Accuracy: {accuracy:.2f}%")
    train_losses.append(epoch_loss)
    train_accuracies.append(accuracy)

In [None]:
def evaluate(model, data_loader, criterion, device, phase="validation"):
    """Evaluate ``model`` on ``data_loader`` without gradient tracking.

    Args:
        model: Module whose forward pass returns ``(latent, logits)``.
        data_loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss applied to ``(logits, labels)`` with labels reshaped to ``[B, 1]``.
        device: Device the batches are moved to.
        phase: ``"validation"`` records history and checkpoints on improvement;
            ``"test"`` additionally computes F1 / recall / precision.

    Returns:
        ``(f1, recall, precision, all_labels, all_predictions)`` when ``phase == "test"``;
        ``None`` otherwise.

    Side effects:
        For ``"validation"``: appends to the module-level ``val_losses`` and
        ``val_accuracies`` and, when the loss beats the global ``best_val_loss``, updates
        it and saves the state dict to ``sim_best_model.pth``.
    """
    global best_val_loss
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_labels = []
    all_predictions = []
    # A sum-reduced criterion already returns a batch total; otherwise the batch mean is
    # scaled by the batch size so the epoch value is a true per-sample average.
    sum_reduced = getattr(criterion, "reduction", "mean") == "sum"

    with torch.no_grad():
        for inputs, labels in tqdm(data_loader, desc=f"Evaluating {phase}", leave=False):
            inputs, labels = inputs.to(device), labels.to(device)

            _, outputs = model(inputs)
            # Outputs are logits (see BCEWithLogitsLoss), so the decision boundary is 0.
            preds = (outputs > 0).float()
            labels = labels.unsqueeze(1).float()
            loss = criterion(outputs, labels)

            batch_size = labels.size(0)
            running_loss += loss.item() if sum_reduced else loss.item() * batch_size
            total += batch_size
            correct += (preds == labels).sum().item()

            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(preds.cpu().numpy())

    # An empty loader yields NaN statistics instead of a ZeroDivisionError; a NaN loss
    # never compares below best_val_loss, so no checkpoint is written in that case.
    epoch_loss = running_loss / total if total else float("nan")
    accuracy = 100 * correct / total if total else float("nan")
    print(f"{phase.capitalize()} Loss: {epoch_loss:.4f}, {phase.capitalize()} Accuracy: {accuracy:.2f}%")
    if phase == "validation":
        val_losses.append(epoch_loss)
        val_accuracies.append(accuracy)
        if epoch_loss < best_val_loss:
            best_val_loss = epoch_loss
            torch.save(model.state_dict(), "sim_best_model.pth")
            print("Best model saved!")
    if phase == "test":
        test_accuracy = accuracy
        print(f"Test Accuracy: {test_accuracy:.2f}%")

        f1_score_result = f1_score(all_labels, all_predictions)
        recall_score_result = recall_score(all_labels, all_predictions)
        precision_score_result = precision_score(all_labels, all_predictions)

        return f1_score_result, recall_score_result, precision_score_result, all_labels, all_predictions

In [None]:
# evaluate() compares each validation loss against this global and lowers it on improvement.
best_val_loss = float('inf')
for epoch in range(num_epochs):
    print(f"Epoch [{epoch+1}/{num_epochs}]")
    # One pass over the training data, then a validation pass that records history
    # and saves a checkpoint when the validation loss improves.
    train(model, train_loader, criterion, optimizer, device)
    evaluate(model, val_loader, criterion, device, phase="validation")

In [None]:
# map_location remaps the stored tensors onto the current device, so a checkpoint written
# on a GPU still loads on a CPU-only machine.
model.load_state_dict(torch.load("sim_best_model.pth", map_location=device))
model = model.to(device)
# Score the restored weights on the held-out test split.
f1_score_result, recall_score_result, precision_score_result, all_labels, all_predictions = evaluate(model, test_loader, criterion, device, phase="test")

In [None]:
# Report the test-set metrics returned by evaluate(), to four decimals.
print(f"F1 Score: {f1_score_result:.4f}")
print(f"Recall Score: {recall_score_result:.4f}")
print(f"Precision Score: {precision_score_result:.4f}")

In [None]:
import matplotlib.pyplot as plt

epochs = range(1, num_epochs + 1)

plt.figure(figsize=(12, 5))

# One panel per metric: (y-axis label, title, curves drawn in that panel).
history_panels = (
    (
        "Loss",
        "Training and Validation Loss",
        (("Train Loss", train_losses), ("Validation Loss", val_losses)),
    ),
    (
        "Accuracy",
        "Training and Validation Accuracy",
        (("Train Accuracy", train_accuracies), ("Validation Accuracy", val_accuracies)),
    ),
)

for panel_position, (y_label, panel_title, curves) in enumerate(history_panels, start=1):
    plt.subplot(1, 2, panel_position)
    for curve_label, history in curves:
        plt.plot(epochs, history, label=curve_label)
    plt.xlabel("Epochs")
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()

plt.tight_layout()
plt.show()

Style encoder & decoder

In [None]:
class style_encoder(nn.Module):
    """Bidirectional LSTM encoder that outputs the mean and log-variance of a latent.

    The LSTM output at sequence index 10 is projected by two linear layers, ``fc_mu``
    and ``var``; the latter is used as a log-variance by the rest of the notebook.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden width of each LSTM direction.
        num_layers: Number of stacked LSTM layers.
        latent_size: Size of each returned vector.
    """

    def __init__(self, input_size, hidden_size, num_layers, latent_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            bidirectional=True, batch_first=True)
        # Both directions are concatenated, hence hidden_size * 2 input features.
        self.fc_mu = nn.Linear(hidden_size * 2, latent_size)
        self.var = nn.Linear(hidden_size * 2, latent_size)

        self.hidden_size = hidden_size
        self.num_layers = num_layers

    def forward(self, x):
        """Return ``(mu, log_var)`` for a batch-first input with more than 10 steps."""
        # Zero initial states follow the input's device and dtype instead of a global
        # `device`, so the module works wherever its input lives.
        state_shape = (self.num_layers * 2, x.size(0), self.hidden_size)
        h_0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        c_0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)

        out, (hidden, cell) = self.lstm(x, (h_0, c_0))
        # Step 10 is the frame summarised into the latent.
        style_frame = out[:, 10, :]

        mu = self.fc_mu(style_frame)
        log_var = self.var(style_frame)

        return mu, log_var

In [None]:
class Decoder(nn.Module):
    """Autoregressive LSTM-cell decoder that unrolls a latent vector into a sequence.

    The latent initialises the hidden state and the first input; every step's output
    is fed back as the next step's input.

    Args:
        input_size: Number of features per generated time step.
        hidden_size: Hidden width of the LSTM cell.
        num_layers: Accepted for signature compatibility; not used by this module.
        latent_size: Size of the latent vector ``z``.
        seq_len: Number of time steps to generate (default 15).

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.
    """

    def __init__(self, input_size, hidden_size, num_layers, latent_size, seq_len=15):
        super().__init__()
        if seq_len < 1:
            raise ValueError(f"seq_len must be at least 1, got {seq_len}")

        self.latent_to_hidden = nn.Linear(latent_size, hidden_size)
        self.latent_to_input = nn.Linear(latent_size, input_size)
        self.lstm_cell = nn.LSTMCell(input_size, hidden_size)
        self.output_layer = nn.Linear(hidden_size, input_size)

        self.seq_len = seq_len  # Length of the sequence to generate

    def forward(self, z):
        """Return a ``[batch, seq_len, input_size]`` sequence decoded from ``z``."""
        h = self.latent_to_hidden(z)
        c = torch.zeros_like(h)  # cell state starts at zero

        x = self.latent_to_input(z)

        outputs = []
        for _ in range(self.seq_len):
            h, c = self.lstm_cell(x, (h, c))
            out = self.output_layer(h)
            outputs.append(out.unsqueeze(1))
            x = out  # Use the output as the next input

        recon_seq = torch.cat(outputs, dim=1)
        return recon_seq

In [None]:
class Model(nn.Module):
    """VAE wrapper that mixes an external content latent with a sampled style latent.

    ``alpha`` holds two learnable logits; their softmax gives the weights applied to
    the content and style latents before they are concatenated and decoded.
    """

    def __init__(self, style_encoder, decoder):
        super().__init__()
        self.style_encoder = style_encoder
        self.decoder = decoder

        # Two zeros -> both mixing weights start at 0.5 after the softmax.
        self.alpha = nn.Parameter(torch.tensor([0.0, 0.0]))

    def reparameterize(self, mu, logvar):
        """Draw ``mu + std * eps`` with ``std = exp(0.5 * logvar)`` and ``eps ~ N(0, I)``."""
        sigma = torch.exp(0.5 * logvar)
        noise = torch.randn_like(sigma)
        return mu + sigma * noise

    def forward(self, x, z_c):
        """Return ``(recon_x, mu, logvar)`` for input ``x`` and content latent ``z_c``."""
        mu, logvar = self.style_encoder(x)
        z_s = self.reparameterize(mu, logvar)

        alpha_c, alpha_s = F.softmax(self.alpha, dim=0)

        # Weighted content and style latents, joined along the feature dimension.
        z = torch.cat((alpha_c * z_c, alpha_s * z_s), dim=1)
        return self.decoder(z), mu, logvar

In [None]:
class MineNetwork(nn.Module):
    """Two-layer MLP that maps a concatenated (content, style) pair to one score."""

    def __init__(self, content_dim, style_dim, hidden_dim=128):
        super().__init__()
        joint_dim = content_dim + style_dim
        layers = [
            nn.Linear(joint_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, z_c, z_s):
        """Score each row pair; ``z_c`` and ``z_s`` are ``[batch_size, dim]`` tensors."""
        return self.model(torch.cat((z_c, z_s), dim=1))

In [None]:
def mine_loss(mine_net, z_c, z_s):
    """Negative MINE (Donsker-Varadhan) estimate between ``z_c`` and ``z_s``.

    Args:
        mine_net: Callable ``(z_c, z_s) -> scores`` scoring each row pair.
        z_c: Tensor of shape ``[batch, content_dim]``.
        z_s: Tensor of shape ``[batch, style_dim]``.

    Returns:
        Scalar tensor ``-(E[joint] - log E[exp(marginal)])``.
    """
    # joint distribution (same sample pairs)
    joint_scores = mine_net(z_c, z_s)

    # marginal distribution (shuffle z_s to break correlation)
    z_s_shuffled = z_s[torch.randperm(z_s.size(0))]
    marginal_scores = mine_net(z_c, z_s_shuffled)

    # log(mean(exp(s))) == logsumexp(s) - log(N). The logsumexp form stays finite for
    # large scores, where exp() alone overflows to inf.
    flat_marginal = marginal_scores.reshape(-1)
    sample_count = max(flat_marginal.numel(), 1)
    log_mean_exp = torch.logsumexp(flat_marginal, dim=0) - math.log(sample_count)

    # MINE loss = -(E[joint] - log(E[exp(marginal)]))
    loss = -(torch.mean(joint_scores) - log_mean_exp)
    return loss

In [None]:
# Scoring network for the MINE term: 4-dimensional content input, 4-dimensional style input.
mine_net = MineNetwork(4, 4).to(device)

In [None]:
# Hyper-parameters of the style VAE.
input_dim = 7
vae_hidden_dim = 64
content_hidden_dim = 128
num_layers = 2
seq_length = 15
encoded_space_dim = 4

# NOTE(review): this rebinds the name `style_encoder` from the class to an instance, so
# running this cell twice fails until the class-definition cell is run again.
style_encoder = style_encoder(input_dim, vae_hidden_dim,
                              num_layers, encoded_space_dim).to(device)
# The decoder latent is twice encoded_space_dim because Model.forward concatenates the
# content latent with the style latent.
decoder = Decoder(input_dim, vae_hidden_dim, num_layers, encoded_space_dim*2).to(device)

VAE_model = Model(style_encoder, decoder).to(device)

# This replaces the classifier's `optimizer`; only VAE_model's parameters are registered.
# NOTE(review): mine_net's parameters are not handed to any optimizer here — confirm
# whether the MINE network is meant to be trained.
optimizer = torch.optim.Adam(VAE_model.parameters(), lr=1e-3)
#loss = nn.MSELoss()

# Freeze the classifier `model`.
for param in model.parameters():
    param.requires_grad = False

# Re-initialize the Linear layers of the VAE and of the MINE network
# (initialize_weights only handles Linear and BatchNorm modules).
VAE_model.apply(initialize_weights)
mine_net.apply(initialize_weights)

In [None]:
print(VAE_model)

Discriminator-based loss function

In [None]:
class discriminator(nn.Module):
    """Bidirectional GRU that scores a sequence with a probability in (0, 1).

    Args:
        input_dim: Number of features per time step.
        hidden_size: Hidden width of each GRU direction.
        seq_length: Stored on the instance; not used by ``forward``.
        num_layers: Number of stacked GRU layers.
    """

    def __init__(self, input_dim, hidden_size, seq_length, num_layers):
        super().__init__()

        self.gru = nn.GRU(input_size=input_dim, hidden_size=hidden_size,
                          num_layers=num_layers, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_size*2, 1)

        self.hidden_size = hidden_size
        self.seq_length = seq_length
        # forward() sizes the initial hidden state from the layer count.
        self.num_layers = num_layers

    def forward(self, x):
        """Return a ``[batch, 1]`` tensor of probabilities for a batch-first input.

        ``x`` must have more than 10 steps along dimension 1, because step 10 is read.
        """
        h_0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.gru(x, h_0)
        logit = self.fc(out[:, 10, :])
        # The notebook's losses take log(D) and log(1 - D), so D must be a probability.
        return torch.sigmoid(logit)

In [None]:
def loss_function(x, x_hat, mean, var, D_fake):
    """Return ``(recon_loss, KLD)`` for one batch.

    Args:
        x: Input batch; only its first dimension (batch size) is used.
        x_hat: Reconstruction; not used in the computation.
        mean: Latent mean.
        var: Enters the KL term as ``1 + var - mean^2 - exp(var)``, i.e. in the role
            of a log-variance.
        D_fake: Discriminator output for the reconstruction.

    Returns:
        ``recon_loss``: ``-mean(log(D_fake + 1e-8))``.
        ``KLD``: ``-0.5 * sum(1 + var - mean^2 - exp(var)) / batch_size``.
    """
    # Adversarial term used in place of a pixel-wise reconstruction error.
    log_fake = torch.log(D_fake + 1e-8)
    recon_loss = -torch.mean(log_fake)

    batch_size = x.size(0)
    kl_terms = 1 + var - mean.pow(2) - var.exp()
    KLD = -0.5 * torch.sum(kl_terms) / batch_size
    return recon_loss, KLD

In [None]:
# TensorBoard event files are written under this directory.
saved_loc = 'scalar/'
writer = SummaryWriter(saved_loc)
# The instance is moved to `device` because the training loop feeds it batches that were
# moved to `device`; left on the CPU it fails as soon as `device` is a GPU.
# NOTE(review): this rebinds `discriminator` from the class to an instance, so running
# this cell twice fails until the class-definition cell is run again.
discriminator = discriminator(input_dim=input_dim, hidden_size=hidden_size,
                              seq_length=seq_length, num_layers=num_layers).to(device)

#model.train()

def train(epoch, model, VAE_model, train_loader, optimizer, d_optimizer=None):
    """Train the style VAE for one epoch against the discriminator-based loss.

    This definition reuses the name ``train`` of the classifier training function
    defined earlier in the notebook and replaces it once this cell has run.

    Relies on the module-level ``device``, ``mine_net``, ``discriminator``, ``writer``,
    ``loss_function`` and ``mine_loss``.

    Args:
        epoch: Epoch number, used for logging and for the TensorBoard step.
        model: Content encoder; put in eval mode and run without gradient tracking.
        VAE_model: Style VAE being trained.
        train_loader: Iterable of ``(x, label)`` batches; labels are ignored.
        optimizer: Optimizer for the VAE loss.
        d_optimizer: Optional optimizer over the discriminator's parameters. When given,
            the discriminator is updated after every VAE step; when ``None`` (default)
            the discriminator stays fixed.
    """
    train_loss = 0
    model.eval()
    VAE_model.train()
    # NOTE(review): mine_net is put in train mode, but its parameters are only updated
    # if the caller registered them with `optimizer` — confirm.
    mine_net.train()

    for batch_idx, (x, _) in enumerate(train_loader):
        x = x.to(device)  # [B, T, D]

        optimizer.zero_grad()

        # The content encoder is used as a fixed feature extractor.
        with torch.no_grad():
            z_c, _ = model(x)
        x_hat, mu, logvar = VAE_model(x, z_c)  # model: LSTM VAE (Encoder + Decoder)

        # Adversarial reconstruction loss + KL divergence
        d_fake = discriminator(x_hat)

        recon_loss, kld = loss_function(x, x_hat, mu, logvar, d_fake)
        mine_loss_value = mine_loss(mine_net, z_c, mu)
        loss = recon_loss + kld + mine_loss_value * 0.1

        # Logging
        step = batch_idx + epoch * len(train_loader)
        writer.add_scalar("Train/Reconstruction_loss", recon_loss.item(), step)
        writer.add_scalar("Train/KLD", kld.item(), step)
        writer.add_scalar("Train/Total_loss", loss.item(), step)
        writer.add_scalar("Train/Mine_loss", mine_loss_value.item(), step)

        train_loss += loss.item()
        loss.backward()
        optimizer.step()

        # Discriminator update: real batches towards 1, detached reconstructions towards 0.
        if d_optimizer is not None:
            # Clears the gradients the VAE loss left on the discriminator's parameters.
            d_optimizer.zero_grad()
            d_real = discriminator(x)
            d_fake = discriminator(x_hat.detach())

            d_loss_real = -torch.mean(torch.log(d_real + 1e-8))
            d_loss_fake = -torch.mean(torch.log(1 - d_fake + 1e-8))
            d_loss = d_loss_real + d_loss_fake
            d_loss.backward()
            d_optimizer.step()
            writer.add_scalar("Train/Discriminator_loss", d_loss.item(), step)

        if batch_idx % 100 == 0:
            # `loss` is already a per-batch average, so it is reported as is.
            print(f'Train epoch: {epoch} [{batch_idx * len(x)}/{len(train_loader.dataset)}] '
                  f'Loss: {loss.item():.6f}')

    # train_loss is a sum of per-batch averages, so it is averaged over the batch count.
    print(f'====> Epoch: {epoch} Average loss: {train_loss / max(len(train_loader), 1):.4f}')