# DDD Colab Training (Declip, 48k Mono)

This notebook downloads your MEGA dataset, validates mono/48k audio, creates train/valid/test splits, and trains the DDD model (HiFiGAN solver).
Checkpoints and inference outputs are stored in Google Drive under the run name.


## Mount Drive


In [None]:
# Mount Google Drive; the run directories configured in this notebook live
# under /content/drive.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


## Config


In [None]:
import os
from pathlib import Path

# --- Run identity and code location ---------------------------------------
RUN_NAME = "ddd_48k_run1"
DRIVE_BASE_DIR = "/content/drive/MyDrive/DeClip Data/runs"
REPO_URL = "https://github.com/a-n-t-h-o-n-y/DDD.git"
REPO_DIR = "/content/DDD"
BRANCH = "main"

# --- Dataset source and local staging area --------------------------------
MEGA_LINK = "https://mega.nz/file/6hsn1ajJ#n99Hn-6HZhEjMzPE6mzs1UdhxgpVd9XD83PWIxS6MXw"
DATA_ZIP_NAME = "training_data.zip"
LOCAL_DATA_ROOT = "/content/data_raw"
LOCAL_DATA_DIR = f"{LOCAL_DATA_ROOT}/audio"

# --- Split configuration ---------------------------------------------------
# SEED drives every shuffle / random clipping threshold in this notebook.
SEED = 1337
TRAIN_RATIO = 0.90
VALID_RATIO = 0.05
TEST_RATIO = 0.05

# The split cell only reads TRAIN_RATIO and VALID_RATIO and gives the test set
# whatever is left, so an inconsistent edit here would otherwise go unnoticed.
if min(TRAIN_RATIO, VALID_RATIO, TEST_RATIO) < 0:
    raise ValueError("TRAIN_RATIO, VALID_RATIO and TEST_RATIO must be non-negative.")
if abs(TRAIN_RATIO + VALID_RATIO + TEST_RATIO - 1.0) > 1e-6:
    raise ValueError(
        "TRAIN_RATIO + VALID_RATIO + TEST_RATIO must sum to 1.0, got "
        f"{TRAIN_RATIO + VALID_RATIO + TEST_RATIO}"
    )

# --- Audio / training settings ---------------------------------------------
# SAMPLE_RATE is both the value every wav file is validated against and the
# value passed to the loaders and model on the command line.
SAMPLE_RATE = 48000
# Forwarded to the trainer as experiment.save_every.
SAVE_EVERY = 10

# --- Derived output locations ----------------------------------------------
# Checkpoints and inference results go to Drive; the egs manifests stay on the
# local Colab disk.
RUN_DIR = os.path.join(DRIVE_BASE_DIR, RUN_NAME)
CHECKPOINT_DIR = os.path.join(RUN_DIR, "checkpoints")
INFER_DIR = os.path.join(RUN_DIR, "infer")
EGS_DIR = os.path.join("/content/egs", RUN_NAME)

# Create every output directory up front (safe to re-run: existing
# directories are left untouched).
for run_subdir in (CHECKPOINT_DIR, INFER_DIR, EGS_DIR):
    Path(run_subdir).mkdir(parents=True, exist_ok=True)
print("Run dir:", RUN_DIR)


## Install Dependencies


In [None]:
import os
import subprocess

# megatools is installed for the `megadl` command used by the download cell.
subprocess.check_call(["apt-get", "update", "-y"])
subprocess.check_call(["apt-get", "install", "-y", "megatools"])

# Fetch the DDD sources on a fresh runtime. Without this step REPO_DIR does
# not exist and the chdir below fails. An existing checkout is reused as-is so
# the cell can be re-run safely.
if not os.path.isdir(REPO_DIR):
    subprocess.check_call(["git", "clone", "--branch", BRANCH, REPO_URL, REPO_DIR])
else:
    print("Repo already present:", REPO_DIR)

# requirements.txt is resolved relative to the repository root.
os.chdir(REPO_DIR)
subprocess.check_call(["pip", "install", "-r", "requirements.txt", "-f", "https://download.pytorch.org/whl/torch_stable.html", "--no-cache-dir"])


## Download Dataset (MEGA)


In [None]:
from pathlib import Path
import subprocess

# Download the dataset archive once; later runs reuse the file already on disk.
data_root = Path(LOCAL_DATA_ROOT)
data_root.mkdir(parents=True, exist_ok=True)
zip_path = data_root / DATA_ZIP_NAME

if zip_path.exists():
    print("Zip already exists:", zip_path)
else:
    subprocess.check_call(["megadl", "--path", str(zip_path), MEGA_LINK])


## Extract Dataset


In [None]:
import zipfile
from pathlib import Path

# Unpack the downloaded archive into the local audio directory.
extract_dir = Path(LOCAL_DATA_DIR)
extract_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(extract_dir)
print("Extracted to:", LOCAL_DATA_DIR)


## Scan Wav Files


In [None]:
from pathlib import Path

# Collect every .wav file below the extraction directory (recursively), as
# sorted path strings so later steps start from a deterministic order.
wav_root = Path(LOCAL_DATA_DIR)
wav_paths = sorted(map(str, wav_root.rglob('*.wav')))
print(f"Found {len(wav_paths)} wav files")
if not wav_paths:
    raise RuntimeError('No wav files found after extraction.')


## Validate Audio (Mono, 48k)


In [None]:
import torchaudio

# Check every file's header and collect (path, problem description) pairs for
# anything that is not mono at SAMPLE_RATE.
bad = []
for path in wav_paths:
    info = torchaudio.info(path)
    # Each entry pairs "did this check fail?" with the message to report.
    candidates = (
        (info.num_channels != 1, f"channels={info.num_channels}"),
        (info.sample_rate != SAMPLE_RATE, f"sample_rate={info.sample_rate}"),
    )
    issues = [message for failed, message in candidates if failed]
    if issues:
        bad.append((path, "; ".join(issues)))

# Report all offending files at once, then abort the notebook run.
if bad:
    print('Invalid files:')
    for path, issues in bad:
        print(f"- {path}: {issues}")
    raise RuntimeError(f"Validation failed for {len(bad)} file(s).")

print('All files are mono and 48kHz.')


## Create Train/Valid/Test Splits


In [None]:
import json
import random
from pathlib import Path
import torchaudio

# Shuffle a sorted copy instead of wav_paths itself. Shuffling the shared list
# in place made this cell non-idempotent: a second execution would reshuffle
# the already shuffled list and yield different splits despite the fixed seed.
# On the first run the result is unchanged, because wav_paths arrives sorted
# from the scan cell.
rng = random.Random(SEED)
shuffled_paths = sorted(wav_paths)
rng.shuffle(shuffled_paths)

# Train and validation sizes are truncated towards zero; the test split takes
# every remaining file.
total = len(shuffled_paths)
train_end = int(total * TRAIN_RATIO)
valid_end = train_end + int(total * VALID_RATIO)

train_paths = shuffled_paths[:train_end]
valid_paths = shuffled_paths[train_end:valid_end]
test_paths = shuffled_paths[valid_end:]

def file_length(path):
    """Return the number of frames in the audio file at *path*.

    Reads only the file metadata via ``torchaudio.info``. When the returned
    object has no ``num_frames`` attribute, the first element of the result is
    used instead and its ``length`` is divided by its ``channels``
    (presumably the tuple-style result of older torchaudio releases -- TODO
    confirm).
    """
    metadata = torchaudio.info(path)
    if not hasattr(metadata, 'num_frames'):
        signal_info = metadata[0]
        return signal_info.length // signal_info.channels
    return metadata.num_frames

def write_split(split_name, clean_paths, noisy_paths=None):
    """Write the ``clean.json`` / ``noisy.json`` manifests for one split.

    Each manifest is a JSON list of ``[absolute_path, length]`` entries, where
    the length comes from ``file_length``.

    Args:
        split_name: Sub-directory of ``EGS_DIR`` to write into (created if
            missing).
        clean_paths: Paths of the clean audio files.
        noisy_paths: Paths of the degraded counterparts. When ``None``, the
            clean files are listed in both manifests.

    Returns:
        The split directory as a string.
    """
    def _describe(paths):
        # One (resolved path, length) record per file.
        return [(str(Path(p).resolve()), file_length(p)) for p in paths]

    split_dir = Path(EGS_DIR) / split_name
    split_dir.mkdir(parents=True, exist_ok=True)

    clean_meta = _describe(clean_paths)
    # Reuse the clean records when no separate noisy set is given, so each
    # file header is read once instead of twice.
    noisy_meta = clean_meta if noisy_paths is None else _describe(noisy_paths)

    for manifest_name, records in (('clean.json', clean_meta), ('noisy.json', noisy_meta)):
        with open(split_dir / manifest_name, 'w') as f:
            json.dump(records, f, indent=2)
    return str(split_dir)

# Train and validation manifests list the clean files as both "clean" and
# "noisy" (no separate noisy set is passed).
train_dir = write_split(split_name='tr', clean_paths=train_paths)
valid_dir = write_split(split_name='cv', clean_paths=valid_paths)

n_train, n_valid, n_test = len(train_paths), len(valid_paths), len(test_paths)
print(f"Train: {n_train}, Valid: {n_valid}, Test: {n_test}")
print('EGS dirs:', train_dir, valid_dir)


## Create Clipped Test Set


In [None]:
import random
from pathlib import Path
import torchaudio
import torch

# Build a hard-clipped copy of every test file, mirroring the dataset's
# directory layout below TEST_NOISY_DIR.
TEST_NOISY_DIR = '/content/test_noisy'
noisy_root = Path(TEST_NOISY_DIR)
noisy_root.mkdir(parents=True, exist_ok=True)

root_dir = Path(LOCAL_DATA_DIR).resolve()
rng = random.Random(SEED)
test_noisy_paths = []

for src in test_paths:
    src_path = Path(src).resolve()
    dst_path = noisy_root / src_path.relative_to(root_dir)
    dst_path.parent.mkdir(parents=True, exist_ok=True)

    wav, sr = torchaudio.load(str(src_path))
    # Per-file clipping threshold, drawn log-uniformly between 10**-2 and
    # 10**-0.9; samples are clamped to [-thres, thres].
    thres = 10 ** rng.uniform(-2.0, -0.9)
    torchaudio.save(str(dst_path), wav.clamp(min=-thres, max=thres), sr)
    test_noisy_paths.append(str(dst_path))

# The test manifest pairs each clean file with its clipped copy.
test_dir = write_split('ts', test_paths, test_noisy_paths)

print('Clipped test set created at:', TEST_NOISY_DIR)
print('EGS test dir:', test_dir)


## Train DDD (HiFiGAN Solver)


In [None]:
import os
import subprocess

# train_hifigan.py is invoked from the repository root.
os.chdir(REPO_DIR)

# Fixed `+key=value` selections passed to the training script (presumably
# Hydra-style config-group overrides -- confirm against train_hifigan.py).
group_choices = [
    '+augmentor=shift_only',
    '+tr_loader=clippedclean',
    '+cv_loader=clippedclean',
    '+ts_loader=setting_1',
    '+loss=setting_1',
    '+model=setting_1',
    '+optimizer=adamw_1e4',
    '+experiment=setting_3_bs2',
    '+solver=hifiganaudiotoaudio',
]

# Values that come from this notebook: manifest directories and output settings.
run_overrides = [
    f'+data.train={train_dir}',
    f'+data.valid={valid_dir}',
    f'+data.test={test_dir}',
    f'+experiment.output_path={CHECKPOINT_DIR}',
    f'+experiment.save_every={SAVE_EVERY}',
]

# The same sample rate is pushed to all three loaders and the model.
sample_rate_overrides = [
    f'+{target}.parameters.sample_rate={SAMPLE_RATE}'
    for target in ('tr_loader', 'cv_loader', 'ts_loader', 'model')
]

train_cmd = [
    'python', 'train_hifigan.py',
    *group_choices,
    *run_overrides,
    *sample_rate_overrides,
]

subprocess.check_call(train_cmd)


## Run Inference on the Clipped Test Set


In [None]:
import os
import subprocess

# infer_gan.py is invoked from the repository root.
os.chdir(REPO_DIR)
checkpoint_path = os.path.join(CHECKPOINT_DIR, 'checkpoint.th')

# Overrides in the exact order they are passed on the command line; each entry
# becomes one `+key=value` argument.
infer_overrides = {
    'load_from': checkpoint_path,
    'out_dir': INFER_DIR,
    'model': 'setting_1',
    'experiment': 'setting_3',
    'experiment.output_path': CHECKPOINT_DIR,
    'experiment.device': 'cuda',
    'experiment.num_workers': '2',
    'experiment.dry': '0.0',
    'noisy_dir': TEST_NOISY_DIR,
    'model.parameters.sample_rate': SAMPLE_RATE,
}

infer_cmd = ['python', 'infer_gan.py']
infer_cmd += [f'+{key}={value}' for key, value in infer_overrides.items()]

subprocess.check_call(infer_cmd)
