# 1. config/config.yaml

In [None]:
# Dataset locations and audio / feature settings.
data:

  # Root directory walked recursively for training audio (.flac files).
  train_path: /Users/taehayeong/Desktop/data/deepvoice_data/train_data
  # Root directory walked recursively for test audio
  # (.flac files are labelled 0 = genuine, .wav files are labelled 1 = fake).
  test_path: /Users/taehayeong/Desktop/data/deepvoice_data/test_data

  # Sampling rate (Hz) passed to librosa.load when reading audio.
  sample_rate: 16000
  # Number of feature channels per frame; the dataset stacks four
  # (f0, f0 delta, RMS, RMS delta).
  n_feature: 4
  # Length of one segment, counted in frames of 512 samples (the dataset's hop).
  segment_frames: 300 
  # NOTE(review): no reader of this key is visible next to this config --
  # confirm whether it is still needed.
  max_samples: 500

# Sizes of the LSTM auto-encoder.
model:
  # Hidden size of the LSTM layers.
  hidden_dim: 128
  # Size of the bottleneck vector between encoder and decoder.
  latent_dim: 64
  # Number of stacked LSTM layers.
  num_layers: 2

# Optimisation settings.
training:
  # Mini-batch size for training.
  batch_size: 8
  # Learning rate handed to the Adam optimizer.
  lr: 0.001
  # Number of passes over the training set.
  epochs: 20
  # File the trained state_dict is written to (and loaded from at test time).
  save_path: checkpoints/lstm_ae.pth

# 2. src/dataloader.py

In [None]:
from torch.utils.data import DataLoader
# The dataset class defined in src/dataset.py is SegmentVoiceDataset;
# importing the non-existent name VoiceDataset raised ImportError.
from src.dataset import SegmentVoiceDataset
from src.utils import load_config

cfg = load_config("config/config.yaml")

# Training split of the segment dataset.
train_dataset = SegmentVoiceDataset(cfg, mode="train")

# Shuffled loader; incomplete trailing batches are dropped.
train_loader = DataLoader(
    train_dataset,
    # Read the batch size from the config instead of repeating the literal.
    batch_size=cfg["training"]["batch_size"],
    shuffle=True,
    drop_last=True,
)


# 3. src/dataset.py

In [None]:
# src/dataset.py
import os
import librosa
import torch
import numpy as np
from torch.utils.data import Dataset


class SegmentVoiceDataset(Dataset):
    """Fixed-length audio segments turned into prosody features.

    Every audio file is cut into non-overlapping segments of
    ``segment_frames`` frames (one frame = ``hop`` samples); a trailing
    remainder shorter than one segment is not indexed.  Each indexed segment
    is described by a dict with the keys ``path``, ``label`` (0 = genuine,
    1 = fake), ``start`` (first frame of the segment) and ``wav_id`` (index
    of the source file within this dataset).

    Args:
        cfg: Parsed configuration; reads ``cfg["data"]["segment_frames"]``,
            ``cfg["data"]["sample_rate"]`` and, depending on ``mode``,
            ``cfg["data"]["train_path"]`` or ``cfg["data"]["test_path"]``.
        mode: ``"train"`` or ``"test"``.

    Raises:
        ValueError: If ``mode`` is neither ``"train"`` nor ``"test"``.
    """

    # Only a handful of files are indexed so the pipeline can be checked
    # end to end quickly.
    MAX_TRAIN_FILES = 5
    MAX_TEST_FILES_PER_CLASS = 3

    def __init__(self, cfg, mode="train"):
        self.cfg = cfg
        self.mode = mode
        self.segment_frames = cfg["data"]["segment_frames"]
        self.sr = cfg["data"]["sample_rate"]
        # Samples per frame.  The same value is passed explicitly to the
        # feature extractors so segment arithmetic and features stay aligned.
        self.hop = 512

        # One dict per segment, see the class docstring for the keys.
        self.items = []

        if mode == "train":
            self._prepare_train()
        elif mode == "test":
            self._prepare_test()
        else:
            raise ValueError("mode must be 'train' or 'test'")

        print(f"[Dataset] {mode} segments: {len(self.items)}")

    # --------------------------------------------------
    # File discovery / segment indexing helpers
    # --------------------------------------------------
    @staticmethod
    def _list_files(root, suffix):
        """Return all paths below ``root`` ending in ``suffix``, sorted.

        Sorting makes the "first N files" selection reproducible; the order
        in which ``os.walk`` yields entries is not guaranteed.
        """
        return sorted(
            os.path.join(folder, name)
            for folder, _, names in os.walk(root)
            for name in names
            if name.endswith(suffix)
        )

    def _add_segments(self, path, label, wav_id):
        """Index every complete segment of the file at ``path``."""
        wav, _ = librosa.load(path, sr=self.sr)

        n_frames = len(wav) // self.hop
        n_segments = n_frames // self.segment_frames

        self.items.extend(
            {
                "path": path,
                "label": label,
                "start": i * self.segment_frames,
                "wav_id": wav_id,
            }
            for i in range(n_segments)
        )

    # --------------------------------------------------
    # Train: genuine .flac files only (file count capped)
    # --------------------------------------------------
    def _prepare_train(self):
        """Index segments of the first ``MAX_TRAIN_FILES`` .flac files."""
        root = self.cfg["data"]["train_path"]
        paths = self._list_files(root, ".flac")[:self.MAX_TRAIN_FILES]

        for wav_id, path in enumerate(paths):
            self._add_segments(path, 0, wav_id)

    # --------------------------------------------------
    # Test: .flac = genuine (0), .wav = deep-voice (1), a few of each
    # --------------------------------------------------
    def _prepare_test(self):
        """Index segments of a few genuine and a few fake test files."""
        root = self.cfg["data"]["test_path"]
        limit = self.MAX_TEST_FILES_PER_CLASS

        normal = [(p, 0) for p in self._list_files(root, ".flac")[:limit]]
        fake = [(p, 1) for p in self._list_files(root, ".wav")[:limit]]

        for wav_id, (path, label) in enumerate(normal + fake):
            self._add_segments(path, label, wav_id)

    # --------------------------------------------------
    # Prosody features (kept lightweight)
    # --------------------------------------------------
    def extract_prosody(self, wav):
        """Compute f0, RMS energy and their frame-to-frame deltas.

        Args:
            wav: 1-D array of audio samples.

        Returns:
            Array of shape ``(4, T)`` with rows f0, f0 delta, RMS, RMS delta.
        """
        f0, _, _ = librosa.pyin(
            wav,
            fmin=librosa.note_to_hz("C2"),
            fmax=librosa.note_to_hz("C7"),
            sr=self.sr,
            hop_length=self.hop,
        )
        # pyin reports NaN for unvoiced frames; represent them as 0.
        f0 = np.nan_to_num(f0)

        rms = librosa.feature.rms(y=wav, hop_length=self.hop)[0]

        # prepend the first value so the deltas keep the same length
        # (the first delta is therefore 0).
        f0_delta = np.diff(f0, prepend=f0[0])
        rms_delta = np.diff(rms, prepend=rms[0])

        feat = np.stack([f0, f0_delta, rms, rms_delta], axis=0)
        return feat  # (4, T)

    # --------------------------------------------------
    def __len__(self):
        """Number of indexed segments."""
        return len(self.items)

    def __getitem__(self, idx):
        """Load one segment and return its features.

        Returns:
            In ``"train"`` mode the float32 feature tensor; otherwise the
            tuple ``(features, label, wav_id)``.
        """
        item = self.items[idx]

        wav, _ = librosa.load(item["path"], sr=self.sr)

        start_sample = item["start"] * self.hop
        expected_len = self.segment_frames * self.hop
        segment = wav[start_sample:start_sample + expected_len]

        # Length correction: zero-pad a segment that came out short.
        if len(segment) < expected_len:
            segment = np.pad(segment, (0, expected_len - len(segment)))

        feat = self.extract_prosody(segment)
        feat = torch.tensor(feat, dtype=torch.float32)

        if self.mode == "train":
            return feat
        return feat, item["label"], item["wav_id"]

# 4. src/models.py

In [None]:
# src/models.py
import torch
import torch.nn as nn

class LSTMAutoEncoder(nn.Module):
    """Sequence auto-encoder built from two LSTMs and a vector bottleneck.

    The encoder LSTM summarises the input sequence in the final hidden state
    of its top layer, which is projected to ``latent_dim``.  The decoder
    receives that vector (projected back to ``hidden_dim``) at every time
    step and a linear head maps its outputs to the feature dimension.

    The linear head is required because an LSTM output is ``o * tanh(c)`` and
    therefore confined to (-1, 1); a decoder LSTM that emits the
    reconstruction directly cannot reproduce features outside that range.
    NOTE: checkpoints saved from a variant without the ``to_output`` layer
    are not loadable into this module.

    Args:
        n_feature: Number of feature channels per time step.
        hidden_dim: Hidden size of both LSTMs.
        latent_dim: Size of the bottleneck vector.
        num_layers: Number of stacked layers in each LSTM.
    """

    def __init__(
        self,
        n_feature=4,
        hidden_dim=128,
        latent_dim=64,
        num_layers=2
    ):
        super().__init__()

        # -------- Encoder --------
        self.encoder = nn.LSTM(
            input_size=n_feature,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True
        )

        self.to_latent = nn.Linear(hidden_dim, latent_dim)

        # -------- Decoder --------
        self.from_latent = nn.Linear(latent_dim, hidden_dim)

        self.decoder = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True
        )

        # Unbounded projection from decoder states to feature values.
        self.to_output = nn.Linear(hidden_dim, n_feature)

    def forward(self, x):
        """Reconstruct ``x``.

        Args:
            x: Tensor of shape ``(B, n_feature, T)``.

        Returns:
            Reconstruction with the same shape as ``x``.
        """
        # (B, n_feature, T) -> (B, T, n_feature)
        x = x.permute(0, 2, 1)

        # -------- Encoder --------
        # h_n: (num_layers, B, hidden_dim); keep the top layer only.
        _, (h_n, _) = self.encoder(x)
        z = self.to_latent(h_n[-1])      # (B, latent_dim)

        # -------- Decoder --------
        h_dec = self.from_latent(z)      # (B, hidden_dim)

        # Feed the same vector at every time step.
        T = x.size(1)
        dec_in = h_dec.unsqueeze(1).repeat(1, T, 1)   # (B, T, hidden_dim)

        dec_out, _ = self.decoder(dec_in)             # (B, T, hidden_dim)
        recon = self.to_output(dec_out)               # (B, T, n_feature)

        # (B, T, n_feature) -> (B, n_feature, T)
        return recon.permute(0, 2, 1)


# 5. src/utils.py

In [None]:
import yaml

def load_config(path):
    """Read a YAML file and return its parsed content.

    Args:
        path: Location of the YAML file.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file (a dict for a
        mapping document).
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default encoding (matters for non-ASCII comments/values).
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


# 6. eval.py

In [None]:
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

def plot_roc(scores, labels):
    """Draw the ROC curve for anomaly scores and return the area under it.

    Args:
        scores: Anomaly scores; a higher value means more anomalous.
        labels: Ground truth, 0 for normal and 1 for fake.

    Returns:
        The AUC computed from the ROC curve.
    """
    false_pos, true_pos, _ = roc_curve(labels, scores)
    area = auc(false_pos, true_pos)

    plt.figure(figsize=(6, 6))
    axes = plt.gca()

    # Measured curve plus the chance diagonal for reference.
    axes.plot(
        false_pos,
        true_pos,
        color="darkorange",
        lw=2,
        label=f"ROC curve (AUC = {area:.3f})",
    )
    axes.plot([0, 1], [0, 1], color="navy", lw=1, linestyle="--")

    axes.set_xlim([0.0, 1.0])
    axes.set_ylim([0.0, 1.05])
    axes.set_xlabel("False Positive Rate")
    axes.set_ylabel("True Positive Rate")
    axes.set_title("ROC Curve (DeepVoice Anomaly Detection)")
    axes.legend(loc="lower right")
    axes.grid(True)
    plt.show()

    return area


# 7. plot_distribution.py

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def plot_score_distribution(scores, labels):
    """Plot overlaid density histograms of the scores of both classes.

    Args:
        scores: Anomaly scores, one per sample (any array-like).
        labels: Matching labels, 0 for normal and 1 for fake (any array-like).
    """
    # Coerce to arrays: with plain lists ``labels == 0`` is a single bool and
    # the boolean-mask selection below would not work.
    scores = np.asarray(scores)
    labels = np.asarray(labels)

    plt.figure(figsize=(8, 5))

    for class_id, caption in ((0, "Normal (Human)"), (1, "DeepVoice (Fake)")):
        plt.hist(
            scores[labels == class_id],
            bins=50,
            alpha=0.6,
            label=caption,
            density=True,
        )

    plt.xlabel("Anomaly Score (Reconstruction Error)")
    plt.ylabel("Density")
    plt.title("Anomaly Score Distribution")
    plt.legend()
    plt.grid(True)
    plt.show()


# 8. train.py

In [None]:
# train.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm
from src.dataset import SegmentVoiceDataset
from src.models import LSTMAutoEncoder
from src.utils import load_config
import os

def train():
    """Train the LSTM auto-encoder on the training segments and save it.

    Reads ``config/config.yaml``, builds the dataset, loader and model from
    it, minimises the MSE reconstruction loss with Adam for the configured
    number of epochs and writes the model's ``state_dict`` to
    ``cfg["training"]["save_path"]``.

    Raises:
        RuntimeError: If the loader yields no batch (fewer segments than one
            full batch, since incomplete batches are dropped).
    """
    # ---------- config ----------
    cfg = load_config("config/config.yaml")

    # ---------- device ----------
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    print("Using device:", device)

    # ---------- dataset / dataloader ----------
    train_dataset = SegmentVoiceDataset(cfg, mode="train")
    train_loader = DataLoader(
        train_dataset,
        # Taken from the config rather than a literal that can drift from it.
        batch_size=cfg["training"]["batch_size"],
        shuffle=True,
        drop_last=True
    )

    n_batches = len(train_loader)
    if n_batches == 0:
        # Without this the per-epoch average below divides by zero.
        raise RuntimeError(
            "Training loader is empty: fewer segments than one full batch."
        )

    # ---------- model ----------
    # The model's keyword is n_feature and the config key is data.n_feature.
    model = LSTMAutoEncoder(
        n_feature=cfg["data"]["n_feature"],
        hidden_dim=cfg["model"]["hidden_dim"],
        latent_dim=cfg["model"]["latent_dim"],
        num_layers=cfg["model"]["num_layers"]
    ).to(device)

    # ---------- loss / optimizer ----------
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=cfg["training"]["lr"]
    )

    # ---------- training loop ----------
    n_epochs = cfg["training"]["epochs"]
    model.train()
    for epoch in range(n_epochs):
        epoch_loss = 0.0
        pbar = tqdm(
            train_loader,
            desc=f"Epoch [{epoch+1}/{n_epochs}]",
            total=n_batches
        )
        # Iterate the tqdm wrapper (not the bare loader) so the bar advances.
        for x in pbar:
            x = x.to(device)

            x_hat = model(x)
            loss = criterion(x_hat, x)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            pbar.set_postfix(loss=loss.item())

        epoch_loss /= n_batches
        print(f"[Epoch {epoch+1}] loss: {epoch_loss:.6f}")

    # ---------- save ----------
    save_path = cfg["training"]["save_path"]
    save_dir = os.path.dirname(save_path)
    # A bare file name has no directory part; os.makedirs("") would fail.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    torch.save(model.state_dict(), save_path)
    print("Model saved.")


if __name__ == "__main__":
    train()



# 9. test.py

In [None]:
## test.py
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader
from tqdm import tqdm
from collections import defaultdict

from src.dataset import SegmentVoiceDataset
from src.models import LSTMAutoEncoder
from src.utils import load_config
from eval import plot_roc


def test():
    """Score the test files with the trained auto-encoder and plot the ROC.

    Each segment's score is its mean squared reconstruction error.  Segment
    scores are grouped by source file, and a file's score is the mean of its
    (at most) five largest segment scores.

    Returns:
        Tuple ``(final_scores, final_labels)`` of NumPy arrays with one entry
        per file.
    """
    # ---------- config ----------
    cfg = load_config("config/config.yaml")

    # ---------- device ----------
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    print("Using device:", device)

    # ---------- dataset / dataloader ----------
    test_dataset = SegmentVoiceDataset(cfg, mode="test")
    # batch_size=1 so that each step yields exactly one segment score.
    test_loader = DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False
    )

    # ---------- model ----------
    # The model's keyword is n_feature and the config key is data.n_feature.
    model = LSTMAutoEncoder(
        n_feature=cfg["data"]["n_feature"],
        hidden_dim=cfg["model"]["hidden_dim"],
        latent_dim=cfg["model"]["latent_dim"],
        num_layers=cfg["model"]["num_layers"]
    ).to(device)

    model.load_state_dict(
        torch.load(cfg["training"]["save_path"], map_location=device)
    )
    model.eval()

    # ---------- loss ----------
    criterion = nn.MSELoss(reduction="none")

    # ---------- containers ----------
    wav_scores = defaultdict(list)   # wav_id -> list of segment scores
    wav_labels = {}                  # wav_id -> label (0/1)

    # ---------- inference ----------
    with torch.no_grad():
        pbar = tqdm(
            test_loader,
            desc="Testing (segment-level)",
            total=len(test_loader)
        )

        for x, label, wav_id in pbar:
            x = x.to(device)

            x_hat = model(x)

            # reconstruction error (segment-level)
            score = criterion(x_hat, x).mean().item()

            wid = wav_id.item()
            wav_scores[wid].append(score)
            wav_labels[wid] = label.item()

            pbar.set_postfix(score=score)

    # ---------- wav-level aggregation ----------
    final_scores = []
    final_labels = []

    for wid, scores in wav_scores.items():
        # top-k mean (k=5): average of the highest segment scores
        top_scores = sorted(scores, reverse=True)[:5]

        final_scores.append(np.mean(top_scores))
        final_labels.append(wav_labels[wid])

    final_scores = np.array(final_scores)
    final_labels = np.array(final_labels)

    # ---------- stats ----------
    print("\nTest finished (wav-level).")
    print("Total wavs:", len(final_scores))
    print("Score stats:")
    print("  mean:", final_scores.mean())
    print("  std :", final_scores.std())
    print("Normal mean:", final_scores[final_labels == 0].mean())
    print("Fake mean  :", final_scores[final_labels == 1].mean())

    # ---------- ROC ----------
    auc_score = plot_roc(final_scores, final_labels)
    print("AUC:", auc_score)

    return final_scores, final_labels


if __name__ == "__main__":
    test()


# model2.py

# train2.py

# test2.py