# Project Setup (Colab)

Run these cells from top to bottom to build a stable, reproducible environment.

In [None]:
# ======================================
# 🔧 INSTALL DEPENDENCIES (run once, then restart kernel if prompted)
# ======================================
#%%capture
#%pip install -q --force-reinstall     numpy==1.26.4     scipy==1.13.1     torch==2.4.1     torchaudio==2.4.1     coqui-tts==0.23.1     pandas==2.2.3     matplotlib==3.9.2     scikit-learn==1.5.2     tqdm==4.66.5

# After running this cell, restart the runtime by going to "Runtime" -> "Restart session" in the Colab menu.

In [None]:
#!pip uninstall -y numpy pandas scipy
#!pip install --no-cache-dir --force-reinstall numpy==1.26.4 pandas==2.2.3 scipy==1.13.1

In [None]:
import numpy as np, pandas as pd, scipy

# Report the versions of the core scientific stack that this kernel actually loaded.
for _label, _module in (("NumPy", np), ("Pandas", pd), ("SciPy", scipy)):
    print(f"{_label}:", _module.__version__)

NumPy: 1.26.4
Pandas: 2.2.3
SciPy: 1.13.1


In [None]:
# ======================================
# 📦 IMPORT LIBRARIES
# ======================================
import os, sys, glob, random, shutil, csv, itertools, threading, platform, importlib
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict

# Core scientific stack
import numpy as np
import pandas as pd
import scipy
import matplotlib.pyplot as plt
from tqdm import tqdm

# Machine learning / audio
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from torch.utils.data import DataLoader, Dataset

from sklearn.metrics import classification_report
from sklearn.manifold import TSNE

# Coqui Text-to-Speech
from TTS.api import TTS

%matplotlib inline


ModuleNotFoundError: No module named 'TTS'

In [None]:
# ======================================
# 🎯 REPRODUCIBILITY (Seed everything)
# ======================================
import random
import numpy as _np
import torch as _torch

SEED = 42

# Seed Python's, NumPy's and PyTorch's global RNGs with the same value.
for _seed_rng in (random.seed, _np.random.seed, _torch.manual_seed):
    _seed_rng(SEED)

# When a GPU runtime is attached, seed every visible CUDA device as well.
if _torch.cuda.is_available():
    _torch.cuda.manual_seed_all(SEED)

print(f"Seed set to {SEED}")


Seed set to 42


In [None]:
# ======================================
# ✅ ENVIRONMENT CHECK
# ======================================
import platform

# Print the interpreter and library versions; np / scipy / torch / torchaudio / pd
# are expected to be in the kernel namespace already (this cell does not import them).
print("Environment is ready!")
print("Python: {}".format(platform.python_version()))
print("NumPy: {} | SciPy: {}".format(np.__version__, scipy.__version__))
print("Torch: {} | Torchaudio: {}".format(torch.__version__, torchaudio.__version__))
print("Pandas: {}".format(pd.__version__))

# Query CUDA once and reuse the answer for both the flag and the device name.
_cuda_ok = torch.cuda.is_available()
print("CUDA available:", _cuda_ok)
if _cuda_ok:
    print("CUDA device:", torch.cuda.get_device_name(0))


Environment is ready!
Python: 3.12.12
NumPy: 1.26.4 | SciPy: 1.13.1
Torch: 2.8.0+cu126 | Torchaudio: 2.8.0+cu126
Pandas: 2.2.3
CUDA available: True
CUDA device: Tesla T4


In [None]:
# ======================================
# 🧊 FREEZE ENVIRONMENT (lock file + system info)
# ======================================
import json
import os
import platform
import shutil
import subprocess
import sys

import numpy as _numpy
import scipy as _scipy
import torch as _torch
import pandas as _pandas
from google.colab import drive
drive.mount('/content/drive')

LOCK_TXT = "/content/requirements_lock.txt"
LOCK_JSON = "/content/env_lock.json"

# Freeze exact package versions.
# Run pip through the kernel's own interpreter so the lock file always describes
# the environment this notebook is executing in (a bare `!pip` resolves via PATH).
with open(LOCK_TXT, "w") as f:
    subprocess.run([sys.executable, "-m", "pip", "freeze"], stdout=f, check=True)

# Save system info + core libs versions
env_info = {
    "python": platform.python_version(),
    "platform": platform.platform(),
    "cuda_available": _torch.cuda.is_available(),
    "cuda_device": (_torch.cuda.get_device_name(0) if _torch.cuda.is_available() else None),
    "versions": {
        "numpy": _numpy.__version__,
        "scipy": _scipy.__version__,
        "torch": _torch.__version__,
        "pandas": _pandas.__version__,
    }
}
with open(LOCK_JSON, "w") as f:
    json.dump(env_info, f, indent=2)

print(f"Saved lockfile to: {LOCK_TXT}")
print(f"Saved environment info to: {LOCK_JSON}")

# If Drive is mounted, also copy there for persistence.
# Test for the MyDrive folder rather than the bare mountpoint: "/content/drive" can
# exist while nothing is mounted, and creating files inside an unmounted mountpoint
# makes a later drive.mount() refuse to mount.
drive_base = "/content/drive/MyDrive/ColabEnvLocks"
if os.path.isdir("/content/drive/MyDrive"):
    os.makedirs(drive_base, exist_ok=True)
    shutil.copyfile(LOCK_TXT, os.path.join(drive_base, "requirements_lock.txt"))
    shutil.copyfile(LOCK_JSON, os.path.join(drive_base, "env_lock.json"))
    print(f"Also copied to Drive: {drive_base}")
else:
    print("Google Drive is not mounted; skipping Drive backup.")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Saved lockfile to: /content/requirements_lock.txt
Saved environment info to: /content/env_lock.json
Also copied to Drive: /content/drive/MyDrive/ColabEnvLocks


In [None]:
!pip install coqui-tts==0.23.1

Collecting coqui-tts==0.23.1
  Downloading coqui_tts-0.23.1-cp312-cp312-manylinux1_x86_64.whl.metadata (20 kB)
Collecting anyascii>=0.3.0 (from coqui-tts==0.23.1)
  Downloading anyascii-0.3.3-py3-none-any.whl.metadata (1.6 kB)
Collecting pysbd>=0.3.4 (from coqui-tts==0.23.1)
  Downloading pysbd-0.3.4-py3-none-any.whl.metadata (6.1 kB)
Collecting coqui-tts-trainer>=0.1 (from coqui-tts==0.23.1)
  Downloading coqui_tts_trainer-0.3.1-py3-none-any.whl.metadata (8.1 kB)
Collecting coqpit>=0.0.16 (from coqui-tts==0.23.1)
  Downloading coqpit-0.0.17-py3-none-any.whl.metadata (11 kB)
Collecting pypinyin (from coqui-tts==0.23.1)
  Downloading pypinyin-0.55.0-py2.py3-none-any.whl.metadata (12 kB)
Collecting hangul-romanize (from coqui-tts==0.23.1)
  Downloading hangul_romanize-0.1.0-py3-none-any.whl.metadata (1.2 kB)
Collecting gruut==2.2.3 (from gruut[de,es,fr]==2.2.3->coqui-tts==0.23.1)
  Downloading gruut-2.2.3.tar.gz (73 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.5

In [None]:
# ======================================
# 🔁 RESTORE ENV FROM LOCK (use on fresh runtimes)
# ======================================
%%capture
# Prefer Drive lock if available, else local
LOCK_TXT = "/content/drive/MyDrive/ColabEnvLocks/requirements_lock.txt"
FALLBACK_LOCK = "/content/requirements_lock.txt"
import os
lock_to_use = LOCK_TXT if os.path.exists(LOCK_TXT) else FALLBACK_LOCK
print(f"Installing from lock: {lock_to_use}")
%pip install -q --no-deps -r "$lock_to_use"


In [None]:

# ======================================
# ⬇️ OPTIONAL: PRE-DOWNLOAD TTS MODEL WEIGHTS (persist to Drive if mounted)
# ======================================
from TTS.api import TTS
import os
import torch  # imported here so the cell does not depend on an earlier cell having run

MODEL_NAME = "tts_models/en/ljspeech/tacotron2-DDC"  # change if you need a different model
LOCAL_DIR = "/content/models/tts"
os.makedirs(LOCAL_DIR, exist_ok=True)

# Instantiate once to trigger download into cache; also synthesize a tiny file to ensure weights are present.
# The device is selected with .to(device) (same pattern as the synthesis cell below)
# instead of the deprecated `gpu=` constructor argument.
device = "cuda" if torch.cuda.is_available() else "cpu"
tts = TTS(model_name=MODEL_NAME, progress_bar=False)
tts.to(device)
tts.tts_to_file(text="setup", file_path=f"{LOCAL_DIR}/_warmup.wav")
print("Model ready:", MODEL_NAME)

# If Drive is mounted, prepare a persistent folder for the model cache.
# Check for MyDrive rather than the bare mountpoint: writing into an unmounted
# "/content/drive" directory would make a later drive.mount() fail.
if os.path.isdir("/content/drive/MyDrive"):
    DRIVE_DIR = "/content/drive/MyDrive/ColabModels/tts"
    os.makedirs(DRIVE_DIR, exist_ok=True)
    # Nothing is copied here; the message only points at the cache directory to sync.
    print("Drive detected. Consider syncing ~/.local/share/tts to Drive for full persistence.")
else:
    print("Drive not mounted — model will be cached only in this runtime.")


  re_han_default = re.compile("([\u4E00-\u9FD5a-zA-Z0-9+#&\._%\-]+)", re.U)
  re_skip_default = re.compile("(\r\n|\s)", re.U)
  re_skip = re.compile("([a-zA-Z0-9]+(?:\.\d+)?%?)")


Model ready: tts_models/en/ljspeech/tacotron2-DDC
Drive detected. Consider syncing ~/.local/share/tts to Drive for full persistence.


In [None]:
# ======================================
# 📦 OPTIONAL: EXTRACT ALL ZIP DATASETS IN /content
# ======================================
import zipfile, glob, os
DATA_DIR = "/content/data"
os.makedirs(DATA_DIR, exist_ok=True)

# Unpack every archive that sits directly under /content into DATA_DIR.
zips = glob.glob("/content/*.zip")
for archive_path in zips:
    print("Extracting:", archive_path)
    with zipfile.ZipFile(archive_path, 'r') as archive:
        archive.extractall(DATA_DIR)

# List what ended up in DATA_DIR, capped at 50 files per directory to keep the output short.
print("Done. Files in data dir:")
for folder, _subfolders, filenames in os.walk(DATA_DIR):
    for filename in filenames[:50]:
        print(os.path.join(folder, filename))


Done. Files in data dir:


# extract_data


In [None]:
# import shutil
# import os

# # Path to the folder created by the previous run
# destination_folder = "/content/vctk_full"

# # Check whether the folder exists, then delete it
# if os.path.exists(destination_folder):
#     shutil.rmtree(destination_folder)
#     print(f" התיקייה '{destination_folder}' נמחקה בהצלחה.")
# else:
#     print(f"ℹ התיקייה '{destination_folder}' לא קיימת, אין מה למחוק.")


In [None]:
import zipfile
import os
import shutil
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# List contents to verify paths (optional)
root_path = '/content/drive/My Drive/'
print("Contents of 'My Drive':", os.listdir(root_path))

subfolder_path = '/content/drive/My Drive/Colab Notebooks/'
print("Contents of 'Colab Notebooks':", os.listdir(subfolder_path))

# Define paths
zip_file = "/content/drive/My Drive/Colab Notebooks/archive.zip"  # Path to your ZIP file
destination_folder = "/content/vctk_samples"  # Where to extract selected data
wanted_speakers = ["p225", "p226", "p227", "p228"]  # Select specific speakers

# Check if the ZIP file exists
if os.path.isfile(zip_file):
    print(" ZIP file found:", zip_file)
else:
    raise FileNotFoundError(f" ZIP file not found: {zip_file}")

# Create destination folder if it doesn't exist
os.makedirs(destination_folder, exist_ok=True)
destination_root = os.path.realpath(destination_folder)

# Selectively extract only desired speaker folders from the ZIP
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    extracted_files = 0
    for file in zip_ref.namelist():
        if any(f"VCTK-Corpus/wav48/{spk}/" in file or f"VCTK-Corpus/txt/{spk}/" in file for spk in wanted_speakers):
            # Directory entries carry no data and cannot be opened for writing;
            # the needed folders are created on demand for each file below.
            if file.endswith('/'):
                continue

            # Ensure directory structure is preserved
            target_path = os.path.join(destination_folder, file)

            # Refuse archive members that would land outside the destination (e.g. "../" names).
            resolved_target = os.path.realpath(target_path)
            if os.path.commonpath([destination_root, resolved_target]) != destination_root:
                print(" Skipping unsafe archive member:", file)
                continue

            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            with zip_ref.open(file) as source, open(target_path, 'wb') as target:
                shutil.copyfileobj(source, target)
            extracted_files += 1

print(f"\n Extraction complete: {extracted_files} files were extracted.")
print(f" Extracted data is available in: {destination_folder}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Contents of 'My Drive': ['Colab Notebooks', 'Deep learning', 'ColabEnvLocks', 'ColabModels', 'Colab_Data']
Contents of 'Colab Notebooks': ['archive.zip', 'fake_audio.zip', 'Copy of Welcome To Colab', 'Untitled0.ipynb', 'Untitled1.ipynb', 'FinalProjectCS.ipynb', 'FinalProjectCS_reset_setup.ipynb']
 ZIP file found: /content/drive/My Drive/Colab Notebooks/archive.zip

 Extraction complete: 2684 files were extracted.
 Extracted data is available in: /content/vctk_samples


# Installations


In [None]:
!apt-get install -y espeak-ng

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  espeak-ng-data libespeak-ng1 libpcaudio0 libsonic0
The following NEW packages will be installed:
  espeak-ng espeak-ng-data libespeak-ng1 libpcaudio0 libsonic0
0 upgraded, 5 newly installed, 0 to remove and 38 not upgraded.
Need to get 4,526 kB of archives.
After this operation, 11.9 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libpcaudio0 amd64 1.1-6build2 [8,956 B]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 libsonic0 amd64 0.2.0-11build1 [10.3 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 espeak-ng-data amd64 1.50+dfsg-10ubuntu0.1 [3,956 kB]
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 libespeak-ng1 amd64 1.50+dfsg-10ubuntu0.1 [207 kB]
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 espeak-ng amd64 1.50+dfsg-1

Deleting fake_wav48 if it exists

# generate_fake_data

In [None]:
# import os
# from TTS.api import TTS
# import shutil

# fake_audio_folder = "/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/fake_wav48"
# # Function that deletes all generated audio files
# def clear_fake_audio_folder(fake_audio_folder):
#     if os.path.exists(fake_audio_folder):
#         shutil.rmtree(fake_audio_folder)  # deletes the whole folder, including the files in it
#         os.makedirs(fake_audio_folder, exist_ok=True)  # recreates the (now empty) folder
#         print(f" כל הקבצים בתיקייה '{fake_audio_folder}' נמחקו!")

# clear_fake_audio_folder(fake_audio_folder)
# quit()

Creating Fake Audio (No Need to run now)

In [None]:
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import torch

from TTS.api import TTS

# =========================
# CONFIG
# =========================
# 1) Input text files (one .txt per utterance)
TEXT_ROOT = Path("/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/txt")

# 2) Real audio folder (matching real WAVs you want to clone the voice from)
#    The script will try to find a matching WAV by the text filename stem inside the corresponding subfolder.
#    Example: If text is ".../p228/p228_065.txt", it will try "/.../real_audio_folder/p228/p228_065.wav"
REAL_AUDIO_ROOT = Path("/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/wav48")  # <-- CHANGE if needed

# 3) Output folder for fake audio (created immediately if it does not exist)
OUT_ROOT = Path("/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/fake_wav48_xtts")
OUT_ROOT.mkdir(parents=True, exist_ok=True)

# 4) Speaker metadata as (gender, age, accent); it is only embedded in output filenames.
#    XTTS uses speaker_wav for the actual voice.
SPEAKER_INFO = {
    "p225": ("F", "22", "Southern England"),
    "p226": ("M", "22", "Surrey"),
    "p227": ("M", "38", "Cumbria"),
    "p228": ("F", "22", "Southern England"),
}

# 5) Language to synthesize in (VCTK is English)
LANGUAGE = "en"

# 6) Concurrency settings:
#    On GPU, keep MAX_WORKERS=1 (XTTS is heavy and not thread-safe on CUDA).
#    On CPU, you can increase to 4 (or more if your machine can handle it).
GPU_AVAILABLE = torch.cuda.is_available()
MAX_WORKERS = 1 if GPU_AVAILABLE else 4

# =========================
# MODEL LOADING
# =========================
# Use XTTS v2 for voice cloning with a reference WAV.
# Both models are loaded eagerly in this cell: XTTS first, then the VCTK-VITS
# fallback that is used when no real WAV is found for an utterance.
# NOTE(review): the saved output of this cell shows XTTS v2 asking for an interactive
# license confirmation ([y/n]) while loading; the cell waits until that prompt is answered.
print("Loading XTTS v2 model...")
tts_xtts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2", progress_bar=True)
device = "cuda" if GPU_AVAILABLE else "cpu"
tts_xtts.to(device)
print(f"XTTS is running on: {device}")

# Fallback multi-speaker model (only used when no real WAV is found)
print("Loading VCTK-VITS fallback model...")
tts_vctk = TTS(model_name="tts_models/en/vctk/vits", progress_bar=False)
tts_vctk.to(device)

# Lock that serializes model calls when MAX_WORKERS > 1 (the CPU setting above).
# With a single worker (the GPU setting) no lock is created and synth_lock is None.
synth_lock = threading.Lock() if MAX_WORKERS > 1 else None

# =========================
# HELPERS
# =========================
def to_vctk_id(s: str) -> str:
    """Return the VCTK-style speaker ID for `s`, adding the 'p' prefix when missing ('228' -> 'p228')."""
    cleaned = s.strip()
    if cleaned.startswith("p"):
        return cleaned
    return "p" + cleaned

def find_matching_real_wav(real_root: Path, subdir: str, txt_filename: str) -> Path | None:
    """
    Try to find the real WAV that matches the text file.
    Strategy:
      1) exact same stem under REAL_AUDIO_ROOT/subdir:  <stem>.wav
      2) any .wav in subdir that contains the stem (fallback)
      3) final fallback: None
    """
    stem = Path(txt_filename).stem  # e.g., 'p228_065' or '228_065'
    # Common VCTK stems look like 'p228_065'. If it's numeric-only, normalize:
    parts = stem.split("_")
    if parts and not parts[0].startswith("p"):
        parts[0] = "p" + parts[0]
    norm_stem = "_".join(parts)

    cand1 = real_root / subdir / f"{norm_stem}.wav"
    if cand1.exists():
        return cand1

    # Try exactly the original stem (if it already had 'p')
    cand2 = real_root / subdir / f"{stem}.wav"
    if cand2.exists():
        return cand2

    # Fallback: search within subdir for anything containing norm_stem or the raw stem
    subdir_path = real_root / subdir
    if subdir_path.is_dir():
        for fn in os.listdir(subdir_path):
            if not fn.lower().endswith(".wav"):
                continue
            if norm_stem in fn or stem in fn:
                return subdir_path / fn

    return None

def synth_xtts(text: str, speaker_wav: Path, out_path: Path, language: str = "en"):
    """
    Write `text` to `out_path` with the XTTS v2 model, using `speaker_wav` as the
    reference recording for the voice. The call is made under `synth_lock` when a
    lock has been configured.
    """
    request = dict(
        text=text,
        file_path=str(out_path),
        speaker_wav=str(speaker_wav),
        language=language,
    )
    if not synth_lock:
        # No lock configured: call the model directly.
        tts_xtts.tts_to_file(**request)
        return
    with synth_lock:
        tts_xtts.tts_to_file(**request)

def synth_vctk(text: str, speaker_id: str, out_path: Path):
    """
    Write `text` to `out_path` with the VCTK-VITS multi-speaker fallback model,
    selecting the voice by `speaker_id` ('p###'). The call is made under
    `synth_lock` when a lock has been configured.
    """
    request = dict(text=text, speaker=speaker_id, file_path=str(out_path))
    if not synth_lock:
        # No lock configured: call the model directly.
        tts_vctk.tts_to_file(**request)
        return
    with synth_lock:
        tts_vctk.tts_to_file(**request)

def process_one(text_path: Path, out_subdir: Path):
    """
    Synthesize one transcript file and return a one-line status message.

    Steps:
      - read the transcript (empty transcripts are skipped)
      - derive the speaker ID from the file name
      - look up the matching real WAV under REAL_AUDIO_ROOT
      - clone with XTTS when a real WAV exists, otherwise use the VCTK-VITS speaker
    """
    text = text_path.read_text(encoding="utf-8").strip()
    if not text:
        return f"[SKIP] Empty text: {text_path.name}"

    stem = text_path.stem
    # The first underscore-separated token is the speaker, e.g. 'p228' (or '228') in 'p228_065'.
    speaker_id = to_vctk_id(stem.split("_")[0])

    # Metadata only ends up in the output file name.
    gender, age, accent = SPEAKER_INFO.get(speaker_id, ("F", "22", "Southern England"))

    # The transcript's parent folder name selects the folder searched for the real WAV.
    real_wav = find_matching_real_wav(REAL_AUDIO_ROOT, text_path.parent.name, text_path.name)

    out_name = f"{stem}__{speaker_id}__{gender}_{age}_{accent}.wav"
    out_path = out_subdir / out_name

    if real_wav is None or not real_wav.exists():
        # No reference recording: fall back to the multi-speaker model.
        synth_vctk(text=text, speaker_id=speaker_id, out_path=out_path)
        return f"[FALLBACK VCTK] {text_path.name} -> speaker={speaker_id} -> {out_name}"

    synth_xtts(text=text, speaker_wav=real_wav, out_path=out_path, language=LANGUAGE)
    return f"[XTTS] {text_path.name} -> clone from {real_wav.name} -> {out_name}"

# =========================
# BUILD JOBS
# =========================
# Collect one (transcript path, output folder) pair per .txt file, speaker folder by speaker folder.
jobs = []
for speaker_name in os.listdir(TEXT_ROOT):
    speaker_txt_dir = TEXT_ROOT / speaker_name
    if speaker_txt_dir.is_dir():
        # Mirror the speaker folder under OUT_ROOT before any synthesis starts.
        speaker_out_dir = OUT_ROOT / speaker_name
        speaker_out_dir.mkdir(parents=True, exist_ok=True)

        jobs.extend(
            (speaker_txt_dir / entry, speaker_out_dir)
            for entry in os.listdir(speaker_txt_dir)
            if entry.lower().endswith(".txt")
        )

print(f"Found {len(jobs)} text files.")

# =========================
# RUN
# =========================
if not jobs:
    print("No jobs found. Check TEXT_ROOT.")
else:
    print(f"Starting synthesis with MAX_WORKERS={MAX_WORKERS} (device={device})")
    failed = []  # transcript paths whose synthesis raised
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        # Remember which transcript each future belongs to, so a failure can be
        # attributed to a file instead of printing an anonymous exception.
        fut_to_text = {
            ex.submit(process_one, text_path, out_dir): text_path
            for (text_path, out_dir) in jobs
        }
        for fut in as_completed(fut_to_text):
            source_text = fut_to_text[fut]
            try:
                info = fut.result()
                print(info)
            except Exception as e:
                # Best effort: keep processing the remaining files, but record the failure.
                failed.append(source_text)
                print("[ERROR]", source_text.name, repr(e))
    if failed:
        print(f"{len(failed)} of {len(jobs)} files failed.")

print("Done. Fake audio saved under:", OUT_ROOT)


  re_han_default = re.compile("([\u4E00-\u9FD5a-zA-Z0-9+#&\._%\-]+)", re.U)
  re_skip_default = re.compile("(\r\n|\s)", re.U)
  re_skip = re.compile("([a-zA-Z0-9]+(?:\.\d+)?%?)")


Loading XTTS v2 model...
 > You must confirm the following:
 | > "I have purchased a commercial license from Coqui: licensing@coqui.ai"
 | > "Otherwise, I agree to the terms of the non-commercial CPML: https://coqui.ai/cpml" - [y/n]


KeyboardInterrupt: Interrupted by user

Saving the fake audio

In [None]:
# import shutil
# from google.colab import files

# folder_path = "/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/fake_wav48_xtts"
# zip_path = "/content/fake_audio.zip"
# shutil.make_archive(base_name=zip_path.replace(".zip", ""), format='zip', root_dir=folder_path)

# files.download(zip_path)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Extracting the fake audio

In [None]:
import zipfile
import os
import shutil
from google.colab import drive

# Define paths
zip_file = "/content/drive/My Drive/Colab Notebooks/fake_audio.zip"  # Path to your ZIP file
destination_folder = "/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/fake_audio"  # Where to extract selected data

# Speakers whose folders should be pulled out of the archive
wanted_speakers = ["p225", "p226","p227","p228"]  # change this list as needed

# Stop early when the archive is missing
if not os.path.isfile(zip_file):
    raise FileNotFoundError(f" ZIP file not found: {zip_file}")
print(" ZIP file found:", zip_file)

# Make sure the destination exists
os.makedirs(destination_folder, exist_ok=True)

# An archive member is selected when its path contains "<speaker>/"
speaker_markers = [f"{spk}/" for spk in wanted_speakers]

with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    extracted_files = 0
    for member in zip_ref.namelist():
        if not any(marker in member for marker in speaker_markers):
            continue

        target_path = os.path.join(destination_folder, member)

        # Directory entries: create the folder, nothing to copy
        if member.endswith('/'):
            os.makedirs(target_path, exist_ok=True)
            continue

        # Regular file: create its parent folders, then stream the content out
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with zip_ref.open(member) as source, open(target_path, 'wb') as target:
            shutil.copyfileobj(source, target)
        extracted_files += 1

print(f"Extracted {extracted_files} files for speakers: {wanted_speakers}")


 ZIP file found: /content/drive/My Drive/Colab Notebooks/fake_audio.zip
Extracted 1342 files for speakers: ['p225', 'p226', 'p227', 'p228']


# Importing libraries

In [None]:
!nvidia-smi -L || echo "No GPU"

GPU 0: Tesla T4 (UUID: GPU-b0b0d63e-a0d9-cbd7-cb0f-09877605c050)


In [None]:
%%bash
set -euo pipefail

REQ=CLAD/requirements.txt

# pin_req NAME SPEC [--only-if-present]
#   Replace the requirement line for package NAME with SPEC.
#   The line is matched on the exact package name followed by end-of-line, whitespace,
#   a version operator, an environment marker or a comment, so that e.g. "numpy>=1.20"
#   is re-pinned too and "numpy" never matches a different package such as "numpy-foo".
#   (Lines using extras, e.g. "pkg[extra]==1.0", are not matched.)
#   When NAME is absent, SPEC is inserted at the top of the file unless
#   --only-if-present is given.
pin_req() {
  local name="$1" spec="$2" mode="${3:-}"
  local pattern="^${name}[[:space:]]*([<>=!~;#].*)?\$"
  if grep -Eq "$pattern" "$REQ"; then
    sed -E -i "s|${pattern}|${spec}|" "$REQ"
  elif [[ "$mode" != "--only-if-present" ]]; then
    sed -i "1i ${spec}" "$REQ"
  fi
}

# --- Clone CLAD fresh (idempotent: remove existing dir if present) ---
# remove previous clone to ensure a clean edit of requirements
rm -rf CLAD
git clone https://github.com/CLAD23/CLAD.git

# --- Normalize requirements for Python 3.12 (single source of truth) ---
cp "$REQ" CLAD/requirements.bak

# 1) Remove torch lines (Torch is installed manually for the correct CUDA wheel).
#    Exact package names only, so packages like "torchcontrib" are kept.
sed -E -i '/^(torch|torchvision|torchaudio)[[:space:]]*([<>=!~;#].*)?$/d' "$REQ"

# 2) Core pins for Py3.12 + Numba 0.60.0 (compatible with llvmlite 0.43.0 and NumPy 1.26.4)
#    These ensure no NumPy 2.x is pulled by accident.
pin_req numpy    'numpy==1.26.4'
pin_req numba    'numba==0.60.0'
pin_req llvmlite 'llvmlite==0.43.0'

# 3) Stable Matplotlib on Py3.12
pin_req matplotlib 'matplotlib==3.8.4'

# 4) Librosa must satisfy coqui-tts (>=0.11.0); keep it permissive to avoid conflicts
pin_req librosa 'librosa>=0.11.0'

# 5) Pin OpenCV to builds compatible with NumPy 1.26.x (avoid NumPy 2.x constraint)
#    Only modify if opencv lines exist (do not add if the project doesn't use it).
pin_req opencv-python         'opencv-python==4.9.0.80'         --only-if-present
pin_req opencv-contrib-python 'opencv-contrib-python==4.9.0.80' --only-if-present

# 6) Pin spaCy/Thinc to versions that work with NumPy 1.x (only if present)
pin_req thinc 'thinc==8.2.2' --only-if-present
pin_req spacy 'spacy==3.7.4' --only-if-present

echo "===== Updated CLAD/requirements.txt ====="
sed -n '1,250p' "$REQ"

# --- Upgrade pip to avoid resolver quirks ---
python -m pip install -U pip

# --- Install PyTorch 2.3.1 CUDA 12.1 (use CPU wheels by removing the index line if no GPU) ---
python -m pip install --index-url https://download.pytorch.org/whl/cu121 \
  torch==2.3.1 torchvision==0.18.1 torchaudio==2.3.1

# --- Install remaining CLAD dependencies from the single normalized requirements file ---
python -m pip install -r "$REQ"

# --- System library for soundfile/librosa WAV I/O (safe to install always) ---
apt-get update -y
apt-get install -y libsndfile1

echo "===== DONE: Environment pinned for Python 3.12 ====="
python -V
# Print the versions that actually got installed; optional packages are reported, not required.
python - <<'PY'
import sys, numpy, numba, llvmlite, matplotlib
import importlib
print("Python:", sys.version.split()[0])
print("NumPy:", numpy.__version__)
print("Numba:", numba.__version__)
print("llvmlite:", llvmlite.__version__)
print("Matplotlib:", matplotlib.__version__)
for m in ("torch","torchvision","torchaudio","librosa"):
    try:
        mod = importlib.import_module(m)
        print(f"{m}:", getattr(mod,"__version__", "unknown"))
    except Exception as e:
        print(f"{m}: NOT INSTALLED ({e})")
PY


===== Updated CLAD/requirements.txt =====
llvmlite==0.43.0
numba==0.60.0
librosa>=0.11.0
matplotlib==3.8.4
numpy==1.26.4
primePy==1.3
torchcontrib
pytorch_model_summary
Collecting pip
  Downloading pip-25.2-py3-none-any.whl.metadata (4.7 kB)
Downloading pip-25.2-py3-none-any.whl (1.8 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.8/1.8 MB 32.5 MB/s eta 0:00:00
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-25.2
Looking in indexes: https://download.pytorch.org/whl/cu121
Collecting torch==2.3.1
  Downloading https://download.pytorch.org/whl/cu121/torch-2.3.1%2Bcu121-cp312-cp312-linux_x86_64.whl (780.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 780.9/780.9 MB 25.7 MB/s  0:00:14
Collecting torchvision==0.18.1
  Downloading https://download.pytorch.org/whl/cu121/torchvision-0.18.1%2Bcu121-cp312-cp312-linux_x86_64.whl (7.0 MB)

Cloning into 'CLAD'...
  DEPRECATION: Building 'torchcontrib' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'torchcontrib'. Discussion can be found at https://github.com/pypa/pip/issues/6334
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [None]:
#4
# Copy the data into a separate folder (my_audio) so the extracted originals are never modified.

import os, pathlib, shutil

# Destination base folders
REAL_DST = pathlib.Path("/content/my_audio/real")
TXT_DST  = pathlib.Path("/content/my_audio/txt")
FAKE_DST = pathlib.Path("/content/my_audio/fake")

REAL_DST.mkdir(parents=True, exist_ok=True)
TXT_DST.mkdir(parents=True, exist_ok=True)
FAKE_DST.mkdir(parents=True, exist_ok=True)

# Source folders
FAKE_SRC = pathlib.Path("/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/fake_audio")
TXT_SRC  = pathlib.Path("/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/txt")
REAL_SRC = pathlib.Path("/content/vctk_samples/VCTK-Corpus/VCTK-Corpus/wav48")


def _copy_speaker_files(src_root, dst_root, pattern):
    """
    Copy the files matching `pattern` from every speaker folder ('p*') directly
    under `src_root` into a folder of the same name under `dst_root`.

    Returns the number of files copied.
    """
    copied = 0
    for folder in src_root.glob("p*"):
        if not folder.is_dir():
            continue  # only speaker folders, not stray files starting with 'p'
        speaker_dst = dst_root / folder.name
        speaker_dst.mkdir(parents=True, exist_ok=True)
        # Glob inside the speaker folder itself; globbing the source root here
        # would find nothing, because the files live one level down.
        for item in folder.glob(pattern):
            shutil.copy(item, speaker_dst / item.name)
            copied += 1
    return copied


# Copy FAKE wavs while keeping speaker folders
_copy_speaker_files(FAKE_SRC, FAKE_DST, "*.wav")

# Copy REAL wavs while keeping speaker folders
_copy_speaker_files(REAL_SRC, REAL_DST, "*.wav")

# Copy TXT transcripts while keeping speaker folders
_copy_speaker_files(TXT_SRC, TXT_DST, "*.txt")

print("✅ All files copied with speaker folder structure preserved!")
print("  Real :", REAL_DST)
print("  Fake :", FAKE_DST)
print("  Text :", TXT_DST)


✅ All files copied with speaker folder structure preserved!
  Real : /content/my_audio/real
  Fake : /content/my_audio/fake
  Text : /content/my_audio/txt


In [None]:
#7
# Speaker-disjoint splitter with:
# - guaranteed non-empty val
# - easy "switch speakers" controls
# - deletes old OUT folder before writing
#
# Works for both 4 speakers (auto 2/1/1) and 5+ speakers (targets 3/1/1 by speakers).

import os, shutil, random, glob, csv, itertools
from pathlib import Path
from collections import defaultdict

# ---------- CONFIG ----------
# Everything a reader may want to tune for the speaker-disjoint split lives here.
ROOT = Path("/content/my_audio")           # your current data root (real/fake/{speaker}/*.wav)
OUT  = Path("/content/my_audio_split")     # split will be (re)created here
AUDIO_EXTS = {".wav", ".flac", ".mp3", ".m4a", ".aac", ".ogg"}  # lower-case suffixes counted as audio; add if needed
SEED = 42                                  # seed applied to Python's `random` module below
USE_SYMLINKS = True                        # False = copy files instead of symlink
REQUIRE_BOTH_CLASSES = True                # speakers must exist under BOTH real/ and fake/
# Desired speaker counts (train/val/test)
DESIRED_311 = (3, 1, 1)                    # prefer 3/1/1 when you have ≥5 speakers
FALLBACK_211 = (2, 1, 1)                   # for 4 speakers, this is the safe split
# >>> Force specific speakers into splits (edit these to "switch")
TRAIN_FORCE = set()                        # e.g., {"p226","p227","p228"}
VAL_FORCE   = set()                        # e.g., {"p225"}
TEST_FORCE  = set()                        # e.g., {"p229"}
# --------------------------------------

# Re-seed the global `random` generator with SEED at the start of this cell.
random.seed(SEED)

def list_speakers(root, cls):
    """Return the sorted names of the sub-directories of root/cls, leaving out one named 'txt'.

    An empty list is returned when root/cls does not exist.
    """
    class_dir = root / cls
    if not class_dir.exists():
        return []
    names = [entry.name for entry in class_dir.iterdir()
             if entry.is_dir() and entry.name != "txt"]
    names.sort()
    return names

def list_audio(dirpath):
    """Return, sorted, every file below `dirpath` (recursively) whose lower-cased suffix is in AUDIO_EXTS."""
    found = []
    for candidate in dirpath.rglob("*"):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() in AUDIO_EXTS:
            found.append(candidate)
    found.sort()
    return found

# 1) Find eligible speakers (present and non-empty under both classes if required)
real_spk = set(list_speakers(ROOT, "real"))
fake_spk = set(list_speakers(ROOT, "fake"))
if REQUIRE_BOTH_CLASSES:
    eligible = sorted(real_spk & fake_spk)
else:
    eligible = sorted(real_spk | fake_spk)

def nonempty_both(s):
    """Tell whether speaker ``s`` has enough audio on disk to be eligible.

    With ``REQUIRE_BOTH_CLASSES`` set, both ``real/<s>`` and ``fake/<s>`` must
    contain at least one audio file; otherwise one file on either side is enough.
    """
    has_audio = (len(list_audio(ROOT / cls / s)) > 0 for cls in ("real", "fake"))
    if REQUIRE_BOTH_CLASSES:
        return all(has_audio)
    return any(has_audio)

# Keep only speakers that actually have audio on disk.
eligible = [s for s in eligible if nonempty_both(s)]
n_spk = len(eligible)
# The smallest supported shape is FALLBACK_211, i.e. one speaker per slot.
# With fewer speakers the split can never be filled, so fail here with a clear
# message instead of much later in the "Final sizes" sanity check.
min_speakers = sum(FALLBACK_211)
if n_spk < min_speakers:
    raise RuntimeError(
        f"Need ≥{min_speakers} eligible speakers for a speaker-disjoint "
        f"train/val/test split, found {n_spk}: {eligible}"
    )

# 2) Choose target split sizes by number of speakers
if n_spk >= sum(DESIRED_311):
    N_TRAIN, N_VAL, N_TEST = DESIRED_311   # 3/1/1
else:
    # With 4 speakers, 2/1/1 is the right shape to keep val+test non-empty
    N_TRAIN, N_VAL, N_TEST = FALLBACK_211  # 2/1/1

# 3) Validate FORCE sets and fill remaining slots
forced = TRAIN_FORCE | VAL_FORCE | TEST_FORCE
if forced:
    # Every forced speaker has to be one of the eligible speakers.
    missing = forced - set(eligible)
    if missing:
        raise RuntimeError(f"Forced speakers not found/eligible: {sorted(missing)}")
    # A speaker may be forced into at most one split: check every pair of sets.
    overlap = set().union(
        *(a & b for a, b in itertools.combinations((TRAIN_FORCE, VAL_FORCE, TEST_FORCE), 2))
    )
    if overlap:
        raise RuntimeError(f"Forced sets overlap: {sorted(overlap)}")

# file counts (use 'real' side as proxy for per-speaker volume)
spk_counts = dict((s, len(list_audio(ROOT/"real"/s))) for s in eligible)
total_files = sum(spk_counts.values())

def pick_k_closest(candidates, k, target_share, counts=None):
    """Pick k speakers whose file-count sum is closest to target_share (in files).

    Args:
        candidates: sequence of speaker ids to choose from.
        k: number of speakers to pick.
        target_share: desired total number of files for the picked speakers.
        counts: optional mapping speaker -> file count. Defaults to the
            module-level ``spk_counts`` so existing three-argument calls keep
            working.

    Returns:
        A set of speaker ids: empty when ``k <= 0``, all candidates when there
        are no more than ``k`` of them, otherwise the size-``k`` combination
        with the smallest absolute gap to ``target_share``. Ties go to the
        first combination in ``itertools.combinations`` order.
    """
    if k <= 0:
        return set()
    if len(candidates) <= k:
        return set(candidates)
    if counts is None:
        counts = spk_counts
    best, gap = None, float("inf")
    # Exhaustive search; fine for the handful of speakers this notebook handles.
    for combo in itertools.combinations(candidates, k):
        g = abs(sum(counts[s] for s in combo) - target_share)
        if g < gap:
            gap, best = g, set(combo)
    return best

# Start with forced
# Copy the FORCE sets so the config values themselves are never mutated.
train_set, val_set, test_set = set(TRAIN_FORCE), set(VAL_FORCE), set(TEST_FORCE)
# Speakers not claimed by any FORCE set, in `eligible` order.
remaining = [s for s in eligible if s not in (train_set | val_set | test_set)]

# How many more speakers each split needs (never negative, even when a FORCE
# set is larger than its target).
need_train = max(0, N_TRAIN - len(train_set))
need_val   = max(0, N_VAL   - len(val_set))
need_test  = max(0, N_TEST  - len(test_set))

# Target train file share (rough guideline): ~60% if 3/1/1, ~50% if 2/1/1
target_train_share = 0.60*total_files if (N_TRAIN, N_VAL, N_TEST) == DESIRED_311 else 0.50*total_files

# Fill TRAIN first to hit the share as best as possible
if need_train > 0:
    # Files already contributed by forced train speakers are subtracted from the target.
    add = pick_k_closest(remaining, need_train, target_train_share - sum(spk_counts[s] for s in train_set))
    train_set |= add
    remaining = [s for s in remaining if s not in add]

# Fill VAL with lighter speakers (to keep val/test similar size)
if need_val > 0:
    remaining.sort(key=lambda s: spk_counts[s])  # lightest first
    add = set(remaining[:need_val])
    val_set |= add
    remaining = remaining[need_val:]

# Fill TEST with the rest needed
if need_test > 0:
    add = set(remaining[:need_test])
    test_set |= add
    remaining = remaining[need_test:]
# Whatever is still in `remaining` here is not added to any of the three sets
# by this block.

# Final sanity: exact sizes, disjointness
if (len(train_set), len(val_set), len(test_set)) != (N_TRAIN, N_VAL, N_TEST):
    raise RuntimeError(f"Final sizes must be {N_TRAIN}/{N_VAL}/{N_TEST}, got {len(train_set)}/{len(val_set)}/{len(test_set)}")
# Any non-empty pairwise intersection means a speaker leaked across splits.
if (train_set & val_set) or (train_set & test_set) or (val_set & test_set):
    raise RuntimeError("Splits are not disjoint by speakers.")

print("Eligible speakers:", eligible)
print("Chosen split (by speakers):")
print("  train:", sorted(train_set))
print("  val  :", sorted(val_set))
print("  test :", sorted(test_set))
# Share of the 'real' file count that lands in each split, rounded to 3 decimals.
print("By-file shares (train/val/test):",
      *(round(sum(spk_counts[s] for s in part)/total_files, 3)
        for part in (train_set, val_set, test_set)))

# 4) DELETE OLD OUT (so the previous no-val split is removed), then rebuild
if OUT.exists():
    shutil.rmtree(OUT)
# Create the full <split>/<class> directory grid up front.
for split, cls in itertools.product(["train", "val", "test"], ["real", "fake"]):
    (OUT/split/cls).mkdir(parents=True, exist_ok=True)

def which_split(speaker):
    """Map a speaker id to its split name; anything not in train or val is "test"."""
    for split_name, members in (("train", train_set), ("val", val_set)):
        if speaker in members:
            return split_name
    return "test"

# 5) Materialize split (symlink/copy) + manifest
rows, counts = [], defaultdict(int)
for cls in ["real", "fake"]:
    base = ROOT/cls
    # sorted(): iteration order of a set of strings is not stable between
    # interpreter runs, and it determines the row order of the manifest.
    for sp in sorted(train_set | val_set | test_set):
        split = which_split(sp)
        src_dir = base/sp
        dst_dir = OUT/split/cls/sp
        dst_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): list_audio() is recursive while dst is flat, so two files
        # with the same name in different sub-folders of one speaker collide
        # (the later one wins, both are listed in the manifest).
        for src in list_audio(src_dir):
            dst = dst_dir/src.name
            # is_symlink() also catches dangling links, which exists() reports as absent.
            if dst.is_symlink() or dst.exists():
                try:
                    dst.unlink()
                except OSError:
                    pass  # best effort, as before; a leftover entry is handled below
            if USE_SYMLINKS:
                try:
                    os.symlink(src.resolve(), dst)
                except FileExistsError:
                    pass
            else:
                shutil.copy2(src, dst)
            rows.append({
                "split": split,
                "speaker": sp,
                "label": 0 if cls=="real" else 1,
                "src_path": str(src.resolve()),
                # Resolve only the directory: resolving dst itself would follow
                # the symlink and record the source path a second time.
                "dst_path": str(dst_dir.resolve()/src.name)
            })
            counts[(split, cls)] += 1

manifest_csv = OUT/"manifest.csv"
with open(manifest_csv, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["split","speaker","label","src_path","dst_path"])
    writer.writeheader(); writer.writerows(rows)

print("\nManifest:", manifest_csv)
print("Counts per split/class:")
for split in ["train","val","test"]:
    for cls in ["real","fake"]:
        print(f"  {split:5s} {cls:4s}: {counts[(split, cls)]}")

# Extra safety: show speaker overlap (should be empty)
print("\nOverlap checks (should be empty):")
print("  train ∩ val :", train_set & val_set)
print("  train ∩ test:", train_set & test_set)
print("  val   ∩ test:", val_set & test_set)


Eligible speakers: ['p225', 'p226', 'p227', 'p228']
Chosen split (by speakers):
  train: ['p225', 'p227']
  val  : ['p226']
  test : ['p228']
By-file shares (train/val/test): 0.462 0.265 0.273

Manifest: /content/my_audio_split/manifest.csv
Counts per split/class:
  train real: 620
  train fake: 620
  val   real: 356
  val   fake: 356
  test  real: 366
  test  fake: 366

Overlap checks (should be empty):
  train ∩ val : set()
  train ∩ test: set()
  val   ∩ test: set()


In [None]:
#6
# Choose ONE of the two options below:

USE_LIBROSA = True   # set to False to use scipy.io.wavfile only

if USE_LIBROSA:
    # Librosa path: convenient resample-to-16k + mono in one call
    !pip -q install librosa soundfile
else:
    # Scipy path: no extra system libs; we will do a small numpy resample
    !pip -q install scipy

In [None]:
# =========================================
# Preprocess audio with torchaudio: 16kHz mono + pad/trim to 64600
# =========================================
from pathlib import Path
import torchaudio, torch, torch.nn.functional as F

TARGET_SR = 16000   # sample rate (Hz) every clip is converted to
TARGET_LEN = 64600  # ~4 seconds at 16kHz (64600 / 16000 ≈ 4.04 s); clips are padded/trimmed to this many samples

def preprocess_wav_torch(in_path: Path):
    """Load an audio file and bring it to mono, ``TARGET_SR`` Hz, ``TARGET_LEN`` samples.

    Multi-channel input is averaged to one channel, other sample rates are
    resampled, short clips are padded on the right and long clips are cut.

    Returns:
        ``(waveform, TARGET_SR)`` where ``waveform`` is a 1-D float tensor of
        ``TARGET_LEN`` samples.
    """
    signal, rate = torchaudio.load(str(in_path))    # [C,T]
    signal = signal.float()
    n_channels = signal.shape[0]
    if n_channels > 1:
        signal = signal.mean(dim=0, keepdim=True)   # [1,T]
    if rate != TARGET_SR:
        signal = torchaudio.transforms.Resample(orig_freq=rate, new_freq=TARGET_SR)(signal)
    # Positive deficit -> clip is too short and gets padded; otherwise truncate.
    deficit = TARGET_LEN - signal.shape[-1]
    signal = F.pad(signal, (0, deficit)) if deficit > 0 else signal[..., :TARGET_LEN]
    return signal.squeeze(0), TARGET_SR             # [T], 16000

for split in ["train", "val", "test"]:
    split_dir = Path("/content/my_audio_split") / split
    for cls in ["real", "fake"]:
        # Snapshot the listing first: entries are replaced while we iterate.
        for wav_path in sorted((split_dir/cls).rglob("*.wav")):
            wav, sr = preprocess_wav_torch(wav_path)
            # The split may consist of symlinks into the raw dataset. Saving
            # through a symlink would overwrite the ORIGINAL recording, so drop
            # the link and write a regular file in its place.
            if wav_path.is_symlink():
                wav_path.unlink()
            torchaudio.save(str(wav_path), wav.unsqueeze(0), sr)  # overwrite in place (split copy only)

print("✅ All audio preprocessed to 16kHz and padded/clipped to 64600 samples (torchaudio)")


  def dispatcher(
  s = torchaudio.io.StreamReader(src, format, None, buffer_size)
  s = torchaudio.io.StreamWriter(uri, format=muxer, buffer_size=buffer_size)


✅ All audio preprocessed to 16kHz and padded/clipped to 64600 samples (torchaudio)


In [None]:
# === DIAG A: basic environment check ===
# Prints the interpreter and library versions the kernel actually loaded.
import numpy as np, scipy, torch, torchaudio, platform
print("python:", platform.python_version())
print("numpy :", np.__version__)
print("scipy :", scipy.__version__)
print("torch :", torch.__version__)
print("torchaudio:", torchaudio.__version__)

# ABI test for numpy.random (if this fails -> the ABI is broken)
from numpy.random import RandomState
_ = RandomState(0)
print("✅ numpy.random ABI OK")


python: 3.12.12
numpy : 1.26.4
scipy : 1.13.1
torch : 2.8.0+cu126
torchaudio: 2.8.0+cu126
✅ numpy.random ABI OK


In [None]:
# 8
import torch
from pathlib import Path
import os
# The CLAD repo is imported by relative module name and the checkpoint is
# addressed by a relative path, so the working directory must be the repo root.
os.chdir('/content/CLAD')

from Model import MoCo_v2, RawNetEncoderBaseline

# Step 1: Define the RawNet encoder configuration
# NOTE(review): the meaning of each key is defined by RawNetEncoderBaseline in
# the CLAD repo; confirm these values against the checkpoint's training config.
d_args = {
    "in_channels": 1,
    "first_conv": 251,
    "filts": [
        128,  # output channels for sinc conv
        [128, 128],  # block0 and block1
        [128, 256],  # block2
        [256, 256]   # block3-5
    ],
    "nb_fc_node": 1024,
    "gru_node": 1024,
    "nb_gru_layer": 3,
    "nb_classes": 2
}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Step 2: Create both encoders
# Two separate instances are built and handed to MoCo_v2 as encoder_q / encoder_k.
encoder_q = RawNetEncoderBaseline(d_args, device)
encoder_k = RawNetEncoderBaseline(d_args, device)

# Step 3: Create the MoCo_v2 model
model = MoCo_v2(
    encoder_q=encoder_q,
    encoder_k=encoder_k,
    queue_feature_dim=1024,  # matches encoder output
    mlp=True,
    return_q=True
)

# Step 4: Load pretrained weights
ckpt_path = Path("pretrained_models/CLAD_150_10_2310.pth.tar")
if not ckpt_path.exists():
    raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

ckpt = torch.load(ckpt_path, map_location='cpu')
state_dict = ckpt.get("state_dict", ckpt)
# Checkpoints saved from a DataParallel/DDP wrapper prefix every key with
# "module."; strip it so the keys can line up with this un-wrapped model.
if state_dict and all(k.startswith("module.") for k in state_dict):
    state_dict = {k[len("module."):]: v for k, v in state_dict.items()}

missing_keys, unexpected_keys = model.load_state_dict(state_dict, strict=False)
print("Missing keys:", missing_keys)
print("Unexpected keys:", unexpected_keys)
# strict=False never raises on a key mismatch, so a checkpoint whose names do
# not match leaves the encoder randomly initialised. Make that impossible to miss.
missing_encoder_keys = [k for k in missing_keys if k.startswith("encoder_q.")]
if missing_encoder_keys:
    print(f"⚠️ {len(missing_encoder_keys)} encoder_q tensors were NOT found in the checkpoint; "
          "encoder_q keeps its random initialisation for them. Compare the key names above.")

model.to(device)
model.eval()

print("CLAD model loaded and ready.")

# Probe the encoder once with random input to discover its output feature size D.
SR = 16000
CLIP_SECONDS = 4.0
N_SAMPLES = int(SR * CLIP_SECONDS)  # = 64000

with torch.no_grad():
    dummy = torch.randn(2, N_SAMPLES, device=device)  # (B,T)
    # The expected input layout is not known here, so try both conventions.
    try:
        f = encoder_q(dummy)              # try (B,T)
    except Exception:
        f = encoder_q(dummy.unsqueeze(1)) # fallback (B,1,T)

    # Pool any extra time/freq dims so we have (B,D)
    if f.dim() == 3:
        f = f.mean(dim=2)                 # (B,D)
    elif f.dim() > 3:
        f = f.mean(dim=tuple(range(2, f.dim())))  # reduce to (B,D)

    print("Encoder output shape:", tuple(f.shape))
    D = f.shape[1]
    print("Probed feature dim D =", D)




Missing keys: ['encoder_q.first_bn.weight', 'encoder_q.first_bn.bias', 'encoder_q.first_bn.running_mean', 'encoder_q.first_bn.running_var', 'encoder_q.block0.0.conv1.weight', 'encoder_q.block0.0.conv1.bias', 'encoder_q.block0.0.bn2.weight', 'encoder_q.block0.0.bn2.bias', 'encoder_q.block0.0.bn2.running_mean', 'encoder_q.block0.0.bn2.running_var', 'encoder_q.block0.0.conv2.weight', 'encoder_q.block0.0.conv2.bias', 'encoder_q.block1.0.bn1.weight', 'encoder_q.block1.0.bn1.bias', 'encoder_q.block1.0.bn1.running_mean', 'encoder_q.block1.0.bn1.running_var', 'encoder_q.block1.0.conv1.weight', 'encoder_q.block1.0.conv1.bias', 'encoder_q.block1.0.bn2.weight', 'encoder_q.block1.0.bn2.bias', 'encoder_q.block1.0.bn2.running_mean', 'encoder_q.block1.0.bn2.running_var', 'encoder_q.block1.0.conv2.weight', 'encoder_q.block1.0.conv2.bias', 'encoder_q.block2.0.bn1.weight', 'encoder_q.block2.0.bn1.bias', 'encoder_q.block2.0.bn1.running_mean', 'encoder_q.block2.0.bn1.running_var', 'encoder_q.block2.0.conv

In [None]:
# Overfit a single batch to sanity-check gradients
# Grab one batch
# The content of this cell has been moved to cell LLtOZnD1rsQx to resolve the NameError.

In [None]:
import torchaudio
from pathlib import Path

def check_audio_integrity(root_dir, expected_sr=16000, expected_channels=1, expected_frames=64600):
    """Verify that every ``*.wav`` below ``root_dir`` has the expected format.

    Reads only the file metadata (no waveform decoding) and prints one line per
    violated expectation, followed by a per-file summary. A file counts as OK
    only when it passes all checks; a file that violates one or more checks, or
    whose metadata cannot be read, counts once as failed.

    Args:
        root_dir: directory that is searched recursively for ``*.wav`` files.
        expected_sr: required sample rate in Hz.
        expected_channels: required number of channels.
        expected_frames: required number of frames per file.
    """
    root = Path(root_dir)
    total_ok = 0
    total_err = 0
    for p in sorted(root.rglob("*.wav")):
        try:
            metadata = torchaudio.info(str(p))
            problems = []
            if metadata.sample_rate != expected_sr:
                problems.append(f"❌ Wrong SR: {p.name} at {metadata.sample_rate}Hz")
            if metadata.num_channels != expected_channels:
                problems.append(f"❌ Not Mono: {p.name} has {metadata.num_channels} channels")
            if metadata.num_frames != expected_frames:
                problems.append(f"❌ Wrong Length: {p.name} has {metadata.num_frames} frames")
        except Exception as e:
            print(f"❌ Failed to load {p.name}: {e}")
            total_err += 1
            continue

        for line in problems:
            print(line)
        if problems:
            total_err += 1
        else:
            total_ok += 1

    print(f"\nSummary: {total_ok} files OK, {total_err} files failed checks.")

# Run the integrity check on the train and validation splits, each under its own banner.
for banner, split_root in (
    ("Checking Train Split:", "/content/my_audio_split/train"),
    ("\nChecking Validation Split:", "/content/my_audio_split/val"),
):
    print(banner)
    check_audio_integrity(split_root)

Checking Train Split:


  metadata = torchaudio.info(str(p))
  s = torchaudio.io.StreamReader(src, format, None, buffer_size)
  return AudioMetaData(



Summary: 1240 files OK, 0 files failed checks.

Checking Validation Split:

Summary: 712 files OK, 0 files failed checks.


In [None]:
# ... (model and DataLoader loading code)

# Make sure the encoder is completely frozen
grad_flags = [p.requires_grad for p in encoder_q.parameters()]
total_count = len(grad_flags)
frozen_count = grad_flags.count(False)
if frozen_count == total_count:
    print("✅ Encoder is fully frozen.")
else:
    print(f"⚠️ ATTENTION: Only {frozen_count}/{total_count} parameters are frozen in encoder_q! Check your loop.")

# ... (training continues)

⚠️ ATTENTION: Only 0/80 parameters are frozen in encoder_q! Check your loop.


In [None]:
#1 =========================================
# Fine-tuning: linear classifier on top of FROZEN encoder_q
# (RawNet encoder_q expects input shape [B, T], not [B,1,T]!)
# =========================================
from pathlib import Path
import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchaudio
import torch.nn.functional as F
import os
import numpy as np
from sklearn.metrics import classification_report
from tqdm import tqdm # שימוש ב-tqdm למעקב

# 1. Environment check
print("--- 1. בדיקת סביבה ומודל ---")
# Model.py and the checkpoint are addressed relative to the CLAD repo root.
os.chdir('/content/CLAD')
if not Path('Model.py').exists():
    raise FileNotFoundError("ERROR: Model.py not found in /content/CLAD. Did the cloning step (#15) fail?")

from Model import MoCo_v2, RawNetEncoderBaseline

# Step 1: Define the RawNet encoder configuration
# NOTE(review): key semantics are defined by RawNetEncoderBaseline in the CLAD
# repo; confirm the values against the checkpoint's training config.
d_args = {
    "in_channels": 1,
    "first_conv": 251,
    "filts": [128, [128, 128], [128, 256], [256, 256]],
    "nb_fc_node": 1024,
    "gru_node": 1024,
    "nb_gru_layer": 3,
    "nb_classes": 2
}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Step 2: Create both encoders and MoCo_v2 model
# This rebinds encoder_q / encoder_k / model to freshly constructed objects.
encoder_q = RawNetEncoderBaseline(d_args, device)
encoder_k = RawNetEncoderBaseline(d_args, device)
model = MoCo_v2(encoder_q=encoder_q, encoder_k=encoder_k, queue_feature_dim=1024, mlp=True, return_q=True)

# Step 3: Load pretrained weights
ckpt_path = Path("pretrained_models/CLAD_150_10_2310.pth.tar")
if not ckpt_path.exists():
    raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

ckpt = torch.load(ckpt_path, map_location='cpu')
state_dict = ckpt.get("state_dict", ckpt)
# Checkpoints saved from a DataParallel/DDP wrapper prefix every key with
# "module."; strip it so the keys can line up with this un-wrapped model.
if state_dict and all(k.startswith("module.") for k in state_dict):
    state_dict = {k[len("module."):]: v for k, v in state_dict.items()}
missing_keys, unexpected_keys = model.load_state_dict(state_dict, strict=False)
# strict=False silently skips keys that do not match. A linear probe on top of
# an encoder that never received its pretrained weights is meaningless, so
# report that case loudly instead of discarding missing_keys.
missing_encoder_keys = [k for k in missing_keys if k.startswith("encoder_q.")]
if missing_encoder_keys:
    print(f"⚠️ {len(missing_encoder_keys)} encoder_q tensors were NOT found in the checkpoint "
          f"(e.g. {missing_encoder_keys[0]}); encoder_q keeps its random initialisation for them.")
print("CLAD model loaded and ready.")


# -------- Config --------
TARGET_SR  = 16000                    # sample rate (Hz) the dataset expects
TARGET_LEN = 64600                    # samples per clip
LABELS     = {"real": 0, "fake": 1}   # class directory name -> integer label
BATCH_SIZE = 16
EPOCHS     = 8
# Corrected learning rate
LR         = 1e-2

# -------- Dataset: returns [T] (mono 16k, fixed length) --------
class AudioDataset(Dataset):
    """Waveform dataset over ``<root_dir>/real`` and ``<root_dir>/fake``.

    Every ``*.wav`` found recursively under the two class directories becomes
    one sample labelled through ``LABELS``. Items are ``(waveform, label)``
    with ``waveform`` a 1-D float tensor of exactly ``TARGET_LEN`` samples at
    ``TARGET_SR`` Hz, so the default collate function can always stack a batch.

    Raises:
        FileNotFoundError: a class directory is missing.
        ValueError: no ``*.wav`` file was found at all.
    """

    def __init__(self, root_dir: str):
        self.samples = []
        root = Path(root_dir)
        for cls in ("real", "fake"):
            base = root / cls
            if not base.exists():
                raise FileNotFoundError(f"Missing directory: {base}")
            # sorted(): rglob order depends on the filesystem; keep indices reproducible.
            for p in sorted(base.rglob("*.wav")):
                if p.is_file():
                    self.samples.append((p, LABELS[cls]))
        if len(self.samples) == 0:
            raise ValueError(f"No .wav files found under {root_dir}. Ensure split+preprocess ran correctly.")
        self._resamplers = {}  # original sample rate -> Resample transform, built on demand

    def _fix(self, path: Path) -> torch.Tensor:
        """Load one file and return it as a mono ``[TARGET_LEN]`` float tensor."""
        # After the preprocessing cell the files should already be 16k, mono and
        # 64600 samples long, in which case every branch below is a no-op.
        wav, sr = torchaudio.load(str(path))  # [C,T]
        wav = wav.float()

        # Down-mix to mono if the file has more than one channel.
        if wav.dim() == 2 and wav.shape[0] != 1:
            wav = wav.mean(dim=0, keepdim=True)
        if sr != TARGET_SR:
            # Still warn (it means preprocessing was skipped), but repair the
            # clip instead of feeding wrong-rate audio to the encoder.
            print(f"⚠️ Warning: SR mismatch on {path.name}. Actual {sr}Hz.")
            if sr not in self._resamplers:
                self._resamplers[sr] = torchaudio.transforms.Resample(orig_freq=sr, new_freq=TARGET_SR)
            wav = self._resamplers[sr](wav)

        # Enforce the fixed length; clips of different lengths cannot be stacked into a batch.
        n = wav.shape[-1]
        if n < TARGET_LEN:
            wav = F.pad(wav, (0, TARGET_LEN - n))
        elif n > TARGET_LEN:
            wav = wav[..., :TARGET_LEN]

        return wav.squeeze(0) # -> [T]

    def __len__(self): return len(self.samples)
    def __getitem__(self, idx):
        path, label = self.samples[idx]
        return self._fix(path), label                 # ([T], label)

# -------- DataLoaders --------
train_ds = AudioDataset("/content/my_audio_split/train")
val_ds   = AudioDataset("/content/my_audio_split/val")
# Options shared by both loaders; only the shuffling differs.
loader_opts = dict(batch_size=BATCH_SIZE, drop_last=False)
train_loader = DataLoader(train_ds, shuffle=True, **loader_opts)
val_loader   = DataLoader(val_ds, shuffle=False, **loader_opts)

# -------- Freeze encoder --------
# Turn off gradients for every encoder parameter, then put the encoder on the
# target device in eval mode.
for p in encoder_q.parameters():
    p.requires_grad = False
encoder_q.to(device).eval()

# --- 2. Data-loading and feature check (Feature Extraction) ---
print("\n--- 2. בדיקת פיצ'רים (Feature Extraction) ---")
# a. Verify the batch size and the waveform shape
# NOTE: train_loader shuffles, so the labels printed below come in random order.
wavs0, labels0 = next(iter(train_loader))     # wavs0: [B, T]
print(f"2.1. Batch Shape: {tuple(wavs0.shape)} (Expected: [B, T])")
print(f"2.2. Labels: {labels0[:5]} (Expected: [0, 1, 0, 1, ...])")

# b. Check feature extraction (encoder forward pass)
with torch.no_grad():
    x0 = wavs0.to(device).float()               # [B, T]
    feat0 = encoder_q(x0)                       # RawNet expects [B, T]

    # Pool any extra time/freq dims
    if feat0.dim() == 3:
        feat0 = feat0.mean(dim=2)
    elif feat0.dim() > 3:
        feat0 = feat0.mean(dim=tuple(range(2, feat0.dim())))
    # D (feature width) is reused below to size the probe and the classifier.
    D = feat0.shape[1]

# c. Verify the feature size and values
print(f"2.3. Feature Shape: {tuple(feat0.shape)} (Expected: [{BATCH_SIZE}, {D}])")
print(f"2.4. Max Feature Value: {feat0.abs().max().item():.4f}")
print(f"2.5. Feature Mean (Batch): {feat0.mean().item():.4f}")
if D != 1024:
    print(f"⚠️ WARNING: Probed feature dim D={D} is unexpected (Expected 1024)")


# --- 3. Overfit check (linear-probe sanity check) ---
# Trains a small MLP on the features of the single batch fetched above; if it
# cannot memorise that one batch, the features carry no usable signal.
print("\n--- 3. בדיקת Overfit (חייב להצליח) ---")
# Normalise the features before the probe (per-dimension, using this batch's statistics)
f_norm = (feat0 - feat0.mean(dim=0, keepdim=True)) / (feat0.std(dim=0, keepdim=True) + 1e-5)

probe = nn.Sequential(
    torch.nn.Linear(D, 128),
    torch.nn.ReLU(),
    torch.nn.Linear(128, 2)
).to(device)
opt  = torch.optim.AdamW(probe.parameters(), lr=1e-2, weight_decay=0.0)
crit = torch.nn.CrossEntropyLoss()
y = labels0.to(device).long()

for step in range(1000):
    logits = probe(f_norm)
    loss = crit(logits, y)
    opt.zero_grad(); loss.backward(); opt.step()
    if (step+1) % 500 == 0:
        acc = (logits.argmax(1)==y).float().mean().item()
        # If this overfit still fails, there is a critical problem in RawNetEncoderBaseline/MoCo_v2
        print(f"  step {step+1}: loss {loss.item():.4f}, acc {acc:.3f}")

# `logits` here are the ones computed in the last loop iteration, i.e. before
# that iteration's optimizer step.
final_acc = (logits.argmax(1)==y).float().mean().item()
print(f"3.1. Final overfit-batch acc: {final_acc}")
if final_acc < 0.95:
    print("❌ CRITICAL ERROR: Batch overfit failed. Encoder output might be zero/garbage.")


# --- 4. Training and validation (main training loop) ---
print("\n--- 4. אימון וולידציה (עם תיקונים) ---")
# Classifier head. Feature standardisation lives INSIDE the head as a
# non-affine BatchNorm1d instead of being recomputed by hand per batch:
#   * in train mode it standardises with the statistics of the current batch;
#   * in eval mode it uses running statistics, so validation results no longer
#     depend on batch composition, single-sample inference works, and any code
#     that simply calls classifier(feats) gets the same normalisation the head
#     was trained with.
classifier = nn.Sequential(
    nn.BatchNorm1d(D, affine=False),
    nn.Linear(D, 64),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(64, 2)
).to(device)
criterion = nn.CrossEntropyLoss()
# Use the corrected learning rate
optimizer = torch.optim.AdamW(classifier.parameters(), lr=LR, weight_decay=1e-4)


def _pooled_features(x):
    """Run the frozen encoder on a [B, T] batch and pool any extra dims to [B, D]."""
    with torch.no_grad():
        f = encoder_q(x)
        if f.dim() == 3:
            f = f.mean(dim=2)
        elif f.dim() > 3:
            f = f.mean(dim=tuple(range(2, f.dim())))
    return f


for epoch in range(1, EPOCHS + 1):
    # ---- Train ----
    classifier.train()
    tr_loss = tr_correct = tr_total = 0.0

    # tqdm shows per-batch progress with running accuracy/loss
    pbar = tqdm(train_loader, desc=f"Epoch {epoch}/{EPOCHS} Train", leave=False)
    for wavs, labels in pbar:
        x = wavs.to(device).float()          # [B, T]
        y = labels.to(device).long()         # 0=real, 1=fake
        if x.size(0) < 2:
            # Batch statistics are undefined for a single sample.
            continue

        f = _pooled_features(x)

        logits = classifier(f)
        loss = criterion(logits, y)

        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        tr_loss   += loss.item() * x.size(0)
        tr_correct += (logits.argmax(dim=1) == y).sum().item()
        tr_total  += x.size(0)
        pbar.set_postfix({'Acc': f'{tr_correct/tr_total:.3f}', 'Loss': f'{tr_loss/tr_total:.4f}'})

    # ---- Val ----
    classifier.eval()
    va_loss, va_correct, va_total = 0.0, 0, 0
    with torch.no_grad():
        for wavs, labels in val_loader:
            x = wavs.to(device).float()
            y = labels.to(device).long()
            f = _pooled_features(x)

            logits = classifier(f)
            va_loss   += criterion(logits, y).item() * x.size(0)
            va_correct += (logits.argmax(dim=1) == y).sum().item()
            va_total  += x.size(0)

    print(f"Epoch {epoch}/{EPOCHS} | "
          f"Train Loss: {tr_loss/tr_total:.4f}, Acc: {tr_correct/tr_total:.3f} | "
          f"Val Loss: {va_loss/va_total:.4f}, Acc: {va_correct/va_total:.3f}")

--- 1. בדיקת סביבה ומודל ---
CLAD model loaded and ready.

--- 2. בדיקת פיצ'רים (Feature Extraction) ---
2.1. Batch Shape: (16, 64600) (Expected: [B, T])


ImportError: cannot import name '_CAFFE2_ATEN_FALLBACK' from 'torch._C._onnx' (unknown location)

In [None]:
#2 =========================================
# Evaluate model on validation/test sets
# =========================================
from sklearn.metrics import classification_report
import torch
from pathlib import Path
from torch.utils.data import DataLoader

def evaluate(loader, split_name="val", model=None, classifier=None, device=None):
    """Print a classification report for ``classifier`` on top of ``model.encoder_q``.

    Args:
        loader: iterable of ``(wavs, labels)`` batches; ``wavs`` is fed to the
            encoder as-is after being moved to ``device``.
        split_name: label used in the printed header.
        model: object exposing the encoder as ``model.encoder_q``.
        classifier: head applied to the pooled encoder features.
        device: device on which inference runs.

    Raises:
        ValueError: if ``model``, ``classifier`` or ``device`` is missing.
    """
    if model is None or classifier is None or device is None:
        raise ValueError("model, classifier, and device must be provided to evaluate function.")

    # Ensure classifier is on the correct device
    classifier.to(device)
    classifier.eval()

    # The encoder has normalisation layers with running statistics, so it must
    # be in eval mode (and on the same device as the data) for deterministic
    # results. Its previous mode is restored afterwards.
    encoder = model.encoder_q
    encoder_was_training = encoder.training
    encoder.to(device)
    encoder.eval()

    all_preds, all_labels = [], []
    try:
        with torch.no_grad():
            for wavs, labels in loader:
                wavs = wavs.to(device).float()

                feats = encoder(wavs)
                # Pool any extra time/freq dims so we have (B,D)
                if feats.dim() == 3:
                    feats = feats.mean(dim=2)
                elif feats.dim() > 3:
                    feats = feats.mean(dim=tuple(range(2, feats.dim())))

                preds = classifier(feats)
                all_preds.extend(torch.argmax(preds, dim=1).cpu().tolist())
                all_labels.extend(labels.cpu().tolist())
    finally:
        encoder.train(encoder_was_training)

    print(f"=== {split_name.upper()} RESULTS ===")
    # labels=[0, 1] keeps the report well-formed even when a class is absent
    # from both the ground truth and the predictions of this split.
    print(classification_report(all_labels, all_preds, labels=[0, 1],
                                target_names=["real","fake"], zero_division=0))

# Run evaluation
# The same model/classifier/device triple is used for every split.
eval_objects = dict(model=model, classifier=classifier, device=device)
evaluate(val_loader, "val", **eval_objects)

# Build the test loader here, since it may not exist yet in the current runtime.
test_ds = AudioDataset("/content/my_audio_split/test")
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE)
evaluate(test_loader, "test", **eval_objects)

In [None]:
# 12
import torch
import torchaudio
import numpy as np
import torch.nn.functional as F
from pathlib import Path

# Set device (important!): use the GPU when one is available, otherwise the CPU.
# Inputs built in predict_file() are moved to this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Function to load audio as 16k mono
def load_audio_16k_mono(path: Path):
    """Load an audio file as mono 16kHz.

    Multi-channel audio is averaged to one channel and any other sample rate is
    resampled to 16 kHz. All processing stays in torch; the result is converted
    to NumPy once at the end.

    Returns:
        ``(waveform, 16000)`` where ``waveform`` is a 1-D NumPy array.
    """
    waveform, sr = torchaudio.load(str(path))   # [C,T]
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)   # [1,T]
    if sr != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=16000)
        waveform = resampler(waveform)
    # squeeze(0) removes only the channel axis; a bare squeeze() would also
    # collapse a one-sample clip to a 0-d array.
    return waveform.squeeze(0).numpy(), 16000

# Function to run prediction
def predict_file(path: Path):
    """Run CLAD on one WAV file. Returns (pred_label, fake_conf, real_conf).

    The file is loaded as 16 kHz mono, shaped to [1, 1, T] and passed to the
    global ``model``; the output is soft-maxed over dim 1 and index 1 is read
    as "fake", index 0 as "real".

    NOTE(review): this assumes ``model(x)`` accepts a single [B, 1, T] tensor
    and returns [B, 2] class logits. ``model`` is built as a MoCo_v2 wrapper
    elsewhere in this notebook, so confirm that contract before relying on it.
    NOTE(review): a later cell defines another ``predict_file`` with a
    different signature; whichever cell ran last wins.
    """
    y, sr = load_audio_16k_mono(path)
    x = torch.from_numpy(y).float().unsqueeze(0).unsqueeze(1).to(device)  # [B=1, C=1, T]
    with torch.no_grad():
        logits = model(x)  # expected shape [B, 2] for [real, fake]
        probs  = F.softmax(logits, dim=1)[0].detach().cpu().numpy()
    label = "fake" if int(np.argmax(probs)) == 1 else "real"
    fake_conf = float(probs[1])
    real_conf = float(probs[0])
    return label, fake_conf, real_conf


In [None]:
#13
# This cell scans your external real/fake folders, runs CLAD, and writes a CSV with results.
# It never writes inside the CLAD repo.
import torch
from pathlib import Path
import os
# os.chdir('/content/CLAD') # No need to change directory here for prediction

import glob
import pandas as pd
import torch.nn.functional as F

# This cell needs 'model' (the loaded CLAD network), 'classifier' (the
# fine-tuned head) and 'device' from earlier cells; stop early if any is missing.
if any(name not in globals() for name in ('model', 'classifier', 'device')):
     raise RuntimeError("CLAD model, classifier, or device not found. Please run the preceding cells first.")

# Inference only: both networks go to the target device and into eval mode.
model.to(device).eval()
classifier.to(device).eval()

# Function to load audio as 16k mono and preprocess
def load_and_preprocess_audio(path: Path):
    """Load one audio file as a mono ``[TARGET_LEN]`` float tensor at ``TARGET_SR`` Hz.

    Channels are averaged, other sample rates are resampled, and the clip is
    right-padded or truncated to exactly ``TARGET_LEN`` samples.
    """
    signal, rate = torchaudio.load(str(path))   # [C,T]
    signal = signal.float()
    if signal.shape[0] > 1:
        signal = signal.mean(dim=0, keepdim=True)   # [1,T]
    if rate != TARGET_SR:
        to_target = torchaudio.transforms.Resample(orig_freq=rate, new_freq=TARGET_SR)
        signal = to_target(signal)                  # [1,T']
    # Positive deficit -> clip is too short and gets padded; otherwise truncate.
    deficit = TARGET_LEN - signal.shape[-1]
    if deficit > 0:
        signal = F.pad(signal, (0, deficit))
    else:
        signal = signal[..., :TARGET_LEN]
    return signal.squeeze(0) # -> [T]


# Function to run prediction
def predict_file(path: Path, encoder, classifier, device):
    """Classify one audio file with the CLAD encoder followed by the classifier head.

    Returns:
        A 4-tuple ``(pred_label, fake_conf, real_conf, embedding)``.
        ``pred_label`` is "fake" or "real", the confidences are the soft-max
        outputs at index 1 and 0, and ``embedding`` is the pooled encoder
        feature as a NumPy array. If anything fails, the error is printed and
        ``("error", 0.0, 0.0, None)`` is returned instead.
    """
    try:
        batch = load_and_preprocess_audio(path).unsqueeze(0).to(device)   # [B=1, T]

        with torch.no_grad():
            # Only the encoder part of the MoCo model is used here.
            feats = encoder(batch)  # RawNet expects [B, T]
            # Average away any dims beyond (B, D).
            extra_dims = feats.dim() - 2
            if extra_dims == 1:
                feats = feats.mean(dim=2)
            elif extra_dims > 1:
                feats = feats.mean(dim=tuple(range(2, feats.dim())))

            scores = classifier(feats)        # [B=1, 2]
            probs = F.softmax(scores, dim=1)[0].detach().cpu().numpy()

        label = "fake" if int(np.argmax(probs)) == 1 else "real"
        fake_conf, real_conf = float(probs[1]), float(probs[0])
        embedding = feats.squeeze(0).cpu().numpy()
        return label, fake_conf, real_conf, embedding

    except Exception as e:
        print(f"Error processing file {path}: {e}")
        return "error", 0.0, 0.0, None # no embedding on error


# Function to extract embedding (optional)
def embed_file(path: Path, encoder, device):
    """Return the pooled CLAD encoder embedding of one audio file.

    The result is a NumPy array; if anything fails, the error is printed and
    ``None`` is returned.
    """
    try:
        batch = load_and_preprocess_audio(path).unsqueeze(0).to(device)   # [B=1, T]
        with torch.no_grad():
            feats = encoder(batch)  # RawNet expects [B, T]
            # Average away any dims beyond (B, D).
            extra_dims = feats.dim() - 2
            if extra_dims == 1:
                feats = feats.mean(dim=2)
            elif extra_dims > 1:
                feats = feats.mean(dim=tuple(range(2, feats.dim())))
        embedding = feats.squeeze(0).cpu().numpy()
        return embedding
    except Exception as e:
        print(f"Error embedding file {path}: {e}")
        return None


REAL_DIR = Path("/content/my_audio/real")
FAKE_DIR = Path("/content/my_audio/fake")
OUT_DIR  = Path("/content/my_audio_results")
OUT_DIR.mkdir(parents=True, exist_ok=True)

real_files = sorted(REAL_DIR.rglob("*.wav"))
fake_files = sorted(FAKE_DIR.rglob("*.wav"))

print(f"Found {len(real_files)} real and {len(fake_files)} fake WAVs.")

# One result row per file: all real files first, then all fake files.
rows = []
for p in [*real_files, *fake_files]:
    # Pass encoder_q, classifier, and device to the prediction function
    pred, fake_conf, real_conf, emb = predict_file(p, model.encoder_q, classifier, device)
    has_embedding = emb is not None
    rows.append({
        "file": str(p),
        "pred": pred,
        "fake_conf": fake_conf,
        "real_conf": real_conf,
        "embedding_dim": len(emb) if has_embedding else None,
        # Stored as a plain list so it can be written to the CSV.
        "embedding": emb.tolist() if has_embedding else None
    })

csv_path = OUT_DIR / "clad_results.csv"
df = pd.DataFrame(rows)
df.to_csv(csv_path, index=False)
print("Saved CSV:", csv_path)
display(df.head())

In [None]:
# Histogram of the 'index' column of `_df_0`, with the top/right axes spines hidden.
# NOTE(review): `_df_0` is not defined anywhere in the visible notebook, so this
# cell presumably fails on a fresh "Restart & Run All" — confirm and remove it
# or point it at a real DataFrame.
from matplotlib import pyplot as plt
_df_0['index'].plot(kind='hist', bins=20, title='index')
plt.gca().spines[['top', 'right',]].set_visible(False)

In [None]:
#14
# This cell is optional. It visualizes embeddings in 2D if they were extracted.
# If embedding is None for all files (no encoder exposed), skip this cell.

import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

# Keep only the rows for which an embedding was actually produced.
emb_rows = [r for r in rows if r["embedding"] is not None]
if len(emb_rows) < 2:
    print("No embeddings available (encoder not exposed or too few samples).")
else:
    X = np.vstack([np.array(r["embedding"], dtype=np.float32) for r in emb_rows])
    # Points are grouped by the PREDICTED label stored in each row.
    y = np.array([r["pred"] for r in emb_rows])

    # Perplexity is capped at 15 and at one less than the number of points.
    tsne = TSNE(n_components=2, random_state=0, perplexity=min(15, len(emb_rows)-1))
    X2d = tsne.fit_transform(X)

    plt.figure(figsize=(6,5))
    for cls, marker in [("real", "o"), ("fake", "x")]:
        mask = (y == cls)
        plt.scatter(X2d[mask,0], X2d[mask,1], label=cls, marker=marker, alpha=0.85)
    plt.title("CLAD embeddings (t-SNE)")
    plt.legend()
    plt.tight_layout()
    plt.show()