In [None]:
import os
import cv2
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

# ==============================
# 1. 폴더로부터 .avi 파일들 읽어오기
# ==============================
folder = "/workspace/seq-vision/UCF101"
files = [f for f in os.listdir(folder) if f.lower().endswith('.avi') and os.path.isfile(os.path.join(folder, f))]

# ==============================
# 2. 파일명에서 1번째 '_'와 2번째 '_' 사이의 문자열을 추출하여
#    label로 사용 (원래 문자열이므로 unique하게 모은 후 정렬하여 0~100 할당, 총 101개여야 함)
# ==============================
# key: label(string), value: 파일 경로 리스트
class_to_files = {}
for file in files:
    filename = os.path.splitext(file)[0]
    first_us = filename.find('_')
    if first_us == -1:
        print(f"파일 '{file}'에 '_'가 없습니다. 건너뜁니다.")
        continue
    second_us = filename.find('_', first_us + 1)
    if second_us == -1:
        print(f"파일 '{file}'에 두 번째 '_'가 없습니다. 건너뜁니다.")
        continue
    # 1번째 '_'와 2번째 '_' 사이의 문자열 추출 (문자열 label)
    label_str = filename[first_us+1:second_us]
    class_to_files.setdefault(label_str, []).append(os.path.join(folder, file))

# 고유 label들을 추출하여 정렬 (반드시 101개여야 함)
unique_labels = sorted(class_to_files.keys())
if len(unique_labels) != 101:
    print(f"경고: 고유 label의 개수가 101개가 아닙니다. (총 {len(unique_labels)}개)")
global_label2idx = { label: idx for idx, label in enumerate(unique_labels) }
print("생성된 label mapping (문자열 -> 정수):")
for label, idx in global_label2idx.items():
    print(f"'{label}' -> {idx}")

# ==============================
# 3. 각 클래스마다 10개 샘플 채취 (각 비디오는 75프레임, 부족 시 제로 패딩)
# ==============================
target_frames = 75
class_to_videos = {}
for label_str, file_list in class_to_files.items():
    if len(file_list) < 10:
        print(f"클래스 '{label_str}'의 파일 개수가 10개 미만입니다 (총 {len(file_list)}개). 건너뜁니다.")
        continue
    # 10개보다 많으면 랜덤 샘플링
    selected_files = random.sample(file_list, 10) if len(file_list) > 10 else file_list
    videos = []
    for video_path in selected_files:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"영상 '{video_path}' 열기 실패. 건너뜁니다.")
            continue
        frames = []
        # 최대 target_frames 만큼 프레임 읽기
        for _ in range(target_frames):
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
        cap.release()
        if len(frames) == 0:
            print(f"영상 '{video_path}'에서 프레임을 하나도 읽지 못했습니다.")
            continue
        # 75프레임 미만이면 제로 패딩 (첫 프레임과 동일한 shape 사용)
        if len(frames) < target_frames:
            pad_frame = np.zeros_like(frames[0])
            for _ in range(target_frames - len(frames)):
                frames.append(pad_frame)
        # 75프레임 초과 시 처음 75프레임만 사용
        if len(frames) > target_frames:
            frames = frames[:target_frames]
        videos.append(frames)
    if len(videos) != 10:
        print(f"클래스 '{label_str}'의 10개 샘플을 모으지 못했습니다. (모은 샘플 수: {len(videos)})")
        continue
    class_to_videos[label_str] = videos

# ==============================
# 4. 각 클래스의 10개 샘플을 (6, 2, 2) 비율로 train, valid, test 분할
# ==============================
train_list = []
valid_list = []
test_list = []
for label_str, videos in class_to_videos.items():
    random.shuffle(videos)
    train_videos = videos[:6]
    valid_videos = videos[6:8]
    test_videos  = videos[8:10]
    for video in train_videos:
        train_list.append((label_str, video))
    for video in valid_videos:
        valid_list.append((label_str, video))
    for video in test_videos:
        test_list.append((label_str, video))

# ==============================
# 5. PyTorch Dataset 및 DataLoader 생성
#    - 각 영상은 프레임 리스트를 torch tensor로 변환 (shape: [num_frames, C, H, W])
#    - transform을 통해 모든 프레임을 (224, 224)로 리사이즈
#    - label은 global_label2idx를 사용하여 (1,) shape의 tensor로 반환
# ==============================
def resize_video_frames(video_tensor, size=(224, 224)):
    # video_tensor: (num_frames, C, H, W)
    return F.interpolate(video_tensor, size=size, mode='bilinear', align_corners=False)

class VideoDataset(Dataset):
    def __init__(self, video_list, label2idx, transform=None):
        """
        video_list: 각 항목이 (label_str, frames(list)) 형태
        label2idx: 전역 label mapping (문자열 -> 정수)
        transform: 영상 텐서에 적용할 함수. 기본적으로 모든 프레임을 (224,224)로 리사이즈함.
        """
        self.video_list = video_list
        self.label2idx = label2idx
        self.transform = transform if transform is not None else lambda video: resize_video_frames(video, size=(224, 224))
    
    def __len__(self):
        return len(self.video_list)
    
    def __getitem__(self, idx):
        label_str, frames = self.video_list[idx]
        # 각 프레임: numpy array (H, W, C) -> torch tensor (C, H, W)
        video_tensor = torch.stack([torch.from_numpy(frame).permute(2, 0, 1) for frame in frames])
        if self.transform:
            video_tensor = self.transform(video_tensor)
        # label은 global_label2idx를 통해 정수로 변환, (1,) shape의 tensor 반환
        label_tensor = torch.tensor([self.label2idx[label_str]], dtype=torch.long)
        return video_tensor, label_tensor

# Dataset 생성
train_dataset = VideoDataset(train_list, label2idx=global_label2idx)
valid_dataset = VideoDataset(valid_list, label2idx=global_label2idx)
test_dataset  = VideoDataset(test_list,  label2idx=global_label2idx)

# DataLoader 생성 (배치 사이즈는 4로 설정)
batch_size = 4
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import os
import cv2
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from collections import defaultdict

def load_avi_files(folder):
    """
    지정된 폴더에서 .avi 파일들을 읽어와 파일명 리스트를 반환합니다.
    """
    files = [f for f in os.listdir(folder) if f.lower().endswith('.avi') and os.path.isfile(os.path.join(folder, f))]
    return files

def extract_label_from_filename(filename):
    """
    파일명에서 1번째 '_'와 2번째 '_' 사이의 문자열을 추출합니다.
    반환값: label 문자열 또는 올바른 구분자가 없으면 None
    """
    basename = os.path.splitext(filename)[0]
    first_us = basename.find('_')
    if first_us == -1:
        return None
    second_us = basename.find('_', first_us + 1)
    if second_us == -1:
        return None
    return basename[first_us+1:second_us]

def group_files_by_label(folder, files):
    """
    파일들을 label(파일명에서 1번째 '_'와 2번째 '_' 사이의 문자열) 기준으로 그룹화합니다.
    반환값: { label_str: [파일 경로, ...] }
    """
    class_to_files = {}
    for file in files:
        label_str = extract_label_from_filename(file)
        if label_str is None:
            print(f"파일 '{file}'에 올바른 '_' 구분자가 없습니다. 건너뜁니다.")
            continue
        file_path = os.path.join(folder, file)
        class_to_files.setdefault(label_str, []).append(file_path)
    return class_to_files

def create_label_mapping(class_to_files):
    """
    그룹화된 파일 딕셔너리에서 고유 label을 추출하여 정렬한 후,
    label mapping (문자열 -> 정수)을 생성합니다.
    (총 101개의 label이어야 함)
    """
    unique_labels = sorted(class_to_files.keys())
    if len(unique_labels) != 101:
        print(f"경고: 고유 label의 개수가 101개가 아닙니다. (총 {len(unique_labels)}개)")
    global_label2idx = { label: idx for idx, label in enumerate(unique_labels) }
    print("생성된 label mapping (문자열 -> 정수):")
    for label, idx in global_label2idx.items():
        print(f"'{label}' -> {idx}")
    return global_label2idx

def sample_videos_for_each_class(class_to_files, target_frames=75, num_samples=10):
    """
    각 클래스마다 num_samples 개의 영상을 선택하고,
    각 영상은 최대 target_frames 프레임을 읽어 75프레임 미만이면 제로 패딩을 추가합니다.
    반환값: { label_str: [video_frames_list, ...] }
      - video_frames_list: 각 영상의 프레임 리스트 (각 프레임은 numpy array)
    """
    class_to_videos = {}
    for label_str, file_list in class_to_files.items():
        if len(file_list) < num_samples:
            print(f"클래스 '{label_str}'의 파일 개수가 {num_samples}개 미만입니다 (총 {len(file_list)}개). 건너뜁니다.")
            continue
        selected_files = random.sample(file_list, num_samples) if len(file_list) > num_samples else file_list
        videos = []
        for video_path in selected_files:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                print(f"영상 '{video_path}' 열기 실패. 건너뜁니다.")
                continue
            frames = []
            for _ in range(target_frames):
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
            cap.release()
            if len(frames) == 0:
                print(f"영상 '{video_path}'에서 프레임을 하나도 읽지 못했습니다.")
                continue
            if len(frames) < target_frames:
                pad_frame = np.zeros_like(frames[0])
                for _ in range(target_frames - len(frames)):
                    frames.append(pad_frame)
            if len(frames) > target_frames:
                frames = frames[:target_frames]
            videos.append(frames)
        if len(videos) != num_samples:
            print(f"클래스 '{label_str}'의 {num_samples}개 샘플을 모으지 못했습니다. (모은 샘플 수: {len(videos)})")
            continue
        class_to_videos[label_str] = videos
    return class_to_videos

def split_videos(class_to_videos, split_ratios=(6, 2, 2)):
    """
    각 클래스의 10개 샘플을 split_ratios (train, valid, test) 비율로 분할합니다.
    반환값:
      - train_list, valid_list, test_list: 각 항목은 (label_str, video_frames_list) 형태
    """
    train_list, valid_list, test_list = [], [], []
    for label_str, videos in class_to_videos.items():
        random.shuffle(videos)
        n_train, n_valid, n_test = split_ratios
        train_videos = videos[:n_train]
        valid_videos = videos[n_train:n_train+n_valid]
        test_videos  = videos[n_train+n_valid:n_train+n_valid+n_test]
        for video in train_videos:
            train_list.append((label_str, video))
        for video in valid_videos:
            valid_list.append((label_str, video))
        for video in test_videos:
            test_list.append((label_str, video))
    return train_list, valid_list, test_list

def resize_video_frames(video_tensor, size=(224, 224)):
    """
    주어진 영상 텐서(video_tensor, shape: [num_frames, C, H, W])의 모든 프레임을 지정한 크기(size)로 리사이즈합니다.
    """
    return F.interpolate(video_tensor, size=size, mode='bilinear', align_corners=False)

class VideoDataset(Dataset):
    def __init__(self, video_list, label2idx, transform=None):
        """
        video_list: 각 항목이 (label_str, frames(list)) 형태
        label2idx: 전역 label mapping (문자열 -> 정수)
        transform: 영상 텐서에 적용할 함수 (기본: 모든 프레임을 (224,224)로 리사이즈)
        """
        self.video_list = video_list
        self.label2idx = label2idx
        self.transform = transform if transform is not None else lambda video: resize_video_frames(video, size=(224, 224))
    
    def __len__(self):
        return len(self.video_list)
    
    def __getitem__(self, idx):
        label_str, frames = self.video_list[idx]
        # 각 프레임: numpy array (H, W, C) -> torch tensor (C, H, W)
        video_tensor = torch.stack([torch.from_numpy(frame).permute(2, 0, 1) for frame in frames])
        if self.transform:
            video_tensor = self.transform(video_tensor)
        # label은 global_label2idx를 통해 정수로 변환하며, (1,) shape의 tensor 반환
        label_tensor = torch.tensor([self.label2idx[label_str]], dtype=torch.long)
        return video_tensor, label_tensor

def create_dataloaders(train_dataset, valid_dataset, test_dataset, batch_size=4):
    """
    주어진 Dataset을 이용하여 DataLoader를 생성합니다.
    """
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
    test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, valid_loader, test_loader

def main():
    folder = "/workspace/seq-vision/UCF101"
    print("AVI 파일들을 로드하는 중...")
    files = load_avi_files(folder)
    print(f"총 {len(files)}개의 AVI 파일이 발견되었습니다.")
    
    print("파일들을 label 기준으로 그룹화하는 중...")
    class_to_files = group_files_by_label(folder, files)
    
    print("label mapping 생성 중...")
    global_label2idx = create_label_mapping(class_to_files)
    
    print("각 클래스별로 비디오 샘플 채취 중...")
    class_to_videos = sample_videos_for_each_class(class_to_files, target_frames=75, num_samples=10)
    
    print("비디오 데이터를 train, valid, test로 분할하는 중...")
    train_list, valid_list, test_list = split_videos(class_to_videos, split_ratios=(6, 2, 2))
    print(f"Train samples: {len(train_list)}, Valid samples: {len(valid_list)}, Test samples: {len(test_list)}")
    
    print("PyTorch Dataset 및 DataLoader 생성 중...")
    train_dataset = VideoDataset(train_list, label2idx=global_label2idx)
    valid_dataset = VideoDataset(valid_list, label2idx=global_label2idx)
    test_dataset  = VideoDataset(test_list,  label2idx=global_label2idx)
    
    batch_size = 4
    train_loader, valid_loader, test_loader = create_dataloaders(train_dataset, valid_dataset, test_dataset, batch_size)
    
    print("Train Loader:")
    for i, (videos, labels) in enumerate(train_loader):
        print(f"Batch {i+1}: Videos shape: {videos.shape}, Labels shape: {labels.shape}")
        if i == 4:
            break

    print("\nValid Loader:")
    for i, (videos, labels) in enumerate(valid_loader):
        print(f"Batch {i+1}: Videos shape: {videos.shape}, Labels shape: {labels.shape}")
        if i == 4:
            break

    print("\nTest Loader:")
    for i, (videos, labels) in enumerate(test_loader):
        print(f"Batch {i+1}: Videos shape: {videos.shape}, Labels shape: {labels.shape}")
        if i == 4:
            break

    # test_loader 내 label 분포 출력
    label_counts = defaultdict(int)
    for _, labels in test_loader:
        for label in labels:
            label_counts[label.item()] += 1

    print("\nTest Loader Label Distribution:")
    for label in sorted(label_counts.keys()):
        print(f"Label {label}: {label_counts[label]} samples")

if __name__ == '__main__':
    main()


AVI 파일들을 로드하는 중...
총 13320개의 AVI 파일이 발견되었습니다.
파일들을 label 기준으로 그룹화하는 중...
label mapping 생성 중...
생성된 label mapping (문자열 -> 정수):
'ApplyEyeMakeup' -> 0
'ApplyLipstick' -> 1
'Archery' -> 2
'BabyCrawling' -> 3
'BalanceBeam' -> 4
'BandMarching' -> 5
'BaseballPitch' -> 6
'Basketball' -> 7
'BasketballDunk' -> 8
'BenchPress' -> 9
'Biking' -> 10
'Billiards' -> 11
'BlowDryHair' -> 12
'BlowingCandles' -> 13
'BodyWeightSquats' -> 14
'Bowling' -> 15
'BoxingPunchingBag' -> 16
'BoxingSpeedBag' -> 17
'BreastStroke' -> 18
'BrushingTeeth' -> 19
'CleanAndJerk' -> 20
'CliffDiving' -> 21
'CricketBowling' -> 22
'CricketShot' -> 23
'CuttingInKitchen' -> 24
'Diving' -> 25
'Drumming' -> 26
'Fencing' -> 27
'FieldHockeyPenalty' -> 28
'FloorGymnastics' -> 29
'FrisbeeCatch' -> 30
'FrontCrawl' -> 31
'GolfSwing' -> 32
'Haircut' -> 33
'HammerThrow' -> 34
'Hammering' -> 35
'HandStandPushups' -> 36
'HandstandWalking' -> 37
'HeadMassage' -> 38
'HighJump' -> 39
'HorseRace' -> 40
'HorseRiding' -> 41
'HulaHoop' -> 42
'IceDan

: 