# Localisation sonore

## Introduction

Le cerveau humain est capable grâce aux informations venant du système auditif de déterminer la direction et la distance approximative d'une onde sonore. En se plaçant en coordonnées sphériques ayant pour centre la tête, l'origine d'un son est déterminée par 3 paramètres : l'azimuth (angle sur le plan horizontal), l'élévation (angle sur le plan vertical) et la distance.  Pour un être humain, la précision en azimuth est de 8,5° autour de la vraie position. Cependant, repérer l'élévation et la distance d'un son est plus difficile [[1]](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6219530/). Plusieurs indices sont utilisés par le cerveau afin de déduire l'origine d'un son. Tout d'abord, le décalage de l'onde sonore entre les deux oreilles permet la détermination de l'azimuth. La perte d'amplitude du signal permet d'estimer la distance. Enfin, de nombreux autres phénomènes plus complexes sont pris en compte comme le traitement de l'onde transmise par la tête d'une oreille à l'autre (fonction de transfert de la tête) ou bien l'atténuation différentes des fréquences en fonction de la distance [[2]](https://en.wikipedia.org/wiki/Sound_localization). 

![Axes localisation](localisation_axis.jpg)

## Etat de l'art


Des systèmes de visio-conférence intelligents existent déjà [[3]](https://www.poly.com/fr/fr/innovations/smart-camera-technology). Cependant, cela fonctionne par triangulation et corrélation de signal et nécessite une calibration ainsi que la présence de plusieurs micros à des endroits différents.

Une étude a été menée en utilisant un réseau de neurones profond à convolution [[4]](https://www.researchgate.net/publication/316698525_Sound_Source_Localization_Using_Deep_Learning_Models). Le nombre d'examples utilisés est de 2,43 millions. Pour entrainer leur modèle sur 70 exemples, une epoch met 24 h en utilisant un GPU NVIDIA Titan X.

## Objectifs

Le premier objectif est de constituer un dataset de taille suffisante, idéalement dans différents environnements. Pour cela, une première idée est d'utiliser les caméras de motion capture présentes sur le campus et de déplacer une source sonore (dont la composition spectrale varie en fonction du temps). Ainsi, cela fourni 3 fichiers intéressants : la position de la source sonore, la position du récepteur et l'audio reçu par le récepteur (voir avec Renaud Séguier si cela est faisable). Dans un second temps, le premier objectif sera d'implémenter un algorithme d'IA afin de déterminer la position de la source sonore par rapport au récepteur (selon l'axe d'azimuth) puis ajouter la notion de distance et d'élévation. Il sera intéressant de voir à quel point l'algorithme sera robuste à un changement d'environnement sonore et de type de sons émis.

Voici un premier graphique décrivant l'architecture de l'algorithme :

![Archi v1](arch_v1.png)

Direction of arrival

## Références


[[1]](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6219530/) Accuracy-Precision Trade-off in Human Sound Localisation

[[2]](https://en.wikipedia.org/wiki/Sound_localization) Sound localization

[[3]](https://www.poly.com/fr/fr/innovations/smart-camera-technology) Caméra de visio-conférence

[[4]](https://www.researchgate.net/publication/316698525_Sound_Source_Localization_Using_Deep_Learning_Models) Réseaux de neurones pour la localisation sonore

[[5]](https://github.com/sharathadavanne/seld-net) Datasets


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyroomacoustics

Collecting pyroomacoustics
  Downloading pyroomacoustics-0.6.0.tar.gz (1.1 MB)
[?25l[K     |▎                               | 10 kB 29.6 MB/s eta 0:00:01[K     |▋                               | 20 kB 19.7 MB/s eta 0:00:01[K     |█                               | 30 kB 15.7 MB/s eta 0:00:01[K     |█▎                              | 40 kB 14.5 MB/s eta 0:00:01[K     |█▌                              | 51 kB 8.1 MB/s eta 0:00:01[K     |█▉                              | 61 kB 8.2 MB/s eta 0:00:01[K     |██▏                             | 71 kB 8.8 MB/s eta 0:00:01[K     |██▌                             | 81 kB 9.9 MB/s eta 0:00:01[K     |██▉                             | 92 kB 9.8 MB/s eta 0:00:01[K     |███                             | 102 kB 8.0 MB/s eta 0:00:01[K     |███▍                            | 112 kB 8.0 MB/s eta 0:00:01[K     |███▊                            | 122 kB 8.0 MB/s eta 0:00:01[K     |████                            | 133 kB 8.0 MB/s eta 0:00

## Fonction de génération de données

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
from scipy.signal import fftconvolve
import IPython
import pyroomacoustics as pra
import random
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
if torch.cuda.is_available():  
  dev = "cuda:0" 
else:  
  dev = "cpu" 

device_gpu = torch.device(dev)

In [3]:
# PARAMETERS

SOUND_FILE_SIZE = 65536
BATCH_SIZE = 124

files = ["riff_1.wav", "riff_2.wav", "riff_3.wav", "riff_4.wav"]

#files = ["/content/drive/MyDrive/sounds/riff_1.wav", "/content/drive/MyDrive/sounds/riff_2.wav", "/content/drive/MyDrive/sounds/riff_3.wav", "/content/drive/MyDrive/sounds/riff_4.wav"]
#files = ["riff_2.wav"]
signals = []
fs = 0

for file in files:
    fs, signal = wavfile.read(file)
    res = []
    for i in range(len(signal)):
      res.append(signal[i][0])
    signal = res
    signals.append(signal)

  fs, signal = wavfile.read(file)


In [4]:
# specify signal source

def generateSound(azimuth, fileSize):

    # set max_order to a low value for a quick (but less accurate) RIR
    corners = np.array([[0,0], [0,20], [20,20], [20,0]]).T  # [x,y]
    room = pra.Room.from_corners(corners, fs=fs, max_order=3, materials=pra.Material(0.2, 0.15), ray_tracing=True, air_absorption=True)
    room.extrude(20., materials=pra.Material(0.2, 0.15))

    '''
    fig, ax = room.plot()
    ax.set_xlim([-1, 31])
    ax.set_ylim([-1, 31])
    ax.set_zlim([-1, 3]);
    '''

    # Set the ray tracing parameters
    room.set_ray_tracing(receiver_radius=20, n_rays=10000, energy_thres=1e-5)

    # add source and set the signal to WAV file content
    distance = random.uniform(1,9)
    elevation = random.uniform(1,19)
    elevation_offset = random.uniform(-0.5,0.5)
    
    x_s = 10 + np.cos(azimuth)*distance
    y_s = 10 + np.sin(azimuth)*distance
    z_s = elevation + elevation_offset
    
    room.add_source([x_s, y_s, z_s], signal=random.choice(signals))

    # add two-microphone array
    room.add_microphone(loc=[10 + np.cos(0), 10 + np.sin(0), 10])
    room.add_microphone(loc=[10 + np.cos(np.pi/2), 10 + np.sin(np.pi/2), 10])
    room.add_microphone(loc=[10 + np.cos(np.pi), 10 + np.sin(np.pi), 10])
    room.add_microphone(loc=[10 + np.cos(3*np.pi/2), 10 + np.sin(3*np.pi/2), 10])

    # compute image sources
    room.image_source_model()

    # generate files
    room.simulate()

    # return files
    mic1 = room.mic_array.signals[0,:][:fileSize]
    mic2 = room.mic_array.signals[1,:][:fileSize]
    mic3 = room.mic_array.signals[2,:][:fileSize]
    mic3 = room.mic_array.signals[3,:][:fileSize]
    return [mic1, mic2, mic3, mic3]

In [5]:
result = generateSound(0, SOUND_FILE_SIZE)

# mic_1
IPython.display.Audio(result[0], rate=fs)

In [6]:
# mic_2
IPython.display.Audio(result[1], rate=fs)

In [7]:
def generateBatch():
    
    batch = []
    labels = []
    
    train_set = []

    for i in range(BATCH_SIZE):
        
        azimuth = random.randint(0,359)

        result = generateSound(azimuth*np.pi/180, SOUND_FILE_SIZE)
        batch.append([result[0],result[1], result[2], result[3]])
        labels.append(azimuth)

    train_set.append(torch.from_numpy(np.array(batch)).float().to(device_gpu))
    train_set.append(torch.from_numpy(np.array(labels)).to(device_gpu))

    return train_set

## CNN implementation

In [8]:
class classifier(nn.Module):
    
    def __init__(self):
        super(classifier, self).__init__()
        '''
        # Implementation 1 : doesn't converge => Maybe max pooling erase micro correlation
        self.conv1 = nn.Conv1d(in_channels=2, out_channels=16, kernel_size=2048, padding='same') # conv layer 1
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=16, padding='same')  # conv layer 2 and 3
        self.mp1 = nn.MaxPool1d(kernel_size=128) # max pooling
        self.fc1 = nn.Linear(in_features=12496, out_features=1000) # fully-connected layer 1
        self.fc2 = nn.Linear(in_features=1000, out_features=1000) # fully-connected layer 2
        self.fc3 = nn.Linear(in_features=1000, out_features=360) # fully-connected layer output
        self.softmax = nn.LogSoftmax(dim=1)
        self.relu = nn.LeakyReLU(negative_slope=0.01, inplace=False)
        '''
        
        '''
        # Implementation 2 :
        self.conv1 = nn.Conv1d(in_channels=2, out_channels=16, kernel_size=2048, padding=0) # conv layer 1
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=16, padding=0)  # conv layer 2 and 3
        self.mp1 = nn.AvgPool1d(kernel_size=128) # avg pooling
        self.fc1 = nn.Linear(in_features=5984, out_features=1000) # fully-connected layer 1
        self.fc2 = nn.Linear(in_features=1000, out_features=1000) # fully-connected layer 2
        self.fc3 = nn.Linear(in_features=1000, out_features=360) # fully-connected layer output
        self.softmax = nn.LogSoftmax(dim=1)
        self.relu = nn.LeakyReLU(negative_slope=0.01, inplace=False)
        '''
        
        # Implementation 3 :
        self.maxpool = nn.MaxPool1d(2)
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=64, kernel_size=64, padding="same") # conv layer 1
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=16, kernel_size=64, padding="same") # conv layer 2
        self.conv3 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=1024, padding="same") # conv layer 3
        self.conv4 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=1024, padding="same") # conv layer 4
        self.conv5 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=1024, padding="same") # conv layer 5
        self.conv6 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=1024, padding="same") # conv layer 6
        self.fc1 = nn.Linear(in_features=16384, out_features=4096) # fully-connected layer 1
        self.fc2 = nn.Linear(in_features=4096, out_features=1024) # fully-connected layer 2
        self.fc3 = nn.Linear(in_features=1024, out_features=512) # fully-connected layer 2
        self.fc4 = nn.Linear(in_features=512, out_features=360) # fully-connected layer output
        self.softmax = nn.LogSoftmax(dim=1)
        self.relu = nn.LeakyReLU(negative_slope=0.01, inplace=False)
        
        
    def forward(self,x):
        # implement your network here, do not forget to flatten your tensor after pooling
        
        '''
        # Implementation 1 : 
        # Idea doesn't converge => Maybe max pooling erase little dt correlation
        
        # conv layer
        x = self.conv1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.mp1(x)
        
        x = torch.flatten(x,1)

        # fc layer
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.softmax(x)
        '''
        
        '''
        # Implementation 2 :
        # Change : remove blank at wav file start
        
        # conv layer
        x = self.conv1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.mp1(x)
        
        x = torch.flatten(x,1)

        # fc layer
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.softmax(x)
        '''
        
        # Implementation 3 :
        # Change : remove blank at wav file start
        
        # conv layer
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.conv4(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.conv5(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.conv6(x)
        x = self.relu(x)
        x = self.maxpool(x)
        
        x = torch.flatten(x,1)

        # fc layer
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.fc4(x)
        x = self.softmax(x)
        
        return x

In [9]:
'''
!pip install GPUtil

import torch
from GPUtil import showUtilization as gpu_usage
from numba import cuda

def free_gpu_cache():
    print("Initial GPU Usage")
    gpu_usage()                             

    torch.cuda.empty_cache()

    cuda.select_device(0)
    cuda.close()
    cuda.select_device(0)

    print("GPU Usage after emptying the cache")
    gpu_usage()

free_gpu_cache()
'''

'\n!pip install GPUtil\n\nimport torch\nfrom GPUtil import showUtilization as gpu_usage\nfrom numba import cuda\n\ndef free_gpu_cache():\n    print("Initial GPU Usage")\n    gpu_usage()                             \n\n    torch.cuda.empty_cache()\n\n    cuda.select_device(0)\n    cuda.close()\n    cuda.select_device(0)\n\n    print("GPU Usage after emptying the cache")\n    gpu_usage()\n\nfree_gpu_cache()\n'

In [10]:
def train(model, loss_fn, optimizer, n_epochs=1):
    
    model.train(True)
    
    loss_train = np.zeros(n_epochs)
    acc_train = np.zeros(n_epochs)
    
    for epoch_num in range(n_epochs):
        
        running_loss = 0.0 # loss
        size = 0
        train_set = generateBatch()
        datas_in = train_set[0]
        labels_in = train_set[1]

        inputs = datas_in
        labels = labels_in

        #######
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        #######
        # store loss and compute num. of correct predictions
        running_loss += loss

        # count the number of samples
            
        epoch_loss = running_loss.item()
        loss_train[epoch_num] = epoch_loss
        
        print('Epoch {} - Loss: {:.4f}'.format(epoch_num+1, epoch_loss))
    
        
        
    return loss_train

In [None]:
conv_class = classifier().to(device_gpu)

# choose the appropriate loss
loss_fn = nn.NLLLoss() # initialize loss function

# your Adam optimizer
learning_rate = 1e-4
optimizer_cl = torch.optim.Adam(conv_class.parameters(), lr=learning_rate)
# number of epochs
n_epochs = 100

# and train
training_loss = train(conv_class,loss_fn,optimizer_cl,n_epochs = n_epochs)

  return F.conv1d(input, weight, bias, self.stride,


In [None]:
plt.plot(training_loss)
plt.xlabel('epoch')
plt.xlabel('NLLLoss loss')

NameError: ignored

In [None]:
def test(model):
    
    model.train(False)
    
    test_set_32 = generateBatch_32()
    datas_in = test_set_32[0]
    labels_in = test_set_32[1]

    inputs = datas_in
    labels = labels_in
    bs = labels.size(0)


    outputs = model(inputs)
    print(outputs)
    print(labels)

In [None]:
test(conv_class)

tensor([[-4.7613e+00,  2.3248e+00, -2.2895e+00],
        [-1.0000e+01, -4.9624e+00, -1.0000e+01],
        [-1.0000e+01, -2.5021e+00, -1.0000e+01],
        [-7.5128e+00,  3.2955e+00, -8.1928e+00],
        [-1.0000e+01,  4.9433e+00, -1.0000e+01],
        [-4.1964e+00, -3.8261e+00, -5.3469e+00],
        [-1.7140e+00,  3.4698e+00, -4.2906e+00],
        [-3.2662e+00,  1.3813e+00, -5.1674e+00],
        [-7.6509e+00, -2.1163e+00, -1.0000e+01],
        [-1.0000e+01, -3.6507e+00, -1.0000e+01],
        [-5.4623e+00,  2.6600e-01, -6.3363e+00],
        [-6.2995e+00, -3.3891e+00, -8.4710e+00],
        [-3.7143e+00,  2.7585e+00, -4.2191e+00],
        [-3.2319e+00,  3.6933e+00, -6.3285e+00],
        [-3.6569e+00,  7.1992e-01, -4.2440e+00],
        [-1.0000e+01,  6.2449e+00, -1.0000e+01],
        [-4.2432e+00, -4.9562e+00, -8.9556e+00],
        [-7.8970e+00, -1.4573e+00, -9.0715e+00],
        [-1.0000e+01, -6.5219e+00, -1.0000e+01],
        [-1.0000e+01, -1.4573e-01, -9.4843e+00],
        [-9.5413e+00

## RNN implementation

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class azimuthRNN(nn.Module):
    
    def __init__(self):
        super(classifier, self).__init__()
        
        # Implementation 1 :
        self.fc1 = nn.Linear(in_features=63624, out_features=1000) # fully-connected layer 1
        self.fc2 = nn.Linear(in_features=1000, out_features=1000) # fully-connected layer 2
        self.fc3 = nn.Linear(in_features=1000, out_features=360) # fully-connected layer output
        self.softmax = nn.LogSoftmax(dim=1)
        self.relu = nn.LeakyReLU(negative_slope=0.01, inplace=False)
        
        
    def forward(self,x):

        # Implementation 1 :
        
        # conv layer
        x = self.conv1(x)
        x = self.relu(x)
        
        x = torch.flatten(x,1)

        # fc layer
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.softmax(x)
        
        return x

In [None]:
def train(model, loss_fn, optimizer, n_epochs=1):
    
    model.train(True)
    
    loss_train = np.zeros(n_epochs)
    acc_train = np.zeros(n_epochs)
    
    for epoch_num in range(n_epochs):
        
        running_loss = 0.0 # loss
        size = 0
        train_set = generateBatch()
        datas_in = train_set[0]
        labels_in = train_set[1]

        inputs = datas_in
        labels = labels_in

        #######
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        #######
        # store loss and compute num. of correct predictions
        running_loss += loss

        # count the number of samples
            
        epoch_loss = running_loss.item()
        loss_train[epoch_num] = epoch_loss
        
        print('Epoch {} - Loss: {:.4f}'.format(epoch_num+1, epoch_loss))
    
        
        
    return loss_train