In [None]:
# Install these if not there
# !pip install pydub
# !pip install light-the-torch
# !pip install openai-whisper
# !pip install  git+https://github.com/hmmlearn/hmmlearn.git
# !pip install  git+https://github.com/pyannote/pyannote-audio.git@develop

In [1]:
import json
import os
import re
import subprocess
from datetime import timedelta
from pathlib import Path

import torch
import whisper
from pyannote.audio import Pipeline
from pydub import AudioSegment

In [2]:
# Prefer the GPU, but fall back to the CPU so the notebook still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Display the selected device to confirm which one will be used.
device

device(type='cuda')

### On first use, whisper needs to install/update "numba.jit" - this happens automatically (1.42 GB)

# Generate a token from Hugging Face and insert it below
1. visit hf.co/pyannote/speaker-diarization and accept user conditions
2. visit hf.co/pyannote/segmentation and accept user conditions
3. visit hf.co/settings/tokens to create an access token
4. instantiate pretrained speaker diarization pipeline

In [3]:
# Hugging Face access token (see above for how to get one). It is read from the HF_TOKEN
# environment variable when set, so the secret does not have to be saved in the notebook;
# otherwise replace the placeholder below with your token.
hf_token = os.environ.get("HF_TOKEN", "<HuggingFace_TOKEN>")
# None does autodetection; if you only have 2 speakers try setting this to 2.
# It is not yet known which option works better.
NUM_SPEAKERS = None

# convert m4a to wav and create subfolder structure

In [16]:
data = "./data"  # Folder with all your m4a audio files right next to this file
data_dir = Path(data)
data_dir.mkdir(parents=True, exist_ok=True)

# One dict per input file: display name, path of the converted wav and the scratch folder.
audio_files = []
# Change m4a to your audio file format
for audio_file in sorted(data_dir.glob('**/*.m4a')):
    # Use pathlib instead of splitting on "." and "/" so that dotted file names,
    # nested folders and paths with spaces are handled correctly.
    name = audio_file.stem
    print(name)
    # Each recording gets its own folder named after the file, plus a tmp subfolder.
    out_dir = audio_file.with_suffix("")
    tmp_dir = out_dir / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    wav_path = out_dir / f"{name}.wav"
    audio_files.append({"name": name, "og_audio": str(wav_path), "tmp_folder": str(tmp_dir)})
    # To test on a short excerpt, add e.g. "-ss", "00:00:00", "-to", "00:00:30" before the output path.
    # The argument list avoids shell quoting problems; check=True raises if ffmpeg fails.
    subprocess.run(["ffmpeg", "-y", "-i", str(audio_file), str(wav_path)], check=True)

koschi
ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx

In [17]:
# Show the collected file records (name, converted wav path, tmp folder).
audio_files

[{'name': 'koschi',
  'og_audio': 'data/koschi/koschi.wav',
  'tmp_folder': 'data/koschi/tmp'}]

# Prepend silence before the actual audio, otherwise pyannote might not pick up the first 30 seconds

In [18]:
# Length of the silence put in front of every recording, in milliseconds.
# Later cells subtract this value again to recover the original timestamps.
spacermilli = 2000
# The spacer is the same for every file, so build it once outside the loop.
spacer = AudioSegment.silent(duration=spacermilli)
for file in audio_files:
    original_wav = file["og_audio"]
    # splitext only strips the extension; split(".")[0] would cut at the first dot anywhere in the path.
    prepared_wav = os.path.splitext(original_wav)[0] + "_prep.wav"
    file["proc_audio"] = prepared_wav
    audio = spacer.append(AudioSegment.from_wav(original_wav), crossfade=0)
    audio.export(prepared_wav, format='wav')
audio_files

[{'name': 'koschi',
  'og_audio': 'data/koschi/koschi.wav',
  'tmp_folder': 'data/koschi/tmp',
  'proc_audio': 'data/koschi/koschi_prep.wav'}]

# Init pyannote pipeline for speaker identification and whisper for transcription

In [19]:

# Load the pretrained speaker diarization pipeline from the Hugging Face hub.
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1.1", use_auth_token=hf_token)
# Fail here with a clear message instead of much later with an opaque
# "'NoneType' object is not callable" if the pipeline could not be loaded.
if pipeline is None:
    raise RuntimeError(
        "Could not load the pyannote speaker diarization pipeline. "
        "Check that hf_token is valid and that you accepted the user conditions of "
        "pyannote/speaker-diarization and pyannote/segmentation."
    )

Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.0.6. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint --file ../../.cache/torch/pyannote/models--pyannote--segmentation/snapshots/c4c8ceafcbb3a7a280c2d357aee9fbc9b0be7f9b/pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 2.1.1. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.0.1+cu117. Bad things might happen unless you revert torch to 1.x.


In [20]:
# Load the Whisper model used for transcription. Takes a bit.
model = whisper.load_model('large') # Can use 'small' or 'base' if used on a local PC

# Create diarization file (when does a new speaker speak) - this will take a long while

In [21]:
def create_diarization_file(audiofile, numspeaker):
    """Run speaker diarization on ``audiofile`` and save the result as text.

    The result is written next to the audio file as
    ``<audio path without extension>_dia.txt``. If that file already exists it
    is reused and the pipeline is not run again; delete it to force a fresh
    run, for example after changing the number of speakers.

    Args:
        audiofile: Path of the audio file to diarize.
        numspeaker: Value passed to the pipeline as ``num_speakers``, or
            ``None`` to call the pipeline without that argument.

    Returns:
        Path of the diarization text file.
    """
    diarization_file = os.path.splitext(audiofile)[0] + "_dia.txt"
    print(diarization_file)
    # Check for a cached result before doing any work: diarization is the slow step.
    if os.path.isfile(diarization_file):
        return diarization_file

    # Move the pipeline to the target device *before* running it; doing it
    # afterwards means the first run still happens on the previous device.
    pipeline.to(device)
    if numspeaker is None:
        dz = pipeline(audiofile)
    else:
        dz = pipeline(audiofile, num_speakers=numspeaker)
    with open(diarization_file, "w") as text_file:
        text_file.write(str(dz))
    return diarization_file

def millisec(timeStr):
    """Convert an ``H:M:S`` time string to whole milliseconds.

    Args:
        timeStr: Text such as ``"00:01:02.500"``; hours and minutes are parsed
            as integers, seconds as a float.

    Returns:
        The time in milliseconds as an ``int``.
    """
    hours, minutes, seconds = timeStr.split(":")[:3]
    total_seconds = int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)
    # Round instead of truncating: 1.001 * 1000 is 1000.9999999999999 in
    # floating point, which int() would turn into 1000.
    return round(total_seconds * 1000)

In [22]:
%%time
# Diarize every prepared audio file and store the path of its result file in the record.
for file in audio_files:
    file["dia_file"] = create_diarization_file(file["proc_audio"], NUM_SPEAKERS)
audio_files


KeyboardInterrupt



# Create audio segments based on the previous timestamps

In [None]:
def create_segments(file):
    """Cut the prepared audio into one wav clip per group of diarization lines.

    Consecutive diarization lines are merged into one group while their last
    whitespace-separated token (the speaker label) stays the same. A group is
    also closed right after a line whose end time lies before the latest end
    time seen so far. Each group is exported as ``<tmp_folder>/<index>.wav``,
    cut from ``file["proc_audio"]`` between the first timestamp of the
    group's first line and the second timestamp of its last line.

    Args:
        file: Record with the keys ``"dia_file"``, ``"proc_audio"`` and
            ``"tmp_folder"``.

    Returns:
        Tuple ``(tmp_files, groups)``: the exported wav paths and the grouped
        diarization lines, in matching order.
    """
    timestamp_re = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')

    # Close the file promptly and drop blank lines, which carry no segment.
    with open(file["dia_file"]) as dia:
        dzs = [line for line in dia.read().splitlines() if line.strip()]

    groups = []
    g = []
    lastend = 0
    for d in dzs:
        # A different speaker label starts a new group.
        if g and (g[0].split()[-1] != d.split()[-1]):
            groups.append(g)
            g = []

        g.append(d)

        end = millisec(timestamp_re.findall(d)[1])
        if lastend > end:  # segment engulfed by a previous segment
            groups.append(g)
            g = []
        else:
            lastend = end
    if g:
        groups.append(g)

    audio = AudioSegment.from_wav(file["proc_audio"])

    tmp_files = []
    for gidx, g in enumerate(groups):
        # Times are used as-is: they refer to the prepared audio, which is also what gets sliced.
        start = millisec(timestamp_re.findall(g[0])[0])
        end = millisec(timestamp_re.findall(g[-1])[1])
        fname = f"{file['tmp_folder']}/{gidx}.wav"
        tmp_files.append(fname)
        audio[start:end].export(fname, format='wav')
    print(tmp_files[:10])
    return tmp_files, groups

In [None]:
%%time
# Split every recording into per-group clips; keep the clip paths and the groups in the record.
for file in audio_files:
    file["tmp_wavs"], file["groups"] = create_segments(file)
audio_files

# Create Transcripts for each segment

In [None]:
def create_segment_transcripts(file, language='en'):
    """Transcribe every segment clip and store each result as JSON.

    For each index ``i`` in ``range(len(file["tmp_wavs"]))`` the clip
    ``<tmp_folder>/<i>.wav`` is transcribed and the result is dumped to
    ``<tmp_folder>/<i>.json``.

    Args:
        file: Record with the keys ``"tmp_wavs"`` and ``"tmp_folder"``.
        language: Language code passed to ``model.transcribe``.

    Returns:
        List of the JSON file paths written, in segment order. The list is
        empty when there are no segments.
    """
    folder = file['tmp_folder']
    # Created once, before the loop, so every path is kept and an empty input returns [].
    json_files = []
    for i in range(len(file["tmp_wavs"])):
        segment_audio = f"{folder}/{i}.wav"
        result = model.transcribe(audio=segment_audio, language=language, word_timestamps=True)
        json_path = f"{folder}/{i}.json"
        with open(json_path, "w") as outfile:
            json.dump(result, outfile, indent=4)
        # Record the path itself, not the repr of the file object.
        json_files.append(json_path)
    return json_files

In [None]:
%%time
# Transcribe all clips of every recording and keep the returned list in the record.
for file in audio_files:
    file["tmp_jsons"] = create_segment_transcripts(file)

# Creates txt file with all spoken text split by speaker

You can edit names of the speakers below, colors are currently not used

In [None]:
# Maps the speaker label found in the diarization file to a tuple of
# (display name, box color, speaker color). Change the first tuple entry to
# rename a speaker in the output.
speakers = {
    'SPEAKER_00': ('SPEAKER_00', '#e1ffc7', 'darkgreen'),
    'SPEAKER_01': ('SPEAKER_01', 'white', 'darkorange'),
    'SPEAKER_02': ('SPEAKER_02', 'lightblue', 'darkblue'),
    'SPEAKER_03': ('SPEAKER_03', 'lightpink', 'darkred'),
    'SPEAKER_04': ('SPEAKER_04', '#f0e68c', 'goldenrod'),
    'SPEAKER_05': ('SPEAKER_05', 'lightcoral', 'darkred'),
    'SPEAKER_06': ('SPEAKER_06', '#98fb98', 'forestgreen'),
    'SPEAKER_07': ('SPEAKER_07', '#dda0dd', 'purple'),
    'SPEAKER_08': ('SPEAKER_08', '#ffcccb', 'darkred'),
    'SPEAKER_09': ('SPEAKER_09', 'lightcyan', 'darkcyan')
}
# Default colors for speaker labels that are not listed above.
def_boxclr = 'white'
def_spkrclr = 'orange'

In [None]:
def timeStr(t):
    """Format a non-negative duration in seconds as ``HH:MM:SSS.ss``.

    Args:
        t: Time in seconds.

    Returns:
        The formatted string, e.g. ``"01:02:003.50"`` for ``3723.5``.
    """
    # Round to whole centiseconds first so a value like 59.999 carries into
    # the minutes instead of being printed as "060.00" seconds.
    centis = round(t * 100)
    hours, rest = divmod(centis, 360000)
    minutes, centis = divmod(rest, 6000)
    # NOTE(review): width 6 pads the seconds to three integer digits ("003.50");
    # kept unchanged so the format of existing output files stays the same.
    return '{0:02d}:{1:02d}:{2:06.2f}'.format(hours, minutes, centis / 100)
def create_capfile(file):
    """Write the speaker-labelled transcript of one recording to a text file.

    For every group in ``file["groups"]`` the matching
    ``<tmp_folder>/<index>.json`` is read and each entry of its ``"segments"``
    list becomes one line ``[start --> end] [speaker] text``. Caption times
    are offset by the group's start time minus ``spacermilli`` (clamped at 0),
    i.e. by the start time in the original audio. Speaker labels found in
    ``speakers`` are replaced by their display name. The result is saved as
    ``<proc_audio without extension>_capspeaker.txt``.

    Args:
        file: Record with the keys ``"groups"``, ``"tmp_folder"`` and
            ``"proc_audio"``.
    """
    timestamp_re = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')
    txt = []
    for gidx, g in enumerate(file["groups"]):
        # Start time of the group in the original audio, in milliseconds.
        shift = millisec(timestamp_re.findall(g[0])[0]) - spacermilli
        shift = max(shift, 0)

        with open(f"{file['tmp_folder']}/{gidx}.json") as json_file:
            captions = json.load(json_file)['segments']
        # Nothing to write for this group; skipping also avoids touching an
        # undefined speaker when the very first group has no captions.
        if not captions:
            continue

        speaker = g[0].split()[-1]
        if speaker in speakers:
            speaker = speakers[speaker][0]  # display name; colors are currently not used

        for c in captions:
            # Caption times are in seconds relative to the clip; shift is in milliseconds.
            start = (shift + c['start'] * 1000.0) / 1000.0
            end = (shift + c['end'] * 1000.0) / 1000.0
            txt.append(f'[{timeStr(start)} --> {timeStr(end)}] [{speaker}] {c["text"]}\n')

    capfile = f"{os.path.splitext(file['proc_audio'])[0]}_capspeaker.txt"

    # A separate name for the handle keeps the ``file`` parameter intact.
    with open(capfile, "w", encoding='utf-8') as out:
        out.write("".join(txt))
        print(f'captions saved to {capfile}:')

# Create the actual transcribed files with speaker differentiation

In [None]:
# Write one speaker-labelled transcript file per recording.
for file in audio_files:
    create_capfile(file)

In [23]:
# Marker that the notebook ran through to the end.
print("done")

done
CPU times: user 36 µs, sys: 1e+03 ns, total: 37 µs
Wall time: 48.9 µs
