# Action Recognition Model 
**RNN 을 이용한 Action Recognition Model.**

...

In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchtext import data, datasets

In [2]:
# 하이퍼파라미터
BATCH_SIZE = 64
lr = 0.001
EPOCHS = 10
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
print("다음 기기로 학습합니다:", DEVICE)

다음 기기로 학습합니다: cpu


In [3]:
path="D:\\KHU_Desktop\\4-2\\Capstone Design2\\Project\\MS-G3D-master\\data\\ntu\\xsub\\"
data=np.load(path+"train_data_joint.npy")

In [4]:
data.shape

(40091, 3, 300, 25, 2)

In [5]:
label=np.load(path+"train_label.pkl.npy", allow_pickle=True)

In [6]:
names=label[0]
names

['S001C001P001R001A001.skeleton',
 'S001C001P001R001A002.skeleton',
 'S001C001P001R001A003.skeleton',
 'S001C001P001R001A004.skeleton',
 'S001C001P001R001A005.skeleton',
 'S001C001P001R001A006.skeleton',
 'S001C001P001R001A007.skeleton',
 'S001C001P001R001A008.skeleton',
 'S001C001P001R001A009.skeleton',
 'S001C001P001R001A010.skeleton',
 'S001C001P001R001A011.skeleton',
 'S001C001P001R001A012.skeleton',
 'S001C001P001R001A013.skeleton',
 'S001C001P001R001A014.skeleton',
 'S001C001P001R001A015.skeleton',
 'S001C001P001R001A016.skeleton',
 'S001C001P001R001A017.skeleton',
 'S001C001P001R001A018.skeleton',
 'S001C001P001R001A019.skeleton',
 'S001C001P001R001A020.skeleton',
 'S001C001P001R001A021.skeleton',
 'S001C001P001R001A022.skeleton',
 'S001C001P001R001A023.skeleton',
 'S001C001P001R001A024.skeleton',
 'S001C001P001R001A025.skeleton',
 'S001C001P001R001A026.skeleton',
 'S001C001P001R001A027.skeleton',
 'S001C001P001R001A028.skeleton',
 'S001C001P001R001A029.skeleton',
 'S001C001P001

In [7]:
label=label[1]

In [50]:
data_idx=[]
y_label=[]
cnt=0
for idx, i in enumerate(label):
    #print(idx)
    if i==23: #kicking sth
        cnt+=1
        #true
        data_idx.append(idx)
        y_label.append(torch.tensor([0]))
        #false
        data_idx.append(idx+np.random.randint(1,59-23))
        y_label.append(torch.tensor([1]))

In [55]:
test_data=np.load(path+"val_data_joint.npy")
test_label=np.load(path+"val_label.pkl.npy", allow_pickle=True)
test_label=test_label[1]

test_data_idx=[]
test_y_label=[]
for idx, i in enumerate(test_label):
    #print(idx)
    if i==23: #kicking sth
        #true
        test_data_idx.append(idx)
        test_y_label.append(torch.tensor([0]))
        #false
        test_data_idx.append(idx+np.random.randint(1,59-23))
        test_y_label.append(torch.tensor([1]))

In [None]:
# 데이터 로딩하기
print("데이터 로딩중...")
TEXT = data.Field(sequential=True, batch_first=True, lower=True)
LABEL = data.Field(sequential=False, batch_first=True)
trainset, testset = datasets.IMDB.splits(TEXT, LABEL)
TEXT.build_vocab(trainset, min_freq=5)
LABEL.build_vocab(trainset)

# 학습용 데이터를 학습셋 80% 검증셋 20% 로 나누기
trainset, valset = trainset.split(split_ratio=0.8)
train_iter, val_iter, test_iter = data.BucketIterator.splits(
        (trainset, valset, testset), batch_size=BATCH_SIZE,
        shuffle=True, repeat=False)


vocab_size = len(TEXT.vocab)
n_classes = 2

In [None]:
print("[학습셋]: %d [검증셋]: %d [테스트셋]: %d [단어수]: %d [클래스] %d"
      % (len(trainset),len(valset), len(testset), vocab_size, n_classes))

In [None]:
for b, batch in enumerate(train_iter):
    print(b, batch)

In [9]:
input_shape=(3*25)

In [None]:
#need to preprocess the data to sequence data

In [58]:
class LSTM(nn.Module):
    #def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, dropout_p=0.2):
    def __init__(self, n_layers, hidden_dim, input_shape, n_classes, dropout_p=0.2):
        super(LSTM, self).__init__()
        print("Building Basic LSTM model...")
        self.n_layers = n_layers 
        #self.embed = nn.Embedding(n_vocab, embed_dim)
        self.hidden_dim = hidden_dim
        self.dropout = nn.Dropout(dropout_p)
        self.lstm = nn.LSTM(input_shape, self.hidden_dim,
                          num_layers=self.n_layers,
                          batch_first=True)
        self.out = nn.Linear(self.hidden_dim, n_classes)

    def forward(self, x):
        #x = self.embed(x)
        h_0 = self._init_state(batch_size=x.size(0))
        c_0 = self._init_state(batch_size=x.size(0))
        x, _ = self.lstm(x, (h_0, c_0))  # [i, b, h]
        h_t = x[:,-1,:]
        self.dropout(h_t)
        logit = self.out(h_t)  # [b, h] -> [b, o]
        return logit
    
    def _init_state(self, batch_size=1):
        weight = next(self.parameters()).data
        return weight.new(self.n_layers, batch_size, self.hidden_dim).zero_()


In [54]:
def train(model, optimizer, data_idx):
    model.train()
    for idx, i in enumerate(data_idx): #suppose no batch
        sample=data[i][...,0].transpose(1,0,2) #shape 300,3,25
        #print(sample.shape)
        
        x=[] #Devide 300frame into 10 sectors and choose 1 frame from each sector
        
        for i in range(10):
            x.append(sample[np.random.randint(i*10,(1+i)*10)].reshape(-1))
        
        X=torch.tensor(np.array(x)).unsqueeze(0).to(DEVICE)
        y=torch.tensor(y_label[idx]).to(DEVICE)
        optimizer.zero_grad()
        logit = model(X)
        loss = F.cross_entropy(logit, y)
        loss.backward()
        optimizer.step()
        
    """
def train(model, optimizer, train_iter):
    model.train()
    for b, batch in enumerate(train_iter):
        x, y = batch.text.to(DEVICE), batch.label.to(DEVICE)
        #y.data.sub_(1)  
        # 레이블 값을 0과 1로 변환
        optimizer.zero_grad()

        logit = model(x)
        loss = F.cross_entropy(logit, y)
        loss.backward()
        optimizer.step()
    """

In [56]:
def evaluate(model, val_iter):
    #evaluate model
    model.eval()
    corrects, total_loss = 0, 0
    for idx, i in enumerate(test_data_idx): #suppose no batch
        sample=test_data[i][...,0].transpose(1,0,2) #shape 300,3,25
        #print(sample.shape)
        
        x=[] #Devide 300frame into 10 sectors and choose 1 frame from each sector
        
        for i in range(10):
            x.append(sample[np.random.randint(i*10,(1+i)*10)].reshape(-1))
        
        X=torch.tensor(np.array(x)).unsqueeze(0).to(DEVICE)
        y=torch.tensor(test_y_label[idx]).to(DEVICE)
    
        logit = model(X)
        loss = F.cross_entropy(logit, y, reduction='sum')
        total_loss += loss.item()
        corrects += (logit.max(1)[1].view(y.size()).data == y.data).sum()
    size = len(val_iter.dataset)
    avg_loss = total_loss / size
    avg_accuracy = 100.0 * corrects / size
    return avg_loss, avg_accuracy

    """"
def evaluate(model, val_iter):
    #evaluate model
    model.eval()
    corrects, total_loss = 0, 0
    for batch in val_iter:
        x, y = batch.text.to(DEVICE), batch.label.to(DEVICE)
        #y.data.sub_(1) # 레이블 값을 0과 1로 변환
        logit = model(x)
        loss = F.cross_entropy(logit, y, reduction='sum')
        total_loss += loss.item()
        corrects += (logit.max(1)[1].view(y.size()).data == y.data).sum()
    size = len(val_iter.dataset)
    avg_loss = total_loss / size
    avg_accuracy = 100.0 * corrects / size
    return avg_loss, avg_accuracy
    """

In [60]:
model = LSTM(1, 256, input_shape, 2, 0.2).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

Building Basic LSTM model...


## 바로 아래 코드가 dataset iterator 만드는 방법이다. 
# 참고해서 나머지 코드도 고치자

In [80]:
#MAKING DATASET
import torch.utils.data as data_utils

new_test_data=[]
new_test_y_label=[]
for idx,i in enumerate(test_data_idx):
    new_test_data.append(np.array(test_data[i][...,0]).transpose(1,0,2).reshape(300,-1))
    new_test_y_label.append(test_y_label[idx])

In [82]:
test = data_utils.TensorDataset(torch.tensor(new_test_data), torch.tensor(new_test_y_label))
test_loader=data_utils.DataLoader(test, batch_size=1, shuffle=False)

In [79]:
print(np.array(new_test_data).shape, np.array(new_test_y_label).shape, np.array(test_data_idx).shape)

(552, 300, 75) (552,) (552,)


In [83]:
best_val_loss = None
for e in range(1, EPOCHS+1):
    train(model, optimizer, data_idx)
    val_loss, val_accuracy = evaluate(model, test_loader)

    print("[이폭: %d] 검증 오차:%5.2f | 검증 정확도:%5.2f" % (e, val_loss, val_accuracy))
    
    # 검증 오차가 가장 적은 최적의 모델을 저장
    if not best_val_loss or val_loss < best_val_loss:
        if not os.path.isdir("snapshot"):
            os.makedirs("snapshot")
        torch.save(model.state_dict(), './snapshot/txtclassification.pt')
        best_val_loss = val_loss

  del sys.path[0]
  from ipykernel import kernelapp as app


[이폭: 1] 검증 오차: 0.40 | 검증 정확도:82.43
[이폭: 2] 검증 오차: 0.30 | 검증 정확도:87.50
[이폭: 3] 검증 오차: 0.25 | 검증 정확도:89.67
[이폭: 4] 검증 오차: 0.31 | 검증 정확도:85.87
[이폭: 5] 검증 오차: 0.32 | 검증 정확도:86.59
[이폭: 6] 검증 오차: 0.24 | 검증 정확도:90.58
[이폭: 7] 검증 오차: 0.24 | 검증 정확도:91.12
[이폭: 8] 검증 오차: 0.22 | 검증 정확도:91.67
[이폭: 9] 검증 오차: 0.25 | 검증 정확도:90.04
[이폭: 10] 검증 오차: 0.20 | 검증 정확도:92.75


In [86]:
#model.load_state_dict(torch.load('./snapshot/txtclassification.pt'))
test_loss, test_acc = evaluate(model, test_loader)
print('테스트 오차: %5.2f | 테스트 정확도: %5.2f' % (test_loss, test_acc))

  from ipykernel import kernelapp as app


테스트 오차:  0.19 | 테스트 정확도: 92.03
