<img src="../Images/DSC_Logo.png" style="width: 400px;">

This notebook applies a pipeline that is built on [faster-whisper](https://github.com/SYSTRAN/faster-whisper) and [pyannote.audio](https://github.com/pyannote/pyannote-audio), with installation procedures based on their official GitHub repositories (accessed September 25, 2025). This is one common Whisper-based workflow among several: Whisper can also be run directly (without faster-whisper) or via more integrated pipelines such as [WhisperX](https://github.com/m-bain/whisperX). The overall approach is: Whisper transcription + pyannote diarization + time-based merging. This is similar to tools like [noScribe](https://github.com/kaixxx/noScribe). If you only need a plain transcript (no speaker labels), you can run Whisper alone and skip both pyannote (including the Hugging Face setup) and the merging steps.

# 1. One-Time Setup: Install Software & Hugging Face Account

## 1.1 Install Software

This code installs the Python packages listed in "requirements.txt" into the environment your Jupyter notebook is using. It ensures all needed libraries (and versions) are available so the notebook can run without import errors.

In [None]:
%pip install -r requirements.txt

## 1.2 Hugging Face Account

If you don't have a [Hugging Face account](https://huggingface.co/), you need to create one.

You will need an access token later on for the speaker diarization. To create one, click on your profile icon, then on "Access Tokens", and create a new access token ("read" permission is sufficient; "write" permission is not needed).

>Important: Don't share your token.

In addition, pyannote requires you to agree to share your contact information to access its models. To do so, go to the [pyannote/speaker-diarization-community-1](https://huggingface.co/pyannote/speaker-diarization-community-1) page, enter your information, and click on "Agree and access repository". Please also have a look at [pyannote.audio](https://github.com/pyannote/pyannote-audio) in case anything changes with the setup.
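
To keep the token out of the notebook file itself, one option is to read it from an environment variable and fall back to a hidden prompt. The following is a minimal sketch under assumptions: the helper `read_token` is hypothetical, and the variable name `HF_TOKEN` is only a convention you would have to set yourself before starting Jupyter.

```python
import os
from getpass import getpass


def read_token(env=os.environ, var_name="HF_TOKEN", prompt=getpass):
    """Return the token from an environment variable, or ask for it interactively."""
    token = env.get(var_name, "").strip()
    if not token:
        # getpass hides the typed characters, so the token is not echoed in the notebook
        token = prompt(f"Enter your Hugging Face token ({var_name} is not set): ").strip()
    if not token:
        raise ValueError("No Hugging Face token provided.")
    return token


# Usage (hypothetical): own_token = read_token()
```

With this approach the notebook can be shared or committed without the token appearing in a cell.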

In [None]:
own_token = "ENTER_YOUR_TOKEN"

# 2. Import Packages

In [None]:
import os
from datetime import timedelta
import subprocess

import imageio_ffmpeg
import soundfile as sf
import torch

from faster_whisper import WhisperModel # faster-whisper
from faster_whisper import BatchedInferencePipeline # Optional: can make transcription faster (especially on a GPU) by processing several audio chunks at once

import torchaudio
from pyannote.audio import Pipeline # pyannote: a wrapper that gives ready-to-use model for speaker diarization task

FFMPEG = imageio_ffmpeg.get_ffmpeg_exe()

You can check the installed PyTorch version and whether your environment has access to a GPU:

In [None]:
import torch

print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())

# 3. Setup

## 3.1 Runtime Setup

Automatically set device and compute type depending on hardware availability:

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

if device == "cuda":
    compute_type = "float16"  # Faster and more memory efficient on GPU
    batch_size = 16           # Adjust based on GPU memory
else:
    compute_type = "int8"     # Required or more efficient on CPU
    batch_size = 1            # Keep at 1 on CPU (larger values don’t help and may cause memory issues)

## 3.2 Select Audio File

With Python, you can easily transcribe multiple files by looping over a list of paths (e.g., all files in a folder) and applying the same steps to each file. In this notebook, however, we keep things simple and specify a single audio file. Provide the relative path to one audio file (e.g., going up one folder and into "Data_Raw/"). Both .wav and .mp3 files work because the transcription library uses ffmpeg under the hood to read many common audio formats. In addition, in the next step we explicitly convert the audio to a standardized format to ensure consistent processing.
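
For the multi-file case mentioned above, collecting the paths could look like the sketch below. It is only an illustration: the helper `list_audio_files` and the set `AUDIO_EXTENSIONS` are hypothetical names, and the notebook itself does not use them.

```python
from pathlib import Path

AUDIO_EXTENSIONS = {".wav", ".mp3"}  # Assumption: extend this set as needed


def list_audio_files(folder):
    """Return all audio files below `folder` (searched recursively), sorted by path."""
    return sorted(
        p for p in Path(folder).rglob("*")
        if p.is_file() and p.suffix.lower() in AUDIO_EXTENSIONS
    )


# Usage (hypothetical): derive the names per file and run the later steps inside the loop
# for path in list_audio_files("../Data_Raw"):
#     file_name, audio_file = path.stem, str(path)
```

The suffix is compared in lower case so that files such as "clip.MP3" are found as well.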

To switch between example files, uncomment exactly one pair (file_name & audio_file) and keep all others commented out:

In [None]:
#file_name = "File-A"
#audio_file = "../Data_Raw/File-A_buffy/shortened_Buffy_Seas01-Epis01.en.wav"

#file_name = "File-B"
#audio_file = "../Data_Raw/File-B_moon-landing/CA138clip.mp3"

file_name = "File-C"
audio_file = "../Data_Raw/File-C_qualitative-interview-de/DE_example_2.mp3"

#file_name = "File-D"
#audio_file = "../Data_Raw/File-D_qualitative-interview-en/EN_example_1.mp3"

#file_name = "File-E"
#audio_file = "../Data_Raw/File-E_Bremen-guide-low-saxon/audioguide-2025-platt-01-shortened.wav"

#file_name = "File-F"
#audio_file = "../Data_Raw/File-F_math/math4.mp3"

#file_name = "File-G"
#audio_file = "../Data_Raw/File-G_XX/XX"


# 4. Preprocess Audio File

Whisper and pyannote already do the basic audio handling (decoding/resampling and speech/non-speech detection), so extra preprocessing is usually optional. You would only add it if you notice clear problems, for example strong background noise/echo, very uneven volume, very long silences, or heavy overlapping speech (where an advanced "speech separation" step can sometimes help).

## 4.1 Run ffmpeg
The audio file is converted once to a 16 kHz mono WAV file. This file is then used for both Whisper and pyannote, so they share the exact same audio time base. This makes it more straightforward to align Whisper's transcript segments with pyannote's speaker turns and to assign a speaker label to each part of the transcript (see Sect. 8).
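
Once the conversion cell has run, the format of the resulting file can be checked with Python's standard `wave` module. This is a minimal sketch: the helpers `wav_properties` and `is_16k_mono` are illustrative names, and it assumes ffmpeg wrote uncompressed PCM, which is its default for .wav output.

```python
import wave


def wav_properties(path):
    """Return (sample_rate, channels, duration_in_seconds) of a PCM WAV file."""
    with wave.open(str(path), "rb") as wf:
        sample_rate = wf.getframerate()
        channels = wf.getnchannels()
        duration = wf.getnframes() / sample_rate
    return sample_rate, channels, duration


def is_16k_mono(path):
    """True if the file has the format expected after the conversion step."""
    sample_rate, channels, _ = wav_properties(path)
    return sample_rate == 16000 and channels == 1


# Usage (hypothetical): is_16k_mono("../Data_Preprocessed/audio_16k_mono.wav")
```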

In [None]:
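audio_16k = "../Data_Preprocessed/audio_16k_mono.wav"
os.makedirs(os.path.dirname(audio_16k), exist_ok=True)  # Create the output folder if it does not exist yet

# -y: overwrite existing output, -ac 1: mono, -ar 16000: 16 kHz sample rate
subprocess.run([FFMPEG, "-y", "-i", audio_file, "-ac", "1", "-ar", "16000", audio_16k],
               check=True)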



# 5. Load Models

## 5.1 Load Whisper Model (ASR)

Load the model with the given device ("cpu" or "cuda") and precision type ("float16", "int8", etc.). Refer to [Whisper](https://github.com/openai/whisper) for the available model sizes, or use a custom (e.g., fine-tuned) model here.

In [None]:
model = WhisperModel("large-v3",  # Other sizes: "tiny", "base", "small", "medium", ...
                     device=device,
                     compute_type=compute_type)

With faster-whisper, you can run transcription "normally" or with batched inference. Batched inference is mainly a speed option for GPUs (it processes several audio chunks at once). On CPU, it usually provides little benefit, so the default (non-batched) mode is typically used. If you want the batched model (alternative):

In [None]:
# If you enable this, also pass batch_size=batch_size to model.transcribe(...) in Sect. 6
#model = BatchedInferencePipeline(model=model)

## 5.2 Load pyannote Diarization Model

In [None]:
diarization = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-community-1",  # The model whose access conditions were accepted in Sect. 1.2
    token=own_token
)

# 6. Automatic Speech Recognition (ASR)

**The `transcribe()` function takes an audio input and produces a transcription as a sequence of time-stamped text segments.**

You can optionally provide a language code. It is one of **many optional settings**. Most of the other parameters control how the decoding is done. In the setup below, you can enable or disable word-level timestamps (`word_timestamps`). If enabled, per-word start/end times and word probabilities are added, but compute cost increases and timestamps can be less stable for short disfluencies or noisy speech. You can also enable or disable voice activity detection (`vad_filter`). If enabled, non-speech regions are removed, which typically speeds up transcription and can reduce hallucinations in silent/noisy parts, but it may also cut very short hesitation sounds (e.g., “um”, “ähm”) if they fall below the VAD sensitivity. Beam size (`beam_size`) controls how many alternative token sequences are considered during decoding. Larger values can slightly improve accuracy, but they also increase runtime. **For details on all available parameters and their defaults, refer to the [faster-whisper](https://github.com/SYSTRAN/faster-whisper) documentation.**

>Example: 
>
>In [noScribe](https://github.com/kaixxx/noScribe), hotwords from a separate file are passed into `transcribe()` via the `hotwords` parameter to implement the "disfluencies" on/off toggle in the noScribe application. In faster-whisper, these hotwords are inserted as extra prompt tokens before decoding, which slightly biases the decoder toward producing those words when the audio is uncertain (for example, very short filler sounds like "uh" or "ehm" that can be hard to distinguish from breathing or background noise). The original OpenAI [Whisper](https://github.com/openai/whisper) implementation does not provide a `hotwords` parameter under that name, so this behavior is specific to faster-whisper. The closest equivalent is providing a prompt via the `initial_prompt` parameter to bias decoding in a similar direction. 
>
>If you want to try out how hesitation sounds could be accounted for in the `transcribe()` call below, add `hotwords="Äh, das ist, es ist, ähm, nicht so einfach."` or `hotwords="Uhm, okay, here's what I'm, like, thinking."` (taken from noScribe's [prompt.yml](https://github.com/kaixxx/noScribe/blob/main/prompt.yml)) to the parameters and run the transcription with audio file "C" (German example) or "D" (English example), which you can select in Sect. 3.2.

In [None]:
segments, info = model.transcribe(
    audio_16k,
    language="de",  # Must match the selected file (e.g., "en" for File-D); None lets Whisper detect the language
    word_timestamps=False, 
    vad_filter=True, 
    beam_size=5
)

segments = list(segments)  # The transcription will actually run here

**In faster-whisper transcription results, each text segment includes start and end time, the recognized text, the underlying token IDs, and several scores that can be inspected if needed.** These scores are model-internal confidence signals derived from the token probabilities during decoding. If word-level timestamps are enabled, each segment also contains a list of words with their own start and end times and probabilities.
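
A raw `print` of the segment objects is hard to read, so a compact listing can help. The sketch below is illustrative only: `describe_segments` is a hypothetical helper, and the `SimpleNamespace` stand-ins merely mimic the `start`, `end`, and `text` attributes so the example runs without a model.

```python
from types import SimpleNamespace


def describe_segments(segments):
    """Return one readable line per segment: '[start -> end] text'."""
    return [
        f"[{s.start:.2f}s -> {s.end:.2f}s] {s.text.strip()}"
        for s in segments
    ]


# Stand-in objects with the same attribute names as faster-whisper segments (made-up values)
demo_segments = [
    SimpleNamespace(start=0.0, end=2.5, text=" Guten Tag."),
    SimpleNamespace(start=2.5, end=4.0, text=" Hallo."),
]
for line in describe_segments(demo_segments):
    print(line)
```

The same function should work on the real `segments` list, since it only reads these three attributes.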

In [None]:
print(segments)

# 7. Speaker Diarization

Next, we run the diarization model on the audio file. This gives us a **timeline of who speaks when**, for example: SPEAKER_00 speaks from 0–10 seconds, then SPEAKER_01 speaks from 10–15 seconds, and so on. By setting `min_speakers` and `max_speakers` to the same value (here, 2), we constrain the output to exactly that number of speaker labels, even if the real audio contains fewer or more speakers. If the number of speakers is unknown, set the two parameters to a plausible range instead.
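
Conceptually, such a timeline is a list of speaker turns with a start time, an end time, and a label. The sketch below uses plain tuples with made-up values (not the pyannote result object) to show how the total speaking time per speaker could be computed; `speaking_time` is a hypothetical helper.

```python
from collections import defaultdict


def speaking_time(turns):
    """Sum the duration of all turns per speaker label."""
    totals = defaultdict(float)
    for start, end, speaker in turns:
        totals[speaker] += end - start
    return dict(totals)


# Made-up timeline following the example in the text
demo_turns = [
    (0.0, 10.0, "SPEAKER_00"),
    (10.0, 15.0, "SPEAKER_01"),
    (15.0, 20.0, "SPEAKER_00"),
]
print(speaking_time(demo_turns))
```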

In Sect. 4, `imageio_ffmpeg` provides an ffmpeg executable that we call directly to convert audio files. Pyannote, however, loads audio files via its own internal decoder (`TorchCodec`; see [pyannote.audio](https://github.com/pyannote/pyannote-audio)). This decoder can fail on some machines even when ffmpeg itself works. To avoid this, we pass audio "from memory": a waveform (the raw audio samples as a numeric array) plus the sample rate (samples per second). This makes the notebook more reliable on Windows and on JupyterHubs.

In [None]:

def load_for_pyannote(path):
    waveform, sr = torchaudio.load(path)      # waveform shape: (channels, samples)
    return {"waveform": waveform, "sample_rate": sr}

diarization_result = diarization(
    load_for_pyannote(audio_16k),
    min_speakers=2,
    max_speakers=2
)

In [None]:
print(diarization_result.speaker_diarization)

# 8. Merge Results

We merge ASR output with diarization so that each spoken segment (and optionally each word) is linked to a speaker label (e.g., SPEAKER_00). If word-level timestamps are available, we assign speakers per word; otherwise we assign one speaker per segment.

## 8.1 ASR Result Formatting

Since the output of faster-whisper is stored in its own custom Python objects, we first convert it into a simple **Python dictionary**. We define a function `to_whisper_result` that extracts only the fields we need. This makes the transcript easier to inspect, save, and process in later steps. Each segment gets a start time, an end time, the transcribed text, and (if word-level timestamps were enabled) a list of words with their own timing information.
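
One benefit of a plain dictionary is that it can be written to disk with the standard `json` module. The following is a minimal sketch; the helper names `save_result_json` and `load_result_json` are hypothetical and not used elsewhere in this notebook.

```python
import json


def save_result_json(result, path):
    """Write the result dictionary to a UTF-8 JSON file."""
    with open(path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps umlauts readable; indent=2 makes the file easy to inspect
        json.dump(result, f, ensure_ascii=False, indent=2)


def load_result_json(path):
    """Read a result dictionary back from a JSON file."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


# Usage (hypothetical path): save_result_json(asr_result, "../Results/asr_result.json")
```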

In [None]:
def to_whisper_result(segments, language=None):
    out = {"segments": []}
    for s in segments:
        item = {
            "start": float(s.start),
            "end": float(s.end),
            "text": s.text or "",
        }
        if getattr(s, "words", None):
            item["words"] = [
                {
                    "word": w.word,
                    "start": float(w.start),
                    "end": float(w.end),
                }
                for w in s.words
                if w.start is not None and w.end is not None
            ]
        out["segments"].append(item)
    return out

Run conversion:

In [None]:
asr_result = to_whisper_result(segments)

In [None]:
print(asr_result)

## 8.2 Align Results

We combine the converted ASR output with the diarization output in the `align` function. For each word in the transcript, we determine which diarization time interval overlaps **the most** with that word, and assign the corresponding speaker label. We also assign a single main speaker to each ASR segment by choosing the speaker with the largest total word duration inside that segment.

Diarization segments and ASR segments are independent time partitions, so they rarely match perfectly. The merge therefore compares time intervals and links them based on temporal overlap.
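
The overlap rule can be illustrated with a small worked example. The values are made up: one word lasts from 1.0 s to 2.0 s and straddles a speaker change at 1.25 s.

```python
def overlap_seconds(start, end, turn_start, turn_end):
    """Length of the intersection of [start, end] and [turn_start, turn_end]; 0.0 if disjoint."""
    return max(0.0, min(end, turn_end) - max(start, turn_start))


# Made-up values: a word and two diarization turns
word = (1.0, 2.0)
turns = [(0.0, 1.25, "SPEAKER_00"), (1.25, 3.0, "SPEAKER_01")]

# Overlap of the word with each turn, then pick the speaker with the largest overlap
overlaps = {spk: overlap_seconds(*word, ts, te) for ts, te, spk in turns}
best = max(overlaps, key=overlaps.get)

print(overlaps)  # {'SPEAKER_00': 0.25, 'SPEAKER_01': 0.75}
print(best)      # SPEAKER_01
```

The word is assigned to SPEAKER_01 because 0.75 s of it fall into that speaker's turn, compared with 0.25 s for SPEAKER_00.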

If word-level timestamps are available, speaker labels are assigned per word. If not, one speaker is assigned to the entire ASR segment based on total overlap.

The `overlap` flag is `True` when two or more different speakers are active at any point within the interval.

This step illustrates a simple manual merging strategy and can easily be adapted. For example, you could preserve overlaps explicitly, merge adjacent segments with the same speaker into longer turns, or change the decision rule (e.g., duration vs. word count). Because the time intervals of individual words are short, any errors caused by overlaps remain limited to small time spans.
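
As one example of such an adaptation, adjacent segments with the same speaker could be joined into longer turns. This is a sketch under assumptions: `merge_adjacent` and its `max_gap` threshold are hypothetical, the input uses the segment keys produced by `align`, and word-level details are dropped for brevity.

```python
def merge_adjacent(segments, max_gap=1.0):
    """Join consecutive segments of the same speaker if the pause between them is <= max_gap seconds."""
    merged = []
    for seg in segments:
        prev = merged[-1] if merged else None
        same_turn = (
            prev is not None
            and prev["speaker"] == seg["speaker"]
            and seg["start"] - prev["end"] <= max_gap
        )
        if same_turn:
            # Extend the previous turn instead of starting a new one
            prev["end"] = seg["end"]
            prev["text"] = f'{prev["text"]} {seg["text"].strip()}'
            prev["overlap"] = prev["overlap"] or seg.get("overlap", False)
        else:
            # Start a new turn (copy, so the input segments stay unchanged)
            merged.append({
                "start": seg["start"],
                "end": seg["end"],
                "text": seg["text"].strip(),
                "speaker": seg["speaker"],
                "overlap": seg.get("overlap", False),
            })
    return merged


# Made-up segments in the structure returned by align()
demo_segments = [
    {"start": 0.0, "end": 2.0, "text": " Hello.", "speaker": "SPEAKER_00", "overlap": False},
    {"start": 2.5, "end": 4.0, "text": " How are you?", "speaker": "SPEAKER_00", "overlap": False},
    {"start": 4.0, "end": 5.0, "text": " Fine.", "speaker": "SPEAKER_01", "overlap": True},
]
print(merge_adjacent(demo_segments))
```

Here the first two segments become one turn from 0.0 s to 4.0 s, and the third stays separate because the speaker changes.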

In [None]:
def align(asr_result, diarization_result):
    """
    Merge ASR output with diarization.
    - If word timestamps exist: assign speaker per word and compute a main speaker per segment.
    - Otherwise: assign one speaker per segment.
    - overlap=True if >= 2 different speakers overlap the interval in time.
    """

    # Convert diarization result into a plain list to avoid iterating itertracks() many times
    turns = []
    for turn, _, spk in diarization_result.itertracks(yield_label=True):
        turns.append((float(turn.start), float(turn.end), spk))

    # We work with time intervals: [start, end] for ASR words/segments and [turn_start, turn_end] for diarization turns.
    def overlap_seconds(s, e, ts, te):
        """Overlap duration in seconds between [s, e] and [ts, te]."""
        return max(0.0, min(e, te) - max(s, ts))

    def speakers_in_interval(s, e):
        """Return set of speakers that overlap [s, e]."""
        spks = set()
        for ts, te, spk in turns:
            if overlap_seconds(s, e, ts, te) > 0:
                spks.add(spk)
        return spks

    def best_speaker(s, e):
        """Return speaker with the largest total overlap with [s, e]."""
        overlap_by_speaker = {}
        for ts, te, spk in turns:
            ov = overlap_seconds(s, e, ts, te)
            if ov > 0:
                overlap_by_speaker[spk] = overlap_by_speaker.get(spk, 0.0) + ov
        return max(overlap_by_speaker, key=overlap_by_speaker.get) if overlap_by_speaker else None

    def is_overlapped(s, e):
        # Interval is 'overlapped' if >= 2 different speakers are active in it
        return len(speakers_in_interval(s, e)) >= 2

    # Define the structure of the final result (used for saving later)
    out = {"segments": []}

    # Loop over each ASR segment
    for seg in asr_result["segments"]:

        # 1. Add basic info: start/end time and full text
        new_seg = {
            "start": seg["start"],
            "end": seg["end"],
            "text": seg["text"],
        }

        # 2. Mark whether this whole segment overlaps with other speech
        new_seg["overlap"] = is_overlapped(seg["start"], seg["end"])

        # 3. Add word-level results in a list (if word_timestamps=True)
        words = seg.get("words", [])
        if words:
            new_words = []
            dur_by_speaker = {}  # Total word duration per speaker within this ASR segment

            for w in words:
                # Assign speaker for this word based on maximum time overlap
                spk = best_speaker(w["start"], w["end"])  # helper function defined above

                # Copy word info and add the speaker label
                wd = dict(w)
                wd["speaker"] = spk

                # Mark whether this word lies in an overlap region
                wd["overlap"] = is_overlapped(w["start"], w["end"])
                new_words.append(wd)

                # Accumulate speaking time (word duration) per speaker
                if spk is not None:
                    dur_by_speaker[spk] = dur_by_speaker.get(spk, 0.0) + (w["end"] - w["start"])

            # Save the word list inside the segment
            new_seg["words"] = new_words

            # Segment-level speaker: pick the one with the most total word duration
            new_seg["speaker"] = max(dur_by_speaker, key=dur_by_speaker.get) if dur_by_speaker else None

        else:
            # If no word-level timestamps exist, assign a speaker for the entire segment directly
            new_seg["speaker"] = best_speaker(seg["start"], seg["end"])

        # Add this processed segment to the output
        out["segments"].append(new_seg)

    return out

Run aligner:

In [None]:
final_result = align(asr_result, diarization_result.speaker_diarization)

In [None]:
print(final_result)

# 9. Save Final Result

Finally, we save a readable transcript as a text file: for each segment, we write out the speaker label, the transcribed text, an `[OVERLAP]` marker if several speakers were active in the segment, and the segment’s start time. This produces a simple "Speaker: text [time]" format, which can easily be adapted depending on what is needed for further analysis inside Python or in external tools.
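
If a tabular format is more convenient for further analysis, the same information could be written as CSV instead. The sketch below is one possible adaptation: `segments_to_csv`, the column names, and the demo data are made up for illustration, and the input is assumed to have the structure returned by `align`.

```python
import csv
import io


def segments_to_csv(result, stream):
    """Write one CSV row per segment: start, end, speaker, overlap, text."""
    writer = csv.writer(stream)
    writer.writerow(["start", "end", "speaker", "overlap", "text"])
    for seg in result["segments"]:
        writer.writerow([
            f'{seg["start"]:.3f}',
            f'{seg["end"]:.3f}',
            seg.get("speaker") or "UNKNOWN",
            int(bool(seg.get("overlap"))),
            seg["text"].strip(),
        ])


# Made-up result; for a real file, open it with newline="" and encoding="utf-8"
demo_result = {"segments": [
    {"start": 0.0, "end": 2.5, "text": " Guten Tag.", "speaker": "SPEAKER_00", "overlap": False},
]}
buffer = io.StringIO()
segments_to_csv(demo_result, buffer)
print(buffer.getvalue())
```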

In [None]:
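# Save as ...
output_folder = "../Results/"
os.makedirs(output_folder, exist_ok=True)  # Create the folder if it does not exist yet
txt_path = os.path.join(output_folder, f"{file_name}.txt")

def format_timestamp(seconds):
    """Format seconds as H:MM:SS.mmm (also for whole-second values such as 0.0)."""
    total_ms = int(round(seconds * 1000))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    secs, ms = divmod(rest, 1000)
    return f"{hours}:{minutes:02d}:{secs:02d}.{ms:03d}"

# Save
with open(txt_path, "w", encoding="utf-8") as f:
    for seg in final_result["segments"]:
        speaker = seg.get("speaker") or "UNKNOWN"  # No diarization turn overlapped this segment
        text = seg["text"].strip()
        start = format_timestamp(seg["start"])
        overlap_flag = " [OVERLAP]" if seg.get("overlap") else ""
        f.write(f"{speaker}: {text}{overlap_flag} [{start}]\n\n")

print("Saved transcript to", txt_path)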

If you only ran the Whisper transcription (without diarization), you can save the result directly like this:

In [None]:
# Save as ...
#output_folder = "../Results/"
#os.makedirs(output_folder, exist_ok=True)
#txt_path = os.path.join(output_folder, f"{file_name}.txt")

# Save
#with open(txt_path, "w", encoding="utf-8") as f:
#    for s in segments:
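#        ms = int(round(float(s.start) * 1000))  # Milliseconds; also correct for whole-second values
#        start = f"{ms // 3_600_000}:{ms // 60_000 % 60:02d}:{ms // 1000 % 60:02d}.{ms % 1000:03d}"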
#        f.write(f"[{start}] {s.text.strip()}\n")

#print("Saved transcript to", txt_path)