In [1]:
import os
import random
import cv2
import pandas as pd
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import mediapipe as mp
import numpy as np
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader, random_split

In [3]:
def seed_everything(seed: int = 22):
       random.seed(seed)
       np.random.seed(seed)
seed_everything()

In [4]:
tot_actions = 6
actions_name = ['ankle', 'fall', 'jump', 'knee', 'run', 'walk']
min_data_len = 35

In [5]:
actions_csv_dir = '../dataset/radius_csv_dataset'
dataset = []

label_mapping = {'ankle': 0,
                 'fall': 1,
                 'jump': 2,
                 'knee': 3,
                 'run' : 4,
                 'walk' : 5
                 }

def map_action_to_label(csv_name):
       for action, label in label_mapping.items():
              if action in csv_name.split('_')[0]:
                     return label
       return -1

for action_csv in os.listdir(actions_csv_dir):
       action_df = pd.read_csv(os.path.join(actions_csv_dir, action_csv))
       label = map_action_to_label(action_csv)
       if label != -1:
              for idx in range(0, len(action_df), int(min_data_len / 2)):
                     seq_df = action_df[idx: idx + min_data_len] #길이만큼 데이터 자른 것(즉 length 만큼의 프레임)
                     if len(seq_df) == min_data_len: # 딱 length에 개수 맞춰서 끊어서 넣으려고
                            dataset.append({'key': label, 'value': seq_df}) # key에 slide, value에는 묶음 프레임 만큼이 담기겠네
       #최종적으로 dataset에는 행동별로 dictionary 가 만들어져 들어간다.

In [6]:
print(dataset[0]['value']) # z축 까지 99 (33 * 3)차원

             0           1           2           3           4           5
0   118.957524  118.957524  118.957524  118.957524  118.957524  118.957524
1   116.456474  116.456474  116.456474  116.456474  116.456474  116.456474
2   120.158757  120.158757  120.158757  120.158757  120.158757  120.158757
3   117.191128  117.191128  117.191128  117.191128  117.191128  117.191128
4   116.849897  116.849897  116.849897  116.849897  116.849897  116.849897
5   118.158333  118.158333  118.158333  118.158333  118.158333  118.158333
6   116.082080  116.082080  116.082080  116.082080  116.082080  116.082080
7   115.984675  115.984675  115.984675  115.984675  115.984675  115.984675
8   114.949566  114.949566  114.949566  114.949566  114.949566  114.949566
9   117.917963  117.917963  117.917963  117.917963  117.917963  117.917963
10  117.909310  117.909310  117.909310  117.909310  117.909310  117.909310
11  116.314775  116.314775  116.314775  116.314775  116.314775  116.314775
12  118.962365  118.96236

In [7]:
class MyDataset(Dataset):
       def __init__(self, dataset): #모든 행동을 통합한 df가 들어가야함
              self.x = []
              self.y = []
              for dic in dataset:
                     self.y.append(dic['key']) #key 값에는 actions 들어감
                     self.x.append(dic['value']) #action마다의 data 들어감

       def __getitem__(self, index): #index는 행동의 index
              data = self.x[index] # x에는 꺼내 쓸 (행동마다 45개 묶음프레임)의 데이터
              label = self.y[index]
              return torch.Tensor(np.array(data)), torch.tensor(np.array(int(label)))

       def __len__(self):
              return len(self.x)


In [None]:
train_test_val_ratio = [0.8, 0.1, 0.1]
print(len(dataset))
train_len = int(len(dataset) * train_test_val_ratio[0])
val_len = int(len(dataset) * train_test_val_ratio[1])
test_len = len(dataset) - train_len - val_len

In [9]:
CFG = {'batch_size': 8,
       'learning_rate': 2e-2,
       'seed': 22,
       'epochs': 30    
}

In [10]:
train_dataset = MyDataset(dataset)
train_data, valid_data, test_data = random_split(train_dataset, [train_len, val_len, test_len])

train_loader = DataLoader(train_data, batch_size=CFG['batch_size'])
val_loader = DataLoader(valid_data, batch_size=CFG['batch_size'])
test_loader = DataLoader(test_data, batch_size=CFG['batch_size'])

In [11]:
from sklearn.utils import shuffle

def shuffle_dataset(lm_list, label_list):
       lm_list, label_list = shuffle(lm_list, label_list, random_state=22)
       return lm_list, label_list

In [12]:
class Model(nn.Module):
       def __init__(self):
              super(Model, self).__init__()
              self.recurrent_layer = nn.LSTM(hidden_size=100, input_size=6,  bidirectional=True)

              self.nonLin = nn.BatchNorm1d(35)
              self.recurrent_layer2 = nn.LSTM(hidden_size=100, input_size=200, bidirectional=True)
              self.nonLin2 = nn.BatchNorm1d(35)
              self.conv = nn.Conv1d(35,36,7,1)
              self.relu1 = nn.Softmax()
              self.classify_layer = nn.Linear(194, 6)

       def forward(self, input, h_t_1=None, c_t_1=None):
              # the size of rnn_outputs is [batch_size, seq_len, rnn_size]
              rnn_outputs, (hn, cn) = self.recurrent_layer(input)
              lin1 = self.nonLin(rnn_outputs)

              rnn_outputs2, (hn2, cn2) = self.recurrent_layer2(lin1)

              lin2 = self.nonLin2(rnn_outputs2)
              conv = self.conv(lin2)
              relu = self.relu1(conv)

              logits = self.classify_layer(relu[:,-1])
              return logits


In [None]:
'''
class Model(nn.Module):
       def __init__(self):
              super(Model, self).__init__()
              self.lstm1 = nn.LSTM(input_size=99, hidden_size=128, num_layers=1, batch_first=True) 
              self.lstm2 = nn.LSTM(input_size=128, hidden_size=256, num_layers=1, batch_first=True)
              self.dropout1 = nn.Dropout(0, 1)
              self.lstm3 = nn.LSTM(input_size=256, hidden_size=128, num_layers=1, batch_first=True)
              self.lstm4 = nn.LSTM(input_size=128, hidden_size=64, num_layers=1, batch_first=True)
              self.dropout2 = nn.Dropout(0, 1)
              self.lstm5 = nn.LSTM(input_size=64, hidden_size=32, num_layers=1, batch_first=True)
              self.fc = nn.Linear(32, 6) #분류할 클래스 6가지

       def forward(self, x):
              x, _ = self.lstm1(x)
              x, _ = self.lstm2(x)
              x, _ = self.lstm3(x)
              x = self.dropout1(x)
              x, _ = self.lstm4(x)
              x, _ = self.lstm5(x)
              x, _ = self.lstm6(x)
              x = self.dropout2(x)
              x, _ = self.lstm7(x)
              x = self.fc(x[:, -1, :])
              return x
'''

In [None]:
num_epochs = CFG['epochs']
best_models = []  # 폴드별로 성능 높은 모델 저장
for i,fold in enumerate(range(4)):
       print('===============',i+1,'fold start===============')
       
       model = Model()
       optimizer = optim.Adam(model.parameters(), lr=CFG['LEARNING_RATE'] )
       criterion = nn.CrossEntropyLoss()
       lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                      step_size = 5,
                                                      gamma = 0.9) # learning rate scheduler 로 학습률 주기적 감소
       

       val_acc_max = 0.9 # 정확도 0.9 이상 저장
       val_loss_min = 0.2 #손실 0.2 이하 저장
       
       # Lists to store training and validation metrics for each epoch
       train_loss_history = []
       val_loss_history = []
       val_acc_history = []
       
       for epoch in range(num_epochs):
              train_loss_list = []
              val_loss_list = []
              val_acc_list = []
       
              # 모델 학습
              for i, (images, targets) in enumerate(train_loader):
                     model.train()
       
                     optimizer.zero_grad()
                     outputs = model(images)
       
                     targets = targets.long()
                     loss = criterion(outputs, targets)
                     loss.backward()
                     optimizer.step()
       
                     train_loss_list.append(loss.item())
       
                     if (i + 1) % 20 == 0:
                            print(f'Epoch: {epoch} - Loss: {loss:.6f}')
       
              train_loss = np.mean(train_loss_list)
       
              # 모델 검증
              for i, (images, targets) in enumerate(val_loader):
                     model.eval()
       
                     with torch.no_grad():
                            outputs = model(images)
       
                            targets = targets.long()
       
                            val_loss = criterion(outputs, targets)
       
                            preds = torch.argmax(outputs, dim=1)
       
                            batch_acc = (preds == targets).float().mean()  # boolean 값의 평균
       
                            val_loss_list.append(val_loss.item())
                            val_acc_list.append(batch_acc.item())
       
              val_loss = np.mean(val_loss_list)
              val_acc = np.mean(val_acc_list)
       
              train_loss_history.append(train_loss)
              val_loss_history.append(val_loss)
              val_acc_history.append(val_acc)
       
              print(f'Epoch: {epoch} - valid Loss: {val_loss:.6f} - valid_acc : {val_acc:.6f}')
       
              if val_acc_max < val_acc or val_loss_min > val_loss:
                     val_acc_max = val_acc
                     best_models.append(model)
                     print('model save, model val acc : ', val_acc)
       
              lr_scheduler.step()

print('Train finished, best_models size : ', len(best_models))

In [None]:
save_directory = 'saved_models'

for i, model in enumerate(best_models):
       model_filename = f"best_model_{i}.pth"
       model_path = save_directory + model_filename
       torch.save(model.state_dict(), model_path)

print(f"Saved {len(best_models)} models to {save_directory}")

In [None]:
# 학습, 검증 과정 손실 그래프
plt.figure(figsize=(10, 5))
plt.plot(train_loss_history, label='Training Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss_for_epoch_for_4folds')
plt.legend()
plt.show()

# 학습, 검증 과정 정확도 그래프
plt.figure(figsize=(10, 5))
plt.plot(val_acc_history, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Accuracy_for_epoch_for_4folds.png')
plt.legend()
plt.show()

In [None]:
# 정확도 검증
for m in best_models:
       with torch.no_grad():
              test_loss, test_acc = epoch(test_loader, mode='test')
              test_acc = round(test_acc, 4)
              test_loss = round(test_loss, 4)
              print('Test Acc.: {}'.format(test_acc))
              print('Test Loss: {}'.format(test_loss))


In [None]:
def load_model(model, filepath):
       model.load_state_dict(torch.load(filepath))
       model.eval()
       return model

load_directory = save_directory 

num_models = 5
test_accuracies = []

for i in range(num_models):
       model = Model()
       model_filename = f"best_model_{i}.pth"
       model_path = load_directory + model_filename
       model = load_model(model, model_path)
       
       correct = 0
       total = 0
       with torch.no_grad():
              for images, targets in test_loader:
                     outputs = model(images)
                     _, predicted = torch.max(outputs.data, 1)
                     total += targets.size(0)
                     correct += (predicted == targets).sum().item()
               
       accuracy = correct / total
       test_accuracies.append(accuracy)

       print(f"Model {i+1} - Test Accuracy: {accuracy:.4f}")

In [24]:
# 실시간 영상 테스트
interval = 1
video_path = '../dataset/slide/slide001.mp4'

cap = cv2.VideoCapture(video_path)
img_list = []
if cap.isOpened():
       cnt = 0
       while True:
              ret, img = cap.read()
              if ret:
                     img = cv2.resize(img, (640, 640))
                     if cnt == interval:
                            img_list.append(img)
                            cnt = 0
                     cv2.imshow(video_path, img)
                     cv2.waitKey(1)
                     cnt += 1
              else:
                     break
cap.release()
cv2.destroyAllWindows()

print('저정된 frame의 개수: {}'.format(len(img_list)))

저정된 frame의 개수: 148


In [33]:
# 연속 시퀀스 분석
from tqdm import tqdm
model_filename = f"best_model_1.pth"
model_path = load_directory + model_filename
model = load_model(model, model_path)

model.eval()
out_img_list = []
dataset = []
status = 'None'
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True, model_complexity=1, min_detection_confidence=0.7, min_tracking_confidence=0.5)

print('시퀀스 데이터 분석 중...')
xy_list_list = []

for img in tqdm(img_list):
       results = pose.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
       if not results.pose_landmarks: continue
       xy_list = []
       idx = 0
       for x_and_y in results.pose_landmarks.landmark:
              xy_list.append(x_and_y.x)
              xy_list.append(x_and_y.y)
              x, y = int(x_and_y.x * 640), int(x_and_y.y * 640)
       idx += 1
       xy_list_list.append(xy_list)

       length = 45
       if len(xy_list_list) == length:
              dataset = []
              dataset.append({'key': 0, 'value': xy_list_list})
              dataset = MyDataset(dataset)
              dataset = DataLoader(dataset)
              xy_list_list = []
              for data, label in dataset:
                     with torch.no_grad():
                            result = model(data)
                            _, out = torch.max(result, 1)
                            print(out.item())
       cv2.putText(img, status, (0, 50), cv2.FONT_HERSHEY_COMPLEX, 1.5, (0, 0, 255), 2)
       out_img_list.append(img)      


시퀀스 데이터 분석 중...


  return self._call_impl(*args, **kwargs)
 33%|███▎      | 49/148 [00:01<00:03, 32.12it/s]

2


 66%|██████▌   | 97/148 [00:03<00:01, 32.86it/s]

2


 95%|█████████▌| 141/148 [00:04<00:00, 32.68it/s]

2


100%|██████████| 148/148 [00:04<00:00, 32.44it/s]
