# <center> Telecons + DLmath (CNN + LSTM) </center>

## 1. Data Preparation

In [1]:
import os
import pandas as pd
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import cv2
import numpy as np
from torch.utils.data import Dataset
import random
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

## 2. Define the PyTorch Dataset 

In [2]:
# Dataset definition
class VideoDataset(Dataset):
    """Map-style dataset that pairs each video clip with its class label.

    Parameters:
        videos: indexable collection of clips; each clip is iterated frame by
            frame (the loader in this file produces (T, H, W, C) arrays).
        labels: indexable collection of labels, aligned with ``videos``.
        transform: optional callable applied to every frame. It must return a
            tensor, because the per-frame results are combined with
            ``torch.stack``.
    """

    def __init__(self, videos, labels, transform=None):
        self.videos, self.labels, self.transform = videos, labels, transform

    def __len__(self):
        # One sample per stored clip.
        return len(self.videos)

    def __getitem__(self, idx):
        clip, target = self.videos[idx], self.labels[idx]

        # Without a transform the raw clip is handed back untouched.
        if not self.transform:
            return clip, target

        # Pre-process frame by frame, then stack along a new leading (time) axis.
        frames = [self.transform(frame) for frame in clip]
        return torch.stack(frames), target

## 3. CNN-LSTM Model Design

In [3]:
class CNNLSTMModel(nn.Module):
    """Per-frame CNN encoder followed by an LSTM over time and a linear classifier.

    Parameters:
        num_classes (int): number of output classes (size of the logit vector).

    Input to ``forward``: tensor of shape (batch, seq_len, 3, H, W).
    Output of ``forward``: logits of shape (batch, num_classes).

    Each conv (stride 2) and each max-pool halves the spatial size, so a 64x64
    frame is reduced to 1x1 with 32 channels. The trailing adaptive average
    pool collapses whatever spatial size is left to 1x1, so frames larger than
    64x64 also yield a 32-dim feature (it is an identity for 64x64 input and
    has no parameters, so saved state_dicts stay compatible). Frames so small
    that a pooling layer would produce an empty output still raise an error.
    """

    def __init__(self, num_classes):
        super().__init__()

        # CNN layers (sizes in the comments assume a 64x64 input frame)
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 64 -> 32 (conv) -> 16 (pool)

            nn.Conv2d(8, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 16 -> 8 (conv) -> 4 (pool)

            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 4 -> 2 (conv) -> 1 (pool)

            # Guarantee a 1x1 map so the LSTM always receives 32 features.
            nn.AdaptiveAvgPool2d(1),
        )

        # LSTM layer: consumes one 32-dim CNN feature per frame
        self.lstm = nn.LSTM(input_size=32, hidden_size=128, num_layers=2, batch_first=True)

        # Fully connected classification head
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        batch_size, seq_len, c, h, w = x.shape
        # Fold time into the batch axis so the CNN sees individual frames.
        # reshape (not view) so non-contiguous inputs are accepted as well.
        x = x.reshape(batch_size * seq_len, c, h, w)
        x = self.cnn(x)
        # Back to (batch_size, seq_len, feature_dim) for the LSTM.
        x = x.reshape(batch_size, seq_len, -1)
        _, (hidden, _) = self.lstm(x)
        # hidden[-1] is the final hidden state of the last (top) LSTM layer.
        return self.fc(hidden[-1])

In [4]:
import os
import cv2
import numpy as np

def load_videos_split_frames(base_dir, folders, frame_size=(64, 64), num_frames=5):
    """
    Split each GIF into `num_frames` equal segments and take the first frame of
    every segment, giving `num_frames` frames per clip.

    Clips with fewer than `num_frames` frames, or for which any sampled frame
    cannot be read, are skipped. Frames are kept in the channel order returned
    by OpenCV (no colour conversion is applied here).

    Parameters:
        base_dir (str): base directory containing the video folders
        folders (list): sub-folder names; the part before the first "_" must be
            one of "C1", "C2", "C5" and determines the label
        frame_size (tuple): output frame size as (H, W)
        num_frames (int): number of frames to extract per clip (default 5)

    Returns:
        videos (list): list of clips, each an array of shape (T, H, W, C)
        labels (list): list of integer labels, aligned with `videos`

    Raises:
        ValueError: if `num_frames` is not positive
        KeyError: if a loaded clip's folder prefix is not a known class
    """
    if num_frames <= 0:
        raise ValueError(f"num_frames must be a positive integer, got {num_frames}")

    videos, labels = [], []
    label_map = {"C1": 0, "C2": 1, "C5": 2}  # class labelling

    # cv2.resize takes dsize as (width, height); frame_size is documented as (H, W).
    target_h, target_w = frame_size
    dsize = (target_w, target_h)

    for folder in folders:
        folder_path = os.path.join(base_dir, folder)
        if not os.path.isdir(folder_path):
            continue
        class_key = folder.split("_")[0]

        # os.listdir order is arbitrary; sort so the dataset order (and hence any
        # seeded train/val/test split built on it) is reproducible.
        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.endswith(".gif"):
                continue

            cap = cv2.VideoCapture(os.path.join(folder_path, file_name))
            try:
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                if total_frames < num_frames:
                    continue

                step = total_frames // num_frames
                selected_frames = []
                for i in range(num_frames):
                    # Seek to the first frame of the i-th segment.
                    cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
                    ret, frame = cap.read()
                    if not ret:
                        break
                    selected_frames.append(cv2.resize(frame, dsize))

                # Keep the clip only if every requested frame was decoded.
                if len(selected_frames) == num_frames:
                    videos.append(np.array(selected_frames))  # (T, H, W, C)
                    labels.append(label_map[class_key])
            finally:
                # Always free the capture, even if decoding/resizing raised.
                cap.release()
    return videos, labels

## 4. Data Loader

In [5]:
# Key parameters and pipeline execution
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
base_dir = "/home/work/DLmath/Seulbin/Telecons/data/thermal_processed/"
folders = [
    "C1_KNG_0207",
    "C1_SHM_0201",
    "C2_KNG_0207",
    "C2_SHM_0201_0229",
    "C5_SHM_0229",
]
videos, video_labels = load_videos_split_frames(base_dir, folders)

# Train / validation / test split: 60% for training, then the remaining 40%
# is halved into validation and test (20% each). Both splits use seed 42.
train_videos, temp_videos, train_labels, temp_labels = train_test_split(
    videos,
    video_labels,
    test_size=0.4,
    random_state=42,
)
val_videos, test_videos, val_labels, test_labels = train_test_split(
    temp_videos,
    temp_labels,
    test_size=0.5,
    random_state=42,
)

# Per-frame pre-processing: convert to a tensor, then normalise every channel
# with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

# Datasets: one per split, all sharing the same transform
train_dataset, val_dataset, test_dataset = (
    VideoDataset(clips, targets, transform)
    for clips, targets in (
        (train_videos, train_labels),
        (val_videos, val_labels),
        (test_videos, test_labels),
    )
)

# Data loaders: only the training loader shuffles
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=4)
val_loader, test_loader = (
    DataLoader(split, batch_size=4, shuffle=False)
    for split in (val_dataset, test_dataset)
)

KeyboardInterrupt: 

In [None]:
import os
import cv2
import numpy as np

# NOTE: this cell redefines load_videos_split_frames, which an earlier cell
# also defines; once this cell runs, this definition is the one in effect.
def load_videos_split_frames(base_dir, folders, frame_size=(64, 64), num_frames=5):
    """
    Split each GIF into `num_frames` equal segments and take the first frame of
    every segment, giving `num_frames` frames per clip.

    Clips with fewer than `num_frames` frames, or for which any sampled frame
    cannot be read, are skipped. Frames are kept in the channel order returned
    by OpenCV (no colour conversion is applied here).

    Parameters:
        base_dir (str): base directory containing the video folders
        folders (list): sub-folder names; the part before the first "_" must be
            one of "C1", "C2", "C5" and determines the label
        frame_size (tuple): output frame size as (H, W)
        num_frames (int): number of frames to extract per clip (default 5)

    Returns:
        videos (list): list of clips, each an array of shape (T, H, W, C)
        labels (list): list of integer labels, aligned with `videos`

    Raises:
        ValueError: if `num_frames` is not positive
        KeyError: if a loaded clip's folder prefix is not a known class
    """
    if num_frames <= 0:
        raise ValueError(f"num_frames must be a positive integer, got {num_frames}")

    videos, labels = [], []
    label_map = {"C1": 0, "C2": 1, "C5": 2}  # class labelling

    # cv2.resize takes dsize as (width, height); frame_size is documented as (H, W).
    target_h, target_w = frame_size
    dsize = (target_w, target_h)

    for folder in folders:
        folder_path = os.path.join(base_dir, folder)
        if not os.path.isdir(folder_path):
            continue
        class_key = folder.split("_")[0]

        # os.listdir order is arbitrary; sort so the dataset order (and hence any
        # seeded train/val/test split built on it) is reproducible.
        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.endswith(".gif"):
                continue

            cap = cv2.VideoCapture(os.path.join(folder_path, file_name))
            try:
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                if total_frames < num_frames:
                    continue

                step = total_frames // num_frames
                selected_frames = []
                for i in range(num_frames):
                    # Seek to the first frame of the i-th segment.
                    cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
                    ret, frame = cap.read()
                    if not ret:
                        break
                    selected_frames.append(cv2.resize(frame, dsize))

                # Keep the clip only if every requested frame was decoded.
                if len(selected_frames) == num_frames:
                    videos.append(np.array(selected_frames))  # (T, H, W, C)
                    labels.append(label_map[class_key])
            finally:
                # Always free the capture, even if decoding/resizing raised.
                cap.release()
    return videos, labels

## 5. Training Loop

In [None]:
# Training function
def train_model(model, train_loader, val_loader, criterion, optimizer, device,
                num_epochs=50, save_path="best_model_cnn_lstm.pth"):
    """Train `model`, validate after every epoch and keep the best weights.

    Parameters:
        model: module to train; called as ``model(videos)`` and expected to
            return per-class scores of shape (batch, num_classes).
        train_loader: iterable of ``(videos, labels)`` training batches.
        val_loader: iterable of ``(videos, labels)`` validation batches.
        criterion: loss callable, ``criterion(outputs, labels)``.
        optimizer: optimizer over the model's parameters.
        device: device the batches are moved to before the forward pass.
        num_epochs (int): number of epochs to run (default 50).
        save_path (str | None): file the best state_dict is written to each
            time validation accuracy improves; ``None`` disables saving.

    Returns:
        The same `model` object, with the weights of the epoch that achieved
        the highest validation accuracy loaded (if any epoch scored above 0).
    """
    import copy  # local import: only needed to snapshot the best weights

    best_model = None
    best_val_acc = 0.0

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        train_batches = 0
        correct_preds, total_preds = 0, 0

        for videos, labels in train_loader:
            videos, labels = videos.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(videos)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_batches += 1

            # Accuracy bookkeeping
            _, preds = torch.max(outputs, dim=1)
            correct_preds += (preds == labels).sum().item()
            total_preds += labels.size(0)

        # Report the mean batch loss, matching how the validation loss is reported.
        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        train_loss /= max(train_batches, 1)
        train_acc = correct_preds / max(total_preds, 1)

        # Validation
        model.eval()
        val_loss = 0.0
        val_batches = 0
        correct_preds, total_preds = 0, 0
        with torch.no_grad():
            for videos, labels in val_loader:
                videos, labels = videos.to(device), labels.to(device)
                outputs = model(videos)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                val_batches += 1

                # Accuracy bookkeeping
                _, preds = torch.max(outputs, dim=1)
                correct_preds += (preds == labels).sum().item()
                total_preds += labels.size(0)

        val_loss /= max(val_batches, 1)
        val_acc = correct_preds / max(total_preds, 1)

        print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}")

        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() returns references to the live parameter tensors, so a
            # deep copy is required; otherwise later epochs overwrite the "best"
            # weights in place and the final reload below would be a no-op.
            best_model = copy.deepcopy(model.state_dict())
            if save_path is not None:
                torch.save(best_model, save_path)
            print(f"Best model saved with validation accuracy: {best_val_acc:.4f}")

    # Finally, restore the best weights
    if best_model is not None:
        model.load_state_dict(best_model)
        print(f"Best model loaded with validation accuracy: {best_val_acc:.4f}")

    return model

# Evaluation function
def test_model(model, test_loader, device):
    """Evaluate `model` on `test_loader` and report its accuracy.

    Parameters:
        model: module called as ``model(videos)``; the arg-max over dim 1 of
            its output is taken as the predicted class.
        test_loader: iterable of ``(videos, labels)`` batches; labels are read
            with ``.numpy()`` and are not moved to `device`.
        device: device the video batches are moved to before the forward pass.

    Returns:
        The accuracy computed by ``accuracy_score`` over all batches (also printed).
    """
    model.eval()
    predicted, expected = [], []

    # Inference only: no gradients are needed.
    with torch.no_grad():
        for videos, labels in test_loader:
            scores = model(videos.to(device))
            batch_preds = scores.argmax(dim=1)
            predicted.extend(batch_preds.cpu().numpy())
            expected.extend(labels.numpy())

    acc = accuracy_score(expected, predicted)
    print(f"Test Accuracy: {acc:.4f}")
    return acc

In [None]:
# Define the model, the loss function and the optimizer
model = CNNLSTMModel(num_classes=3)
model = model.to(device)  # move parameters to the selected device
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
# Run training (train_model also validates after every epoch) and rebind
# `model` to the module it returns. Testing happens in the next cell.
model = train_model(model, train_loader, val_loader, criterion, optimizer, device)

In [None]:
# Evaluate the trained model on the held-out test split; test_model prints and
# returns the accuracy.
test_model(model, test_loader, device)