# Train model to recognise voice of people 

In [4]:
import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio
from enum import Enum
import pandas as pd
from pathlib import Path
from torchvision.transforms.functional import normalize
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [7]:
from src.columns.combined_triplet_dataset_column import CombinedTripletDatasetColumn
import librosa

from src.columns.combined_dataset_column import CombinedDatasetColumn
from src.transform.transform import CustomAdjustDurationTransform, ResampleTransform, ToMelSpectrogramTransform
import torch
from torch.utils.data import Dataset

from src.transform.composite_transformation import CompositeTransformation


class CombinedTripletSoundDS(Dataset):
    def __init__(
        self,
        voiceDataset,
        original_voice_data_path,
        generated_voice_data_path,
        sample_rate=16000,
        duration=3,
        transform=None
    ):
        self.voiceDataset = voiceDataset
        self.original_voice_data_path = str(original_voice_data_path)
        self.generated_voice_data_path = str(generated_voice_data_path)
        self.duration = duration
        self.sample_rate = sample_rate
        self.transform = None
        if transform is not None:
            self.transform = transform
        else:
            self.transform = CompositeTransformation(
                [
                    ResampleTransform(target_sample_rate=sample_rate),
                    CustomAdjustDurationTransform(duration_seconds=duration),
                    ToMelSpectrogramTransform(
                        sample_rate=sample_rate, n_mels=64, n_fft=512)
                ]
            )

    def __len__(self):
        return len(self.voiceDataset)

    def _get_sgram(self, audio_file):
        audio, sample_rate = librosa.load(audio_file, sr=self.sample_rate)
        spectrogram = self.transform.transform((audio, sample_rate))
        spectrogram = spectrogram.unsqueeze(0)
        return spectrogram

    def _get_sample_path(self, path, source):
        data_path = self.original_voice_data_path if source == 'original' else self.generated_voice_data_path
        return data_path + path

    def __getitem__(self, idx):
        anchor_path, pos_path, neg_path, anchor_source, pos_source, neg_source = self.voiceDataset.iloc[idx][
            [

                CombinedTripletDatasetColumn.ANCHOR_PATH,
                CombinedTripletDatasetColumn.POS_PATH,
                CombinedTripletDatasetColumn.NEG_PATH,
                CombinedTripletDatasetColumn.SOURCE_ANCHOR,
                CombinedTripletDatasetColumn.SOURCE_POS,
                CombinedTripletDatasetColumn.SOURCE_NEG,
            ]
        ].values.tolist()

        anchor_file = self._get_sample_path(anchor_path, anchor_source)
        pos_file = self._get_sample_path(pos_path, pos_source)
        neg_file = self._get_sample_path(neg_path, neg_source)
        
        anchor_sgram = self._get_sgram(anchor_file)
        pos_sgram = self._get_sgram(pos_file)
        neg_sgram = self._get_sgram(neg_file)
        return anchor_sgram, pos_sgram, neg_sgram

In [8]:

DATASET_PATH = './dataset/'
DATA_PATH = './data/'
MODEL_SAVE_PATH = './model_save/'

In [9]:

DATASET_PATH = './dataset/'
DATA_PATH = './data/'
MODEL_SAVE_PATH = './model_save/'
SAMPLE_RATE = 16000

In [10]:
from src.transform.transform import CustomAdjustDurationTransform, ResampleTransform, CustomAdjustDurationTransform, ShiftWrapperTransform, ToMelSpectrogramTransform, AirAbsorptionWrapperTransform, EqualizerWrapperTransform
from src.transform.composite_transformation import CompositeTransformation

train_transform = CompositeTransformation(
	[
		ResampleTransform(target_sample_rate=SAMPLE_RATE),
		CustomAdjustDurationTransform(duration_seconds=3),
		ShiftWrapperTransform(min_shift=-0.2, max_shift=0.2),
		AirAbsorptionWrapperTransform(),
		EqualizerWrapperTransform(),
		ToMelSpectrogramTransform(sample_rate=SAMPLE_RATE, n_mels=64, n_fft=512)
	]
)

test_transform = CompositeTransformation(
	[
		ResampleTransform(target_sample_rate=SAMPLE_RATE),
		CustomAdjustDurationTransform(duration_seconds=3),
		ShiftWrapperTransform(min_shift=-0.2, max_shift=0.2),
		AirAbsorptionWrapperTransform(),
		EqualizerWrapperTransform(),
		ToMelSpectrogramTransform(sample_rate=SAMPLE_RATE, n_mels=64, n_fft=512)
	]
)

In [23]:
train_df = pd.read_csv(DATASET_PATH + "train_combined_triplet_dataset.csv")
train_ds = CombinedTripletSoundDS(train_df, original_voice_data_path=DATA_PATH + "validated_16000/", generated_voice_data_path=DATA_PATH + "generated_16000/", transform=test_transform)

test_df = pd.read_csv(DATASET_PATH + "test_combined_triplet_dataset.csv")
test_ds = CombinedTripletSoundDS(test_df, original_voice_data_path=DATA_PATH + "validated_16000/", generated_voice_data_path=DATA_PATH + "generated_16000/", transform=test_transform)

In [31]:
class SiameseNetwork(nn.Module):
		def __init__(self):
				super(SiameseNetwork, self).__init__()
				self.conv_layers = nn.Sequential(
						nn.Conv2d(1, 32, kernel_size=3),
						nn.ReLU(inplace=True),
						nn.MaxPool2d(kernel_size=2, stride=2),
						nn.Conv2d(32, 64, kernel_size=3),
						nn.ReLU(inplace=True),
						nn.MaxPool2d(kernel_size=2, stride=2),
						nn.Conv2d(64, 128, kernel_size=3),
						nn.ReLU(inplace=True),
						nn.MaxPool2d(kernel_size=2, stride=2),
						nn.Conv2d(128, 256, kernel_size=3),
						nn.ReLU(inplace=True),
						nn.MaxPool2d(kernel_size=2, stride=2),
				)
				self.fc_layers = nn.Sequential(
						nn.Linear(4608, 1024),
						nn.ReLU(inplace=True),
				)

				self.final = nn.Linear(1024, 2)

		def forward_once(self, x):

				x = self.conv_layers(x)
				x = x.view(x.size(0), -1)
				x = self.fc_layers(x)
				return x

		def forward(self, anchor_input, pos_input, neg_input):
			anchor_output= self.forward_once(anchor_input)
			pos_output = self.forward_once(pos_input)
			neg_output = self.forward_once(neg_input)
			return  anchor_output, pos_output, neg_output

In [32]:
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=16, shuffle=False)

In [207]:

import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm


def evaluate_siamese_model_triplet_loss(model, dataloader, criterion, device='cpu'):
	model.eval()
	total_loss = 0.0

	with torch.no_grad():
		for batch in tqdm(dataloader, desc="Evaluating", leave=False):
			anchor_sgram, pos_sgram, neg_sgram = batch
			anchor_sgram = anchor_sgram.to(device)
			pos_sgram = pos_sgram.to(device)
			neg_sgram = neg_sgram.to(device)
			anchor_output, pos_output, neg_output = model(anchor_sgram, pos_sgram, neg_sgram)
			loss = criterion(anchor_output, pos_output, neg_output)
			total_loss += loss.item()

	avg_loss = total_loss / len(dataloader)

	return avg_loss

In [208]:
def train_siamese_model_triplet_loss(model, dataloader, criterion, optimizer, device='cpu'):
	model.train()
	total_loss = 0.0

	for batch in tqdm(dataloader, desc="Training", leave=False):
		anchor_sgram, pos_sgram, neg_sgram = batch
		anchor_sgram = anchor_sgram.to(device)
		pos_sgram = pos_sgram.to(device)
		neg_sgram = neg_sgram.to(device)
		optimizer.zero_grad()
		anchor_output, pos_output, neg_output = model(anchor_sgram, pos_sgram, neg_sgram)
		loss = criterion(anchor_output, pos_output, neg_output)
		loss.backward()
		optimizer.step()
		total_loss += loss.item()

	avg_loss = total_loss / len(dataloader)
	
	return avg_loss

In [206]:
class TripletLoss(torch.nn.Module):
    def __init__(self, margin=2):
        super(TripletLoss, self).__init__()
        self.triplet_loss = nn.TripletMarginLoss(margin=margin, p=2, eps=1e-7)

    def forward(self,anchor, positive, negative):
        loss=self.triplet_loss(anchor, positive, negative)
        return loss

In [209]:
num_epochs = 15
siamese_model = SiameseNetwork().to('cuda')
criterion = TripletLoss().to('cuda')
optimizer = optim.Adam(siamese_model.parameters(), lr=0.0001)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [210]:
loss, accuracy, precision, recall, f1 = [], [], [], [], []
for epoch in range(num_epochs):
	_train_loss = train_siamese_model_triplet_loss(siamese_model, train_dl, criterion, optimizer, device)
	print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {_train_loss:.4f}")
	_test_loss = evaluate_siamese_model_triplet_loss(siamese_model, test_dl, criterion, device)
	print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {_test_loss:.4f}")
	loss.append([_train_loss, _test_loss])

                                                           

Epoch 1/15, Loss: 0.8520


                                                           

Epoch 1/15, Loss: 0.5343


                                                           

Epoch 2/15, Loss: 0.5305


                                                           

Epoch 2/15, Loss: 0.5567


                                                           

Epoch 3/15, Loss: 0.4521


                                                           

Epoch 3/15, Loss: 0.5462


                                                           

Epoch 4/15, Loss: 0.4068


                                                           

Epoch 4/15, Loss: 0.3861


                                                           

Epoch 5/15, Loss: 0.3378


                                                           

Epoch 5/15, Loss: 0.3891


                                                           

Epoch 6/15, Loss: 0.3116


                                                           

Epoch 6/15, Loss: 0.3815


                                                           

Epoch 7/15, Loss: 0.2607


                                                           

Epoch 7/15, Loss: 0.4845


                                                          

KeyboardInterrupt: 

In [137]:
def save_model(model, model_name, base_path = './model_save/'):
	model.cpu()
	model.eval()
	torch.save(model, base_path + model_name)

In [138]:
save_model(siamese_model, "contrast_cnn.pt")

In [211]:
from matplotlib import pyplot as plt


def plot_siamese_model(anchor_sgram, pos_sgram, neg_sgram, loss):
	plt.style.use("dark_background")
	plt.figure(figsize=(10, 2), facecolor="#1e1e1e")
	plt.subplot(1, 2, 1)
	plt.title("Anchor Sgram")
	plt.imshow(anchor_sgram,  origin="lower", aspect="auto")
	plt.subplot(1, 2, 2)
	type = "Positive" if label.item() == 0 else "Negative"
	plt.title(f"{type} Sgram")
	plt.imshow(posneg_sgram,  origin="lower", aspect="auto")
	plt.subplot(1, 2, 2)
	plt.show()
	print(f"Similarity: {loss}")

In [212]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm


def evaluate(model, anchor_sgram, posneg_sgram):
	model.eval()
	model.to(device)
	with torch.no_grad():


		anchor_sgram = anchor_sgram.unsqueeze(0).to(device)
		pos_sgram = posneg_sgram.unsqueeze(0).to(device)
		neg_sgram = posneg_sgram.unsqueeze(0).to(device)
		anchor_output, pos_output, neg_output = model(anchor_sgram, pos_sgram,neg_sgram )
		print(f"pos: {F.pairwise_distance(anchor_output, pos_output).item()}")

In [305]:
anchor_sgram, pos_sgram, neg_sgram = train_ds[0]
evaluate(siamese_model, anchor_sgram, pos_sgram)
evaluate(siamese_model, anchor_sgram, neg_sgram)

pos: 8.358107566833496
pos: 15.434844970703125


In [301]:
anchor_sgram, pos_sgram, neg_sgram = train_ds[0]
evaluate(siamese_model, anchor_sgram, pos_sgram)
evaluate(siamese_model, anchor_sgram, neg_sgram)

pos: 8.049684524536133
pos: 11.992538452148438


In [311]:
anchor_sgram, pos_sgram, neg_sgram = train_ds[0]
evaluate(siamese_model, anchor_sgram, pos_sgram)
evaluate(siamese_model, anchor_sgram, neg_sgram)
#plot_siamese_model(anchor_sgram.squeeze(0), posneg_sgram.squeeze(0), label, loss)

pos: 8.399796485900879
pos: 14.188668251037598


In [354]:
anchor_sgram, pos_sgram, neg_sgram = test_ds[367]
evaluate(siamese_model, anchor_sgram, pos_sgram)
evaluate(siamese_model, anchor_sgram, neg_sgram)
#plot_siamese_model(anchor_sgram.squeeze(0), posneg_sgram.squeeze(0), label, loss)

pos: 4.1458892822265625
pos: 11.802881240844727


In [371]:
siamese_model.cpu()

SiameseNetwork(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (7): ReLU(inplace=True)
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
    (10): ReLU(inplace=True)
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc_layers): Sequential(
    (0): Linear(in_features=4608, out_features=1024, bias=True)
    (1): ReLU(inplace=True)
  )
  (final): Linear(in_features=1024, out_features=2, bias=True)
)

In [404]:
class Hook():
    def __init__(self, m): 
      self.hook = m.register_forward_hook(self.hook_func)
    def hook_func(self, module, input, output):
      self = (output.cpu()).data
    def remove(self):
      self.hook.remove()

In [411]:
final_layer = siamese_model._modules.get('conv_layers')[-2]
act_maps = Hook(final_layer)

In [412]:
prediction = siamese_model._modules.get('conv_layers')(anchor_sgram.unsqueeze(0))

In [413]:
grad = act_maps.features

AttributeError: 'Hook' object has no attribute 'features'

In [415]:
grad  = torch.tensor(grad)

In [416]:
pooled_gradients = torch.mean(grad, dim=[0, 2, 3])

In [418]:
pooled_gradients.shape

torch.Size([256])

: 

In [397]:
act_maps.remove()

In [384]:
prediction.shape

torch.Size([1, 256, 2, 9])

In [390]:
act_maps.features

AttributeError: 'Hook' object has no attribute 'features'

In [None]:
pooled_gradients = torch.mean(gradients, dim=[0, 2, 3])

In [385]:
for i in range(prediction.shape[1]):
	activations[:, i, :, :] *= pooled_gradients[i]
	
# average the channels of the activations
heatmap = torch.mean(activations, dim=1).squeeze()

# relu on top of the heatmap
# expression (2) in https://arxiv.org/pdf/1610.02391.pdf
heatmap = torch.where(heatmap > 0, heatmap, 0)

# normalize the heatmap
heatmap /= torch.max(heatmap)

# Reshape & Convert Tensor to numpy
heatmap = heatmap.squeeze()
heatmap = heatmap.detach().cpu().numpy()


# Resize Heatmap
heatmap = cv2.resize(heatmap, image_size)
# Convert to [0,255]
heatmap = np.uint8(255 * heatmap)


if figname is not None:
	plt.figure(figsize=figsize);
	fig = plt.imshow(heatmap);
	fig.axes.get_xaxis().set_visible(False)
	fig.axes.get_yaxis().set_visible(False)

	plt.savefig(figname, dpi=300, format='png', 
			bbox_inches='tight', pad_inches=0.1,
			facecolor='auto', edgecolor='auto',
			backend=None, 
		)

In [None]:

    
weight_softmax_params = list(siamese_model._modules.get('fc').parameters())
weight_softmax = np.squeeze(weight_softmax_params[0].cpu().data.numpy())
idx = topk(pred_prob,1)[1].int()
overlay = CAM(act_maps.features, weight_softmax, idx )


MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)

In [359]:
print(siamese_model)

SiameseNetwork(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (7): ReLU(inplace=True)
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
    (10): ReLU(inplace=True)
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc_layers): Sequential(
    (0): Linear(in_features=4608, out_features=1024, bias=True)
    (1): ReLU(inplace=True)
  )
  (final): Linear(in_features=1024, out_features=2, bias=True)
)


In [358]:
from torchsummary import summary
summary(siamese_model, (1, 64, 188),  (1, 64, 188))

TypeError: forward() missing 2 required positional arguments: 'pos_input' and 'neg_input'

In [None]:
final_layer = model._modules.get('layer4')
act_maps = Hook(final_layer)