In [1]:
import torch
import numpy
import itertools

In [2]:
if torch.cuda.is_available():
    device = torch.device('cuda:0')
    print('Running on the GPU.')
else:
    device = torch.device('cpu')
    print('Running on the CPU.')

Running on the GPU.


## Data generation

In [3]:
vector_length = 5
models_variance = 1.0
segments_length = 1000
segments_variance = 5.0

speakers_ids = ['A', 'B', 'C']
speakers_models = [-152.0, 43.0, 898.0]
speakers_models = [numpy.random.normal(speakers_models[i], models_variance, vector_length) for i, speaker_id in enumerate(speakers_ids)]

speakers = {}
for i, speaker_id in enumerate(speakers_ids):
    speakers[speaker_id] = {}
    speakers[speaker_id]['speaker_id'] = speaker_id
    speakers[speaker_id]['models'] = [speakers_models[i]]

segments = []
for i, speaker_id in enumerate(speakers):
    speaker = speakers[speaker_id]
    for j in range(segments_length):
        segments.append({ 'speaker_id': speaker_id, 'vector': numpy.asarray([numpy.random.normal(value, segments_variance) for value in speaker['models'][0]]) })


## Data indexation

In [4]:
models_container_length = 3

#permutations = list(itertools.permutations(speakers_ids + ['0' for i in range(models_container_length)], models_container_length))
permutations = list(itertools.permutations(speakers_ids, models_container_length))
permutations = list(set(permutations))
permutations_length = 0
permutations_map = []
for i, permutation in enumerate(permutations):
    permutation_length = int(numpy.prod([len(speakers[speaker_id]['models']) for speaker_id in permutation if speaker_id in speakers_ids]))
    permutations_map.append((permutations_length, permutations_length + permutation_length - 1, i))
    permutations_length += permutation_length
recording_length = len(segments) * permutations_length

In [5]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split

class Recordings_dataset(Dataset):
    def __len__(self):
        return recording_length
    def __getitem__(self, idx):
        segment_i, permutation_i = divmod(idx, permutations_length)
        segment = segments[segment_i]
        permutation_tuple = list(filter(lambda permutation_tuple: permutation_tuple[0] <= permutation_i and permutation_i <= permutation_tuple[1], permutations_map))[0]
        permutation_i = permutation_tuple[2]
        permutation = permutations[permutation_i]
        vector = segment['vector']
        models_container = [speakers[speaker_id]['models'][0] for speaker_id in permutation]
        
        x = numpy.concatenate([vector] + models_container)
        y = numpy.asarray([speaker_id == segment['speaker_id'] for speaker_id in permutation], dtype = float)
        
        return x, y
        
recordings_dataset = Recordings_dataset()
print(len(recordings_dataset))

train_length = int(len(recordings_dataset) * 0.7)
test_length = len(recordings_dataset) - train_length
cross_length = int(train_length * 0.3)
train_length = train_length - cross_length

train_dataset, cross_dataset, test_dataset = random_split(recordings_dataset, [train_length, cross_length, test_length])
train_dataloader = DataLoader(train_dataset, batch_size = 10, shuffle=True, num_workers = 4)
cross_dataloader = DataLoader(cross_dataset, batch_size = len(cross_dataset), num_workers = 4)

18000


In [6]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc0 = nn.Linear(4 * vector_length, 6)
        self.fc1 = nn.Linear(6, 6)
        self.fc2 = nn.Linear(6, 6)
        self.fc3 = nn.Linear(6, 3)
    def forward(self, x):
        x = F.relu(self.fc0(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.sigmoid(self.fc3(x))
        return x

In [10]:
import torch.optim as optim

net = Net().to(device)

optimizer = optim.Adam(net.parameters(), lr = 0.0005)
criterion = nn.BCELoss()

epochs = 100
for epoch in range(epochs):
    for input, target in train_dataloader:
        input = input.to(device, non_blocking=True).float()
        target = target.to(device, non_blocking=True).float()

        net.zero_grad()

        output = net(input)

        loss = criterion(output, target)

        loss.backward()
        optimizer.step()
    with torch.no_grad():
        for input, target in cross_dataloader:
            input = input.to(device, non_blocking=True).float()
            target = target.to(device, non_blocking=True).float()
            output = net(input)
            cross_loss = criterion(output, target)
        
    print('epoch:', epoch, 'loss:', loss, 'cross_loss:', cross_loss)
    
    if cross_loss < 0.05:
        print('Done training.')
        break

epoch: 0 loss: tensor(0.3792, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: tensor(0.3575, device='cuda:0')
epoch: 1 loss: tensor(0.2142, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: tensor(0.2255, device='cuda:0')
epoch: 2 loss: tensor(0.1862, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: tensor(0.1885, device='cuda:0')
epoch: 3 loss: tensor(0.2340, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: tensor(0.1577, device='cuda:0')
epoch: 4 loss: tensor(0.1109, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: tensor(0.1335, device='cuda:0')
epoch: 5 loss: tensor(0.0938, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: tensor(0.1121, device='cuda:0')
epoch: 6 loss: tensor(0.1126, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: tensor(0.0780, device='cuda:0')
epoch: 7 loss: tensor(0.0969, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) cross_loss: 

In [11]:
test_dataloader = DataLoader(test_dataset, batch_size = len(test_dataset), num_workers = 4)
with torch.no_grad():
    for input, target in test_dataloader:
        input = input.to(device, non_blocking=True).float()
        target = target.to(device, non_blocking=True).float()
        output = net(input)
        test_loss = criterion(output, target)
    print('test_loss:', test_loss)

test_loss: tensor(0.0273, device='cuda:0')


In [12]:
test_dataloader = DataLoader(test_dataset, batch_size = 1, num_workers = 1)
correct = 0
with torch.no_grad():
    for input, target in test_dataloader:
        input = input.to(device, non_blocking=True).float()
        target = target.to(device, non_blocking=True).float()
        output = net(input)
        if target.max(1)[1] == output.max(1)[1]:
            correct += 1
print(correct / len(test_dataset))

1.0
