<a href="https://colab.research.google.com/github/hoodini/transcription-and-diarization/blob/main/transcripts_with_speaker_names.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Notes on usage:

- Make sure to [change runtime to GPU](https://www.tutorialspoint.com/google_colab/google_colab_using_free_gpu.htm).
- The transcript will be saved in Files, which you can find in the menu on the left.
- Change the number of speakers below if different from two.
- Pick a bigger model if you want more accuracy and a smaller model if you want the program to run faster ([more info](https://github.com/openai/whisper#available-models-and-languages)).
- If you know the language being spoken is English, then change language to 'English' as this improves performance.


High level overview of what's happening here:


1.   I'm using Open AI's Whisper model to seperate audio into segments and generate transcripts.
2.   I'm then generating speaker embeddings for each segments.
3.   Then I'm using agglomerative clustering on the embeddings to identify the speaker for each segment.   

Let me know if I can make it better!


In [1]:
# upload audio file
from google.colab import files
uploaded = files.upload()
path = next(iter(uploaded))

Saving roman.m4a to roman.m4a


In [2]:
path

'roman.m4a'

In [3]:
num_speakers = 3 #@param {type:"integer"}

language = 'Hebrew' #@param ['any', 'English', 'Hebrew']

model_size = 'large' #@param ['tiny', 'base', 'small', 'medium', 'large']

model_name = model_size
if language == 'English' and model_size != 'large':
  model_name += '.en'
elif language == 'Hebrew':
  model_name += '.he'


In [15]:
!pip install -q git+https://github.com/openai/whisper.git > /dev/null
!pip install -q git+https://github.com/pyannote/pyannote-audio > /dev/null
!pip install librosa



In [16]:
import whisper
import datetime
import librosa
import subprocess

import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda"))

from pyannote.audio import Audio
from pyannote.core import Segment

import wave
import contextlib

from sklearn.cluster import AgglomerativeClustering
import numpy as np

In [12]:
# create an instance of the Audio class
audio = Audio()

In [6]:
if path[-3:] != 'wav':
  subprocess.call(['ffmpeg', '-i', path, 'audio.wav', '-y'])
  path = 'audio.wav'

In [7]:
model = whisper.load_model(model_size)

100%|█████████████████████████████████████| 2.87G/2.87G [00:49<00:00, 62.8MiB/s]


In [8]:
result = model.transcribe(path)
segments = result["segments"]

In [9]:
with contextlib.closing(wave.open(path,'r')) as f:
  frames = f.getnframes()
  rate = f.getframerate()
  duration = frames / float(rate)

In [13]:
def segment_embedding(segment):
  start = segment["start"]
  # Whisper overshoots the end timestamp in the last segment
  end = min(duration, segment["end"])
  clip = Segment(start, end)
  waveform, sample_rate = audio.crop(path, clip)

  # Convert the waveform to mono here
  mono_waveform = librosa.to_mono(waveform.numpy())

  # Adding an extra dimension to represent the channel
  mono_waveform = mono_waveform[None, :]

  return embedding_model(torch.tensor(mono_waveform)[None])


In [17]:
embeddings = np.zeros(shape=(len(segments), 192))
for i, segment in enumerate(segments):
  embeddings[i] = segment_embedding(segment)

embeddings = np.nan_to_num(embeddings)


In [18]:
clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
labels = clustering.labels_
for i in range(len(segments)):
  segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

In [19]:
def time(secs):
  return datetime.timedelta(seconds=round(secs))

f = open("transcript.txt", "w")

for (i, segment) in enumerate(segments):
  if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
    f.write("\n" + segment["speaker"] + ' ' + str(time(segment["start"])) + '\n')
  f.write(segment["text"][1:] + ' ')
f.close()

In [20]:
def time(secs):
    return str(datetime.timedelta(seconds=round(secs)))

In [21]:
with open("transcript.srt", "w") as f:
    for i, segment in enumerate(segments):
        # Write segment index
        f.write(f"{i+1}\n")

        # Write time range in SRT format (hh:mm:ss,mmm)
        start_time = time(segment["start"]).split(':')
        end_time = time(segment["end"]).split(':')
        f.write(f"{start_time[0]}:{start_time[1]}:{float(start_time[2]):06.3f} --> {end_time[0]}:{end_time[1]}:{float(end_time[2]):06.3f}\n")

        # Write speaker and text
        f.write(f"{segment['speaker']}: {segment['text'][1:]}\n")

        # Add a blank line to separate different dialogues
        f.write("\n")

In [22]:
with open('transcript.srt', 'r') as file:
    data = file.read()

print(data)


1
0:00:00.000 --> 0:00:14.000
SPEAKER 3: קשה לחשוב על איתוי מצמרר מזה. דווקא היום ביום השנה ה-15 לרצח תאי רדה החליט בית המשפט לקיים סיור בזירה בבית הספר בקצרין.

2
0:00:14.000 --> 0:00:30.000
SPEAKER 1: ולבוא ליום הזה עם כל הראיות האלה, הן החדשות והן הישנות, זה קשה מאוד. עד כמה שניסיתי להחזיק את עצמי חזקה שמה, היו שלבים שקצת לא יכולתי אפילו לעמוד על הרגליים.

3
0:00:30.000 --> 0:00:39.000
SPEAKER 2: אתה בלחץ עכשיו? לא. למה? לפני דיון כזה פה בזירה. דיון, שבתים רצים לדעת, לבדוק את זה, זה טוב?

4
0:00:40.000 --> 0:00:47.000
SPEAKER 2: איך ההרגשה לחזור לפה היום? רגילה הרגשה. מה יכול להרגיש באדם שלא עשה שום דבר? רגיל.

5
0:00:48.000 --> 0:00:55.000
SPEAKER 3: רומן זדורוב חוזר לכאן בפעם השלישית מאז הרצח. בפעם הראשונה היה זה כששחזר מול מצלמות המשטרה.

6
0:00:55.000 --> 0:01:00.000
SPEAKER 2: אני רוצה קצת לשון. כל הזמן לחץ, אני לא יכול, אני לא ברזל.

7
0:01:00.000 --> 0:01:06.000
SPEAKER 3: בפעם השנייה היה זה במהלך משפטו הראשון. גם אז קיימו השופטים סיור בזירה.

8
0:01:06.000 --> 0:01:14.000
SP