<a href="https://colab.research.google.com/github/AnshikaSingh33/voiceCloning/blob/main/voiceCloning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the data-pipeline dependencies: ffmpeg bindings, Whisper (from GitHub),
# soundfile, Hugging Face datasets and yt-dlp. Versions are not pinned.
!pip install -q --upgrade ffmpeg-python git+https://github.com/openai/whisper.git soundfile datasets yt-dlp

In [None]:
# Pin NumPy to 2.0 after the installs above.
# NOTE(review): if NumPy was already imported in this session, a runtime restart
# is presumably required for the pin to take effect — confirm.
!pip install numpy==2.0

In [None]:
import os
import json
import subprocess
import uuid
import torch
import whisper
import glob
from pathlib import Path

In [None]:

# Whisper checkpoint name passed to whisper.load_model() in the transcription cell.
MODEL_SIZE = "turbo"
# Use the GPU when torch reports one, otherwise fall back to CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


# Directory that receives the downloaded source audio; created up front.
AUDIO_DIR = Path("/content/audio")
AUDIO_DIR.mkdir(exist_ok=True, parents=True)

print(f" Using {MODEL_SIZE} model on {DEVICE}")

In [None]:
# Source videos whose audio tracks are downloaded into AUDIO_DIR.
YOUTUBE_URLS = [
    "http://youtube.com/watch?v=br8hkcZ1YV0",
]


def _download_audio(url: str, output_template: str) -> None:
    """Extract the audio track of `url` as m4a and write it to `output_template`.

    Raises:
        subprocess.CalledProcessError: if yt-dlp exits with a non-zero status.
    """
    subprocess.run([
        "yt-dlp",
        "-q", "-x",
        "--audio-format", "m4a",
        "-o", output_template,
        url
    ], check=True)


for url in YOUTUBE_URLS:
    try:
        # Ask yt-dlp for the title only, so we can tell whether the file is already on disk.
        process = subprocess.run(
            ["yt-dlp", "--print", "%(title)s", url],
            capture_output=True, text=True, check=True
        )
        # A "/" in the raw title would otherwise be treated as a sub-directory.
        title = process.stdout.strip().replace("/", "_")
        expected_filename = AUDIO_DIR / f"{title}.m4a"
    except (subprocess.CalledProcessError, OSError) as e:
        # Title lookup failed: fall back to yt-dlp's own naming, without the skip check.
        print(f"Warning: Could not determine expected filename for {url}: {e}")
        print(f"* Downloading {url} (without checking for existing file)")
        _download_audio(url, str(AUDIO_DIR / "%(title)s.%(ext)s"))
        continue

    if expected_filename.exists():
        print(f"\n Skipping download for {url} (already exists as {expected_filename.name})")
    else:
        print(f"\n Downloading {url}")
        _download_audio(url, str(expected_filename))

# Every URL is fully handled inside the loop; nothing is repeated after it, so the
# cell no longer depends on loop variables that may be undefined or stale.
print(f"\nAll YouTube audio saved to {AUDIO_DIR}")

In [None]:

from pathlib import Path
import subprocess
from IPython.display import Audio, display


# Output directory for the cleaned WAV files written by clean_one(); created up front.
CLEANED_AUDIO_DIR = Path("/content/audio_clean")
CLEANED_AUDIO_DIR.mkdir(exist_ok=True, parents=True)



def clean_one(src: Path) -> Path:
    """
    Run one audio file through a fixed FFmpeg filter chain and save it as WAV.

    The chain is, in order: an 80 Hz high-pass, `afftdn`, `loudnorm`
    (I=-16, LRA=11, TP=-1.5), `dynaudnorm` (f=200) and a 0.1 s `apad`.

    Args:
        src: Path to the source audio file.

    Returns:
        Path of the written file, `<src stem>_clean.wav` inside CLEANED_AUDIO_DIR.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    filters = [
        "highpass=f=80",
        "afftdn",
        "loudnorm=I=-16:LRA=11:TP=-1.5",
        "dynaudnorm=f=200",
        "apad=pad_dur=0.1",
    ]
    destination = CLEANED_AUDIO_DIR / f"{src.stem}_clean.wav"

    # '-y' overwrites an existing output; '-loglevel error' keeps ffmpeg quiet.
    command = [
        'ffmpeg',
        '-loglevel', 'error',
        '-y',
        '-i', str(src),
        '-af', ",".join(filters),
        str(destination),
    ]
    subprocess.run(command, check=True)

    return destination

In [None]:
import subprocess
from pathlib import Path
from IPython.display import Audio, display



# Same download directory as configured at the top of the notebook.
AUDIO_DIR = Path("/content/audio")

# Every file with an extension in the download directory, in sorted order.
audio_paths = sorted(AUDIO_DIR.glob('*.*'))




def get_duration_sec(filepath: Path) -> float:
    """Return the container duration of `filepath` in seconds, as reported by ffprobe.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits with a non-zero status.
        ValueError: if ffprobe's output cannot be parsed as a float.
    """
    probe = subprocess.run(
        [
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", str(filepath)
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    # With these '-of' options ffprobe prints only the bare number.
    return float(probe.stdout.strip())

# Listen to the first 10 seconds of the first source file before and after cleaning.
print("\n---  Comparing Original vs. Cleaned Audio ---")
if audio_paths:
    first_original_audio_path = audio_paths[0]

    print("\nOriginal audio (first 10 seconds):")
    temp_original_clip_path = Path("/content/temp_original_clip_10s.wav")

    # Cut the preview clip: start at 0 s, keep 10 s.
    subprocess.run([
        "ffmpeg", "-loglevel", "error", "-y",
        "-i", str(first_original_audio_path),
        "-ss", "0", "-t", "10",
        str(temp_original_clip_path)
    ], check=True)

    display(Audio(str(temp_original_clip_path)))
    original_clip_duration = get_duration_sec(temp_original_clip_path)
    print(f"Duration: {original_clip_duration:.3f} seconds")

    print("\nCleaning the 10-second clip...")
    # clean_one() always writes into CLEANED_AUDIO_DIR. Move the preview out of
    # that directory so the later `CLEANED_AUDIO_DIR.glob("*.wav")` does not treat
    # it as real training audio and transcribe/chunk it a second time.
    temp_cleaned_clip_path = clean_one(temp_original_clip_path).replace(
        Path("/content/temp_cleaned_clip_10s.wav")
    )

    print("\nCleaned audio (first 10 seconds):")
    display(Audio(str(temp_cleaned_clip_path)))
    cleaned_clip_duration = get_duration_sec(temp_cleaned_clip_path)
    print(f"Duration: {cleaned_clip_duration:.3f} seconds")
else:
    print(" No audio files found to compare.")

In [None]:

# Clean every downloaded file, reusing outputs that already exist on disk,
# and keep a running total of the cleaned duration.
cleaned_paths = []
total_clean_sec = 0

for p in audio_paths:
    cleaned_file = CLEANED_AUDIO_DIR / f"{p.stem}_clean.wav"

    if not cleaned_file.exists():
        # Not cleaned yet: run the filter chain, then measure the result.
        print(f"🧼 Cleaning {p.name}...", end="")
        cleaned = clean_one(p)
        cleaned_paths.append(cleaned)
        dur = get_duration_sec(cleaned)
        total_clean_sec += dur
        print(f" done ({dur/60:.2f} min)")
        continue

    # Output already present: only record it and its duration.
    print(f"⏩ Skipping cleaning for {p.name} (already exists)")
    cleaned_paths.append(cleaned_file)
    dur = get_duration_sec(cleaned_file)
    total_clean_sec += dur

print(f"\n All cleaned files are in {CLEANED_AUDIO_DIR}")
print(f"Total cleaned duration: {total_clean_sec/60:.2f} min ({total_clean_sec/3600:.2f} h)")

In [None]:

# Re-list the cleaned directory from disk and print a per-file duration table.
cleaned_paths = sorted(CLEANED_AUDIO_DIR.glob("*.wav"))

print(f"\n Found {len(cleaned_paths)} cleaned audio files.")

rule = "-" * 60
total_sec = 0
print(rule)
for p in cleaned_paths:
    dur_sec = get_duration_sec(p)
    total_sec = total_sec + dur_sec
    print(f"• {p.name:<40} {dur_sec/60:6.2f} min")
print(rule)

print(f"\nTotal duration: {total_sec/60:.2f} min ({total_sec/3600:.2f} h)")

In [None]:
import whisper
import json
from pathlib import Path


# Load Whisper once, then write one JSON transcript per cleaned file.
model = whisper.load_model(MODEL_SIZE, device=DEVICE)

TRANSCRIPTS_DIR = Path("/content/transcripts")
TRANSCRIPTS_DIR.mkdir(exist_ok=True)

for audio_path in cleaned_paths:
    fname = audio_path.stem
    out_json = TRANSCRIPTS_DIR / f"{fname}.json"

    if not out_json.exists():
        print(f" Transcribing {fname}...")
        # Word-level timestamps are what the chunking step reads later;
        # half precision is only requested when running on CUDA.
        result = model.transcribe(
            str(audio_path),
            word_timestamps=True,
            fp16=(DEVICE == "cuda"),
            verbose=False,
        )
        with out_json.open("w") as f:
            json.dump(result, f, indent=2)
    else:
        # A transcript from a previous run is reused as-is.
        print(f"⏩ Skipping transcription for {fname} (already exists)")

print(f"\n Done! All transcripts saved in {TRANSCRIPTS_DIR}")

In [None]:
from huggingface_hub import login, HfApi
from huggingface_hub.utils import LocalTokenNotFoundError


# Only prompt for a Hugging Face token when whoami() reports that none is stored.
try:
    HfApi().whoami()
except LocalTokenNotFoundError:
    print("Not logged into Hugging Face. Logging in...")
    login()
else:
    print(" Already logged into Hugging Face.")

In [None]:
import json
import os
import subprocess
import uuid
from pathlib import Path

import datasets


# Hub namespace and dataset name; joined as "<HF_ORG>/<REPO_NAME>" when pushing.
HF_ORG = "Anshika33"
REPO_NAME = "Anshi-voice-dataset"
# Upper bound, in seconds, used by chunk_one() for a single sentence and for a bundled clip.
MAX_SEC = 30


# Sample rate (Hz) passed to ffmpeg '-ar' when cutting clips and to datasets.Audio.
SAMPLING_RATE = 24_000

In [None]:
import json
import os
import subprocess
import uuid
from pathlib import Path
import datasets
import nltk
from nltk.tokenize import PunktSentenceTokenizer





# Where the Whisper JSON transcripts are read from, and the root under which
# chunk_one() creates one clip directory per source file.
TRANSCRIPTS_DIR = Path("/content/transcripts")
CLIPS_ROOT = Path("/content/clips")
CLIPS_ROOT.mkdir(exist_ok=True, parents=True)


# Sentence tokenizer used by chunk_one() to find sentence character spans.
# NOTE(review): PunktSentenceTokenizer() is built with no arguments, so it does not
# appear to use the downloaded 'punkt' resource — confirm whether the download is needed.
nltk.download('punkt', quiet=True)
tokenizer = PunktSentenceTokenizer()




def chunk_one(audio_path: Path, json_path: Path):
    """
    Cut one cleaned audio file into sentence-aligned clips of at most MAX_SEC seconds.

    Args:
        audio_path: Audio file to cut; its stem names the clip sub-directory.
        json_path: Whisper transcript JSON with per-word "start"/"end" times.

    Returns:
        A list of dicts with keys "audio" (clip path as str), "text" and "source".

    Raises:
        subprocess.CalledProcessError: if ffmpeg fails while writing a clip.
    """
    with open(json_path) as fh:
        transcript = json.load(fh)

    # Step 1: flatten all words into one space-joined string, remembering each
    # word's character range in that string and its timestamps.
    words, tokens, cursor = [], [], 0
    for segment in transcript["segments"]:
        for word in segment["words"]:
            token = word["word"].strip()
            if token:
                stop = cursor + len(token)
                words.append({"char0": cursor, "char1": stop, "t0": word["start"], "t1": word["end"]})
                tokens.append(token)
                cursor = stop + 1  # +1 accounts for the joining space
    full_text = " ".join(tokens)

    # Step 2: map each sentence character span onto the words it overlaps to get
    # the sentence's start and end time.
    sentences, next_word = [], 0
    word_count = len(words)
    for span_start, span_end in tokenizer.span_tokenize(full_text):
        lo = next_word
        while lo < word_count and words[lo]["char1"] <= span_start:
            lo += 1
        hi = lo
        while hi < word_count and words[hi]["char0"] < span_end:
            hi += 1
        if lo == hi:
            # No word falls inside this span; nothing to time.
            continue
        sentences.append({
            "start": words[lo]["t0"],
            "end": words[hi - 1]["t1"],
            "text": full_text[span_start:span_end],
        })
        next_word = hi

    # Step 3: greedily bundle consecutive sentences while the bundle stays within
    # MAX_SEC, writing each finished bundle as one mono clip.
    clip_rows = []
    out_dir = CLIPS_ROOT / audio_path.stem

    def flush(b):
        """Write bundle `b` ([start, end, text, dir]) as a clip and record its row."""
        if b is None:
            return
        clip_start, clip_end, clip_text, clip_dir = b
        clip_dir.mkdir(exist_ok=True, parents=True)
        clip_path = clip_dir / f"clip_{uuid.uuid4().hex}.wav"

        command = [
            "ffmpeg", "-loglevel", "error", "-y",
            "-i", str(audio_path),
            "-ss", str(clip_start), "-to", str(clip_end),
            "-ar", str(SAMPLING_RATE), "-ac", "1",
            str(clip_path),
        ]
        subprocess.run(command, check=True)

        clip_rows.append({
            "audio": str(clip_path),
            "text": clip_text.strip(),
            "source": "0"
        })

    pending = None
    for sentence in sentences:
        start, end, text = sentence["start"], sentence["end"], sentence["text"]
        if (end - start) > MAX_SEC:
            # A single sentence longer than the limit is dropped.
            continue

        if pending is not None and (end - pending[0]) <= MAX_SEC:
            # Still fits: extend the open bundle.
            pending = [pending[0], end, pending[2] + " " + text, out_dir]
            continue

        # Either there is no open bundle (flush is a no-op) or this sentence would
        # overflow it: close the old bundle and start a new one here.
        flush(pending)
        pending = [start, end, text, out_dir]

    flush(pending)
    return clip_rows




# Chunk every cleaned file that has a transcript and collect all clip rows.
rows = []
for audio_path in cleaned_paths:
    stem = audio_path.stem
    json_path = TRANSCRIPTS_DIR / f"{stem}.json"

    if json_path.exists():
        print(f"🧩 Chunking {stem}...")
        clip_rows = chunk_one(audio_path, json_path)
        rows += clip_rows
        print(f"  → Created {len(clip_rows)} clips")
    else:
        print(f"⏩ {stem} → skipping (no transcript)")

print(f"\n Created {len(rows)} total clips across {len(cleaned_paths)} source files.")

In [None]:
# Debug aid: show which cleaned files were fed to the chunking step.
print(cleaned_paths)

In [None]:
# Debug aid: dumps every clip row (path, text, source); output grows with the clip count.
print(rows)

In [None]:
import datasets
from datasets import Dataset

# Build a dataset from the clip rows; "audio" starts as a path string column.
ds = Dataset.from_list(rows)

# Re-type the path column as an Audio feature at the clip sample rate.
ds = ds.cast_column("audio", datasets.Audio(sampling_rate=SAMPLING_RATE))

repo_id = f"{HF_ORG}/{REPO_NAME}"

print(f" Pushing dataset to {repo_id}...")
# NOTE(review): private=False publishes the voice clips publicly — confirm this is intended.
ds.push_to_hub(repo_id, private=False)

print("\n Done! Your dataset is live on the Hub.")

In [None]:
%%capture
import os

if "COLAB_" not in "".join(os.environ.keys()):

    !pip install unsloth tensorboard
else:

    !pip install -no-deps bitsandbytes accelerate xformers==0.0.29.post3 peft trl==0.15.2 triton cut_cross_entropy unsloth_zoo ten
    !pip install sentencepiece protobuf "datasets>=3.4.1" huggingface_hub hf_transfer
    !pip install -no-deps unsloth
    !pip install transformers==4.52.3

In [None]:

# Force-reinstall the pinned transformers release, then Unsloth from GitHub.
# NOTE(review): the second line is not version-pinned and may replace the
# transformers pin with whatever Unsloth requires — confirm after running.
!pip install --force-reinstall transformers==4.52.3
!pip install --force-reinstall unsloth[colab-new]@git+https://github.com/unslothai/unsloth.git






In [None]:

from unsloth import FastModel
from transformers import CsmForConditionalGeneration
import torch

# Base checkpoint; also used later to derive the TensorBoard run name.
model_name = "unsloth/csm-1b"

# Load the CSM model and its processor in 4-bit for LoRA fine-tuning.
# full_finetuning is False because this notebook attaches LoRA adapters in the
# next cell and loads in 4-bit; requesting full fine-tuning contradicts both.
# NOTE(review): Unsloth is expected to ignore get_peft_model() when full
# fine-tuning is enabled — confirm against the Unsloth documentation.
model, processor = FastModel.from_pretrained(
    model_name = model_name,
    max_seq_length = 2048,
    dtype = None,  # let Unsloth pick the dtype
    auto_model = CsmForConditionalGeneration,
    load_in_4bit = True,
    full_finetuning = False,
)
print("completed successfully")

In [None]:
from unsloth import FastLanguageModel



# Wrap the loaded model with LoRA adapters on the attention and MLP projections.
# NOTE(review): the model was loaded through FastModel but adapters are attached
# through FastLanguageModel — confirm the two are interchangeable for CSM.
model = FastLanguageModel.get_peft_model(
    model,
    r = 32,  # LoRA rank
    target_modules = [
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ],
    lora_alpha = 16,
    lora_dropout = 0,
    bias = "none",
    use_gradient_checkpointing = "unsloth",
    random_state = 3407,
    use_rslora = True,
)

In [None]:
# Inspect the module tree after the adapters were attached.
print(model)

In [None]:
# Report how many parameters are trainable; relies on `model` being the adapter-wrapped model.
model.print_trainable_parameters()

In [None]:

# Hub dataset used for fine-tuning; same id as the "<HF_ORG>/<REPO_NAME>" repo pushed earlier.
ft_dataset = "Anshika33/Anshi-voice-dataset"
# Audio sample rate (Hz) the dataset is cast to before preprocessing.
sampling_rate = 24_000

In [None]:

import math
from datasets import load_dataset, Audio, DatasetDict
from transformers import AutoProcessor


# Load the processor and the published dataset, then carve out a small eval split.
processor = AutoProcessor.from_pretrained("unsloth/csm-1b")
raw_ds = load_dataset(ft_dataset, split="train")

print(f'Dataset loaded with features: {raw_ds.features["audio"]}')

# The preprocessing functions read a speaker id from the "source" column,
# so add a constant one when the dataset has none.
speaker_key = "source"
has_speaker_column = speaker_key in raw_ds.column_names
if not has_speaker_column:
    print('Unsloth: No speaker key found. Adding a default "source" column.')
    new_column = ["0"] * len(raw_ds)
    raw_ds = raw_ds.add_column(speaker_key, new_column)

# Decode audio at the target sample rate.
raw_ds = raw_ds.cast_column("audio", Audio(sampling_rate=sampling_rate))

# Eval size: 10% of the rows (rounded up), at least 1 and at most 30.
total_rows = len(raw_ds)
ten_percent = math.ceil(0.10 * total_rows)
eval_rows = min(30, max(1, ten_percent))

split: DatasetDict = raw_ds.train_test_split(test_size=eval_rows, shuffle=True, seed=42)
raw_train_ds = split["train"]
raw_eval_ds = split["test"]

print(f"Train split: {len(raw_train_ds):>5} rows")
print(f"Eval split:  {len(raw_eval_ds):>5} rows (requested {eval_rows})")

In [None]:

# Longest transcript (in characters) and longest waveform (in samples) in the dataset.
max_text_length = max(map(len, raw_ds["text"]))
max_audio_length = max(len(audio["array"]) for audio in raw_ds["audio"])

print(f"Maximum text length in the dataset: {max_text_length}")
print(f"Maximum audio length in the dataset: {max_audio_length}")

In [None]:
def preprocess_example(example):
    """
    Turn one dataset row into model inputs restricted to a fixed set of keys.

    Builds a single-turn conversation from the row's "source", "text" and audio
    array, runs it through the processor's chat template, derives labels from
    `input_ids` (padding positions set to -100), and keeps only the expected keys.

    Returns:
        A dict of tensors, or None when processing fails, labels cannot be
        created, or any kept value is not a tensor.
    """
    turn = {
        "role": str(example["source"]),
        "content": [
            {"type": "text", "text": example["text"]},
            {"type": "audio", "audio": example["audio"]["array"]},
        ],
    }

    try:
        model_inputs = processor.apply_chat_template(
            [turn],
            tokenize=True,
            padding="max_length",
            max_length=max_audio_length,
            return_tensors="pt",
            return_dict=True,
        )
    except Exception as err:
        print(f"Error processing example with text '{example['text'][:50]}...': {err}")
        return None

    # Labels are a copy of the input ids, ignoring positions the attention mask marks as padding.
    if "input_ids" in model_inputs:
        model_inputs["labels"] = model_inputs["input_ids"].clone()
        if "attention_mask" in model_inputs:
            model_inputs["labels"][model_inputs["attention_mask"] == 0] = -100

    available_keys = list(model_inputs.keys())
    print(f"Available keys: {available_keys}")

    # Always-required keys, plus whichever optional modality keys the processor produced.
    optional_keys = ("input_values", "audio_features", "pixel_values")
    expected_keys = ["input_ids", "attention_mask", "labels"] + [
        name for name in optional_keys if name in available_keys
    ]

    processed_example = {}
    for key in expected_keys:
        if key in model_inputs:
            # Drop the leading batch dimension added by return_tensors="pt".
            processed_example[key] = model_inputs[key][0]
        elif key == "labels":
            print("Error: Could not create labels from input_ids")
            return None
        else:
            print(f"Warning: Expected key '{key}' not found in processor output.")

    if any(not isinstance(v, torch.Tensor) for v in processed_example.values()):
        print(f"Error: Not all values are tensors. Keys: {list(processed_example.keys())}")
        return None

    return processed_example


def preprocess_example_flexible(example):
    """
    Turn one dataset row into padded CSM training tensors, keeping every key
    the processor returns.

    Text and audio are padded with separate limits. Passing the audio sample
    count as a flat `max_length` would also pad the token sequence to that many
    tokens, which is far larger than needed.

    Labels are requested from the processor (`output_labels=True`); if it does
    not return them, they fall back to a copy of `input_ids` with padding
    positions set to -100.

    Args:
        example: A row with "source", "text" and a decoded "audio" dict.

    Returns:
        A dict of model inputs with the leading batch dimension removed, or
        None when processing fails or essential keys are missing.
    """
    import math  # local import keeps this function self-contained

    conversation = [
        {
            "role": str(example["source"]),
            "content": [
                {"type": "text", "text": example["text"]},
                {"type": "audio", "audio": example["audio"]["array"]},
            ]
        }
    ]

    # Token budget = longest transcript + audio placeholder tokens + slack for
    # template/special tokens.
    # NOTE(review): assumes one audio token per 1920 samples (24 kHz at 12.5
    # frames/s) and that character count bounds token count — confirm against
    # the CSM processor documentation.
    samples_per_audio_token = 1920
    token_budget = (
        max_text_length
        + math.ceil(max_audio_length / samples_per_audio_token)
        + 32
    )

    try:
        model_inputs = processor.apply_chat_template(
            conversation,
            tokenize=True,
            return_dict=True,
            output_labels=True,
            text_kwargs={
                "padding": "max_length",
                "max_length": token_budget,
                "pad_to_multiple_of": 8,
                "padding_side": "right",
            },
            audio_kwargs={
                "sampling_rate": sampling_rate,
                "max_length": max_audio_length,
                "padding": "max_length",
            },
            common_kwargs={"return_tensors": "pt"},
        )
    except Exception as e:
        print(f"Error processing example with text '{example['text'][:50]}...': {e}")
        return None

    # Fallback only: used when the processor did not produce labels itself.
    if "labels" not in model_inputs and "input_ids" in model_inputs:
        model_inputs["labels"] = model_inputs["input_ids"].clone()
        if "attention_mask" in model_inputs:
            model_inputs["labels"][model_inputs["attention_mask"] == 0] = -100

    # Strip the batch dimension of size 1 from tensors; pass everything else through.
    processed_example = {}
    for key, value in model_inputs.items():
        is_batched = (
            isinstance(value, torch.Tensor) and value.dim() > 0 and value.shape[0] == 1
        )
        processed_example[key] = value[0] if is_batched else value

    if "input_ids" not in processed_example or "labels" not in processed_example:
        print(f"Error: Missing essential keys. Available: {list(processed_example.keys())}")
        return None

    return processed_example


def debug_preprocess_example(example):
    """
    Run one row through the processor and print what comes back.

    For each returned key, prints the shape and dtype of tensors, or the type
    and value of anything else.

    Returns:
        The raw processor output, or None if processing raised.
    """
    turn = {
        "role": str(example["source"]),
        "content": [
            {"type": "text", "text": example["text"]},
            {"type": "audio", "audio": example["audio"]["array"]},
        ],
    }

    try:
        model_inputs = processor.apply_chat_template(
            [turn],
            tokenize=True,
            padding="max_length",
            max_length=max_audio_length,
            return_tensors="pt",
            return_dict=True,
        )

        banner = "=" * 50
        print(banner)
        print("Processor output keys and shapes:")
        for name, item in model_inputs.items():
            if not isinstance(item, torch.Tensor):
                print(f"  {name}: {type(item)} - {item}")
            else:
                print(f"  {name}: {item.shape} ({item.dtype})")
        print(banner)

        return model_inputs

    except Exception as err:
        print(f"Error in debug preprocessing: {err}")
        return None


# Smoke test: run the debug helper on the first training row and keep its output for inspection.
print("Testing debug preprocessing on first example...")
first_example = raw_train_ds[0]
debug_result = debug_preprocess_example(first_example)

In [None]:

# Preprocess both splits with the same function. The Trainer cell reads
# `processed_eval_ds`, so it has to be created here alongside the train set.
processed_train_ds = raw_train_ds.map(
    preprocess_example_flexible,
    remove_columns=raw_train_ds.column_names,
    desc="Preprocessing train set",
)

processed_eval_ds = raw_eval_ds.map(
    preprocess_example_flexible,
    remove_columns=raw_eval_ds.column_names,
    desc="Preprocessing eval set",
)

In [None]:
# Show the columns and row count of the preprocessed training set.
print(processed_train_ds)

In [None]:
import time


# Run name = checkpoint basename + "-lora-ft" + a local timestamp; used for the TensorBoard log directory.
run_name = model_name.split("/")[-1] + '-lora-ft' + time.strftime("_%Y%m%d_%H%M%S")
print(run_name)

In [None]:
from transformers import TrainingArguments, Trainer
from unsloth import is_bfloat16_supported

# Build the Hugging Face Trainer for the adapter-wrapped model.
# NOTE(review): `processed_eval_ds` is not assigned in the visible cells above —
# confirm it exists before running this cell.
trainer = Trainer(
    model = model,
    train_dataset = processed_train_ds,
    eval_dataset = processed_eval_ds,
    args = TrainingArguments(
        per_device_train_batch_size = 2,
        gradient_accumulation_steps = 8,
        warmup_steps = 5,
        num_train_epochs = 3,
        learning_rate = 2e-4,
        # Exactly one of fp16/bf16 is enabled, depending on hardware support.
        fp16 = not is_bfloat16_supported(),
        bf16 = is_bfloat16_supported(),
        logging_steps = 1,
        optim = "adamw_8bit",
        weight_decay = 0.01,
        # NOTE(review): a plain "constant" schedule presumably ignores warmup_steps — confirm.
        lr_scheduler_type = "constant",
        seed = 3407,
        output_dir = "outputs",

        # Evaluate every 10 steps.
        eval_strategy = "steps",
        eval_steps = 10,

        # TensorBoard logs go to a per-run directory under logs/.
        report_to = "tensorboard",
        logging_dir = f"logs/{run_name}",
    ),
)

In [None]:
import torch


# Report the GPU model, its total memory, and the peak memory reserved so far (GiB, 3 decimals).
BYTES_PER_GIB = 1024**3

gpu_stats = torch.cuda.get_device_properties(0)
reserved_bytes = torch.cuda.max_memory_reserved()

start_gpu_memory = round(reserved_bytes / BYTES_PER_GIB, 3)
max_memory = round(gpu_stats.total_memory / BYTES_PER_GIB, 3)

print(f"GPU = {gpu_stats.name}. Max memory = {max_memory} GB.")
print(f"{start_gpu_memory} GB of memory reserved.")

In [None]:

%load_ext tensorboard


%tensorboard --logdir logs/csm-1b-lora-ft_20250829_205230

In [None]:

import torch
import gc
# Release cached CUDA blocks and run a garbage-collection pass before training.
torch.cuda.empty_cache()
gc.collect()


import os
# NOTE(review): CUDA has already been used by earlier cells; this allocator
# setting may only take effect if set before CUDA is initialised — confirm.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'



# Lower-memory training configuration (batch size 1, 16 accumulation steps, fp16).
# NOTE(review): `trainer` was constructed earlier with its own TrainingArguments,
# and nothing in the visible cells passes `training_args` to it — confirm whether
# this configuration is meant to be used.
training_args = TrainingArguments(
    output_dir="./results/minimal",
    per_device_train_batch_size=1,
    gradient_accumulation_steps=16,
    gradient_checkpointing=True,
    fp16=True,
    dataloader_pin_memory=False,
    num_train_epochs=1,
    logging_steps=10,
    save_steps=1000,

    remove_unused_columns=True,
)

In [None]:

# Remove the fine-tuning stack and reinstall Unsloth from GitHub.
# NOTE(review): this runs after the model and trainer were created and right
# before training; it looks like a troubleshooting leftover and probably needs a
# runtime restart to take effect — confirm whether it belongs in the run-all path.
!pip uninstall -y unsloth transformers accelerate bitsandbytes peft trl


!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"

In [None]:
# Start fine-tuning with the Trainer built above and keep the returned training summary.
trainer_stats=trainer.train()