In [None]:
%pip install TTS bark soundfile torch whisperspeech


In [None]:

import csv
import os
import tempfile
import time
from pathlib import Path
import statistics

import psutil
import torch

# Text file with one evaluation sentence per line (read by get_corpus).
corpus_path = "corpus/en_corpus_10.txt"
# Short utterance for the untimed warm-up call of every model.
WARMUP_TEXT = "Hello"
# Sentence for the single timed benchmark call of every model. It is English on
# purpose: the corpus and the warm-up text are English, and the Coqui models in
# this notebook are English checkpoints or are called with language="en", so a
# non-English sentence would make this measurement incomparable to the corpus runs.
BENCHMARK_TEXT = "A test phrase for synthesis."


def get_corpus(path: str):
    """Load the evaluation sentences from a text file, one sentence per line.

    The file is decoded explicitly as UTF-8 (a leading byte-order mark is
    tolerated and dropped) so the result does not depend on the platform's
    default locale encoding.

    Args:
        path: Location of the corpus text file.

    Returns:
        list[str]: Every line with surrounding whitespace stripped; lines that
        are empty after stripping are omitted.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError("corpus not found")

    # "utf-8-sig" reads plain UTF-8 as well, but keeps a BOM out of the first sentence.
    with open(path, "r", encoding="utf-8-sig") as file:
        stripped_lines = (line.strip() for line in file)
        return [line for line in stripped_lines if line]


# Sentences every model cell synthesizes, loaded once from the corpus file.
corpus = get_corpus(corpus_path)

# Prefer the GPU whenever torch can see one; fall back to the CPU otherwise.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Per-model accumulator: "report" holds the benchmark metrics and runtime
# summary, "deltas" holds the per-sentence synthesis times in seconds.
model_names = ["bark", "fastspeech", "vits", "xtts", "whisper-speech"]
stats = dict((name, {"report": {}, "deltas": []}) for name in model_names)

# Destination of the CSV report; the directory is created up front.
reports_dir = Path("reports")
reports_dir.mkdir(parents=True, exist_ok=True)
report_csv_path = reports_dir.joinpath("lab4_report.csv")
# One dict per synthesized file, appended by record_run and written by save_report.
report_rows = list()


def create_temp_wav(prefix: str) -> str:
    """Reserve an empty ``.wav`` file in the temp directory and return its path.

    The file name starts with ``"<prefix>_"`` and ends with ``".wav"``. The file
    is closed but left on disk (``delete=False``); removing it is up to the caller.

    Args:
        prefix: Leading part of the generated file name.

    Returns:
        str: Path of the newly created, empty file.
    """
    with tempfile.NamedTemporaryFile(
        prefix=f"{prefix}_", suffix=".wav", delete=False
    ) as placeholder:
        return placeholder.name


def summarize_deltas(deltas):
    """Summarize a list of per-run durations (seconds) into descriptive statistics.

    Args:
        deltas: Sequence of run times. May be empty.

    Returns:
        dict: ``num_runs`` plus ``mean_runtime_s``, ``median_runtime_s``,
        ``std_runtime_s`` (population standard deviation, ``0.0`` for a single
        run), ``min_runtime_s`` and ``max_runtime_s``. With no runs the count
        is ``0`` and every statistic is ``None``.
    """
    stat_keys = (
        "mean_runtime_s",
        "median_runtime_s",
        "std_runtime_s",
        "min_runtime_s",
        "max_runtime_s",
    )

    # Nothing was measured: report the count and leave every statistic unset.
    if not deltas:
        return {"num_runs": 0, **dict.fromkeys(stat_keys)}

    run_count = len(deltas)
    stat_values = (
        statistics.mean(deltas),
        statistics.median(deltas),
        0.0 if run_count <= 1 else statistics.pstdev(deltas),
        min(deltas),
        max(deltas),
    )
    return {"num_runs": run_count, **dict(zip(stat_keys, stat_values))}


def record_run(model_name, audio_file, duration, model_report):
    """Append one synthesized file to the module-level ``report_rows`` list.

    Args:
        model_name: Stored under the ``"model"`` column.
        audio_file: Path of the generated audio, stored under ``"audio_file"``.
        duration: Synthesis time in seconds, stored under ``"inference_time_s"``.
        model_report: Optional mapping of extra metrics copied into the row.
            Its ``"model_name"`` entry is skipped because the row already
            carries the model name; ``None`` adds nothing.
    """
    extra_metrics = {
        field: metric
        for field, metric in (model_report or {}).items()
        if field != "model_name"
    }

    report_rows.append(
        {
            "audio_file": audio_file,
            "model": model_name,
            "inference_time_s": duration,
            **extra_metrics,
        }
    )


def save_report(rows, path):
    """Write the collected run rows to a CSV file.

    The header starts with ``audio_file``, ``model`` and ``inference_time_s``;
    any other key found in the rows is appended in first-seen order. Cells for
    keys a row does not have are left empty. The file is written as UTF-8 so
    the output does not depend on the platform's locale encoding.

    Args:
        rows: List of dicts, one per synthesized file.
        path: Destination of the CSV file (overwritten if it exists).

    Returns:
        None. When ``rows`` is empty nothing is written and a notice is printed.
    """
    if not rows:
        print("Report was not generated: no successful synthesis runs.")
        return

    # A dict keeps insertion order and gives constant-time duplicate detection.
    columns = dict.fromkeys(["audio_file", "model", "inference_time_s"])
    for row in rows:
        columns.update(dict.fromkeys(row))
    fieldnames = list(columns)

    # newline="" lets the csv module control line endings itself.
    with open(path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    print(f"Saved report to {path} ({len(rows)} rows).")


def evaluate_tts_model(
    name,
    synth_fn,
    *,
    warmup_kwargs=None,
    benchmark_kwargs=None,
    model_obj=None,
):
    """Warm up a TTS callable, time one benchmark call and collect resource metrics.

    Args:
        name: Label used in log messages and returned as ``model_name``.
        synth_fn: Callable invoked as ``synth_fn(**kwargs)`` for both the
            warm-up and the benchmark call.
        warmup_kwargs: Keyword arguments for the untimed warm-up call.
        benchmark_kwargs: Keyword arguments for the timed benchmark call.
        model_obj: Object whose ``state_dict()`` is used to estimate the model
            size. When ``None``, ``synth_fn.__self__`` is tried instead (set
            when ``synth_fn`` is a bound method).

    In both kwargs dicts an optional ``"file_path"`` entry is treated as a
    temporary output: the file is deleted after the call, whether or not the
    call succeeded.

    Returns:
        dict with the keys
        ``model_name``;
        ``benchmark_synthesis_time_s`` - seconds spent inside the benchmark
        call, or ``None`` if it raised;
        ``model_size_mb`` - total size of the ``state_dict`` tensors in MiB, or
        ``None`` when no object with ``state_dict`` is available;
        ``cpu_load_percent`` - system-wide CPU utilisation over the benchmark call;
        ``ram_usage_mb`` - resident set size of this process in MiB after the
        benchmark;
        ``gpu_usage_mb`` - peak CUDA memory allocated by this process since the
        start of this function in MiB, or ``None`` without CUDA.

    Exceptions raised by ``synth_fn`` are printed and swallowed so that a
    failing model does not abort the metric collection.
    """
    warmup_kwargs = dict(warmup_kwargs or {})
    benchmark_kwargs = dict(benchmark_kwargs or {})

    def _run(call_kwargs):
        """Call ``synth_fn`` and return the seconds it took; always drop its temp output."""
        tmp_path = call_kwargs.get("file_path")
        try:
            # perf_counter is monotonic, so clock adjustments cannot skew the
            # result; the timer stops before the temp-file cleanup below.
            started = time.perf_counter()
            synth_fn(**call_kwargs)
            return time.perf_counter() - started
        finally:
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError:
                    # Best effort: a leftover temp file must not fail the benchmark.
                    pass

    cuda_available = torch.cuda.is_available()
    # Reset the peak counter so gpu_usage_mb reflects this call (warm-up included).
    if cuda_available and hasattr(torch.cuda, "reset_peak_memory_stats"):
        torch.cuda.reset_peak_memory_stats()

    try:
        _run(warmup_kwargs)
    except Exception as exc:
        print(f"[{name}] Warmup error: {exc}")

    # Prime psutil's CPU counters: the next non-blocking cpu_percent() call then
    # reports utilisation for the span of the benchmark itself, instead of
    # sampling an idle window after synthesis has already finished.
    psutil.cpu_percent(interval=None)

    synth_time = None
    try:
        synth_time = _run(benchmark_kwargs)
    except Exception as exc:
        print(f"[{name}] Benchmark error: {exc}")

    cpu_percent = psutil.cpu_percent(interval=None)

    # Explicit None check: an object that happens to be falsy (for example one
    # defining __len__ that returns 0) is still a valid model reference.
    if model_obj is not None:
        model_ref = model_obj
    else:
        model_ref = getattr(synth_fn, "__self__", None)

    model_size_mb = None
    if model_ref is not None and hasattr(model_ref, "state_dict"):
        state = model_ref.state_dict()
        total_bytes = sum(t.nelement() * t.element_size() for t in state.values())
        model_size_mb = total_bytes / (1024 * 1024)

    process = psutil.Process(os.getpid())
    mem_mb = process.memory_info().rss / (1024 * 1024)

    gpu_mem = None
    if cuda_available:
        gpu_mem = torch.cuda.max_memory_allocated() / (1024 * 1024)

    return {
        "model_name": name,
        "benchmark_synthesis_time_s": synth_time,
        "model_size_mb": model_size_mb,
        "cpu_load_percent": cpu_percent,
        "ram_usage_mb": mem_mb,
        "gpu_usage_mb": gpu_mem,
    }


def timer(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` and return the elapsed time in seconds.

    ``time.perf_counter`` is used because it is monotonic and high-resolution,
    so the measurement cannot be distorted by system clock adjustments.
    The return value of ``func`` is discarded; exceptions propagate to the caller.
    """
    start = time.perf_counter()
    func(*args, **kwargs)
    return time.perf_counter() - start


In [None]:

# Fast Speech 

from TTS.api import TTS

model_name = "fastspeech"
output_dir = Path("models") / model_name
output_dir.mkdir(parents=True, exist_ok=True)

# NOTE(review): confirm this model id is present in the installed Coqui TTS
# model list (e.g. via `TTS().list_models()`) - TODO verify.
fast_speech_tts = TTS("tts_models/en/ljspeech/fastspeech2")
fast_speech_tts = fast_speech_tts.to(device)

# One untimed warm-up call plus one timed call; both write to throw-away files.
fastspeech_report = evaluate_tts_model(
    model_name,
    fast_speech_tts.tts_to_file,
    warmup_kwargs={
        "text": WARMUP_TEXT,
        "file_path": create_temp_wav(f"{model_name}_warmup"),
    },
    benchmark_kwargs={
        "text": BENCHMARK_TEXT,
        "file_path": create_temp_wav(f"{model_name}_bench"),
    },
    model_obj=fast_speech_tts,
)

# Start from a clean slate so that re-running this cell replaces this model's
# timings and CSV rows instead of stacking duplicates on top of the old ones.
stats[model_name] = {"report": dict(fastspeech_report), "deltas": []}
report_rows[:] = [row for row in report_rows if row.get("model") != model_name]

# Synthesize every corpus sentence to models/<model>/<index>.wav and time it.
for i, sequence in enumerate(corpus):
    output_path = output_dir / f"{i}.wav"
    dt = timer(
        fast_speech_tts.tts_to_file,
        text=sequence,
        file_path=str(output_path),
    )

    stats[model_name]["deltas"].append(dt)
    record_run(model_name, str(output_path), dt, fastspeech_report)

stats[model_name]["report"].update(summarize_deltas(stats[model_name]["deltas"]))


In [None]:

# VITS

vits_tts = TTS("tts_models/en/ljspeech/vits")
vits_tts = vits_tts.to(device)

model_name = "vits"
output_dir = Path("models") / model_name
output_dir.mkdir(parents=True, exist_ok=True)

# One untimed warm-up call plus one timed call; both write to throw-away files.
vits_report = evaluate_tts_model(
    model_name,
    vits_tts.tts_to_file,
    warmup_kwargs={
        "text": WARMUP_TEXT,
        "file_path": create_temp_wav(f"{model_name}_warmup"),
    },
    benchmark_kwargs={
        "text": BENCHMARK_TEXT,
        "file_path": create_temp_wav(f"{model_name}_bench"),
    },
    model_obj=vits_tts,
)

# Start from a clean slate so that re-running this cell replaces this model's
# timings and CSV rows instead of stacking duplicates on top of the old ones.
stats[model_name] = {"report": dict(vits_report), "deltas": []}
report_rows[:] = [row for row in report_rows if row.get("model") != model_name]

# Synthesize every corpus sentence to models/<model>/<index>.wav and time it.
for i, sequence in enumerate(corpus):
    output_path = output_dir / f"{i}.wav"
    dt = timer(
        vits_tts.tts_to_file,
        text=sequence,
        file_path=str(output_path),
    )

    stats[model_name]["deltas"].append(dt)
    record_run(model_name, str(output_path), dt, vits_report)

stats[model_name]["report"].update(summarize_deltas(stats[model_name]["deltas"]))


In [None]:

# Bark

from bark import generate_audio, SAMPLE_RATE
import soundfile as sf

model_name = "bark"
output_dir = Path("models") / model_name
output_dir.mkdir(parents=True, exist_ok=True)


def bark_to_file(text, file_path):
    """Synthesize ``text`` with Bark and write the audio to ``file_path``.

    The array returned by ``generate_audio`` is saved with soundfile at Bark's
    own ``SAMPLE_RATE``.
    """
    audio_arr = generate_audio(text)
    sf.write(file_path, audio_arr, SAMPLE_RATE)


# One untimed warm-up call plus one timed call; both write to throw-away files.
# No model_obj is passed and bark_to_file is a plain function, so
# model_size_mb in the report stays None.
bark_report = evaluate_tts_model(
    model_name,
    bark_to_file,
    warmup_kwargs={
        "text": WARMUP_TEXT,
        "file_path": create_temp_wav(f"{model_name}_warmup"),
    },
    benchmark_kwargs={
        "text": BENCHMARK_TEXT,
        "file_path": create_temp_wav(f"{model_name}_bench"),
    },
)

# Start from a clean slate so that re-running this cell replaces this model's
# timings and CSV rows instead of stacking duplicates on top of the old ones.
stats[model_name] = {"report": dict(bark_report), "deltas": []}
report_rows[:] = [row for row in report_rows if row.get("model") != model_name]

# Synthesize every corpus sentence to models/<model>/<index>.wav and time it.
for i, sequence in enumerate(corpus):
    output_path = output_dir / f"{i}.wav"
    dt = timer(
        bark_to_file,
        text=sequence,
        file_path=str(output_path),
    )

    stats[model_name]["deltas"].append(dt)
    record_run(model_name, str(output_path), dt, bark_report)

stats[model_name]["report"].update(summarize_deltas(stats[model_name]["deltas"]))


In [None]:

# xtts
xtts = TTS("tts_models/multilingual/multi-dataset/xtts_v2")
xtts = xtts.to(device)

model_name = "xtts"
output_dir = Path("models") / model_name
output_dir.mkdir(parents=True, exist_ok=True)

# XTTS v2 is a multi-speaker model: Coqui's API rejects synthesis calls that
# provide neither `speaker` nor `speaker_wav`. "Ana Florence" is one of the
# built-in voices used in the Coqui XTTS documentation.
# NOTE(review): confirm the speaker name against the installed TTS version.
XTTS_SPEAKER = "Ana Florence"
xtts_call_options = {"language": "en", "speaker": XTTS_SPEAKER}

# One untimed warm-up call plus one timed call; both write to throw-away files.
xtts_report = evaluate_tts_model(
    model_name,
    xtts.tts_to_file,
    warmup_kwargs={
        "text": WARMUP_TEXT,
        "file_path": create_temp_wav(f"{model_name}_warmup"),
        **xtts_call_options,
    },
    benchmark_kwargs={
        "text": BENCHMARK_TEXT,
        "file_path": create_temp_wav(f"{model_name}_bench"),
        **xtts_call_options,
    },
    model_obj=xtts,
)

# Start from a clean slate so that re-running this cell replaces this model's
# timings and CSV rows instead of stacking duplicates on top of the old ones.
stats[model_name] = {"report": dict(xtts_report), "deltas": []}
report_rows[:] = [row for row in report_rows if row.get("model") != model_name]

# Synthesize every corpus sentence to models/<model>/<index>.wav and time it.
for i, sequence in enumerate(corpus):
    output_path = output_dir / f"{i}.wav"
    dt = timer(
        xtts.tts_to_file,
        text=sequence,
        file_path=str(output_path),
        **xtts_call_options,
    )

    stats[model_name]["deltas"].append(dt)
    record_run(model_name, str(output_path), dt, xtts_report)

stats[model_name]["report"].update(summarize_deltas(stats[model_name]["deltas"]))


In [None]:

# Whisper Speech

from whisperspeech.pipeline import Pipeline
import soundfile as sf

pipe = Pipeline()

model_name = "whisper-speech"
output_dir = Path("models") / model_name
output_dir.mkdir(parents=True, exist_ok=True)


def whisper_to_file(text, file_path):
    """Synthesize ``text`` with WhisperSpeech and write the audio to ``file_path``.

    Saving is delegated to ``Pipeline.generate_to_file`` so the file is written
    at the sample rate the pipeline's vocoder actually produces, rather than
    at a rate hard-coded in this notebook.
    NOTE(review): WhisperSpeech's Pipeline is documented with ``generate`` /
    ``generate_to_file`` (24 kHz output); confirm against the installed version.
    """
    pipe.generate_to_file(file_path, text)


# One untimed warm-up call plus one timed call; both write to throw-away files.
# No model_obj is passed and whisper_to_file is a plain function, so
# model_size_mb in the report stays None.
whisper_report = evaluate_tts_model(
    model_name,
    whisper_to_file,
    warmup_kwargs={
        "text": WARMUP_TEXT,
        "file_path": create_temp_wav(f"{model_name}_warmup"),
    },
    benchmark_kwargs={
        "text": BENCHMARK_TEXT,
        "file_path": create_temp_wav(f"{model_name}_bench"),
    },
)

# Start from a clean slate so that re-running this cell replaces this model's
# timings and CSV rows instead of stacking duplicates on top of the old ones.
stats[model_name] = {"report": dict(whisper_report), "deltas": []}
report_rows[:] = [row for row in report_rows if row.get("model") != model_name]

# Synthesize every corpus sentence to models/<model>/<index>.wav and time it.
for i, sequence in enumerate(corpus):
    output_path = output_dir / f"{i}.wav"

    dt = timer(
        whisper_to_file,
        text=sequence,
        file_path=str(output_path),
    )

    stats[model_name]["deltas"].append(dt)
    record_run(model_name, str(output_path), dt, whisper_report)

stats[model_name]["report"].update(summarize_deltas(stats[model_name]["deltas"]))


In [None]:

# Write every recorded run to the CSV report, then display the per-model
# summary dictionary as this cell's output.
save_report(rows=report_rows, path=report_csv_path)
stats
