<a href="https://colab.research.google.com/github/OverfitSurvivor/code/blob/main/drone_evaluate.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive into the Colab runtime at /content/drive so later cells can
# read the dataset archive and write results under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import zipfile

zip_path = "/content/drive/MyDrive/ICSV31AIChallengeDataset.zip"  # path of the uploaded ZIP file
extract_path = "/content/ICSV31AIChallengeDataset"  # directory the archive is extracted into

# Create the target directory if it does not exist yet
os.makedirs(extract_path, exist_ok=True)

# Extract the whole archive (re-running the cell extracts it again over the same directory)
with zipfile.ZipFile(zip_path, "r") as zip_ref:
    zip_ref.extractall(extract_path)

print("압축 해제 완료:", extract_path)

압축 해제 완료: /content/ICSV31AIChallengeDataset


## 모듈 불러오기

In [None]:
# NOTE(review): both shell escapes below try to run .pyc paths as commands and fail
# (the recorded output shows "command not found" / "No such file or directory").
# They look like leftovers from an attempt to clean up a shadowing torch.pyc —
# confirm and delete this cell so the notebook survives Restart & Run All cleanly.
!torch.pyc
!_pycache__/torch.cpython-*.pyc


/bin/bash: line 1: torch.pyc: command not found
/bin/bash: line 1: _pycache__/torch.cpython-*.pyc: No such file or directory


In [3]:
import csv
import argparse
import os
from typing import Any, List, Tuple

import torch
import torchaudio
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import sklearn.metrics as metrics
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

## 라벨링 및 데이터셋 로더
+ global scaling STFT 적용

In [4]:
#######################
# 1. Utils
#######################
def read_csv(file_path: str) -> List:
    """Read a CSV file and return all of its rows.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A list with one entry per record; each entry is the list of string
        fields produced by ``csv.reader``.
    """
    # newline="" hands line-ending handling to the csv module, as its
    # documentation requires; without it, line breaks embedded in quoted
    # fields are rewritten by universal-newline translation.
    with open(file_path, "r", newline="") as f:
        return list(csv.reader(f))

def save_csv(save_data: List[Any], save_file_path: str) -> None:
    """Write ``save_data`` to ``save_file_path`` as CSV, one row per element.

    The file is overwritten. Rows are terminated with a bare "\\n".
    """
    with open(save_file_path, "w", newline="") as out_file:
        csv.writer(out_file, lineterminator="\n").writerows(save_data)

def get_anomaly_label(file_path: str) -> int:
    """Derive the anomaly label from a file name.

    Returns:
        -1 when the part of the base name before the first "_" is "test",
        0 when the base name contains "normal", and 1 otherwise.
    """
    base = os.path.basename(file_path)
    prefix = base.partition("_")[0]
    if prefix == "test":
        return -1
    return 0 if "normal" in base else 1

def get_drone_label(file_path: str) -> int:
    """Map the second "_"-separated token of the file name to a drone index.

    Returns:
        0, 1 or 2 for "A", "B" or "C"; -1 for any other token.

    Raises:
        IndexError: if the base name contains no "_".
    """
    drone_ids = {"A": 0, "B": 1, "C": 2}
    token = os.path.basename(file_path).split("_")[1]
    return drone_ids.get(token, -1)

def get_direction_label(file_path: str) -> int:
    """Map the third "_"-separated token of the file name to a direction index.

    Returns:
        The position of the token in
        (Back, Front, Left, Right, Clockwise, CounterClockwise), i.e. 0..5,
        or -1 when the token is none of these.

    Raises:
        IndexError: if the base name has fewer than three "_"-separated tokens.
    """
    direction_ids = {
        "Back": 0,
        "Front": 1,
        "Left": 2,
        "Right": 3,
        "Clockwise": 4,
        "CounterClockwise": 5,
    }
    token = os.path.basename(file_path).split("_")[2]
    return direction_ids.get(token, -1)

Global Mean: -4.636630590752749

Global Std: 12.173359870910645

### 2048/768/192 (n_fft / win_length / hop_length)
- Global Mean: -4.616434057646901
- Global Std: 12.162795066833496

In [5]:
#######################
# 2. Feature Extraction & Augmentation
#######################
# Precomputed global mean and standard deviation of the log-spectrogram values.
# They must be recomputed every time the STFT settings are changed.

GLOBAL_MEAN = -4.616434057646901
GLOBAL_STD = 12.162795066833496
def wav_to_log_stft(
    wav_path: str,
    sr: int,
    n_fft: int,
    win_length: int,
    hop_length: int,
    power: float,
) -> torch.Tensor:
    """
    Load an audio file and return its globally standardised log-STFT spectrogram.

    Pipeline: torchaudio.transforms.Spectrogram -> AmplitudeToDB (default
    arguments) -> (x - GLOBAL_MEAN) / (GLOBAL_STD + 1e-9).

    Args:
        wav_path: Audio file passed to ``torchaudio.load``.
        sr: Accepted for interface compatibility; this function does not read
            it, and the sample rate returned by ``torchaudio.load`` is discarded.
        n_fft, win_length, hop_length, power: Forwarded to ``Spectrogram``.

    Returns:
        The standardised log spectrogram tensor.
    """
    to_spectrogram = torchaudio.transforms.Spectrogram(
        n_fft=n_fft,
        win_length=win_length,
        hop_length=hop_length,
        power=power,
    )
    waveform, _sample_rate = torchaudio.load(wav_path)
    to_db = torchaudio.transforms.AmplitudeToDB()
    db_spec = to_db(to_spectrogram(waveform))

    # Standardise with dataset-wide statistics; the epsilon guards a zero std.
    return (db_spec - GLOBAL_MEAN) / (GLOBAL_STD + 1e-9)


def augment_spec(spec: torch.Tensor) -> torch.Tensor:
    """Randomly augment a spectrogram: circular shift, then time and frequency masking.

    - The last axis is rolled by a random offset within +/-10% of its length.
    - ``TimeMasking`` is applied with ``time_mask_param`` = 5% of the last axis (at least 1).
    - ``FrequencyMasking`` is applied with ``freq_mask_param`` = 5% of the
      second-to-last axis (at least 1).

    Random draws happen in that order, so results depend on the torch RNG state.
    """
    shift_limit = int(spec.shape[-1] * 0.1)
    offset = torch.randint(-shift_limit, shift_limit + 1, (1,)).item()
    rolled = torch.roll(spec, shifts=offset, dims=-1)

    time_width = max(1, int(rolled.shape[-1] * 0.05))
    time_masked = torchaudio.transforms.TimeMasking(time_mask_param=time_width)(rolled)

    freq_width = max(1, int(time_masked.shape[-2] * 0.05))
    return torchaudio.transforms.FrequencyMasking(freq_mask_param=freq_width)(time_masked)

#######################
# 3. Dataset
#######################
class BaselineDataLoader(Dataset):
    """Dataset (despite its name) that turns audio paths into spectrograms plus labels.

    Each item is ``(spec, anomaly_label, drone_label, direction_label)``; the
    three labels are parsed from the file name by the ``get_*_label`` helpers.
    """

    def __init__(
        self,
        file_list: List[str],
        sr: int,
        n_fft: int,
        win_length: int,
        hop_length: int,
        power: float,
        augment: bool = False
    ) -> None:
        """Store the file list, the STFT settings and the augmentation switch."""
        self.file_list = file_list
        # STFT settings forwarded verbatim to wav_to_log_stft
        self.sr, self.n_fft = sr, n_fft
        self.win_length, self.hop_length = win_length, hop_length
        self.power = power
        self.augment = augment

    def __len__(self) -> int:
        """Number of audio files in the dataset."""
        return len(self.file_list)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int, int, int]:
        """Load item ``idx``; ``augment_spec`` is applied only when ``augment`` is set."""
        path = self.file_list[idx]
        features = wav_to_log_stft(
            path, self.sr, self.n_fft, self.win_length, self.hop_length, self.power
        )
        if self.augment:
            features = augment_spec(features)
        return (
            features,
            get_anomaly_label(path),
            get_drone_label(path),
            get_direction_label(path),
        )

## 모델 학습 및 평가

In [None]:
!pip install pytorch-msssim

Collecting pytorch-msssim
  Using cached pytorch_msssim-1.0.0-py3-none-any.whl.metadata (8.0 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->pytorch-msssim)
  Using cached nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->pytorch-msssim)
  Using cached nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch->pytorch-msssim)
  Using cached nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch->pytorch-msssim)
  Using cached nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch->pytorch-msssim)
  Using cached nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch->pytor

## 평가 데이터 바꿔서 모델 가중치 불러오기

In [6]:
# ----------------------------------------------------------------
# 기본 GMSD Loss 구현
# ----------------------------------------------------------------
class GMSDLoss(nn.Module):
    """Gradient Magnitude Similarity Deviation between two single-channel images.

    Gradient magnitudes are obtained with 3x3 Sobel kernels; the loss is the
    (population) standard deviation of the per-pixel similarity map

        (2 * g1 * g2 + T) / (g1^2 + g2^2 + T)

    taken over every element of the batch.

    Args:
        T: Stabilising constant added to numerator and denominator.
    """

    def __init__(self, T: float = 0.0026):
        super().__init__()
        self.T = T
        sobel_x = torch.tensor([[-1, 0, 1],
                                [-2, 0, 2],
                                [-1, 0, 1]], dtype=torch.float32)
        # The vertical Sobel kernel is the transpose of the horizontal one.
        sobel_y = sobel_x.t().contiguous()
        self.register_buffer('weight_x', sobel_x.view(1, 1, 3, 3))
        self.register_buffer('weight_y', sobel_y.view(1, 1, 3, 3))

    def forward(self, img1: torch.Tensor, img2: torch.Tensor) -> torch.Tensor:
        """Return the scalar GMSD of ``img1`` vs ``img2`` (both shaped (B, 1, H, W))."""
        # Kernels follow the dtype/device of the first input.
        kernel_x = self.weight_x.to(dtype=img1.dtype, device=img1.device)
        kernel_y = self.weight_y.to(dtype=img1.dtype, device=img1.device)

        def gradient_magnitude(img: torch.Tensor) -> torch.Tensor:
            gx = F.conv2d(img, kernel_x, padding=1)
            gy = F.conv2d(img, kernel_y, padding=1)
            return torch.sqrt(gx ** 2 + gy ** 2)

        mag1 = gradient_magnitude(img1)
        mag2 = gradient_magnitude(img2)

        # Lower-bound the denominator so the division stays finite even if T is 0.
        denominator = torch.clamp(mag1 ** 2 + mag2 ** 2 + self.T, min=1e-5)
        similarity = (2 * mag1 * mag2 + self.T) / denominator
        return torch.std(similarity, unbiased=False)

# ----------------------------------------------------------------
# Multi-Scale GMSD Loss (MS-GMSD)
# ----------------------------------------------------------------
class MultiScaleGMSDLoss(nn.Module):
    """Average of GMSD evaluated on a pyramid of progressively halved images.

    Args:
        scales: Number of pyramid levels (the first level is the input resolution).
        T: Stabilising constant forwarded to ``GMSDLoss``.
    """

    def __init__(self, scales=4, T: float = 0.0026):
        super().__init__()
        self.scales = scales
        self.gmsd_loss = GMSDLoss(T=T)

    def forward(self, img1, img2):
        """Return the mean GMSD over ``self.scales`` levels."""
        last_level = self.scales - 1
        per_scale = []
        for level in range(self.scales):
            per_scale.append(self.gmsd_loss(img1, img2))
            if level == last_level:
                break
            # Halve both images bilinearly before the next level.
            img1, img2 = (
                F.interpolate(img, scale_factor=0.5, mode='bilinear', align_corners=False)
                for img in (img1, img2)
            )
        return sum(per_scale) / len(per_scale)

In [18]:
# ---------------------------
# 환경 설정 및 파라미터 정의
# ---------------------------

import os
import csv
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchaudio.transforms as T
from torch.utils.data import Dataset, DataLoader
from sklearn import metrics

class Args:
    """Static configuration for the evaluation run: paths, loader and STFT settings."""
    eval_dir = "/content/ICSV31AIChallengeDataset/eval"  # directory whose entries get_eval_loader lists
    result_dir = "/content/drive/MyDrive"  # directory where save_best_scores writes its CSV
    # NOTE(review): model_path below is an absolute path, so
    # os.path.join(args.model_dir, args.model_path) discards model_dir and the
    # checkpoint is read from /content, not from Drive — confirm this is intended.
    model_dir = "/content/drive/MyDrive"
    model_path = "/content/model_dae_0423.pth"
    batch_size = 1  # NOTE(review): not read in the visible code; get_eval_loader hard-codes batch_size=1
    gpu = 0  # NOTE(review): not read in the visible code; the device is chosen via torch.cuda.is_available()
    n_workers = 1  # DataLoader num_workers
    sr = 16000  # passed to wav_to_log_stft as `sr`
    n_fft = 2048  # STFT settings forwarded to torchaudio's Spectrogram
    win_length = 768
    hop_length = 192
    power = 2.0
    dropout = 0.0  # forwarded to DAEModel

# Single shared configuration instance read as a global by the functions below.
args = Args()

# ---------------------------
# 기본 함수 및 유틸
# ---------------------------

def set_seed(seed: int):
    """Seed every random number generator this notebook can touch.

    Seeds Python's ``random``, NumPy's legacy global RNG and torch (CPU and all
    CUDA devices), and switches cuDNN to deterministic mode with benchmarking
    off.

    Args:
        seed: Seed value applied to all generators.
    """
    # Local import: the notebook's import cells do not bring in `random`.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def get_anomaly_label(wav_path):
    """Return 0 if the file's base name contains "normal" (case-insensitive), else 1."""
    base_name = os.path.basename(wav_path).lower()
    if "normal" in base_name:
        return 0
    return 1

def match_size(source, target):
    """Centre-crop and/or zero-pad ``source`` so its last two dims equal ``target``'s.

    Height and width are handled independently, so a source that is larger on
    one axis and smaller on the other is cropped on the former and padded on
    the latter. (Treating both axes in one branch produced a negative slice
    start and a wrongly sized result in that case.)

    Args:
        source: 4-D tensor (N, C, H, W) to resize.
        target: 4-D tensor whose H and W are the desired size.

    Returns:
        ``source`` with spatial size (target H, target W). When padding, any odd
        extra row/column goes to the bottom/right.
    """
    tgt_h, tgt_w = target.size(2), target.size(3)

    # Crop each oversized axis around its centre.
    if source.size(2) > tgt_h:
        top = (source.size(2) - tgt_h) // 2
        source = source[:, :, top:top + tgt_h, :]
    if source.size(3) > tgt_w:
        left = (source.size(3) - tgt_w) // 2
        source = source[:, :, :, left:left + tgt_w]

    # Pad each undersized axis; F.pad takes (left, right, top, bottom).
    pad_h = max(tgt_h - source.size(2), 0)
    pad_w = max(tgt_w - source.size(3), 0)
    if pad_h or pad_w:
        source = F.pad(source, (pad_w // 2, pad_w - pad_w // 2,
                                pad_h // 2, pad_h - pad_h // 2))
    return source

# ---------------------------
# 손실 함수 구현
# ---------------------------

class GMSDLoss(nn.Module):
    """Gradient Magnitude Similarity Deviation between two single-channel images.

    Gradient magnitudes come from 3x3 Sobel kernels stored as buffers; the loss
    is the population standard deviation of

        (2 * g1 * g2 + T) / (g1^2 + g2^2 + T + 1e-5)

    over every element of the batch. Inputs must already be on the same
    device/dtype as the module's buffers.

    Args:
        T: Stabilising constant added to numerator and denominator.
    """

    def __init__(self, T: float = 0.0026):
        super().__init__()
        sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32)
        # The vertical Sobel kernel is the transpose of the horizontal one.
        sobel_y = sobel_x.t().contiguous()
        self.register_buffer('weight_x', sobel_x.view(1, 1, 3, 3))
        self.register_buffer('weight_y', sobel_y.view(1, 1, 3, 3))
        self.T = T

    def _gradient_magnitude(self, img):
        """Per-pixel Sobel gradient magnitude of a (B, 1, H, W) tensor."""
        gx = F.conv2d(img, self.weight_x, padding=1)
        gy = F.conv2d(img, self.weight_y, padding=1)
        return torch.sqrt(gx**2 + gy**2)

    def forward(self, img1, img2):
        """Return the scalar GMSD of ``img1`` vs ``img2``."""
        mag1 = self._gradient_magnitude(img1)
        mag2 = self._gradient_magnitude(img2)
        numerator = 2 * mag1 * mag2 + self.T
        denominator = mag1**2 + mag2**2 + self.T + 1e-5
        return torch.std(numerator / denominator, unbiased=False)

class MultiScaleGMSDLoss(nn.Module):
    """Mean of GMSD scores computed on a pyramid of progressively halved images.

    Args:
        scales: Number of pyramid levels (level 0 is the input resolution).
        T: Stabilising constant forwarded to ``GMSDLoss``.
    """

    def __init__(self, scales=4, T: float = 0.0026):
        super().__init__()
        self.gmsd = GMSDLoss(T=T)
        self.scales = scales

    def forward(self, x, y):
        """Return the average GMSD over ``self.scales`` levels."""
        final_level = self.scales - 1
        level_scores = []
        for level in range(self.scales):
            level_scores.append(self.gmsd(x, y))
            if level == final_level:
                break
            # Halve both inputs bilinearly before scoring the next level.
            x, y = (
                F.interpolate(t, scale_factor=0.5, mode='bilinear', align_corners=False)
                for t in (x, y)
            )
        return sum(level_scores) / len(level_scores)


class CombinedLoss(nn.Module):
    """Convex combination of two losses: ``alpha * loss1 + (1 - alpha) * loss2``.

    Args:
        loss1: Callable ``(x, y) -> loss`` weighted by ``alpha``.
        loss2: Callable ``(x, y) -> loss`` weighted by ``1 - alpha``.
        alpha: Weight of ``loss1``.
    """

    def __init__(self, loss1, loss2, alpha=0.2):
        super().__init__()
        self.loss1 = loss1
        self.loss2 = loss2
        self.alpha = alpha

    def forward(self, x, y):
        """Evaluate both losses on ``(x, y)`` and blend them."""
        first_term = self.alpha * self.loss1(x, y)
        second_term = (1 - self.alpha) * self.loss2(x, y)
        return first_term + second_term

def get_loss_functions():
    """Build the candidate anomaly-scoring losses, keyed by display name.

    Each entry is a ``CombinedLoss`` (with its default ``alpha``) that blends a
    structural term — single-scale or multi-scale GMSD — with ``nn.L1Loss``.
    """
    structural_terms = (
        ("GMSD+L1", GMSDLoss),
        ("MS-GMSD+L1", MultiScaleGMSDLoss),
    )
    return {
        label: CombinedLoss(term_cls(), nn.L1Loss())
        for label, term_cls in structural_terms
    }

# ---------------------------
# 데이터셋 정의
# ---------------------------

# Global mean / std used by wav_to_log_stft to standardise the log spectrogram.
# NOTE(review): an earlier cell of this notebook records different statistics for the
# 2048/768/192 STFT setting (-4.616434057646901 / 12.162795066833496), and 2048/768/192
# is what Args configures. Confirm which pair the loaded checkpoint was trained with.
GLOBAL_MEAN = -4.636630590752749
GLOBAL_STD = 12.173359870910645

def wav_to_log_stft(wav_path, sr, n_fft, win_length, hop_length, power):
    """Load ``wav_path`` and return its globally standardised log-STFT spectrogram.

    Pipeline: torchaudio.load -> Spectrogram -> AmplitudeToDB (default
    arguments) -> (x - GLOBAL_MEAN) / (GLOBAL_STD + 1e-9).

    ``sr`` is accepted but not read; the sample rate returned by
    ``torchaudio.load`` is discarded.
    """
    waveform, _sample_rate = torchaudio.load(wav_path)
    to_spectrogram = torchaudio.transforms.Spectrogram(
        n_fft=n_fft,
        win_length=win_length,
        hop_length=hop_length,
        power=power,
    )
    to_db = torchaudio.transforms.AmplitudeToDB()
    db_spec = to_db(to_spectrogram(waveform))
    return (db_spec - GLOBAL_MEAN) / (GLOBAL_STD + 1e-9)

class BaselineDataset(Dataset):
    """Evaluation dataset yielding ``(spectrogram, anomaly_label)`` per audio file.

    STFT settings are read from the module-level ``args`` object at item
    access time.
    """

    def __init__(self, file_list):
        self.file_list = file_list

    def __len__(self):
        """Number of files in the dataset."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return the standardised log spectrogram and label of file ``idx``."""
        wav_path = self.file_list[idx]
        features = wav_to_log_stft(
            wav_path,
            args.sr,
            args.n_fft,
            args.win_length,
            args.hop_length,
            args.power,
        )
        return features, get_anomaly_label(wav_path)

def get_eval_loader():
    """Create the evaluation DataLoader over every entry of ``args.eval_dir``.

    Returns:
        ``(loader, file_list)`` where ``file_list`` is the sorted list of full
        paths and ``loader`` yields them in that order, one item per batch
        (batch size is fixed at 1, no shuffling).
    """
    entries = os.listdir(args.eval_dir)
    file_list = sorted(os.path.join(args.eval_dir, entry) for entry in entries)
    loader = DataLoader(
        BaselineDataset(file_list),
        batch_size=1,
        shuffle=False,
        num_workers=args.n_workers,
    )
    return loader, file_list

#############################
# Utility: 크기 맞춤 함수
#############################
def match_size(source: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
    """Centre-crop and/or zero-pad ``source`` so its last two dims equal ``target``'s.

    Each spatial axis is processed on its own: an oversized axis is cropped
    around its centre, an undersized axis is zero-padded. The previous single
    ``if/elif`` sent a tensor that was taller but narrower (or vice versa) down
    the crop branch with a negative start index, returning the wrong shape.

    Args:
        source: 4-D tensor (N, C, H, W) to resize.
        target: 4-D tensor whose H and W define the desired size.

    Returns:
        ``source`` with spatial size (target H, target W). When padding, any odd
        extra row/column is placed at the bottom/right.
    """
    tgt_h, tgt_w = target.size(2), target.size(3)

    # Crop any axis that is too large.
    if source.size(2) > tgt_h:
        top = (source.size(2) - tgt_h) // 2
        source = source[:, :, top:top + tgt_h, :]
    if source.size(3) > tgt_w:
        left = (source.size(3) - tgt_w) // 2
        source = source[:, :, :, left:left + tgt_w]

    # Pad any axis that is too small; F.pad order is (left, right, top, bottom).
    diff_h = max(tgt_h - source.size(2), 0)
    diff_w = max(tgt_w - source.size(3), 0)
    if diff_h or diff_w:
        source = F.pad(source, (diff_w // 2, diff_w - diff_w // 2,
                                diff_h // 2, diff_h - diff_h // 2))
    return source

#############################
# Model Architecture (DAE)
#############################
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a residual (shortcut) connection.

    The shortcut is the identity unless the stride or channel count changes, in
    which case it is a 1x1 conv + batch-norm projection. LeakyReLU(0.2) is used
    after the first conv and after the residual sum.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the block.
        stride: Stride of the first conv (and of the projection shortcut).
        dropout: Dropout probability between the two convs; values <= 0 disable it.
    """

    def __init__(self, in_channels, out_channels, stride=1, dropout=0.03):
        super().__init__()
        # Sub-modules are created in a fixed order so parameter initialisation
        # consumes the RNG the same way every time.
        self.conv1 = nn.Conv2d(in_channels, out_channels,
                               kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        if dropout > 0:
            self.dropout = nn.Dropout(dropout)
        else:
            self.dropout = nn.Identity()
        self.conv2 = nn.Conv2d(out_channels, out_channels,
                               kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.downsample = nn.Identity()

    def forward(self, x):
        """Apply the block to a (N, in_channels, H, W) tensor."""
        shortcut = self.downsample(x)
        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(self.dropout(y)))
        y += shortcut
        return self.relu(y)

class EncoderBlock(nn.Module):
    """Encoder stage: a single stride-2 ``ResidualBlock``.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the stage.
        dropout: Dropout probability forwarded to the residual block.
    """

    def __init__(self, in_channels, out_channels, dropout=0.03):
        super().__init__()
        self.resblock = ResidualBlock(
            in_channels,
            out_channels,
            stride=2,
            dropout=dropout,
        )

    def forward(self, x):
        """Run the stride-2 residual block on ``x``."""
        encoded = self.resblock(x)
        return encoded

class DecoderBlock(nn.Module):
    """Decoder stage: transposed-conv upsampling, skip concatenation, residual block.

    Args:
        in_channels: Channels of the tensor coming from the deeper stage.
        out_channels: Channels produced by the stage; the skip tensor is
            expected to carry the same number, since the residual block takes
            ``out_channels * 2`` input channels.
        dropout: Dropout probability; values <= 0 disable dropout.
    """

    def __init__(self, in_channels, out_channels, dropout=0.03):
        super().__init__()
        self.deconv = nn.ConvTranspose2d(
            in_channels, out_channels,
            kernel_size=3, stride=2, padding=1, output_padding=1,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        if dropout > 0:
            self.dropout = nn.Dropout(dropout)
        else:
            self.dropout = nn.Identity()
        self.resblock = ResidualBlock(out_channels * 2, out_channels, stride=1, dropout=dropout)

    def forward(self, x, skip):
        """Upsample ``x``, align ``skip`` to it spatially, concatenate and refine."""
        upsampled = self.dropout(self.relu(self.bn(self.deconv(x))))
        # The skip tensor is cropped/padded to the upsampled size before concatenation.
        aligned_skip = match_size(skip, upsampled)
        merged = torch.cat([upsampled, aligned_skip], dim=1)
        return self.resblock(merged)

class DenoisingAutoencoder(nn.Module):
    """U-Net-style autoencoder: five stride-2 encoder stages, four skip-connected
    decoder stages and a final transposed conv back to ``input_channels``.

    Encoder widths are 32, 64, 128, 256, 512. The output is bilinearly resized
    to the input's spatial size, so any (B, C, H, W) input yields an output of
    the same shape.

    Args:
        input_channels: Channels of the input (and reconstructed) tensor.
        dropout: Dropout probability forwarded to every encoder/decoder stage.
    """

    def __init__(self, input_channels=1, dropout=0.03):
        super().__init__()
        widths = (input_channels, 32, 64, 128, 256, 512)
        # enc1 .. enc5: widths[k-1] -> widths[k]
        for level in range(1, 6):
            setattr(self, f"enc{level}",
                    EncoderBlock(widths[level - 1], widths[level], dropout))
        # dec5 .. dec2: widths[k] -> widths[k-1]
        for level in range(5, 1, -1):
            setattr(self, f"dec{level}",
                    DecoderBlock(widths[level], widths[level - 1], dropout))
        self.dec1 = nn.ConvTranspose2d(
            32, input_channels,
            kernel_size=3, stride=2, padding=1, output_padding=1,
        )

    def forward(self, x):
        """Reconstruct ``x``; the result has the same spatial size as ``x``."""
        # Encoder: keep every stage output (e1..e5) for the skip connections.
        skips = []
        hidden = x
        for level in range(1, 6):
            hidden = getattr(self, f"enc{level}")(hidden)
            skips.append(hidden)

        # Decoder: dec_k consumes the previous output and the skip from enc_(k-1).
        for level in range(5, 1, -1):
            hidden = getattr(self, f"dec{level}")(hidden, skips[level - 2])

        restored = self.dec1(hidden)
        return F.interpolate(restored, size=x.shape[2:], mode="bilinear", align_corners=False)

def DAEModel(dropout) -> nn.Module:
    """Factory for the single-channel ``DenoisingAutoencoder`` with the given dropout."""
    autoencoder = DenoisingAutoencoder(dropout=dropout, input_channels=1)
    return autoencoder

# ---------------------------
# 평가 함수
# ---------------------------

def evaluate_model_gridsearch(model, loader, loss_fns, device):
    """Score every sample with each candidate loss and report the ROC AUC per loss.

    The loader is traversed once: each spectrogram is reconstructed a single
    time and every loss function is evaluated on that reconstruction. This
    avoids re-reading the data and re-running the model per loss.

    Args:
        model: Reconstruction model; put into eval mode here.
        loader: Iterable of ``(spec, label)`` batches of size 1 (``label.item()``
            is called on each label).
        loss_fns: Mapping ``name -> callable(reconstruction, spec)`` returning a
            scalar tensor used as the anomaly score.
        device: Device the spectrograms are moved to.

    Returns:
        ``(results, loss_outputs)`` where ``results[name]`` is the ROC AUC and
        ``loss_outputs[name]`` is a list of ``(label, score, index)`` tuples in
        loader order.

    Raises:
        ValueError: propagated from ``roc_auc_score`` when the labels contain a
            single class only.
    """
    model.eval()
    if not loss_fns:
        return {}, {}

    names = list(loss_fns)
    y_true = []
    scores_by_name = {name: [] for name in names}

    with torch.no_grad():
        for spec, label in loader:
            spec = spec.to(device)
            out = model(spec)
            y_true.append(label.item())
            for name in names:
                scores_by_name[name].append(loss_fns[name](out, spec).item())

    results, loss_outputs = {}, {}
    labels = np.array(y_true)
    for name in names:
        scores = scores_by_name[name]
        auc = metrics.roc_auc_score(labels, np.array(scores))
        print(f"{name}: ROC AUC = {auc:.4f}")
        results[name] = auc
        loss_outputs[name] = [
            (label, score, idx)
            for idx, (label, score) in enumerate(zip(y_true, scores))
        ]
    return results, loss_outputs

def save_best_scores(best_name, outputs, file_list):
    """Write the per-file scores of the selected loss to a CSV in ``args.result_dir``.

    Args:
        best_name: Key into ``outputs``; also embedded in the output file name.
        outputs: Mapping ``name -> list of (label, score, index)`` tuples.
        file_list: Paths indexed by the ``index`` element of each tuple; only
            the base name without extension is written.
    """
    csv_path = os.path.join(args.result_dir, f"anomaly_scores_best_{best_name}.csv")
    header = ["File Name", "True Label", "Score"]
    with open(csv_path, "w", newline="") as out_file:
        writer = csv.writer(out_file)
        writer.writerow(header)
        writer.writerows(
            [os.path.splitext(os.path.basename(file_list[idx]))[0], label, score]
            for label, score, idx in outputs[best_name]
        )
    print("저장 완료:", csv_path)

# ---------------------------
# 전체 실행
# ---------------------------

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
set_seed(2025)

model = DAEModel(dropout=args.dropout).to(device)
# NOTE: args.model_path is absolute, so os.path.join discards args.model_dir and the
# checkpoint is read from args.model_path as-is.
checkpoint_path = os.path.join(args.model_dir, args.model_path)
# weights_only=True limits unpickling to tensors and plain containers, which is all a
# state_dict needs, and prevents arbitrary code execution from a tampered checkpoint.
state = torch.load(checkpoint_path, map_location=device, weights_only=True)
model.load_state_dict(state)

loader, file_list = get_eval_loader()
# Move each loss module (and its Sobel buffers) to the evaluation device.
loss_fns = {name: fn.to(device) for name, fn in get_loss_functions().items()}

results, outputs = evaluate_model_gridsearch(model, loader, loss_fns, device)
# Keep only the scores of the loss with the highest ROC AUC.
best_name = max(results, key=results.get)
save_best_scores(best_name, outputs, file_list)


GMSD+L1: ROC AUC = 0.8550
MS-GMSD+L1: ROC AUC = 0.8563
저장 완료: /content/drive/MyDrive/anomaly_scores_best_MS-GMSD+L1.csv
