## 운동 분류

In [None]:
import numpy as np
import torch
from torch import nn
import torch.optim as optim
import matplotlib.pyplot as plt
from pytorchtools import EarlyStopping
from gcn_source import PoseGCN
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os
import cv2
import pickle

## data 

In [None]:
# Preprocessing applied to a whole clip at once. ExerciseDataset hands the
# transform a float32 tensor shaped (T, C, H, W) built directly from decoded
# video frames, so pixel values arrive in the 0-255 range.
def _scale_to_unit_range(frames):
    """Rescale 0-255 pixel values to [0, 1], the range the ImageNet statistics assume."""
    return frames / 255.0


transform = transforms.Compose([
    # Must run before Normalize: the ImageNet mean/std below are defined for
    # inputs in [0, 1]; applying them to raw 0-255 values yields activations
    # in the hundreds instead of roughly unit scale.
    transforms.Lambda(_scale_to_unit_range),
    transforms.Resize((224, 224)),  # resize every frame to 224x224
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet statistics
])



In [None]:

class ExerciseDataset(Dataset):
    """Video dataset yielding a fixed-length clip of frames and a one-hot label.

    Videos are discovered as ``<root_dir>/<class_name>/*.mp4`` for each class
    folder listed in ``self.classes``. Class folders that do not exist are
    skipped silently.
    """

    def __init__(self, root_dir, transform=None, frames_per_video=16):
        """
        :param root_dir: top-level data directory (e.g. "data/")
        :param transform: optional callable applied to the (T, C, H, W) frame tensor
        :param frames_per_video: number of frames taken from the start of each video
        """
        self.root_dir = root_dir
        self.transform = transform
        self.frames_per_video = frames_per_video

        # One-hot label for each class folder name.
        self.classes = {
            "push_start": [1, 0, 0],
            "squat_start": [0, 1, 0],
            "lunge_start": [0, 0, 1]
        }

        # Parallel lists: path of every video and the label of its folder.
        self.video_paths = []
        self.labels = []

        for class_name, label in self.classes.items():
            class_folder = os.path.join(root_dir, class_name)
            if not os.path.isdir(class_folder):
                continue
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # index -> video mapping reproducible across runs and machines.
            for file_name in sorted(os.listdir(class_folder)):
                if file_name.endswith('.mp4'):  # only MP4 files are used
                    self.video_paths.append(os.path.join(class_folder, file_name))
                    self.labels.append(label)

    def __len__(self):
        """Return the number of discovered videos."""
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Load one video and return ``(frames, label)``.

        ``frames`` is a float32 tensor of shape (T, C, H, W) with
        T == ``frames_per_video`` (before ``transform`` is applied); ``label``
        is the float32 one-hot vector of the video's class folder.

        :raises RuntimeError: if not a single frame can be decoded from the file
        """
        video_path = self.video_paths[idx]
        label = torch.tensor(self.labels[idx], dtype=torch.float32)

        # Read up to frames_per_video frames from the start of the video.
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while len(frames) < self.frames_per_video:
                ret, frame = cap.read()
                if not ret:  # end of video (or decode failure)
                    break
                # OpenCV decodes to BGR; convert to RGB.
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            # Release the capture handle even if decoding raises.
            cap.release()

        if not frames:
            # Raise something other than IndexError: under the legacy
            # __getitem__ iteration protocol an IndexError would silently end
            # iteration over the dataset instead of reporting the bad file.
            raise RuntimeError(f"No frames could be read from video: {video_path}")

        # Short video: pad by repeating the last decoded frame.
        missing = self.frames_per_video - len(frames)
        if missing > 0:
            frames.extend([frames[-1]] * missing)

        # Stack into one array first; building a tensor from a Python list of
        # ndarrays is a much slower conversion path.
        clip = torch.from_numpy(np.stack(frames)).to(torch.float32)
        clip = clip.permute(0, 3, 1, 2)  # (T, H, W, C) -> (T, C, H, W)
        if self.transform is not None:
            clip = self.transform(clip)

        return clip, label


In [None]:
# Root directory that holds one sub-folder per exercise class.
root_dir = "data"

# Dataset sampling 16 frames per video, with the preprocessing pipeline attached.
dataset = ExerciseDataset(
    root_dir,
    transform=transform,
    frames_per_video=16,
)

# Shuffled loader, four videos per batch.
data_loader = DataLoader(dataset, shuffle=True, batch_size=4)

# Sanity check: report the tensor shapes of the first batch only.
for batch_idx, (videos, labels) in enumerate(data_loader):
    print(
        f"Batch {batch_idx + 1}",
        f"Videos shape: {videos.shape}",  # (Batch, T, C, H, W)
        f"Labels shape: {labels.shape}",  # (Batch, Class)
        sep="\n",
    )
    break


In [None]:
# Folder containing the video data (one sub-folder per class).
video_folder = 'data'

# Rebuild the dataset with the default number of frames per video.
dataset = ExerciseDataset(root_dir=video_folder, transform=transform)

# Rebuild the loader: batches of 32, shuffled.
data_loader = DataLoader(dataset, shuffle=True, batch_size=32)

# Pull batches from the loader and print the shapes of the first one only.
for videos, labels in data_loader:
    print(videos.shape, labels.shape)
    break


In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Model, optimiser, loss and learning-rate schedule shared by the training code.
model = PoseGCN()
model = model.to(device)
optimizer = optim.RAdam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
# Cosine schedule with a period parameter of 25 steps, decaying towards 1e-4.
scheduler = CosineAnnealingLR(optimizer, T_max=25, eta_min=1e-4)
def saveModel():
    """Write the state dict of the module-level ``model`` to ``model/classifying.pt``.

    The target directory is created when missing; ``torch.save`` does not
    create parent directories and would otherwise fail on a fresh checkout.
    """
    save_path = 'model/classifying.pt'
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    torch.save(model.state_dict(), save_path)
# Per-epoch loss histories; the training loop appends one value per epoch.
loss_, valoss_ = [], []

# Named container for loss curves (not written to by the code visible here).
logger = {
    "train_loss": [],
    "validation_loss": [],
}



In [None]:
def training(epochs) :
    """Train the module-level ``model`` for up to ``epochs`` epochs.

    Relies on module-level state: ``model``, ``optimizer``, ``criterion``,
    ``scheduler``, ``loss_``, ``valoss_``, ``saveModel`` and the names
    ``train_x``, ``valid_x`` and ``early_stopping``.
    NOTE(review): ``train_x``, ``valid_x`` and ``early_stopping`` are not
    defined in the visible part of the file -- presumably two batch iterables
    supporting ``len()`` and a ``pytorchtools.EarlyStopping`` instance; confirm.

    Each batch is used as both input and target, and the second element of the
    model's returned pair is compared with it through ``criterion``.
    NOTE(review): batches are passed to the model as-is; if they live on the
    CPU while ``model`` is on a GPU they must be moved to ``device`` first.

    Every epoch appends the mean training loss to ``loss_`` and the mean
    validation loss to ``valoss_``. The model is saved via ``saveModel()``
    when the loop ends, whether by exhausting ``epochs`` or by early stopping.

    :param epochs: maximum number of epochs to run
    """
    for epoch in range(epochs):
        running_train_loss = 0.0
        running_val_loss = 0.0

        model.train()
        for data in train_x:
            inputs = data
            targets = data  # the batch itself is the target

            optimizer.zero_grad()  # clear gradients left from the previous step
            _, predicted_outputs = model(inputs)
            train_loss = criterion(predicted_outputs, targets)

            train_loss.backward()
            optimizer.step()
            running_train_loss += train_loss.item()

        # Mean loss per batch, computed the same way as the validation loss.
        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        epoch_train_loss = running_train_loss / max(len(train_x), 1)
        loss_.append(epoch_train_loss)

        model.eval()
        with torch.no_grad():
            for data in valid_x:
                inputs = data
                targets = data

                _, predicted_outputs = model(inputs)
                val_loss = criterion(predicted_outputs, targets)
                running_val_loss += val_loss.item()

        # Computed once after the loop so it is always bound, even when
        # valid_x yields no batches.
        epoch_val_loss = running_val_loss / max(len(valid_x), 1)
        valoss_.append(epoch_val_loss)

        # Advance the learning-rate schedule once per epoch; without this call
        # the scheduler never changes the learning rate.
        scheduler.step()

        print('epoch', epoch + 1)
        print(f'train loss : {epoch_train_loss}, validation loss : {epoch_val_loss}')

        # Feed early stopping the current epoch's validation loss. A mean over
        # the whole history keeps falling long after the per-epoch loss has
        # started to rise, which delays or prevents the stop.
        early_stopping(epoch_val_loss, model)
        if early_stopping.early_stop:  # stop as soon as the criterion is met
            break
    saveModel()