In [1]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import librosa
import librosa.display
from scipy.signal import lfilter,correlate
import tensorflow.keras.layers as tfl
import tensorflow as tf
from keras.callbacks import ReduceLROnPlateau
from PIL import Image

2024-06-21 20:03:13.429461: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-06-21 20:03:13.429584: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-06-21 20:03:13.562659: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
def pre_emphasis(signal,factor=0.97):
    return np.append(signal[0], signal[1:] - factor * signal[:-1])

In [3]:
def inverse_filtering(signal, lpc_coeffs):
    return lfilter(np.concatenate(([1], -lpc_coeffs[1:])), [1], signal)

In [4]:
def compute_lpc(signal, order):
    autocorr = correlate(signal, signal, mode='full')
    autocorr = autocorr[len(signal)-1:len(signal)+order]
    
    a = np.concatenate(([1], -autocorr[1:order+1]))
    b = [1]
    
    lpc_coeffs = lfilter(b, a, signal)
    
    lpc_coeffs = np.concatenate(([1], -lpc_coeffs[1:order+1]))
    
    del autocorr,a,b
    
    return lpc_coeffs

In [5]:
def lpc_to_cepstrum(lpc_coeffs):
    return np.fft.ifft(np.log(np.abs(np.fft.fft(lpc_coeffs))))

In [6]:
import gc
from IPython.display import clear_output

def create_spectrograms(train_folder, create_folder, verbose=False, speakers=50, utterances=10, sr=7000, frame_length=2048, hop_length=512, lpc_order=16):
    spc_folder = os.path.join(create_folder, "spectrogram")
    vt_folder = os.path.join(create_folder, "vocal_tract")
    glot_folder = os.path.join(create_folder, "glottal")
    
    total_speaker = 0
    for speaker in os.listdir(train_folder):
        total_utterances = 0
        speaker_folder_spc = os.path.join(spc_folder, speaker)
        speaker_folder_vt = os.path.join(vt_folder, speaker)
        speaker_folder_glot = os.path.join(glot_folder, speaker)
        os.makedirs(speaker_folder_spc, exist_ok=True)
        os.makedirs(speaker_folder_vt, exist_ok=True)
        os.makedirs(speaker_folder_glot, exist_ok=True)
        
        for vidID in os.listdir(os.path.join(train_folder, speaker)):
            for file in os.listdir(os.path.join(train_folder, speaker, vidID)):
                if file.endswith(".wav"):
                    wav_file_path = os.path.join(train_folder, speaker, vidID, file)
                    
                    y, sr = librosa.load(wav_file_path, sr=sr)
                    
#                     y_preemphasized=pre_emphasis(y)
                    y_preemphasized=y
                    
                    frames = librosa.util.frame(y_preemphasized, frame_length=frame_length, hop_length=hop_length).T
                    del y_preemphasized
                    
                    window = np.hamming(frame_length)
                    frames_windowed = frames * window  
                    del frames
                    
                    lpc_coeffs = []
                    for frame in frames_windowed:
                        frame=np.array(frame)
                        coeff = compute_lpc(frame, lpc_order)
                        lpc_coeffs.append(coeff)
                        
                    glottal_waveforms = [inverse_filtering(frame, coeff) for frame, coeff in zip(frames_windowed, lpc_coeffs)]
                    glottal_waveforms_avg = np.mean(np.abs(np.array(glottal_waveforms)), axis=0)
                    del glottal_waveforms
                    
                    residuals = []
                    for i, coeff in enumerate(lpc_coeffs):
                        frame = frames_windowed[i]
                        residual = lfilter(coeff, [1.0], frame)
                        residuals.append(residual)
                        
                    residual_avg = np.mean(np.abs(np.array(residuals)), axis=0)
                    
                    del frames_windowed, lpc_coeffs, residuals
                    gc.collect()

                    try:
                        plt.figure(figsize=(5, 5))
                        ax = plt.axes()
                        ax.set_axis_off()
                        plt.set_cmap('hot')
                        amp=np.abs(librosa.stft(y))
                        D = librosa.amplitude_to_db(amp, ref=np.max)
                        del y,amp
                        librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log')
                        output_path = os.path.join(speaker_folder_spc, "spc"+str(total_utterances+1) + ".png")
                        plt.savefig(output_path, bbox_inches='tight', transparent=True, pad_inches=0.0)
                        plt.close('all')
                        del D,ax,output_path
                        gc.collect()

                        plt.figure(figsize=(5, 5))
                        ax = plt.axes()
                        ax.set_axis_off()
                        plt.set_cmap('hot')
                        amp=np.abs(librosa.stft(residual_avg))
                        D_residual = librosa.amplitude_to_db(amp, ref=np.max)
                        del amp
                        librosa.display.specshow(D_residual, sr=sr, x_axis='time', y_axis='log')
                        output_path = os.path.join(speaker_folder_vt, "vt"+str(total_utterances+1) + ".png")
                        plt.savefig(output_path, bbox_inches='tight', transparent=True, pad_inches=0.0)
                        plt.close('all')
                        del D_residual,ax,output_path
                        gc.collect()

                        plt.figure(figsize=(5, 5))
                        ax = plt.axes()
                        ax.set_axis_off()
                        plt.set_cmap('hot')
                        amp=np.abs(librosa.stft(glottal_waveforms_avg))
                        D_glot = librosa.amplitude_to_db(amp, ref=np.max)
                        del amp
                        librosa.display.specshow(D_glot, sr=sr, x_axis='time', y_axis='log')
                        output_path = os.path.join(speaker_folder_glot, "glot"+str(total_utterances+1) + ".png")
                        plt.savefig(output_path, bbox_inches='tight', transparent=True, pad_inches=0.0)
                        plt.close('all')
                        del D_glot,ax,output_path
                        gc.collect()
                        
#                         clear_output()
                    
                    except OSError as e:
                        print(f"Error saving spectrogram for {wav_file_path}: {e}")
                    
                    del residual_avg, glottal_waveforms_avg
                    gc.collect()
    
                    total_utterances += 1
                    if total_utterances == utterances:
                        break
            
            if total_utterances == utterances:
                break
        
        total_speaker += 1
        if total_speaker != 0 and total_speaker % 1 == 0 and verbose:
            print(f"{total_speaker} speakers completed.\n")
        
        if total_speaker == speakers:
            break

In [2]:
raw_dataset_loc="/kaggle/input/voxceleb1train/wav"
save_loc="/kaggle/working/"
num_speakers=10
num_utterances=100

In [8]:
create_spectrograms(raw_dataset_loc,save_loc,True,num_speakers,num_utterances)

1 speakers completed.

2 speakers completed.

3 speakers completed.

4 speakers completed.

5 speakers completed.

6 speakers completed.

7 speakers completed.

8 speakers completed.

9 speakers completed.

10 speakers completed.



In [3]:
dataset_path_spc="/kaggle/working/spectrogram"
dataset_path_vt="/kaggle/working/vocal_tract"
dataset_path_glot="/kaggle/working/glottal"
folders=os.listdir(dataset_path_spc)

In [4]:
dict={}
i=0
for val in folders:
    dict[val]=i
    i=i+1

dict

{'id11079': 0,
 'id10719': 1,
 'id10036': 2,
 'id10480': 3,
 'id10484': 4,
 'id10459': 5,
 'id10116': 6,
 'id10061': 7,
 'id11123': 8,
 'id11250': 9}

In [5]:
dataset_path_spc="/kaggle/working/spectrogram"
dataset_path_vt="/kaggle/working/vocal_tract"
dataset_path_glot="/kaggle/working/glottal"
folders=os.listdir(dataset_path_spc)

# Initializing training and test dataset
X_train=[[],[],[]]
y_train=[[],[],[]]
X_test=[[],[],[]]
y_test=[[],[],[]]

# Split the dataset into training and test set.
num=np.random.rand(num_utterances*num_speakers)
mask=num<0.2
split=mask.astype(int)

for dirs in folders:
    i=0
    for img in os.listdir(os.path.join(dataset_path_spc,dirs)):
        image=Image.open(os.path.join(dataset_path_spc,dirs,img))
        new_img=image.resize((200,200))
        tmp_array=np.array(new_img)/255.
        if split[i]==0:
            X_train[0].append(tmp_array)
            y_train[0].append(dict[str(dirs)])
        else:
            X_test[0].append(tmp_array)
            y_test[0].append(dict[str(dirs)])
            
        i+=1
        
    i=0    
    for img in os.listdir(os.path.join(dataset_path_vt,dirs)):
        image=Image.open(os.path.join(dataset_path_vt,dirs,img))
        new_img=image.resize((200,200))
        tmp_array=np.array(new_img)/255.
        if split[i]==0:
            X_train[1].append(tmp_array)
            y_train[1].append(dict[str(dirs)])
        else:
            X_test[1].append(tmp_array)
            y_test[1].append(dict[str(dirs)])
            
        i+=1
            
    i=0
    for img in os.listdir(os.path.join(dataset_path_glot,dirs)):
        image=Image.open(os.path.join(dataset_path_glot,dirs,img))
        new_img=image.resize((200,200))
        tmp_array=np.array(new_img)/255.
        if split[i]==0:
            X_train[2].append(tmp_array)
            y_train[2].append(dict[str(dirs)])
        else:
            X_test[2].append(tmp_array)
            y_test[2].append(dict[str(dirs)])
        
        i+=1

In [6]:
def create_train_triplets(X_train, y_train, num_triplets):
    triplets = []
    labels = np.unique(y_train)
    
    for _ in range(num_triplets):
        anchor_label = np.random.choice(labels)
        negative_label = np.random.choice(labels[labels != anchor_label])
        
        anchor_indices = np.where(y_train == anchor_label)[0]
        positive_indices = np.where(y_train == anchor_label)[0]
        negative_indices = np.where(y_train == negative_label)[0]
        
        anchor = X_train[np.random.choice(anchor_indices)]
        positive = X_train[np.random.choice(positive_indices)]
        negative = X_train[np.random.choice(negative_indices)]
        
        triplets.append((anchor, positive, negative))
    
    return np.array(triplets)

In [18]:
def create_test_triplets(X_test, y_test, num_triplets):
    triplets = []
    range_test = len(y_test)
    idx = np.arange(range_test)
    
    for _ in range(num_triplets):
        label_1 = np.random.choice(idx)
        label_2 = np.random.choice(idx)
        
        anchor = X_test[label_1]
        inp = X_test[label_2]
        out = 1 if y_test[label_1] == y_test[label_2] else 0
        
        triplets.append((anchor, inp, out))
        
    return np.array(triplets, dtype=object)  

In [8]:
def convolutional_model(input_shape):
    input_img = tf.keras.Input(shape=input_shape)
    Z1=tfl.Conv2D(filters=32,kernel_size=(3,3),strides=(1,1),padding='same')(input_img)
    A1=tfl.ReLU()(Z1)
    P1=tfl.MaxPool2D(pool_size=(4,4),padding='same')(A1)
    Z2=tfl.Conv2D(filters=64,kernel_size=(2,2),strides=(1,1),padding='same')(P1)
    A2=tfl.ReLU()(Z2)
    P2=tfl.MaxPool2D(pool_size=(4,4),padding='same')(A2)
    Z3=tfl.Conv2D(filters=128,kernel_size=(2,2),strides=(1,1),padding='same')(P2)
    A3=tfl.ReLU()(Z3)
    P3=tfl.MaxPool2D(pool_size=(4,4),padding='same')(A3)
    F=tfl.Flatten()(P3)
    D1=tfl.Dense(128)(F)
        
    model = tf.keras.Model(inputs=input_img, outputs=D1)
    return model

input_shape = (200, 200, 4)
base_model = convolutional_model(input_shape)

In [9]:
def triplet_loss(y_true, y_pred, margin=1.0):
    anchor, positive, negative = y_pred[0], y_pred[1], y_pred[2]
    pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
    neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=-1)
    loss = tf.maximum(pos_dist - neg_dist + margin, 0.0)
    return tf.reduce_mean(loss)


In [10]:
anchor_input = tfl.Input(shape=input_shape, name='anchor_input')
positive_input = tfl.Input(shape=input_shape, name='positive_input')
negative_input = tfl.Input(shape=input_shape, name='negative_input')

encoded_anchor = base_model(anchor_input)
encoded_positive = base_model(positive_input)
encoded_negative = base_model(negative_input)

triplet_model_spc = tf.keras.Model(inputs=[anchor_input, positive_input, negative_input],
                      outputs=[encoded_anchor, encoded_positive, encoded_negative])

triplet_model_spc.compile(optimizer='adam', loss=triplet_loss)

triplet_model_vt = tf.keras.Model(inputs=[anchor_input, positive_input, negative_input],
                      outputs=[encoded_anchor, encoded_positive, encoded_negative])

triplet_model_vt.compile(optimizer='adam', loss=triplet_loss)

triplet_model_glot = tf.keras.Model(inputs=[anchor_input, positive_input, negative_input],
                      outputs=[encoded_anchor, encoded_positive, encoded_negative])

triplet_model_glot.compile(optimizer='adam', loss=triplet_loss)

In [13]:
triplets_spc=create_train_triplets(X_train[0],y_train[0],500)

anchor_images = triplets_spc[:, 0]
positive_images = triplets_spc[:, 1]
negative_images = triplets_spc[:, 2]

triplet_model_spc.fit([anchor_images, positive_images, negative_images], 
                  np.zeros((500, )),
                  batch_size=32, 
                  epochs=100)

Epoch 1/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 29ms/step - loss: 1.0041
Epoch 2/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 28ms/step - loss: 1.0015
Epoch 3/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - loss: 1.0001
Epoch 4/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step - loss: 0.9865
Epoch 5/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step - loss: 0.9802
Epoch 6/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step - loss: 0.9270
Epoch 7/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step - loss: 1.0954
Epoch 8/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - loss: 1.0078
Epoch 9/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step - loss: 1.0127
Epoch 10/100
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step - lo

<keras.src.callbacks.history.History at 0x7b0d641fce50>

In [19]:
triplets_spc_test=create_test_triplets(X_test[0],y_test[0],100)

anchor_images = triplets_spc_test[:, 0]
input_images = triplets_spc_test[:, 1]
label_test_spc = triplets_spc_test[:, 2]

In [21]:
true_pos = 0
num_triplets = 100  

triplets_test = create_test_triplets(X_test[0], y_test[0], num_triplets)

anchor_input_test = np.stack(triplets_test[:, 0])
positive_input_test = np.stack(triplets_test[:, 1])
labels_test = triplets_test[:, 2]

encoded_anchor = base_model.predict(anchor_input_test)
encoded_positive = base_model.predict(positive_input_test)

distances = np.linalg.norm(encoded_anchor - encoded_positive, axis=1)

threshold = 1.7

for i in range(num_triplets):
    if distances[i] > threshold and labels_test[i] == 1:
        true_pos += 1
    if distances[i] <= threshold and labels_test[i] == 0:
        true_pos += 1

accuracy = true_pos / num_triplets
print("Accuracy: {:.2f}".format(accuracy))


[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 273ms/step
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step 
Accuracy: 0.90


In [22]:
import zipfile

# Define the paths
source_path = '/kaggle/working'  # Source directory in Kaggle
zip_filename = 'saved_data.zip'  # Name for the zip file

# Create a zip archive
with zipfile.ZipFile(zip_filename, 'w') as zipf:
    # Add all files in source_path to the zip
    for root, _, files in os.walk(source_path):
        for file in files:
            zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), source_path))
