<img src="../Images/DSC_Logo.png" style="width: 400px;">

This notebook applies a pipeline that is built on [faster-whisper](https://github.com/SYSTRAN/faster-whisper) and [pyannote.audio](https://github.com/pyannote/pyannote-audio), with installation procedures based on their official GitHub repositories (accessed September 25, 2025). This notebook also includes some comparisons with the [WhisperX](https://github.com/m-bain/whisperX) workflow.

# 1. One-Time Setup: Install Software & Hugging Face Account

## 1.1 FFmpeg

In [None]:
!conda install -c conda-forge ffmpeg -y

## 1.2 faster_whisper (CTranslate2)

In [None]:
!pip install faster-whisper

## 1.3 pyannote

Compared to WhisperX, the faster-whisper + pyannote workflow requires you to load and combine the diarization model yourself, which makes the process a bit more manual but also more flexible.

In [None]:
!pip install pyannote.audio

## 1.4 Hugging Face Account

If you don't have a [Hugging Face account](https://huggingface.co/), you need to create one.

You will need an access token later for the speaker diarization. To create one, click on your profile icon, choose "Access Tokens", and create a new access token.

>Important: Don't share your token.

In addition, pyannote requires you to agree to share your contact information to access its models. To do so, go to the [pyannote speaker-diarization model](https://huggingface.co/pyannote/speaker-diarization-3.1) page, enter your information, and click on "Agree and access repository". Do the same for the [pyannote segmentation model](https://huggingface.co/pyannote/segmentation-3.0).

In [None]:
own_token = "ENTER_YOUR_TOKEN"
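
To avoid leaving the token in a saved or shared notebook, one option is to read it from an environment variable. The sketch below assumes a variable named `HF_TOKEN`; both that name and the helper `load_token` are illustrative choices made here, and the function falls back to the placeholder if the variable is not set.

```python
import os

def load_token(env_var="HF_TOKEN", placeholder="ENTER_YOUR_TOKEN"):
    """Return the token stored in `env_var`, or the placeholder if it is unset."""
    # Assumption: the variable was set before Jupyter was started,
    # e.g. with `export HF_TOKEN=...` in the shell.
    return os.environ.get(env_var, placeholder)

own_token = load_token()
```

If the placeholder is returned, the diarization model in Sect. 6 will not load, so it is worth checking the value before continuing.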

# 2. Import Packages

In [None]:
import os
from datetime import timedelta

In [None]:
from faster_whisper import WhisperModel # faster-whisper
from faster_whisper import BatchedInferencePipeline # if batched transcription is wanted

from pyannote.audio import Pipeline # pyannote: wrapper that provides ready-to-use models for speaker diarization

You can check the installed PyTorch version and whether your environment has access to a GPU:

In [None]:
import torch

print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())

# 3. Setup

## 3.1 Runtime Setup

Automatically set device and compute type depending on hardware availability:

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

if device == "cuda":
    compute_type = "float16"  # Faster and more memory efficient on GPU
    batch_size = 16           # Adjust based on GPU memory
else:
    compute_type = "int8"     # Quantized weights: usually the most efficient choice on CPU
    batch_size = 1            # Keep at 1 on CPU (larger values don’t help and may cause memory issues)

## 3.2 Select Audio File

Provide the relative path to one audio file. The example paths below go up one folder and then into a data folder such as "Data/buffy/". Both .wav and .mp3 files work because the transcription library uses ffmpeg under the hood to read many common audio formats.

In [None]:
audio_file = "../Data/buffy/shortened_Buffy_Seas01-Epis01.en.wav"
#audio_file = "../Data/moon-landing/CA138clip.wav"
#audio_file = "../Data/qualitative-interview-de/DE_example_2.mp3"
#audio_file = "../Data/qualitative-interview-en/EN_example_1.mp3"

# 4. Load Whisper Model

If you know the language, set it explicitly. This reduces errors and makes decoding faster. The "language" variable is passed when calling model.transcribe() in Sect. 5.

In [None]:
language = "en"

Load the model with the given device ("cpu" or "cuda") and precision type ("float16", "int8", etc.). See the [Whisper](https://github.com/openai/whisper) repository for the available model sizes, or pass a custom (e.g., fine-tuned) model here instead of a size name.

In [None]:
model = WhisperModel("large-v3",  # smaller alternatives: "tiny", "base", "small", "medium"
                     device=device,
                     compute_type=compute_type)

With faster-whisper you can call the model in two ways: with or without batching enabled. Use batched only on GPU. If you want the batched model (alternative):

In [None]:
#batched_model = BatchedInferencePipeline(model=model)

# 5. Automatic Speech Recognition (ASR)

The transcription process can be tuned through several parameters. In the setup below, we enable word-level timestamps and the Voice Activity Detection (VAD) filter. Word-level timestamps are generated during decoding, while the VAD filter runs as a preprocessing step before transcription. Beam size controls how many alternative transcriptions the model considers at each step (larger values can improve accuracy slightly but make decoding slower). For details on available decoding and alignment options, refer to the [faster-whisper](https://github.com/SYSTRAN/faster-whisper) documentation.

faster-whisper returns the segments as a generator, so the transcription only runs once you iterate over it (here, by converting it to a list). Each segment corresponds to a decoded audio chunk of up to ~30 seconds. Every segment includes its start and end time, the recognized text, the underlying token IDs, and several confidence-related metrics (avg_logprob, compression_ratio, no_speech_prob) that could be further analyzed. If word-level timestamps are enabled, each segment also contains a list of words with their own start and end times and probabilities.

>Comparison with WhisperX: WhisperX always applies two additional models: a speech activity detector to remove silence (VAD) and a forced aligner to re-time each word with high precision. By contrast, with faster-whisper, VAD is just a lightweight filter before decoding, and word timings are produced directly by Whisper. This difference can make WhisperX word timings more precise. WhisperX skips confidence-related metrics because its focus is on producing transcriptions with accurate word-level timings, while faster-whisper exposes additional internal information from Whisper.

In [None]:
segments, info = model.transcribe(
    audio_file, 
    beam_size=1, 
    word_timestamps=True, 
    vad_filter=True, 
    language=language)  # Variable set in Sect. 4

segments = list(segments)  # The transcription will actually run here

In [None]:
print(segments)
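
The confidence metrics described above could be used, for example, to flag segments that deserve a manual check. The following is a minimal sketch under stated assumptions: `DemoSegment` is a stand-in defined here so the example runs without a model, `flag_uncertain` is a hypothetical helper, and the two thresholds are illustrative values rather than recommendations.

```python
from collections import namedtuple

# Stand-in for faster-whisper segments so the example runs on its own;
# it only carries the attributes that are used below.
DemoSegment = namedtuple("DemoSegment", "start end text avg_logprob no_speech_prob")

def flag_uncertain(segments, min_logprob=-1.0, max_no_speech=0.6):
    """Return segments whose metrics suggest an unreliable transcription.

    The thresholds are illustrative assumptions and should be tuned per dataset.
    """
    return [
        s for s in segments
        if s.avg_logprob < min_logprob or s.no_speech_prob > max_no_speech
    ]

# Hand-written example values (not real model output)
demo = [
    DemoSegment(0.0, 4.2, " Hello there.", -0.21, 0.02),
    DemoSegment(4.2, 9.0, " (unclear)", -1.35, 0.10),
    DemoSegment(9.0, 12.5, " ...", -0.40, 0.85),
]
for s in flag_uncertain(demo):
    print(f"[{s.start:.1f}-{s.end:.1f}] {s.text.strip()}")
```

Because the helper only reads attributes, calling `flag_uncertain(segments)` on the real segment list should work the same way.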

Alternative with batched model:

In [None]:
#segments, info = batched_model.transcribe(audio_file, batch_size=batch_size, word_timestamps=True, vad_filter=True, language=language)

#segments = list(segments)  # The transcription will actually run here

# 6. Speaker Diarization

## 6.1 Load Model

First, we load the speaker diarization model from pyannote.

In [None]:
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=own_token)

## 6.2 Perform Diarization

Next, we run the diarization model on the audio file. This gives us a timeline of speaker activity, for example: SPEAKER_00 speaks from 0–10 seconds, then SPEAKER_01 speaks from 10–15 seconds, and so on. By setting both min and max speakers to 2, we force the model to split the conversation into exactly two different speakers.

In [None]:
diarization_result = pipeline(audio_file, 
                              min_speakers=2, 
                              max_speakers=2)

In [None]:
print(diarization_result)
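
A quick way to sanity-check such a timeline is to sum the speaking time per speaker. The sketch below works on plain `(start, end, speaker)` tuples so that it runs without a model; `speaking_time` is a hypothetical helper and the example turns are hand-written, not real output.

```python
from collections import defaultdict

def speaking_time(turns):
    """Sum the duration of all turns per speaker label."""
    totals = defaultdict(float)
    for start, end, speaker in turns:
        totals[speaker] += end - start
    return dict(totals)

# Hand-written example turns (not real model output)
demo_turns = [
    (0.0, 10.0, "SPEAKER_00"),
    (10.0, 15.0, "SPEAKER_01"),
    (15.0, 21.5, "SPEAKER_00"),
]
print(speaking_time(demo_turns))
```

For the real result, the tuples could be built with `[(t.start, t.end, spk) for t, _, spk in diarization_result.itertracks(yield_label=True)]`, the same iteration that the align function in Sect. 7.2 uses.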

# 7. Merge Results

We merge ASR output with diarization so that each spoken segment (or each word) is linked to a speaker.

## 7.1 ASR Result Formatting

Since the output of faster-whisper is stored in its own objects, we first convert it into a simple Python dictionary format. We define a function "to_whisper_result". This makes the transcript easier to work with. Each segment gets a start time, an end time, the transcribed text, and (if word-level timestamps were enabled) a list of words with their own timing information.

In [None]:
def to_whisper_result(segments, language=None):
    out = {"segments": []}
    for s in segments:
        item = {
            "start": float(s.start),
            "end": float(s.end),
            "text": s.text or "",
        }
        if getattr(s, "words", None):
            item["words"] = [
                {
                    "word": w.word,
                    "start": float(w.start),
                    "end": float(w.end),
                }
                for w in s.words
                if w.start is not None and w.end is not None
            ]
        out["segments"].append(item)
    return out

Run conversion:

In [None]:
asr_result = to_whisper_result(segments)

In [None]:
print(asr_result)

## 7.2 Align Results

We combine the converted ASR output with the diarization output in the align function. For each word in the transcript, we check which diarization segment overlaps most with that word in time (maximum overlap), and that speaker label is assigned to the word. We also assign a single main speaker to each ASR segment by choosing the speaker who spoke for the longest total word duration within that segment.

The merge itself is straightforward (and similar to what WhisperX performs): each word is assigned to the single diarization segment that overlaps with it the most. This shows how merging can be done manually, and the align step can easily be adapted to different needs. For example, you could explicitly preserve overlaps (and maybe add per-word probabilities), merge adjacent segments with the same speaker into longer turns for cleaner transcripts, or change the voting rule (e.g., duration vs. word count). Thanks to word-level timestamps, any errors from overlaps are confined to very short spans.
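
To make the overlap rule concrete, here is a small worked example with hand-picked numbers: a hypothetical word that straddles a speaker change at 5.0 s. The helper `overlap` and the example values are illustrative only.

```python
def overlap(s, e, ts, te):
    """Length of the intersection of [s, e] and [ts, te]; zero or negative means no overlap."""
    return min(e, te) - max(s, ts)

word = (4.8, 5.3)  # hypothetical word straddling a speaker change at 5.0 s
turns = [(0.0, 5.0, "SPEAKER_00"), (5.0, 10.0, "SPEAKER_01")]

# Overlap of the word with each turn: about 0.2 s vs. about 0.3 s
scores = {spk: overlap(*word, ts, te) for ts, te, spk in turns}
best = max(scores, key=scores.get)
print(scores, "->", best)
```

The word is assigned to SPEAKER_01 because the larger share of it falls into that turn. Unlike this sketch, the align function returns no speaker at all when no turn overlaps the word.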

In [None]:
def align(asr_result, diarization_result):

    def best_speaker(s, e):
        best, best_ov = None, 0.0
        for turn, _, spk in diarization_result.itertracks(yield_label=True):
            ts, te = float(turn.start), float(turn.end)
            ov = min(e, te) - max(s, ts) # Overlap = time where the word/segment and this diarization turn both exist
            if ov > best_ov:
                best, best_ov = spk, ov
        return best

    # Define the structure of the final result (used later for saving)
    out = {"segments": []}

    # Loop over each ASR segment
    for seg in asr_result["segments"]:
        
        # 1. Add basic info: start/end time and full text
        new_seg = {
            "start": seg["start"],
            "end": seg["end"],
            "text": seg["text"],
        }

        # 2. Add word-level results in a list (if word_timestamps=True)
        words = seg.get("words", [])
        if words:
            new_words = []
            dur_by_speaker = {}  # Keep track of how long each speaker spoke in this segment
            for w in words:
                # Find which speaker overlaps most with this word
                spk = best_speaker(w["start"], w["end"]) # Function from above
                # Copy word info and add the speaker label
                wd = dict(w)
                wd["speaker"] = spk
                new_words.append(wd)
                # Count how much speaking time this speaker had (duration of word)
                if spk is not None:
                    dur_by_speaker[spk] = dur_by_speaker.get(spk, 0.0) + (w["end"] - w["start"])
            # Save the word list inside the segment
            new_seg["words"] = new_words
            # Assign the segment-level speaker by picking the one with the most word duration
            new_seg["speaker"] = max(dur_by_speaker, key=dur_by_speaker.get) if dur_by_speaker else None
        else:
            # If no word-level timestamps exist, assign a speaker for the entire segment directly
            new_seg["speaker"] = best_speaker(seg["start"], seg["end"])

        # Add this processed segment to the output
        out["segments"].append(new_seg)

    return out


Run aligner:

In [None]:
final_result = align(asr_result, diarization_result)

In [None]:
print(final_result)
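
One of the adaptations mentioned in Sect. 7.2, merging adjacent segments with the same speaker into longer turns, could look roughly like this. It is a minimal sketch: `merge_turns` is a hypothetical helper, it expects the same dictionary layout that align returns, and it drops the word lists for brevity.

```python
def merge_turns(result):
    """Join consecutive segments that share a speaker label into one turn."""
    merged = []
    for seg in result["segments"]:
        if merged and merged[-1]["speaker"] == seg.get("speaker"):
            # Same speaker as the previous turn: extend that turn
            merged[-1]["end"] = seg["end"]
            merged[-1]["text"] += " " + seg["text"].strip()
        else:
            # Speaker changed (or first segment): start a new turn
            merged.append({
                "start": seg["start"],
                "end": seg["end"],
                "speaker": seg.get("speaker"),
                "text": seg["text"].strip(),
            })
    return {"segments": merged}

# Hand-written example in the same layout as final_result
demo_result = {"segments": [
    {"start": 0.0, "end": 2.0, "text": " Hi.", "speaker": "SPEAKER_00"},
    {"start": 2.0, "end": 4.5, "text": " How are you?", "speaker": "SPEAKER_00"},
    {"start": 4.5, "end": 6.0, "text": " Fine.", "speaker": "SPEAKER_01"},
]}
print(merge_turns(demo_result))
```

Note that consecutive segments without a speaker label are also merged with each other, which may or may not be what you want.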

# 8. Save Final Result

Finally, we save a readable transcript as a text file: for each segment, we write out the speaker label, the transcribed text, and the segment’s start time. This produces a simple "Speaker: text [time]" format. This can easily be adapted depending on what is needed for further analysis inside Python or in external tools.

In [None]:
# Save as ...
method = "fw-pyannote"
file = "Buffy"
output_folder = "../Results/"
output_name = file + "_" + method
txt_path = os.path.join(output_folder, f"{output_name}.txt")

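# Create the output folder if it does not exist yet
os.makedirs(output_folder, exist_ok=True)

# Save
with open(txt_path, "w", encoding="utf-8") as f:
    for seg in final_result["segments"]:
        speaker = seg.get("speaker")  # None if no diarization turn overlapped
        text = seg["text"].strip()
        # Format the start time as H:MM:SS.mmm (timedelta omits the fraction for whole seconds)
        start = str(timedelta(seconds=seg["start"]))
        start = start[:-3] if "." in start else start + ".000"
        f.write(f"{speaker}: {text} [{start}]\n\n")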

print("Saved transcript to", txt_path)
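
If the result is meant for further analysis in Python, keeping the full structure (including word-level entries) in a JSON file may be more convenient than the text format. The sketch below uses a hand-written stand-in for the result and a temporary path; `save_json` and the file name are hypothetical choices.

```python
import json
import os
import tempfile

def save_json(result, path):
    """Write the merged result to `path` as UTF-8 encoded JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

# Hand-written stand-in with the same layout as final_result
demo_result = {"segments": [
    {"start": 0.0, "end": 2.0, "text": " Hi.", "speaker": "SPEAKER_00"},
]}
demo_path = os.path.join(tempfile.gettempdir(), "demo_fw-pyannote.json")
save_json(demo_result, demo_path)

# Reload to confirm that the structure survives the round trip
with open(demo_path, encoding="utf-8") as f:
    reloaded = json.load(f)
```

With the real data, the call would be `save_json(final_result, ...)` with a path inside the results folder.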