In [None]:
import json
import os
import pickle
import re
from glob import glob

import editdistance
import numpy as np
import pandas as pd
from pyannote.audio import Model
from pyannote.audio.pipelines import VoiceActivityDetection
from pyannote.core import notebook, Segment
from pydub import AudioSegment
from tqdm import tqdm
from whisper_model import WhisperASR
import matplotlib.pyplot as plt


# --- Silero VAD setup -------------------------------------------------------
import torch
from IPython.display import Audio
from pprint import pprint

# Sampling rate handed to the Silero helpers (read_audio / get_speech_timestamps).
SAMPLING_RATE = 16000
# Flip to True to test the ONNX build of the model.
USE_ONNX = False

torch.set_num_threads(1)

# torch.hub.load yields the model plus a tuple of helpers, unpacked right below.
# force_reload=True asks torch.hub for a fresh download of the repository.
model, utils = torch.hub.load(
    repo_or_dir='snakers4/silero-vad',
    model='silero_vad',
    force_reload=True,
    onnx=USE_ONNX,
)
get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks = utils




# --- pyannote VAD setup -----------------------------------------------------
# The Hugging Face access token is read from the HF_TOKEN environment variable
# rather than being committed in the notebook (a token that was ever committed
# should be treated as leaked and revoked). When the variable is unset, None is
# passed; presumably the library then falls back to its default credential
# lookup (e.g. a cached `huggingface-cli login`) — confirm for your environment.
modelPyannote = Model.from_pretrained(
    "pyannote/segmentation",
    use_auth_token=os.environ.get("HF_TOKEN"),
)

pipeline = VoiceActivityDetection(segmentation=modelPyannote)
HYPER_PARAMETERS = {
    # onset/offset activation thresholds
    "onset": 0.5,
    "offset": 0.5,
    # remove speech regions shorter than that many seconds.
    "min_duration_on": 0.0,
    # fill non-speech regions shorter than that many seconds.
    "min_duration_off": 0.05,
}
pipeline.instantiate(HYPER_PARAMETERS)

# Seconds added on each side of a detected span when it is drawn on a plot.
padding = 0.025

def edit_distance(s1, s2):
    """Return ``editdistance.eval(s1, s2)`` for the two given sequences."""
    distance = editdistance.eval(s1, s2)
    return distance


def format_int(i):
    """Return ``str(i)`` left-padded with zeros to a width of at least 8 characters."""
    width = 8
    text = str(i)
    return text.zfill(width)


def trim_audio(path, start, end, out_path):
    """Trim the WAV file at ``path`` to the ``start``..``end`` range (in seconds).

    ``start`` is raised to 0 if negative and ``end`` is lowered to
    ``len(audio) / 1000`` if it exceeds it, so that the requested range stays
    within the audio. The trimmed audio is exported as WAV to ``out_path``.

    Returns:
        A tuple ``(out_path, start, end)`` holding the clamped bounds.
    """
    audio = AudioSegment.from_file(path, format="wav")
    # len(audio) / 1000 is used as the upper bound for ``end`` (seconds).
    upper_bound = len(audio) / 1000
    clamped_start = max(0, start)
    clamped_end = min(end, upper_bound)

    # The segment is sliced with the bounds scaled by 1000.
    excerpt = audio[clamped_start * 1000 : clamped_end * 1000]
    excerpt.export(out_path, format="wav")
    return out_path, clamped_start, clamped_end

def run_pyannote_vad(file):
    """Run the pyannote VAD ``pipeline`` on ``file`` and collapse it to one span.

    Returns:
        A one-element list ``[(start, end)]`` running from the start of the
        first detected segment to the end of the last one, or ``[(0, 0)]``
        when the pipeline reports no segment.
    """
    detected = pipeline(file).get_timeline().support()
    spans = [(seg_start, seg_end) for seg_start, seg_end in detected]

    if not spans:
        return [(0, 0)]

    # Keep only the outer bounds: start of the first span, end of the last.
    first_start = spans[0][0]
    last_end = spans[-1][1]
    return [(first_start, last_end)]

def run_silerio_vad(file):
    """Run Silero VAD on ``file`` and collapse the detections to one span.

    Returns:
        A one-element list ``[(start, end)]`` in seconds, running from the
        start of the first detection to the end of the last one, or
        ``[(0, 0)]`` when nothing is detected.
    """
    samples = read_audio(file, sampling_rate=SAMPLING_RATE)
    detections = get_speech_timestamps(samples, model, sampling_rate=SAMPLING_RATE)

    # "start"/"end" are divided by the sampling rate to express them in seconds.
    spans_in_seconds = []
    for stamp in detections:
        spans_in_seconds.append((stamp["start"] / SAMPLING_RATE, stamp["end"] / SAMPLING_RATE))

    if not spans_in_seconds:
        return [(0, 0)]

    # Keep only the outer bounds: start of the first span, end of the last.
    return [(spans_in_seconds[0][0], spans_in_seconds[-1][1])]


In [None]:
# Raw recordings to analyse. glob() returns paths in arbitrary order, so the
# list is sorted: the seeded random selection made from it is then the same on
# every machine and run.
files_folder = "/data/tts-qa/tts-data/French(Dorsaf)/raw"
files = sorted(glob(os.path.join(files_folder, "*.wav")))


In [None]:
# Draw a reproducible random sample of at most 50 files. The sample size is
# capped at len(files) because np.random.choice(..., replace=False) raises
# ValueError when asked for more items than are available.
np.random.seed(10)
selected_files = np.random.choice(files, min(50, len(files)), replace=False)

# The random sample is then replaced by this hand-picked subset, which is what
# the following cells actually process.
selected_files = [
    "/data/tts-qa/tts-data/French(Dorsaf)/raw/FR00000026.wav",
    "/data/tts-qa/tts-data/French(Dorsaf)/raw/FR00000032.wav",
    "/data/tts-qa/tts-data/French(Dorsaf)/raw/FR00000033.wav",
    "/data/tts-qa/tts-data/French(Dorsaf)/raw/FR00000035.wav",
]


In [None]:

def my_custom_vad(pyannote_timeline, silerio_timeline, waveform, sample_rate, energy_threshold=0.9, window_size=0.02, merge_tolerance=0.2):
    """Combine a pyannote and a Silero timeline into one overall speech span.

    Every pyannote segment is paired with every Silero segment:

    * if both the starts and the ends of the pair differ by less than
      ``merge_tolerance`` seconds, the union of the two segments is kept;
    * otherwise the union is scanned in ``window_size``-second windows and
      every window whose mean squared sample value exceeds
      ``energy_threshold`` is kept.

    Args:
        pyannote_timeline: iterable of ``(start, end)`` pairs in seconds.
        silerio_timeline: iterable of ``(start, end)`` pairs in seconds.
        waveform: 1-D sequence of audio samples, indexed at ``sample_rate``.
        sample_rate: number of samples per second of ``waveform``.
        energy_threshold: minimum mean squared sample value for a window to
            count as speech.
            NOTE(review): the comparison is made on the raw sample scale; with
            integer PCM almost any non-silent window exceeds 0.9 — confirm the
            intended scale.
        window_size: window length in seconds for the energy scan.
        merge_tolerance: maximum start/end disagreement, in seconds, for two
            segments to be merged directly.

    Returns:
        A one-element list ``[(start, end)]`` with the earliest start and the
        latest end over everything that was kept, or ``[(0, 0)]`` when nothing
        was kept (the same "no speech" value that ``run_pyannote_vad`` and
        ``run_silerio_vad`` return).
    """
    samples = np.asarray(waveform)
    kept_spans = []

    for pyannote_start, pyannote_end in pyannote_timeline:
        for silerio_start, silerio_end in silerio_timeline:
            union_start = min(pyannote_start, silerio_start)
            union_end = max(pyannote_end, silerio_end)

            starts_agree = abs(pyannote_start - silerio_start) < merge_tolerance
            ends_agree = abs(pyannote_end - silerio_end) < merge_tolerance
            if starts_agree and ends_agree:
                kept_spans.append((union_start, union_end))
                continue

            # The detectors disagree: fall back to a windowed energy scan.
            num_windows = int((union_end - union_start) / window_size)
            for i in range(num_windows):
                window_start = int((union_start + i * window_size) * sample_rate)
                window_end = int(window_start + window_size * sample_rate)
                # Cast before squaring: integer PCM (e.g. int16) would wrap
                # around and produce meaningless energies.
                window_samples = samples[window_start:window_end].astype(np.float64)
                if window_samples.size == 0:
                    # Window lies outside the waveform; nothing to measure.
                    continue
                window_energy = np.mean(window_samples ** 2)
                if window_energy > energy_threshold:
                    kept_spans.append((union_start + i * window_size, union_start + (i + 1) * window_size))

    if not kept_spans:
        return [(0, 0)]

    # Envelope of everything kept, independent of the order it was appended in.
    overall_start = min(span_start for span_start, _ in kept_spans)
    overall_end = max(span_end for _, span_end in kept_spans)
    return [(overall_start, overall_end)]


In [None]:

# Plot every selected recording with the pyannote span (green) and the custom
# VAD span (red) drawn over the waveform.
for file in selected_files:
    # Decode the file once; the samples feed both the custom VAD and the plot.
    audio = AudioSegment.from_file(file, format="wav")
    waveform = np.array(audio.get_array_of_samples())
    sample_rate = audio.frame_rate
    # NOTE(review): len(waveform) / sample_rate is only the duration for mono
    # audio — presumably multi-channel samples come back interleaved; confirm.
    duration = len(waveform) / sample_rate

    pyannote_timeline = run_pyannote_vad(file)
    silerio_timeline = run_silerio_vad(file)
    custom_timeline = my_custom_vad(pyannote_timeline, silerio_timeline, waveform, sample_rate)

    # Sample k sits at k / sample_rate seconds.
    time_vector = np.arange(len(waveform)) / sample_rate

    fig, ax = plt.subplots(figsize=(20, 5))
    ax.set_yticks([])
    ax.plot(time_vector, waveform)
    # add the filename as title
    ax.set_title(os.path.basename(file))
    ax.set_xlabel("Time (s)")

    # The Silero span is not drawn; it only feeds the custom VAD above.
    for start, end in pyannote_timeline:
        # Widen the span by `padding` on both sides, clamped to the audio.
        ax.axvspan(max(0, start - padding), min(duration, end + padding), color="green", alpha=0.3, label="PYANNOTE")

    for start, end in custom_timeline:
        ax.axvspan(start, end, color="red", alpha=0.3, label="MY_CUSTOM_VAD")

    ax.legend()
    plt.show()
    # Release the figure so a long file list does not accumulate open figures.
    plt.close(fig)
    