In [1]:
!pip install gradio speechbrain timm



In [2]:
import gradio as gr
import librosa  # For audio processing
import torch  # Assuming a PyTorch-based speaker verification model
import os
import gradio as gr
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import io
from speechbrain.inference.speaker import EncoderClassifier
import torchaudio
from scipy.spatial.distance import mahalanobis
from sklearn.metrics import roc_curve, confusion_matrix, ConfusionMatrixDisplay
from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
import os
import glob

import torch
import torchaudio
import h5py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import timm
import gc

from torchaudio.datasets import VoxCeleb1Verification
from tqdm import tqdm
from torch import nn
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_curve, roc_auc_score, confusion_matrix, ConfusionMatrixDisplay
from IPython.display import display, Audio
import torch.nn.functional as F
from torch.cuda import empty_cache
from torchvision.transforms import Compose

from sklearn.decomposition import PCA
import pickle
from torchaudio.functional import resample

In [3]:
def load_sound(file_path, new_sr=16000):
    waveform, sr = torchaudio.load(file_path, normalize=True)
    waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=new_sr)
    return waveform, new_sr

# PLDA

In [4]:
def log_likelyhood_score(enrollment_embeddings, test_embeddings, mu, F, Sigma):
    inv_Sigma = np.linalg.inv(Sigma)
    inv_phi = np.linalg.inv(F.T @ inv_Sigma @ F + np.eye(F.shape[1]))
    scores = []

    for enroll_emb, test_emb in zip(enrollment_embeddings, test_embeddings):
        enroll_emb_cent = enroll_emb - mu
        test_emb_cent = test_emb - mu
        
        v1 = inv_Sigma @ enroll_emb_cent
        v2 = inv_Sigma @ test_emb_cent

        t1 = v1.T @ inv_phi @ F.T @ v2
        t2 = v1.T @ inv_phi @ v1 / 2
        t3 = v2.T @ inv_phi @ v2 / 2
        
        score = t1 - t2 - t3
        scores.append(score)
    
    return np.array(scores)

In [5]:
PLDA_BASE_PATH = '../plda/models/'

classifier = EncoderClassifier.from_hparams(source="speechbrain/spkrec-xvect-voxceleb", savedir="pretrained_models/spkrec-xvect-voxceleb")

with open(PLDA_BASE_PATH + 'pca_model.pkl', 'rb') as file:
    pca = pickle.load(file)

plda_params = np.load(PLDA_BASE_PATH + 'xv-plda.npz')

mean = plda_params['mean']
Sigma = plda_params['Sigma']
F = plda_params['F']

THRESHOLD = -0.19

In [6]:
def verify_plda(audio1, audio2):
    emb1 = classifier.encode_batch(load_sound(audio1)[0])[0, 0].numpy()
    emb2 = classifier.encode_batch(load_sound(audio2)[0])[0, 0].numpy()
    
    en_embeddings = np.array([emb1, emb2])
    te_embeddings = np.array([emb2, emb1])

    en_embeddings, te_embeddings = pca.transform(en_embeddings), pca.transform(te_embeddings)
    
    scores = log_likelyhood_score(en_embeddings, te_embeddings, mean, F, Sigma)
    score = np.mean(scores)
    
    return score, score > THRESHOLD

In [7]:
audio1 = os.path.join(os.path.dirname("example_sounds/"), "maksym_ukr_phone.wav")
audio2 = os.path.join(os.path.dirname("example_sounds/"), "maksym_eng_comp.wav")

In [8]:
verify_plda(audio1, audio2)

(1.0970898249031573, True)

# CNN

In [9]:
class SiameseCNN(nn.Module):
    def __init__(self, backbone_name: str, backbone_pretrained: bool, res_dim: int, n_fft: int, hop_size: int, n_mels: int, mapper_dropout_p: float, power: float = 1.0, sr: int = 16000, logmel: bool = False):
        super().__init__()

        self.melspec = nn.Sequential(torchaudio.transforms.Spectrogram(
            n_fft=n_fft,
            hop_length=hop_size,
            power=power,
        ), torchaudio.transforms.MelScale(
            n_mels=n_mels,
            sample_rate=sr,
            n_stft=n_fft // 2 + 1,
            f_min=0,
        ))

        self.augm = nn.Sequential(
            torchaudio.transforms.TimeMasking(time_mask_param=40, p=0.8, iid_masks=True),
            torchaudio.transforms.FrequencyMasking(freq_mask_param=n_mels//5, iid_masks=True),
            # ToComplexTensor(),
            # RandomTimeStretch(max_size = MAX_LENGTH, n_freq=n_mels, hop_length=hop_size),
        )

        self.logmel = logmel

        self.backbone = timm.create_model(
            backbone_name,
            features_only=True,
            pretrained=backbone_pretrained,
            in_chans=1,
            exportable=True
        )

        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.mapper = nn.Sequential(
            nn.Dropout(p=mapper_dropout_p),
            nn.Linear(self.backbone.feature_info.channels()[-1], res_dim),
        )


    def forward(self, input, return_specs=False, return_augm=False):
        specs = self.melspec(input)
        if self.logmel:
            specs = torch.log10(torch.clamp(specs, min=torch.tensor(1e-3, device=specs.device)))
        
        if return_specs:
            return specs

        if self.training:
            specs = self.augm(specs)
            specs = specs.type(torch.float32)
        if return_augm:
            return specs

        # emb = self.backbone(input)[-1]
        emb = self.backbone(specs)[-1]

        bs, ch, _, _ = emb.shape
        emb = self.pool(emb)
        emb = emb.view(bs, ch)

        emb = self.mapper(emb)
        return emb


In [10]:
class SiameseHead(nn.Module):
    def __init__(self, input_dim: int, drop_p=0.25):
        super().__init__()

        self.dropout = nn.Dropout(p=drop_p)
        self.layers = nn.Sequential(
            nn.BatchNorm1d(input_dim),
            self.dropout,
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            self.dropout,
            nn.Linear(128, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32),
            self.dropout,
            nn.Linear(32, 1)
        )


    def forward(self, input):
        return self.layers(input)


In [11]:
EMBEDDING_DIM = 256
MODEL_BASE_PATH = '../neural/models/'

model = SiameseCNN('tf_efficientnet_b0.in1k', True, EMBEDDING_DIM, 1024, 512, 128, 0.25, logmel=True)
model.load_state_dict(torch.load(MODEL_BASE_PATH + "efficientnet_1.0.8.pt", map_location=torch.device('cpu')))
model.eval()

model_head = SiameseHead(input_dim=2*EMBEDDING_DIM)
model_head.load_state_dict(torch.load(MODEL_BASE_PATH + "head_efficientnet_1.0.8.pt", map_location=torch.device('cpu')))
model_head.eval()

Unexpected keys (bn2.bias, bn2.num_batches_tracked, bn2.running_mean, bn2.running_var, bn2.weight, classifier.bias, classifier.weight, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


SiameseHead(
  (dropout): Dropout(p=0.25, inplace=False)
  (layers): Sequential(
    (0): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (1): Dropout(p=0.25, inplace=False)
    (2): Linear(in_features=512, out_features=128, bias=True)
    (3): ReLU()
    (4): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): Dropout(p=0.25, inplace=False)
    (6): Linear(in_features=128, out_features=32, bias=True)
    (7): ReLU()
    (8): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (9): Dropout(p=0.25, inplace=False)
    (10): Linear(in_features=32, out_features=1, bias=True)
  )
)

In [12]:
def verify_cnn(audio_path1, audio_path2):
    audio_1, _ = load_sound(audio_path1)
    audio_2, _ = load_sound(audio_path2)
    
    with torch.no_grad():
        audio_1 = audio_1.unsqueeze(0).mean(dim=1, keepdim=True)
        audio_2 = audio_2.unsqueeze(0).mean(dim=1, keepdim=True)
        out_1 = model(audio_1)
        out_2 = model(audio_2)
        out_proba1 = torch.sigmoid(model_head(torch.cat((out_1, out_2), dim=-1)))
        out_proba2 = torch.sigmoid(model_head(torch.cat((out_2, out_1), dim=-1)))

    return ((out_proba1 + out_proba2) / 2).numpy()[0][0]

In [13]:
audio_path1 = os.path.join(os.path.dirname("example_sounds/"), "maksym_eng_comp.wav")
audio_path2 = os.path.join(os.path.dirname("example_sounds/"), "maksym_ukr_phone.wav")

In [14]:
verify_cnn(audio_path1, audio_path2)

0.560362

# UI

In [15]:
def generate_spectrogram(audio, sr):
    # Generate the spectrogram
    S = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=256, fmax=8000)
    S_DB = librosa.power_to_db(S, ref=np.max)
    
    # Plot the spectrogram
    fig, ax = plt.subplots(figsize=(10, 4))
    img = librosa.display.specshow(S_DB, sr=sr, x_axis='time', y_axis='mel', fmax=8000, ax=ax)
    fig.colorbar(img, ax=ax, format='%+2.0f dB')
    
    # Convert the Matplotlib figure to a PIL Image and return it
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    plt.close(fig)
    buf.seek(0)
    image = Image.open(buf)
    return image

In [16]:
def speaker_verification(audio1, audio2, method="Probabilistic Linear Discriminant Analysis  (PLDA)"):
    """Performs speaker verification using the chosen method.

    Args:
        audio (bytes): Audio data from user upload or microphone.
        method (str, optional): Verification method ("PLDA" or "Siamese CNN"). Defaults to "PLDA".

    Returns:
        str: Verification result, including speaker name and similarity score.
    """
    y1, sr1 = librosa.load(audio1, sr=16000)
    y2, sr2 = librosa.load(audio2, sr=16000)
    
    
    if method == "Siamese Convolutional Neural Network (SCNN)":
        similarity_score = verify_cnn(audio1, audio2)
        result_text = f"""
        Method: {method}
        Similaroty Score: {similarity_score*100:.0f}%
        Threshold: 70%
        
        Verification Result: {"Successful" if similarity_score > 0.7 else "Failed"}.
        """
        
    elif method == "Probabilistic Linear Discriminant Analysis  (PLDA)":
        score, verification_result = verify_plda(audio1, audio2)
        
        result_text = f"""
        Method: {method}
        Score: {score:.2f}
        Threshold: {THRESHOLD:.2f}
        
        Verification Result: {"Successful" if verification_result else "Failed"}.
        """
        
    else:
        similarity_score = "Wrong method selected."

    return generate_spectrogram(y1, sr1), generate_spectrogram(y2, sr2), result_text

In [17]:
with gr.Blocks(title="Speaker Verification Problem") as demo:
    gr.Markdown("# Speaker Verification Problem")
    gr.Markdown("## Maksym Palamariuk, Andrii Shevtsov, and Artur Shevtsov")
    
    with gr.Row():
        with gr.Column(scale=1):
            with gr.Group():
                audio1 = gr.Audio(type="filepath", label="Upload Audio Or Use Microphone")
                gr.Examples(
                    examples=glob.glob(os.path.dirname("example_sounds/") + "/*.wav"),
                    inputs=[audio1]
                ) 
                
            with gr.Group():
                audio2 = gr.Audio(type="filepath", label="Upload Another Audio")
                gr.Examples(
                    examples=glob.glob(os.path.dirname("example_sounds/") + "/*.wav"),
                    inputs=[audio2]
                )


            method = gr.Radio(choices=["Probabilistic Linear Discriminant Analysis  (PLDA)", "Siamese Convolutional Neural Network (SCNN)"], label="Method")

            verify_button = gr.Button("Verify")

        with gr.Column(scale=1):
            spec1 = gr.Image(label="Spectrogram of the first audio")
            spec2 = gr.Image(label="Spectrogram of the second audio")
            output=gr.Textbox(label="Result")

        verify_button.click(fn=speaker_verification, 
                            inputs=[audio1, audio2, method], 
                            outputs=[spec1, spec2, output])

demo.launch()

Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


