## Diarization

In [None]:
from pyannote.audio import Pipeline
import torch

# Load the pretrained speaker-diarization pipeline.
# Replace the placeholder with your Hugging Face access token. The token is
# needed only once to download the gated models; it is safe to drop the
# argument after the models are cached locally.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token="read_token_here",
)

# pyannote reports a failed gated download by printing a hint and returning
# None; fail here with a clear message instead of on the `.to()` call below.
if pipeline is None:
    raise RuntimeError(
        "Could not load 'pyannote/speaker-diarization-3.1'. Check the Hugging "
        "Face token and that the model's user conditions have been accepted."
    )

# Send pipeline to GPU if available
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

# Apply pretrained pipeline to an audio file ("path/to/your/audio.wav").
# The literal below is a placeholder; point it at a real file before running.
diarization = pipeline(r".wav")

# Print one line per speaker turn: start time, end time and speaker label.
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")

## "Denoise → Diarize → Transcribe" pipeline

In [None]:
import torch
import torchaudio
import os
import gradio as gr
from denoiser import pretrained
from denoiser.dsp import convert_audio
from transformers import WhisperForConditionalGeneration, WhisperProcessor, pipeline as whisper_pipeline
from pyannote.audio import Pipeline as DiarizationPipeline
import uuid
import shutil
import math

# ========== Device Setup ==========
# Run on the GPU whenever CUDA is usable, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# ========== Load Denoising Model ==========
# Pretrained dns64 denoiser, moved to the selected device.
denoise_model = pretrained.dns64()
denoise_model = denoise_model.to(device)

# Directory that receives every intermediate file (copies, denoised audio, clips).
DEBUG_DIR = "debug/"
os.makedirs(DEBUG_DIR, exist_ok=True)

def denoise_audio(audio_path):
    """Denoise one audio file and store the result in ``DEBUG_DIR``.

    The file is loaded with torchaudio, converted with ``convert_audio`` to the
    sample rate and channel count exposed by ``denoise_model``, passed through
    the model without gradient tracking, and written out under a fresh
    ``denoised_<hex>.wav`` name.

    Args:
        audio_path: Path of the audio file to read.

    Returns:
        Path of the newly written denoised file.
    """
    waveform, source_rate = torchaudio.load(audio_path)
    target_rate = denoise_model.sample_rate
    model_input = convert_audio(
        waveform, source_rate, target_rate, denoise_model.chin
    ).to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        cleaned = denoise_model(model_input)
    cleaned = cleaned.squeeze(0).cpu()

    destination = os.path.join(DEBUG_DIR, f"denoised_{uuid.uuid4().hex}.wav")
    torchaudio.save(destination, cleaned, target_rate)
    return destination

# ========== Load Whisper Large-V3 ==========
# Model and processor come from the same checkpoint.
large_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-large-v3")
large_processor = WhisperProcessor.from_pretrained("openai/whisper-large-v3")

# Pin decoding to Azerbaijani transcription.
# NOTE(review): recent transformers releases may deprecate
# `config.forced_decoder_ids` in favour of generation_config language/task —
# confirm against the installed version.
azerbaijani_prompt_ids = large_processor.get_decoder_prompt_ids(
    language="azerbaijani",
    task="transcribe",
)
large_model.generation_config.language = "az"
large_model.config.forced_decoder_ids = azerbaijani_prompt_ids

# Device index handed to the pipeline: first CUDA device if present, else -1.
asr_device_index = 0 if torch.cuda.is_available() else -1

large_pipe = whisper_pipeline(
    "automatic-speech-recognition",
    model=large_model,
    tokenizer=large_processor.tokenizer,
    feature_extractor=large_processor.feature_extractor,
    device=asr_device_index,
)

# ========== Load PyAnnote Diarization Pipeline ==========
# The token for the gated checkpoint is read from the HF_TOKEN environment
# variable so that no secret has to be pasted into the notebook. When the
# variable is unset, None is passed and authentication is left to the library
# defaults (e.g. an already cached download).
HF_TOKEN = os.environ.get("HF_TOKEN") or None

diarization_pipeline = DiarizationPipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HF_TOKEN,
)

# pyannote reports a failed gated download by printing a hint and returning
# None; fail here with a clear message instead of on the `.to()` call below.
if diarization_pipeline is None:
    raise RuntimeError(
        "Could not load 'pyannote/speaker-diarization-3.1'. Set the HF_TOKEN "
        "environment variable and accept the model's user conditions."
    )
diarization_pipeline.to(device)

def diarize_audio(audio_path):
    """Return a plain-text listing of the speaker turns found in a file.

    Args:
        audio_path: Path of the audio file handed to ``diarization_pipeline``.

    Returns:
        One line per turn, formatted ``"<start>s - <end>s: Speaker <label>"``
        with times rounded to one decimal, joined by newlines. The string is
        empty when the pipeline yields no turns.
    """
    annotation = diarization_pipeline(audio_path)
    return "\n".join(
        f"{span.start:.1f}s - {span.end:.1f}s: Speaker {label}"
        for span, _track, label in annotation.itertracks(yield_label=True)
    )

# ========== Main Processing Function ==========
def process_audio(audio_path):
    """Denoise, diarize and transcribe an audio file clip by clip.

    Steps: copy the raw input into ``DEBUG_DIR``, denoise it, run the
    diarization pipeline on the denoised file, cut one padded clip per speaker
    turn, transcribe each clip with ``large_pipe`` and render the result as
    HTML grouped by speaker.

    Args:
        audio_path: Path of the input audio file. May be ``None`` or empty,
            e.g. when the button is pressed before a file was provided.

    Returns:
        An HTML string. For each speaker it holds a heading and a list of
        ``<audio>`` players with their transcriptions; when no input was
        given it holds a short message instead.
    """
    # Imported locally so the block does not depend on the file-level imports.
    from html import escape

    # Without this guard a missing upload ends in a TypeError in shutil.copy.
    if not audio_path:
        return "<p>Please provide an audio file first.</p>"

    # 1) save a copy of the raw input
    raw_copy = os.path.join(DEBUG_DIR, f"original_{uuid.uuid4().hex}.wav")
    shutil.copy(audio_path, raw_copy)

    # 2) denoise & save in debug/
    denoised_path = denoise_audio(audio_path)

    # 3) diarize & load the denoised audio
    diarization = diarization_pipeline(denoised_path)
    wav, sr = torchaudio.load(denoised_path)
    duration = wav.shape[-1] / sr

    speaker_clips = {}
    PAD = 0.2  # 200 ms padding on each side of a turn

    for i, (segment, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
        # Pad the turn, clamp it to the file, then convert seconds to samples.
        start = max(0.0, segment.start - PAD)
        end = min(duration, segment.end + PAD)
        start_s = int(start * sr)
        end_s = int(math.ceil(end * sr))

        clip = wav[:, start_s:end_s]
        clip_name = f"clip_{speaker}_{i}_{uuid.uuid4().hex}.wav"
        clip_path = os.path.join(DEBUG_DIR, clip_name)
        torchaudio.save(clip_path, clip, sr)

        # transcribe
        transcript = large_pipe(clip_path)["text"].strip()
        speaker_clips.setdefault(speaker, []).append((clip_path, transcript))

    # (optionally) remove denoised_path if you don't need even that
    # os.remove(denoised_path)

    # 4) build HTML output. Model output and labels are escaped so that a
    #    stray '<' or '&' cannot break or inject markup.
    # NOTE(review): a bare relative path in `src` may not be served by Gradio
    # without a file route / allowed path — confirm in the deployed version.
    parts = []
    for spk, clips in speaker_clips.items():
        parts.append(f"<h3>{escape(str(spk))}</h3><ul>")
        for clip_path, txt in clips:
            parts.append(
                "<li>"
                f"<audio controls src=\"{escape(clip_path, quote=True)}\"></audio> "
                f"<b>Transcription:</b> {escape(txt)}"
                "</li>"
            )
        parts.append("</ul>")

    return "".join(parts)

# ——— Gradio UI ———
# One page: an audio input, a "Process" button and an HTML pane that shows
# whatever process_audio returns.

demo = gr.Blocks()
with demo:
    gr.Markdown("## Azerbaijani Audio → Per-Speaker Transcript & Clips")
    audio_input = gr.Audio(label="Your audio", type="filepath")
    out = gr.HTML(label="Per-Speaker Transcripts and Audio")
    btn = gr.Button("Process")

    # The component value is passed to process_audio as a file path.
    btn.click(
        fn=process_audio,
        inputs=audio_input,
        outputs=out,
    )

demo.launch()

## “Whole+Align”

Pipeline: denoise → diarize → one-shot full-audio transcription with Stable-Whisper → assign each transcribed segment to a speaker by its midpoint → display per-speaker utterances alongside a single full-audio player for context.

### Comparison of two approaches

In [None]:
import torch
import torchaudio
import os
import uuid
import shutil
import math
from collections import defaultdict
from denoiser import pretrained
from denoiser.dsp import convert_audio
from pyannote.audio import Pipeline as DiarizationPipeline
from stable_whisper import load_model as load_sw_model
import gradio as gr

# ========== Device Setup ==========
# CUDA when available, CPU otherwise.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)

# ========== Load Denoising Model ==========
# Pretrained dns64 denoiser placed on the selected device.
denoise_model = pretrained.dns64()
denoise_model = denoise_model.to(device)
# All intermediate files are written below this directory.
DEBUG_DIR = "debug/"
os.makedirs(DEBUG_DIR, exist_ok=True)

def denoise_audio(audio_path):
    """Run the denoiser over an audio file and save the output.

    Args:
        audio_path: Path of the audio file to load with torchaudio.

    Returns:
        Path of the denoised file, written to ``DEBUG_DIR`` as
        ``denoised_<hex>.wav`` at the sample rate of ``denoise_model``.
    """
    loaded, loaded_sr = torchaudio.load(audio_path)
    # Match the sample rate and channel count the model exposes.
    prepared = convert_audio(
        loaded,
        loaded_sr,
        denoise_model.sample_rate,
        denoise_model.chin,
    )
    prepared = prepared.to(device)
    with torch.no_grad():
        model_out = denoise_model(prepared)
    result = model_out.squeeze(0).cpu()
    file_name = f"denoised_{uuid.uuid4().hex}.wav"
    out_path = os.path.join(DEBUG_DIR, file_name)
    torchaudio.save(out_path, result, denoise_model.sample_rate)
    return out_path

# ========== Load PyAnnote Diarization Pipeline ==========
# The token for the gated checkpoint is read from the HF_TOKEN environment
# variable so that no secret has to be pasted into the notebook. When the
# variable is unset, None is passed and authentication is left to the library
# defaults.
HF_TOKEN = os.environ.get("HF_TOKEN") or None
diarization_pipeline = DiarizationPipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HF_TOKEN,
)
# pyannote reports a failed gated download by printing a hint and returning
# None; fail here with a clear message instead of on the `.to()` call below.
if diarization_pipeline is None:
    raise RuntimeError(
        "Could not load 'pyannote/speaker-diarization-3.1'. Set the HF_TOKEN "
        "environment variable and accept the model's user conditions."
    )
diarization_pipeline.to(device)

# ========== Load Stable-Whisper ==========
sw_model = load_sw_model("large-v3", device=device)

# ================= Approach 1 =================
# Transcribe whole audio with Stable-Whisper, then align segments to speakers
def process_audio_whole_align(audio_path):
    """Transcribe the whole file once, then attribute segments to speakers.

    Steps: copy the raw input into ``DEBUG_DIR``, denoise it, transcribe the
    denoised file with ``sw_model``, diarize the same file, and assign every
    non-empty transcription segment to the first diarization turn that
    contains the segment's midpoint.

    Args:
        audio_path: Path of the input audio file. May be ``None`` or empty,
            e.g. when the button is pressed before a file was provided.

    Returns:
        An HTML string with the full transcription, one audio player for the
        denoised file, and the utterances grouped per speaker. Segments whose
        midpoint lies in no turn are listed under the label ``UNKNOWN``
        rather than being dropped. When no input was given, a short message
        is returned instead.
    """
    # Imported locally so the block does not depend on the file-level imports.
    from html import escape

    # Without this guard a missing upload ends in a TypeError in shutil.copy.
    if not audio_path:
        return "<p>Please provide an audio file first.</p>"

    # 1) save a copy of raw input
    raw_copy = os.path.join(DEBUG_DIR, f"original_{uuid.uuid4().hex}.wav")
    shutil.copy(audio_path, raw_copy)

    # 2) denoise
    denoised_path = denoise_audio(audio_path)

    # 3) full transcription with timestamps for context & comparison
    full_result = sw_model.transcribe(
        denoised_path,
        language="azerbaijani",
        word_timestamps=True,
    )
    full_text = full_result.text.strip()
    sw_segments = full_result.segments  # each segment is read via .start, .end, .text

    # 4) diarize
    diarization = diarization_pipeline(denoised_path)
    diar_list = [
        (turn.start, turn.end, speaker)
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]

    # 5) assign segments to speakers by midpoint; first matching turn wins
    unassigned = "UNKNOWN"
    speaker_texts = defaultdict(list)
    for seg in sw_segments:
        text = seg.text.strip()
        if not text:
            continue
        mid = (seg.start + seg.end) / 2
        spk = next(
            (label for start, end, label in diar_list if start <= mid <= end),
            unassigned,  # keep text that falls between turns visible
        )
        speaker_texts[spk].append(text)

    # 6) build HTML output. Model output and labels are escaped so that a
    #    stray '<' or '&' cannot break or inject markup.
    # NOTE(review): a bare relative path in `src` may not be served by Gradio
    # without a file route / allowed path — confirm in the deployed version.
    parts = [
        "<h2>Full Transcription (Whole Audio)</h2>",
        f"<div style='white-space: pre-wrap; padding:8px; border:1px solid #ccc; margin-bottom:16px;'>{escape(full_text)}</div>",
        f"<audio controls src='{escape(denoised_path, quote=True)}' style='width:100%; margin-bottom:16px;'></audio>",
    ]
    for spk, texts in speaker_texts.items():
        parts.append(f"<h3>Speaker {escape(str(spk))}</h3>")
        parts.extend(f"<p>{escape(utt)}</p>" for utt in texts)
    return "".join(parts)

# =============== Approach 2 ===============
# Use Stable-Whisper per speaker clip with built-in chunking/segmentation logic
PAD = 0.2  # seconds of padding added on each side of a diarized turn

def process_audio_perclip_stable(audio_path):
    """Transcribe each diarized speaker turn separately with Stable-Whisper.

    Steps: copy the raw input into ``DEBUG_DIR``, denoise it, transcribe the
    whole denoised file once for comparison, diarize it, cut one clip per
    turn (padded by ``PAD`` seconds and clamped to the file), and transcribe
    every clip with ``sw_model``.

    Args:
        audio_path: Path of the input audio file. May be ``None`` or empty,
            e.g. when the button is pressed before a file was provided.

    Returns:
        An HTML string with the full transcription followed by, per speaker,
        a list of clip players and their transcriptions. When no input was
        given, a short message is returned instead.
    """
    # Imported locally so the block does not depend on the file-level imports.
    from html import escape

    # Without this guard a missing upload ends in a TypeError in shutil.copy.
    if not audio_path:
        return "<p>Please provide an audio file first.</p>"

    # 1) save copy
    raw_copy = os.path.join(DEBUG_DIR, f"original_{uuid.uuid4().hex}.wav")
    shutil.copy(audio_path, raw_copy)

    # 2) denoise
    denoised_path = denoise_audio(audio_path)

    # 3) full transcription for comparison
    full_result = sw_model.transcribe(
        denoised_path,
        language="azerbaijani",
        beam_size=5,
        word_timestamps=False,
    )
    full_text = full_result.text.strip()

    # 4) diarize & load wav
    diarization = diarization_pipeline(denoised_path)
    wav, sr = torchaudio.load(denoised_path)
    duration = wav.shape[-1] / sr
    speaker_clips = defaultdict(list)

    # 5) extract each speaker segment, pad, and transcribe
    for i, (segment, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
        start = max(0.0, segment.start - PAD)
        end = min(duration, segment.end + PAD)
        start_s = int(start * sr)
        end_s = int(math.ceil(end * sr))
        clip = wav[:, start_s:end_s]
        clip_name = f"clip_{speaker}_{i}_{uuid.uuid4().hex}.wav"
        clip_path = os.path.join(DEBUG_DIR, clip_name)
        torchaudio.save(clip_path, clip, sr)

        # transcribe with Stable-Whisper
        result = sw_model.transcribe(
            clip_path,
            language="azerbaijani",
            beam_size=5,
            word_timestamps=False,
        )
        transcript = result.text.strip()
        speaker_clips[speaker].append((clip_path, transcript))

    # 6) build HTML output. Model output and labels are escaped so that a
    #    stray '<' or '&' cannot break or inject markup.
    # NOTE(review): a bare relative path in `src` may not be served by Gradio
    # without a file route / allowed path — confirm in the deployed version.
    parts = [
        "<h2>Full Transcription (Whole Audio)</h2>",
        f"<div style='white-space: pre-wrap; padding:8px; border:1px solid #ccc; margin-bottom:16px;'>{escape(full_text)}</div>",
    ]
    for spk, clips in speaker_clips.items():
        parts.append(f"<h3>Speaker {escape(str(spk))}</h3><ul>")
        for clip_path, txt in clips:
            parts.append(
                f"<li><audio controls src='{escape(clip_path, quote=True)}'></audio> "
                f"<b>Transcription:</b> {escape(txt)}</li>"
            )
        parts.append("</ul>")
    return "".join(parts)

# ================= Gradio UI =================
# One page: audio input, a radio button selecting the approach, a "Process"
# button and an HTML pane for the result.
demo = gr.Blocks()
with demo:
    gr.Markdown("## Azerbaijani Audio → Per-Speaker Transcript & Clips")
    audio_input = gr.Audio(label="Your audio", type="filepath")
    method = gr.Radio(
        label="Choose Processing Method",
        choices=["Whole+Align", "Per-Clip Stable"],
        value="Whole+Align",
    )
    out = gr.HTML()
    btn = gr.Button("Process")
    # "Whole+Align" selects the whole-file approach; any other value selects
    # the per-clip approach.
    btn.click(
        fn=lambda p, m: (
            process_audio_whole_align if m == "Whole+Align" else process_audio_perclip_stable
        )(p),
        inputs=[audio_input, method],
        outputs=out,
    )


demo.launch()

## Single Processing Approach

We have successfully integrated denoising, speaker diarization, and transcription components into our pipeline. The denoising and diarization modules are performing well. However, transcription remains the weakest link. To improve transcription quality, we plan to first generate initial transcriptions using the Whisper-Large-V3 model, then pass them through an LLM for refinement into high-quality text. We'll perform random manual checks on a representative sample, pair these corrected transcriptions with their corresponding audio clips, and use this dataset to fine-tune the transcription model.

It's important to note that **transcription quality is highly dependent on the quality of the audio input**. *When speaking clearly into the microphone, the transcription results are very accurate*. However, *for audio clips with lower quality*—such as those involving multiple people speaking over each other or instances where words are mumbled or inaudible—*the transcription quality deteriorates significantly*.

---

**Purpose and Limitations of ASR/STT Systems like Whisper**

Automatic Speech Recognition (ASR) or Speech-to-Text (STT) systems, such as OpenAI’s Whisper, are designed primarily to **transcribe audio speech into text**. Their core function is to convert the acoustic signal into its textual equivalent as faithfully as possible, without engaging in deeper reasoning, contextual understanding, or correction of factual or grammatical inaccuracies in the speech itself.

The **primary goal** of such systems is **not to "understand" or "reason" about the meaning or correctness** of what is being said, but to **accurately reflect the audible content**—even if the original speech contains errors, hesitations, or truncations. If a speaker says something incorrect or unclear, the ASR model aims to transcribe exactly that, not to correct or reinterpret it.

This limitation is similar to human perception: **even humans often struggle to decipher poor-quality or unclear speech** without relying on broader context, background knowledge, or guesswork. In AI systems, that interpretive reasoning is the domain of **Large Language Models (LLMs)** like GPT, not ASR models.

Thus, **LLMs can be layered on top of ASR output to perform higher-level tasks**, such as:

* Interpreting meaning
* Correcting grammar
* Inferring intent
* Handling ambiguous or low-quality transcriptions

This layered approach aligns with how human cognition works: first perceive (ASR), then comprehend and reason (LLM).

---

In [None]:
import torch
import torchaudio
import os
import uuid
import shutil
from collections import defaultdict
from denoiser import pretrained
from denoiser.dsp import convert_audio
from pyannote.audio import Pipeline as DiarizationPipeline
from stable_whisper import load_model as load_sw_model
import gradio as gr

# ========== Device Setup ==========
# Use the GPU if CUDA reports one, else stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ========== Load Denoising Model ==========
# Pretrained dns64 denoiser, moved to the selected device.
denoise_model = pretrained.dns64()
denoise_model = denoise_model.to(device)
# Intermediate files (input copies, denoised audio) are written here.
DEBUG_DIR = "debug/"
os.makedirs(DEBUG_DIR, exist_ok=True)

def denoise_audio(audio_path):
    """Denoise an audio file and return the path of the cleaned copy.

    The input is loaded with torchaudio, adapted by ``convert_audio`` to the
    sample rate and channel count of ``denoise_model``, enhanced by the model
    with gradients disabled, and saved into ``DEBUG_DIR``.

    Args:
        audio_path: Path of the audio file to denoise.

    Returns:
        Path of the written ``denoised_<hex>.wav`` file.
    """
    model_rate = denoise_model.sample_rate
    signal, signal_rate = torchaudio.load(audio_path)
    signal = convert_audio(signal, signal_rate, model_rate, denoise_model.chin)

    with torch.no_grad():
        denoised = denoise_model(signal.to(device)).squeeze(0).cpu()

    target = os.path.join(DEBUG_DIR, f"denoised_{uuid.uuid4().hex}.wav")
    torchaudio.save(target, denoised, model_rate)
    return target

# ========== Load PyAnnote Diarization Pipeline ==========
# The token for the gated checkpoint is read from the HF_TOKEN environment
# variable so that no secret has to be pasted into the notebook. When the
# variable is unset, None is passed and authentication is left to the library
# defaults.
HF_TOKEN = os.environ.get("HF_TOKEN") or None

diarization_pipeline = DiarizationPipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HF_TOKEN,
)
# pyannote reports a failed gated download by printing a hint and returning
# None; fail here with a clear message instead of on the `.to()` call below.
if diarization_pipeline is None:
    raise RuntimeError(
        "Could not load 'pyannote/speaker-diarization-3.1'. Set the HF_TOKEN "
        "environment variable and accept the model's user conditions."
    )
diarization_pipeline.to(device)

# ========== Load Stable-Whisper ==========
sw_model = load_sw_model("large-v3", device=device)

# ========== Single Processing Approach ==========
def process_audio(audio_path):
    """Denoise, transcribe and diarize a file, then group text by speaker.

    Steps: copy the raw input into ``DEBUG_DIR``, denoise it, transcribe the
    denoised file with ``sw_model``, diarize the same file, and assign every
    non-empty transcription segment to the first diarization turn that
    contains the segment's midpoint.

    Args:
        audio_path: Path of the input audio file. May be ``None`` or empty,
            e.g. when the button is pressed before a file was provided.

    Returns:
        An HTML string with the full transcription, one audio player for the
        denoised file, and the utterances grouped per speaker. Segments whose
        midpoint lies in no turn are listed under the label ``UNKNOWN``
        rather than being dropped. When no input was given, a short message
        is returned instead.
    """
    # Imported locally so the block does not depend on the file-level imports.
    from html import escape

    # Without this guard a missing upload ends in a TypeError in shutil.copy.
    if not audio_path:
        return "<p>Please provide an audio file first.</p>"

    # 1) backup original
    raw_copy = os.path.join(DEBUG_DIR, f"original_{uuid.uuid4().hex}.wav")
    shutil.copy(audio_path, raw_copy)

    # 2) denoise
    denoised_path = denoise_audio(audio_path)

    # 3) full transcription with timestamps
    full_result = sw_model.transcribe(
        denoised_path,
        language="azerbaijani",
        word_timestamps=True,
    )
    full_text = full_result.text.strip()
    sw_segments = full_result.segments

    # 4) diarize
    diarization = diarization_pipeline(denoised_path)
    diar_list = [
        (turn.start, turn.end, speaker)
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]

    # 5) align segments (not individual words) to speakers by midpoint;
    #    the first matching turn wins
    unassigned = "UNKNOWN"
    speaker_texts = defaultdict(list)
    for seg in sw_segments:
        text = seg.text.strip()
        if not text:
            continue
        mid = (seg.start + seg.end) / 2
        spk = next(
            (label for start, end, label in diar_list if start <= mid <= end),
            unassigned,  # keep text that falls between turns visible
        )
        speaker_texts[spk].append(text)

    # 6) build HTML. Model output and labels are escaped so that a stray '<'
    #    or '&' cannot break or inject markup.
    # NOTE(review): a bare relative path in `src` may not be served by Gradio
    # without a file route / allowed path — confirm in the deployed version.
    parts = [
        "<h2>Full Transcription (Whole Audio)</h2>",
        f"<div style='white-space: pre-wrap; padding:8px; border:1px solid #ccc; margin-bottom:16px;'>{escape(full_text)}</div>",
        f"<audio controls src='{escape(denoised_path, quote=True)}' style='width:100%; margin-bottom:16px;'></audio>",
    ]
    for spk, texts in speaker_texts.items():
        parts.append(f"<h3>Speaker {escape(str(spk))}</h3>")
        parts.extend(f"<p>{escape(utt)}</p>" for utt in texts)
    return "".join(parts)

# ========== Gradio UI ==========
# One page: audio input, a "Process" button and an HTML pane that shows
# whatever process_audio returns.
demo = gr.Blocks()
with demo:
    gr.Markdown("## Azerbaijani Audio → Per-Speaker Transcript")
    audio_input = gr.Audio(label="Upload your audio", type="filepath")
    out = gr.HTML()
    btn = gr.Button("Process")
    btn.click(fn=process_audio, inputs=audio_input, outputs=out)

demo.launch()