# Custom dataset class

In some cases, especially if the time signal is longer, the dataset becomes too large for the memory. Therefore a custom dataset class has been implemented to preprocess the time signal on the fly for every batch. This needs less RAM memory than deviding batches after the preprocessing has been done and saved for every direction.

In [None]:
import math
import numpy.fft as FFT

# custom dataset class; if RAM is too small to load the whole data
class SpectroDatset(data_utils.Dataset):
  #constructor
  def __init__(self, signal):
    i_d = 0
    hrir_l, hrir_r = data[i_d][0], data[i_d][1]
    self.signal = signal
    signal_l = np.convolve(self.signal, hrir_l, mode='valid')
    self.win_length = int(sr*0.025)
    self.hop_lenght = int(sr*0.01)
    self.n_t_frames = math.ceil((len(signal_l) - self.win_length) / self.hop_lenght)
    if (len(signal_l) - self.win_length) % self.hop_lenght < 0.5:
      print((len(signal_l) - self.win_length) % self.hop_lenght)

  def __len__(self):
    n_directions = np.shape(direction)[0]
    return n_directions * self.n_t_frames

  def __getitem__(self, idx):
    i_d = idx//self.n_t_frames
    i_t_frame = idx % self.n_t_frames

    hrir_l, hrir_r = data[i_d][0], data[i_d][1]
    signal_l = np.convolve(self.signal, hrir_l, mode='valid')
    signal_r = np.convolve(self.signal, hrir_r, mode='valid')
    # sym When True (default), generates a symmetric window, for use in filter design. When False, generates a periodic window, for use in spectral analysis.
    windowed_signal_l = signal_l[ self.hop_lenght*i_t_frame :  self.hop_lenght*i_t_frame + self.win_length ] * scipy.signal.windows.hann(self.win_length, sym=True)
    windowed_signal_r = signal_r[ self.hop_lenght*i_t_frame :  self.hop_lenght*i_t_frame + self.win_length ] * scipy.signal.windows.hann(self.win_length, sym=True)
    
    fft_l = 2/self.win_length * FFT.rfft(windowed_signal_l, n=self.win_length)
    fft_r = 2/self.win_length * FFT.rfft(windowed_signal_r, n=self.win_length)
    f = FFT.rfftfreq(self.win_length, d=1/sr)
    f_mask = np.logical_and(f >= 20, f <= 20000)
    fft_l, fft_r = fft_l[f_mask], fft_r[f_mask]
    ILD = 20*np.log10(np.abs(fft_l)) - 20*np.log10(np.abs(fft_r))
    return torch.tensor(ILD).float(), torch.tensor(direction[i_d]).float()



# Create and use custom dataset instance

In [None]:
dataset = SpectroDatset(signal=noise)
batch_size = len(dataset)
data_loader = data_utils.DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=0)
criterion = nn.MSELoss()
total_loss = 0
for features_batch, labels_batch in data_loader:
  preds = model(features_batch) # Pass Batch
  loss = criterion(preds, labels_batch) # Calculate Loss
  total_loss += loss.item()