# Whisper's Transcription & Pyannote's Diarization

---

#### Credits to the original creator of this notebook: [`Majdoddin`](https://github.com/Majdoddin)  

[![notebook shield](https://img.shields.io/static/v1?label=&message=Notebook&color=blue&style=for-the-badge&logo=googlecolab&link=https://colab.research.google.com/github/ArthurFDLR/whisper-youtube/blob/main/whisper_youtube.ipynb)](https://colab.research.google.com/github/Majdoddin/nlp/blob/main/Pyannote_plays_and_Whisper_rhymes_v_2_0.ipynb)
[![repository shield](https://img.shields.io/static/v1?label=&message=Repository&color=blue&style=for-the-badge&logo=github&link=https://github.com/openai/whisper)](https://github.com/majdoddin/nlp)

#### It has been modified to suit our needs and to run on a local Jupyter notebook, so that large numbers of recordings can be diarized.

#### Please pay attention to the environment setup section, as it is crucial for a smooth diarization process.

#### [pyannote.audio](https://github.com/pyannote/pyannote-audio):  
Open-source toolkit written in Python for **speaker diarization**. Based on [`PyTorch`](https://pytorch.org) machine learning framework, it provides a set of trainable end-to-end neural building blocks that can be combined and jointly optimized to build speaker diarization pipelines. `pyannote.audio` also comes with pretrained [models](https://huggingface.co/models?other=pyannote-audio-model) and [pipelines](https://huggingface.co/models?other=pyannote-audio-pipeline) covering a wide range of domains for voice activity detection, speaker segmentation, overlapped speech detection, speaker embedding reaching state-of-the-art performance for most of them.

**Important:** To load the pyannote speaker diarization pipeline,

* accept the user conditions on both [hf.co/pyannote/speaker-diarization](https://hf.co/pyannote/speaker-diarization) and [hf.co/pyannote/segmentation](https://huggingface.co/pyannote/segmentation).
* paste your access token or log in using `notebook_login` below

# Setting up the environment

In [1]:
# Run this only once to move to the correct directory
%cd ..

/home/guayo/Desktop/programming/python/NLP/YT-WHISPER_SUMMARIZATION


In [2]:
from pathlib import Path
import pandas as pd
import os


setup = False
videos = {}

urls_path = "urls"
audio_path = "audio"
diarizations_path = "diarized_audio"
transcriptions_path = "transcribed_audio"
raw_audio_path = os.path.join(audio_path, "raw_podcast_audio")
split_audio_path = os.path.join(audio_path, "splitted_podcast_audio")
raw_diarizations_path = os.path.join(diarizations_path, "raw_diarizations")
final_diarizations_path = os.path.join(diarizations_path, "final_diarizations")
access_token = ""

# Add a credentials.txt file to the directory where the repository was cloned,
# containing your Hugging Face token in the following format:
# e.g.: huggingface: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
# If more credentials were needed in the future, they'd be imported with that format

with open("credentials.txt", "r") as file:
    for line in file:
        line = line.strip()     # drop the trailing newline so it does not end up in the token
        if not line:
            continue            # skip blank lines
        key, value = line.split(":", 1)     # split on the first ":" only

        if key == "huggingface":
            access_token = value.strip()

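for file in os.listdir(urls_path):
    if os.path.isfile(os.path.join(urls_path, file)):
        with open(os.path.join(urls_path, file)) as f:
            for line in f:
                if not line.strip():
                    continue    # skip blank lines
                # Each line has the format name;url (strip the trailing newline from the url)
                name, url = line.strip().split(";", 1)
                videos[name] = url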

In [3]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

In [4]:
import sys
# sys.path.append("..")

In [5]:
# Should say:
# Current path --> /home/guayo/Desktop/programming/python/NLP/YT-WHISPER_SUMMARIZATION
# Other: /home/guayo/Desktop/programming/python/NLP/YT-WHISPER_SUMMARIZATION/whisper-speaker-recognition
print("Current path -->", os.getcwd())
print("Other:", sys.path[0])

Current path --> /home/guayo/Desktop/programming/python/NLP/YT-WHISPER_SUMMARIZATION
Other: /home/guayo/Desktop/programming/python/NLP/YT-WHISPER_SUMMARIZATION/whisper-speaker-recognition


Create the appropriate file structure

In [6]:
# ------------ Define the path of every resource/output ------------ #

urls_path = "urls"
transcriptions_path = "transcribed_audio"

diarizations_path = "diarized_audio"
# Path --> YT-WHISPER_SUMMARIZATION/diarized_audio/...
raw_diarizations_path = os.path.join(diarizations_path, "raw_diarizations")
final_diarizations_path = os.path.join(diarizations_path, "final_diarizations")

audio_path = "audio"
# Path --> YT-WHISPER_SUMMARIZATION/audio/...
raw_audio_path = os.path.join(audio_path, "raw_podcast_audio")
split_audio_path = os.path.join(audio_path, "splitted_podcast_audio")
# Path --> YT-WHISPER_SUMMARIZATION/audio/splitted_podcast_audio/...
initial_split_path = os.path.join(split_audio_path, "initial_splits")
prepared_split_path = os.path.join(split_audio_path, "prepared_splits")

# ------------ Create the actual directory structure ------------ #

for path in (
    urls_path,
    transcriptions_path,
    raw_diarizations_path,
    final_diarizations_path,
    raw_audio_path,
    initial_split_path,
    prepared_split_path,
):
    # makedirs also creates the missing parent directories (diarized_audio, audio, ...)
    os.makedirs(path, exist_ok=True)

`Careful!` The following cell deletes previous work. Run it only if you intend to make way for new output.

In [None]:
# !rm -r {os.path.join(os.getcwd(), audio_path)}/*

### Necessary downloads:

Custom build of `ffmpeg` as [recommended](https://github.com/yt-dlp/yt-dlp#strongly-recommended) by `yt-dlp`.

In [None]:
if setup:
  !wget -O - -q  https://github.com/yt-dlp/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xz | xz -qdc | tar -x
  !pip install pydub
  !pip install light-the-torch
  !ltt install torch==1.13.1 torchvision==0.14.1 torchaudio==0.13.1
  !pip install git+https://github.com/hmmlearn/hmmlearn.git
  !pip install git+https://github.com/pyannote/pyannote-audio.git@develop
  !pip install pyannote.audio
  !pip install openai-whisper

# Start!

### Downloading the videos from YouTube

It takes more or less 15 minutes to download all the podcasts.

In [None]:
# Use this to download files instead:
from yt_dlp import YoutubeDL

for video_name, video_url in videos.items():

    !yt-dlp -xv --ffmpeg-location ffmpeg-master-latest-linux64-gpl/bin --audio-format wav  -o "../{str(raw_audio_path) + '/' + video_name + '/'}input.wav" -- {video_url}
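
The same download can also be driven from Python through the `YoutubeDL` class imported above. The following is a minimal sketch, not the notebook's tested path: the helper names `build_ydl_options` and `download_audio` and the `my_podcast` directory are hypothetical, and it assumes the ffmpeg build was unpacked as described earlier.

```python
import os


def build_ydl_options(output_dir, ffmpeg_dir):
    """Build yt-dlp options that extract the audio track and convert it to WAV."""
    return {
        "format": "bestaudio/best",
        # yt-dlp fills in %(ext)s; after post-processing the file is input.wav
        "outtmpl": os.path.join(output_dir, "input.%(ext)s"),
        "ffmpeg_location": ffmpeg_dir,
        "postprocessors": [
            {"key": "FFmpegExtractAudio", "preferredcodec": "wav"},
        ],
    }


def download_audio(video_url, output_dir, ffmpeg_dir):
    """Download one video's audio as WAV (needs yt-dlp installed and network access)."""
    from yt_dlp import YoutubeDL  # imported lazily so the helper above works without it

    with YoutubeDL(build_ydl_options(output_dir, ffmpeg_dir)) as ydl:
        ydl.download([video_url])


# Hypothetical podcast directory, only used to show the resulting output template
options = build_ydl_options(
    os.path.join("audio", "raw_podcast_audio", "my_podcast"),
    "ffmpeg-master-latest-linux64-gpl/bin",
)
print(options["outtmpl"])
```

Because ffmpeg performs the conversion, the resulting file is a real WAV file rather than a renamed container.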

In [None]:
# The code below should work, but returns this error:
# CouldntDecodeError: Decoding failed. ffmpeg returned error code: 1
# ...
#   libswresample   3.  9.100 /  3.  9.100
#   libpostproc    55.  9.100 / 55.  9.100
# [wav @ 0x5594e30956c0] invalid start code [0][0][0][24] in RIFF header
# input.wav: Invalid data found when processing input
#
# We believe it is because the format that the file is downloaded in is .mp4,
# and then it is set to .wav with a simple rename. 

# from yt_downloader import GetFromYoutube

# url_kanyewest = "https://www.youtube.com/watch?v=qxOeWuAHOiw"
# GetFromYoutube(url_kanyewest).get_audio_and_rename("input", ".wav")
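
One way to test the rename hypothesis is to look at the first bytes of the file: a real WAV file starts with `RIFF`, followed by a 4-byte size and `WAVE`, whereas an MP4 file starts with a box size followed by `ftyp`. The sketch below uses only the standard library; `looks_like_wav` is a hypothetical helper, and the fake MP4 bytes are made up for illustration.

```python
import os
import tempfile
import wave


def looks_like_wav(path):
    """Return True if the file starts with a RIFF/WAVE header."""
    with open(path, "rb") as f:
        header = f.read(12)
    return len(header) == 12 and header[:4] == b"RIFF" and header[8:12] == b"WAVE"


with tempfile.TemporaryDirectory() as tmp:
    # A real WAV file: 0.1 s of 16-bit mono silence written with the stdlib
    real_wav = os.path.join(tmp, "real.wav")
    with wave.open(real_wav, "wb") as w:
        w.setnchannels(1)
        w.setsampwidth(2)
        w.setframerate(16000)
        w.writeframes(b"\x00\x00" * 1600)

    # An MP4-like file that was merely renamed to .wav
    renamed = os.path.join(tmp, "renamed.wav")
    with open(renamed, "wb") as f:
        f.write(b"\x00\x00\x00\x18ftypmp42" + b"\x00" * 32)

    real_ok = looks_like_wav(real_wav)
    renamed_ok = looks_like_wav(renamed)

print(real_ok, renamed_ok)  # True False
```

The bytes `[0][0][0][24]` in the ffmpeg error message are consistent with an MP4 box size (`0x18` is 24) sitting where the `RIFF` marker should be.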

# Whisper's Transcriptions

In [None]:
from whisper_analisis import *

model = "base.en"
transcriber = TranscribeAudio(model)

video_names = list(videos.keys())
for idx, directory in enumerate(video_names):
    print(f"Transcribing video {idx + 1} out of {len(video_names)}...")

    file = "input.wav"   # the file should preferably be in .wav format
    audio_file = os.path.join(raw_audio_path, directory, file)

    output_name = directory + "_transcribed.csv"  # the name of the directory is the name of the podcast
    transcriber.transcribe(audio_file, verbose=False)    # pass verbose=False to turn off verbose output
    transcriber.save_DF_as_CSV(transcriptions_path, output_name)

# Getting split points for the files

Diarization is very memory-intensive. A recording longer than about 15 minutes is very likely to exhaust the memory of the host computer, which crashes the kernel and loses all the progress made.  

Therefore, the audio needs to be split into chunks of at most 10-15 minutes each before it is processed.  

This comes with a problem: we don't want to cut the audio while a speaker is talking, because that would mess up the diarization. Moreover, speakers must be recognized consistently across chunks: if speakers 1, 2 and 3 appear in one chunk, they should be identified as 1, 2 and 3 in the next one as well, so that their dialogue is not mixed up and the diarization stays consistent.

The code below is an approach to tackle these issues and prepare the audios for diarization.

In [13]:
def find_split_points(audio_df, avg_split_duration=10):

    """
    Find split points in a transcription without cutting a speaker's dialogue.
    The split cannot be made at an exact timestamp because that could cut a speaker's words, so
    we pick the earliest segment end in the last 5 minutes of the allowed window instead.

    Parameters
    ----------
    audio_df: pd.DataFrame
        dataframe containing the transcribed audio, with an "end" column in seconds
    avg_split_duration: int
        approximate duration of each split, in minutes

    Returns
    -------
    list
        split points in seconds, starting at 0 and ending at the last timestamp
    """

    start = 0   # start begins at 0
    end = start + ((avg_split_duration + 5) * 60)     # end begins at 15 min for the default duration
    split_points = [0]

    while end < audio_df["end"].max():

        min_timestamp = (
            audio_df
            .query("end < @end & end > (@end - 5 * 60)")["end"].min()   # find the possible split range, and choose the smallest
        )
        if pd.isna(min_timestamp):
            min_timestamp = end     # no segment ends in this range: fall back to a hard cut
        start = min_timestamp       # start should be the newly found split point
        end = start + ((avg_split_duration + 5) * 60)     # add (split_size + 5) more minutes to the latest split point
        split_points.append(min_timestamp)  # save the split point

    split_points.append(audio_df["end"].max())

    return split_points
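
To see how the window logic behaves, here is a pandas-free restatement of the same idea on made-up data. It is only a sketch: `find_split_points_plain` and its `slack` parameter are hypothetical names, and the hard-cut fallback for an empty window is an assumption of this sketch.

```python
def find_split_points_plain(segment_ends, avg_split_duration=10, slack=5):
    """Pick split points (in seconds) from a list of segment end times.

    Each split point is the earliest segment end that falls in the last
    `slack` minutes of a window of (avg_split_duration + slack) minutes.
    """
    window = (avg_split_duration + slack) * 60
    last = max(segment_ends)
    split_points = [0]
    end = window

    while end < last:
        candidates = [t for t in segment_ends if end - slack * 60 < t < end]
        # Assumption of this sketch: cut at the window end if no segment ends there
        split = min(candidates) if candidates else end
        split_points.append(split)
        end = split + window

    split_points.append(last)
    return split_points


# Toy transcript: a segment ends every 100 s over a 40-minute recording
segment_ends = list(range(100, 2500, 100))
points = find_split_points_plain(segment_ends)
print(points)  # [0, 700, 1400, 2100, 2400]
```

With the default settings, every chunk starts just after the 10-minute mark of the previous one, so chunks stay well under the 15-minute limit.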

In [14]:
from pydub import AudioSegment

def split_wav_pydub(file, split_points, export_path):
    # Load the audio file
    audio = AudioSegment.from_wav(file)

    # Split the file and save each part
    for i in range(len(split_points) - 1):
        # Calculate the start and end of the split in ms, keeping sub-second precision
        start = int(split_points[i] * 1000)
        end = int(split_points[i+1] * 1000)

        # Extract the split audio
        split_audio = audio[start:end]
        # Export the split audio to a new .wav file
        split_audio.export(os.path.join(export_path, f'split_{i+1}.wav'), format='wav')

    return len(split_points) - 1    # This is the number of splits performed
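
`pydub` slices an `AudioSegment` by milliseconds, so the split points (in seconds) have to be converted first. The sketch below shows that conversion on its own, with made-up split points; `split_bounds_ms` is a hypothetical helper. Multiplying before rounding keeps the sub-second part of each timestamp.

```python
def split_bounds_ms(split_points):
    """Turn split points in seconds into (start_ms, end_ms) pairs for slicing."""
    bounds = []
    for start, end in zip(split_points, split_points[1:]):
        bounds.append((round(start * 1000), round(end * 1000)))
    return bounds


# Made-up split points, in seconds
bounds = split_bounds_ms([0, 612.48, 1215.9])
print(bounds)  # [(0, 612480), (612480, 1215900)]
```

Each pair can then be used as `audio[start_ms:end_ms]` on a loaded `AudioSegment`.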

In [15]:
def audio_splitter(file_path, transcription_path, avg_split_duration, export_path):
    audio_df = pd.read_csv(transcription_path)
    split_points = find_split_points(audio_df, avg_split_duration)
    split_wav_pydub(file_path, split_points, export_path)

Now that we have the functions, run them over all of our audio to split it (this takes about 11 minutes to run):

In [16]:
videos = os.listdir(raw_audio_path)

for idx, directory in enumerate(videos):
    print(f"Splitting {idx + 1} of {len(videos)}...", end="\r")

    os.makedirs(os.path.join(initial_split_path, directory), exist_ok=True)
    audio_splitter(
        os.path.join(raw_audio_path, directory, "input.wav"),
        os.path.join(transcriptions_path, directory + "_transcribed.csv"),
        10,
        os.path.join(initial_split_path, directory)
    )

Splitting 23 of 23...

# Prepending a spacer

`pyannote.audio` seems to miss the first 0.5 seconds of the audio, so we prepend a spacer of silence (1 second in the cell below).  
This spacer should be applied to every split of every podcast.

In [18]:
from pydub import AudioSegment

videos = os.listdir(initial_split_path)

for idx, directory in enumerate(videos):
    print(f"Running the spacer on video {idx + 1} of {len(videos)}...", end="\r")

    orig_path = os.path.join(initial_split_path, directory)
    dest_path = os.path.join(prepared_split_path, directory)
    if not os.path.exists(dest_path):
        !mkdir {dest_path}

    # Here we run the spacer for every split
    for file in os.listdir(orig_path):

        # splitext only splits on the last dot, so extra dots in the name are safe
        f, ext = os.path.splitext(file)

        input_file = os.path.join(orig_path, file)
        output_file = os.path.join(dest_path, f + "_prep" + ext)

        spacermilli = 1000   # 1 second of silence
        spacer = AudioSegment.silent(duration=spacermilli)

        audio = AudioSegment.from_wav(input_file)

        audio = spacer.append(audio, crossfade=0)

        audio.export(output_file, format='wav')

Running the spacer on video 1 of 23...

Running the spacer on video 23 of 23...
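
Because of the spacer, every timestamp produced for a prepared split is shifted by the spacer length, and it is also relative to the start of that split rather than to the whole recording. The sketch below shows one way to map such a timestamp back. It assumes you kept the split points (in seconds) for each podcast; `to_global_ms` is a hypothetical helper.

```python
SPACER_MS = 1000  # same length as the silence prepended to every split


def to_global_ms(local_ms, split_start_s, spacer_ms=SPACER_MS):
    """Map a timestamp inside a prepared split back to the original recording."""
    # Remove the spacer (clamping timestamps that fall inside it),
    # then add the position where this split starts in the original audio
    return max(0, local_ms - spacer_ms) + round(split_start_s * 1000)


# A segment found 2.5 s into a prepared split that starts at 700 s
print(to_global_ms(2500, 700))  # 701500
```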

# Pyannote's Diarization

In [9]:
from pyannote.audio import Pipeline
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization', use_auth_token=access_token or True)

Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.1.1. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../../../.cache/torch/pyannote/models--pyannote--segmentation/snapshots/c4c8ceafcbb3a7a280c2d357aee9fbc9b0be7f9b/pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.0.1. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.1.0+cu121. Bad things might happen unless you revert torch to 1.x.


In [12]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pipeline.to(device)

<pyannote.audio.pipelines.speaker_diarization.SpeakerDiarization at 0x7f9df2f0c5e0>

Running pyannote.audio to generate the diarizations.

In [13]:
prep_splits = os.path.join(split_audio_path, "prepared_splits")

for directory in os.listdir(prep_splits):

    path = os.path.join(prep_splits, directory)
    # Save under raw_diarizations_path, which the grouping step reads from
    output_dir = os.path.join(raw_diarizations_path, directory)
    os.makedirs(output_dir, exist_ok=True)

    for file in os.listdir(path):
        audio_input = {'uri': directory, 'audio': os.path.join(path, file)}
        dz = pipeline(audio_input)

        number = file.split("_")[1]     # "split_3_prep.wav" -> "3"
        with open(os.path.join(output_dir, f"split_{number}_diarized.txt"), "w") as text_file:
            text_file.write(str(dz))

# Preparing audio files according to the diarization

In [8]:
def millisec(timeStr):
  # Convert an "HH:MM:SS.mmm" timestamp into milliseconds
  spl = timeStr.split(":")
  s = int((int(spl[0]) * 60 * 60 + int(spl[1]) * 60 + float(spl[2])) * 1000)
  return s
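
As a small worked example, the sketch below parses one line of a saved diarization into start time, end time and speaker. The sample line is an assumption about what `str(dz)` writes (check it against your own `*_diarized.txt` files), and `to_millisec` and `parse_diarization_line` are hypothetical helpers.

```python
import re

TIMESTAMP = re.compile(r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+")


def to_millisec(time_str):
    """Convert an "HH:MM:SS.mmm" timestamp into milliseconds."""
    hours, minutes, seconds = time_str.split(":")
    return int((int(hours) * 3600 + int(minutes) * 60 + float(seconds)) * 1000)


def parse_diarization_line(line):
    """Return (start_ms, end_ms, speaker) for one diarization line."""
    start, end = TIMESTAMP.findall(line)[:2]
    speaker = line.split()[-1]  # the speaker label is the last token
    return to_millisec(start), to_millisec(end), speaker


# Assumed line format, for illustration only
sample = "[ 00:00:01.500 -->  00:01:02.250] A SPEAKER_00"
parsed = parse_diarization_line(sample)
print(parsed)  # (1500, 62250, 'SPEAKER_00')
```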

Grouping the diarization segments according to the speaker.

In [9]:
import re

diarized_groups = []

for directory in os.listdir(raw_diarizations_path):

  base_dir = os.path.join(raw_diarizations_path, directory)

  for file in os.listdir(base_dir):
    
    with open(os.path.join(base_dir, file)) as diarization_file:
      dzs = diarization_file.read().splitlines()
    groups = []
    g = []
    lastend = 0

    for d in dzs:
      if g and (g[0].split()[-1] != d.split()[-1]):      # different speaker: close the current group
        groups.append(g)
        g = []

      g.append(d)

      end = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=d)[1]
      end = millisec(end)
      if (lastend > end):       # segment engulfed by a previous segment
        groups.append(g)
        g = []
      else:
        lastend = end
    if g:
      groups.append(g)
    #print(*groups, sep='\n')

    diarized_groups.append(groups)
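
The grouping rule can be tried on a few hand-written lines without any diarization files. This is a sketch of the same logic as a standalone function; the sample lines are made up and assume the line format described in the comments, and `end_ms` and `group_by_speaker` are hypothetical helpers.

```python
import re


def end_ms(line):
    """Return the end timestamp of a diarization line in milliseconds."""
    end = re.findall(r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+", line)[1]
    h, m, s = end.split(":")
    return int((int(h) * 3600 + int(m) * 60 + float(s)) * 1000)


def group_by_speaker(lines):
    """Group consecutive lines that share a speaker label (the last token)."""
    groups, current, last_end = [], [], 0
    for line in lines:
        # A change of speaker closes the current group
        if current and current[0].split()[-1] != line.split()[-1]:
            groups.append(current)
            current = []
        current.append(line)

        end = end_ms(line)
        if last_end > end:
            # Segment engulfed by a previous one: close the group here
            groups.append(current)
            current = []
        else:
            last_end = end
    if current:
        groups.append(current)
    return groups


# Made-up lines in the assumed "[ start -->  end] track SPEAKER" format
lines = [
    "[ 00:00:01.000 -->  00:00:04.000] A SPEAKER_00",
    "[ 00:00:04.500 -->  00:00:08.000] B SPEAKER_00",
    "[ 00:00:08.500 -->  00:00:12.000] C SPEAKER_01",
    "[ 00:00:12.500 -->  00:00:15.000] D SPEAKER_00",
]
groups = group_by_speaker(lines)
print([len(g) for g in groups])  # [2, 1, 1]
```

The first two lines share a speaker and end up in one group, while each change of speaker starts a new group.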