### RawNetBasicBlock

In [19]:
import torch
import torch.nn as nn
from asteroid_filterbanks import Encoder, ParamSincFB

In [20]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class PreEmphasis(torch.nn.Module):
    """Fixed first-order pre-emphasis filter for batches of raw waveforms.

    Each output sample is ``x[t] - coef * x[t - 1]``. The input is
    reflect-padded by one sample on the left, so the first output sample is
    ``x[0] - coef * x[1]`` and the output has the same length as the input.

    Args:
        coef: Pre-emphasis coefficient applied to the previous sample.
    """

    def __init__(self, coef: float = 0.97) -> None:
        super().__init__()
        self.coef = coef
        # Kernel of shape (out_channels=1, in_channels=1, taps=2). It is kept
        # as a buffer (not a parameter) so it follows the module across
        # devices and is stored in the state dict without being trained.
        self.register_buffer(
            "flipped_filter",
            torch.tensor([[[-self.coef, 1.0]]], dtype=torch.float32),
        )

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Filter a 2-D batch of waveforms.

        Args:
            input: Tensor with exactly two dimensions; the last one is
                convolved over (presumably ``(batch, samples)``).

        Returns:
            Tensor of shape ``(input.size(0), 1, input.size(1))``.

        Raises:
            AssertionError: If ``input`` does not have exactly 2 dimensions.
        """
        assert (
            input.dim() == 2
        ), "The number of dimensions of input tensor must be 2!"
        # Add the channel axis expected by conv1d, then pad one sample on the
        # left so the two-tap filter keeps the original length.
        padded = F.pad(input.unsqueeze(1), (1, 0), "reflect")
        return F.conv1d(padded, self.flipped_filter)

class AFMS(nn.Module):
    """Channel-wise feature-map scaling: ``(x + alpha) * sigmoid(fc(mean_t(x)))``.

    Args:
        nb_dim: Number of channels of the feature maps this module receives.
    """

    def __init__(self, nb_dim: int) -> None:
        super().__init__()
        # Learnable additive offset, one value per channel (broadcast over time).
        self.alpha = nn.Parameter(torch.ones((nb_dim, 1)))
        # Produces the per-channel gate from the time-averaged features.
        self.fc = nn.Linear(nb_dim, nb_dim)
        self.sig = nn.Sigmoid()

    def forward(self, x):
        """Scale ``x``; the output has the same shape as the input.

        ``x`` is indexed as (dim 0, dim 1 = ``nb_dim`` channels, dim 2 pooled
        away by the average pooling).
        """
        batch, channels = x.size(0), x.size(1)

        # Average over the last axis -> one descriptor per channel.
        pooled = F.adaptive_avg_pool1d(x, 1).view(batch, -1)
        # Per-channel gate in (0, 1), shaped to broadcast over the last axis.
        gate = self.sig(self.fc(pooled)).view(batch, channels, -1)

        return (x + self.alpha) * gate

class Bottle2neck(nn.Module):
    """Multi-scale bottleneck block built from dilated 1-D convolutions.

    The input is expanded by a pointwise convolution, split along the channel
    axis into ``scale`` groups of ``width = floor(planes / scale)`` channels,
    and the first ``scale - 1`` groups are processed hierarchically: each
    group is added to the previous group's output before going through its
    own dilated convolution. The last group is passed through unchanged. The
    groups are concatenated again, projected by a second pointwise
    convolution, summed with the (optionally projected) input, optionally
    max-pooled and finally rescaled by ``AFMS``.

    Every convolution is followed by ReLU and then batch norm, in that order.

    Args:
        inplanes: Number of input channels.
        planes: Number of output channels.
        kernel_size: Kernel size of the dilated convolutions (required).
        dilation: Dilation of the dilated convolutions (required).
        scale: Number of channel groups. With ``scale=1`` there are no dilated
            convolutions and the single group is passed straight through.
        pool: Kernel size of the trailing ``nn.MaxPool1d``; any falsy value
            disables pooling.

    Raises:
        TypeError: If ``kernel_size`` or ``dilation`` is left as ``None``.
    """

    def __init__(
        self,
        inplanes,
        planes,
        kernel_size=None,
        dilation=None,
        scale=4,
        pool=False,
    ):
        super().__init__()
        # The ``None`` defaults are placeholders only; fail with a readable
        # message instead of "unsupported operand type(s) for /".
        if kernel_size is None or dilation is None:
            raise TypeError(
                "Bottle2neck requires explicit kernel_size and dilation values"
            )

        width = int(math.floor(planes / scale))
        self.conv1 = nn.Conv1d(inplanes, width * scale, kernel_size=1)
        self.bn1 = nn.BatchNorm1d(width * scale)
        self.nums = scale - 1

        # "Same" padding for an odd kernel at the given dilation.
        num_pad = math.floor(kernel_size / 2) * dilation
        self.convs = nn.ModuleList(
            nn.Conv1d(
                width,
                width,
                kernel_size=kernel_size,
                dilation=dilation,
                padding=num_pad,
            )
            for _ in range(self.nums)
        )
        self.bns = nn.ModuleList(nn.BatchNorm1d(width) for _ in range(self.nums))

        self.conv3 = nn.Conv1d(width * scale, planes, kernel_size=1)
        self.bn3 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU()
        self.width = width
        # ``False`` (not a module) when pooling is disabled; forward() tests
        # the attribute for truthiness.
        self.mp = nn.MaxPool1d(pool) if pool else False
        self.afms = AFMS(planes)
        if inplanes != planes:
            # Pointwise projection so the shortcut matches the output width.
            self.residual = nn.Sequential(
                nn.Conv1d(inplanes, planes, kernel_size=1, stride=1, bias=False)
            )
        else:
            self.residual = nn.Identity()

    def forward(self, x):
        """Apply the block to ``x`` (channels on dim 1) and return the result."""
        residual = self.residual(x)

        out = self.bn1(self.relu(self.conv1(x)))

        spx = torch.split(out, self.width, 1)
        branches = []
        sp = None
        # zip() stops after the ``self.nums`` processed groups; the remaining
        # group is appended untouched below.
        for chunk, conv, bn in zip(spx, self.convs, self.bns):
            sp = chunk if sp is None else sp + chunk
            sp = bn(self.relu(conv(sp)))
            branches.append(sp)
        branches.append(spx[self.nums])
        # Single concatenation instead of growing the tensor inside the loop.
        out = torch.cat(branches, 1)

        out = self.bn3(self.relu(self.conv3(out)))
        out += residual
        if self.mp:
            out = self.mp(out)
        return self.afms(out)

### RawNet3

In [21]:
# RawNet3.py
class RawNet3(nn.Module):
    """Speaker-embedding network that consumes raw waveforms.

    Pipeline implemented by ``forward``: pre-emphasis and instance norm ->
    parametric sinc filterbank (magnitude, optional log and normalisation) ->
    three ``block`` layers -> pointwise convolution over the concatenated
    layer outputs -> attention-weighted mean / standard-deviation pooling ->
    batch norm and a linear projection to ``nOut`` dimensions.

    Args:
        block: Class used for ``layer1``..``layer3``; it is called as
            ``block(in_ch, out_ch, kernel_size=..., dilation=..., scale=...,
            pool=...)``.
        model_scale: Forwarded to ``block`` as ``scale``.
        context: If true, the attention input is the feature map concatenated
            with its temporal mean and standard deviation.
        summed: If true, ``layer3`` receives ``mp3(layer1 output) + layer2
            output`` instead of the ``layer2`` output alone.
        C: Backbone channel width; the filterbank produces ``C // 4`` channels.
        **kwargs: Must contain ``nOut``, ``encoder_type`` (``"ECA"`` or
            ``"ASP"``), ``log_sinc``, ``norm_sinc``, ``out_bn`` and
            ``sinc_stride``.

    Raises:
        KeyError: If one of the required keyword arguments is missing.
        ValueError: If ``encoder_type`` is neither ``"ECA"`` nor ``"ASP"``.
    """

    def __init__(self, block, model_scale, context, summed, C=1024, **kwargs):
        super().__init__()

        nOut = kwargs["nOut"]

        self.context = context
        self.encoder_type = kwargs["encoder_type"]
        self.log_sinc = kwargs["log_sinc"]
        self.norm_sinc = kwargs["norm_sinc"]
        self.out_bn = kwargs["out_bn"]
        self.summed = summed

        self.preprocess = nn.Sequential(
            PreEmphasis(), nn.InstanceNorm1d(1, eps=1e-4, affine=True)
        )
        self.conv1 = Encoder(
            ParamSincFB(
                C // 4,
                251,
                stride=kwargs["sinc_stride"],
            )
        )
        self.relu = nn.ReLU()
        # Not used by forward() in this file; presumably kept so that
        # checkpoints containing its parameters still load -- TODO confirm.
        self.bn1 = nn.BatchNorm1d(C // 4)

        self.layer1 = block(
            C // 4, C, kernel_size=3, dilation=2, scale=model_scale, pool=5
        )
        self.layer2 = block(
            C, C, kernel_size=3, dilation=3, scale=model_scale, pool=3
        )
        self.layer3 = block(C, C, kernel_size=3, dilation=4, scale=model_scale)
        self.layer4 = nn.Conv1d(3 * C, 1536, kernel_size=1)

        # With context the attention sees the features plus their mean and std.
        attn_input = 1536 * 3 if self.context else 1536
        print("self.encoder_type", self.encoder_type)
        if self.encoder_type == "ECA":
            # One attention weight per channel and frame.
            attn_output = 1536
        elif self.encoder_type == "ASP":
            # One attention weight per frame, broadcast over channels.
            attn_output = 1
        else:
            raise ValueError("Undefined encoder")

        self.attention = nn.Sequential(
            nn.Conv1d(attn_input, 128, kernel_size=1),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, attn_output, kernel_size=1),
            nn.Softmax(dim=2),  # normalise the weights over the time axis
        )

        # 3072 = 2 * 1536: weighted mean and weighted std are concatenated.
        self.bn5 = nn.BatchNorm1d(3072)

        self.fc6 = nn.Linear(3072, nOut)
        self.bn6 = nn.BatchNorm1d(nOut)

        self.mp3 = nn.MaxPool1d(3)

    @staticmethod
    def _cuda_autocast_disabled():
        """Return a context manager that switches CUDA autocast off."""
        if hasattr(torch, "autocast"):
            # Non-deprecated equivalent of torch.cuda.amp.autocast(enabled=False).
            return torch.autocast(device_type="cuda", enabled=False)
        # Older PyTorch releases only ship the CUDA-specific entry point.
        return torch.cuda.amp.autocast(enabled=False)

    def forward(self, x):
        """
        :param x: input mini-batch (bs, samp)
        :return: output of ``fc6`` (optionally batch-normalised), (bs, nOut)
        """

        # The waveform front-end always runs with CUDA autocast disabled.
        with self._cuda_autocast_disabled():
            x = self.preprocess(x)
            x = torch.abs(self.conv1(x))
            if self.log_sinc:
                x = torch.log(x + 1e-6)
            if self.norm_sinc == "mean":
                x = x - torch.mean(x, dim=-1, keepdim=True)
            elif self.norm_sinc == "mean_std":
                m = torch.mean(x, dim=-1, keepdim=True)
                # Floor the std out-of-place: writing into the result of
                # torch.std in place invalidates it for the backward pass.
                s = torch.std(x, dim=-1, keepdim=True).clamp(min=0.001)
                x = (x - m) / s

        x1 = self.layer1(x)
        x2 = self.layer2(x1)
        # mp3 brings layer1's output to layer2's temporal resolution.
        x3 = self.layer3(self.mp3(x1) + x2 if self.summed else x2)

        x = self.layer4(torch.cat((self.mp3(x1), x2, x3), dim=1))
        x = self.relu(x)

        t = x.size()[-1]

        if self.context:
            global_x = torch.cat(
                (
                    x,
                    torch.mean(x, dim=2, keepdim=True).repeat(1, 1, t),
                    torch.sqrt(
                        torch.var(x, dim=2, keepdim=True).clamp(
                            min=1e-4, max=1e4
                        )
                    ).repeat(1, 1, t),
                ),
                dim=1,
            )
        else:
            global_x = x

        w = self.attention(global_x)

        # Attention-weighted mean and standard deviation over time; the
        # variance is clamped before the square root to keep it finite.
        mu = torch.sum(x * w, dim=2)
        sg = torch.sqrt(
            (torch.sum((x**2) * w, dim=2) - mu**2).clamp(min=1e-4, max=1e4)
        )

        x = torch.cat((mu, sg), 1)

        x = self.bn5(x)

        x = self.fc6(x)

        if self.out_bn:
            x = self.bn6(x)

        return x

In [22]:
def MainModel(**kwargs):
    """Build a RawNet3 with Bottle2neck blocks, scale 8, context and summed on.

    All keyword arguments are forwarded unchanged to ``RawNet3``.
    """
    return RawNet3(
        Bottle2neck,
        model_scale=8,
        context=True,
        summed=True,
        **kwargs,
    )

#### Load pre-trained weights from the submodule

In [23]:
# Build the network with the hyper-parameters used for this checkpoint.
model = MainModel(
    nOut=256,
    encoder_type="ECA",
    log_sinc=True,
    norm_sinc="mean",
    out_bn=False,
    sinc_stride=10,
)
# The weights live under the "model" key of the checkpoint; the map_location
# callback returns every storage as it was deserialised, without moving it.
model.load_state_dict(
    torch.load(
        "./models/model.pt",
        map_location=lambda storage, loc: storage,
    )["model"]
)
# Switch to inference mode (last expression, so the notebook prints the model).
model.eval()

self.encoder_type ECA


  model.load_state_dict(torch.load("./models/model.pt", map_location=lambda storage, loc: storage)["model"])


RawNet3(
  (preprocess): Sequential(
    (0): PreEmphasis()
    (1): InstanceNorm1d(1, eps=0.0001, momentum=0.1, affine=True, track_running_stats=False)
  )
  (conv1): Encoder(
    (filterbank): ParamSincFB()
  )
  (relu): ReLU()
  (bn1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Bottle2neck(
    (conv1): Conv1d(256, 1024, kernel_size=(1,), stride=(1,))
    (bn1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (convs): ModuleList(
      (0-6): 7 x Conv1d(128, 128, kernel_size=(3,), stride=(1,), padding=(2,), dilation=(2,))
    )
    (bns): ModuleList(
      (0-6): 7 x BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (conv3): Conv1d(1024, 1024, kernel_size=(1,), stride=(1,))
    (bn3): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU()
    (mp): MaxPool1d(kernel_size=5, stride=5, padding=0, dilation=1, ceil_

#### Extract embeddings of audio file

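The function below loads an audio file as 16 kHz mono, wrap-pads it if it is shorter than `n_samples`, cuts `n_segments` evenly spaced windows of `n_samples` samples each, and passes them through the RawNet3 model as a single batch. It returns one embedding per window.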

An embedding in the context of RawNet3 is a compact and discriminative vector representation of an audio segment that captures the unique characteristics of the speaker, allowing for various speech processing and analysis tasks.

In [24]:
import librosa
import numpy as np
import soundfile as sf

def extract_speaker_embd(model, audio_file, n_samples=48000, n_segments=10, gpu=False):
    """Embed ``n_segments`` evenly spaced windows of an audio file.

    Args:
        model: Callable applied to a float32 tensor of shape
            ``(n_segments, n_samples)``.
        audio_file: Path handed to ``librosa.load`` (resampled to 16 kHz, mono).
        n_samples: Length of every window, in samples.
        n_segments: Number of windows taken between the start and the last
            position at which a full window still fits.
        gpu: If true, the batch is moved to ``"cuda"`` before the forward pass
            (the model is not moved here).

    Returns:
        Whatever ``model`` returns for the batch, computed without gradients.
    """
    waveform, _ = librosa.load(audio_file, sr=16000, mono=True)

    # Recordings shorter than one window are extended by wrapping around.
    if len(waveform) < n_samples:
        waveform = np.pad(waveform, (0, n_samples - len(waveform) + 1), "wrap")

    # Window start positions; fractional values are truncated when slicing.
    offsets = np.linspace(0, len(waveform) - n_samples, num=n_segments)
    windows = [waveform[int(start):int(start) + n_samples] for start in offsets]

    batch = torch.from_numpy(np.stack(windows, axis=0).astype(np.float32))
    if gpu:
        batch = batch.to("cuda")

    with torch.no_grad():
        return model(batch)


In [25]:
# Smoke test: embed ten windows of ./sample1.wav and report the batch shape.
audio_file = "./sample1.wav"
embeddings = extract_speaker_embd(
    model,
    audio_file,
    n_samples=48000,
    n_segments=10,
)
print("Embeddings shape:", embeddings.size())

  with torch.cuda.amp.autocast(enabled=False):


Embeddings shape: torch.Size([10, 256])


In [26]:
import os
import itertools
from tqdm import tqdm
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np

# Load the first 10% of the VoxCeleb training split in supervised (tuple) form.
dataset = tfds.load(
    "voxceleb",
    split="train[:10%]",
    shuffle_files=True,
    as_supervised=True,
)

# Extract a speaker embedding from an in-memory waveform.
def extract_speaker_embd(model, audio, n_samples=48000):
    """Return the embedding of the first ``n_samples`` samples of ``audio``.

    The model used in this notebook is a PyTorch module, so the waveform is
    turned into a torch batch (not a TensorFlow tensor). The result is handed
    back as a NumPy array, which both NumPy and TensorFlow code can consume.

    NOTE: this redefinition shadows the file-based ``extract_speaker_embd``
    defined earlier in the notebook.

    Args:
        model: Callable taking a float32 torch tensor of shape
            ``(1, n_samples)`` and returning a torch tensor.
        audio: 1-D sequence or array of samples; converted to float32.
        n_samples: Number of samples fed to the model. Longer inputs are
            truncated, shorter ones are extended by wrapping around.

    Returns:
        ``numpy.ndarray`` holding the model output for the single-item batch.

    Raises:
        ValueError: If ``audio`` is empty (nothing to wrap-pad).
    """
    samples = np.asarray(audio, dtype=np.float32)[:n_samples]
    if samples.size == 0:
        raise ValueError("audio is empty; cannot extract an embedding")
    if len(samples) < n_samples:
        samples = np.pad(samples, (0, n_samples - len(samples)), 'wrap')

    # torch.tensor copies, so read-only source arrays are not a problem.
    batch = torch.tensor(samples, dtype=torch.float32).unsqueeze(0)

    # Run on whatever device the model's parameters live on, if it has any.
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None:
        batch = batch.to(first_param.device)

    with torch.no_grad():
        embeddings = model(batch)

    return embeddings.detach().cpu().numpy()

# Load the Vox1-O trial list. The code below reads field 0 of every line as an
# integer label and fields 1 and 2 as the two files being compared.
with open("./trials/cleaned_test_list.txt", "r") as f:
    trials = f.readlines()

# Collect every unique file referenced by the trials (the last two fields of
# each line) so that each one is embedded only once.
files = list(itertools.chain(*[x.strip().split()[-2:] for x in trials]))
setfiles = list(set(files))
setfiles.sort()
print("setfiles: ", setfiles)

embd_dic = {}  # file identifier -> value returned by extract_speaker_embd
missing_files = []  # identifiers / trial lines that had to be skipped

# NOTE: `f` is rebound here; it previously named the (already closed) file handle.
for f in tqdm(setfiles):
    try:
        # Take the first dataset example whose second element decodes to this
        # file identifier and use its first element as the audio.
        # NOTE(review): the predicate calls .numpy() inside Dataset.filter and
        # indexes a single argument although the dataset is loaded with
        # as_supervised=True -- confirm this lambda actually runs as intended.
        # NOTE(review): the dataset is re-filtered from the start for every
        # file; presumably slow on the full split -- verify.
        audio = next(iter(dataset.filter(lambda x: x[1].numpy().decode('utf-8') == f)))[0].numpy()

        # Compute and cache the embedding of this file.
        embeddings = extract_speaker_embd(model, audio, n_samples=48000)
        embd_dic[f] = embeddings
    except StopIteration:
        # The filtered dataset was empty: no example matched this identifier.
        missing_files.append(f)
        continue

print("embd_dic: ", embd_dic)

# Compute a verification score for every trial pair.
labels, scores = [], []
for line in trials:
    data = line.split()
    try:
        ref_feat = tf.nn.l2_normalize(embd_dic[data[1]], axis=1)
        com_feat = tf.nn.l2_normalize(embd_dic[data[2]], axis=1)
    except KeyError:
        # At least one file of the pair has no embedding; skip the trial.
        # NOTE(review): missing_files now holds both bare file identifiers
        # (appended above) and whole trial lines (appended here), so the
        # "trials" count printed below mixes the two.
        missing_files.append(line.strip())
        continue

    # Score = negated mean squared difference of the L2-normalised embeddings,
    # so a smaller distance yields a larger score.
    dist = tf.reduce_mean(tf.square(ref_feat - com_feat), axis=1)
    score = -1.0 * tf.reduce_mean(dist).numpy()
    labels.append(int(data[0]))
    scores.append(score)

# Print the summary of everything that was skipped.
if missing_files:
    print(f"Skipped {len(missing_files)} trials due to missing audio files.")
    print("Missing files:")
    for file in missing_files:
        print(file)


OSError: Not enough disk space. Needed: 107.98 GiB (download: 4.68 MiB, generated: 107.98 GiB)

In [None]:
import os
import itertools
from tqdm import tqdm
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np

# Load the VoxCeleb training split, generating at most 10000 examples per
# split. `max_examples_per_split` is a field of `tfds.download.DownloadConfig`,
# not a keyword of `download_and_prepare`, so it is passed via `download_config`.
dataset = tfds.load(
    'voxceleb',
    split='train',
    shuffle_files=True,
    as_supervised=True,
    download_and_prepare_kwargs={
        'download_config': tfds.download.DownloadConfig(
            max_examples_per_split=10000
        ),
    },
)

# Extract a speaker embedding from an in-memory waveform.
def extract_speaker_embd(model, audio, n_samples=48000):
    """Return the embedding of the first ``n_samples`` samples of ``audio``.

    The model used in this notebook is a PyTorch module, so the waveform is
    turned into a torch batch (not a TensorFlow tensor). The result is handed
    back as a NumPy array, which both NumPy and TensorFlow code can consume.

    NOTE: this redefinition shadows the file-based ``extract_speaker_embd``
    defined earlier in the notebook.

    Args:
        model: Callable taking a float32 torch tensor of shape
            ``(1, n_samples)`` and returning a torch tensor.
        audio: 1-D sequence or array of samples; converted to float32.
        n_samples: Number of samples fed to the model. Longer inputs are
            truncated, shorter ones are extended by wrapping around.

    Returns:
        ``numpy.ndarray`` holding the model output for the single-item batch.

    Raises:
        ValueError: If ``audio`` is empty (nothing to wrap-pad).
    """
    samples = np.asarray(audio, dtype=np.float32)[:n_samples]
    if samples.size == 0:
        raise ValueError("audio is empty; cannot extract an embedding")
    if len(samples) < n_samples:
        samples = np.pad(samples, (0, n_samples - len(samples)), 'wrap')

    # torch.tensor copies, so read-only source arrays are not a problem.
    batch = torch.tensor(samples, dtype=torch.float32).unsqueeze(0)

    # Run on whatever device the model's parameters live on, if it has any.
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None:
        batch = batch.to(first_param.device)

    with torch.no_grad():
        embeddings = model(batch)

    return embeddings.detach().cpu().numpy()

# Load the Vox1-O trial list. The code below reads field 0 of every line as an
# integer label and fields 1 and 2 as the two files being compared.
with open("./trials/cleaned_test_list.txt", "r") as f:
    trials = f.readlines()

# Collect every unique file referenced by the trials (the last two fields of
# each line) so that each one is embedded only once.
files = list(itertools.chain(*[x.strip().split()[-2:] for x in trials]))
setfiles = list(set(files))
setfiles.sort()
print("setfiles: ", setfiles)

embd_dic = {}  # file identifier -> value returned by extract_speaker_embd
missing_files = []  # identifiers / trial lines that had to be skipped

# NOTE: `f` is rebound here; it previously named the (already closed) file handle.
for f in tqdm(setfiles):
    try:
        # Take the first dataset example whose second element decodes to this
        # file identifier and use its first element as the audio.
        # NOTE(review): the predicate calls .numpy() inside Dataset.filter and
        # indexes a single argument although the dataset is loaded with
        # as_supervised=True -- confirm this lambda actually runs as intended.
        # NOTE(review): the dataset is re-filtered from the start for every
        # file; presumably slow on the full split -- verify.
        audio = next(iter(dataset.filter(lambda x: x[1].numpy().decode('utf-8') == f)))[0].numpy()

        # Compute and cache the embedding of this file.
        embeddings = extract_speaker_embd(model, audio, n_samples=48000)
        embd_dic[f] = embeddings
    except StopIteration:
        # The filtered dataset was empty: no example matched this identifier.
        missing_files.append(f)
        continue

print("embd_dic: ", embd_dic)

# Compute a verification score for every trial pair.
labels, scores = [], []
for line in trials:
    data = line.split()
    try:
        ref_feat = tf.nn.l2_normalize(embd_dic[data[1]], axis=1)
        com_feat = tf.nn.l2_normalize(embd_dic[data[2]], axis=1)
    except KeyError:
        # At least one file of the pair has no embedding; skip the trial.
        # NOTE(review): missing_files now holds both bare file identifiers
        # (appended above) and whole trial lines (appended here), so the
        # "trials" count printed below mixes the two.
        missing_files.append(line.strip())
        continue

    # Score = negated mean squared difference of the L2-normalised embeddings,
    # so a smaller distance yields a larger score.
    dist = tf.reduce_mean(tf.square(ref_feat - com_feat), axis=1)
    score = -1.0 * tf.reduce_mean(dist).numpy()
    labels.append(int(data[0]))
    scores.append(score)

# Print the summary of everything that was skipped.
if missing_files:
    print(f"Skipped {len(missing_files)} trials due to missing audio files.")
    print("Missing files:")
    for file in missing_files:
        print(file)


In [None]:
import os
import itertools
from tqdm import tqdm
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np

# Load the full VoxCeleb training split; examples keep their feature-dict form.
dataset = tfds.load(
    "voxceleb",
    split="train",
)

# Extract a speaker embedding from an in-memory waveform.
def extract_speaker_embd(model, audio, n_samples=48000):
    """Return the embedding of the first ``n_samples`` samples of ``audio``.

    The model used in this notebook is a PyTorch module, so the waveform is
    turned into a torch batch (not a TensorFlow tensor). The result is handed
    back as a NumPy array, which both NumPy and TensorFlow code can consume.

    NOTE: this redefinition shadows the file-based ``extract_speaker_embd``
    defined earlier in the notebook.

    Args:
        model: Callable taking a float32 torch tensor of shape
            ``(1, n_samples)`` and returning a torch tensor.
        audio: 1-D sequence or array of samples; converted to float32.
        n_samples: Number of samples fed to the model. Longer inputs are
            truncated, shorter ones are extended by wrapping around.

    Returns:
        ``numpy.ndarray`` holding the model output for the single-item batch.

    Raises:
        ValueError: If ``audio`` is empty (nothing to wrap-pad).
    """
    samples = np.asarray(audio, dtype=np.float32)[:n_samples]
    if samples.size == 0:
        raise ValueError("audio is empty; cannot extract an embedding")
    if len(samples) < n_samples:
        samples = np.pad(samples, (0, n_samples - len(samples)), 'wrap')

    # torch.tensor copies, so read-only source arrays are not a problem.
    batch = torch.tensor(samples, dtype=torch.float32).unsqueeze(0)

    # Run on whatever device the model's parameters live on, if it has any.
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None:
        batch = batch.to(first_param.device)

    with torch.no_grad():
        embeddings = model(batch)

    return embeddings.detach().cpu().numpy()

# Load the Vox1-O trial list. The code below reads field 0 of every line as an
# integer label and fields 1 and 2 as the two files being compared.
with open("./trials/cleaned_test_list.txt", "r") as f:
    trials = f.readlines()

# Collect every unique file referenced by the trials (the last two fields of
# each line) so that each one is embedded only once.
files = list(itertools.chain(*[x.strip().split()[-2:] for x in trials]))
setfiles = list(set(files))
setfiles.sort()
print("setfiles: ", setfiles)

embd_dic = {}  # file identifier -> value returned by extract_speaker_embd
missing_files = []  # identifiers / trial lines that had to be skipped

# NOTE: `f` is rebound here; it previously named the (already closed) file handle.
for f in tqdm(setfiles):
    try:
        # Take the first dataset example whose 'youtube_id' feature equals
        # this identifier and read its 'audio' feature.
        # NOTE(review): the identifiers come straight from the trial list --
        # confirm they are stored in the same format as 'youtube_id'.
        # NOTE(review): the dataset is re-filtered from the start for every
        # file; presumably slow on the full split -- verify.
        example = next(iter(dataset.filter(lambda x: x['youtube_id'] == f)))
        audio = example['audio'].numpy()

        # Compute and cache the embedding of this file.
        embeddings = extract_speaker_embd(model, audio, n_samples=48000)
        embd_dic[f] = embeddings
    except StopIteration:
        # The filtered dataset was empty: no example matched this identifier.
        missing_files.append(f)
        continue

print("embd_dic: ", embd_dic)

# Compute a verification score for every trial pair.
labels, scores = [], []
for line in trials:
    data = line.split()
    try:
        ref_feat = tf.nn.l2_normalize(embd_dic[data[1]], axis=1)
        com_feat = tf.nn.l2_normalize(embd_dic[data[2]], axis=1)
    except KeyError:
        # At least one file of the pair has no embedding; skip the trial.
        # NOTE(review): missing_files now holds both bare file identifiers
        # (appended above) and whole trial lines (appended here), so the
        # "trials" count printed below mixes the two.
        missing_files.append(line.strip())
        continue

    # Score = negated mean squared difference of the L2-normalised embeddings,
    # so a smaller distance yields a larger score.
    dist = tf.reduce_mean(tf.square(ref_feat - com_feat), axis=1)
    score = -1.0 * tf.reduce_mean(dist).numpy()
    labels.append(int(data[0]))
    scores.append(score)

# Print the summary of everything that was skipped.
if missing_files:
    print(f"Skipped {len(missing_files)} trials due to missing audio files.")
    print("Missing files:")
    for file in missing_files:
        print(file)
