# <center> GeNNus </center>


## <center> Imports </center>

In [19]:
import torch
from torch.utils.data import Dataset

import torchaudio
import torchaudio.transforms

import sys, os

from pprint import pprint

from tqdm import tqdm

import json

import numpy as np

import matplotlib.pylab as plt
import seaborn as sns

import librosa
import librosa.display

import pandas as pd

from pathlib import Path

import gc



## <center> Data pre-processing </center>

In [20]:
def label_from_str_to_one_hot(label_str: str):
  """Convert a genre name into its one-hot encoded label tensor.

  Args:
    label_str: genre name, one of "Pop", "Hip-Hop", "Electronic", "Rock",
      "Folk" or "Jazz" (case-sensitive).

  Returns:
    A 1-D `torch.int64` tensor of length 6 with a single 1 at the position of
    the genre (in the order listed above) and 0 everywhere else.

  Raises:
    ValueError: if `label_str` is not one of the known genres. Failing loudly
      here avoids handing `None` to callers, which would only break later
      with an unrelated-looking AttributeError.
  """
  # the position of a genre in this tuple is its index in the one-hot vector
  genres = ("Pop", "Hip-Hop", "Electronic", "Rock", "Folk", "Jazz")

  if label_str not in genres:
    raise ValueError(
      f"unknown genre label {label_str!r}; expected one of {genres}"
    )

  one_hot = torch.zeros(len(genres), dtype=torch.int64)
  one_hot[genres.index(label_str)] = 1

  return one_hot

In [21]:
def load_raw_audio_data(
  path, normalize_audio: bool, audio_num_frames: int
):
  """Load every audio file found under `path` into a DataFrame.

  The directory tree is walked recursively. The name of the folder that
  directly contains a file is used as that file's genre label.

  Args:
    path: root directory to walk.
    normalize_audio: forwarded to `torchaudio.load` as `normalize`.
    audio_num_frames: forwarded to `torchaudio.load` as `num_frames`.

  Returns:
    A `pd.DataFrame` with one row per successfully loaded file and the
    columns "waveform" (numpy array), "og_sample_rate", "label_one_hot"
    (numpy array), "label", "path" and "hop_length" (always -1 here).
    Files that cannot be loaded or labelled are skipped and reported on
    stdout.
  """
  audio_tensor_list = []

  num_audio_files_unable_to_open = 0

  # TODO use glob instead of this
  # `dir_path` (not `path`) so the loop does not shadow the function argument
  for dir_path, _subdirs, files in tqdm(os.walk(path), colour="teal"):
    for name in tqdm(files, colour="turquoise"):

      file_audio_path = os.path.join(dir_path, name)

      try:
        waveform, sample_rate = torchaudio.load(
          file_audio_path, normalize=normalize_audio,
          num_frames=audio_num_frames
        )

        # label = name of the parent folder, independent of the OS separator
        label = os.path.basename(os.path.dirname(file_audio_path))
        label_one_hot = label_from_str_to_one_hot(label)

        audio_tensor_list.append(
          {
            "waveform": waveform.numpy(),
            "og_sample_rate": sample_rate,
            "label_one_hot": label_one_hot.numpy(),
            "label": label,
            "path": file_audio_path,
            "hop_length": -1
          }
        )

      # `Exception` rather than a bare `except` so that Ctrl-C / kernel
      # interrupts still stop the (long) loading loop
      except Exception as error:
        print(
          f"[load_audio_data] error while loading {file_audio_path}: {error!r}"
        )
        num_audio_files_unable_to_open += 1
        continue

  if num_audio_files_unable_to_open:
    print(
      f"[load_audio_data] {num_audio_files_unable_to_open} file(s) skipped"
    )

  return pd.DataFrame(audio_tensor_list)
        
        

In [26]:
# --- dataset configuration ---------------------------------------------------
# Size variant of the dataset; it is part of the dataset folder name.
DATASET_SIZE = "small"

# Folder that contains the dataset directory.
DATASET_FOLDER = "./data/audio"
# DATASET_FOLDER = "/mnt/ramdisk"

DATASET_NAME = "fma_" + DATASET_SIZE + "_organized_by_label_resampled_rechanneled"

dataset_path = "/".join((DATASET_FOLDER, DATASET_NAME))

# Passed to the loader as `audio_num_frames`.
AUDIO_NUM_FRAMES = 240_000

In [27]:
# Load the dataset into a DataFrame, one row per audio file.
og_audio_pd = load_raw_audio_data(
  path=dataset_path,
  # previously hard-coded as:
  # path=f"./data/audio/fma_{DATASET_SIZE}_organized_by_label_resampled_rechanneled/",
  audio_num_frames=AUDIO_NUM_FRAMES,
  normalize_audio=True,
)

0it [00:00, ?it/s]


[load_audio_data] error while loading ./data/audio/fma_small_organized_by_label_resampled_rechanneled/Pop/023431.mp3


100%|██████████| 999/999 [00:08<00:00, 112.23it/s]
100%|██████████| 999/999 [00:09<00:00, 110.63it/s]
100%|██████████| 999/999 [00:08<00:00, 113.68it/s]
100%|██████████| 999/999 [00:08<00:00, 111.43it/s]
100%|██████████| 999/999 [00:08<00:00, 111.39it/s]
100%|██████████| 999/999 [00:09<00:00, 109.48it/s]
7it [00:53,  7.69s/it]


In [None]:
# Sanity check: shape of the first loaded waveform.
og_audio_pd["waveform"].iloc[0].shape

In [None]:
# "audio_hop" --> take one sample every hop_length elements
def resample_audio(og_audio_pd, hop_length: int):
  """Down-sample every waveform of `og_audio_pd` by a factor of `hop_length`.

  Args:
    og_audio_pd: DataFrame with the columns "waveform", "og_sample_rate",
      "label_one_hot", "label" and "path". Waveforms and one-hot labels may
      be numpy arrays (as produced by `load_raw_audio_data`) or tensors.
    hop_length: resampling ratio, passed as `orig_freq` with `new_freq=1`.

  Returns:
    A new `pd.DataFrame` with the same rows, where "waveform" holds the
    resampled audio as a numpy array and "hop_length" records the ratio used.
  """
  resampled_audio_tensor_list = []

  for _row_index, audio in tqdm(og_audio_pd.iterrows(), colour="steelblue"):

    # the frame stores numpy arrays, but torchaudio operates on tensors
    waveform = torch.as_tensor(audio["waveform"])

    resampled_waveform = torchaudio.functional.resample(
        waveform,
        # as per Torch Audio docs, this is the way of performing "hopping" in a
        # similar way as the Mel Spectrogram transform does
        orig_freq=hop_length, new_freq=1
      )

    resampled_audio_tensor_list.append(
      {
        "waveform": resampled_waveform.numpy(),
        "og_sample_rate": audio["og_sample_rate"],
        "hop_length": hop_length,
        # already a numpy array when it comes from `load_raw_audio_data`;
        # `np.asarray` also accepts a tensor, unlike the former `.numpy()`
        "label_one_hot": np.asarray(audio["label_one_hot"]),
        "label": audio["label"],
        "path": audio["path"]
      }
    )

  return pd.DataFrame(resampled_audio_tensor_list)

In [None]:
# hop_length = 24

# hopped_audio_pd = resample_audio(og_audio_pd, hop_length)

In [None]:
# hopped_audio_pd.iloc[0]["waveform"].shape

In [None]:
# code taken from: https://stackoverflow.com/a/47626762

class NumpyEncoder(json.JSONEncoder):
  """JSON encoder that can also serialize numpy arrays and numpy scalars.

  Use it as `json.dumps(obj, cls=NumpyEncoder)`.
  """

  def default(self, obj):
    """Return a JSON-serializable stand-in for `obj`.

    Arrays become (nested) lists and numpy scalars (e.g. `np.float32`,
    `np.int64`, `np.bool_`) become the matching Python scalar. Anything else
    is delegated to the base class, which raises `TypeError`.
    """
    if isinstance(obj, np.ndarray):
      return obj.tolist()

    # numpy scalars are not ndarrays and most are not JSON serializable as-is
    if isinstance(obj, np.generic):
      return obj.item()

    return super().default(obj)

def export_audio_data(audio_pd, audio_data_type):
  """Write one audio column of `audio_pd` to disk with `torchaudio.save`.

  NOTE(review): each row is written to its own "path" value; for frames built
  by `load_raw_audio_data` that is the file the audio was read from, so the
  source file gets overwritten - confirm this is intended.

  Args:
    audio_pd: DataFrame with the columns "path", "og_sample_rate" and the
      column named by `audio_data_type`.
    audio_data_type: name of the column holding the audio to write,
      e.g. "waveform".
  """
  for _row_ind, audio in tqdm(audio_pd.iterrows()):
    torchaudio.save(
      audio["path"],
      # the frame stores numpy arrays, but torchaudio operates on tensors
      torch.as_tensor(audio[audio_data_type]),
      # plain int in case the value comes back as a numpy integer
      int(audio["og_sample_rate"])
    )
    
def export_mel_spectrogram(audio_pd, path, file_name):
  """Serialize `audio_pd` to the JSON file `<path>/<file_name>`.

  Args:
    audio_pd: DataFrame to export.
    path: destination directory; created (with parents) if it is missing.
    file_name: name of the JSON file written inside `path`.
  """
  # `to_json` does not create missing folders, so make sure the target exists
  if path:
    os.makedirs(path, exist_ok=True)

  audio_pd.to_json(f"{path}/{file_name}")
    
    
def export_audio_metadata(audio_pd, metadata_path, metadata_cols):
  """Write the selected columns of `audio_pd` to a JSON file.

  Args:
    audio_pd: DataFrame to export from.
    metadata_cols: columns of `audio_pd` to include in the export.
    metadata_path: path of the JSON file to write.
  """
  audio_pd[metadata_cols].to_json(metadata_path)

In [None]:
# export_audio_data(
#   og_audio_pd, "waveform"
# )

# Dump every column of the loaded frame to metadata.json.
export_audio_metadata(
  audio_pd=og_audio_pd,
  metadata_path=f"./data/audio/fma_{DATASET_SIZE}_organized_by_label_resampled_rechanneled/metadata.json",
  metadata_cols=og_audio_pd.columns,
)

In [None]:
def get_mel_spectrogram_transform(
  sample_rate: int, n_fft, win_length, hop_length, n_mels
):
  """Build a `torchaudio.transforms.MelSpectrogram` for the given settings.

  The arguments are forwarded unchanged to the transform's parameters of the
  same name. All remaining options are fixed by this function.

  Returns:
    The configured `MelSpectrogram` transform.
  """
  # settings chosen by the caller
  caller_settings = dict(
    sample_rate=sample_rate,
    n_fft=n_fft,
    win_length=win_length,
    hop_length=hop_length,
    n_mels=n_mels,
  )

  # settings shared by every spectrogram computed in this notebook
  fixed_settings = dict(
    center=True,
    pad_mode="reflect",
    power=2.0,
    norm='slaney',
    onesided=True,
    mel_scale="htk",
  )

  return torchaudio.transforms.MelSpectrogram(
    **caller_settings, **fixed_settings
  )

def get_mel_spectrogram(
  og_audio_pd, n_fft, win_length, hop_length, n_mels
):
  """Compute the mel spectrogram of every waveform in `og_audio_pd`.

  Args:
    og_audio_pd: DataFrame with the columns "waveform", "og_sample_rate",
      "label_one_hot" and "label". Waveforms may be numpy arrays (as produced
      by `load_raw_audio_data`) or tensors.
    n_fft, win_length, hop_length, n_mels: forwarded to
      `get_mel_spectrogram_transform` and recorded with each row.

  Returns:
    A `pd.DataFrame` with one row per input row and the columns
    "mel_spectrogram" (numpy array), "og_sample_rate", "label_one_hot",
    "label" and "mel_spectrogram_config" (dict of the settings used).
  """
  mel_spectrogram_tensor_list = []

  # the transform depends only on the sample rate and the fixed settings, so
  # build it once per distinct sample rate instead of once per row
  transforms_by_sample_rate = {}

  for _row_ind, audio in og_audio_pd.iterrows():

    sample_rate = audio["og_sample_rate"]

    mel_spectrogram_transform = transforms_by_sample_rate.get(sample_rate)
    if mel_spectrogram_transform is None:
      mel_spectrogram_transform = get_mel_spectrogram_transform(
        sample_rate,
        n_fft=n_fft, win_length=win_length, hop_length=hop_length, n_mels=n_mels
      )
      transforms_by_sample_rate[sample_rate] = mel_spectrogram_transform

    # the frame stores numpy arrays, but the transform operates on tensors
    mel_spectrogram = mel_spectrogram_transform(
      torch.as_tensor(audio["waveform"])
    )

    mel_spectrogram_tensor_list.append(
      {
        "mel_spectrogram": mel_spectrogram.numpy(),
        "og_sample_rate": sample_rate,
        "label_one_hot": audio["label_one_hot"],
        "label": audio["label"],
        "mel_spectrogram_config": {
          "n_fft": n_fft,
          "win_length": win_length,
          "hop_length": hop_length,
          "n_mels": n_mels
        }
      }
    )

  return pd.DataFrame(mel_spectrogram_tensor_list)

In [None]:
# Mel spectrogram settings, forwarded to get_mel_spectrogram.
n_fft, hop_length = 1024, 512
n_mels = 128
win_length = None

In [None]:
# One mel spectrogram per loaded audio file, using the settings defined above.
mel_spectros_pd = get_mel_spectrogram(
  og_audio_pd=og_audio_pd,
  n_fft=n_fft, win_length=win_length,
  hop_length=hop_length, n_mels=n_mels,
)

In [None]:
# Sanity check: shape of the first mel spectrogram.
first_mel_spectrogram = mel_spectros_pd["mel_spectrogram"].iloc[0]
pprint(first_mel_spectrogram.shape)

In [None]:
def plot_spectrogram(spec, title=None, ylabel='freq_bin', aspect='auto', xmax=None):
  """Show a power spectrogram as a dB-scaled image with a colorbar.

  Args:
    spec: power spectrogram, converted with `librosa.power_to_db` before
      plotting (presumably shaped (frequency bins, frames), given the axis
      labels - TODO confirm).
    title: figure title; a default title is used when empty or None.
    ylabel: label of the vertical axis.
    aspect: forwarded to `imshow` as `aspect`.
    xmax: if truthy, the horizontal axis is limited to the range 0..xmax.
  """
  figure, axis = plt.subplots(1, 1)

  spec_db = librosa.power_to_db(spec)
  image = axis.imshow(spec_db, origin='lower', aspect=aspect)

  axis.set_title('Spectrogram (db)' if not title else title)
  axis.set_xlabel('frame')
  axis.set_ylabel(ylabel)

  if xmax:
    axis.set_xlim((0, xmax))

  figure.colorbar(image, ax=axis)
  plt.show(block=False)

In [None]:
# Plot channel 0 of the spectrogram stored at row 29.
plot_spectrogram(
  mel_spectros_pd["mel_spectrogram"].iloc[29][0],
  title="MelSpectrogram - torchaudio - channel 0",
  ylabel='mel freq',
)

In [None]:
# TODO stack all songs of the same genre into a single Tensor and average them.
# Plot that average and present it as the average spectrogram of the genre.

In [None]:
# TODO place PCA and other non-linear dimensionality reductions here to show the data space

In [None]:
# Persist the mel spectrogram frame as JSON.
export_mel_spectrogram(
  audio_pd=mel_spectros_pd,
  path=f"./data/spectrograms/fma_{DATASET_SIZE}_organized_by_label_resampled_rechanneled",
  file_name="spectrograms.json",
)