<a href="https://colab.research.google.com/github/LaboratorioSperimentale/Speaker_diarization/blob/main/Speaker_diarization_new.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## STEP1

Make sure your file is _mono_ (and not stereo).

To check it, open the file with audacity and see how many waves you see.

If you see more then one -> @Ele

Once you have your mono-wave file, export it in `.wav` format


Also, install the libraries you'll need

In [None]:
!pip install -q git+https://github.com/openai/whisper.git
!pip install -q git+https://github.com/pyannote/pyannote-audio

In [None]:
!pip install srt

## STEP 2

Upload the file


In [1]:
# upload audio file
path = "/content/StraParlaC(1).wav"

In [None]:
# TODO x Ludovica: find issue
# if path[-3:] != 'wav':
#   subprocess.call(['ffmpeg', '-i', path, '/content/audio.wav', '-y'])
#   path = '/content/audio.waw'

In [2]:
import os

if not os.path.exists(path):
  print("You haven't uploaded the right file!")

## STEP 3

Transcribe audio

In [4]:
num_speakers = 2 # @param {type:"integer"}

language = 'any' # @param ['any', 'English']

model_size = 'medium' # @param ['tiny', 'base', 'small', 'medium', 'large']

In [5]:
import whisper
import datetime

import subprocess

import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda"))

from pyannote.audio import Audio
from pyannote.core import Segment

import wave
import contextlib

from sklearn.cluster import AgglomerativeClustering
import numpy as np

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


hyperparams.yaml:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

  wrapped_fwd = torch.cuda.amp.custom_fwd(fwd, cast_inputs=cast_inputs)


embedding_model.ckpt:   0%|          | 0.00/83.3M [00:00<?, ?B/s]

mean_var_norm_emb.ckpt:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

classifier.ckpt:   0%|          | 0.00/5.53M [00:00<?, ?B/s]

label_encoder.txt:   0%|          | 0.00/129k [00:00<?, ?B/s]

  state_dict = torch.load(path, map_location=device)
  stats = torch.load(path, map_location=device)


In [6]:
model = whisper.load_model(model_size)

100%|██████████████████████████████████████| 1.42G/1.42G [00:11<00:00, 129MiB/s]
  checkpoint = torch.load(fp, map_location=device)


In [7]:
result = model.transcribe(path)
segments = result["segments"]

In [8]:
import json
with open("transcription.json", "w", encoding="utf-8") as fout:
  print(json.dumps(segments, indent=2), file=fout)

## STEP 4

Diarization

In [10]:
with contextlib.closing(wave.open(path,'r')) as f:
  frames = f.getnframes()
  rate = f.getframerate()
  duration = frames / float(rate)

In [11]:
audio = Audio()

def segment_embedding(segment):
  start = segment["start"]
  # Whisper overshoots the end timestamp in the last segment
  end = min(duration, segment["end"])
  clip = Segment(start, end)
  waveform, sample_rate = audio.crop(path, clip)
  return embedding_model(waveform[None])

In [13]:
embeddings = np.zeros(shape=(len(segments), 192))
for i, segment in enumerate(segments):
  try:
    embeddings[i] = segment_embedding(segment)
  except:
    pass

embeddings = np.nan_to_num(embeddings)

In [14]:
clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
labels = clustering.labels_
for i in range(len(segments)):
  segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

In [15]:
with open("transcript_first_stage.tsv", "w", encoding="utf-8") as fout:
  curr_speaker = ""
  curr_units = []
  for (i, segment) in enumerate(segments):
    segment_speaker = segment["speaker"]
    text = segment["text"]
    text = text.lower()
    text = "".join(c for c in text if not c in [".", "?", "!", ","])

    if segment_speaker == curr_speaker:
      curr_units.append((segment["start"], segment["end"], text))
    else:
      if len(curr_units):
        first_unit = curr_units[0]
        print(f"{curr_speaker}\t{first_unit[0]:.3f}\t{first_unit[1]:.3f}\t{first_unit[2]}", file=fout)
        for unit in curr_units[1:]:
          print(f"\t{unit[0]:.3f}\t{unit[1]:.3f}\t{unit[2]}", file=fout)
      curr_speaker = segment_speaker
      curr_units = [(segment["start"], segment["end"], text)]

  if len(curr_units):
    first_unit = curr_units[0]
    print(f"{curr_speaker}\t{first_unit[0]:.3f}\t{first_unit[1]:.3f}\t{first_unit[2]}", file=fout)
    for unit in curr_units[1:]:
      print(f"\t{unit[0]:.3f}\t{unit[1]:.3f}\t{unit[2]}", file=fout)

## STEP 5

At this point you should work on the textual file named `transcript_first_stage.tsv`.

Then you can upload it again and go on with the next steps.

In [None]:
import srt
corrected_tsv_path = ""

In [None]:
import datetime

import collections
srts = collections.defaultdict(list)
with open(corrected_tsv_path, encoding="utf-8") as fin:
  for line in fin:
    linesplit = line.strip().split("\t")
    if len(linesplit) == 4:
      speaker = linesplit[0]
      linesplit = linesplit[1:]

    srts[speaker].append(srt.Subtitle(None, datetime.timedelta(seconds=float(linesplit[0])), datetime.timedelta(seconds=float(linesplit[1])), linesplit[2]))

for speaker in srts:
  with open(f"tier_{speaker}.srt", "w", encoding="utf-8") as fout:
    for element in srts[speaker]:
      print(element.to_srt(), file=fout)

ValueError: could not convert string to float: 'SPEAKER 2 6.000'