### 필요한 라이브러리 로드

In [None]:
from ultralytics import YOLO
import mediapipe as mp
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader, random_split
import cv2
import numpy as np
import os
import random
import platform
import yaml
from tqdm import tqdm
import gc
from torch.cuda import empty_cache
import pandas as pd


if platform.system() == "Darwin":
    print("your system is mac os")
    device = torch.device("mps") if torch.backends.mps.is_available() else "cpu"
else:
    print("your system is cuda")
    device = torch.device("cuda") if torch.cuda.is_available() else "cpu"

device

E0000 00:00:1743647376.039775    4807 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1743647376.048070    4807 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1743647376.074019    4807 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1743647376.074081    4807 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1743647376.074085    4807 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1743647376.074086    4807 computation_placer.cc:177] computation placer already registered. Please check linka

your system is cuda


device(type='cuda')

In [2]:
data_dir = '/home/pepsi/dev_ws/mldl/Training/datasets/person'
model_name = "yolov8n.pt"

classes = ['person']
nc = len(classes)
yaml_file = 'data.yaml'

In [3]:
data = {
    'train' : data_dir + '/train/',
    'val' : data_dir + '/valid/',
    'test' : data_dir + '/test/',
    'nc' : nc,
    'names' : classes,
}

In [4]:
os.makedirs(data_dir, exist_ok=True)

with open(data_dir + '/' + yaml_file, 'wt') as fw:
    yaml.dump(data, fw)

In [5]:
with open(data_dir + '/' + yaml_file, 'rt') as fr:
    d = yaml.safe_load(fr)
    print(type(d))
    print(d)

<class 'dict'>
{'names': ['person'], 'nc': 1, 'test': '/home/pepsi/dev_ws/mldl/Training/datasets/person/test/', 'train': '/home/pepsi/dev_ws/mldl/Training/datasets/person/train/', 'val': '/home/pepsi/dev_ws/mldl/Training/datasets/person/valid/'}


In [6]:
train_epoch = 100
patience = 30
batch = 32
imgsz = 640
LR = 0.001
optimizer = 'AdamW'

In [None]:
yolo_model = YOLO(model_name).to(device)

yolo_model.train(data=data_dir + '/' + yaml_file,
            epochs = train_epoch,
            patience=patience,
            batch=batch,
            imgsz = imgsz,
            optimizer=optimizer)

In [None]:
YOLO_best_model_path = '/Users/wjsong/dev_ws/Hosbot/runs/detect/train/weights/best.pt'

YOLO_best_model = YOLO(YOLO_best_model_path).to(device)
metrics = YOLO_best_model.val()

for label, ap in zip(classes, metrics.box.maps):
    print(label, ':', ap)

In [2]:
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

yolo_best_model = '/home/pepsi/dev_ws/mldl/Training/runs/detect/train3/weights/best.pt'
yolo_model = YOLO(yolo_best_model).to(device)

In [3]:
def get_YOLO_box(img, yolo_model, detect_cls):
    box_results = yolo_model.predict(img, conf=0.6, verbose=False, show=False)
    box_results = box_results[0].boxes

    boxes = box_results.xyxy.cpu().tolist()
    box_class = box_results.cls.cpu().tolist()

    p1x1, p1y1, p1x2, p1y2 = 0, 0, 0, 0
    p2x1, p2y1, p2x2, p2y2 = 0, 0, 0, 0
    for idx, cls in enumerate(box_class):
        if int(cls) == detect_cls:
            p1x1, p1y1, p1x2, p1y2 = boxes[0]
            p1x1, p1y1, p1x2, p1y2 = int(p1x1), int(p1y1), int(p1x2), int(p1y2)

            if len(boxes) > 1:
                p2x1, p2y1, p2x2, p2y2 = boxes[1]
                p2x1, p2y1, p2x2, p2y2 = int(p2x1), int(p2y1), int(p2x2), int(p2y2)

    return p1x1, p1y1, p1x2, p1y2, p2x1, p2y1, p2x2, p2y2

In [4]:
def get_pose_landmarks(results):
    xyz_list = []
    for landmark in results.pose_landmarks.landmark:
        xyz_list.append(landmark.x)
        xyz_list.append(landmark.y)
        xyz_list.append(landmark.z)
    return xyz_list 

In [5]:
def append_data_xyz_list_list(xyz_list_list, p1x1, p1y1, p1x2, p1y2, p2x1, p2y1, p2x2, p2y2, xyz_list):
    xyz_list.append(abs(p1x1 - p2x1) / 640)
    xyz_list.append(abs(p1x2 - p2x2) / 640)
    xyz_list.append(abs(p1y1 - p2y1) / 640)
    xyz_list.append(abs(p1y2 - p2y2) / 640)
    xyz_list_list.append(xyz_list)

    return xyz_list_list

In [6]:
def generate_dataset(mp_pose, video_path, detect_cls):
    xyz_list_list = []
    poses = mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5)

    cap = cv2.VideoCapture(video_path)

    if cap.isOpened():
        while True:
            ret, img = cap.read()
            if ret == True:
                xyz_list = []
                img = cv2.resize(img, (640, 640))
                results = poses.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

                if not results.pose_landmarks: continue

                xyz_list = get_pose_landmarks(results)
                p1x1, p1y1, p1x2, p1y2, p2x1, p2y1, p2x2, p2y2 = get_YOLO_box(img, yolo_model, detect_cls)

                if (p1x1 == 0 and p1y1 == 0 and p1x2 == 0 and p1y2== 0) and (p2x1 == 0 and p2y1 == 0 and p2x2 == 0 and p2y2== 0): continue

                xyz_list_list = append_data_xyz_list_list(xyz_list_list, p1x1, p1y1, p1x2, p1y2, p2x1, p2y1, p2x2, p2y2, xyz_list)

                cv2.waitKey(1)
            else:
                break

    cap.release()
    gc.collect()
    torch.cuda.empty_cache()
    
    return xyz_list_list

In [48]:
Video_path = './datasets/pose/train2'
video_name_list = os.listdir(Video_path)
dataset = []
length = 20
detect_cls = 0

for video_name in tqdm(video_name_list):
    if 'normal' in video_name: label = 0
    elif 'fighting' in video_name: label = 1
    elif 'lying' in video_name: label = 2
    elif 'smoking' in video_name: label = 3

    pose_data = generate_dataset(mp_pose, '{}/{}'.format(Video_path, video_name), detect_cls)

    for idx in range(0, len(pose_data), int(length)):
        seq_list = pose_data[idx : idx + length]
        if len(seq_list) == length:
            dataset.append({'key' : label, 'value': seq_list})

random.shuffle(dataset)

  0%|          | 0/16 [00:00<?, ?it/s]

I0000 00:00:1743645700.343415    5760 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1743645700.347369    9428 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 Mesa 24.2.8-1ubuntu1~24.04.1), renderer: Mesa Intel(R) UHD Graphics (CML GT2)
W0000 00:00:1743645700.442730    9416 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1743645700.478107    9420 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
  6%|▋         | 1/16 [00:29<07:23, 29.58s/it]I0000 00:00:1743645729.931986    5760 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1743645729.933269    9456 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 Mesa 24.2.8-1ubuntu1~24.04.1), renderer: Mesa Intel(R) UHD Graphics (CML GT2)
W0000 00:00:1743645730.029043    9449 in

In [49]:
print(len(dataset))

237


In [50]:
print('input data', dataset[0]['value'][0])
print('input data length', len(dataset[0]['value'][0]))

input data [0.4049217700958252, 0.3418542146682739, -0.09563499689102173, 0.40718451142311096, 0.3306730389595032, -0.08019842952489853, 0.4082857668399811, 0.3302232027053833, -0.08024564385414124, 0.4095185399055481, 0.3297785520553589, -0.08028195798397064, 0.4030199944972992, 0.3291739821434021, -0.10445760190486908, 0.4002353549003601, 0.32796043157577515, -0.10442928969860077, 0.39815306663513184, 0.32661283016204834, -0.10451358556747437, 0.4051211476325989, 0.32861778140068054, 0.017215870320796967, 0.3886374235153198, 0.32688695192337036, -0.095341756939888, 0.4048277735710144, 0.3509773015975952, -0.05451038479804993, 0.39755570888519287, 0.3493660092353821, -0.08712806552648544, 0.3967532217502594, 0.37723299860954285, 0.11964359134435654, 0.36059829592704773, 0.37002426385879517, -0.11708519607782364, 0.4187869429588318, 0.4673067629337311, 0.13907867670059204, 0.38053032755851746, 0.4461829662322998, -0.20440462231636047, 0.4292142987251282, 0.4716995358467102, 0.060258768

In [7]:
class MyDataset(Dataset):
    def __init__(self, seq_list):
        self.X = []
        self.y = []
        for dic in seq_list:
            self.y.append(dic['key'])
            self.X.append(dic['value'])

    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, index):
        data = self.X[index]
        label = self.y[index]
        return torch.Tensor(np.array(data)), torch.tensor(np.array(int(label)))

In [8]:
split_ratio = [0.7, 0.2, 0.1]
train_len = int(len(dataset) * split_ratio[0])
val_len = int(len(dataset) * split_ratio[1])
test_len = len(dataset) - train_len - val_len

NameError: name 'dataset' is not defined

In [9]:
train_dataset = MyDataset(dataset)
train_data, valid_data, test_data = random_split(train_dataset, [train_len, val_len, test_len])

train_loader = DataLoader(train_data, batch_size=8)
val_loader = DataLoader(valid_data, batch_size=8)
test_loader = DataLoader(test_data, batch_size=8)

NameError: name 'dataset' is not defined

In [10]:
class LSTM(nn.Module):
    def __init__(self, num_layers=1):
        super(LSTM, self).__init__()
        self.lstm1 = nn.LSTM(103, 128, num_layers, batch_first=True, bidirectional=True)
        self.layer_norm1 = nn.LayerNorm(256)
        self.dropout1 = nn.Dropout(0.1)

        self.lstm2 = nn.LSTM(256, 64, num_layers, batch_first=True, bidirectional=True)
        self.layer_norm2 = nn.LayerNorm(128)
        self.dropout2 = nn.Dropout(0.1)

        self.lstm3 = nn.LSTM(128, 32, num_layers, batch_first=True, bidirectional=True)
        self.layer_norm3 = nn.LayerNorm(64)
        self.dropout3 = nn.Dropout(0.1)

        self.attention = nn.Linear(64, 1)
        self.fc = nn.Linear(64, 4)


    def forward(self, x):
        x, _ = self.lstm1(x)
        x = self.layer_norm1(x)
        x = self.dropout1(x)

        x, _ = self.lstm2(x)
        x = self.layer_norm2(x)
        x = self.dropout2(x)

        x, _ = self.lstm3(x)
        x = self.layer_norm3(x)
        x = self.dropout3(x)

        attention_weights = torch.softmax(self.attention(x), dim=1)
        x = torch.sum(attention_weights * x, dim=1)

        x = self.fc(x)
        return x

In [11]:
def init_model():
    global net, loss_fn, optim
    net = LSTM().to(device)
    loss_fn = nn.CrossEntropyLoss()
    optim = AdamW(net.parameters(), lr = 0.001)

In [12]:
def init_epoch():
    global epoch_cnt
    epoch_cnt = 0

In [13]:
def init_log():
    global iter_log, tloss_log, tacc_log, vloss_log, vacc_log, log_stack, time_log
    iter_log, tloss_log, tacc_log, vloss_log, vacc_log = [], [], [], [], []
    log_stack, time_log = [], []

In [14]:
def record_train_log(_tloss, _tacc, _time):
    time_log.append(_time)
    tloss_log.append(_tloss)
    tacc_log.append(_tacc)
    iter_log.append(epoch_cnt)

In [15]:
def record_valid_log(_vloss, _vacc):
    vloss_log.append(_vloss)
    vacc_log.append(_vacc)

In [16]:
def last(log_list):
    if len(log_list) > 0:
        return log_list[len(log_list) - 1]
    else:
        return -1

In [17]:
def print_log():
    train_loss = round(float(last(tloss_log)), 3)
    train_acc = round(float(last(tacc_log)), 3)
    val_loss = round(float(last(vloss_log)), 3)
    val_acc = round(float(last(vacc_log)), 3)
    time_spent = round(float(last(time_log)), 3)

    log_str = 'Epoch: {:3}| T_Loss {:5} | T_acc {:5}| V_Loss {:5}| V_acc {:5} | {:5}'.format(last(iter_log), train_loss, train_acc, val_loss, val_acc, time_spent)

    log_stack.append(log_str) #프린트 준비

    for idx in reversed(range(len(log_stack))):
        print(log_stack[idx])

In [18]:
def clear_memory():
    if device != 'cpu':
        empty_cache()
    gc.collect()
    
def epoch_not_finished():
    return epoch_cnt < maximum_epoch

In [19]:
def epoch(data_loader, mode='train'):
    global epoch_cnt

    iter_loss, iter_acc, last_grad_performed = [], [], False

    for _data, _label in data_loader:
        data, label = _data.to(device), _label.type(torch.LongTensor).to(device)

        if mode == 'train' : net.train()
        else: net.eval()

        result = net(data)
        _, out = torch.max(result, 1)

        loss = loss_fn(result, label)
        iter_loss.append(loss.item())

        if mode == 'train':
            optim.zero_grad()
            loss.backward()
            optim.step()
            last_grad_performed = True

        acc_partial = (out == label).float().sum()
        acc_partial = acc_partial / len(label)
        iter_acc.append(acc_partial.item())


    if last_grad_performed:
        epoch_cnt += 1

    clear_memory()

    return np.average(iter_loss), np.average(iter_acc)

In [20]:
import time

init_model()
init_epoch()
init_log()
maximum_epoch = 50

In [72]:
while epoch_not_finished():
    start_time = time.time()
    tloss, tacc = epoch(train_loader, mode='train')
    end_time = time.time()
    time_taken = end_time - start_time
    record_train_log(tloss, tacc, time_taken)
    with torch.no_grad():
        vloss, vacc = epoch(val_loader, mode= 'val')
        record_valid_log(vloss, vacc)
    print_log()

print('\n Training completed!')

Epoch:   1| T_Loss 1.021 | T_acc 0.606| V_Loss 0.508| V_acc 0.792 | 0.473
Epoch:   2| T_Loss 0.518 | T_acc 0.758| V_Loss 0.364| V_acc 0.747 | 0.967
Epoch:   1| T_Loss 1.021 | T_acc 0.606| V_Loss 0.508| V_acc 0.792 | 0.473
Epoch:   3| T_Loss  0.49 | T_acc 0.719| V_Loss 0.349| V_acc  0.81 | 0.929
Epoch:   2| T_Loss 0.518 | T_acc 0.758| V_Loss 0.364| V_acc 0.747 | 0.967
Epoch:   1| T_Loss 1.021 | T_acc 0.606| V_Loss 0.508| V_acc 0.792 | 0.473
Epoch:   4| T_Loss 0.446 | T_acc 0.758| V_Loss  0.34| V_acc 0.747 | 0.888
Epoch:   3| T_Loss  0.49 | T_acc 0.719| V_Loss 0.349| V_acc  0.81 | 0.929
Epoch:   2| T_Loss 0.518 | T_acc 0.758| V_Loss 0.364| V_acc 0.747 | 0.967
Epoch:   1| T_Loss 1.021 | T_acc 0.606| V_Loss 0.508| V_acc 0.792 | 0.473
Epoch:   5| T_Loss 0.488 | T_acc 0.683| V_Loss 0.307| V_acc 0.833 | 0.888
Epoch:   4| T_Loss 0.446 | T_acc 0.758| V_Loss  0.34| V_acc 0.747 | 0.888
Epoch:   3| T_Loss  0.49 | T_acc 0.719| V_Loss 0.349| V_acc  0.81 | 0.929
Epoch:   2| T_Loss 0.518 | T_acc 0.758

In [73]:
with torch.no_grad():
    test_loss, test_acc = epoch(test_loader, mode='test')
    test_acc = round(test_acc, 4)
    test_loss = round(test_loss, 4)
    print('Test Acc: {}'.format(test_acc))
    print('Test Loss: {}'.format(test_loss))

Test Acc: 0.875
Test Loss: 0.4573


In [74]:
model_path = './models/lstm_model.pth'

torch.save(net.state_dict(), model_path)
print('모델이 저장되었습니다.')

모델이 저장되었습니다.


In [21]:
model_path = './models/lstm_model.pth'

lstm_model = LSTM().to(device)
lstm_model.load_state_dict(torch.load(model_path, map_location=device))
lstm_model.eval()
print("모델이 성공적으로 불러와졌습니다.")

모델이 성공적으로 불러와졌습니다.


In [22]:
length = 40
detect_cls = 0

lstm_model.eval()
dataset = []
status = 'None'

mp_pose = mp.solutions.pose
poses = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

xyz_list_list = []
status_dict = {0: 'normal', 1: 'fighting', 2: 'lying', 3: 'smoking'}

I0000 00:00:1743646739.685445    4471 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1743646739.690159    4733 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 Mesa 24.2.8-1ubuntu1~24.04.1), renderer: Mesa Intel(R) UHD Graphics (CML GT2)


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1743646739.778149    4721 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1743646739.820520    4723 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [26]:
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    RuntimeError("카메라 열기 실패")

while True:
    ret, frame = cap.read()
    if not ret:  # 프레임 읽기 실패 시 종료
        break

    # 프레임 크기 조정 (선택 사항, YOLO 모델에 따라 필요)
    frame = cv2.resize(frame, (640, 640))

    # Mediapipe 포즈 추출
    results = poses.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    xyz_list = []

    if not results.pose_landmarks:
        continue

    # 포즈 랜드마크 추출 및 그리기
    for landmark in results.pose_landmarks.landmark:
        xyz_list.append(landmark.x)
        xyz_list.append(landmark.y)
        xyz_list.append(landmark.z)

    mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    # YOLO 박스 예측
    box_results = yolo_model.predict(frame, conf=0.6, verbose=False, show=False)[0].boxes
    boxes = box_results.xyxy.cpu().tolist()
    box_class = box_results.cls.cpu().tolist()

    p1x1, p1y1, p1x2, p1y2 = 0, 0, 0, 0
    p2x1, p2y1, p2x2, p2y2 = 0, 0, 0, 0
    for idx, cls in enumerate(box_class):
        if int(cls) == detect_cls:
            p1x1, p1y1, p1x2, p1y2 = map(int, boxes[0])
            if len(boxes) > 1:
                p2x1, p2y1, p2x2, p2y2 = map(int, boxes[1])
            cv2.rectangle(frame, (p1x1, p1y1), (p1x2, p1y2), (0, 255, 0), 2)
            cv2.rectangle(frame, (p2x1, p2y1), (p2x2, p2y2), (0, 255, 0), 2)
            break

    if (p1x1 == 0 and p1y1 == 0 and p1x2 == 0 and p1y2== 0) and (p2x1 == 0 and p2y1 == 0 and p2x2 == 0 and p2y2== 0):
        continue

    # YOLO 박스 좌표 정규화 후 추가
    xyz_list.extend([abs(p1x1 - p2x1) / 640, abs(p1x2 - p2x2) / 640, abs(p1y1 - p2y1) / 640, abs(p1y2 - p2y2) / 640])
    xyz_list_list.append(xyz_list)

    # 시퀀스 길이에 도달하면 LSTM 예측 수행
    if len(xyz_list_list) == length:
        dataset = [{'key': 0, 'value': xyz_list_list}]  # 임시 라벨 0 사용
        dataset = MyDataset(dataset)
        dataset_loader = DataLoader(dataset, batch_size=1)

        for data, _ in dataset_loader:
            data = data.to(device)
            with torch.no_grad():
                result = lstm_model(data)
                _, out = torch.max(result, 1)
                print(out)
                status = status_dict.get(out.item(), 'Unknown')

        xyz_list_list = []  # 시퀀스 초기화

    # 상태 텍스트 표시
    cv2.putText(frame, status, (10, 50), cv2.FONT_HERSHEY_COMPLEX, 1.5, (255, 0, 0), 2)

    # 프레임 표시
    cv2.imshow('frame', frame)

    # 'q' 키로 종료
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 리소스 해제
cap.release()
cv2.destroyAllWindows()

tensor([0], device='cuda:0')
tensor([0], device='cuda:0')


: 