In [1]:
import os
import glob
from IPython.display import clear_output

import time
from time import sleep
import numpy as np
import pandas as pd
import zipfile
import matplotlib.pyplot as plt
from tqdm import tqdm
import math

from utils.MovingAverager import *
from collections import defaultdict

In [2]:
# Load the two trajectory archives used throughout the notebook.
# NOTE(review): the array bound to `validation_traj` is read from a file named
# 'training_traj_0.6.npy' -- confirm this is the intended validation source.
validation_traj = np.load('./training_traj_0.6.npy')
# SECURITY: allow_pickle=True unpickles arbitrary Python objects and can execute
# code while loading. Only use it on files that come from a trusted source.
training_traj = np.load('./all_user_match_data.npy', allow_pickle=True)

In [3]:
# Collect every distinct posture label found in the training archive.
# .item() unwraps the single Python object stored in the loaded array; it is
# then walked as user -> path -> posture.
train_data = training_traj.item()
pose_set = {
    posture_name
    for paths_by_name in train_data.values()
    for postures_by_name in paths_by_name.values()
    for posture_name in postures_by_name.keys()
}
print(pose_set)

{'pocket', 'swing', 'front_pocket', 'horizontal', 'target'}


In [4]:
# Bucket every recording by its posture label. Each stored array is the
# column-wise concatenation of three groups, in this order:
# `pos` (2 columns), `imu` (9 columns), `mag` (4 columns).
posture_data = defaultdict(list)
train_data = training_traj.item()
for paths_by_name in train_data.values():
    for postures_by_name in paths_by_name.values():
        for posture_name, data in postures_by_name.items():
            # Columns 0-1: shifted by +1 and then scaled by 0.6.
            # NOTE(review): presumably maps [-1, 1] coordinates to metres -- confirm.
            pos = (data[:, [0, 1]] + np.array((1, 1))) * 0.6
            # Columns 3-11; column 2 is not used.
            imu = data[:, list(range(3, 12))]
            # Columns 12-15.
            mag = data[:, list(range(12, 16))]

            posture_data[posture_name].append(np.concatenate((pos, imu, mag), axis=-1))

In [5]:
# Display the posture labels that ended up as keys of `posture_data`.
posture_data.keys()

dict_keys(['front_pocket', 'horizontal', 'pocket', 'swing', 'target'])

In [6]:
def split_traj(trajs, length=20):
    """Cut each trajectory into all overlapping windows of ``length`` steps.

    Windows advance one step at a time, so a trajectory with ``T`` steps
    contributes ``T - length + 1`` windows; trajectories shorter than
    ``length`` contribute none.

    Args:
        trajs: Iterable of sliceable sequences (e.g. 2-D arrays indexed by step).
        length: Number of consecutive steps per window.

    Returns:
        ``np.array`` built from the list of windows, in trajectory order and
        then in start-index order.
    """
    windows = [
        traj[start:start + length]
        for traj in trajs
        for start in range(len(traj) - length + 1)
    ]
    return np.array(windows)

In [7]:
# Window the 'horizontal' posture recordings (training) and the validation
# trajectories, both with split_traj's default window length.
prep_training_traj, prep_valid_traj = (
    split_traj(trajs) for trajs in (posture_data['horizontal'], validation_traj)
)

(prep_training_traj.shape, prep_valid_traj.shape)

((674, 20, 15), (310, 20, 6))

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score

In [9]:
class IMUDataset(Dataset):
    """Dataset of 2-D windows that are split column-wise into feature groups.

    Every element of ``data`` is indexed as ``window[:, columns]``.

    * ``coords_only=False``: ``__getitem__`` returns
      ``(window[:, :2], window[:, 2:11], window[:, 11:])``; the code names
      these coords, IMU data and mag.
    * ``coords_only=True``: ``__getitem__`` returns
      ``(window[:, :2], window[:, 2:])``; the code names these coords and mag.
    """

    def __init__(self, data, coords_only=False):
        self.data = data
        self.coords_only = coords_only

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        window = self.data[idx]
        # The first two columns are the coordinates in both modes.
        coords = window[:, :2]
        if self.coords_only:
            return coords, window[:, 2:]
        # Columns 2-10 form the IMU block; everything from column 11 on is mag.
        return coords, window[:, 2:11], window[:, 11:]


# Training pipeline: float tensors -> dataset -> shuffled batches of 32.
train_data = [torch.from_numpy(window).float() for window in prep_training_traj]
train_dataset = IMUDataset(train_data)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Validation ("impl") pipeline: same conversion, but the dataset only splits
# each window into coordinates and the remaining columns, and batches keep
# their original order.
impl_data = [torch.from_numpy(window).float() for window in prep_valid_traj]
impl_dataset = IMUDataset(impl_data, coords_only=True)
impl_loader = DataLoader(impl_dataset, batch_size=32, shuffle=False)

In [10]:
# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [11]:
class Generator(nn.Module):
    """Map a batch of coordinate sequences to 9-channel sequences.

    Three single-layer, batch-first LSTMs widen the features
    2 -> 16 -> 64 -> 128, batch normalisation is applied over the 128
    channels, and a linear layer projects every time step to 9 outputs.
    """

    def __init__(self):
        super().__init__()
        self.lstm1 = nn.LSTM(input_size=2, hidden_size=16, num_layers=1, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=16, hidden_size=64, num_layers=1, batch_first=True)
        self.lstm3 = nn.LSTM(input_size=64, hidden_size=128, num_layers=1, batch_first=True)
        self.batchnorm = nn.BatchNorm1d(128)
        # 9 output channels per time step.
        self.fc = nn.Linear(128, 9)

    def forward(self, coords):
        """Return a (batch, steps, 9) tensor for ``coords`` of shape (batch, steps, 2)."""
        features = coords
        for lstm in (self.lstm1, self.lstm2, self.lstm3):
            # Only the per-step outputs are passed on; the final hidden/cell
            # states are discarded and every LSTM starts from its default state.
            features, _ = lstm(features)
        # BatchNorm1d normalises dim 1, so move the channels there and back.
        normalised = self.batchnorm(features.transpose(1, 2)).transpose(1, 2)
        return self.fc(normalised)

# Discriminator: scores a 9-channel sequence with a single value in (0, 1).
class Discriminator(nn.Module):
    """Single-layer LSTM classifier over 9-channel sequences.

    The LSTM's final hidden state (64 features) is projected to one logit and
    squashed with a sigmoid.
    """

    def __init__(self):
        super().__init__()
        self.lstm = nn.LSTM(input_size=9, hidden_size=64, num_layers=1, batch_first=True)
        self.fc = nn.Linear(64, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, imu_data):
        """Return a (batch, 1) tensor of sigmoid scores for (batch, steps, 9) input."""
        _, (final_hidden, _) = self.lstm(imu_data)
        # final_hidden has a leading num_layers axis of size 1; drop it.
        logits = self.fc(final_hidden[0])
        return self.sigmoid(logits)

# Create one generator and one discriminator instance and move both to `device`.
generator = Generator().to(device)
discriminator = Discriminator().to(device)

In [12]:
def critic_loss(critic, real_data, fake_data, conditions):
    """Mean critic score on fake data minus mean critic score on real data.

    ``critic`` is called as ``critic(data, conditions)``, first on the real
    batch and then on the fake batch.
    NOTE(review): the ``Discriminator`` defined in this notebook takes a single
    argument, so this helper expects a different, conditional critic.
    """
    mean_real = critic(real_data, conditions).mean()
    mean_fake = critic(fake_data, conditions).mean()
    return mean_fake - mean_real

def generator_loss(critic, fake_data, conditions):
    """Negated mean of ``critic(fake_data, conditions)``.

    Minimising this value raises the critic's average score on generated data.
    """
    mean_fake_score = critic(fake_data, conditions).mean()
    return mean_fake_score.neg()

def compute_gradient_penalty(critic, real_data, fake_data, conditions, device):
    """Gradient penalty ``mean((||d critic / d x||_2 - 1)^2)`` on random interpolates.

    For every sample one coefficient ``alpha ~ U[0, 1)`` is drawn and the critic
    is evaluated at ``alpha * real + (1 - alpha) * fake``. The per-sample L2 norm
    of the critic's gradient with respect to that point is pushed towards 1.

    Args:
        critic: Callable invoked as ``critic(data, conditions)``.
        real_data: Real batch; the first dimension is the batch dimension and
            any number of trailing dimensions is supported.
        fake_data: Generated batch with the same shape as ``real_data``.
        conditions: Passed through to ``critic`` unchanged (it is neither
            modified nor differentiated).
        device: Device on which the mixing coefficients are created; it must
            match the device of ``real_data`` and ``fake_data``.

    Returns:
        Scalar tensor, differentiable with respect to the critic's parameters.
    """
    batch_size = real_data.size(0)
    # One coefficient per sample, shaped (batch, 1, ..., 1) so that it broadcasts
    # over every trailing dimension (e.g. time steps and channels).
    alpha = torch.rand(batch_size, *([1] * (real_data.dim() - 1)), device=device)
    # detach(): the penalty regularises the critic only, so the interpolate is a
    # fresh leaf that does not route gradients back into the generator.
    interpolates = (alpha * real_data + (1 - alpha) * fake_data).detach().requires_grad_(True)
    d_interpolates = critic(interpolates, conditions)
    gradients = torch.autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=torch.ones_like(d_interpolates),
        # create_graph keeps the gradient itself differentiable so the penalty
        # can be back-propagated into the critic.
        create_graph=True,
        retain_graph=True,
    )[0]
    gradients = gradients.reshape(batch_size, -1)
    return ((gradients.norm(2, dim=1) - 1) ** 2).mean()


In [13]:
# Per-epoch mean losses; the training cell appends one value to each list per epoch.
losses_G = []
losses_D = []

def evaluate_generator(generator, discriminator, data_loader, device=None):
    """Percentage of generated samples that the discriminator labels as fake.

    Each batch's coordinates are fed through ``generator``; the discriminator's
    score is rounded and a rounded score of 0 counts as "detected as fake".

    Both models are switched to eval mode and are left in eval mode; callers
    must call ``.train()`` again before continuing training.

    Args:
        generator: Module mapping a coordinate batch to generated data.
        discriminator: Module returning one score per sample.
        data_loader: Iterable of ``(coords, other)`` pairs; ``other`` is ignored.
        device: Device the coordinates are moved to. When ``None`` the device of
            the generator's first parameter is used; a generator without
            parameters receives the batches unmoved.

    Returns:
        Percentage in ``[0, 100]``, or ``nan`` when ``data_loader`` yields no samples.
    """
    if device is None:
        # Follow the generator instead of depending on a notebook-level variable.
        first_param = next(generator.parameters(), None)
        device = first_param.device if first_param is not None else None

    generator.eval()
    discriminator.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for coords, _ in data_loader:
            if device is not None:
                coords = coords.to(device)
            fake_imu_data = generator(coords)
            outputs = discriminator(fake_imu_data).reshape(-1)
            predicted = outputs.round()  # decision threshold at 0.5
            total += coords.size(0)
            correct += (predicted == 0).sum().item()  # 0 is the "fake" decision
    if total == 0:
        # Nothing was evaluated, so the rate is undefined (avoids ZeroDivisionError).
        return float("nan")
    return 100 * correct / total

In [15]:
# Optimisers: the discriminator uses a 10x smaller learning rate than the generator.
optimizer_G = optim.Adam(generator.parameters(), lr=0.0005, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=0.00005, betas=(0.5, 0.9))
# Loss: binary cross-entropy on the discriminator's sigmoid output.
criterion = nn.BCELoss()
# Histories. All three are reset together so that re-running this cell yields
# loss and accuracy curves that share the same epoch axis (previously only
# `accuracy` was reset while the loss lists kept growing across runs).
accuracy = []
losses_G = []
losses_D = []
# Training parameters
num_epochs = 10000
# Smoothed targets: 0.9 / 0.1 are used instead of hard 1 / 0 labels.
real_label = 0.9
fake_label = 0.1

# Training loop
for epoch in range(num_epochs):
    generator.train()
    discriminator.train()
    # The discriminator's weights are only updated in every fifth epoch
    # (epoch 0, 5, 10, ...); the generator is updated on every batch.
    update_discriminator = epoch % 5 == 0

    G_losses = []
    D_losses_r = []
    D_losses_f = []
    D_losses = []
    for coords, real_imu_data, _ in train_loader:
        coords, real_imu_data = coords.to(device), real_imu_data.to(device)
        batch_size = coords.size(0)
        label_real = torch.full((batch_size,), real_label, dtype=torch.float, device=coords.device)
        label_fake = torch.full((batch_size,), fake_label, dtype=torch.float, device=coords.device)

        # --- Discriminator ---
        # Clears gradients left on the discriminator by the previous generator step.
        discriminator.zero_grad()
        output_real = discriminator(real_imu_data).view(-1)
        loss_D_real = criterion(output_real, label_real)

        fake_imu_data = generator(coords)
        # detach(): the discriminator loss must not send gradients into the generator.
        output_fake = discriminator(fake_imu_data.detach()).view(-1)
        loss_D_fake = criterion(output_fake, label_fake)
        loss_D = (loss_D_real + loss_D_fake) / 2
        if update_discriminator:
            # Back-propagate only when the result is consumed by optimizer_D.step();
            # in the other epochs the loss is computed for logging only.
            loss_D.backward()
            optimizer_D.step()

        # --- Generator ---
        generator.zero_grad()
        # Second pass without detach so gradients reach the generator; the
        # generator is rewarded when its output is scored like real data.
        output_fake = discriminator(fake_imu_data).view(-1)
        loss_G = criterion(output_fake, label_real)
        loss_G.backward()
        optimizer_G.step()

        G_losses.append(loss_G.item())
        D_losses_r.append(loss_D_real.item())
        D_losses_f.append(loss_D_fake.item())
        D_losses.append(loss_D.item())

    # End-of-epoch bookkeeping
    G_loss, D_loss = np.mean(G_losses), np.mean(D_losses)
    D_loss_r, D_loss_f = np.mean(D_losses_r), np.mean(D_losses_f)
    losses_G.append(G_loss)
    losses_D.append(D_loss)
    # evaluate_generator leaves both models in eval mode; train mode is restored
    # at the top of the next epoch.
    accuracy.append(evaluate_generator(generator, discriminator, impl_loader))
    if (epoch + 1) % 10 == 0:
        # Replace the previous report/figure instead of stacking outputs.
        clear_output(wait=True)
        print(f'Epoch {epoch+1:>5d}: Generative loss: {G_loss:>6.3f}, Discrimination loss: {D_loss_r:>6.3f} {D_loss_f:>6.3f}, implement acc: {accuracy[-1]:>6.3f}%')

        fig, (ax_loss, ax_acc) = plt.subplots(2, 1, figsize=(15, 10))

        # Loss curves over the full history
        ax_loss.plot(losses_G, label='Generator Loss')
        ax_loss.plot(losses_D, label='Discriminator Loss')
        ax_loss.set_title('Training Losses')
        ax_loss.set_xlabel('Epoch Number')
        ax_loss.set_ylabel('Loss')
        ax_loss.legend()

        # Accuracy on the validation loader, zoomed to the last 100 epochs
        ax_acc.plot(accuracy, label='Impl Accuracy', color='green')
        ax_acc.set_title('Discriminator Accuracy')
        ax_acc.set_xlabel('Epoch Number')
        ax_acc.set_ylabel('Accuracy (%)')
        ax_acc.set_xlim(max(0, epoch - 100), epoch)
        ax_acc.set_ylim(-3, 103)
        ax_acc.legend()
        plt.show()

KeyboardInterrupt: 