# Datasets similar to MNIST 

## Classes for microphones and MeshRIR



Check pos_src file

In [1]:
import numpy as np

file_path = 'data/MeshRIR/raw/S32-M441_npy/pos_src.npy'  # Replace with your actual file path

# Parse only the header of the .npy file: the array data itself is not read
with open(file_path, 'rb') as f:
    version = np.lib.format.read_magic(f)  # the header layout depends on this version
    shape, fortran_order, dtype = np.lib.format._read_array_header(f, version)

# One field per line
print(
    f"Array shape: {shape}",
    f"Data type: {dtype}",
    f"Fortran order: {fortran_order}",
    sep="\n",
)

Array shape: (32, 3)
Data type: float64
Fortran order: False


Check pos_mic file

In [2]:
# Read only the header of the microphone-position .npy file
file_path = 'data/MeshRIR/raw/S32-M441_npy/pos_mic.npy'  # Replace with your actual file path

# The array data itself is not read, only the header
with open(file_path, 'rb') as f:
    version = np.lib.format.read_magic(f)  # the header layout depends on this version
    shape, fortran_order, dtype = np.lib.format._read_array_header(f, version)

# One field per line
print(
    f"Array shape: {shape}",
    f"Data type: {dtype}",
    f"Fortran order: {fortran_order}",
    sep="\n",
)


Array shape: (441, 3)
Data type: float64
Fortran order: False


Check ir file

In [3]:
# Header of the signal file of microphone 0. Swap the two paths to inspect the other dataset.
# file_path = f'data/MeshRIR/raw/S32-M441_npy/ir_{0}.npy'  # Replace with your actual file path
file_path = f'data/MeshRIR/raw/S1-M3969_npy/ir_{0}.npy'  # Replace with your actual file path

# The array data itself is not read, only the header
with open(file_path, 'rb') as f:
    version = np.lib.format.read_magic(f)  # the header layout depends on this version
    shape, fortran_order, dtype = np.lib.format._read_array_header(f, version)

# One field per line
print(
    f"Array shape: {shape}",
    f"Data type: {dtype}",
    f"Fortran order: {fortran_order}",
    sep="\n",
)

Array shape: (1, 32768)
Data type: float64
Fortran order: False


Implementation of the classes

In [4]:
# Sampling frequency: 48000 Hz (samples per second)
# Speed of sound:     c = 340 m/s, so sound needs 1/340 s to travel 1 m
# Unit check:         samples/s * 1/(m/s) = samples/m

print(
    f"1 meter in : {1/340*1000} ms",
    f"Samples per ms: {48000/1000}",
    "",
    f"Mic1 and Mic2 with 1 meter difference in the direction of the source: \nAre (fs / cs) = {1/340*48000} samples apart",
    sep="\n",
)



1 meter in : 2.941176470588235 ms
Samples per ms: 48.0

Mic1 and Mic2 with 1 meter difference in the direction of the source: 
Are (fs / cs) = 141.1764705882353 samples apart


In [None]:
# export
import os
import glob
from pathlib import Path
import torch
from torch.utils.data import Dataset
from torchvision.datasets.utils import check_integrity, download_and_extract_archive, extract_archive, verify_str_arg
from urllib.error import URLError
import numpy as np

""" 
    01. Instead of loading numpy files just to check their shape, I can just read the header (read_npy_header)
    02. MNIST uses "class attributes" (common to all instances) mirrors and resources to indicate where to download the data
        * 02.1 The other attributes are instance attributes, so they are different for each instance
    03. From MNIST I also use how it checks for the data, downloads, extracts and the folder tree where it downloads (/(user_def_data_path)/(class_name)/raw)
    04. I let the user give the dataset name, so for example, "S" is common to both datasets and will download both
    05. Since each instance of this class will give microphone signals of a certain size, from a specific source, and one environment (dataset):
        05.01 I give in the constructor only one dataset name or choose the first matching one
        05.02 I define the starting sample of the signal in the constructor, instead of hardcoding it whenever I use the "get_mic" function
        05.03 I define the size of the signal in the constructor, instead of hardcoding it whenever I use the "get_mic" function
    06. I can make a wrapper (torch.utils.data.Dataset) of this class to load only a set of microphones using the "get_mic" function.
"""

def read_npy_header(file_path):
    """
        Read the header of a .npy file without loading the array data.

        Args:
            file_path: path (str or path-like) of the .npy file.

        Returns:
            (shape, fortran_order, dtype) as stored in the file header.
    """
    with open(file_path, 'rb') as f:
        version = np.lib.format.read_magic(f)
        # Use the public NumPy readers for the format versions that have one
        if version == (1, 0):
            header = np.lib.format.read_array_header_1_0(f)
        elif version == (2, 0):
            header = np.lib.format.read_array_header_2_0(f)
        else:
            # No public reader is exposed for other versions (e.g. 3.0),
            # so fall back to the private helper the public ones wrap.
            header = np.lib.format._read_array_header(f, version)
    shape, fortran_order, dtype = header
    return shape, fortran_order, dtype

class DB_microphones:
    """
        Common interface of the microphone databases.

        The read-only properties live here so that subclasses only have to fill
        the private attributes behind them (_fs, _nmics, _nt, _n_sources,
        _source_id, _signal_size, _start_signal) and implement get_mic / get_pos.
    """

    def __init__(self, root: str):
        # Base folder under which each database keeps its own sub-folder
        self.root = root

    @property
    def raw_folder(self) -> str:
        """<root>/<name of the concrete class>/raw"""
        class_name = type(self).__name__
        return os.path.join(self.root, class_name, "raw")

    @property
    def fs(self):
        """Sampling frequency (read from _fs)."""
        return self._fs

    @property
    def dt(self):
        """Sampling period, the inverse of fs."""
        return 1.0 / self.fs

    @property
    def n_mics(self):
        """Number of microphones (read from _nmics)."""
        return self._nmics

    @property
    def nt(self):
        """Number of samples per signal (read from _nt)."""
        return self._nt

    @property
    def n_sources(self):
        """Number of sources (read from _n_sources)."""
        return self._n_sources

    @property
    def source_id(self):
        """Index of the selected source (read from _source_id)."""
        return self._source_id

    @property
    def signal_size(self):
        """Default number of samples returned (read from _signal_size)."""
        return self._signal_size

    @property
    def start_signal(self):
        """Default first sample returned (read from _start_signal)."""
        return self._start_signal

    def get_nmics(self):
        """Method form of the n_mics property."""
        return self.n_mics

    def get_mic(self, imic, start, size):
        """Signal of microphone imic; to be provided by the subclass."""
        raise NotImplementedError("Must be implemented in subclass")

    def get_pos(self, imic):
        """Position of microphone imic; to be provided by the subclass."""
        raise NotImplementedError("Must be implemented in subclass")

    def get_time(self, start, size):
        """Time of `size` consecutive samples, the first one being sample `start`."""
        sample_index = np.arange(size) + start
        return sample_index * self.dt


class MeshRIR(DB_microphones):
    """
        MeshRIR dataset stored as .npy files.

        Files read from <raw_folder>/<dataset>/:
            * pos_mic.npy    one row per microphone
            * pos_src.npy    one row per source
            * ir_<imic>.npy  one file per microphone, shape (n_sources, nt)

        Each instance serves the signals of one source (source_id) of one dataset.
        start_signal and signal_size are the default slice used by get_mic and get_time.
    """

    # Base URLs, tried in order when downloading a resource
    mirrors = [
        "https://zenodo.org/records/10852693/files/"
    ]

    # (archive filename, md5 checksum)
    resources = [
        ("S1-M3969_npy.zip", "2cb598eb44bb9905560c545db7af3432"),
        ("S32-M441_npy.zip", "9818fc66b36513590e7abd071243d8e9"),
    ]

    # Sampling frequency returned by the `fs` property
    _fs = 48000

    def __init__(self, root: str,
                 download: bool = False,
                 dataset: str = "S1",
                 source_id: int = 0,
                 start_signal: int = 0,
                 signal_size: int = 512):
        """
        Args:
            root: base data folder; files live in <root>/MeshRIR/raw.
            download: if True, call download(dataset) before looking for the data.
            dataset: pattern matched (as a substring) against the resource filenames,
                e.g. "S1" or "S32". If several match, one is chosen (see below).
            source_id: index of the source whose signals are returned.
            start_signal: default first sample for get_mic / get_time.
            signal_size: default number of samples for get_mic / get_time.

        Raises:
            ValueError: if `dataset` matches none of the resource filenames.
            RuntimeError: if no extracted folder matching `dataset` is found.
            AssertionError: if source_id is not below the number of sources, or the
                number of ir_*.npy files differs from the number of microphones.
        """
        super().__init__(root=root)
        self.dataset = dataset  # user pattern for now, replaced below by the resolved folder name
        self._start_signal = start_signal
        self._signal_size = signal_size

        # check if /raw exists and load the data from there
        # or download the data and process it
        if download:
            self.download(dataset=self.dataset)

        # Check the dataset that was requested, not the "S1" default of _check_exists
        if not self._check_exists(dataset):
            raise RuntimeError("Dataset not found. You can use download=True to download it")

        # Several resources can match the pattern, but an instance uses only one.
        # Prefer the ones already extracted on disk, so that a match that was never
        # downloaded is not selected while another one is available.
        datasets = [filename for filename, _ in self.resources if dataset in filename]
        extracted = [filename for filename in datasets
                     if os.path.isdir(os.path.join(self.raw_folder, Path(filename).stem))]
        if extracted:
            datasets = extracted
        if len(datasets) > 1:
            print(f"Warning, Datasets found for '{dataset}': \n{datasets}. \nUsing the first one: {datasets[0]}")
        self.dataset = Path(datasets[0]).stem

        # Data path:
        self.data_path = os.path.join(self.raw_folder, self.dataset)

        # Number of microphones, read from the header of the positions file
        pos_mics_shape, _, _ = read_npy_header(Path(self.data_path, "pos_mic.npy"))
        self._nmics = pos_mics_shape[0]

        # Number of sources and samples, read from the header of the first signal file
        mic_signal_shape, _, _ = read_npy_header(Path(self.data_path, "ir_0.npy"))
        self._n_sources, self._nt = mic_signal_shape
        assert source_id < self._n_sources , f"Database has {self._n_sources} sources. Choose source_id in [0, {self._n_sources-1}]. "
        self._source_id = source_id

        # Each ir_ file in the folder is one microphone: the count must agree with pos_mic
        nfiles = len(glob.glob(os.path.join(self.data_path, 'ir_*.npy')))
        assert self._nmics==nfiles, f"ir_xxx.npy files = {nfiles}, should be {self._nmics}"

    def load_src_positions(self):
        """Load pos_src.npy into self._source_positions."""
        pos_src_path = os.path.join(self.data_path, 'pos_src.npy')
        self._source_positions = np.load(pos_src_path)

    def load_mic_positions(self):
        """Load pos_mic.npy into self._pos_mics."""
        pos_mic_path = os.path.join(self.data_path, 'pos_mic.npy')
        self._pos_mics = np.load(pos_mic_path) # (nmics, 3)  each row is (x,y,z) for a mic

    # Modified from MNIST: this one looks for the extracted "*_npy" entries in the /raw folder
    def _check_exists(self, dataset: str = "S1") -> bool:
        """
        Tell whether an extracted dataset matching `dataset` is in the raw folder.

        Only the extracted "*_npy" entries count. A .zip archive alone does not,
        so download() still calls download_and_extract_archive in that case.

        Args:
            dataset: pattern matched as a substring, e.g. "S1" or "S32".
                "S" matches both datasets.

        Returns:
            True if the basename of any "*_npy" entry of the raw folder contains `dataset`.

        Raises:
            ValueError: if `dataset` is not part of any resource filename.
        """
        resource_names = [filename for filename, _ in self.resources]

        # Reject patterns that cannot refer to any known resource
        if not any(dataset in filename for filename in resource_names):
            raise ValueError(f"Dataset '{dataset}' not found in any of the resources: {resource_names}")

        # Extracted datasets in the raw folder
        extracted = glob.glob(os.path.join(self.raw_folder, "*_npy"))
        return any(dataset in os.path.basename(entry) for entry in extracted)

    def download(self, dataset: str = "S1") -> None:
        """
        Download and extract the data if it doesn't exist already.

        Nothing is done when _check_exists(dataset) is already True, i.e. as soon
        as one extracted dataset matches the pattern.

        Args:
            dataset (str): pattern matched as a substring of the resource filenames,
                e.g. "S1" or "S32". "S" matches both resources.

        Raises:
            ValueError: if `dataset` is not part of any resource filename.
            RuntimeError: if a resource could not be downloaded from any mirror.
        """

        # An extracted dataset is already there
        if self._check_exists(dataset):
            return

        # Create folders
        os.makedirs(self.raw_folder, exist_ok=True)

        # Check what file to download according to dataset
        matching_resources = [(filename, md5) for filename, md5 in self.resources if dataset in filename]

        # download files
        for filename, md5 in matching_resources:
            for mirror in self.mirrors:

                url = f"{mirror}{filename}"
                try:
                    download_and_extract_archive(url, download_root=self.raw_folder, filename=filename, md5=md5)
                except URLError as error:
                    print(f"Failed to download (trying next):\n{error}")
                    continue
                finally:
                    print()
                break  # this mirror worked, go to the next resource
            else:
                # no mirror worked
                raise RuntimeError(f"Error downloading {filename}")

    def load_all_data(self):
        """Load the signals of every microphone into one array (source, mics, signal)."""
        # Each file is (source, signal); stacking on a new axis 1 gives (source, mics, signal)
        per_mic = [np.load(os.path.join(self.data_path, f'ir_{i}.npy'))
                   for i in range(self._nmics)]
        return np.stack(per_mic, axis=1)

    def load_mic(self, imic):
        """Load the full signal of microphone imic for the selected source."""
        mic_signal = np.load(os.path.join(self.data_path, f'ir_{imic}.npy')) # (source, signal)
        return mic_signal[self.source_id, :]

    def get_pos(self, imic):
        """Position of microphone imic; the positions file is loaded on first use."""
        if not hasattr(self, "_pos_mics"):
            self.load_mic_positions()
        return self._pos_mics[imic,:]

    def get_mic(self, imic, start=None, size=None):
        """
        Samples [start, start+size) of microphone imic for the selected source.
        start and size default to start_signal and signal_size.
        """
        if start is None:
            start = self.start_signal
        if size is None:
            size = self.signal_size
        return self.load_mic(imic)[start:start+size]

    def get_time(self, start=None, size=None):
        """
        Time of samples [start, start+size), with sample 0 at time 0.
        start and size default to start_signal and signal_size.
        The result is cut at the end of the signal (nt samples), like get_mic.
        """
        if start is None:
            start = self.start_signal
        if size is None:
            size = self.signal_size
        t = np.arange(0, self._nt) * self.dt
        return t[start:start+size]

    def get_nmics(self):
        """Number of microphones."""
        return self.n_mics

    def get_src_pos(self):
        """Position of the selected source; the positions file is loaded on first use."""
        if not hasattr(self, "_source_positions"):
            self.load_src_positions()
        return self._source_positions[self.source_id]



S1-M3969_npy


In [None]:

# Source 0 of the dataset matching "S1"; it is downloaded first if it is not on disk
ds = MeshRIR(
    root="data",
    download=True,
    dataset="S1",
    source_id=0,
)
# Folder name the "S1" pattern was resolved to
print(ds.dataset)


Tests of class methods

In [6]:
# Smoke test of the methods: "Pass" marks a call that ran without raising.
# Only the value of the last expression is displayed by the notebook.
# ds.load_all_data() # Pass
ds.load_mic(3) # Pass
ds.get_pos(1) # Pass
ds.get_mic(3, start=0, size=ds.nt).shape # Pass
# NOTE(review): same call as the line above, presumably a leftover duplicate
ds.get_mic(3, start=0, size=ds.nt).shape # Pass
ds.get_mic(3).shape # Pass  (default start_signal / signal_size)
ds.get_time(start=0, size=ds.nt).shape # Pass
ds.get_time().shape # Pass  (default start_signal / signal_size)
ds.get_nmics() # Pass
ds.get_src_pos() # Pass

array([2. , 1.5, 0. ])

Test of property methods

In [7]:
# The properties are read-only values, not methods:
# ds.fs=4 # AttributeError: can't set attribute
# ds.dt() # TypeError: 'float' object is not callable

# One value per line, in this order
print(
    ds.fs,
    ds.dt,
    ds.n_mics,
    ds.nt,
    ds.n_sources,
    ds.source_id,
    ds.start_signal,
    ds.signal_size,
    sep="\n",
)



48000
2.0833333333333333e-05
3969
32768
1
0
0
512


## Class for RIR Zea

In [None]:
# class ZeaRIR(DB_microphones):

#     mirrors = [
#         "https://zenodo.org/records/10852693/files/"
#     ]

#     resources = [
#         ("S1-M3969_npy.zip", "2cb598eb44bb9905560c545db7af3432" ),
#         ("S32-M441_npy.zip", "9818fc66b36513590e7abd071243d8e9"), 
#     ]

#     _fs = 48000
    

#     def __init__(self, root: str, 
#                  download: bool = False, 
#                  dataset: str = "S1", 
#                  source_id: int = 0,
#                  start_signal: int = 0,
#                  signal_size: int = 512):
#         super().__init__(root=root)
#         self.root = root
#         self.dataset = dataset # Here it is user defined, later the dataset will save the name of the dataset according to the ones in self.resources
#         self._start_signal = start_signal
#         self._signal_size = signal_size

#         # check if /raw exists and load the data from there
#         # or download the data and process it
#         if download:
#             self.download(dataset=self.dataset)

#         if not self._check_exists():
#             raise RuntimeError("Dataset not found. You can use download=True to download it")
        
#         # Before this, we could download multiple datasets, but we should only choose one
#         datasets = [filename for filename, _ in self.resources if dataset in filename]
#         if len(datasets) > 1:
#             print(f"Warning, Datasets found for '{dataset}': \n{datasets}. \nUsing the first one: {datasets[0]}")
#         self.dataset = Path(datasets[0]).stem

#         # Data path: 
#         self.data_path = os.path.join(self.raw_folder, self.dataset)

#         # Check number of microphones (from positions and number of ir files)
#         pos_mics_shape, _, _ = read_npy_header(Path(self.data_path, "pos_mic.npy")) 
#         self._nmics = pos_mics_shape[0]

#         # Source id
#         mic_signal_shape, _, _ = read_npy_header(Path(self.data_path, f"ir_{0}.npy"))
#         self._n_sources, self._nt = mic_signal_shape
#         assert source_id < self._n_sources , f"Database has {self._n_sources} sources. Choose source_id in [0, {self._n_sources-1}]. "
#         self._source_id = source_id
        
#         # number of microphones. ir_ files in the folder, each is a microphone
#         nfiles = len(glob.glob(os.path.join(self.data_path, 'ir_*.npy')))
#         assert self._nmics==nfiles, f"ir_xxx.npy files = {nfiles}, should be {self._nmics}"

#     def load_src_positions(self):
#         # Source position in the dataset
#         pos_src_path = os.path.join(self.data_path, 'pos_src.npy')
#         self._source_positions = np.load(pos_src_path)

#     def load_mic_positions(self):
#         # Position of the microphones
#         pos_mic_path = os.path.join(self.data_path, 'pos_mic.npy')
#         self._pos_mics = np.load(pos_mic_path) # (nmics, 3)  each row is (x,y,z) for a mic

#     # Modified from MNIST, this one checks if the .zip files are int the /raw folder
#     def _check_exists(self, dataset:str = "S1") -> bool:
#         """
#         Checks if the numpy file exists in the raw folder.
#         If it exists, we do not download and extract.
#         It may be that the .zip file is there but not the numpy files,
#         in which case, download_and_extract_archive will be called.
#         If it detects the .zip if will just extract, otherwise download and extract.
#         dataset: str
#             The dataset to check for. It can be "S1" or "S32", if only one dataset to download
#             if "S" it downloads both datasets.
#         """

#         # numpy files in the raw folder
#         files = glob.glob(os.path.join(self.raw_folder, "*_npy") )

#         urls = [url for url, _ in self.resources]

#         # Check if the pattern in dataset is in any of the resources
#         found_in_res = any(dataset in url for url in urls)
#         if not found_in_res:
#             raise ValueError(f"Dataset '{dataset}' not found in any of the resources: {urls}")

#         # Check if the pattern in dataset is in any of the numpy files
#         found = any(dataset in os.path.basename(file) for file in files)

#         return found

#     def download(self, dataset:str ="S1") -> None:
#         """
#         Download the data if it doesn't exist already.
#         Args:
#             dataset (str): The dataset to download. It can be "S1" or "S32", if only one dataset to download
#             if "S" it downloads both datasets.
#         """

#         # Checks that the .zip files exists
#         if self._check_exists(dataset):
#             return

#         # Create folders 
#         os.makedirs(self.raw_folder, exist_ok=True)

#         # Check what file to download according to dataset
#         matching_resources = [(filename, md5) for filename, md5 in self.resources if dataset in filename]

#         # download files
#         for filename, md5 in matching_resources:
#             for mirror in self.mirrors:
                
#                 url = f"{mirror}{filename}"
#                 try:
#                     download_and_extract_archive(url, download_root=self.raw_folder, filename=filename, md5=md5)
#                 except URLError as error:
#                     print(f"Failed to download (trying next):\n{error}")
#                     continue
#                 finally:
#                     print()
#                 break
#             else:
#                 raise RuntimeError(f"Error downloading {filename}")
    
#     def load_all_data(self):
#         # Concatenate vectors (source, signal) -> into -> (source, imic, signal)
#         data = np.concatenate( 
#             [np.load(os.path.join(self.data_path, f'ir_{i}.npy'))[:,None,:]  
#              for i in range(self._nmics)], # for all mics
#              axis = 1 ) # in axis 1 (mics)  (source, mics, signal)
#         return data

#     def load_mic(self, imic):
#         mic_signal = np.load(os.path.join(self.data_path, f'ir_{imic}.npy')) # (source, signal)
#         return mic_signal[self.source_id, :]
        
#     def get_pos(self, imic):
#         if not hasattr(self, "_pos_mics"):
#             self.load_mic_positions()
#         return self._pos_mics[imic,:]
    
#     def get_mic(self, imic, start=None, size=None):
#         if start is None:
#             start = self.start_signal
#         if size is None:
#             size = self.signal_size
#         return self.load_mic(imic)[start:start+size]
    
#     def get_time(self, start=None, size=None):
#         if start is None:
#             start = self.start_signal
#         if size is None:
#             size = self.signal_size
#         t0 = 0.0
#         t = np.arange(0, self._nt)*self.dt + t0
#         return t[start:start+size]
    
#     def get_nmics(self):        
#         return self.n_mics

#     def get_src_pos(self):
#         if not hasattr(self, "_source_positions"):
#             self.load_src_positions()
#         return self._source_positions[self.source_id]

# ds = MeshRIR(root="data", download=True, dataset="S1", source_id=0)
# print(ds.dataset)
