In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
%load_ext autoreload
%autoreload 2

In [73]:
from pyannote.audio              import Model
from pyannote.audio              import Inference
from huggingface_hub.hf_api      import HfFolder
from pydub.utils                 import mediainfo
from pyannote.audio.utils.signal import Binarize
from pyannote.audio.utils.signal import Peak
from pydub                       import AudioSegment
from scipy.spatial.distance      import cdist

import numpy                as np
import soundfile            as sf
import plotly.express       as px 
import pandas               as pd

import itertools
import librosa
import IPython

In [None]:
import plotly.io as pio
pio.renderers.default='notebook'

<h1 style="background-color:LightGreen;"> <center> Link </center></h1>

https://herve.niderb.fr/fastpages/2022/10/23/One-speaker-segmentation-model-to-rule-them-all

<h1 style="background-color:LightGreen;"> <center> Youtube </center></h1>

youtube:
https://www.youtube.com/watch?v=_ksvdZK3UEg

<h1 style="background-color:LightGreen;"> <center> Utils </center></h1>

In [4]:
def convert_to_16sr_file(source_path, dest_path):    
    speech, sr = librosa.load(source_path, sr=16000)
    print(sr)
    sf.write(dest_path, speech, sr)

In [5]:
def get_sample_rate(file):
    info      = mediainfo(file)
    return int(info['sample_rate'])

In [6]:
def get_wav_duration(wav_file):
    #res = librosa.get_duration(path=wav_file)
    f = sf.SoundFile(wav_file)
    return f.frames / f.samplerate

In [7]:
def get_part_of_wav(file_path, start_time_sec, end_time_sec, new_file_path):    
    t1       = start_time_sec * 1000 
    t2       = end_time_sec * 1000
    newAudio = AudioSegment.from_wav(file_path)    
    newAudio = newAudio[t1:t2]        
    newAudio.export(new_file_path, format="wav")

In [8]:
SAMPLE_WAV = "/home/amitli/Datasets/MyVoiceTests_16SR/-3dd58g7rQQ/-3dd58g7rQQ_0000.wav"
SAMPLE_WAV = "/home/amitli/Datasets/Voice/Hebrew/Police/-37_ZAraHQM.wav"
SAMPLE_WAV = "/home/amitli/Datasets/test_speaker_segmentation.wav"

SAMPLE_WAV = "/home/amitli/Datasets/SpkeakerSegmentation/amit_tv.wav"
#SAMPLE_WAV = "/home/amitli/Datasets/SpkeakerSegmentation/han2018.wav"
#SAMPLE_WAV = "/home/amitli/Datasets/SpkeakerSegmentation/test.wav"
#SAMPLE_WAV = "/home/amitli/Datasets/second_lebanon.wav"

In [None]:
#convert_to_16sr_file(SAMPLE_WAV, SAMPLE_WAV)

In [None]:
#IPython.display.Audio(SAMPLE_WAV)

In [9]:
get_sample_rate(SAMPLE_WAV), get_wav_duration(SAMPLE_WAV)

(16000, 24.9510625)

In [10]:
MY_TOKEN   = "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"
HfFolder.save_token(MY_TOKEN)

In [11]:
BATCH_AXIS   = 0
TIME_AXIS    = 1
SPEAKER_AXIS = 2

<h1 style="background-color:LightGreen;"> <center> Speaker Segmentation </center></h1>

In [12]:
segment_model     = Model.from_pretrained("pyannote/segmentation")
inference         = Inference(segment_model, duration=5.0, step=2.5)
sg_output         = inference(SAMPLE_WAV)

In [None]:
sg_output

In [None]:
# --- choose window
i              = 5 # window #6
current_window = sg_output.data[i]

# --- get relvant times and probailties
start_time     = i*2.5
end_time       = start_time + 5 
arr_time       = np.arange(start = start_time, 
                           stop  = end_time, 
                           step  = segment_model.introspection.frames.step)

print(f"start_time = {start_time}, end_time = {end_time}")

prob_len       = len(current_window[:, 0])
print(f"arr_time = {len(arr_time)}, prob_len = {prob_len}")

# --- ? the arr_time is smaller than prob_len, why ?
arr_time       = arr_time[0: prob_len]

# --- create dataframe
arr_speakers   = ["Speaker A"] * len(arr_time) + ["Speaker B"] * len(arr_time)  + ["Speaker C"] * len(arr_time)
arr_times      = np.concatenate((arr_time, arr_time, arr_time))
arr_prob       = np.concatenate((current_window[:, 0], current_window[:, 1], current_window[:, 2]))

df             = pd.DataFrame()
df["Time"]     = arr_times
df["Prob"]     = arr_prob
df["Speaker"]  = arr_speakers
fig            = px.line(df, x ="Time", y="Prob", color="Speaker")

fig.update_layout(
    title       = dict(text = "Speaker Segmentation", font=dict(size=24)),
    xaxis_title = "Time", 
    yaxis_title = "Probability"
)

fig.show()

<h1 style="background-color:LightGreen;"> <center> VAD </center></h1>

In [13]:
def myBinary(prob, onset):
    for i in range(len(prob)):
        if prob[i] < onset:
            prob[i] = None
        else:
            prob[i] = 0.5
    return prob

In [14]:
to_vad   = lambda o: np.max(o, axis=SPEAKER_AXIS, keepdims=True)
vad      = Inference("pyannote/segmentation", pre_aggregation_hook=to_vad)
vad_prob = vad(SAMPLE_WAV)
binarize = Binarize(onset=0.5)
speech   = binarize(vad_prob)

#speech.get_timeline()

In [15]:
my_vad           = myBinary(vad_prob.data.reshape(-1), 0.5)
arr_time         = np.arange(start = 0, 
                             stop  = get_wav_duration(SAMPLE_WAV), 
                             step  = segment_model.introspection.frames.step)

In [None]:
fig      = px.line(x=arr_time, y=my_vad)

fig.update_layout(
    title       = dict(text = "VAD", font=dict(size=24)),
    xaxis_title = "Time", 
    yaxis_title = "Probability"
)

fig.show()

<h1 style="background-color:LightGreen;"> <center> Change Detection </center></h1>

In [16]:
to_scd   = lambda probability: np.max(np.abs(np.diff(probability, n=1, axis=TIME_AXIS)), 
                                    axis=SPEAKER_AXIS, keepdims=True)
scd      = Inference("pyannote/segmentation", pre_aggregation_hook=to_scd)
scd_prob = scd(SAMPLE_WAV)

In [17]:
peak     = Peak(alpha=0.05)
scd_peak = peak(scd_prob).crop(speech.get_timeline())

In [None]:
scd_peak

In [None]:
#IPython.display.Audio(SAMPLE_WAV)

In [None]:
scd_prob

In [None]:
arr_time = np.arange(start = 0, 
                     stop  = get_wav_duration(SAMPLE_WAV), 
                     step  = segment_model.introspection.frames.step)
fig      = px.line(x=arr_time, y=scd_prob.data.reshape(-1))

fig.update_layout(
    title       = dict(text = "Change Detection", font=dict(size=24)),
    xaxis_title = "Time", 
    yaxis_title = "Probability"
)

fig.show()

In [19]:
peak = Peak(alpha=0.05)
res = peak(scd_prob).crop(speech.get_timeline())

In [None]:
res[6]

In [None]:
for i in range(len(res)):
    
    if res[i].duration < 1:
        continue
    
    start_time = res[i].start
    end_time   = res[i].end
    file_name  = f"/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_{i}.wav"
    
    get_part_of_wav(SAMPLE_WAV, start_time, end_time, file_name)

<h1 style="background-color:LightGreen;"> <center> Streaming (waveform input) </center></h1>

In [None]:
segment_model     = Model.from_pretrained("pyannote/segmentation")
inference         = Inference(segment_model, duration=5.0, step=2.5)
sg_output         = inference(SAMPLE_WAV)


<h1 style="background-color:LightGreen;"> <center> Embedding </center></h1>

In [26]:
emb_model     = Model.from_pretrained("pyannote/embedding")
emb_infer     = Inference(emb_model, window="whole")

In [20]:
#emb_model = Inference(model, window="whole", device="cuda")

In [None]:
embedding1 = inference("/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_11.wav")

In [68]:
arr_emb   = []
arr_title = []
for i in range(len(res)):
    
    if res[i].duration < 1:
        continue
    
    start_time = res[i].start
    end_time   = res[i].end
    file_name  = f"/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_{i}.wav"
    
    arr_emb.append(emb_infer(file_name).reshape(-1, 1))
    arr_title.append(f"{i}")

In [61]:
l = np.array(arr_emb)
l = l.squeeze()
l.shape

(5, 512)

In [62]:
import umap
reducer = umap.UMAP()
embedding = reducer.fit_transform(l)

In [69]:
df_emb = pd.DataFrame()
df_emb['x']     = embedding[:, 0]
df_emb['y']     = embedding[:, 1]
df_emb['title'] = arr_title

In [70]:
#fig  = px.scatter(x = embedding[:, 0], y = embedding[:, 1])
fig  = px.scatter(df_emb, x = "x", y = "y", color="title")
fig.show()

In [72]:
ME = ["/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_5.wav",
     "/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_6.wav",
     "/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_11.wav"]

TV = ["/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_3.wav",
     "/home/amitli/Datasets/SpkeakerSegmentation/TMP_DEL/SS_9.wav"]

In [81]:
labels = ["TV", "ME", "ME", "TV", "ME"]
for i in range(len(arr_emb)):
    for j in range(len(arr_emb)):
        if i==j:
            continue
        distance = cdist(arr_emb[i], arr_emb[j], metric="cosine")[0,0]
        if labels[i] == labels[j]:
            if distance != 0:                
                print(f"{labels[i]}, {labels[j]}, {distance}")