## Import des libraries

In [1]:
import numpy as np
import torch
import torch.nn as nn
import os
from scipy.io import wavfile
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import torchinfo

  from .autonotebook import tqdm as notebook_tqdm


## Téléchargement du dataset

In [None]:
# Fetch the dataset archive with the `gdown` CLI (the id matches the Google Drive
# link given in the text right after this cell), then extract it in place.
!gdown 1Q3sbordKpx65ExOvqtM4fif7rDXhIXwS
!unzip datasetaudio.zip

Sinon, le lien est ici: https://drive.google.com/file/d/1Q3sbordKpx65ExOvqtM4fif7rDXhIXwS/view?usp=sharing

## Load le dataset

### Load le dataset de train
Pour load un fichier .wav, on utilise la fonction `wavfile.read()` de `scipy.io` qui nous retourne un tuple `(samplerate, data)`. `data` est un array numpy de dimension 1, contenant les valeurs du signal audio. Le nombre d'échantillons par seconde pour nos données est de 4000, donc chaque fichier .wav contient 6000 valeurs car la durée du signal est de 1.5 secondes. `data` est donc de shape `(6000,)`.

Il y a 4096 fichiers .wav dans le dossier `train/x_train` et 2048 fichiers .wav dans le dossier `test/x_test`. Donc on peut créer un array numpy de shape `(4096, 6000)` pour les données d'entraînement et un array numpy de shape `(2048, 6000)` pour les données de test.

In [2]:
# Number of training examples; files are named 0.wav .. (N_TRAIN - 1).wav.
N_TRAIN = 4096

# wavfile.read returns (samplerate, data); only the sample array (index 1) is kept.
# X holds the input signals, Y1 / Y2 the two target signals ("-a" and "-b" files).
X_train = np.array(
    [wavfile.read(f"train/x_train/{idx}.wav")[1] for idx in range(N_TRAIN)]
)
Y1_train = np.array(
    [wavfile.read(f"train/y_train/{idx}-a.wav")[1] for idx in range(N_TRAIN)]
)
Y2_train = np.array(
    [wavfile.read(f"train/y_train/{idx}-b.wav")[1] for idx in range(N_TRAIN)]
)

for loaded in (X_train, Y1_train, Y2_train):
    print(loaded.shape)

(4096, 6000)
(4096, 6000)
(4096, 6000)


### Data augmentation

In [3]:
# Data augmentation: create extra (input, target-a, target-b) triples by summing
# a randomly chosen "-a" signal with a randomly chosen "-b" signal.
AUG_SEED = 0  # fixed seed so that the augmented set is reproducible across runs

fact = 2**1                # final dataset size as a multiple of N_TRAIN
aug = (fact-1)*N_TRAIN     # number of augmented examples to create

# Draw `aug` pairs of random indices (column 0 -> "-a" source, column 1 -> "-b" source).
rng = np.random.default_rng(AUG_SEED)
idx = rng.integers(0, N_TRAIN, size=(aug, 2))

Y1_train_aug = Y1_train[idx[:,0]]
Y2_train_aug = Y2_train[idx[:,1]]

# Sum in float32: if the WAV data is integer PCM (e.g. int16 — TODO confirm the
# dtype returned by wavfile.read for this dataset), adding two signals in the
# native dtype can silently wrap around. The arrays are converted to float
# before training anyway, so widening here does not change downstream values.
X_train_aug = Y1_train_aug.astype(np.float32) + Y2_train_aug.astype(np.float32)

# Only the first N_TRAIN rows are the original data; slicing makes this cell
# idempotent (re-running it replaces the augmented rows instead of appending
# a new set on top of the previous one).
X_train = np.concatenate([X_train[:N_TRAIN], X_train_aug])
Y1_train = np.concatenate([Y1_train[:N_TRAIN], Y1_train_aug])
Y2_train = np.concatenate([Y2_train[:N_TRAIN], Y2_train_aug])

print(X_train.shape)
print(Y1_train.shape)
print(Y2_train.shape)


(8192, 6000)
(8192, 6000)
(8192, 6000)


### Load le dataset de test

In [4]:
# Number of test inputs to load; files are named 0.wav .. (N_TEST - 1).wav.
# NOTE(review): the notebook text mentions 2048 test files while only the first
# 512 are read here — confirm which one is intended.
N_TEST = 512

# Keep only the sample array (index 1) of the (samplerate, data) tuple.
X_test = np.array(
    [wavfile.read(f"test/x_test/{idx}.wav")[1] for idx in range(N_TEST)]
)

print(X_test.shape)

(512, 6000)


## Lecture des données

In [None]:
import IPython.display as ipd

# Playback rate in samples per second, passed to ipd.Audio in the cells that follow.
SAMPLERATE = 4000

In [None]:

# Audio player for the first training input signal.
ipd.Audio(X_train[0], rate=SAMPLERATE)

In [None]:
# Audio player for the first "-a" target signal.
ipd.Audio(Y1_train[0], rate=SAMPLERATE)

In [None]:
# Audio player for the first "-b" target signal.
ipd.Audio(Y2_train[0], rate=SAMPLERATE)

## Batch les données

In [5]:
batch_size = 64

# Group consecutive examples into fixed batches: (n_batches, batch_size, 6000).
# The reshape requires the number of examples to be a multiple of batch_size.
X_train_reshaped, Y1_train_reshaped, Y2_train_reshaped = (
    flat.reshape(-1, batch_size, 6000) for flat in (X_train, Y1_train, Y2_train)
)

for batched in (X_train_reshaped, Y1_train_reshaped, Y2_train_reshaped):
    print(batched.shape)

(128, 64, 6000)
(128, 64, 6000)
(128, 64, 6000)


## Convertir les données en torch.tensor

In [6]:
# Wrap the batched NumPy arrays as float32 torch tensors (kept on the CPU here;
# individual batches are moved to the device inside the training loop).
X_train_torch, Y1_train_torch, Y2_train_torch = (
    torch.from_numpy(batched_np).to(torch.float32)
    for batched_np in (X_train_reshaped, Y1_train_reshaped, Y2_train_reshaped)
)

## Faire la même chose pour le dataset de test

In [7]:
# Same batching and float32 conversion as for the training data.
X_test_reshaped = np.reshape(X_test, (-1, batch_size, 6000))
X_test_torch = torch.from_numpy(X_test_reshaped).to(torch.float32)

print(X_test_torch.shape)

torch.Size([8, 64, 6000])


### Nettoyage

In [8]:
# Drop the names of the intermediate NumPy arrays; only the torch tensors
# (X_train_torch, Y1_train_torch, Y2_train_torch, X_test_torch) are used from here on.
del (
    X_train, Y1_train, Y2_train, X_test,
    X_train_reshaped, Y1_train_reshaped, Y2_train_reshaped, X_test_reshaped,
)

## Créer le modèle

In [None]:
kernel_size = 51  # default kernel width used by every convolution of Conv1D


class Conv1D(nn.Module):
    """Stack of "same"-padded 1-D convolutions that splits a signal in two.

    The encoder widens the channel dimension 1 -> 32 -> 64 -> 128 -> 256 -> 512
    and the decoder narrows it back to a single channel. Every convolution
    except the very last one is followed by BatchNorm1d and PReLU. There is no
    striding or pooling, so the time dimension is preserved throughout.

    Args:
        kernel_size: width of every convolution kernel. Defaults to the
            module-level ``kernel_size`` value at class-definition time, so
            ``Conv1D()`` builds the same network as before.

    The layers are stored in flat ``nn.Sequential`` containers in the same
    order as the original hand-written version, so ``state_dict`` keys
    (``encoder.0.weight`` ... ``decoder.12.bias``) are unchanged.
    """

    def __init__(self, kernel_size=kernel_size):
        super().__init__()
        widths = (1, 32, 64, 128, 256, 512)
        self.encoder = nn.Sequential(*self._conv_stack(widths, kernel_size))

        # The decoder mirrors the encoder; its final convolution has no
        # normalisation/activation so the output is unconstrained.
        narrowing = widths[::-1]  # (512, 256, 128, 64, 32, 1)
        self.decoder = nn.Sequential(
            *self._conv_stack(narrowing[:-1], kernel_size),
            nn.Conv1d(in_channels=narrowing[-2], out_channels=narrowing[-1],
                      kernel_size=kernel_size, stride=1, padding="same"),
        )

    @staticmethod
    def _conv_stack(widths, kernel_size):
        """Return a flat list of Conv1d -> BatchNorm1d -> PReLU triples linking consecutive widths."""
        layers = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            layers += [
                nn.Conv1d(in_channels=c_in, out_channels=c_out,
                          kernel_size=kernel_size, stride=1, padding="same"),
                nn.BatchNorm1d(c_out),
                nn.PReLU(),
            ]
        return layers

    def forward(self, x):
        """Split ``x`` of shape (batch, length) into two signals of the same shape.

        Returns:
            (x1, x2) where x1 is the network output and x2 = x - x1, so the
            two outputs always sum back to the input.
        """
        x = x.unsqueeze(1)  # add the channel dimension expected by Conv1d
        x1 = self.encoder(x)
        x1 = self.decoder(x1)
        x2 = x - x1

        return x1.squeeze(1), x2.squeeze(1)

In [9]:
class conv_block(nn.Module):
    """Two successive Conv1d -> BatchNorm1d -> ReLU stages with "same" padding.

    Args:
        in_c: number of input channels of the first convolution.
        out_c: number of output channels of both convolutions.
        kernel_size: kernel width of both convolutions.
    """

    def __init__(self, in_c, out_c, kernel_size):
        super().__init__()
        stages = []
        # First stage maps in_c -> out_c, second stage out_c -> out_c.
        for stage_in in (in_c, out_c):
            stages.extend([
                nn.Conv1d(stage_in, out_c, kernel_size=kernel_size, padding="same"),
                nn.BatchNorm1d(out_c),
                nn.ReLU(),
            ])
        self.network = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` (batch, in_c, length) -> (batch, out_c, length)."""
        return self.network(x)

class encoder_block(nn.Module):
    """A conv_block followed by a max-pool that halves the time dimension.

    Args:
        in_c: channels of the incoming tensor.
        out_c: channels produced by the conv_block.
        kernel_size: kernel width forwarded to the conv_block.
    """

    def __init__(self, in_c, out_c, kernel_size):
        super().__init__()
        self.conv = conv_block(in_c, out_c, kernel_size)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return ``(x, pooled)``.

        The first element is the block's *input* ``x`` unchanged (it is what
        Unet uses as the skip connection); the second is the pooled conv output.
        NOTE(review): a textbook U-Net would return the conv output as the skip;
        Unet's decoder channel counts (in_skip = 64, 32, 16, 1) are sized for the
        input-as-skip behaviour implemented here — confirm this is intentional.
        """
        return x, self.pool(self.conv(x))

class decoder_block(nn.Module):
    """Upsample by 2, concatenate a skip tensor on the channel axis, then convolve.

    Args:
        in_c: channels of the tensor to upsample.
        out_c: channels after the transposed convolution and after the conv_block.
        in_skip: channels of the skip tensor that gets concatenated.
        kernel_size: kernel width forwarded to the conv_block.
    """

    def __init__(self, in_c, out_c, in_skip, kernel_size):
        super().__init__()
        # kernel_size=2 / stride=2 doubles the time dimension exactly.
        self.up = nn.ConvTranspose1d(in_c, out_c, kernel_size=2, stride=2, padding=0)
        self.conv = conv_block(out_c + in_skip, out_c, kernel_size)

    def forward(self, inputs, skip):
        """Fuse the upsampled ``inputs`` with ``skip`` (same length required) and convolve."""
        upsampled = self.up(inputs)
        merged = torch.cat((upsampled, skip), dim=1)
        return self.conv(merged)

class Unet(nn.Module):
    """1-D U-Net style network that splits a signal into two parts.

    Four encoder blocks (each halving the length) feed a bottleneck conv_block,
    followed by four decoder blocks (each doubling the length) that consume the
    encoder skips in reverse order. The input length should therefore be
    divisible by 16 so that upsampled tensors and skips line up.

    Args:
        kernel_size: kernel width used by every conv_block in the network.
    """

    def __init__(self, kernel_size):
        super().__init__()
        # Encoder path: channels 1 -> 16 -> 32 -> 64 -> 64.
        self.e1 = encoder_block(1, 16, kernel_size=kernel_size)
        self.e2 = encoder_block(16, 32, kernel_size=kernel_size)
        self.e3 = encoder_block(32, 64, kernel_size=kernel_size)
        self.e4 = encoder_block(64, 64, kernel_size=kernel_size)
        # Bottleneck.
        self.b = conv_block(64, 128, kernel_size=kernel_size)
        # Decoder path; the third argument is the channel count of the skip,
        # i.e. of the *input* of the matching encoder block.
        self.d1 = decoder_block(128, 64, 64, kernel_size=kernel_size)
        self.d2 = decoder_block(64, 32, 32, kernel_size=kernel_size)
        self.d3 = decoder_block(32, 16, 16, kernel_size=kernel_size)
        self.d4 = decoder_block(16, 8, 1, kernel_size=kernel_size)
        # 1x1 convolution back to a single channel.
        self.outputs = nn.Conv1d(8, 1, kernel_size=1, padding=0)

    def forward(self, inputs):
        """Split ``inputs`` of shape (batch, length) into ``(y1, y2)``.

        y1 is the network output and y2 = inputs - y1; both have shape
        (batch, length).
        """
        inputs = inputs.unsqueeze(1)  # add the channel dimension

        # Contracting path: remember each block's skip tensor.
        skips = []
        hidden = inputs
        for encoder in (self.e1, self.e2, self.e3, self.e4):
            skip, hidden = encoder(hidden)
            skips.append(skip)

        hidden = self.b(hidden)

        # Expanding path: consume the skips from deepest to shallowest.
        for decoder, skip in zip((self.d1, self.d2, self.d3, self.d4), reversed(skips)):
            hidden = decoder(hidden, skip)

        y1 = self.outputs(hidden)
        y2 = inputs - y1
        return y1.squeeze(1), y2.squeeze(1)

class MultiUnet(nn.Module):
    """Several Unets with different kernel sizes whose outputs are fused by a 1x1 conv.

    Args:
        list_kernel_size: one kernel size per Unet branch. Branch ``i`` is
            registered as the submodule ``unet{i}``.
    """

    def __init__(self, list_kernel_size):
        super().__init__()
        self.len = len(list_kernel_size)
        for branch_idx, branch_kernel in enumerate(list_kernel_size):
            self.add_module(f"unet{branch_idx}", Unet(branch_kernel))

        # Learned linear combination of the branch outputs (one channel per branch).
        self.fuse = nn.Conv1d(self.len, 1, kernel_size=1, padding=0)

    def forward(self, inputs):
        """Split ``inputs`` of shape (batch, length) into ``(y1, y2)``.

        Only the first output of each branch is used; y1 is their fused
        combination and y2 = inputs - y1.
        """
        branch_outputs = [
            getattr(self, f"unet{branch_idx}")(inputs)[0]
            for branch_idx in range(self.len)
        ]
        # Stack the (batch, length) outputs as channels: (batch, n_branches, length).
        y1 = self.fuse(torch.stack(branch_outputs, dim=1))
        y2 = inputs.unsqueeze(1) - y1

        return y1.squeeze(1), y2.squeeze(1)

## Boucle d'entraînement

In [10]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Two Unet branches with kernel sizes 3 and 5.
model = MultiUnet([3, 5])
model = model.to(device)

# Per-batch training loss values, appended to by the training loop.
loss_history = []

In [37]:
# Layer-by-layer summary of the model, using the shape of one training batch as input size.
torchinfo.summary(model, X_train_torch[0].shape)

Layer (type:depth-idx)                        Output Shape              Param #
MultiUnet                                     [64, 6000]                --
├─Unet: 1-1                                   [64, 6000]                --
│    └─encoder_block: 2-1                     [64, 1, 6000]             --
│    │    └─conv_block: 3-1                   [64, 16, 6000]            912
│    │    └─MaxPool1d: 3-2                    [64, 16, 3000]            --
│    └─encoder_block: 2-2                     [64, 16, 3000]            --
│    │    └─conv_block: 3-3                   [64, 32, 3000]            4,800
│    │    └─MaxPool1d: 3-4                    [64, 32, 1500]            --
│    └─encoder_block: 2-3                     [64, 32, 1500]            --
│    │    └─conv_block: 3-5                   [64, 64, 1500]            18,816
│    │    └─MaxPool1d: 3-6                    [64, 64, 750]             --
│    └─encoder_block: 2-4                     [64, 64, 750]             --
│    │    └─

In [11]:
lr = 1e-4
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)
# step_size=1 with gamma=0.7: the learning rate is multiplied by 0.7 at every scheduler.step().
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=1, gamma=0.7)

In [12]:
epochs = 10
model.train()
n_batches = len(X_train_torch)
for e in range(epochs):
    print(f"Epoch {e+1}/{epochs}")
    # Batches are pre-built and visited in the same fixed order every epoch.
    batch_iter = zip(X_train_torch, Y1_train_torch, Y2_train_torch)
    for X, Y1, Y2 in tqdm(batch_iter, total=n_batches):
        optimizer.zero_grad()
        X, Y1, Y2 = X.to(device), Y1.to(device), Y2.to(device)

        Y1_pred, Y2_pred = model(X)

        # Permutation-invariant loss: score both assignments of the two outputs
        # to the two targets and keep the cheaper one.
        # NOTE(review): the minimum is taken over batch-averaged losses, so the
        # assignment is chosen once per batch rather than per example — confirm
        # this is intended.
        loss_direct = loss_fn(Y1_pred, Y1) + loss_fn(Y2_pred, Y2)
        loss_swapped = loss_fn(Y1_pred, Y2) + loss_fn(Y2_pred, Y1)
        loss = torch.min(loss_direct, loss_swapped)

        loss.backward()
        optimizer.step()
        loss_history.append(loss.item())
    # One scheduler step per epoch.
    scheduler.step()

Epoch 1/10


  2%|▏         | 2/128 [00:03<03:53,  1.85s/it]


KeyboardInterrupt: 

In [None]:
# Training curve: natural log of the per-batch loss recorded in loss_history.
fig, ax = plt.subplots()
ax.plot(np.log(loss_history), label="loss")
ax.set_xlabel("training step (batch)")
ax.set_ylabel("log(loss)")
ax.set_title("Training loss")
# The label passed to plot() is only shown once a legend is drawn.
ax.legend();

## Prédiction

In [None]:
model.eval()

predictions = np.array([])
predictions = predictions.reshape(0, 2, 6000)

for i in range(len(X_test_torch)):
    X = X_test_torch[i].to(device)
    with torch.no_grad():
        Y1_pred, Y2_pred = model(X)
        Y_pred = torch.stack([Y1_pred, Y2_pred], dim=1)
    predictions = np.concatenate([predictions, Y_pred.cpu().numpy()])

np.save("predictions.npy", predictions)
!zip predictions.zip predictions.npy