In [2]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import torch
import torch.nn as nn
import torch.optim as optim

In [16]:
# prompt 전체 출력되게
np.set_printoptions(threshold=np.inf, linewidth=np.inf)

In [5]:
# 데이터 로드드
data = np.load('../data/seq1~3000_60fps_1741667844.npy')
data.shape

### label - idx mapping정보 가져오기
import pickle
with open('../data/label_to_idx.pickle', 'rb') as f:
    label_to_idx = pickle.load(f)
print(label_to_idx)

{'0': 0, '1': 1, '10': 2, '100': 3, '1000': 4, '10000': 5, '11': 6, '112': 7, '119': 8, '12': 9, '13': 10, '14': 11, '15': 12, '16': 13, '17': 14, '18': 15, '19': 16, '2': 17, '20': 18, '21': 19, '22': 20, '23': 21, '24': 22, '25': 23, '26': 24, '27': 25, '28': 26, '29': 27, '3': 28, '30': 29, '31': 30, '32': 31, '33': 32, '34': 33, '35': 34, '36': 35, '37': 36, '38': 37, '39': 38, '4': 39, '40': 40, '41': 41, '42': 42, '43': 43, '44': 44, '45': 45, '46': 46, '47': 47, '48': 48, '49': 49, '5': 50, '50': 51, '51': 52, '52': 53, '53': 54, '54': 55, '55': 56, '56': 57, '57': 58, '58': 59, '59': 60, '6': 61, '60': 62, '61': 63, '62': 64, '63': 65, '64': 66, '65': 67, '66': 68, '67': 69, '68': 70, '69': 71, '7': 72, '70': 73, '71': 74, '72': 75, '73': 76, '74': 77, '75': 78, '76': 79, '77': 80, '78': 81, '79': 82, '8': 83, '80': 84, '81': 85, '82': 86, '83': 87, '84': 88, '85': 89, '86': 90, '87': 91, '88': 92, '89': 93, '9': 94, '90': 95, '91': 96, '92': 97, '93': 98, '94': 99, '95': 100, 

In [6]:
# 라벨 값 분리
x_data = data[:,:,:-1]
labels = data[:, 0, -1]

print(x_data.shape)
print(labels.shape)

(159020, 60, 198)
(159020,)


In [7]:
# train test data split
from sklearn.model_selection import train_test_split
x_data = x_data.astype(np.float32)
y_data = labels.astype(np.float32)

x_train, x_val, y_train, y_val = train_test_split(x_data, y_data, test_size = 0.1, random_state=42)

print(x_train.shape, y_train.shape)
print(x_val.shape, y_val.shape)


(143118, 60, 198) (143118,)
(15902, 60, 198) (15902,)


In [8]:
# 네트워크 변수 설정

batch_size = 64
seq_len = 60
num_angles = 198
num_classes = len(label_to_idx)

In [9]:
# 데이터 로더
from torch.utils.data import DataLoader, Dataset


class NumpyToTensorDataset(Dataset):
    def __init__(self, x, y):
        self.x = x  # numpy 배열
        self.y = y  # numpy 배열

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        x_item = torch.tensor(self.x[idx], dtype=torch.float32)  # numpy → tensor 변환
        y_item = torch.tensor(self.y[idx], dtype=torch.long)  # numpy → tensor 변환
        return x_item, y_item

train_dataset = NumpyToTensorDataset(x_train, y_train)
test_dataset = NumpyToTensorDataset(x_val, y_val)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1)

In [10]:
# 모델 초기화

class Transformer(nn.Module):
    def __init__(self, num_angles, num_classes, d_model=128, num_heads=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.embedding = nn.Linear(num_angles, d_model)# 각 프레임의 앵글 값을 d_model 차원으로 변환
        self.pos_encoder = nn.Parameter(torch.zeros(1,seq_len, d_model)) # 위치 인코딩
        encoder_layers = nn.TransformerEncoderLayer(d_model=d_model, nhead=num_heads, dropout=dropout) 
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(d_model, num_classes) 

    def forward(self, x):
        x = self.embedding(x) + self.pos_encoder # B, T, d_model
        x = self.transformer_encoder(x) # B,T, d_model
        x = x.mean(dim=1) # 전체 시퀀스에 대한 평균 (B, d_model)
        return self.fc(x)  # (B, num_classes)
    




In [12]:
import torch.optim.lr_scheduler as lr_scheduler

model = Transformer(num_angles=num_angles, num_classes=num_classes)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)

cpu


In [None]:


best =0
# 간단한 학습 과정
epochs = 200
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for x, y in train_dataloader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

    test_loss = 0
    val_accuracy =0
    with torch.no_grad():
        model.eval()
        correct = 0
        total = 0
        acc = []
        for x, y in test_dataloader:
            x, y = x.to(device), y.to(device)

            outputs = model(x)
            _,predicted = torch.max(outputs.data, 1)
            total += y.size(0)
            correct += (predicted == y).sum().item()
            test_loss += criterion(outputs, y).item()
            acc.append(100*correct/total)
            val_accuracy = 100*correct/total
    test_loss = test_loss/total
    scheduler.step(test_loss)


        
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, LR: {scheduler.get_last_lr()[0]:.6f}")
    print('\t [Test] : Average loss : {:.4f}, Accuracy : {}/{}({:.0f}%)\n'
                      .format(test_loss,correct,total,val_accuracy))
    
    if val_accuracy > best:
        best = val_accuracy
        torch.save(model.state_dict(), "model/best_model.pth")
        

In [15]:
# 확인
from sklearn.metrics import confusion_matrix

y_true = []
y_pred = []

loaded_model = Transformer(num_angles=num_angles, num_classes=num_classes)
loaded_model.load_state_dict(torch.load("../model/transformer_60fps.pth", map_location=torch.device('cpu')))
test_loss = 0
with torch.no_grad():
    loaded_model.eval()
    correct = 0
    total = 0
    acc = []
    for x, y in test_dataloader:
        x, y = x.to(device), y.to(device)

        outputs = loaded_model(x)
        _,predicted = torch.max(outputs.data, 1)
        y_true.append(y)
        y_pred.append(predicted)
        total += y.size(0)
        correct += (predicted == y).sum().item()
        test_loss += criterion(outputs, y).item()
        acc.append(100*correct/total)
        val_accuracy = 100*correct/total
    
    test_loss = test_loss/total
    print('\t [Test] : Average loss : {:.4f}, Accuracy : {}/{}({:.0f}%)\n'
                    .format(test_loss,correct,total,val_accuracy))
    
    y_true = torch.cat(y_true, dim=0)
    y_pred = torch.cat(y_pred, dim=0)

    cm = confusion_matrix(y_true.numpy(), y_pred.numpy())
    print(cm)


  y_item = torch.tensor(self.y[idx], dtype=torch.long)  # numpy → tensor 변환


	 [Test] : Average loss : 1.0147, Accuracy : 12784/15902(80%)

[[29  0  0 ...  0  0  0]
 [ 0 17  0 ...  0  0  0]
 [ 0  0 46 ...  0  0  0]
 ...
 [ 0  0  0 ...  7  0  0]
 [ 0  0  0 ...  0 11  0]
 [ 0  0  0 ...  0  0 32]]


In [None]:
print(cm) 

[[29  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0