# **TEAM: AI_Atlantique**

In [1]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Tue Dec  3 21:01:59 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  NVIDIA L4                      Off | 00000000:00:03.0 Off |                    0 |
| N/A   59C    P8              12W /  72W |      1MiB / 23034MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

# **metric.py**

In [4]:
from os.path import dirname, join
import numpy as np
import sofar
import torch

class MeanSpectralDistortion:
    """
    Metric Class used for evaluation, can also be used as loss function.
    MeanSpectalDistortion().get_spectral_distortion(ground_truth, predicted) for calculating error.
    """

    def __init__(self):

        self.avg_hrir = sofar.read_sofa(join(dirname(__file__), 'data', 'Average_HRTFs.sofa'), verbose=False)
        self.source_positions = self.avg_hrir.SourcePosition
        self.elevation_index = self._get_elevation_index()
        self.weights = self._get_weights()

    def _get_weights(self):
        """
        This function load the weights which are used when you calculate the spectral distortion/ baseline predictions
        weights were calculated based on the paper "Looking for a relevant similarity criterion fo HRTF clustering: a comparative study - Rozenn Nicol".

        Returns:
               normalized_weights: torch.tensor

        """
        # Generate a list of frequencies up to 24 kHz
        frequencies_Hz = np.linspace(0, 24000, 129)  # 129 points between 0 Hz and 24 kHz
        frequencies_kHz = frequencies_Hz / 1000
        inv_cb = 1 / (25 + 75 * (1 + 1.4 * frequencies_kHz**2) ** 0.69)  # inverse of delta (critical bandwidth)
        a0 = sum(inv_cb)
        normalized_weights = inv_cb / a0
        return normalized_weights

    def _get_elevation_index(self):
        """
        Helper function to get elevation indexes.
        Args:
            You can change the elevation range as required. We will use the elvation range between -30 to 30
            Returns:
             all index for the elevation range"""
        # this function gives the index of the directions for which you need to evaluate your results.

        azimuths = self.source_positions[:, 0]
        elevations = self.source_positions[:, 1]

        # Define the elevation range
        elevation_min = -30
        elevation_max = 30
        # Find the indices for the specific elevation range
        elevation_indices = np.where((elevations >= elevation_min) & (elevations <= elevation_max))[0]

        # Ensure that elevation_indices is a NumPy array of integers
        return np.array(elevation_indices, dtype=int)

    def get_spectral_distortion(self, hrtf_ground_truth: torch.Tensor, hrtf_predicted: torch.Tensor) -> torch.Tensor:
        """
        Computes the spectral distortion between the inputs.

        Args:
            hrtf_ground_truth: torch.tensor
            hrtf_predicted: torch.tensor
        Returns:
            weighted_error: torch.tensor in dB

        """

        weighted_error = ((torch.from_numpy(self.weights) * (hrtf_ground_truth[self.elevation_index].abs() - hrtf_predicted[self.elevation_index].abs())) ** 2).mean()
        return weighted_error.log10() * 10

    def get_spectral_distortion_numpy(self, hrtf_ground_truth: np.ndarray, hrtf_predicted: np.ndarray) -> float:
        """
        Computes the spectral distortion between the inputs.

        Args:
            hrtf_ground_truth: np.ndarray
            hrtf_predicted: np.ndarray
        Returns:
            weighted_error: float

        """

        weighted_error = np.mean(self.weights * (np.abs(hrtf_ground_truth[self.elevation_index]) - np.abs(hrtf_predicted[self.elevation_index])) ** 2)
        return float(10. * np.log10(weighted_error))

# **utils.py**

In [5]:
from __future__ import annotations
from typing import Dict, List, Tuple
import sofar
import glob
import numpy as np
from imageio.v3 import imread
import os
import torch
import tqdm
from torchvision.transforms import Compose, Resize, ToTensor



all_tasks = [np.arange(19).tolist(), np.arange(19, step=3).tolist(), [3, 6, 9]]


class SonicomDatabase(torch.utils.data.Dataset):

    def __init__(
        self,
        root_dir: str,
        hrtf_type="FreeFieldCompMinPhase",
        no_itd=True,
        sampling_rate="48kHz",
        nfft=256,
        training_data: bool = True,
        task_id: int = 0,
        folder_structure: str = 'v2'
    ):
        """
        Args:
            root_dir: Directory with all the HRTF files in subfolder.
            hrtf_type: can be any of ['Raw','Windowed','FreeFieldComp','FreeFieldCompMinPhase'(default)]
            sampling_rate: any of 44kHz, 48kHz, 96kHz
            nfft: fft length
            training_data: if true then return training dataset
            task_id: task id determines how many images will be used for inference. Can be 0, 1, or 2.
        """
        super().__init__()
        self.root_dir = root_dir
        self.hrtf_type = hrtf_type
        self.nfft = nfft

        if no_itd:
            itd_str = "NoITD_"
        else:
            itd_str = ""

        if folder_structure not in {'v1', 'v2'}:
            raise RuntimeError('Unknown folder structure version')
        pathname = f"P*/P*/HRTF/HRTF/{sampling_rate}/*_{hrtf_type}_{itd_str}{sampling_rate}.sofa" if folder_structure == 'v1' else \
                   f"SONICOM_HRTF/P*/HRTF/HRTF/{sampling_rate}/*_{hrtf_type}_{itd_str}{sampling_rate}.sofa"
        self.hrtf_files = glob.glob(os.path.join(root_dir, pathname))
        # print('Found ' + str(len(self.hrtf_files)) + ' files')




        if training_data:
            self.image_dir = os.path.join(root_dir, "SONICOM_TrainingData_pics")
            self.task = all_tasks[0]
        else:
            print(os.path.join(root_dir, "SONICOM_TestData_pics"))
            self.image_dir = os.path.join(root_dir, "SONICOM_TestData_pics")
            self.task = all_tasks[task_id]

        self.all_image_names = [i for i in os.listdir(self.image_dir) if ".png" in i]
        self.all_subjects = self.get_available_ids()

        # read one to get coordinate system information
        try:
            tmp = sofar.read_sofa(self.hrtf_files[0], verbose=False)
            self.training_data = training_data
            self.position = tmp.SourcePosition
        except (IndexError, ValueError):
            print("Check if Dataset is saved as described in the notebook.")
            return None


    def __len__(self):
        return len(self.all_subjects)

    def load_all_hrtfs(self) -> torch.Tensor:
        """
        This function loads all the HRTFs from the list of IDs.

        Returns:
            Magnitude Spectrum of HRTFs : torch.Tensor
        """
        HRTFs = torch.zeros(
            (self.__len__(), self.position.shape[0], 2, self.nfft // 2 + 1)
        )

        allids = np.unique([cur_id[:5] for cur_id in self.all_image_names])
        for idx in range(len(allids)):
            if allids[idx] == allids[idx - 1] and idx > 0:
                HRTFs[idx] = HRTFs[idx - 1]
            else:
                HRTFs[idx] = torch.from_numpy(
                    self.load_subject_id_hrtf(allids[idx])
                ).abs()
        return HRTFs

    def load_image(self, image_name: str) -> Tuple[np.ndarray, str, str]:
        """
        This function read all the image files in the directory, get the ID of the image, Left or Right side of the pinna.

        Args:
            image_name (str): e.g. P0002_left_0.png

        Returns:
            image: torch.Tensor
            ID: (str) Subject ID of the loaded image
            Face_Side: (str) If the image loaded is of the left ear or the right ear
        """

        image = imread(os.path.join(self.image_dir, image_name))
        ID = image_name[:5]
        Face_Side = ["left" if "left" in image_name else "right"]

        return image, ID, Face_Side

    def get_image_names_from_id(self, id: str) -> List[str]:
        """
        This function helps to get the image names from the directory.

        Args:
            id (str): Subject ID e.g. 'P0001'
        Returns:
            List of image name
        """
        return [
            x for x in os.listdir(self.image_dir) if f"{id}" in x
        ]  # glob.glob(os.path.join(self.image_dir, f'{id}*'))

    def get_available_ids(self) -> List[str]:
        """
        This function returns all unique IDs from the list of images.

        Args:
            all_images (list of str)
        Returns:
            list of unique IDs
        """
        return list({name[:5] for name in self.all_image_names})

    def _extract_number_of_image(self, image_name: str) -> List[int]:
        """
        Extracts the image number of the subject from an image filename.

        Args:
            image_name (str): Filename of the image.

        Returns:
            Optional[int]: value if successfully extracted; otherwise, None.
        """
        try:
            azi_str = image_name.split("t_")[1]
            number = int(azi_str.split(".")[0])
            return number
        except (IndexError, ValueError):
            return None

    def _get_task_subset_image_names(self, image_names: List[str]) -> Tuple[List[str], List[str]]:
        """
        Returns two Lists of left and right image names from selected subset (based on task).

        Args:
            image_names (List of str): Filenames of the images.

        Returns:
            Dict e.g. {left_0: [0, 'P0002_left_0.png'], right_0: [1, 'P0002_right_0.png']}
        """

        left_names = []
        right_names = []
        for i in image_names:
            cur_azi = self._extract_number_of_image(i)
            if cur_azi in self.task:
                if "left" in i:
                    left_names.append(i)  # channel, name
                if "right" in i:
                    right_names.append(i)

        return left_names, right_names

    def get_all_images_and_HRTF_from_id(self, id: str) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Loads all the images for the subject (only the subset) and the corresponding HRTF.

        Args:
            ID of each subject (str): e.g. P0001

        Returns:
            all_images: torch.Tensor of shape (ear_idx, image_idx, height, width)
            HRTFs: torch.Tensor
        """
        image_names = self.get_image_names_from_id(id)
        image_names.sort()
        left_images_filenames, right_images_filenames = (
            self._get_task_subset_image_names(image_names)
        )
        left_images = []
        right_images = []

        if not left_images_filenames or not right_images_filenames:
            raise FileNotFoundError(f"No images found for subject ID '{id}'.")

        left_images = torch.from_numpy(np.stack([imread(os.path.join(self.image_dir, path)) for path in left_images_filenames]))
        right_images = torch.from_numpy(np.stack([imread(os.path.join(self.image_dir, path)) for path in right_images_filenames]))

        all_images = torch.stack((left_images, right_images))

        HRTF = self.load_subject_id_hrtf(id)

        return all_images, HRTF

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        This function is used by the Dataloader, it iterates through the number of subjects in the current dataset
        and provides the corresponding Images, HRTFs and Subject IDs.
        """

        id = self.all_subjects[idx]
        all_images, HRTF = self.get_all_images_and_HRTF_from_id(id)

        return all_images, HRTF


    def load_subject_id_hrtf(self, subject_id: str, return_sofa: bool = False) -> np.ndarray:
        hrtf_file = [s for s in self.hrtf_files if subject_id + "_" + self.hrtf_type in s][0]
        if not hrtf_file:
            print(subject_id + " Not found!")
            return None

        data = sofar.read_sofa(hrtf_file, verbose=False)
        if return_sofa:
            return data

        # Filtrer pour garder seulement les angles dans self.task
        filtered_hrir = data.Data_IR[self.task]  # Indices des angles définis dans self.task
        return self._compute_HRTF(filtered_hrir)


    def _load_hrir(self, hrtf_file: sofar.Sofa) -> np.ndarray:
        """
        This function load the HRIR data for the given filename.

        Args:
              sofa file
            Returns:
               HRIR data"""
        data = sofar.read_sofa(hrtf_file, verbose=False)
        return data.Data_IR

    def _compute_HRTF(self, hrir: np.ndarray) -> np.ndarray:
        """
        This function compute the RFFT of the given HRIRs and return HRTFs.

        Args:
              HRIRs (time domain)
            Returns:
               HRTFs (Frequency domain)"""

        return np.fft.rfft(hrir, n=self.nfft)


def baseline_spectral_distortion(sd: SonicomDatabase, path_to_baseline_hrtf: str = "/content/drive/MyDrive/TechArena/TechArena20241120/data/Average_HRTFs.sofa") -> float:
    # this function calculate the spectral difference as mean square error between your ground truth HRTFs and the baseline average HRTFs
    # load all HRTFS, concat in 1 tensor, clone Average_HRTFs as many times and then find get_spectral_distortion
    """Returns:
    baseline prediction MSE in dB
    """

    all_HRTFs = sd.load_all_hrtfs()
    baseline_HRIR = sofar.read_sofa(path_to_baseline_hrtf, verbose=False).Data_IR
    baseline_HRTF = torch.from_numpy(sd._compute_HRTF(baseline_HRIR))
    baseline_HRTF = baseline_HRTF.unsqueeze(0).repeat(all_HRTFs.shape[0], 1, 1, 1)
    eval_metric = MeanSpectralDistortion()

    return eval_metric.get_spectral_distortion(all_HRTFs, baseline_HRTF)


def convert_to_HRIR(hrtfs: np.ndarray) -> np.ndarray:
    return np.fft.irfft(hrtfs, axis=-1)

def save_sofa(HRIR: np.ndarray, output_path: str, reference_sofa: sofar.Sofa):
    """
    Save the HRIR to a SOFA object file. See main() for example usage

    Args:
        HRIR (np.ndarray): HRIR of shape (793, 2, 256).
        output_path (str): Path where the SOFA file will be saved.
        reference_sofa (str): The SOFA object to copy information
    """
    hrtf = reference_sofa
    hrtf.Data_IR = HRIR
    sofar.write_sofa(output_path, hrtf, 0)


if __name__ == "__main__":
    from torch.utils.data import DataLoader

    sonicom_root = "/content/drive/MyDrive/TechArena/TechArena20241120/data"
    sd = SonicomDatabase(sonicom_root, training_data=False, task_id=0)
    train_dataloader = DataLoader(sd, batch_size=1, shuffle=False)

    for i, (images, hrtf) in tqdm.tqdm(enumerate(train_dataloader)):
        print(f"Image size: {images.shape} and HRTF size: {hrtf.shape}")
        break

    # Error = baseline_spectral_distortion(sd)
    # print(Error)

/content/drive/MyDrive/TechArena/TechArena20241120/data/SONICOM_TestData_pics


0it [00:07, ?it/s]

Image size: torch.Size([1, 2, 19, 1024, 1024]) and HRTF size: torch.Size([1, 19, 2, 129])





# **model.py**



In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import gc
from torch.utils.data import DataLoader

# Modèle d'encodage
class PinnaEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.dropout(x)
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.dropout(x)
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.dropout(x)
        return x

# Générateur HRTF

class HRTFGenerator(nn.Module):
    def __init__(self, num_angles=19, num_freq_bins=129):
        super().__init__()
        self.encoder = PinnaEncoder()
        self.flatten_size = None
        self.fc1 = None
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_angles * num_freq_bins * 2)
        self.dropout = nn.Dropout(0.5)

    def forward(self, images):
        images = images.squeeze(3).float()  # Assurez-vous que les images sont en float32

        batch_size, num_ears, num_views, height, width = images.shape
        features = []
        for ear in range(num_ears):
            ear_features = []
            for view in range(num_views):
                x = images[:, ear, view, :, :].unsqueeze(1)
                x = self.encoder(x)
                ear_features.append(x)
            ear_features = torch.stack(ear_features, dim=1)
            ear_features = torch.mean(ear_features, dim=1)
            features.append(ear_features)
        features = torch.cat(features, dim=1)


        if self.flatten_size is None:
            self.flatten_size = features.view(batch_size, -1).shape[1]
            self.fc1 = nn.Linear(self.flatten_size, 512).to(features.device)

        features = features.view(batch_size, -1)
        x = F.relu(self.fc1(features))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        hrtf = x.view(batch_size, 19, 2, 129)
        return hrtf

# Entraîneur
class HRTFTrainer:
    def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.model = model.to(device)
        self.device = device
        self.criterion = nn.MSELoss()
        self.optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    def train_epoch(self, dataloader, accumulation_steps=4):
        self.model.train()
        total_loss = 0
        for i, (images, hrtfs) in enumerate(dataloader):
            images = images.to(self.device).float()  # Conversion explicite en float32
            hrtfs = hrtfs.to(self.device).float()  # Conversion explicite en float32

            # Convertir les données complexes en réels
            hrtfs_real = torch.abs(hrtfs)

            self.optimizer.zero_grad()
            predictions = self.model(images)

            # Conversion des prédictions en réels
            predictions_real = torch.abs(predictions)

            # Calcul de la perte
            loss = self.criterion(predictions_real, hrtfs_real)
            loss = loss / accumulation_steps
            loss.backward()

            if (i + 1) % accumulation_steps == 0 or (i + 1) == len(dataloader):
                self.optimizer.step()
                self.optimizer.zero_grad()

            total_loss += loss.item()
            del images, hrtfs, predictions
            gc.collect()
            torch.cuda.empty_cache()
        return total_loss / len(dataloader)

    def validate(self, dataloader):
        self.model.eval()
        total_loss = 0
        with torch.no_grad():
            for images, hrtfs in dataloader:
                images = images.to(self.device).float()  # Conversion explicite en float32
                hrtfs = hrtfs.to(self.device).float()  # Conversion explicite en float32

                # Convertir les données complexes en réels
                hrtfs_real = torch.abs(hrtfs)

                predictions = self.model(images)

                # Conversion des prédictions en réels
                predictions_real = torch.abs(predictions)

                # Calcul de la perte
                loss = self.criterion(predictions_real, hrtfs_real)
                total_loss += loss.item()

                del images, hrtfs, predictions
                gc.collect()
                torch.cuda.empty_cache()
        return total_loss / len(dataloader)


# Entraînement du modèle
def train_model(train_loader, val_loader, num_epochs=50):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = HRTFGenerator().to(device)
    trainer = HRTFTrainer(model)
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}:')
        train_loss = trainer.train_epoch(train_loader)
        val_loss = trainer.validate(val_loader)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')

        print(f'Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        gc.collect()
        torch.cuda.empty_cache()

    print("Entraînement terminé.")
    return model


# **Sol.py**

In [7]:
from torchvision import transforms

# Ajout du prétraitement
transform = transforms.Compose([
    transforms.ToPILImage(),            # Conversion en image PIL (nécessaire pour torchvision)
    transforms.Resize((128, 128)),      # Redimensionnement des images
    transforms.ToTensor(),              # Conversion en tenseur
    transforms.Normalize((0.5,), (0.5,))  # Normalisation (valeurs moyennes et écarts-types)
])

# Modification de la méthode __getitem__ pour inclure le transform
class ModifiedSonicomDatabase(SonicomDatabase):
    def __init__(self, *args, transform=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.transform = transform

    def __getitem__(self, idx: int):
        id = self.all_subjects[idx]
        all_images, HRTF = self.get_all_images_and_HRTF_from_id(id)


       # Appliquer la transformation à chaque image
        transformed_images = []
        for ear_images in all_images:  # Parcours des oreilles (gauche et droite)
            transformed_ear = torch.stack([self.transform(img.numpy()) for img in ear_images])
            transformed_images.append(transformed_ear)

        # Empiler les images transformées pour former un tenseur
        all_images = torch.stack(transformed_images)  # [2, num_views, 128, 128]

       # Supprimer les dimensions inutiles si nécessaire
        all_images = all_images.squeeze(3)  # Supprime la dimension 1 s'il y en a une inutile.
        return all_images, HRTF



# Utilisation de la classe modifiée
train_data = ModifiedSonicomDatabase(
    "/content/drive/MyDrive/TechArena/TechArena20241120/data",
    training_data=True,
    transform=transform
)

val_data = ModifiedSonicomDatabase(
    "/content/drive/MyDrive/TechArena/TechArena20241120/data",
    training_data=False,
    transform=transform
)

train_loader = DataLoader(train_data, batch_size=1, shuffle=True)
val_loader = DataLoader(val_data, batch_size=1)

/content/drive/MyDrive/TechArena/TechArena20241120/data/SONICOM_TestData_pics


In [7]:
# # from torch.utils.data import DataLoader
# # from utils import SonicomDatabase
# # from model import train_model

# # Create dataloaders
# train_data = SonicomDatabase("/content/drive/MyDrive/TechArena/TechArena20241120/data", training_data=True)
# val_data = SonicomDatabase("/content/drive/MyDrive/TechArena/TechArena20241120/data", training_data=False)
# train_loader = DataLoader(train_data, batch_size=1, shuffle=True)
# val_loader = DataLoader(val_data, batch_size=1)


In [8]:
# Train the model
model = train_model(train_loader, val_loader)

Epoch 1/50:


  hrtfs = hrtfs.to(self.device).float()  # Conversion explicite en float32


Epoch 1/50 - Train Loss: 0.0060, Val Loss: 0.0220
Epoch 2/50:
Epoch 2/50 - Train Loss: 0.0044, Val Loss: 0.0149
Epoch 3/50:
Epoch 3/50 - Train Loss: 0.0034, Val Loss: 0.0117
Epoch 4/50:
Epoch 4/50 - Train Loss: 0.0029, Val Loss: 0.0088
Epoch 5/50:
Epoch 5/50 - Train Loss: 0.0027, Val Loss: 0.0104
Epoch 6/50:
Epoch 6/50 - Train Loss: 0.0027, Val Loss: 0.0076
Epoch 7/50:
Epoch 7/50 - Train Loss: 0.0025, Val Loss: 0.0100
Epoch 8/50:
Epoch 8/50 - Train Loss: 0.0025, Val Loss: 0.0077
Epoch 9/50:
Epoch 9/50 - Train Loss: 0.0022, Val Loss: 0.0079
Epoch 10/50:
Epoch 10/50 - Train Loss: 0.0022, Val Loss: 0.0084
Epoch 11/50:
Epoch 11/50 - Train Loss: 0.0023, Val Loss: 0.0061
Epoch 12/50:
Epoch 12/50 - Train Loss: 0.0023, Val Loss: 0.0059
Epoch 13/50:
Epoch 13/50 - Train Loss: 0.0023, Val Loss: 0.0094
Epoch 14/50:
Epoch 14/50 - Train Loss: 0.0021, Val Loss: 0.0074
Epoch 15/50:
Epoch 15/50 - Train Loss: 0.0020, Val Loss: 0.0064
Epoch 16/50:
Epoch 16/50 - Train Loss: 0.0020, Val Loss: 0.0065
Epoch 

In [10]:
torch.save(model.state_dict(), "/content/drive/MyDrive/TechArena/TechArena20241120/best_model.pth")

# inferance.py

In [25]:
!python /content/drive/MyDrive/TechArena/TechArena20241120/inference.py -l /content/drive/MyDrive/TechArena/TechArena20241120/data/SONICOM_TestData_pics/P0002_left_0.png /content/drive/MyDrive/TechArena/TechArena20241120/data/SONICOM_TestData_pics/P0002_left_1.png -r /content/drive/MyDrive/TechArena/TechArena20241120/data/SONICOM_TestData_pics/P0002_right_0.png /content/drive/MyDrive/TechArena/TechArena20241120/data/SONICOM_TestData_pics/P0002_right_1.png -o /content/drive/MyDrive/TechArena/TechArena20241120/data/output/prediction.sofa

  checkpoint = torch.load(model_path, map_location=device)
Saved HRTF to /content/drive/MyDrive/TechArena/TechArena20241120/data/output/prediction.sofa


# **Installation**

In [3]:
!pip install sofar

Collecting sofar
  Downloading sofar-1.2.0-py2.py3-none-any.whl.metadata (4.1 kB)
Collecting netCDF4 (from sofar)
  Downloading netCDF4-1.7.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.8 kB)
Collecting cftime (from netCDF4->sofar)
  Downloading cftime-1.6.4.post1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.7 kB)
Downloading sofar-1.2.0-py2.py3-none-any.whl (129 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m129.2/129.2 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading netCDF4-1.7.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (9.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.1/9.1 MB[0m [31m90.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading cftime-1.6.4.post1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m72.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling c

In [None]:
!nvidia-smi

Sun Dec  1 16:27:20 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   36C    P8               9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [None]:
torch.cuda.empty_cache()

# **Corbeille**

In [None]:
# def load_subject_id_hrtf(self, subject_id: str, return_sofa: bool = False) -> sofar.Sofa | np.ndarray:
    #     """
    #     This function load the HRIR data for the given file name and compute the RFFT of the HRIRs then return HRTFs
    #     Example if the file name is P0001, this function will load the sofa file of P0001 - read it and return the HRTF data of the P001

    #     Args:
    #         subject_id (str): e.g. P0001, ..., P0200
    #     """

    #     hrtf_file = [s for s in self.hrtf_files if subject_id + "_" + self.hrtf_type in s][0]
    #     if not hrtf_file:
    #         print(subject_id + " Not found!")
    #         return None
    #     if return_sofa:
    #         return sofar.read_sofa(hrtf_file, verbose=False)
    #     else:
    #         hrir = self._load_hrir(hrtf_file)
    #         return self._compute_HRTF(hrir)

In [None]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# import gc
# from torch.utils.data import DataLoader
# from torchvision import transforms

# # Modèle d'encodage
# class PinnaEncoder(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
#         self.bn1 = nn.BatchNorm2d(32)
#         self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
#         self.bn2 = nn.BatchNorm2d(64)
#         self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
#         self.bn3 = nn.BatchNorm2d(128)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.dropout = nn.Dropout(0.25)

#     def forward(self, x):
#         x = self.pool(F.relu(self.bn1(self.conv1(x))))
#         x = self.dropout(x)
#         x = self.pool(F.relu(self.bn2(self.conv2(x))))
#         x = self.dropout(x)
#         x = self.pool(F.relu(self.bn3(self.conv3(x))))
#         x = self.dropout(x)
#         return x

# # Générateur HRTF
# class HRTFGenerator(nn.Module):
#     def __init__(self, num_angles=19, num_freq_bins=129):
#         super().__init__()
#         self.encoder = PinnaEncoder()
#         # self.flatten_size = 128 * 16 * 16  # Ajusté pour image 128x128
#         # self.fc1 = nn.Linear(self.flatten_size, 512)
#         # self.fc2 = nn.Linear(512, 256)
#         # self.fc3 = nn.Linear(256, num_angles * num_freq_bins * 2)  # *2 pour gauche et droite
#         # self.dropout = nn.Dropout(0.5)
#         self.fc1 = nn.Linear(65536, 512)  # Taille ajustée
#         self.fc2 = nn.Linear(512, 256)
#         self.fc3 = nn.Linear(256, 2 * 129)
#         self.dropout = nn.Dropout(0.5)

#     def forward(self, images):

#         if images.shape[3] == 1:
#           images = images.squeeze(3)

#         batch_size, num_ears, num_views, height, width = images.shape
#         features = []
#         for ear in range(num_ears):
#             ear_features = []
#             for view in range(num_views):
#                 x = images[:, ear, view, :, :].unsqueeze(1)
#                 x = self.encoder(x)
#                 ear_features.append(x)
#             ear_features = torch.stack(ear_features, dim=1)
#             ear_features = torch.mean(ear_features, dim=1)
#             features.append(ear_features)
#         features = torch.cat(features, dim=1)
#         features = features.view(batch_size, -1)
#         print(f"Shape before fc1: {features.shape}")
#         # x = features.view(batch_size, -1)
#         # x = F.relu(self.fc1(x))
#         x = F.relu(self.fc1(features))
#         x = self.dropout(x)
#         x = F.relu(self.fc2(x))
#         x = self.dropout(x)
#         x = self.fc3(x)
#         hrtf = x.view(batch_size, 793, 2, 129)
#         return hrtf


# # Entraîneur
# class HRTFTrainer:
#     def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
#         self.model = model.to(device)
#         self.device = device
#         self.criterion = nn.MSELoss()
#         self.optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

#     def train_epoch(self, dataloader, accumulation_steps=4):
#         self.model.train()
#         total_loss = 0
#         for i, (images, hrtfs) in enumerate(dataloader):
#             images = images.to(self.device).float()
#             hrtfs = hrtfs.to(self.device)
#             self.optimizer.zero_grad()

#             predictions = self.model(images)
#             loss = self.criterion(predictions, hrtfs)
#             loss = loss / accumulation_steps  # Accumulation des gradients
#             loss.backward()

#             if (i + 1) % accumulation_steps == 0 or (i + 1) == len(dataloader):
#                 self.optimizer.step()
#                 self.optimizer.zero_grad()

#             total_loss += loss.item()
#             del images, hrtfs, predictions
#             gc.collect()
#             torch.cuda.empty_cache()
#         return total_loss / len(dataloader)

#     def validate(self, dataloader):
#         self.model.eval()
#         total_loss = 0
#         with torch.no_grad():
#             for images, hrtfs in dataloader:
#                 images = images.to(self.device).float()
#                 hrtfs = hrtfs.to(self.device)
#                 predictions = self.model(images)
#                 loss = self.criterion(predictions, hrtfs)
#                 total_loss += loss.item()
#                 del images, hrtfs, predictions
#                 gc.collect()
#                 torch.cuda.empty_cache()
#         return total_loss / len(dataloader)

# # # Entraînement du modèle
# # def train_model(train_loader, val_loader, num_epochs=50, batch_size=1):
# #     model = HRTFGenerator()
# #     trainer = HRTFTrainer(model)
# #     best_val_loss = float('inf')

# #     for epoch in range(num_epochs):
# #         print(f'Epoch {epoch+1}/{num_epochs}:')
# #         train_loss = trainer.train_epoch(train_loader)
# #         val_loss = trainer.validate(val_loader)

# #         if val_loss < best_val_loss:
# #             best_val_loss = val_loss
# #             torch.save(model.state_dict(), 'best_model.pth')

# #         print(f'Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
# #         gc.collect()
# #         torch.cuda.empty_cache()
# #     return model

# import torch
# import gc  # Garbage collector pour nettoyer la mémoire GPU

# # Entraînement du modèle
# def train_model(train_loader, val_loader, num_epochs=50, batch_size=1):
#     model = HRTFGenerator().to(device)  # Transférer le modèle sur l'appareil
#     trainer = HRTFTrainer(model)
#     optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
#     best_val_loss = float('inf')

#     for epoch in range(num_epochs):
#         print(f'Epoch {epoch+1}/{num_epochs}:')
#         model.train()  # Mettre le modèle en mode entraînement
#         train_loss = 0.0

#         # Entraînement
#         for images, target in train_loader:
#             images = images.to(device)
#             target = target.to(device)

#             # Gérer les dimensions si besoin
#             if target.shape != images.shape:
#                 print(f"Mismatch de dimensions : images {images.shape} vs target {target.shape}")
#                 target = target[:, :images.shape[1], :, :]

#             # Gérer les données complexes
#             if torch.is_complex(images) or torch.is_complex(target):
#                 images = torch.cat([images.real, images.imag], dim=1)
#                 target = torch.cat([target.real, target.imag], dim=1)

#             # Forward pass
#             optimizer.zero_grad()
#             output = model(images)

#             # Vérification des dimensions de sortie
#             if output.shape != target.shape:
#                 print(f"Output mismatch : output {output.shape}, target {target.shape}")
#                 target = target[:, :output.shape[1], :, :]

#             loss = torch.nn.functional.mse_loss(output, target)
#             loss.backward()
#             optimizer.step()

#             train_loss += loss.item()

#         train_loss /= len(train_loader)
#         print(f'Train Loss: {train_loss:.4f}')

#         # Validation
#         model.eval()  # Mode évaluation
#         val_loss = 0.0
#         with torch.no_grad():
#             for images, target in val_loader:
#                 images = images.to(device)
#                 target = target.to(device)

#                 if torch.is_complex(images) or torch.is_complex(target):
#                     images = torch.cat([images.real, images.imag], dim=1)
#                     target = torch.cat([target.real, target.imag], dim=1)

#                 output = model(images)

#                 if output.shape != target.shape:
#                     target = target[:, :output.shape[1], :, :]

#                 val_loss += torch.nn.functional.mse_loss(output, target).item()

#         val_loss /= len(val_loader)
#         print(f'Val Loss: {val_loss:.4f}')

#         # Sauvegarder le meilleur modèle
#         if val_loss < best_val_loss:
#             best_val_loss = val_loss
#             torch.save(model.state_dict(), 'best_model.pth')

#         # Nettoyer la mémoire
#         gc.collect()
#         torch.cuda.empty_cache()

#     print("Entraînement terminé.")
#     return model

In [None]:
# # Entraîneur
# class HRTFTrainer:
#     def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
#         self.model = model.to(device)
#         self.device = device
#         self.criterion = nn.MSELoss()
#         self.optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

#     def train_epoch(self, dataloader, accumulation_steps=4):
#         self.model.train()
#         total_loss = 0
#         for i, (images, hrtfs) in enumerate(dataloader):
#             images = images.to(self.device).float()
#             hrtfs = hrtfs.to(self.device)

#             # Convertir les données complexes en réels
#             hrtfs_real = torch.abs(hrtfs)  # Utilisation de la magnitude

#             self.optimizer.zero_grad()
#             predictions = self.model(images)

#             # Conversion des prédictions en réels
#             predictions_real = torch.abs(predictions)

#             # Calcul de la perte
#             loss = self.criterion(predictions_real, hrtfs_real)
#             loss = loss / accumulation_steps
#             loss.backward()

#             if (i + 1) % accumulation_steps == 0 or (i + 1) == len(dataloader):
#                 self.optimizer.step()
#                 self.optimizer.zero_grad()

#             total_loss += loss.item()
#             del images, hrtfs, predictions
#             gc.collect()
#             torch.cuda.empty_cache()
#         return total_loss / len(dataloader)

#     def validate(self, dataloader):
#         self.model.eval()
#         total_loss = 0
#         with torch.no_grad():
#             for images, hrtfs in dataloader:
#                 images = images.to(self.device).float()
#                 hrtfs = hrtfs.to(self.device)

#                 # Convertir les données complexes en réels
#                 hrtfs_real = torch.abs(hrtfs)  # Utilisation de la magnitude

#                 predictions = self.model(images)

#                 # Conversion des prédictions en réels
#                 predictions_real = torch.abs(predictions)

#                 # Calcul de la perte
#                 loss = self.criterion(predictions_real, hrtfs_real)
#                 total_loss += loss.item()

#                 del images, hrtfs, predictions
#                 gc.collect()
#                 torch.cuda.empty_cache()
#         return total_loss / len(dataloader)

# class HRTFTrainer:
#     def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
#         self.model = model.to(device)
#         self.device = device
#         self.criterion = nn.MSELoss()
#         self.optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

#     def train_epoch(self, dataloader, accumulation_steps=4):
#         self.model.train()
#         total_loss = 0
#         for i, (images, hrtfs) in enumerate(dataloader):
#             images = images.to(self.device).float()
#             hrtfs = hrtfs.to(self.device)
#             self.optimizer.zero_grad()

#             predictions = self.model(images)
#             loss = self.criterion(predictions, hrtfs)
#             loss = loss / accumulation_steps
#             loss.backward()

#             if (i + 1) % accumulation_steps == 0 or (i + 1) == len(dataloader):
#                 self.optimizer.step()
#                 self.optimizer.zero_grad()

#             total_loss += loss.item()
#             del images, hrtfs, predictions
#             gc.collect()
#             torch.cuda.empty_cache()
#         return total_loss / len(dataloader)

#     def validate(self, dataloader):
#         self.model.eval()
#         total_loss = 0
#         with torch.no_grad():
#             for images, hrtfs in dataloader:
#                 images = images.to(self.device).float()
#                 hrtfs = hrtfs.to(self.device)
#                 predictions = self.model(images)
#                 loss = self.criterion(predictions, hrtfs)
#                 total_loss += loss.item()
#                 del images, hrtfs, predictions
#                 gc.collect()
#                 torch.cuda.empty_cache()
#         return total_loss / len(dataloader)

In [None]:
# class HRTFGenerator(nn.Module):
#     def __init__(self, num_angles=19, num_freq_bins=129):
#         super().__init__()
#         self.encoder = PinnaEncoder()

#         # Taille fictive initiale, à ajuster dynamiquement
#         self.flatten_size = None

#         # Couches fully connected (on ajuste après le premier forward pass)
#         self.fc1 = None
#         self.fc2 = nn.Linear(512, 256)
#         self.fc3 = nn.Linear(256, num_angles * num_freq_bins * 2)  # *2 pour gauche et droite
#         self.dropout = nn.Dropout(0.5)

#     def forward(self, images):
#         images = images.squeeze(3)  # Supprimer une dimension inutile si nécessaire

#         batch_size, num_ears, num_views, height, width = images.shape
#         features = []
#         for ear in range(num_ears):
#             ear_features = []
#             for view in range(num_views):
#                 x = images[:, ear, view, :, :].unsqueeze(1)
#                 x = self.encoder(x)
#                 ear_features.append(x)
#             ear_features = torch.stack(ear_features, dim=1)
#             ear_features = torch.mean(ear_features, dim=1)
#             features.append(ear_features)
#         features = torch.cat(features, dim=1)
#         print(f"Flatten size (features): {features.view(batch_size, -1).shape[1]}")

#         # Déterminer dynamiquement la taille
#         if self.flatten_size is None:
#             self.flatten_size = features.view(batch_size, -1).shape[1]
#             self.fc1 = nn.Linear(self.flatten_size, 512).to(features.device)

#         features = features.view(batch_size, -1)
#         x = F.relu(self.fc1(features))
#         x = self.dropout(x)
#         x = F.relu(self.fc2(x))
#         x = self.dropout(x)
#         x = self.fc3(x)
#         hrtf = x.view(batch_size, 19, 2, 129)  # Ajustement de la sortie
#         return hrtf

# class HRTFGenerator(nn.Module):
#     def __init__(self, num_angles=19, num_freq_bins=129):
#         super().__init__()
#         self.encoder = PinnaEncoder()
#         self.fc1 = nn.Linear(128 * 16 * 16, 512)  # Taille ajustée pour correspondre à l'encodeur
#         self.fc2 = nn.Linear(512, 256)
#         self.fc3 = nn.Linear(256, num_angles * num_freq_bins * 2)  # *2 pour gauche et droite
#         self.dropout = nn.Dropout(0.5)

#     def forward(self, images):
#         # Suppression de la dimension inutile
#         images = images.squeeze(3)

#         batch_size, num_ears, num_views, height, width = images.shape
#         features = []
#         for ear in range(num_ears):
#             ear_features = []
#             for view in range(num_views):
#                 x = images[:, ear, view, :, :].unsqueeze(1)
#                 x = self.encoder(x)
#                 ear_features.append(x)
#             ear_features = torch.stack(ear_features, dim=1)
#             ear_features = torch.mean(ear_features, dim=1)
#             features.append(ear_features)
#         features = torch.cat(features, dim=1)
#         features = features.view(batch_size, -1)
#         x = F.relu(self.fc1(features))
#         x = self.dropout(x)
#         x = F.relu(self.fc2(x))
#         x = self.dropout(x)
#         x = self.fc3(x)
#         hrtf = x.view(batch_size, 19, 2, 129)  # Ajustement de la sortie
#         return hr
# Entraîneur

In [None]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# import gc

# class PinnaEncoder(nn.Module):
#     def __init__(self):
#         super().__init__()
#         # CNN layers for processing pinna images
#         self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
#         self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
#         self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.dropout = nn.Dropout(0.25)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.dropout(x)
#         x = self.pool(F.relu(self.conv2(x)))
#         x = self.dropout(x)
#         x = self.pool(F.relu(self.conv3(x)))
#         x = self.dropout(x)
#         return x

# class HRTFGenerator(nn.Module):
#     def __init__(self, num_angles=19, num_freq_bins=129):
#         super().__init__()
#         self.encoder = PinnaEncoder()

#         # Calculate flattened size after convolutions
#         self.flatten_size = 128 * 16 * 16  # Adjust based on input image size

#         # Fully connected layers
#         self.fc1 = nn.Linear(self.flatten_size, 1024)
#         self.fc2 = nn.Linear(1024, 512)
#         self.fc3 = nn.Linear(512, num_angles * num_freq_bins * 2)  # *2 for left and right channels

#         self.dropout = nn.Dropout(0.5)

#     def forward(self, images):
#         batch_size, num_ears, num_views, height, width = images.shape

#         # Process each ear and view separately
#         features = []
#         for ear in range(num_ears):
#             ear_features = []
#             for view in range(num_views):
#                 x = images[:, ear, view, :, :].unsqueeze(1)  # Add channel dimension
#                 x = self.encoder(x)
#                 ear_features.append(x)

#             # Combine features from different views
#             ear_features = torch.stack(ear_features, dim=1)
#             ear_features = torch.mean(ear_features, dim=1)  # Average pooling across views
#             features.append(ear_features)

#         # Combine features from both ears
#         features = torch.cat(features, dim=1)

#         # Flatten and pass through fully connected layers
#         x = features.view(batch_size, -1)
#         x = F.relu(self.fc1(x))
#         x = self.dropout(x)
#         x = F.relu(self.fc2(x))
#         x = self.dropout(x)
#         x = self.fc3(x)

#         # Reshape output to match HRTF format
#         hrtf = x.view(batch_size, -1, 2, 129)  # (batch_size, num_angles, 2, num_freq_bins)

#         return hrtf

# class HRTFTrainer:
#     def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
#         self.model = model.to(device)
#         self.device = device
#         self.criterion = nn.MSELoss()
#         self.optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

#     def train_epoch(self, dataloader):
#         self.model.train()
#         total_loss = 0

#         for images, hrtfs in dataloader:
#             images = images.to(self.device)
#             hrtfs = hrtfs.to(self.device)

#             self.optimizer.zero_grad()
#             images = images.float()
#             predictions = self.model(images)
#             loss = self.criterion(predictions, hrtfs)

#             loss.backward()
#             self.optimizer.step()

#             total_loss += loss.item()

#             del images, hrtfs, predictions, loss  # Delete batch-specific variables
#             gc.collect()  # Collect garbage
#             torch.cuda.empty_cache()  # Free up GPU memory

#         return total_loss / len(dataloader)

#     def validate(self, dataloader):
#         self.model.eval()
#         total_loss = 0

#         with torch.no_grad():
#             for images, hrtfs in dataloader:
#                 images = images.to(self.device)
#                 hrtfs = hrtfs.to(self.device)

#                 predictions = self.model(images)
#                 loss = self.criterion(predictions, hrtfs)
#                 total_loss += loss.item()

#                 del images, hrtfs, predictions, loss  # Delete batch-specific variables
#                 gc.collect()  # Collect garbage
#                 torch.cuda.empty_cache()  # Free up GPU memory

#         return total_loss / len(dataloader)

# def train_model(train_loader, val_loader, num_epochs=50, batch_size=1):
#     model = HRTFGenerator()
#     trainer = HRTFTrainer(model)

#     best_val_loss = float('inf')
#     for epoch in range(num_epochs):
#         print(f'Epoch {epoch+1}/{num_epochs}:')
#         train_loss = trainer.train_epoch(train_loader)
#         val_loss = trainer.validate(val_loader)

#         if val_loss < best_val_loss:
#             best_val_loss = val_loss
#             torch.save(model.state_dict(), 'best_model.pth')

#         print(f'Epoch {epoch+1}/{num_epochs}:')
#         print(f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

#         gc.collect()
#         torch.cuda.empty_cache()

#     return model