In [None]:
# Install the third-party packages this notebook imports (torch, numpy, cv2)
# into the environment of the running kernel.
%pip install torch torchvision
%pip install numpy
%pip install opencv-python

In [None]:
import numpy as np
import cv2


def get_mhi(prev_img, next_img):
    """
    Build the motion image for a pair of consecutive frames.

    The result is the per-pixel absolute difference of the two frames,
    collapsed to a single grayscale channel. Note that this is a plain frame
    difference: nothing is accumulated or decayed over time here.

    :param prev_img: earlier frame (converted with COLOR_BGR2GRAY, so
        presumably a BGR colour image)
    :param next_img: later frame, same layout as ``prev_img``
    :return: grayscale difference image
    """
    return cv2.cvtColor(cv2.absdiff(prev_img, next_img), cv2.COLOR_BGR2GRAY)


def get_mhis(video):
    """
    Convert a video into the list of motion images between consecutive frames.

    Frames are pulled with ``video.read()`` until it reports failure; every
    frame after the first is paired with its predecessor and passed to
    ``get_mhi``. A video with fewer than two frames therefore yields ``[]``.
    The capture object is not released here.

    :param video: object with a cv2.VideoCapture-style ``read()`` method
    :return: list of motion images, one per consecutive frame pair
    """
    motion_images = []
    previous_frame = None

    ok, frame = video.read()
    while ok:
        # The very first frame has no predecessor to diff against.
        if previous_frame is not None:
            motion_images.append(get_mhi(previous_frame, frame))
        previous_frame = frame
        ok, frame = video.read()

    return motion_images

In [None]:
import os
import torch
import cv2
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
import numpy as np


class ActionDataset(Dataset):
    """Dataset of ``.avi`` clips laid out as ``root_dir/<label>/<clip>.avi``.

    Every sub-directory of ``root_dir`` is one class. Each item is the
    sequence of frame-difference images (see ``get_mhi``) computed from the
    first ``frame_count`` frames of a clip.
    """

    def __init__(self, root_dir, frame_count=30):
        """Index all clips below ``root_dir``.

        :param root_dir: directory containing one sub-directory per label
        :param frame_count: maximum number of frames read per clip; a clip
            yields at most ``frame_count - 1`` difference images
        """
        self.root_dir = root_dir
        self.samples = []
        self.label_map = {}
        self.frame_count = frame_count

        # Only directories are classes (stray files such as .DS_Store are
        # ignored), and they are sorted so that label ids do not depend on
        # the arbitrary order returned by os.listdir.
        labels = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.label_map = {label: idx for idx, label in enumerate(labels)}

        # Collect clip paths together with their integer label.
        for label in labels:
            label_dir = os.path.join(root_dir, label)
            video_files = sorted(
                name for name in os.listdir(label_dir) if name.endswith('.avi')
            )
            for video_file in video_files:
                self.samples.append({
                    'path': os.path.join(label_dir, video_file),
                    'label': self.label_map[label]
                })
            print(f"Loaded {len(video_files)} samples for label {label}")

    def get_label_map(self):
        """Return the ``{label name: integer id}`` mapping."""
        return self.label_map

    def __len__(self):
        """Return the number of indexed clips."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load one clip.

        :param idx: index into the sample list
        :return: ``(mhi_tensor, label, length)`` where ``mhi_tensor`` is a
            float tensor of shape ``(length, 1, H, W)``, ``label`` is the
            integer class id and ``length`` is the number of difference images
        :raises RuntimeError: if fewer than two frames could be read, so no
            difference image exists
        """
        sample = self.samples[idx]
        video = cv2.VideoCapture(sample['path'])

        mhi_sequence = []
        prev_img = None
        try:
            for _ in range(self.frame_count):
                ret, curr_img = video.read()
                if not ret:
                    break

                if prev_img is not None:
                    mhi_sequence.append(get_mhi(prev_img, curr_img))

                prev_img = curr_img
        finally:
            # Release the capture even if decoding or differencing fails.
            video.release()

        if not mhi_sequence:
            # An empty sequence would become a malformed (0, 1) tensor that
            # only blows up later during batching; fail here with the path.
            raise RuntimeError(
                f"Could not read at least two frames from {sample['path']}"
            )

        # (N, H, W) -> float tensor -> (N, 1, H, W) with an explicit channel axis
        mhi_tensor = torch.from_numpy(np.stack(mhi_sequence)).float().unsqueeze(1)

        return mhi_tensor, sample['label'], len(mhi_sequence)
    

def collate_fn(batch):
    """Merge ``(sequence, label, length)`` samples into one padded batch.

    Sequences are zero-padded along their first axis to the longest one in
    the batch, then the whole batch is reordered by descending length.

    :param batch: iterable of ``(sequence_tensor, label, length)`` triples
    :return: ``(padded_sequences, labels, lengths)`` with the batch axis
        first; ``labels`` and ``lengths`` are int64 tensors in the same
        (longest-first) order as ``padded_sequences``
    """
    sequences, labels, lengths = zip(*batch)

    # Longest-first ordering of the batch.
    sorted_lengths, order = torch.sort(torch.LongTensor(lengths), descending=True)

    # Pad to the longest sequence, then apply the ordering to data and labels.
    padded = pad_sequence(sequences, batch_first=True)[order]
    sorted_labels = torch.LongTensor([labels[position] for position in order])

    return padded, sorted_labels, sorted_lengths


def get_dataloader(root_dir, batch_size, frame_count, test_size=100):
    """Build the train and test loaders for the action dataset.

    :param root_dir: dataset root passed to ``ActionDataset``
    :param batch_size: batch size used by both loaders
    :param frame_count: maximum number of frames read per clip
    :param test_size: number of clips held out for testing (default 100,
        the previously hard-coded value)
    :return: ``(train_loader, test_loader)``
    :raises ValueError: if ``test_size`` is negative or does not leave at
        least one clip for training
    """
    dataset = ActionDataset(root_dir, frame_count)

    total = len(dataset)
    # Without this check a small dataset hands random_split a non-positive
    # train length, which yields an empty train split instead of an error
    # that names the real problem.
    if not 0 <= test_size < total:
        raise ValueError(
            f"test_size={test_size} must be >= 0 and smaller than the "
            f"dataset size ({total})"
        )

    train_dataset, test_dataset = random_split(dataset, [total - test_size, test_size])
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    # Evaluation does not benefit from shuffling; keep its order fixed.
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    return train_loader, test_loader

In [None]:
from torch import nn
import torch


class ConvLSTMCell(nn.Module):
    """Single convolutional LSTM cell.

    One convolution over the channel-wise concatenation of the input and the
    previous hidden state produces all four gates (input, forget, output,
    candidate) at once.
    """

    def __init__(self,
                 input_channels: int,
                 hidden_channels: int,
                 kernel_size: int,
                 input_size: tuple[int, int]):
        """
        :param input_channels: number of channels of the input ``x``
        :param hidden_channels: number of channels of the hidden/cell state
        :param kernel_size: side of the square convolution kernel; must be odd
            so that ``kernel_size // 2`` padding preserves height and width
        :param input_size: nominal ``(height, width)`` of the input, stored as
            ``self.height`` / ``self.width``
        :raises ValueError: if ``kernel_size`` is even
        """
        super().__init__()

        if kernel_size % 2 == 0:
            # With k // 2 padding an even kernel grows the map by one pixel,
            # so the gates would no longer line up with the cell state.
            raise ValueError(f"kernel_size must be odd, got {kernel_size}")

        self.input_channels = input_channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.height, self.width = input_size
        self.padding = kernel_size // 2

        self.conv = nn.Conv2d(
            in_channels=input_channels + hidden_channels,
            out_channels=4 * hidden_channels,  # i, f, o, g gates
            kernel_size=kernel_size,
            padding=self.padding
        )

    def forward(self, x, hidden_state=None):
        """Advance the cell by one time step.

        :param x: input of shape ``(batch, input_channels, H, W)``
        :param hidden_state: ``(h, c)`` from the previous step, or ``None``
            to start from all-zero states
        :return: ``(h, c)``, each of shape ``(batch, hidden_channels, H, W)``
        """
        if hidden_state is None:
            # Size the initial state from the actual input and allocate it
            # directly with the input's dtype and device.
            batch_size, _, height, width = x.shape
            h_state = x.new_zeros(batch_size, self.hidden_channels, height, width)
            c_state = torch.zeros_like(h_state)
        else:
            h_state, c_state = hidden_state

        combined = torch.cat([x, h_state], dim=1)
        gates = self.conv(combined)

        # Split the convolution output into the four gates.
        i_gate, f_gate, o_gate, g_gate = gates.chunk(4, dim=1)

        # Gate activations.
        i_gate = torch.sigmoid(i_gate)
        f_gate = torch.sigmoid(f_gate)
        o_gate = torch.sigmoid(o_gate)
        g_gate = torch.tanh(g_gate)

        # New cell state: keep part of the old one, add the gated candidate.
        c_state = f_gate * c_state + i_gate * g_gate
        # New hidden state.
        h_state = o_gate * torch.tanh(c_state)

        return h_state, c_state


class ConvLSTM(nn.Module):
    """Stacked ConvLSTM sequence classifier.

    The input sequence is run through ``num_layers`` ConvLSTM cells. The last
    layer's final hidden state is average-pooled over space and mapped to
    class scores by a linear layer.

    NOTE: every time step of the input is processed, including any zero
    padding added when batching sequences of different lengths.
    """

    def __init__(self, input_channels, hidden_channels, kernel_size, input_size, num_layers, num_classes):
        """
        :param input_channels: channels of each input frame
        :param hidden_channels: channels of every layer's hidden state
        :param kernel_size: convolution kernel size used by every cell
        :param input_size: ``(height, width)`` forwarded to every cell
        :param num_layers: number of stacked cells (at least 1)
        :param num_classes: number of output classes
        :raises ValueError: if ``num_layers`` is smaller than 1
        """
        super().__init__()

        if num_layers < 1:
            raise ValueError(f"num_layers must be at least 1, got {num_layers}")

        self.input_channels = input_channels
        self.hidden_channels = hidden_channels
        self.num_layers = num_layers
        self.num_classes = num_classes

        # The first cell sees the raw frames; deeper cells see the hidden
        # states of the layer below.
        self.cell_list = nn.ModuleList([
            ConvLSTMCell(
                input_channels=input_channels if layer_idx == 0 else hidden_channels,
                hidden_channels=hidden_channels,
                kernel_size=kernel_size,
                input_size=input_size
            )
            for layer_idx in range(num_layers)
        ])

        # Global average pooling over the spatial dimensions.
        self.global_pool = nn.AdaptiveAvgPool2d(1)

        # Linear classifier on the pooled features.
        self.classifier = nn.Linear(hidden_channels, num_classes)

    def forward(self, x):
        """Classify a batch of sequences.

        :param x: tensor of shape ``(batch_size, time_steps, channels, height, width)``
        :return: class scores of shape ``(batch_size, num_classes)``
        :raises ValueError: if ``x`` has no time steps
        """
        time_steps = x.size(1)
        if time_steps == 0:
            raise ValueError("ConvLSTM needs at least one time step")

        layer_input = x
        h_state = None
        last_layer = len(self.cell_list) - 1

        for layer_idx, cell in enumerate(self.cell_list):
            state = None  # every layer starts from zero states
            layer_outputs = []

            for t in range(time_steps):
                h_state, c_state = cell(layer_input[:, t], state)
                state = (h_state, c_state)
                if layer_idx < last_layer:
                    layer_outputs.append(h_state)

            # The next layer consumes this layer's hidden state at every time
            # step, not just the final one repeated, so temporal structure
            # survives the stack.
            if layer_idx < last_layer:
                layer_input = torch.stack(layer_outputs, dim=1)

        # h_state is now the last layer's final hidden state:
        # (batch_size, hidden_channels, height, width)
        pooled = self.global_pool(h_state)  # (batch_size, hidden_channels, 1, 1)
        flattened = pooled.flatten(1)       # (batch_size, hidden_channels)

        return self.classifier(flattened)   # (batch_size, num_classes)

In [None]:
def train(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and print the loss after every batch.

    :param model: module called as ``model(data)``
    :param train_loader: loader yielding ``(data, target, length)`` batches;
        ``length`` is not used here
    :param criterion: loss called as ``criterion(output, target)``
    :param optimizer: optimizer over the model's parameters
    :param device: device the batches are moved to
    """
    model.train()

    num_batches = len(train_loader)
    num_samples = len(train_loader.dataset)

    for batch_idx, batch in enumerate(train_loader):
        data, target, _length = batch
        data = data.to(device)
        target = target.to(device)

        # Standard step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        loss = criterion(model(data), target)
        loss.backward()
        optimizer.step()

        print(f'Training... [{batch_idx * len(data)}/{num_samples} '
              f'({100. * batch_idx / num_batches:.0f}%)]\tLoss: {loss.item():.6f}')

In [None]:
import torch

def test(model, test_loader, device, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target, length in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            
    test_loss /= len(test_loader.dataset)
    print(f'Test set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{total} ({100. * correct / total:.0f}%)')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

def main():
    """Train the ConvLSTM classifier on the ``dataset`` folder, then evaluate it once.

    The spatial input size is read from the first training batch. The model
    is trained for a fixed number of epochs and tested after the last one.
    """
    # Model hyperparameters
    model_config = {
        'input_channels': 1,   # difference images are single-channel grayscale
        'hidden_channels': 32,
        'kernel_size': 3,
        'num_layers': 2,
        'num_classes': 6,      # number of action classes to predict
    }
    # Data and optimisation hyperparameters
    batch_size = 4
    frame_count = 50
    num_epochs = 100
    learning_rate = 0.001

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load the dataset
    train_loader, test_loader = get_dataloader(root_dir='dataset', batch_size=batch_size, frame_count=frame_count)

    # Take (height, width) from the first batch: (batch, time, channels, H, W)
    first_batch = next(iter(train_loader))
    frames = first_batch[0]
    spatial_size = (frames.size(3), frames.size(4))

    # Build the model, loss and optimizer
    model = ConvLSTM(input_size=spatial_size, **model_config).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Training
    for completed in range(num_epochs):
        epoch = completed + 1
        print(f"Epoch {epoch} of {num_epochs}...")
        train(model, train_loader, criterion, optimizer, device)

    # Evaluation
    test(model, test_loader, device, criterion)

In [None]:
# Run the full pipeline: load data, train for all epochs, then evaluate.
main()