In [11]:
from Dirization import preprocess_wav, VoiceEncoder
from pathlib import Path

In [12]:
# Locations of the input recording and of the encoder weights.
audio_file_path = './Data/DataForDirization/sample.wav'
model_file_path = './Models/DIRIZATIONMODEL.pt'
wav_path = Path(audio_file_path)
model_path = Path(model_file_path)

# Preprocess the recording first, then build the encoder on the CPU.
wav = preprocess_wav(wav_path)
encoder = VoiceEncoder("cpu", weights_fpath=model_path)

# The first return value is discarded; downstream cells only use the partial
# embeddings and the wav splits returned alongside them.
# NOTE(review): rate=16 is presumably the number of partial embeddings per
# second -- confirm against the encoder's documentation.
_, cont_embeds, wav_splits = encoder.embed_utterance(
    wav,
    return_partials=True,
    rate=16,
)
print(cont_embeds.shape)

Loaded the voice encoder model on cpu in 0.04 seconds.
(360, 256)


In [13]:
from spectralcluster import SpectralClusterer
from spectralcluster.utils import EigenGapType

# Group the partial embeddings into speakers with spectral clustering.
# The cluster count is bounded by min_clusters/max_clusters.
# NOTE(review): the eigengap setting presumably drives how the count is picked
# within those bounds -- confirm against the spectralcluster documentation.
clusterer = SpectralClusterer(min_clusters=2,
                              max_clusters=100,
                              eigengap_type=EigenGapType.Ratio)

# Later cells index `labels` in step with `wav_splits`, so this presumably
# yields one label per partial embedding -- TODO confirm.
labels = clusterer.predict(cont_embeds)

In [14]:
import json
# import numpy as np

def create_labelling(labels, wav_splits):
    """Collapse per-window speaker labels into contiguous speaker segments.

    Every window is represented by its midpoint, ``(start + stop) / 2``,
    divided by ``Dirization.sampling_rate`` (presumably samples per second,
    which makes the resulting times seconds). A segment boundary is placed at
    the midpoint of the first window whose label differs from the previous
    window's label. The first segment starts at 0 and the last segment ends at
    the midpoint of the final window.

    Args:
        labels: Indexable sequence of speaker labels, one per entry of
            ``wav_splits`` and in the same order.
        wav_splits: Iterable of objects exposing ``start`` and ``stop``
            sample indices (``slice`` objects work).

    Returns:
        A list of ``{"start": float, "end": float, "speaker": int}`` dicts in
        chronological order. The list is empty when ``wav_splits`` is empty.
    """
    # Kept as a function-scope import so the function stays self-contained.
    from Dirization import sampling_rate

    times = [((s.start + s.stop) / 2) / sampling_rate for s in wav_splits]
    if not times:
        return []

    labelling = []
    start_time = 0

    for i in range(1, len(times)):
        if labels[i] != labels[i - 1]:
            # Speaker changed: close the running segment at this midpoint.
            labelling.append({
                "start": float(start_time),
                "end": float(times[i]),
                "speaker": int(labels[i - 1]),
            })
            start_time = times[i]

    # Close the trailing segment. When the label flips on the very last
    # window the trailing segment would have start == end; such a
    # zero-duration entry carries no audio, so it is skipped. A lone segment
    # is always kept so non-empty input never produces an empty labelling.
    end_time = times[-1]
    if end_time > start_time or not labelling:
        labelling.append({
            "start": float(start_time),
            "end": float(end_time),
            "speaker": int(labels[len(times) - 1]),
        })

    return labelling

predicted_data = create_labelling(labels, wav_splits)

# Save the labelling to a JSON file, creating the output folder if it is
# missing so the cell also works on a fresh checkout.
output_path = Path('./Output/Ourlabelling.json')
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
    json.dump(predicted_data, f, ensure_ascii=False, indent=4)

# Report the path that was actually written.
print(f"Labelling saved to {output_path}")

Labelling saved to labelling.json


In [15]:
# Display the predicted speaker segments inline for a quick visual check.
predicted_data

[{'start': 0.0, 'end': 1.58, 'speaker': 1},
 {'start': 1.58, 'end': 3.26, 'speaker': 0},
 {'start': 3.26, 'end': 4.34, 'speaker': 1},
 {'start': 4.34, 'end': 7.58, 'speaker': 0},
 {'start': 7.58, 'end': 11.66, 'speaker': 1},
 {'start': 11.66, 'end': 11.72, 'speaker': 0},
 {'start': 11.72, 'end': 11.9, 'speaker': 1},
 {'start': 11.9, 'end': 11.96, 'speaker': 0},
 {'start': 11.96, 'end': 12.62, 'speaker': 1},
 {'start': 12.62, 'end': 15.08, 'speaker': 0},
 {'start': 15.08, 'end': 21.26, 'speaker': 1},
 {'start': 21.26, 'end': 22.34, 'speaker': 0}]

In [16]:
import json

def read_json(file_path):
    """Load a JSON file and return its parsed contents.

    The file is decoded explicitly as UTF-8 instead of the platform default
    encoding, which matches how this notebook writes its own JSON output
    (``encoding='utf-8'`` with ``ensure_ascii=False``).

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The deserialized JSON value, as produced by ``json.load``.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

# Load the reference annotation that the predicted labelling is scored against.
# The scoring code reads 'start', 'end' and 'speaker' from every entry.
reference_path = './Data/DataForDirization/audio_sample_20.json'
reference_data = read_json(reference_path)


In [17]:
def calculate_overlap(interval1, interval2):
    """Return the length shared by two intervals, or 0 if they do not overlap.

    Args:
        interval1: Mapping with numeric 'start' and 'end' entries.
        interval2: Mapping with numeric 'start' and 'end' entries.

    Returns:
        The length of the intersection; 0 when the intervals are disjoint or
        merely touch.
    """
    overlap_start = max(interval1['start'], interval2['start'])
    overlap_end = min(interval1['end'], interval2['end'])
    shared = overlap_end - overlap_start
    # A non-positive difference means the intervals do not intersect.
    return shared if shared > 0 else 0

def _merge_intervals(segments):
    """Return the sorted, non-overlapping union of the segments' time spans."""
    merged = []
    for start, end in sorted((seg['start'], seg['end']) for seg in segments):
        if end <= start:
            continue  # empty or inverted span covers nothing
        if merged and start <= merged[-1][1]:
            # Overlaps or touches the previous span: extend it if needed.
            if end > merged[-1][1]:
                merged[-1][1] = end
        else:
            merged.append([start, end])
    return merged


def _covered_length(start, end, spans):
    """Return how much of [start, end] lies inside the disjoint ``spans``."""
    covered = 0
    for span_start, span_end in spans:
        shared = min(end, span_end) - max(start, span_start)
        if shared > 0:
            covered += shared
    return covered


def calculate_diarization_errors(reference, predicted):
    """Compute a diarization error rate and its components.

    Neither input is modified, so the function can be called repeatedly on
    the same data and always returns the same numbers.

    Components:
        * Total: summed duration of all reference segments.
        * Miss:  reference time not covered by any predicted segment
          (partially covered reference segments contribute their uncovered
          part).
        * FA:    predicted time not covered by any reference segment.
        * Error: for every (reference, predicted) pair whose 'speaker'
          values differ, the length of their overlap.

    Speaker values are compared as-is: no mapping between predicted and
    reference speaker ids is searched for, and no collar is applied.

    Args:
        reference: Iterable of dicts with 'start', 'end' and 'speaker'.
        predicted: Iterable of dicts with 'start', 'end' and 'speaker'.

    Returns:
        Tuple ``(DER, FA, Miss, Error, Total)`` where
        ``DER = (FA + Miss + Error) / Total``. When Total is 0 the rate is
        0.0 if there are no errors and ``float('inf')`` otherwise, instead of
        raising ZeroDivisionError.
    """
    # Materialize once so generators can be iterated several times.
    reference = list(reference)
    predicted = list(predicted)

    reference_union = _merge_intervals(reference)
    predicted_union = _merge_intervals(predicted)

    total = 0
    miss = 0
    error = 0
    for ref in reference:
        ref_duration = ref['end'] - ref['start']
        total += ref_duration
        miss += ref_duration - _covered_length(
            ref['start'], ref['end'], predicted_union
        )
        for pred in predicted:
            if ref['speaker'] != pred['speaker']:
                error += _covered_length(
                    ref['start'], ref['end'], [(pred['start'], pred['end'])]
                )

    false_alarm = 0
    for pred in predicted:
        pred_duration = pred['end'] - pred['start']
        false_alarm += pred_duration - _covered_length(
            pred['start'], pred['end'], reference_union
        )

    errors = false_alarm + miss + error
    if total:
        der = errors / total
    else:
        # No reference speech at all: the ratio is undefined.
        der = 0.0 if errors == 0 else float('inf')

    return der, false_alarm, miss, error, total


In [None]:
# Score the predicted labelling against the reference annotation.
DER, FA, Miss, Error, Total = calculate_diarization_errors(
    reference_data,
    predicted_data,
)

# Print every metric with four decimals, one per line.
for metric_name, metric_value in (
    ('DER', DER),
    ('False Alarm (FA)', FA),
    ('Miss', Miss),
    ('Error', Error),
    ('Total', Total),
):
    print(f'{metric_name}: {metric_value:.4f}')