# 초기 설정

In [24]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Wed Nov 20 20:10:59 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   77C    P0              34W /  70W |    187MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [25]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

Your runtime has 54.8 gigabytes of available RAM

You are using a high-RAM runtime!


In [26]:
# Colab 셀에 입력하여 실행
!pip install librosa h5py optuna



# 라이브러리 임포트

In [27]:
# ----------------------------- 라이브러리 임포트 -----------------------------
import os
import json
import numpy as np
import librosa
from tqdm import tqdm
import h5py

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    classification_report,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.utils.class_weight import compute_class_weight

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.cuda.amp  # 혼합 정밀도 학습을 위한 torch.cuda.amp 추가

import random
import re
import gc
from typing import List, Tuple, Dict, Any

# ----------------------------- 환경 설정 -----------------------------
# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# 환경설정, 전처리 데이터 불러오기

In [28]:
# ----------------------------- 환경 설정 -----------------------------
# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

# 일반 설정
BATCH_SIZE = 512  # 배치 크기
EPOCHS = 200  # 에포크 수
LEARNING_RATE = 1e-3  # 학습률

# 데이터 관련 설정 (Google Drive 내 경로로 변경)
DATA_DIR = "/content/drive/MyDrive/Sonus/Feature-based"  # 데이터 디렉토리
METADATA_FILE = os.path.join(DATA_DIR, "metadata.json")  # 메타데이터 파일 경로
HDF5_FILE = os.path.join(DATA_DIR, "preprocessed_data.h5")  # 전처리된 데이터 저장 경로
N_MFCC = 40  # MFCC 계수의 수
MAX_LEN = 174  # MFCC 벡터의 최대 길이

# 모델 및 로그 디렉토리 설정 (Google Drive 내 경로로 변경)
MODELS_DIR = os.path.join(DATA_DIR, "models")  # 모델 저장 디렉토리
os.makedirs(MODELS_DIR, exist_ok=True)
LOGS_DIR = os.path.join(DATA_DIR, "logs")  # 로그 디렉토리
os.makedirs(LOGS_DIR, exist_ok=True)

# 하이퍼파라미터 튜닝 설정
APPLY_HYPERPARAMETER_TUNING = False  # 하이퍼파라미터 튜닝 적용 여부

# ----------------------------- GPU 및 CPU 설정 -----------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"사용 중인 디바이스: {device}\n")

# CPU 사용 제한 설정 (필요에 따라 조정 가능)
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["OPENBLAS_NUM_THREADS"] = "4"
os.environ["MKL_NUM_THREADS"] = "4"
os.environ["VECLIB_MAXIMUM_THREADS"] = "4"
os.environ["NUMEXPR_NUM_THREADS"] = "4"
torch.set_num_threads(4)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
사용 중인 디바이스: cuda



# 함수 모음

In [None]:
# ----------------------------- 전처리 함수 정의 -----------------------------
def sanitize_name(name: str) -> str:
    """
    문자열에서 알파벳, 숫자, 언더스코어만 남기고 나머지는 제거합니다.

    Args:
        name (str): 원본 문자열.

    Returns:
        str: 정제된 문자열.
    """
    return re.sub(r"[^a-zA-Z0-9_]", "", name.replace(" ", "_"))

def load_metadata(metadata_path: str) -> List[Dict[str, Any]]:
    """
    메타데이터를 로드합니다.

    Args:
        metadata_path (str): 메타데이터 파일의 경로.

    Returns:
        List[Dict[str, Any]]: 메타데이터 리스트.
    """
    metadata = []
    with open(metadata_path, "r") as f:
        for line in f:
            metadata.append(json.loads(line))
    return metadata

def extract_mfcc(file_path: str, n_mfcc: int = N_MFCC, max_len: int = MAX_LEN) -> np.ndarray:
    """
    오디오 파일에서 MFCC 특징을 추출합니다.

    Args:
        file_path (str): 오디오 파일의 경로.
        n_mfcc (int): 추출할 MFCC 계수의 수.
        max_len (int): MFCC 벡터의 최대 길이.

    Returns:
        np.ndarray: 고정된 크기의 MFCC 배열.
    """
    try:
        y, sr = librosa.load(file_path, sr=None, mono=True)
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
        if mfcc.shape[1] < max_len:
            pad_width = max_len - mfcc.shape[1]
            mfcc = np.pad(mfcc, pad_width=((0, 0), (0, pad_width)), mode="constant")
        else:
            mfcc = mfcc[:, :max_len]
        return mfcc
    except Exception as e:
        print(f"MFCC 추출 실패: {file_path}, 에러: {e}")
        return np.zeros((n_mfcc, max_len))

def preprocess_and_save_data(
    metadata: List[Dict[str, Any]], data_dir: str, hdf5_path: str
) -> None:
    """
    데이터 전처리 및 HDF5 파일로 저장합니다.

    Args:
        metadata (List[Dict[str, Any]]): 메타데이터 리스트.
        data_dir (str): 오디오 데이터가 저장된 디렉토리.
        hdf5_path (str): 저장할 HDF5 파일의 경로.
    """
    total_samples = len(metadata)
    with h5py.File(hdf5_path, "w") as h5f:
        X_ds = h5f.create_dataset(
            "X", shape=(total_samples, N_MFCC, MAX_LEN), dtype=np.float32
        )
        Y_list = []
        for idx, data in enumerate(tqdm(metadata, desc="전처리 중")):
            sample_path = os.path.join(
                data_dir, data["relative_path"], data["sample_name"]
            )
            mfcc = extract_mfcc(sample_path)
            X_ds[idx] = mfcc
            instruments = data["instruments"]
            Y_list.append(instruments)
        Y_encoded_strings = [json.dumps(instr_list) for instr_list in Y_list]
        dt = h5py.string_dtype(encoding="utf-8")
        h5f.create_dataset(
            "Y", data=np.array(Y_encoded_strings, dtype=object), dtype=dt
        )

def load_preprocessed_data(hdf5_path: str) -> Tuple[np.ndarray, np.ndarray]:
    """
    전처리된 데이터를 HDF5 파일에서 로드합니다.

    Args:
        hdf5_path (str): HDF5 파일의 경로.

    Returns:
        Tuple[np.ndarray, np.ndarray]: 특징 배열 X와 레이블 리스트 Y.
    """
    with h5py.File(hdf5_path, "r") as h5f:
        X = h5f["X"][:]
        Y = h5f["Y"][:]
    return X, Y

def encode_labels(Y: np.ndarray) -> Tuple[np.ndarray, MultiLabelBinarizer, List[str]]:
    """
    레이블을 이진 벡터로 인코딩합니다.

    Args:
        Y (np.ndarray): 레이블 리스트 (JSON-encoded strings).

    Returns:
        Tuple[np.ndarray, MultiLabelBinarizer, List[str]]: 인코딩된 레이블 배열, 레이블 변환기, 모든 악기 리스트.
    """
    all_instruments = set()
    parsed_Y = []
    for instruments in Y:
        if isinstance(instruments, bytes):
            instruments = instruments.decode("utf-8")
        if isinstance(instruments, str):
            try:
                instruments = json.loads(instruments)
            except json.JSONDecodeError:
                instruments = instruments.split(",")
        if not isinstance(instruments, list):
            instruments = []
        instruments = [instr.strip() for instr in instruments if instr.strip()]
        all_instruments.update(instruments)
        parsed_Y.append(instruments)
    all_instruments = sorted(list(all_instruments))

    if not all_instruments:
        print("경고: 모든 악기 리스트가 비어 있습니다.")

    mlb = MultiLabelBinarizer(classes=all_instruments)
    Y_encoded = mlb.fit_transform(parsed_Y)
    return Y_encoded, mlb, all_instruments

# ----------------------------- 데이터셋 및 데이터로더 클래스 정의 -----------------------------
class MFCCDataset(Dataset):
    """
    MFCC 데이터를 위한 PyTorch Dataset 클래스
    """

    def __init__(self, X: np.ndarray, Y: np.ndarray, augment: bool = False):
        """
        초기화 함수

        Args:
            X (np.ndarray): 특징 데이터.
            Y (np.ndarray): 레이블 데이터.
            augment (bool): 데이터 증강 여부.
        """
        self.X = X
        self.Y = Y
        self.augment = augment

    def __len__(self) -> int:
        """
        데이터셋의 길이를 반환합니다.

        Returns:
            int: 데이터셋의 길이.
        """
        return len(self.Y)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        x = self.X[idx]
        y = self.Y[idx]

        # 데이터 전처리: 차원 변경 및 채널 추가
        x = np.expand_dims(x, axis=0)  # (1, height, width)
        x = torch.from_numpy(x).float()

        if self.augment:
            x = augment_batch(x)

        mean = x.mean()
        std = x.std()
        x = (x - mean) / (std + 1e-6)

        y = torch.tensor(y, dtype=torch.float32)

        return x, y


def augment_batch(batch_X: torch.Tensor) -> torch.Tensor:
    """
    데이터 증강을 수행합니다.

    Args:
        batch_X (torch.Tensor): 배치의 MFCC 데이터. (batch_size, height, width)

    Returns:
        torch.Tensor: 증강된 배치의 MFCC 데이터.
    """
    # 랜덤 스케일링
    scale = random.uniform(0.8, 1.2)
    batch_X = batch_X * scale
        
    # 잡음 추가
    noise = torch.randn_like(batch_X) * 0.05
    batch_X = batch_X + noise

    # 주파수 마스킹
    freq_masking = random.randint(0, batch_X.shape[2] // 10)  # width
    if freq_masking > 0:
        f0 = random.randint(0, batch_X.shape[2] - freq_masking - 1)
        batch_X[:, :, f0 : f0 + freq_masking] = 0

    # 시간 마스킹
    time_masking = random.randint(0, batch_X.shape[1] // 10)  # height
    if time_masking > 0:
        t0 = random.randint(0, batch_X.shape[1] - time_masking - 1)
        batch_X[:, t0 : t0 + time_masking, :] = 0

    return batch_X


def create_data_loaders(
    X_train, Y_train, X_val, Y_val, X_test, Y_test
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """
    데이터로더를 생성합니다.

    Args:
        X_train, Y_train, X_val, Y_val, X_test, Y_test: 학습, 검증, 테스트 데이터와 레이블

    Returns:
        Tuple[DataLoader, DataLoader, DataLoader]: 학습, 검증, 테스트 데이터로더
    """
    train_dataset = MFCCDataset(X_train, Y_train, augment=True)
    val_dataset = MFCCDataset(X_val, Y_val, augment=False)
    test_dataset = MFCCDataset(X_test, Y_test, augment=False)

    train_loader = DataLoader(
        train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0
    )
    val_loader = DataLoader(
        val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )
    test_loader = DataLoader(
        test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )

    return train_loader, val_loader, test_loader

# ----------------------------- 모델 정의 -----------------------------
def calculate_padding(kernel_size: int, stride: int, input_size: int) -> int:
    """
    패딩 크기를 계산합니다.

    Args:
        kernel_size (int): 커널 크기.
        stride (int): 스트라이드 크기.
        input_size (int): 입력 크기.

    Returns:
        int: 패딩 크기.
    """
    return (kernel_size - stride) // 2

class CNNModel(nn.Module):
    """
    CNN 모델 정의 클래스
    """

    def __init__(
        self,
        input_shape: Tuple[int, int, int],
        num_classes: int,
        dropout_rate: float = 0.5,
        l2_reg: float = 0.001,
    ):
        """
        모델을 초기화합니다.

        Args:
            input_shape (Tuple[int, int, int]): 입력 데이터의 형태.
            num_classes (int): 출력 클래스의 수.
            dropout_rate (float): 드롭아웃 비율.
            l2_reg (float): L2 정규화 계수.
        """
        super(CNNModel, self).__init__()
        _, input_height, input_width = input_shape

        # 패딩 계산
        self.padding1 = calculate_padding(3, 1, input_height)
        self.padding2 = calculate_padding(3, 1, input_width)

        self.quant = torch.quantization.QuantStub()  # QuantStub 추가
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), padding=(self.padding1, self.padding2))
        self.pool1 = nn.MaxPool2d((2, 2))
        self.bn1 = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3), padding=(self.padding1, self.padding2))
        self.pool2 = nn.MaxPool2d((2, 2))
        self.bn2 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(self.padding1, self.padding2))
        self.pool3 = nn.MaxPool2d((2, 2))
        self.bn3 = nn.BatchNorm2d(128)

        conv_output_size = self._get_conv_output((1, input_height, input_width))

        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(conv_output_size, 128)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(128, 64)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.output_layer = nn.Linear(64, num_classes)

        self.dequant = torch.quantization.DeQuantStub()  # DeQuantStub 추가
        self.l2_reg = l2_reg

    def _get_conv_output(self, shape: Tuple[int, int, int]) -> int:
        """
        합성곱 계층의 출력을 계산합니다.

        Args:
            shape (Tuple[int, int, int]): 입력 데이터의 형태.

        Returns:
            int: 합성곱 계층 출력의 크기.
        """
        bs = 1
        input = torch.zeros(bs, *shape)
        output_feat = self._forward_features(input)
        n_size = output_feat.data.view(bs, -1).size(1)
        return n_size

    def _forward_features(self, x: torch.Tensor) -> torch.Tensor:
        """
        합성곱 계층을 통과하는 부분입니다.

        Args:
            x (torch.Tensor): 입력 텐서.

        Returns:
            torch.Tensor: 합성곱 계층의 출력.
        """
        x = self.conv1(x)
        x = torch.relu(x)
        x = self.pool1(x)
        x = self.bn1(x)

        x = self.conv2(x)
        x = torch.relu(x)
        x = self.pool2(x)
        x = self.bn2(x)

        x = self.conv3(x)
        x = torch.relu(x)
        x = self.pool3(x)
        x = self.bn3(x)
        return x

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        모델의 순전파를 정의합니다.

        Args:
            x (torch.Tensor): 입력 텐서.

        Returns:
            torch.Tensor: 모델의 출력.
        """
        x = self.quant(x)  # QuantStub로 양자화 적용
        x = self._forward_features(x)
        x = x.reshape(x.size(0), -1)  # Flatten
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = torch.relu(x)
        x = self.dropout2(x)
        x = self.output_layer(x)  # Sigmoid 제거 (BCEWithLogitsLoss 사용)
        x = self.dequant(x)  # DeQuantStub로 역양자화 적용
        return x 

# ----------------------------- 모델 학습 및 평가 함수 정의 -----------------------------
def train_model(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    criterion,
    optimizer,
    scheduler,
    num_epochs: int,
    device: torch.device,
):
    """
    모델을 훈련합니다.

    Args:
        model (nn.Module): 학습할 모델.
        train_loader (DataLoader): 훈련 데이터 로더.
        val_loader (DataLoader): 검증 데이터 로더.
        criterion: 손실 함수.
        optimizer: 옵티마이저.
        scheduler: 학습률 스케줄러.
        num_epochs (int): 에포크 수.
        device (torch.device): 장치 (CPU 또는 GPU).
    """
    best_val_loss = float("inf")
    patience = 10
    trigger_times = 0

    # 혼합 정밀도 스케일러 초기화
    scaler = torch.amp.GradScaler()

    print("모델 학습 시작...\n")
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        total_samples = 0
        total_correct = 0

        # 훈련 단계
        train_bar = tqdm(
            train_loader, desc=f"Epoch [{epoch+1}/{num_epochs}] 훈련 중", leave=False
        )
        for batch_X, batch_Y in train_bar:
            batch_X = batch_X.to(device)
            batch_Y = batch_Y.to(device)

            optimizer.zero_grad()

            with torch.amp.autocast("cuda"):  # 혼합 정밀도 적용
                outputs = model(batch_X)
                loss = criterion(outputs, batch_Y)
                # L2 정규화는 옵티마이저의 weight_decay로 처리하므로 추가하지 않음

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item() * batch_X.size(0)
            total_samples += batch_X.size(0)

            preds = (torch.sigmoid(outputs) > 0.5).float()
            total_correct += (preds == batch_Y).sum().item()

            # 배치별 손실 업데이트
            train_bar.set_postfix({"Loss": f"{loss.item():.4f}"})

        epoch_loss = running_loss / total_samples
        epoch_acc = total_correct / (total_samples * batch_Y.size(1))

        # 검증 단계
        model.eval()
        val_loss = 0.0
        val_samples = 0
        val_correct = 0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            val_bar = tqdm(
                val_loader, desc=f"Epoch [{epoch+1}/{num_epochs}] 검증 중", leave=False
            )
            for batch_X, batch_Y in val_bar:
                batch_X = batch_X.to(device)
                batch_Y = batch_Y.to(device)

                with torch.amp.autocast("cuda"):  # 혼합 정밀도 적용
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_Y)
                    # L2 정규화는 옵티마이저의 weight_decay로 처리하므로 추가하지 않음

                val_loss += loss.item() * batch_X.size(0)
                val_samples += batch_X.size(0)
                preds = (torch.sigmoid(outputs) > 0.5).float()
                val_correct += (preds == batch_Y).sum().item()
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch_Y.cpu().numpy())

        avg_val_loss = val_loss / val_samples
        val_acc = val_correct / (val_samples * batch_Y.size(1))

        # F1-score, Precision, Recall 계산
        if len(np.unique(all_labels)) > 1:
            epoch_f1 = f1_score(all_labels, all_preds, average="samples", zero_division=0)
            epoch_precision = precision_score(
                all_labels, all_preds, average="samples", zero_division=0
            )
            epoch_recall = recall_score(
                all_labels, all_preds, average="samples", zero_division=0
            )
        else:
            epoch_f1 = 0.0
            epoch_precision = 0.0
            epoch_recall = 0.0

        # 에포크별 성능 지표 출력
        tqdm.write(
            f"Epoch [{epoch+1}/{num_epochs}] 완료 - "
            f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, "
            f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}, "
            f"Val Precision: {epoch_precision:.4f}, Val Recall: {epoch_recall:.4f}, Val F1-Score: {epoch_f1:.4f}"
        )

        # 학습률 스케줄러 단계
        scheduler.step(avg_val_loss)

        # Early Stopping 체크
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            trigger_times = 0
            save_model(
                model,
                os.path.join(MODELS_DIR, "best_model.pt"),
            )
            tqdm.write(f"--> 최고 검증 손실 기록: {best_val_loss:.4f} 저장됨.\n")
        else:
            trigger_times += 1
            tqdm.write(f"--> EarlyStopping 트리거 횟수: {trigger_times}/{patience}\n")
            if trigger_times >= patience:
                tqdm.write("Early stopping 발생. 학습 중단.\n")
                break

    tqdm.write(f"모델 학습 완료. 최고 검증 손실: {best_val_loss:.4f}\n")

def evaluate_model(
    model: nn.Module, test_loader: DataLoader, criterion, device: torch.device
) -> Tuple[float, float, List[float], List[float]]:
    """
    모델을 평가합니다.

    Args:
        model (nn.Module): 평가할 모델.
        test_loader (DataLoader): 테스트 데이터 로더.
        criterion: 손실 함수.
        device (torch.device): 장치 (CPU 또는 GPU).

    Returns:
        Tuple[float, float, List[float], List[float]]: 테스트 손실, 정확도, 예측 값 리스트, 실제 값 리스트.
    """
    # 모델과 데이터를 CPU로 이동
    model.to(device)  # 모델을 CPU로 이동
    model.eval()
    test_loss = 0.0
    total_samples = 0
    all_preds = []
    all_labels = []

    # 가중치도 cpu로 이동
    criterion = nn.BCEWithLogitsLoss(pos_weight=class_weight_tensor.to(device))

    with torch.no_grad():
        for batch_X, batch_Y in test_loader:
            # 배치를 명시적으로 CPU로 이동
            batch_X = batch_X.to('cpu').float()  # batch_X를 CPU로
            batch_Y = batch_Y.to('cpu').float()  # batch_Y를 CPU로

            # 모델 예측 수행
            outputs = model(batch_X)
            loss = criterion(outputs, batch_Y)  # 손실 계산

            test_loss += loss.item() * batch_X.size(0)
            preds = (torch.sigmoid(outputs) > 0.5).float()

            # 결과를 numpy 배열로 변환 (CPU 상에서)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch_Y.cpu().numpy())

    avg_test_loss = test_loss / len(test_loader.dataset)
    avg_test_acc = (
        (np.array(all_preds) == np.array(all_labels)).mean()
    )  # 정확도 계산

    return avg_test_loss, avg_test_acc, all_preds, all_labels


def save_model(model: nn.Module, path: str) -> None:
    """
    모델을 지정된 경로에 저장합니다.

    Args:
        model (nn.Module): 저장할 모델.
        path (str): 저장할 파일 경로.
    """
    torch.save(model.state_dict(), path)
    print(f"모델 저장 완료: {path}")

def load_model(model: nn.Module, path: str, device: torch.device) -> nn.Module:
    """
    모델을 지정된 경로에서 로드합니다.

    Args:
        model (nn.Module): 로드할 모델 구조.
        path (str): 모델 파일 경로.
        device (torch.device): 장치 (CPU 또는 GPU).

    Returns:
        nn.Module: 로드된 모델.
    """
    model.load_state_dict(torch.load(path, map_location=device))
    model.to(device)
    model.eval()
    return model

def load_quantized_model(model_path: str, input_shape: Tuple[int, int, int], num_classes: int, device: torch.device) -> nn.Module:
    """
    양자화된 모델을 불러옵니다.

    Args:
        model_path (str): 저장된 양자화된 모델 파일 경로.
        input_shape (Tuple[int, int, int]): 모델의 입력 데이터 형태.
        num_classes (int): 클래스 수.
        device (torch.device): 사용할 디바이스 (CPU 또는 GPU).

    Returns:
        nn.Module: 불러온 양자화된 모델.
    """
    # x86 엔진 설정
    torch.backends.quantized.engine = 'x86'

    # 양자화된 모델 구조 정의
    model = CNNModel(input_shape=input_shape, num_classes=num_classes)
    model.qconfig = torch.quantization.get_default_qconfig('x86')

    # 양자화 준비 및 변환
    torch.quantization.prepare(model, inplace=True)
    torch.quantization.convert(model, inplace=True)

    # 저장된 양자화된 모델 가중치 로드
    model.load_state_dict(torch.load(model_path, map_location=device))

    # 모델을 지정된 디바이스로 이동
    model.to(device)
    model.eval()

    return model

# ----------------------------- 개별 악기 예측 함수 정의 -----------------------------
def predict_instruments(audio_file_path: str, model_path: str, mlb: MultiLabelBinarizer) -> List[str]:
    # MFCC 추출
    mfcc = extract_mfcc(audio_file_path)
    if mfcc.shape != (N_MFCC, MAX_LEN):
        raise ValueError(f"MFCC 크기가 예상 범위를 벗어남: {mfcc.shape}")

    # 배치 및 채널 차원 추가
    mfcc = np.expand_dims(mfcc, axis=0)  # (1, N_MFCC, MAX_LEN)
    mfcc = np.expand_dims(mfcc, axis=0)  # (1, 1, N_MFCC, MAX_LEN)

    # 텐서 변환
    mfcc_tensor = torch.tensor(mfcc, dtype=torch.float32)

    # 정규화
    mean = mfcc_tensor.mean()
    std = mfcc_tensor.std()
    mfcc_tensor = (mfcc_tensor - mean) / (std + 1e-6)

    # 모델 로드
    device_cpu = torch.device('cpu')
    input_shape = (1, N_MFCC, MAX_LEN)
    num_classes = len(mlb.classes_)

    # 양자화된 모델 정의
    model = CNNModel(input_shape, num_classes)
    model.qconfig = torch.quantization.get_default_qconfig('x86')  # 양자화 구성 동일하게 설정
    torch.backends.quantized.engine = 'x86'
    torch.quantization.prepare(model, inplace=True)
    torch.quantization.convert(model, inplace=True)

    # 양자화된 모델 가중치 로드
    model.load_state_dict(torch.load(model_path, map_location=device_cpu))
    model.to(device_cpu)
    model.eval()

    # 예측
    with torch.no_grad():
        output = model(mfcc_tensor)
        sigmoid_output = torch.sigmoid(output).cpu().numpy()[0]
        print("Sigmoid Outputs:", sigmoid_output)

        # 클래스별 확률 값 리스트 생성
        class_probabilities = [f"{mlb.classes_[i]}: {round(prob, 4)}" for i, prob in enumerate(sigmoid_output)]

        # 한 줄 리스트 형태로 출력
        print("0 ~ 1 Outputs:", ", ".join(class_probabilities))

        predicted = sigmoid_output > 0.5
        predicted_instruments = [mlb.classes_[i] for i, val in enumerate(predicted) if val]

    return predicted_instruments


# 전처리 데이터 로드

In [30]:
# 메타데이터 로드
print("메타데이터 로드 중...")
metadata = load_metadata(METADATA_FILE)
print(f"메타데이터 로드 완료: {len(metadata)} 샘플")

# 데이터 전처리 및 저장
if not os.path.exists(HDF5_FILE):
    print("데이터 전처리 시작...")
    preprocess_and_save_data(metadata, DATA_DIR, HDF5_FILE)
    print("데이터 전처리 및 저장 완료.")
else:
    print("전처리된 데이터 파일이 존재합니다. 로드합니다.")

# 전처리된 데이터 로드
print("전처리된 데이터 로드 중...")
X, Y = load_preprocessed_data(HDF5_FILE)
print(f"전처리된 데이터 로드 완료: X shape={X.shape}, Y shape={Y.shape}")

# 레이블 인코딩
print("레이블 인코딩 중...")
Y_encoded, mlb, all_instruments = encode_labels(Y)
print(f"레이블 인코딩 완료: {len(all_instruments)} 종류의 악기")

if len(all_instruments) == 0:
    raise ValueError("레이블 인코딩 실패: 악기 리스트가 비어 있습니다.")

# 데이터셋 분할
print("데이터셋 분할 중...")
X_train, X_temp, Y_train, Y_temp = train_test_split(
    X, Y_encoded, test_size=0.2, random_state=42
)
X_val, X_test, Y_val, Y_test = train_test_split(
    X_temp, Y_temp, test_size=0.5, random_state=42
)
print(f"데이터셋 분할 완료:")
print(f" - 훈련 세트: {X_train.shape[0]} 샘플")
print(f" - 검증 세트: {X_val.shape[0]} 샘플")
print(f" - 테스트 세트: {X_test.shape[0]} 샘플")

메타데이터 로드 중...
메타데이터 로드 완료: 231980 샘플
전처리된 데이터 파일이 존재합니다. 로드합니다.
전처리된 데이터 로드 중...
전처리된 데이터 로드 완료: X shape=(231980, 40, 174), Y shape=(231980,)
레이블 인코딩 중...
레이블 인코딩 완료: 16 종류의 악기
데이터셋 분할 중...
데이터셋 분할 완료:
 - 훈련 세트: 185584 샘플
 - 검증 세트: 23198 샘플
 - 테스트 세트: 23198 샘플


# 모델 학습 / 평가

In [None]:
# 클래스 가중치 계산
print("클래스 가중치 계산 중...")
# 멀티레이블 분류이므로 각 클래스에 대해 가중치를 계산
class_weights = []
for i in range(Y_encoded.shape[1]):
    classes = np.unique(Y_train[:, i])
    if len(classes) > 1:
        cw = compute_class_weight(
            class_weight="balanced", classes=classes, y=Y_train[:, i]
        )
        class_weights.append(cw[1])  # Positive class weight
    else:
        class_weights.append(1.0)  # Default weight

class_weight_tensor = torch.tensor(class_weights, dtype=torch.float32).to(device)
print(f"클래스 가중치: {class_weight_tensor}")

# 하이퍼파라미터 설정
if APPLY_HYPERPARAMETER_TUNING:
    print("하이퍼파라미터 튜닝을 수행합니다...")
    # 일단 생략.............
else:
    learning_rate = LEARNING_RATE
    dropout_rate = 0.5
    num_epochs = EPOCHS

# 모델 구축
input_shape = (1, MAX_LEN, N_MFCC)  # 입력 형태 수정
num_classes = Y_encoded.shape[1]  # 멀티레이블 분류를 위한 클래스 수
model = CNNModel(input_shape, num_classes, dropout_rate=dropout_rate).to(device)

# 옵티마이저와 손실 함수 설정
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)  # L2 정규화 포함
criterion = nn.BCEWithLogitsLoss(pos_weight=class_weight_tensor)

# 학습률 스케줄러 설정
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

# 데이터로더 생성
train_loader, val_loader, test_loader = create_data_loaders(
    X_train, Y_train, X_val, Y_val, X_test, Y_test
)

# # 모델 훈련
# print("모델 훈련 시작...")
# train_model(
#     model=model,
#     train_loader=train_loader,
#     val_loader=val_loader,
#     criterion=criterion,
#     optimizer=optimizer,
#     scheduler=scheduler,
#     num_epochs=num_epochs,
#     device=device,
# )
# print("모델 훈련 완료.")

# model = load_model(model,os.path.join(MODELS_DIR, f"optimized_model.pt"),device)

# # 모델 양자화 적용 (Quantization)
# print("모델 양자화 시작...")
# model.eval()
# model.cpu()
# model.qconfig = torch.quantization.get_default_qconfig('x86')
# torch.backends.quantized.engine = 'x86'
# torch.quantization.prepare(model, inplace=True)
# # 캘리브레이션 (간단히 검증 데이터로 수행)
# with torch.no_grad():
#     for batch_X, _ in tqdm(val_loader, desc="양자화 캘리브레이션 중", leave=False):
#         batch_X = batch_X.to('cpu')
#         outputs = model(batch_X)
# torch.quantization.convert(model, inplace=True)
# print("모델 양자화 완료.")

# 모델 저장
# save_model(model, os.path.join(MODELS_DIR, "optimized_model.pt"))


# CPU에서 평가
device_cpu = torch.device("cpu")  # CPU 장치 설정
model.to(device_cpu)  # 모델을 CPU로 이동

# 이미 학습된 모델 로드하기 위해..
model = load_quantized_model(
    model_path=os.path.join(MODELS_DIR, "optimized_model.pt"),
    input_shape=input_shape,
    num_classes=num_classes,
    device=device_cpu
)


# 테스트 데이터 평가
print("테스트 데이터 평가 중...")
avg_test_loss, avg_test_acc, all_preds, all_labels = evaluate_model(
    model=model, test_loader=test_loader, criterion=criterion, device=device_cpu
)
print(f"테스트 손실: {avg_test_loss}")
print(f"테스트 정확도: {avg_test_acc}")

# F1-score 계산
if len(np.unique(all_labels)) > 1:
    f1 = f1_score(all_labels, all_preds, average="samples", zero_division=0)
    print(f"F1-score: {f1}")
else:
    print("F1-score 계산 불가: 테스트 세트에 하나의 클래스만 존재합니다.")

# 분류 리포트 출력
if len(np.unique(all_labels)) > 1:
    report = classification_report(
        all_labels,
        all_preds,
        target_names=mlb.classes_,
        zero_division=0
    )
    print(report)
else:
    print("분류 리포트 출력 불가: 테스트 세트에 하나의 클래스만 존재합니다.")

# 메모리 정리
del model
del train_loader
del val_loader
del test_loader
gc.collect()
if device.type == "cuda":
    torch.cuda.empty_cache()

print("\n모든 작업 완료.")

# 예측할 오디오 파일 경로
audio_file_path = "/content/drive/MyDrive/Sonus/Feature-based/concerto_sample_1.mp3" 

# 예측 수행
print("개별 오디오 파일에 대한 악기 예측 중...")
predicted_instruments = predict_instruments(
    audio_file_path,
    os.path.join(MODELS_DIR, "optimized_model.pt"),
    mlb
)
print(f"예측된 악기: {predicted_instruments}")

클래스 가중치 계산 중...
클래스 가중치: tensor([0.9408, 0.9405, 0.9382, 0.9408, 0.9441, 0.9405, 0.9424, 0.9412, 0.9411,
        0.9395, 0.9421, 0.9443, 0.9401, 0.9431, 0.9435, 0.9429],
       device='cuda:0')
테스트 데이터 평가 중...


  model.load_state_dict(torch.load(model_path, map_location=device))


테스트 손실: 0.4078852299888932
테스트 정확도: 0.8158596646262609
F1-score: 0.7290878684355311
               precision    recall  f1-score   support

bass clarinet       0.87      0.79      0.83     12396
      bassoon       0.90      0.84      0.87     12471
        cello       0.77      0.71      0.74     12391
     clarinet       0.82      0.85      0.83     12483
clash cymbals       1.00      1.00      1.00     12448
  double bass       0.72      0.96      0.82     12534
        flute       0.79      0.67      0.72     12412
  french horn       0.79      0.71      0.75     12567
         oboe       0.87      0.72      0.79     12488
    saxophone       0.85      0.81      0.83     12395
   tambourine       0.97      0.99      0.98     12391
     trombone       0.82      0.81      0.81     12381
      trumpet       0.83      0.80      0.81     12460
         tuba       0.88      0.93      0.91     12383
        viola       0.77      0.72      0.75     12371
       violin       0.77      0.71 

  model.load_state_dict(torch.load(model_path, map_location=device_cpu))
