### 데이터 로드

In [74]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F

In [75]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [76]:
seed = 123
# Numpy
np.random.seed(seed)
# Pytorch
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.use_deterministic_algorithms = True

In [77]:
data = np.load("animal_data/data.npy")
label = np.load("animal_data/label.npy")


In [78]:
print(data.shape)
print(label.shape)

(38494, 3, 60, 15)
(38494,)


In [79]:
print(label[:20])

[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [80]:
print(data.shape[0]*0.6)
print(data.shape[0]*0.8)
print(round(data.shape[0]*0.6))
print(round(data.shape[0]*0.8))

23096.399999999998
30795.2
23096
30795


In [81]:
train_data = data[:round(data.shape[0]*0.6)]
val_data = data[round(data.shape[0]*0.6):round(data.shape[0]*0.8)]
test_data = data[round(data.shape[0]*0.8):]

In [82]:
train_label = label[:round(label.shape[0]*0.6)]
val_label = label[round(label.shape[0]*0.6):round(label.shape[0]*0.8)]
test_label = label[round(label.shape[0]*0.8):]

In [83]:
np.save('animal_data/train_data',train_data)
np.save('animal_data/val_data',val_data)
np.save('animal_data/test_data',test_data)

In [84]:
np.save('animal_data/train_label',train_label)
np.save('animal_data/val_label',val_label)
np.save('animal_data/test_label',test_label)

In [85]:
train_data = np.load("animal_data/train_data.npy")
val_data = np.load("animal_data/val_data.npy")
test_data = np.load("animal_data/test_data.npy")

In [86]:
train_label = np.load("animal_data/train_label.npy")
val_label = np.load("animal_data/val_label.npy")
test_label = np.load("animal_data/test_label.npy")

In [87]:
print(train_data.shape)
print(val_data.shape)
print(test_data.shape)

(23096, 3, 60, 15)
(7699, 3, 60, 15)
(7699, 3, 60, 15)


In [88]:
print(train_label.shape)
print(val_label.shape)
print(test_label.shape)

(23096,)
(7699,)
(7699,)


### model

In [89]:
#데이터 읽기
class Feeder(torch.utils.data.Dataset):
  def __init__(self, data_path, label_path):
      super().__init__()
      self.label = np.load(label_path)
      self.data = np.load(data_path)

  def __len__(self):
      return len(self.label)

  def __iter__(self):
      return self

  def __getitem__(self, index):
      data = np.array(self.data[index])
      label = self.label[index]

      return data, label

In [90]:
#연결 관계를 정의하고 그래프로 만듦
#연결 관계를 배열로 만든 후 인접 행렬 생성 

class Graph():
  def __init__(self, hop_size):
    #엣지 배열 선언 
    self.get_edge()

    #hop: 노드 간의 연결을 나타내는 단위거리 
    #hop = 2이면 손목을 팔꿈치와 어깨와 연결
    self.hop_size = hop_size 
    self.hop_dis = self.get_hop_distance(self.num_node, self.edge, hop_size=hop_size)
    #hop 수마다 인접행렬 생성
    self.get_adjacency() 

  def __str__(self):
    return self.A

  def get_edge(self):
    self.num_node = 15
    self_link = [(i, i) for i in range(self.num_node)]
    neighbor_base = [
          (0, 1),  (0,3), (2,3), (3, 4), (4, 5),
          (4, 6), (5, 7), (6, 8), (4, 13), (13, 9), 
          (13, 10), (13,14), (9, 11), (10,12)
    ]
    neighbor_link = [(i - 1, j - 1) for (i, j) in neighbor_base]
    self.edge = self_link + neighbor_link


  def get_adjacency(self):
    valid_hop = range(0, self.hop_size + 1, 1)
    adjacency = np.zeros((self.num_node, self.num_node))
    for hop in valid_hop:
        adjacency[self.hop_dis == hop] = 1
    normalize_adjacency = self.normalize_digraph(adjacency)
    A = np.zeros((len(valid_hop), self.num_node, self.num_node))
    for i, hop in enumerate(valid_hop):
        A[i][self.hop_dis == hop] = normalize_adjacency[self.hop_dis == hop]
    self.A = A

  def get_hop_distance(self, num_node, edge, hop_size):
    A = np.zeros((num_node, num_node))
    for i, j in edge:
        A[j, i] = 1
        A[i, j] = 1
    hop_dis = np.zeros((num_node, num_node)) + np.inf
    transfer_mat = [np.linalg.matrix_power(A, d) for d in range(hop_size + 1)]
    arrive_mat = (np.stack(transfer_mat) > 0)
    for d in range(hop_size, -1, -1):
        hop_dis[arrive_mat[d]] = d
    return hop_dis

  def normalize_digraph(self, A):
    Dl = np.sum(A, 0)
    num_node = A.shape[0]
    Dn = np.zeros((num_node, num_node))
    for i in range(num_node):
        if Dl[i] > 0:
            Dn[i, i] = Dl[i]**(-1)
    DAD = np.dot(A, Dn)
    return DAD

### Spatial Graph

In [91]:
class SpatialGraphConvolution(nn.Module):
  def __init__(self, in_channels, out_channels, s_kernel_size):
    super().__init__()
    self.s_kernel_size = s_kernel_size
    self.conv = nn.Conv2d(in_channels=in_channels,
                          out_channels=out_channels * s_kernel_size,
                          kernel_size=1)
    
  def forward(self, x, A):
    x = self.conv(x)
    n, kc, t, v = x.size()
    x = x.view(n, self.s_kernel_size, kc//self.s_kernel_size, t, v)
    #인접 행렬에 graph convolution 진행 후 특징 더해줌 
    x = torch.einsum('nkctv,kvw->nctw', (x, A))
    return x.contiguous()

In [92]:
#공간, 시간 그래프를 번갈아 수행하기 위해 

class STGC_block(nn.Module):
  def __init__(self, in_channels, out_channels, stride, t_kernel_size, A_size, dropout=0.5):
    super().__init__()
    # 공간 그래프 convolution
    self.sgc = SpatialGraphConvolution(in_channels=in_channels,
                                       out_channels=out_channels,
                                       s_kernel_size=A_size[0])
    
    # M: weight matirx -> 가장자리에 가중치 부여 
    self.M = nn.Parameter(torch.ones(A_size))

    self.tgc = nn.Sequential(nn.BatchNorm2d(out_channels),
                            nn.ReLU(),
                            nn.Dropout(dropout),
                            nn.Conv2d(out_channels,
                                      out_channels,
                                      (t_kernel_size, 1),
                                      (stride, 1),
                                      ((t_kernel_size - 1) // 2, 0)),
                            nn.BatchNorm2d(out_channels),
                            nn.ReLU())

  def forward(self, x, A):
    x = self.tgc(self.sgc(x, A * self.M))
    return x

In [93]:
#네트워크 모델
class ST_GCN(nn.Module):
  def __init__(self, num_classes, in_channels, t_kernel_size, hop_size):
    super().__init__()
    # 그래프 작성 
    graph = Graph(hop_size)
    A = torch.tensor(graph.A, dtype=torch.float32, requires_grad=False)
    self.register_buffer('A', A)
    A_size = A.size()
  
    # Batch Normalization
    self.bn = nn.BatchNorm1d(in_channels * A_size[1])
    
    # STGC_blocks
    self.stgc1 = STGC_block(in_channels, 32, 1, t_kernel_size, A_size)
    self.stgc2 = STGC_block(32, 32, 1, t_kernel_size, A_size)
    self.stgc3 = STGC_block(32, 32, 1, t_kernel_size, A_size)
    self.stgc4 = STGC_block(32, 64, 2, t_kernel_size, A_size)
    self.stgc5 = STGC_block(64, 64, 1, t_kernel_size, A_size)
    self.stgc6 = STGC_block(64, 64, 1, t_kernel_size, A_size)

    # Prediction
    self.fc = nn.Conv2d(64, num_classes, kernel_size=1)

  def forward(self, x):
    # Batch Normalization
    N, C, T, V = x.size() # batch, channel, frame, node
    x = x.permute(0, 3, 1, 2).contiguous().view(N, V * C, T)
    x = x.float().to(device)
    x = self.bn(x)
    x = x.view(N, V, C, T).permute(0, 2, 3, 1).contiguous()

    # STGC_blocks
    x = self.stgc1(x, self.A)
    x = self.stgc2(x, self.A)
    x = self.stgc3(x, self.A)
    x = self.stgc4(x, self.A)
    x = self.stgc5(x, self.A)
    x = self.stgc6(x, self.A)

    # Prediction
    x = F.avg_pool2d(x, x.size()[2:])
    x = x.view(N, -1, 1, 1)
    x = self.fc(x)
    x = x.view(x.size(0), -1)
    return x

### 학습

In [99]:
#모델 학습
NUM_EPOCH = 300
BATCH_SIZE = 64

best_valid_loss = float('inf')
patience = 10

# 모델 만들기
model = ST_GCN(num_classes=13, 
                  in_channels=3,
                  t_kernel_size=9, #시간 그래프 convolution 커널 크기(t_kernel_size x 1)
                  hop_size=2).to(device)

# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.05)

# loss function
criterion = torch.nn.CrossEntropyLoss()

# data 준비
data_loader = dict()
data_loader['train'] = torch.utils.data.DataLoader(dataset=Feeder(data_path='animal_data/train_data.npy', label_path='data/train_label.npy'), batch_size=BATCH_SIZE, shuffle=True,)
data_loader['val'] = torch.utils.data.DataLoader(dataset=Feeder(data_path='animal_data/val_data.npy', label_path='data/val_label.npy'), batch_size=BATCH_SIZE, shuffle=False,)
data_loader['test'] = torch.utils.data.DataLoader(dataset=Feeder(data_path='animal_data/test_data.npy', label_path='data/test_label.npy'), batch_size=BATCH_SIZE, shuffle=False,)

# 학습 준비
model.train()

# 학습 진행 
for epoch in range(1, NUM_EPOCH+1):
  correct = 0
  sum_loss = 0

  for batch_idx, (data, label) in enumerate(data_loader['train']):
    data = data.to(device)
    label = label.to(device)

    output = model(data).to(device)
    print(output)
    print(label)

    loss = criterion(output, label)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    sum_loss += loss.item()
    _, predict = torch.max(output.data, 1)
    correct += (predict == label).sum().item()

  print('# train; Epoch: {} | Loss: {:.4f} | Accuracy: {:.4f}'.format(epoch, sum_loss/len(data_loader['train'].dataset), (100. * correct / len(data_loader['train'].dataset))))

  if epoch % 10 == 0:
    model.eval()

    correct = 0
    sum_loss = 0

    confusion_matrix = np.zeros((10, 10))
    with torch.no_grad():
      for batch_idx, (data, label) in enumerate(data_loader['val']):
        data = data.to(device)
        label = label.to(device)

        output = model(data)
        loss = criterion(output,label)
        sum_loss += loss.item()

        _, predict = torch.max(output.data, 1)
        correct += (predict == label).sum().item()
        val_loss = sum_loss/len(data_loader['val'].dataset)

      print('# validation; Epoch: {} | Loss: {:.4f} | Accuracy: {:.4f}'.format(epoch, sum_loss/len(data_loader['val'].dataset), (100. * correct / len(data_loader['val'].dataset))))


      if val_loss < best_valid_loss:
        best_valid_loss = best_valid_loss
        patience_counter = 0
      else:
        patience_counter +=1

      if patience_counter >= patience:
        print('Early stopping tirggered')

TypeError: 'torch.Size' object is not callable

In [None]:

model.eval()

correct = 0
confusion_matrix = np.zeros((10, 10))
with torch.no_grad():
  for batch_idx, (data, label) in enumerate(data_loader['test']):
    data = data.to(device)
    label = label.to(device)

    output = model(data)

    _, predict = torch.max(output.data, 1)
    correct += (predict == label).sum().item()

    for l, p in zip(label.view(-1), predict.view(-1)):
      confusion_matrix[l.long(), p.long()] += 1

len_cm = len(confusion_matrix)
for i in range(len_cm):
    sum_cm = np.sum(confusion_matrix[i])
    for j in range(len_cm):
        confusion_matrix[i][j] = 100 * (confusion_matrix[i][j] / sum_cm)

classes = ['drink', 'throw', 'sit down', 'stand up', 'clapping', 'hand waving', 'kicking', 'jump up', 'salute', 'falling down']
plt.imshow(confusion_matrix, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion matrix')
plt.tight_layout()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
plt.ylabel('True')
plt.xlabel('Predicted')
plt.show()


print('# Test Accuracy: {:.3f}[%]'.format(100. * correct / len(data_loader['test'].dataset)))