# **Whisper + Diarization**

In [None]:
! pip install -qq faster-whisper
! pip install -qq https://github.com/pyannote/pyannote-audio/archive/refs/heads/develop.zip
! pip install -qq pytube

import os
# Point the Hugging Face Hub cache at /kaggle/working/ so downloaded model
# files land there (presumably the writable Kaggle output directory — confirm).
# NOTE(review): set before the model libraries are imported in the next cell;
# presumably that ordering is required for the setting to take effect.
os.environ['HUGGINGFACE_HUB_CACHE'] = '/kaggle/working/'

In [None]:
import os

import pandas as pd
import torch
from faster_whisper import WhisperModel
from pyannote.audio import Audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.core import Segment
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score


In [None]:
import re

import pytube as pt

# a sample zoom meeting extracted from youtube
yt_url = "https://www.youtube.com/watch?v=53yPfrqbpkE"
yt = pt.YouTube(yt_url)
# The title becomes a file name (and later an ffmpeg argument), so replace
# characters that are path separators, quotes, or otherwise invalid in file
# names. Fall back to a fixed name if nothing usable is left.
video_title = re.sub(r'[\\/:*?"<>|\x00-\x1f]+', "_", yt.title).strip(" .") or "audio"
stream = yt.streams.filter(only_audio=True).first()
if stream is None:
    # .first() yields None when the filter matched nothing; fail with a clear
    # message instead of an AttributeError on the download call.
    raise RuntimeError(f"No audio-only stream available for {yt_url}")
# download the audio
# NOTE(review): ".mp3" is only the chosen file name; the container actually
# delivered by the stream is not checked here.
stream.download(filename=f"{video_title}.mp3")


In [None]:
# File name the download cell wrote the audio to.
audio_path = f"{video_title}.mp3"
# Intended name of the converted .wav file.
# NOTE(review): the conversion cell below derives its own `audio_file_wav`
# and this variable is not read by the visible cells — confirm it is needed.
audio_path_wav = f"{video_title}.wav"
# Echo the path as the cell output.
audio_path

In [None]:
# Use the GPU when torch reports one, otherwise run on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Model size/name handed to faster-whisper.
whisper_model = "small"
model = WhisperModel(
    whisper_model, compute_type="int8", device=device
)  # int8 is requested explicitly; NOTE(review): confirm the library's default compute_type

# segments is a generator that yields segments of the transcript, so the
# results are only collected when a later cell iterates over it.
# `info` carries metadata about the run (its `duration` is used further down).
segments, info = model.transcribe(audio_path, language="en")

In [None]:
# Display the metadata object returned alongside the segments by model.transcribe.
info


In [None]:
%%time 

# Consume the segment generator (this is where the transcript is actually
# produced) and keep only the start time, end time and text of each segment
# as a plain dict.
segments = [{"start": seg.start, "end": seg.end, "text": seg.text} for seg in segments]

In [None]:
# Sample transcript segment (the fourth one); raises IndexError when the
# transcript has fewer than four segments.
segments[3]

In [None]:
# Report how many transcript segments were produced.
print("Total number of segments", len(segments))

In [None]:
# Convert the downloaded audio to a 16 kHz, mono, 16-bit PCM .wav file.

import subprocess

# Swap only the trailing extension. str.replace() would also rewrite any
# earlier occurrence of the extension text inside the file name.
audio_root, file_ending = os.path.splitext(audio_path)
audio_file_wav = audio_root + ".wav"
print("Starting conversion to .wav")
# Pass the arguments as a list (no shell) so quotes, spaces or shell
# metacharacters in the title-derived file name cannot break or alter the
# command.
conversion = subprocess.run(
    [
        "ffmpeg",
        "-y",  # overwrite an existing output so re-running the cell does not stall on a prompt
        "-i", audio_path,
        "-ar", "16000",
        "-ac", "1",
        "-c:a", "pcm_s16le",
        audio_file_wav,
    ],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.PIPE,
)
if conversion.returncode != 0:
    # Fail here with ffmpeg's own message rather than later, when the missing
    # .wav file is first read.
    ffmpeg_log = conversion.stderr.decode(errors="replace")
    raise RuntimeError(
        f"ffmpeg failed with exit code {conversion.returncode}:\n{ffmpeg_log[-2000:]}"
    )


In [None]:
# Load the pretrained speaker-embedding model on the same device as Whisper.
# The resulting object is called with a waveform batch in segment_embedding().
# NOTE(review): the embedding loop assumes 192-dimensional vectors
# (EMBEDDING_SIZE) — confirm that matches this checkpoint.
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb", device=device
)

In [None]:
# Compare the latest segment end time with the audio duration reported by Whisper.
max(s["end"] for s in segments), info.duration

In [None]:
def segment_embedding(segment, audio_path_wav):
    """Return the speaker embedding for one transcript segment.

    Args:
        segment: Mapping with "start" and "end" times of the segment.
        audio_path_wav: Path of the .wav file the segment is cut from.

    Returns:
        Whatever the module-level ``embedding_model`` returns for the cropped
        waveform, passed in as a batch of one.
    """
    # Clamp the end to the duration reported by Whisper (module-level `info`);
    # presumably a segment end can overshoot the real audio length.
    clip_end = min(segment["end"], info.duration)
    clip = Segment(segment["start"], clip_end)

    # crop() also returns the sample rate, which is not needed here.
    waveform, _ = Audio().crop(audio_path_wav, clip)

    # Add a leading batch dimension before calling the embedding model.
    return embedding_model(waveform.unsqueeze(0))


In [None]:
import numpy as np
from tqdm import tqdm

# Width of one embedding row; must match what embedding_model produces.
EMBEDDING_SIZE = 192
# TODO: replace this code with torch tensors
# One row per transcript segment, filled in below.
embeddings = np.zeros(shape=(len(segments), EMBEDDING_SIZE))
# enumerate() keeps the row index in step with the segment; tqdm still shows
# progress because it wraps the list itself.
for i, segment in enumerate(tqdm(segments)):
    embeddings[i] = segment_embedding(segment, audio_file_wav)
# Replace NaN/inf entries with finite numbers so clustering does not fail on them.
embeddings = np.nan_to_num(embeddings)


In [None]:
# Upper bound on the speaker counts that are tried.
MAX_SPEAKERS = 15
# 0 means "estimate the number of speakers"; any other value is used as given.
num_speakers = 0

if num_speakers == 0:
    # Find the best number of speakers: cluster with every candidate count and
    # keep the one with the highest silhouette score.
    score_num_speakers = {}

    # The silhouette score is only defined for 2 <= n_clusters <= n_samples - 1,
    # so short transcripts must not try every count up to MAX_SPEAKERS.
    max_candidate = min(MAX_SPEAKERS, len(embeddings) - 1)
    # A separate loop variable keeps the `num_speakers` setting intact.
    for candidate in range(2, max_candidate + 1):
        clustering = AgglomerativeClustering(candidate).fit(embeddings)
        score = silhouette_score(embeddings, clustering.labels_, metric="euclidean")
        score_num_speakers[candidate] = score

    if score_num_speakers:
        best_num_speaker = max(score_num_speakers, key=score_num_speakers.get)
        print(
            f"The best number of speakers: {best_num_speaker} with {score_num_speakers[best_num_speaker]} score"
        )
    else:
        # Fewer than three segments: no candidate count can be scored.
        best_num_speaker = 1
        print("Too few segments to estimate the number of speakers; assuming 1")
else:
    best_num_speaker = num_speakers


In [None]:
# Cluster with the chosen speaker count and tag every segment with its cluster.
clustering = AgglomerativeClustering(best_num_speaker).fit(embeddings)
labels = clustering.labels_
for i, label in enumerate(labels):
    # Cluster ids start at 0; the displayed speaker numbers start at 1.
    segments[i]["speaker"] = f"SPEAKER {label + 1}"


In [None]:
def convert_time(seconds):
    """Format a duration given in seconds as an HH:MM:SS string.

    Fractional seconds are truncated. Each field is zero-padded to at least
    two digits; the hour field grows as needed for long durations.
    """
    total = int(seconds)
    hours = total // 3600
    minutes, secs = divmod(total % 3600, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"

# Once the accumulated text of a turn reaches this many characters, the next
# segment starts a new row even if the speaker is unchanged.
MAX_TEXT_LENGTH = 500
# Merge consecutive segments into speaker turns. Each turn becomes one row
# with a start time, end time, speaker label and concatenated text.
objects = {"start_time": [], "end_time": [], "speaker": [], "text": []}
text = ""
for i, segment in enumerate(segments):
    speaker_changed = i > 0 and segments[i - 1]["speaker"] != segment["speaker"]
    if i == 0 or speaker_changed or len(text) >= MAX_TEXT_LENGTH:
        if i != 0:
            # Close the turn accumulated so far; it ended with the previous segment.
            objects["end_time"].append(convert_time(segments[i - 1]["end"]))
            objects["text"].append(text)
            text = ""
        # Open a new turn starting at the current segment.
        objects["start_time"].append(convert_time(segment["start"]))
        objects["speaker"].append(segment["speaker"])
    text += segment["text"] + " "
if segments:
    # Close the final turn with the end of the LAST segment (not the one
    # before it). Skipped entirely when there are no segments.
    objects["end_time"].append(convert_time(segments[-1]["end"]))
    objects["text"].append(text)


In [None]:
# Character lengths of the five longest speaker turns, longest first.
sorted((len(t) for t in objects["text"]), reverse=True)[:5]

In [None]:
# Build a table with one row per speaker turn (columns: start_time, end_time,
# speaker, text) and display it as the cell output.
transcription_df = pd.DataFrame(objects)
transcription_df


In [None]:
# Save the transcript table to transcription.csv in the current directory.
# NOTE(review): `index` is left at its default, which presumably also writes
# the row index as an unnamed first column — confirm that is wanted.
transcription_df.to_csv("transcription.csv")