<a href="https://colab.research.google.com/github/OverfitSurvivor/OverfitSurvivor/blob/main/%EB%8C%80%ED%9A%8C_for_anomaly%EC%B0%BE%EA%B8%B0_ipynb%EC%9D%98_%EC%82%AC%EB%B3%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import zipfile

zip_path = "/content/drive/MyDrive/ICSV31AIChallengeDataset.zip"  # 업로드한 ZIP 파일 경로
extract_path = "/content/ICSV31AIChallengeDataset"  # 압축을 풀 폴더 경로

# 폴더가 없으면 생성
os.makedirs(extract_path, exist_ok=True)

# 압축 해제
with zipfile.ZipFile(zip_path, "r") as zip_ref:
    zip_ref.extractall(extract_path)

print("압축 해제 완료:", extract_path)

압축 해제 완료: /content/ICSV31AIChallengeDataset


In [None]:
import os
import re
import librosa
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
import os
import re
import numpy as np
import librosa
from sklearn.preprocessing import MinMaxScaler

# 파일명에서 anomaly 여부 탐지
pattern = re.compile(r"(eval|train)_([ABC])_[^_]+_(normal|anomaly)_")

def get_file_info(file_name):
    match = pattern.search(file_name)
    if match:
        data_type, drone_type, status = match.groups()  # train/eval, A/B/C, normal/anomaly
        return data_type, drone_type, status
    return None, None, None

# 특징 추출 함수
def extract_features(file_path, sr=16000, n_fft=2048, hop_length=256, scaler=None):
    y, sr = librosa.load(file_path, sr=sr)
    stft = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
    stft_mag = np.abs(stft)
    log_stft = librosa.amplitude_to_db(stft_mag, ref=np.max)

    # 정규화 (훈련 데이터에 대해서만 fit)
    # If scaler is provided, use it to transform
    if scaler:
        log_stft = scaler.transform(log_stft)
    # Otherwise, return the original data without scaling

    return log_stft[np.newaxis, :, :], scaler  # (1, Freq, Time)

# 데이터 로드 함수 (Normal / Anomaly 구분)
def load_audio_data(folder_path, is_train=True):
    files = [f for f in os.listdir(folder_path) if f.endswith(".wav")]
    data = []
    labels = []

    # Initialize the scaler outside the loop for training data
    scaler = MinMaxScaler() if is_train else None

    # Fit the scaler on all training data if is_train
    if is_train:
        all_training_data = []  # Collect all training data
        for f in files:
            data_type, drone_type, status = get_file_info(f)
            if data_type is None:
                continue

            features, _ = extract_features(os.path.join(folder_path, f))  # Don't scale during collection
            all_training_data.extend(features)

        all_training_data = np.concatenate(all_training_data, axis=0)  # Combine into one array
        scaler.fit(all_training_data.reshape(-1, all_training_data.shape[-1]))  # Fit on the combined data

    # Apply feature extraction and scaling for each file.
    for f in files:
        data_type, drone_type, status = get_file_info(f)
        if data_type is None:
            continue

        features, _ = extract_features(os.path.join(folder_path, f), scaler=scaler)
        # Pass the fitted scaler if available
        data.append(features)
        labels.append(0 if status == "normal" else 1)  # Normal: 0, Anomaly: 1

    return np.array(data, dtype=np.float32), np.array(labels, dtype=np.int64), scaler

# 데이터 로드
train_dir = "/content/ICSV31AIChallengeDataset/train"
eval_dir = "/content/ICSV31AIChallengeDataset/eval"

X_train, y_train, scaler = load_audio_data(train_dir, is_train=True)
X_eval, y_eval, _ = load_audio_data(eval_dir, is_train=False)  # 같은 scaler 적용

print(f"Train Data Shape: {X_train.shape}, Labels: {np.unique(y_train, return_counts=True)}")
print(f"Eval Data Shape: {X_eval.shape}, Labels: {np.unique(y_eval, return_counts=True)}")

Train Data Shape: (5400, 1, 1025, 126), Labels: (array([0]), array([5400]))
Eval Data Shape: (1080, 1, 1025, 126), Labels: (array([0, 1]), array([540, 540]))


In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# PyTorch Tensor 변환
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_eval_tensor = torch.tensor(X_eval, dtype=torch.float32)

# Dataset 및 DataLoader 정의
train_dataset = TensorDataset(X_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
eval_dataset = TensorDataset(X_eval_tensor)
eval_loader = DataLoader(eval_dataset, batch_size=8, shuffle=False)

class VariationalAutoencoder(nn.Module):
    def __init__(self, input_channels, latent_dim=32):
        super(VariationalAutoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=2, padding=1),  # 1025x126 -> 513x63
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),  # 513x63 -> 257x32
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),  # 257x32 -> 129x16
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),  # 129x16 -> 65x8
            nn.ReLU(),
            nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1)  # 65x8 -> 33x4
        )

        # Encoder Output 크기 자동 계산
        sample_input = torch.zeros(1, input_channels, 1025, 126)
        encoded_sample = self.encoder(sample_input)
        self.flatten_dim = encoded_sample.numel()
        self.encoded_shape = encoded_sample.shape[1:]

        print(f"[DEBUG] Flattened dimension: {self.flatten_dim}, Shape: {self.encoded_shape}")

        # Latent Space
        self.mu_layer = nn.Linear(self.flatten_dim, latent_dim)
        self.logvar_layer = nn.Linear(self.flatten_dim, latent_dim)

        # Decoder Fully Connected Layer
        self.decoder_fc = nn.Linear(latent_dim, self.flatten_dim)

        # Decoder
        self.decoder_conv1 = nn.ConvTranspose2d(512, 256, kernel_size=3, stride=1, padding=1)
        self.decoder_conv2 = nn.ConvTranspose2d(256, 128, kernel_size=3, stride=1, padding=1)
        self.decoder_conv3 = nn.ConvTranspose2d(128, 64, kernel_size=3, stride=1, padding=1)
        self.decoder_conv4 = nn.ConvTranspose2d(64, 32, kernel_size=3, stride=1, padding=1)
        self.decoder_conv5 = nn.ConvTranspose2d(32, input_channels, kernel_size=3, stride=1, padding=1)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        print(f"[DEBUG] Input Shape: {x.shape}")

        # Encoder
        encoded = self.encoder(x)
        batch_size = encoded.shape[0]
        print(f"[DEBUG] Encoded Shape: {encoded.shape}")

        # Flatten 처리
        encoded_flat = encoded.view(batch_size, -1)
        print(f"[DEBUG] Flattened Shape: {encoded_flat.shape}")

        # Latent Representation
        mu = self.mu_layer(encoded_flat)
        logvar = self.logvar_layer(encoded_flat)
        print(f"[DEBUG] mu Shape: {mu.shape}, logvar Shape: {logvar.shape}")

        # Latent Space Sampling
        z = self.reparameterize(mu, logvar)
        print(f"[DEBUG] z Shape: {z.shape}")

        # Decoder 입력 크기 조정
        z_expanded = self.decoder_fc(z)
        z_expanded = z_expanded.view(batch_size, *self.encoded_shape)
        print(f"[DEBUG] Decoded FC Reshaped: {z_expanded.shape}")

        # Decoder 진행 (ConvTranspose2d -> interpolate 활용)
        decoded = self.decoder_conv1(z_expanded)
        print(f"[DEBUG] After decoder_conv1: {decoded.shape}")
        decoded = F.interpolate(decoded, scale_factor=2, mode="bilinear", align_corners=False)

        decoded = self.decoder_conv2(decoded)
        print(f"[DEBUG] After decoder_conv2: {decoded.shape}")
        decoded = F.interpolate(decoded, scale_factor=2, mode="bilinear", align_corners=False)

        decoded = self.decoder_conv3(decoded)
        print(f"[DEBUG] After decoder_conv3: {decoded.shape}")
        decoded = F.interpolate(decoded, scale_factor=2, mode="bilinear", align_corners=False)

        decoded = self.decoder_conv4(decoded)
        print(f"[DEBUG] After decoder_conv4: {decoded.shape}")
        decoded = F.interpolate(decoded, scale_factor=2, mode="bilinear", align_corners=False)

        decoded = self.decoder_conv5(decoded)
        print(f"[DEBUG] After decoder_conv5: {decoded.shape}")

        decoded = F.interpolate(decoded, size=(1025, 126), mode="bilinear", align_corners=False)
        print(f"[DEBUG] Final Output Shape: {decoded.shape}")

        return z, decoded, mu, logvar


# 모델 학습 준비
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = VariationalAutoencoder(input_channels=X_train.shape[1]).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# VAE 손실 함수 (Reconstruction Loss + KL Divergence 조정)
def vae_loss(decoded, x, mu, logvar):
    recon_loss = nn.MSELoss()(decoded, x)
    kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) / x.shape[0]
    return recon_loss + 0.01 * kl_div  # KL 가중치 줄여서 안정화

# 모델 학습 루프
for epoch in range(10):
    model.train()
    epoch_loss = 0

    for i, (inputs,) in enumerate(train_loader):
        inputs = inputs.to(device)

        # Forward Pass
        _, decoded, mu, logvar = model(inputs)
        loss = vae_loss(decoded, inputs, mu, logvar)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        # GPU 메모리 최적화
        if i % 50 == 0:
            torch.cuda.empty_cache()

    print(f"Epoch {epoch+1}, Loss: {epoch_loss / len(train_loader):.6f}")


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
[DEBUG] Decoded FC Reshaped: torch.Size([8, 512, 33, 4])
[DEBUG] After decoder_conv1: torch.Size([8, 256, 33, 4])
[DEBUG] After decoder_conv2: torch.Size([8, 128, 66, 8])
[DEBUG] After decoder_conv3: torch.Size([8, 64, 132, 16])
[DEBUG] After decoder_conv4: torch.Size([8, 32, 264, 32])
[DEBUG] After decoder_conv5: torch.Size([8, 1, 528, 64])
[DEBUG] Final Output Shape: torch.Size([8, 1, 1025, 126])
[DEBUG] Input Shape: torch.Size([8, 1, 1025, 126])
[DEBUG] Encoded Shape: torch.Size([8, 512, 33, 4])
[DEBUG] Flattened Shape: torch.Size([8, 67584])
[DEBUG] mu Shape: torch.Size([8, 32]), logvar Shape: torch.Size([8, 32])
[DEBUG] z Shape: torch.Size([8, 32])
[DEBUG] Decoded FC Reshaped: torch.Size([8, 512, 33, 4])
[DEBUG] After decoder_conv1: torch.Size([8, 256, 33, 4])
[DEBUG] After decoder_conv2: torch.Size([8, 128, 66, 8])
[DEBUG] After decoder_conv3: torch.Size([8, 64, 132, 16])
[DEBUG] After decoder_conv4: torch.Size([8, 32, 264, 32])
[

In [5]:
import torch
import numpy as np
from sklearn.metrics import roc_curve, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# ================================
# 1. 모델 평가 및 Reconstruction Error(Log-MSE) 계산
# ================================
model.eval()
reconstruction_errors = []

with torch.no_grad():
    for inputs, in eval_loader:  # 라벨 제거
        inputs = inputs.to(device)

        # AutoEncoder 복원 (Reconstruction)
        _, decoded, _, _ = model(inputs) # model 출력 변경

        # Log-MSE Reconstruction Error 계산
        log_mse = torch.mean((torch.log1p(inputs) - torch.log1p(decoded)) ** 2, dim=[1, 2, 3])

        reconstruction_errors.extend(log_mse.cpu().numpy())

# ================================
# 2. 최적 Threshold 찾기 (ROC Curve)
# ================================
fpr, tpr, thresholds = roc_curve(y_eval, reconstruction_errors)

# Youden's J Statistic (TPR - FPR) 활용
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]

print(f"Optimal Threshold: {optimal_threshold:.6f}")

# ================================
# 3. 이상 탐지 수행
# ================================
# 최적 Threshold를 기반으로 정상(0) vs 이상(1) 분류
y_pred = [1 if err > optimal_threshold else 0 for err in reconstruction_errors]

# ================================
# 4. 평가 및 시각화
# ================================
# Classification Report 출력
print("\nClassification Report:\n", classification_report(y_eval, y_pred))

# Confusion Matrix 시각화
cm = confusion_matrix(y_eval, y_pred)
plt.figure(figsize=(5,4))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Normal", "Anomaly"], yticklabels=["Normal", "Anomaly"])
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix with Log-MSE")
plt.show()

ValueError: not enough values to unpack (expected 2, got 1)