### 앙상블을 위한 vi model의 prob 추출 코드

In [665]:
import pandas as pd
import numpy as np
import matplotlib.cm as cm
from scipy.signal import spectrogram
import cv2
import os
import csv
import glob
import torch

In [666]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [667]:
# Lookup tables that turn the string identifiers used in the data set
# (class name, course letter, round number) into zero-based integer ids.
label_mapping = {
    name: idx
    for idx, name in enumerate([
        'baekseungdo', 'cheonaeji', 'chunjihun', 'leegihun',
        'leeseunglee', 'leeyunguel', 'kimminju', 'kimgangsu',
    ])
}
course_mapping = {letter: idx for idx, letter in enumerate("ABC")}
round_mapping = {str(number): number - 1 for number in range(1, 7)}

In [668]:
# Class name -> integer class id (same contents as the label lookup table).
classes = dict(zip(
    (
        'baekseungdo', 'cheonaeji', 'chunjihun', 'leegihun',
        'leeseunglee', 'leeyunguel', 'kimminju', 'kimgangsu',
    ),
    range(8),
))

In [669]:
# Ordered value lists used to enumerate the data-set directory tree
# (class name / event / course / round).
name_lt = list((
    'baekseungdo', 'cheonaeji', 'chunjihun', 'leegihun',
    'leeseunglee', 'leeyunguel', 'kimminju', 'kimgangsu',
))
course_list = list('ABC')
event_lt = 'bump corner'.split()
round_lt = [str(number) for number in range(1, 7)]

In [670]:
# Number of target classes: one per entry in name_lt.
num_classes = len(name_lt)

In [671]:
# csv_filename = "./csv/8_vi_list.csv"
# with open(csv_filename, mode='w', newline='') as file:
#     writer = csv.writer(file)
#     writer.writerow(["label", "course", "round", "file"])
#     frame = 0
#     for n in name_lt:
#         for e in event_lt:
#             for c in course_list:
#                 for r in round_lt:
#                     path = f'./visual_stride_10/{n}/{e}/{c}/{r}'
#                     print(path)
#                     file_list = os.listdir(path)  # 파일명

#                     for f in file_list:
#                         frame += 1
#                         file = os.path.join(path, f)

#                         label = label_mapping.get(n, -1)  # Label 매핑
#                         course_name = course_mapping.get(c, -1)  # Course 매핑
#                         round = round_mapping.get(r, -1)
#                         # print(label, course_name, file)
#                         # CSV 파일에 데이터 쓰기
#                         writer.writerow([label, course_name, r, file])
#                     print(f'{n}_{c} --> done')

# print(f'CSV 파일 "{csv_filename}"이 생성되었습니다.')

In [672]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import glob
from tqdm import tqdm
import cv2
import random
import os
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torchvideotransforms import video_transforms, volume_transforms
import natsort
from PIL import Image

from model import *

In [673]:
import timm
import torch
import torch, torch.nn as nn, torch.nn.functional as F
import torch
from torch import nn, einsum
import torchvision.models as models
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights
from torchvision import transforms

In [700]:
# class CustomDataset(Dataset):
#     def __init__(self, csv, mode):
#         self.csv = csv
#         self.mode = mode

#     def __len__(self):
#         return self.csv.shape[0]

#     def __getitem__(self, idx):
#         file = self.csv.loc[idx, 'file']
#         label = torch.as_tensor(self.csv.loc[idx, 'label'], dtype=torch.long)
#         file = np.load(file, allow_pickle=True)

#         return torch.Tensor(file).permute(2, 0, 1), label

# class CustomDataset(Dataset):
#     def __init__(self, csv, mode, transform=None):
#         """
#         csv: 데이터셋 정보를 담고 있는 Pandas DataFrame.
#         mode: 'train', 'val', 또는 'test'와 같은 데이터셋의 모드.
#         transform (optional): PIL 이미지에 적용될 transform.
#         """
#         self.csv = csv
#         self.mode = mode
#         self.transform = transform

#     def __len__(self):
#         return len(self.csv)

#     def __getitem__(self, idx):
#         # 이미지 파일 경로
#         file_path = self.csv.loc[idx, 'file']
#         # 레이블
#         label = torch.tensor(self.csv.loc[idx, 'label'], dtype=torch.long)
#         # PIL 라이브러리를 사용해 이미지 로드
#         image = Image.open(file_path).convert('RGB')
        
#         # Transform 적용
#         if self.transform:
#             image = self.transform(image)
#         else:
#             # 기본 변환: ToTensor만 적용
#             default_transform = transforms.ToTensor()
#             image = default_transform(image)

#         return image, label

class CustomDataset(Dataset):
    """Dataset that turns each CSV row into a stack of frame images.

    For a row, the last ``_SUFFIX_LEN`` characters of its ``file`` value are
    dropped and the remainder is used as a glob prefix; every ``*.jpg`` that
    shares the prefix is treated as one frame of the same clip.
    NOTE(review): presumably the stripped characters are a frame-number
    suffix plus ``.jpg`` -- confirm against the file naming scheme.

    Args:
        csv: pandas DataFrame with ``file`` and ``label`` columns. Rows are
            fetched with ``.loc[idx]``, so the index must match the integer
            positions the DataLoader asks for.
        mode: Stored on the instance; not used by this class itself.
        transform: Optional callable applied to every PIL image. When it is
            omitted the image is converted to a float ``C x H x W`` tensor
            scaled to [0, 1] so the frames can still be stacked.
        max_frames: Upper bound on the number of frames loaded per item.
    """

    # Number of trailing characters removed from ``file`` to get the prefix.
    _SUFFIX_LEN = 7

    def __init__(self, csv, mode, transform=None, max_frames=30):
        self.csv = csv
        self.mode = mode
        self.transform = transform
        self.max_frames = max_frames

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.csv)

    @staticmethod
    def _to_tensor(image):
        """Convert an ``H x W x C`` uint8 image into a float ``C x H x W`` tensor in [0, 1]."""
        pixels = torch.from_numpy(np.array(image))
        return pixels.permute(2, 0, 1).float().div(255)

    def __getitem__(self, idx):
        """Load the frames belonging to row ``idx``.

        Returns:
            ``(image_stack, label)`` where ``image_stack`` stacks the
            per-frame tensors along a new leading axis (at most
            ``max_frames`` of them, in natural filename order) and ``label``
            is a ``torch.long`` scalar tensor.

        Raises:
            FileNotFoundError: If no ``*.jpg`` file matches the row's prefix.
        """
        prefix = self.csv.loc[idx, 'file'][:-self._SUFFIX_LEN]
        # Escape the prefix so glob metacharacters in a directory or file
        # name are matched literally instead of being read as a pattern.
        matches = glob.glob(f'{glob.escape(prefix)}*.jpg')
        if not matches:
            # torch.stack([]) would otherwise fail with an opaque message.
            raise FileNotFoundError(
                f"No '*.jpg' frames found for prefix {prefix!r} (row {idx})"
            )

        frames = []
        for frame_path in natsort.natsorted(matches)[:self.max_frames]:
            # convert() returns an independent in-memory copy, so the file
            # handle can be released as soon as the block exits.
            with Image.open(frame_path) as raw:
                image = raw.convert('RGB')
            if self.transform:
                image = self.transform(image)
            else:
                image = self._to_tensor(image)
            frames.append(image)

        image_stack = torch.stack(frames, dim=0)
        label = torch.tensor(self.csv.loc[idx, 'label'], dtype=torch.long)
        return image_stack, label
    
# Per-image preprocessing applied to every frame before stacking.
transform = transforms.Compose(
    [
        # Keep only the central 224x224 region (this crops; it does not resize).
        transforms.CenterCrop(size=(224, 224)),
        # Convert the PIL image into a PyTorch tensor.
        transforms.ToTensor(),
        # Normalise each of the three channels with mean 0.5 and std 0.5.
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

In [701]:
class VideoClassifier(nn.Module):
    """Clip classifier: EfficientNet-B0 features per image, averaged per clip.

    The EfficientNet-B0 classification layer is replaced by ``nn.Identity``,
    its pooled features go through a linear layer, the resulting vectors are
    averaged over the frame axis and a final linear layer produces the
    class logits.

    Args:
        num_classes: Size of the output logit vector.
        window_num: Nominal number of frames per clip. Stored for callers;
            ``forward`` reads the actual frame count from its input.
        pretrained: When true the backbone starts from the ImageNet
            ``IMAGENET1K_V1`` weights, otherwise from random initialisation.
    """

    def __init__(self, num_classes=8, window_num=30, pretrained=True):
        super().__init__()
        weights = EfficientNet_B0_Weights.IMAGENET1K_V1 if pretrained else None
        self.window_num = window_num
        self.base_model = efficientnet_b0(weights=weights)
        in_features = self.base_model.classifier[1].in_features
        # Drop the ImageNet head so the backbone returns its pooled features.
        self.base_model.classifier[1] = nn.Identity()
        self.feature_extractor = nn.Linear(in_features, 1280)

        self.classifier = nn.Linear(1280, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            x: 5-D tensor unpacked as ``(b, c, f, h, w)``.

        NOTE(review): the input is flattened with a plain ``reshape`` while
        the channel axis sits before the frame axis, so each ``(c, h, w)``
        slice given to the backbone is taken in memory order rather than
        being the channels of a single frame. This is left untouched because
        saved checkpoints were presumably trained with exactly this
        behaviour -- confirm before changing it.
        """
        b, c, f, h, w = x.size()
        x = x.reshape(-1, c, h, w)
        x = self.base_model(x)
        x = self.feature_extractor(x)
        # Regroup by the batch/frame sizes of this input instead of the
        # hard-coded window_num/1280, so a clip of another length neither
        # crashes nor gets averaged together with a neighbouring sample.
        x = x.view(b, f, -1).mean(1)
        x = self.classifier(x)
        return x

In [702]:
# Build the classifier for the configured number of classes with a
# 30-frame window, then place it on the selected device.
model = VideoClassifier(num_classes=num_classes, window_num=30)
model = model.to(device)

In [733]:
# Load the saved model state.
# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU can also be restored on a CPU-only machine.
state_dict = torch.load('./model_saved_vi/round_3_best_model.pth', map_location=device)

# Remove the 'module.' prefix from the parameter names
# (presumably added by a DataParallel-style wrapper at training time).
new_state_dict = {k.replace('module.', ''): v for k, v in state_dict.items()}

# Update the model with the renamed state.
model.load_state_dict(new_state_dict)
model.to(device)

VideoClassifier(
  (base_model): EfficientNet(
    (features): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): SiLU(inplace=True)
      )
      (1): Sequential(
        (0): MBConv(
          (block): Sequential(
            (0): Conv2dNormActivation(
              (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
              (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
              (2): SiLU(inplace=True)
            )
            (1): SqueezeExcitation(
              (avgpool): AdaptiveAvgPool2d(output_size=1)
              (fc1): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
              (fc2): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
              (activation): SiLU(inplace=True)
              (

In [734]:
# Put the model into evaluation mode before running inference.
model.eval()

VideoClassifier(
  (base_model): EfficientNet(
    (features): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): SiLU(inplace=True)
      )
      (1): Sequential(
        (0): MBConv(
          (block): Sequential(
            (0): Conv2dNormActivation(
              (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
              (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
              (2): SiLU(inplace=True)
            )
            (1): SqueezeExcitation(
              (avgpool): AdaptiveAvgPool2d(output_size=1)
              (fc1): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
              (fc2): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
              (activation): SiLU(inplace=True)
              (

In [735]:
# Number of dataset items per DataLoader batch.
batch_size = 10

In [736]:
# Read the full file list and keep only the rows recorded for round 3,
# renumbering them from 0 so positional lookups with .loc work.
df = pd.read_csv('./csv/8_vi_list.csv')
df_valid = df.loc[df['round'] == 3].reset_index(drop=True)

In [737]:
# Plain Python list of the 'file' column values of the validation rows.
visual_paths = df_valid['file'].tolist()

In [738]:
# visual_paths

In [739]:
# Wrap the validation rows in a dataset and iterate them in their original
# order (no shuffling).
valid_dataset = CustomDataset(csv=df_valid, mode='valid', transform=transform)
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [740]:
# Accumulators filled by the inference loop: true labels and the matching
# per-sample probability vectors.
labels, probabilities = [], []

In [None]:
# with torch.no_grad():
#     for data, label in valid_loader: 
#         data = data.to(device)   
#         data = data.permute(0, 2, 1, 3, 4)
#         print(data.shape)
#         output = model(data)
#         softmax_probs = F.softmax(output, dim=1).cpu().numpy()
#         probabilities.extend(softmax_probs)
#         labels.extend(label.numpy())

from tqdm import tqdm

# Run the model over the validation loader without tracking gradients and
# collect the softmax probabilities together with the true labels.
with torch.no_grad():
    for data, label in tqdm(valid_loader, desc="Validating"):
        # Move the batch to the device and swap axes 1 and 2
        # (assumes the loader yields batch x frames x channels x H x W -- TODO confirm).
        data = data.to(device).permute(0, 2, 1, 3, 4)
        output = model(data).to(device)
        softmax_probs = output.softmax(dim=1).cpu().numpy()
        probabilities.extend(softmax_probs)
        labels.extend(label.numpy())

Validating:  58%|█████▊    | 1164/2019 [09:59<07:12,  1.98it/s]

In [None]:
# Average the probability vectors of all samples sharing a true label.
unique_labels = np.unique(labels)
_label_array = np.asarray(labels)
_prob_array = np.asarray(probabilities)
average_probabilities = {
    label: np.mean(_prob_array[_label_array == label], axis=0)
    for label in unique_labels
}

In [None]:
# One row per true label, one column per predicted class.
avg_probs_df = pd.DataFrame(average_probabilities).T
avg_probs_df

In [None]:
# Name the probability columns '0' .. 'num_classes - 1' and copy the row
# index (the true label) into its own 'val_3' column.
avg_probs_df.columns = list(map(str, range(num_classes)))
avg_probs_df['val_3'] = avg_probs_df.index

In [None]:
# Move 'val_3' to the front; every other column keeps its relative order.
column_order = ['val_3', *(col for col in avg_probs_df.columns if col != 'val_3')]
avg_probs_df = avg_probs_df.loc[:, column_order]

In [None]:
# Save the averaged-probability DataFrame as CSV: the index is not written
# and floats are formatted with three decimal places.
avg_probs_df.to_csv('./prob/vi_round_3_avg_prob.csv', index=False, float_format='%.3f')