# Interspeech 2026

## Imports

In [None]:
import os
# Expose only GPU 0 to this process. This is set here, in the cell that runs
# before torch is imported.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from transformers.utils.notebook import NotebookProgressBar

import torchaudio.transforms as T

from voicestudio.utils.audio_utils import show_waveform

### Check GPU Availability

In [None]:
!nvidia-smi

In [None]:
# Index of the CUDA device to use; it is reset to -1 when no GPU is available
DEVICE_NUM = 0

if not torch.cuda.is_available():
    DEVICE_NUM = -1

# The string form is used for `device_map=` arguments, the torch.device for `.to(...)`
device_map = "cpu" if DEVICE_NUM < 0 else f"cuda:{DEVICE_NUM}"
device = torch.device(device_map)
print(f"INFO: Using device - {device}")

## Datasets

In [None]:
from spk_incon.datasets import LIBRITTS_P_Custom
from spk_incon.datasets.libritts_p3 import download_libritts_p_metadata

In [None]:
# Local directory used as the dataset root and as the location of the archive
DATA_ROOT = "./data"
# Download link used by the wget cell below to fetch train-clean-100.tar.gz
URL = "https://dolab-data.duckdns.org/api/public/dl/-qA96ilN"

In [None]:
if not os.path.isfile(os.path.join(DATA_ROOT, "train-clean-100.tar.gz")):
    !wget -O "./data/train-clean-100.tar.gz" {URL}

In [None]:
def _open_curated_dataset(allow_download: bool):
    """Instantiate LIBRITTS_P_Custom under DATA_ROOT with the given download flag."""
    return LIBRITTS_P_Custom(root=DATA_ROOT, download=allow_download)


# Escalate only when a FileNotFoundError is raised:
#   1. open the dataset without downloading,
#   2. fetch the metadata and open it again without downloading,
#   3. open it with download=True.
try:
    curated_dataset = _open_curated_dataset(False)
except FileNotFoundError:
    try:
        download_libritts_p_metadata(root=DATA_ROOT)
        curated_dataset = _open_curated_dataset(False)
    except FileNotFoundError:
        print("Full download logic triggered. This may take a while...")
        curated_dataset = _open_curated_dataset(True)

## Models

In [None]:
from transformers import AutoTokenizer, AutoProcessor

from voicestudio.models.qwen3_tts import Qwen3TTSForConditionalGeneration
from spk_incon.models.selective_tuner import SelectiveTunerForConditionalGeneration, SelectiveTunerConfig
from spk_incon.components.style_anchor import DirectStyleAnchorEmbedding, EncoderStyleAnchorEmbedding, MixedStyleAnchorEmbedding

In [None]:
# Checkpoint identifier passed to both from_pretrained calls in the next cell
model_id = "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign"

In [None]:
# Load the pretrained TTS model (bfloat16, FlashAttention-2) onto the selected
# device, then the processor that belongs to the same checkpoint.
model_load_options = {
    "device_map": device_map,
    "dtype": torch.bfloat16,
    "attn_implementation": "flash_attention_2",
}
model = Qwen3TTSForConditionalGeneration.from_pretrained(model_id, **model_load_options)
processor = AutoProcessor.from_pretrained(model_id, device_map=device_map)
model  # cell output: module representation

In [None]:
# Snapshot the untouched model config as a plain dict and print it,
# one "key: value" pair per line under a header.
original_config = model.config.to_dict()
config_lines = [f"{key}: {value}" for key, value in original_config.items()]
print("\n".join(["Original Model Config:", *config_lines]))

In [None]:
# Example inference with the unmodified model: one utterance, one style instruction
demo_text = "I am solving the equation: x = [-b ± √(b²-4ac)] / 2a? Nobody can — it's a disaster (◍•͈⌔•͈◍), very sad!"
demo_instruct = "Happy man describes the equation in a cheerful tone, with a hint of humor. He emphasizes the complexity of the equation and expresses his feelings about it in a lighthearted way."

inputs = processor.encode_voice_design(text=demo_text, instruct=demo_instruct)
outputs = model.generate(**inputs)

# Decode the generation and display the first waveform
audio_values, sr = processor.decode(outputs)
first_waveform = torch.from_numpy(audio_values[0])
show_waveform(None, waveform=first_waveform, sr=sr)

In [None]:
#config = SelectiveTunerConfig.from_pretrained(
#    model.config,
#    anchor_token=(tuple(), ("<consistency>", )), anchor_token_id=((1, ), (len(processor.tokenizer), )), use_direct_anchor=False, tie_embeddings=True
#)
#setattr(config, 'hidden_size', config.decoder.hidden_size)  # parler-tts doesn't have decoder hidden_size conf
#config

In [None]:
# Build the SelectiveTuner config on top of the loaded model's config.
# The single anchor token "<consistency>" is assigned the id
# len(processor.tokenizer), i.e. the tokenizer's current length.
config = SelectiveTunerConfig.from_pretrained(
    model.config,
    anchor_token=("<consistency>",),
    anchor_token_id=(len(processor.tokenizer),),
    use_direct_anchor=False,
    tie_embeddings=True,
)

In [None]:
# Cell output: the talker's text vocabulary size and hidden size, which the
# next two cells copy onto the top-level config
config.talker_config.text_vocab_size, config.talker_config.hidden_size

In [None]:
# Mirror the talker's text vocabulary size on the top-level config
config.vocab_size = config.talker_config.text_vocab_size

In [None]:
# Mirror the talker's hidden size on the top-level config
config.hidden_size = config.talker_config.hidden_size

In [None]:
# Apply the tuner's embedding replacement to the already-loaded model, driven by `config`.
# NOTE(review): this calls a private helper (leading underscore) through the class with
# a model that was not built as a SelectiveTuner — presumably it patches `model` in
# place; confirm against the helper's implementation.
SelectiveTunerForConditionalGeneration._replace_embeddings_with_anchors(model, config)

In [None]:
# From here on the model carries the SelectiveTuner config built above
model.config = config

In [None]:
# Run the tuner's vocabulary extension on the model with the processor's tokenizer.
# NOTE(review): presumably this registers the "<consistency>" anchor token — confirm.
SelectiveTunerForConditionalGeneration.extend_vocabulary(model, processor.tokenizer)

In [None]:
# Freeze the whole model, then re-enable gradients only inside the
# style-anchor embedding modules.
model.requires_grad_(False)

anchor_embedding_types = (
    DirectStyleAnchorEmbedding,
    EncoderStyleAnchorEmbedding,
    MixedStyleAnchorEmbedding,
)
for submodule in model.modules():
    if not isinstance(submodule, anchor_embedding_types):
        continue
    print(f"INFO: Found a target embedding instance: {type(submodule).__name__}")
    submodule.requires_grad_(True)

In [None]:
# Move the patched model to the target device and cast it to bfloat16 in one call
model.to(device=device, dtype=torch.bfloat16)

In [None]:
# Switch to evaluation mode before the sanity-check generation in the next cell
model.eval()

In [None]:
# Sanity check: the patched model must still generate audio for the same prompt
check_text = "I am solving the equation: x = [-b ± √(b²-4ac)] / 2a? Nobody can — it's a disaster (◍•͈⌔•͈◍), very sad!"
check_instruct = "Happy man describes the equation in a cheerful tone, with a hint of humor. He emphasizes the complexity of the equation and expresses his feelings about it in a lighthearted way."

inputs = processor.encode_voice_design(text=check_text, instruct=check_instruct)
outputs = model.generate(**inputs)

# Decode the generation and display the first waveform
audio_values, sr = processor.decode(outputs)
checked_waveform = torch.from_numpy(audio_values[0])
show_waveform(None, waveform=checked_waveform, sr=sr)

## Training

In [None]:
# Put the model into training mode for the loop below
model.train()

In [None]:
# Training hyperparameters
BATCH_SIZE = 2  # used by the training loop to size its per-epoch progress bar
NUM_EPOCHS = 2
LEARNING_RATE = 1e-4  # AdamW learning rate; the scheduler's floor is LEARNING_RATE/100
OUTPUT_DIR = "./results/consistency"  # per-epoch checkpoints are written here

# Make sure the checkpoint directory exists before training starts
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
import math

# Only the parameters left trainable by the freezing cell are optimized.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = AdamW(trainable_params, lr=LEARNING_RATE, weight_decay=0.01)

# scheduler.step() runs once per batch in the training loop, so the cosine
# period is counted in batches rather than samples. The length comes from
# `curated_dataset`, the dataset object created in the Datasets section.
# NOTE(review): assumes every epoch iterates the full curated_dataset — confirm
# against load_data_by_epoch, which is not visible in this notebook.
steps_per_epoch = math.ceil(len(curated_dataset) / BATCH_SIZE)
total_steps = max(1, steps_per_epoch * NUM_EPOCHS)  # T_max must be positive
scheduler = CosineAnnealingLR(optimizer, T_max=total_steps, eta_min=LEARNING_RATE / 100)

In [None]:
import gc
import math

# Training loop: one optimizer/scheduler step per batch, one checkpoint per epoch.
# NOTE(review): `load_data_by_epoch` and `trainer` are not defined in the visible
# part of this notebook; they must exist in the kernel before this cell runs.
total_bar = NotebookProgressBar(NUM_EPOCHS, prefix="Running Epochs")
for epoch in range(NUM_EPOCHS):
    total_bar.update(epoch + 1)
    train_loss = []
    skipped_batches = 0

    train_loader, train_len = load_data_by_epoch(epoch)

    # Exact ceiling of train_len / BATCH_SIZE (the number of batches to display)
    num_batches = math.ceil(train_len / BATCH_SIZE)
    train_bar = NotebookProgressBar(num_batches, prefix=f"Training {epoch+1}")
    for i, inputs in enumerate(train_loader):
        optimizer.zero_grad()

        # Keep only a text description of a failure: holding the exception object
        # would keep the failed forward pass' frames (and their tensors) alive.
        forward_error = None
        try:
            outputs = trainer(
                style_prompts=inputs['style'],
                transcriptions_1=inputs['content1'],
                transcriptions_2=inputs['content2'],
            )
        except (torch.cuda.OutOfMemoryError, RuntimeError) as err:
            forward_error = f"{type(err).__name__}: {err}"

        if forward_error is not None:
            # Best-effort: skip the batch, but report it instead of hiding it.
            # Cleanup runs outside the except clause so the memory referenced by
            # the exception's traceback can actually be released.
            skipped_batches += 1
            print(f"WARNING: Skipping batch {i+1} of epoch {epoch+1} - {forward_error}")
            gc.collect()
            torch.cuda.empty_cache()
            continue

        losses = outputs['loss']

        losses.backward()
        optimizer.step()
        scheduler.step()

        train_loss.append(losses.item())

        # The final update (with the epoch's mean loss) is issued after the loop
        if i + 1 != train_bar.total:
            train_bar.update(i+1, comment=f"Loss={losses.item():.5f}, LR={optimizer.param_groups[0]['lr']:.1e}")

    torch.save(model.state_dict(), os.path.join(OUTPUT_DIR, f"epoch{epoch+1}.pt"))
    gc.collect()
    torch.cuda.empty_cache()

    if skipped_batches:
        print(f"WARNING: Epoch {epoch+1} skipped {skipped_batches} batch(es) due to runtime errors")

    # An epoch in which every batch failed has no loss to average
    mean_loss = sum(train_loss) / len(train_loss) if train_loss else float("nan")
    train_bar.update(train_bar.total, comment=f"Loss={mean_loss:.5f}, LR={optimizer.param_groups[0]['lr']:.1e}")

In [None]:
import copy
# Work on a CPU deep copy so the in-memory training model is left untouched
copied = copy.deepcopy(model).cpu()
# NOTE(review): merge_and_unload is called on the instance here, while the other
# tuner helpers in this notebook are invoked through
# SelectiveTunerForConditionalGeneration with the model as an argument — confirm
# that the loaded model actually exposes this method.
copied.merge_and_unload(cast_to_embedding=True)
copied.save_pretrained(OUTPUT_DIR+"_final")
# Drop the copy to release its memory
del copied

In [None]:
# NOTE(review): this writes to the same directory (OUTPUT_DIR + "_final") that the
# previous cell used for the merged copy, so running both cells overwrites the
# merged export with the unmerged model — confirm which one is intended.
model.save_pretrained(OUTPUT_DIR+"_final")

## Testing

In [None]:
from spk_incon.metrics.presets import DatasetType, GenerationMethod, SynthesisConfig, ModelType
from spk_incon.metrics.strategies import create_strategy
from spk_incon.datasets import DatasetType, create_dataset

from spk_incon.utils.evaluate import EvaluationPipeline

In [None]:
# Evaluation setup: a default SynthesisConfig, LibriTTS as the test dataset type,
# and the dataset-specific settings looked up by the enum's value
test_config = SynthesisConfig()
test_dataset_type = DatasetType.LIBRITTS
test_dataset_config = test_config.get_dataset_config(test_dataset_type.value)

In [None]:
# Build the evaluation dataset under the same data root as the training data
# (DATA_ROOT) instead of repeating the path as a separate string literal.
test_dataset = create_dataset(test_dataset_type, test_dataset_config, root_dir=DATA_ROOT)

In [None]:
from pathlib import Path
import random

import numpy as np
import torch

import soundfile as sf


# Reproducibility: request deterministic cuDNN algorithms and turn off the
# cuDNN benchmark/autotuning mode
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False


class TestModel:
    @classmethod
    def seed_everything(cls, seed: int = 42):
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)

    @classmethod
    def synthesize(
        cls,
        text: str,
        output_path: Path,
        reference_audio: Path | None = None,
        style_prompt: str | None = None,
        speaker_id: str | None = None
    ) -> bool:
        cls.seed_everything()
        output_path.parent.mkdir(parents=True, exist_ok=True)

        #description_ids = tokenizer(style_prompt, return_tensors="pt").input_ids
        #text_ids = tokenizer(text, return_tensors="pt").input_ids
        
        # Example Inference
        inputs = processor.encode_voice_design(
            text=text,
            instruct=style_prompt,
        )
        outputs = model.generate(**inputs)

        audio_values, sr = processor.decode(outputs)
        sf.write(output_path, audio_values[0], sr)

        #with torch.inference_mode():
        #    audio = model.generate(
        #        input_ids=description_ids.to(device),
        #        prompt_input_ids=text_ids.to(device),
        #        #top_k=1,
        #    )

        #sf.write(output_path, audio.cpu().numpy().squeeze(), sample_rate)

        try:
            return output_path.stat().st_size > 0
        except FileNotFoundError:
            return False

In [None]:
# NOTE(review): the model loaded in this notebook is Qwen3-TTS, yet the results are
# labelled with ModelType.PARLER_TTS_MINI_V1 — confirm this label is intentional,
# since it is passed to generate_all / evaluate_dataset_model / save_results_to_csv.
test_model_type = ModelType.PARLER_TTS_MINI_V1
test_model = TestModel()

evaluator = EvaluationPipeline()

### Experiment 1

In [None]:
# Experiment 1: build the METHOD1 generation strategy around test_model and run
# generation for the selected dataset/model pair
strategy = create_strategy(GenerationMethod.METHOD1, test_config, test_dataset, test_model)
exp1_result = strategy.generate_all(test_dataset_type.value, test_model_type.value)
exp1_result

In [None]:
# Evaluate the dataset/model pair for METHOD1 only, then store the results as CSV
exp1_eval_result = evaluator.evaluate_dataset_model(
    dataset_type=test_dataset_type,
    model_type=test_model_type,
    methods=[GenerationMethod.METHOD1]
)
evaluator.save_results_to_csv(exp1_eval_result, test_dataset_type, test_model_type)

### Experiment 2

In [None]:
# Experiment 2: build the METHOD2 generation strategy around test_model and run
# generation for the selected dataset/model pair
strategy = create_strategy(GenerationMethod.METHOD2, test_config, test_dataset, test_model)
exp2_result = strategy.generate_all(test_dataset_type.value, test_model_type.value)
exp2_result

In [None]:
# Evaluate the dataset/model pair for METHOD2 only, then store the results as CSV
exp2_eval_result = evaluator.evaluate_dataset_model(
    dataset_type=test_dataset_type,
    model_type=test_model_type,
    methods=[GenerationMethod.METHOD2]
)
evaluator.save_results_to_csv(exp2_eval_result, test_dataset_type, test_model_type)

### Experiment 3

In [None]:
# Experiment 3: build the METHOD3 generation strategy around test_model and run
# generation for the selected dataset/model pair
strategy = create_strategy(GenerationMethod.METHOD3, test_config, test_dataset, test_model)
exp3_result = strategy.generate_all(test_dataset_type.value, test_model_type.value)
exp3_result

In [None]:
# Evaluate the dataset/model pair for METHOD3 only, then store the results as CSV
exp3_eval_result = evaluator.evaluate_dataset_model(
    dataset_type=test_dataset_type,
    model_type=test_model_type,
    methods=[GenerationMethod.METHOD3]
)
evaluator.save_results_to_csv(exp3_eval_result, test_dataset_type, test_model_type)