# Set Up

## Installation

In [None]:
!pip install pydub

In [None]:
!pip install light-the-torch

In [None]:
!ltt install torch==2.1.0 torchvision==0.16.0 torchaudio==2.1.0

In [None]:
!pip install --upgrade --user hmmlearn

In [None]:
!pip install  pyannote.audio

In [None]:
!pip install git+https://github.com/openai/whisper.git

## Imports

In [3]:
import os
from pydub import AudioSegment
from pyannote.audio import Pipeline
import torch
import re
import whisper
import json
from datetime import timedelta

  torchaudio.set_audio_backend("soundfile")
  from .autonotebook import tqdm as notebook_tqdm
  torchaudio.set_audio_backend("soundfile")


## Prepending a spacer

`pyannote.audio` seems to miss the first 0.5 seconds of the audio, and, therefore, we prepend a spcacer.

In [4]:
import os  # for file and folder path manipulation

FOLDER = "tmp"  # temporary folder for work

AUDIO_TITLE = "example"
FILE = AUDIO_TITLE + ".mov"  # example audio file

FILE_PATH = os.path.join(FOLDER, FILE)

In [5]:
# Create silent audio clip
spacermilli = 2000
spacer = AudioSegment.silent(duration=spacermilli)

# Load audio file
file_extension = FILE.split(".")[-1]  # get audio file extension
audio = AudioSegment.from_file(FILE_PATH, file_extension)
# Append silent clip
audio = spacer.append(audio, crossfade=0)
# Save modified audio
PREP_FILE = os.path.join(FOLDER, 'input_prep.wav')
audio.export(PREP_FILE, format='wav')

<_io.BufferedRandom name='tmp/input_prep.wav'>

# Pyannote's Diarization

In [6]:
# Hugging Face Access Token
access_token = ""

In [7]:
# Download Hugging Face model using access token
# Will download the model or use a cache
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization-3.0', use_auth_token= (access_token) or True )

In [8]:
# Define machine device and set model's device

# Get device
device  = "cpu"
if torch.cuda.is_available():
    device = "cuda"
if torch.backends.mps.is_available():
    device = "mps"
    
device = torch.device(device)
pipeline.to(device)

<pyannote.audio.pipelines.speaker_diarization.SpeakerDiarization at 0x2b7f68510>

Running pyannote.audio to generate the diarizations.

In [9]:
DIARIZATION_FILE = os.path.join(FOLDER, "diarization.txt")
DEMO_FILE = {'uri': 'blabla', 'audio': PREP_FILE}

In [10]:
# Run pipeline
diarizations = pipeline(DEMO_FILE)

# Save diarization times
with open(DIARIZATION_FILE, "w") as text_file:
    text_file.write(str(diarizations))

# Preparing audio files according to the diarization

In [11]:
def millisec(timeStr):
    spl = timeStr.split(":")
    s = (int)((int(spl[0]) * 60 * 60 + int(spl[1]) * 60 + float(spl[2]) )* 1000)
    return s

Grouping the diarization segments according to the speaker.

In [12]:
diarizations = None

In [13]:
# Load diarization split
if diarizations is None:
    diarizations = open(DIARIZATION_FILE).read().splitlines()
else:
    diarizations = str(diarizations).split("\n")

groups = []
group = []
lastend = 0

for diarizarion in diarizations:  # for each diarization split
    if group and (group[0].split()[-1] != diarizarion.split()[-1]):  # if the same speaker
        groups.append(group)
        group = []

    group.append(diarizarion)  # Append the diarization information

    end = re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=diarizarion)[1]  # Regex to find the ending time-string
    end = millisec(end)  # convert to mili-seconds
    if (lastend > end):  # if segment engulfed by a previous segment
        groups.append(group)
        group = []
    else:
        lastend = end

if group:  # append final temporary grouping
    groups.append(group)
print(*groups, sep='\n')

['[ 00:00:02.096 -->  00:00:06.561] 0 SPEAKER_00', '[ 00:00:08.140 -->  00:00:13.998] 0 SPEAKER_00', '[ 00:00:14.490 -->  00:00:17.835] 0 SPEAKER_00', '[ 00:00:19.074 -->  00:00:25.458] 0 SPEAKER_00', '[ 00:00:26.086 -->  00:00:27.835] 0 SPEAKER_00', '[ 00:00:28.344 -->  00:00:29.448] 0 SPEAKER_00', '[ 00:00:30.398 -->  00:00:33.200] 0 SPEAKER_00', '[ 00:00:33.539 -->  00:00:35.814] 0 SPEAKER_00', '[ 00:00:36.086 -->  00:00:41.689] 0 SPEAKER_00', '[ 00:00:41.994 -->  00:00:56.290] 0 SPEAKER_00']
['[ 00:00:56.171 -->  00:01:16.731] 2 SPEAKER_02']
['[ 00:01:16.001 -->  00:01:25.000] 1 SPEAKER_01']
['[ 00:01:23.896 -->  00:01:43.998] 2 SPEAKER_02']


Save the audio part corresponding to each diarization group.

In [14]:
audio = AudioSegment.from_wav(os.path.join(FOLDER, "input_prep.wav"))
gidx = -1
for group in groups:
    start = re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=group[0])[0]
    end = re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=group[-1])[1]
    start = millisec(start) #- spacermilli
    end = millisec(end)  #- spacermilli
    gidx += 1
    audio[start:end].export(os.path.join(FOLDER, str(gidx) + '.wav'), format='wav')
    print(f"group {gidx}: {start}--{end}")

group 0: 2096--56290
group 1: 56171--76731
group 2: 76001--85000
group 3: 83896--103997


Freeing up some memory

In [15]:
del   DEMO_FILE, pipeline, spacer,  audio, diarizations

# Whisper's Transcriptions

In [16]:
# Get device
device  = "cpu"
if torch.cuda.is_available():
    device = "cuda"
if torch.backends.mps.is_available():
    device = "mps"

model = whisper.load_model('medium.en')  # load Whisper model

In [None]:
transcript = []

for i, group in enumerate(groups):

    audiof = os.path.join(FOLDER, str(i) + '.wav')  # audio file path
    result = model.transcribe(audio=audiof, language='en', word_timestamps=True)  # transcribe audio file

    # Get time shift
    shift = re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=group[0])[0]  # get starting time for speaker cluster
    shift = millisec(shift) - spacermilli  # the start time in the original video
    shift = max(shift, 0)  # time shift in miliseconds

    # Lambda function to apply time shift
    apply_shift = lambda time: (shift + (time * 1000.0)) / 1000.0

    segments = result["segments"]
    speaker = group[0].split()[-1]  # first section in speaker cluster, get speaker name

    if not segments:
        continue

    for segment in segments:

        # Update times for segment
        segment['start'] = apply_shift(segment['start'])
        segment['end'] = apply_shift(segment['end'])

        for i, word in enumerate(segment['words']):
            if word == "":
                continue
            # Update times for word
            word['start'] = apply_shift(word['start'])
            word['end'] = apply_shift(word['end'])

    result['segments'] = segments
    transcript.append({"speaker": speaker, "result": result})

with open(os.path.join(FOLDER, 'transcript'+'.json'), "w") as outfile:  # write result
    json.dump(transcript, outfile, indent=4)

In [35]:
speakers = {'SPEAKER_00':('Customer', '#e1ffc7', 'darkgreen'), 'SPEAKER_01':('Call Center', 'white', 'darkorange') }
def_boxclr = 'white'
def_spkrclr = 'orange'

In [36]:

preS = '\n<!DOCTYPE html>\n<html lang="en">\n\n<head>\n\t<meta charset="UTF-8">\n\t<meta name="viewport" content="whtmlidth=device-width, initial-scale=1.0">\n\t<meta http-equiv="X-UA-Compatible" content="ie=edge">\n\t<title>' + \
AUDIO_TITLE+ \
'</title>\n\t<style>\n\t\tbody {\n\t\t\tfont-family: sans-serif;\n\t\t\tfont-size: 14px;\n\t\t\tcolor: #111;\n\t\t\tpadding: 0 0 1em 0;\n\t\t\tbackground-color: #efe7dd;\n\t\t}\n\n\t\ttable {\n\t\t\tborder-spacing: 10px;\n\t\t}\n\n\t\tth {\n\t\t\ttext-align: left;\n\t\t}\n\n\t\t.lt {\n\t\t\tcolor: inherit;\n\t\t\ttext-decoration: inherit;\n\t\t}\n\n\t\t.l {\n\t\t\tcolor: #050;\n\t\t}\n\n\t\t.s {\n\t\t\tdisplay: inline-block;\n\t\t}\n\n\t\t.c {\n\t\t\tdisplay: inline-block;\n\t\t}\n\n\t\t.e {\n\t\t\t/*background-color: white; Changing background color */\n\t\t\tborder-radius: 10px;\n\t\t\t/* Making border radius */\n\t\t\twidth: 50%;\n\t\t\t/* Making auto-sizable width */\n\t\t\tpadding: 0 0 0 0;\n\t\t\t/* Making space around letters */\n\t\t\tfont-size: 14px;\n\t\t\t/* Changing font size */\n\t\t\tmargin-bottom: 0;\n\t\t}\n\n\t\t.t {\n\t\t\tdisplay: inline-block;\n\t\t}\n\n\t\t#player-div {\n\t\t\tposition: sticky;\n\t\t\ttop: 20px;\n\t\t\tfloat: right;\n\t\t\twidth: 40%\n\t\t}\n\n\t\t#player {\n\t\t\taspect-ratio: 16 / 9;\n\t\t\twidth: 100%;\n\t\t\theight: auto;\n\t\t}\n\n\t\ta {\n\t\t\tdisplay: inline;\n\t\t}\n\t</style>';
preS += '\n\t<script>\n\twindow.onload = function () {\n\t\t\tvar player = document.getElementById("audio_player");\n\t\t\tvar player;\n\t\t\tvar lastword = null;\n\n\t\t\t// So we can compare against new updates.\n\t\t\tvar lastTimeUpdate = "-1";\n\n\t\t\tsetInterval(function () {\n\t\t\t\t// currentTime is checked very frequently (1 millisecond),\n\t\t\t\t// but we only care about whole second changes.\n\t\t\t\tvar ts = (player.currentTime).toFixed(1).toString();\n\t\t\t\tts = (Math.round((player.currentTime) * 5) / 5).toFixed(1);\n\t\t\t\tts = ts.toString();\n\t\t\t\tconsole.log(ts);\n\t\t\t\tif (ts !== lastTimeUpdate) {\n\t\t\t\t\tlastTimeUpdate = ts;\n\n\t\t\t\t\t// Its now up to you to format the time.\n\t\t\t\t\tword = document.getElementById(ts)\n\t\t\t\t\tif (word) {\n\t\t\t\t\t\tif (lastword) {\n\t\t\t\t\t\t\tlastword.style.fontWeight = "normal";\n\t\t\t\t\t\t}\n\t\t\t\t\t\tlastword = word;\n\t\t\t\t\t\t//word.style.textDecoration = "underline";\n\t\t\t\t\t\tword.style.fontWeight = "bold";\n\n\t\t\t\t\t\tlet toggle = document.getElementById("autoscroll");\n\t\t\t\t\t\tif (toggle.checked) {\n\t\t\t\t\t\t\tlet position = word.offsetTop - 20;\n\t\t\t\t\t\t\twindow.scrollTo({\n\t\t\t\t\t\t\t\ttop: position,\n\t\t\t\t\t\t\t\tbehavior: "smooth"\n\t\t\t\t\t\t\t});\n\t\t\t\t\t\t}\n\t\t\t\t\t}\n\t\t\t\t}\n\t\t\t}, 0.1);\n\t\t}\n\n\t\tfunction jumptoTime(timepoint, id) {\n\t\t\tvar player = document.getElementById("audio_player");\n\t\t\thistory.pushState(null, null, "#" + id);\n\t\t\tplayer.pause();\n\t\t\tplayer.currentTime = timepoint;\n\t\t\tplayer.play();\n\t\t}\n\t\t</script>\n\t</head>';
preS += '\n\n<body>\n\t<h2>' + AUDIO_TITLE + '</h2>\n\t<i>Click on a part of the transcription, to jump to its portion of audio, and get an anchor to it in the address\n\t\tbar<br><br></i>\n\t<div id="player-div">\n\t\t<div id="player">\n\t\t\t<audio controls="controls" id="audio_player">\n\t\t\t\t<source src="input.wav" />\n\t\t\t</audio>\n\t\t</div>\n\t\t<div><label for="autoscroll">auto-scroll: </label>\n\t\t\t<input type="checkbox" id="autoscroll" checked>\n\t\t</div>\n\t</div>\n';

postS = '\t</body>\n</html>'

In [38]:
def timeStr(t):
  return '{0:02d}:{1:02d}:{2:06.2f}'.format(round(t // 3600),
                                                round(t % 3600 // 60),
                                                t % 60)

html = list(preS)
txt = list("")
gidx = -1

for data in transcript:  # for each speaker cluster

  captions = data['result']['segments']  # get JSON of segments ()

  if captions: 
    speaker = data['speaker'] 
    boxclr = def_boxclr  # set speaker box background color
    spkrclr = def_spkrclr  # set speaker name text color
    if speaker in speakers:  # if speaker color's are defined
      speaker, boxclr, spkrclr = speakers[speaker]  # set color variables

    html.append(f'<div class="e" style="background-color: {boxclr}">\n')  # starting div for speaker box
    html.append('<p  style="margin:0;padding: 5px 10px 10px 10px;word-wrap:normal;white-space:normal;">\n')  # starting p for speaker text
    html.append(f'<span style="color:{spkrclr};font-weight: bold;">{speaker}</span><br>\n\t\t\t\t')  # span for speaker name

    for c in captions:  # for segment in segments
      start = c['start']
      end = c['end']
      txt.append(f'[{timeStr(start)} --> {timeStr(end)}] [{speaker}] {c["text"]}\n')

      for i, w in enumerate(c['words']):
        if w == "":
           continue
        start = w['start']
        html.append(f'<a href="#{timeStr(start)}" id="{"{:.1f}".format(round(start*5)/5)}" class="lt" onclick="jumptoTime({int(start)}, this.id)">{w["word"]}</a><!--\n\t\t\t\t-->')
    html.append('</p>\n')  # ending p for speaker text
    html.append(f'</div>\n')  # ending div for speaker box

html.append(postS)

In [None]:
with open(os.path.join(FOLDER, f"capspeaker.txt"), "w", encoding='utf-8') as file:
  s = "".join(txt)
  file.write(s)
  print('captions saved to capspeaker.txt:')
  print(s+'\n')

In [None]:
with open(os.path.join(FOLDER, f"capspeaker.html"), "w", encoding='utf-8') as file:    #TODO: proper html embed tag when video/audio from file
  s = "".join(html)
  file.write(s)
  print('captions saved to capspeaker.html:')
  print(s+'\n')