# Import modules and connect drive

In [None]:
import numpy as np
import os
import pickle
import json
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import backend as K
import pdb
import matplotlib.pyplot as plt
import librosa
import shutil
import random
import time
import soundfile as sf
from tqdm import tqdm

# Mount Google Drive at /content/drive (Colab-only import); the paths used
# further down in this file all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:


def get_pitch_and_velocity(json_path, file_names):
    """
    Look up the pitch and velocity of each requested file in a JSON metadata
    file.

    Parameters:
    json_path  (str): location of the json file; it must decode to a mapping
                      from file name to a record holding 'pitch' and
                      'velocity' entries
    file_names (iterable of str): names to look up, in the order the results
                      should be returned

    Returns:
    pitch (list): 'pitch' entry of every requested file, in request order
    velocity (list): 'velocity' entry of every requested file, in request order

    Raises:
    KeyError: if a name is absent from the json file, or its record lacks
              'pitch' or 'velocity'
    """
    with open(json_path, 'r') as handle:
        metadata = json.load(handle)

    # Resolve every name first so a missing file name fails before any
    # per-field lookups happen.
    records = []
    for name in file_names:
        records.append(metadata[name])

    pitch = [record['pitch'] for record in records]
    velocity = [record['velocity'] for record in records]
    return pitch, velocity

class Loader:
    """Loads an audio file as a time-series signal via librosa.

    The sample rate, duration and mono flag given at construction are
    forwarded unchanged to `librosa.load` on every call to `load`.
    """

    def __init__(self, sample_rate, duration, mono):
        self.sample_rate, self.duration, self.mono = sample_rate, duration, mono

    def load(self, file_path):
        """Return the audio samples of `file_path`.

        `librosa.load` returns a pair; only its first element (the signal)
        is handed back, the second is discarded.
        """
        loaded = librosa.load(
            file_path,
            sr=self.sample_rate,
            duration=self.duration,
            mono=self.mono,
        )
        return loaded[0]


class Padder:
    """Pads an array on its left or right side using `np.pad`.

    `mode` is passed straight through to `np.pad` (default "constant").
    """

    def __init__(self, mode="constant"):
        self.mode = mode

    def _pad(self, array, before, after):
        # Single place where np.pad is invoked; `before`/`after` are the
        # number of items added at the start/end.
        return np.pad(array, (before, after), mode=self.mode)

    def left_pad(self, array, num_missing_items):
        """Return `array` with `num_missing_items` items prepended."""
        return self._pad(array, num_missing_items, 0)

    def right_pad(self, array, num_missing_items):
        """Return `array` with `num_missing_items` items appended."""
        return self._pad(array, 0, num_missing_items)


class LogSpectrogramExtractor:
    """Turns a time-series signal into a log-magnitude spectrogram (dB) and
    the matching phase spectrogram.
    """

    def __init__(self, frame_size, hop_length):
        self.frame_size = frame_size
        self.hop_length = hop_length

    def extract(self, signal):
        """Return `(log_spectrogram, phase_spectrogram)` for `signal`.

        Both outputs are derived from the same centred STFT with its last
        row removed, so they share a shape.
        """
        complex_stft = librosa.stft(
            signal,
            n_fft=self.frame_size,
            hop_length=self.hop_length,
            center=True,
        )
        # Drop the final row along the first axis (presumably the highest
        # frequency bin, leaving frame_size / 2 rows — verify against librosa).
        complex_stft = complex_stft[:-1]

        magnitude = np.abs(complex_stft)
        log_spectrogram = librosa.amplitude_to_db(magnitude)
        phase_spectrogram = np.angle(complex_stft)
        return log_spectrogram, phase_spectrogram


class MinMaxNormaliser:
    """MinMaxNormaliser applies min max normalisation to an array.

    Values are rescaled linearly so that the array's own minimum maps to
    `min_val` and its own maximum maps to `max_val`.
    """

    def __init__(self, min_val, max_val):
        self.min = min_val
        self.max = max_val

    def normalise(self, array):
        """Rescale `array` into the range [self.min, self.max].

        Parameters:
        array (ndarray): values to rescale; must be non-empty

        Returns:
        ndarray: rescaled copy of `array`. A constant array (max == min)
        maps every element to self.min instead of producing NaN.
        """
        array_min = array.min()
        value_range = array.max() - array_min
        # A constant array has zero range; dividing by it would fill the
        # output with NaN. Dividing by 1 instead leaves the zero numerator
        # untouched, so every element lands on self.min.
        if value_range == 0:
            value_range = 1
        norm_array = (array - array_min) / value_range
        norm_array = norm_array * (self.max - self.min) + self.min
        return norm_array

    def denormalise(self, norm_array, original_min, original_max):
        """Invert `normalise` given the original array's min and max.

        Parameters:
        norm_array (ndarray): values in the range [self.min, self.max]
        original_min: minimum of the array before normalisation
        original_max: maximum of the array before normalisation

        Returns:
        ndarray: values mapped back to [original_min, original_max]

        Note: requires self.max != self.min, otherwise the first step
        divides by zero.
        """
        array = (norm_array - self.min) / (self.max - self.min)
        array = array * (original_max - original_min) + original_min
        return array


class Saver:
    """Saver is responsible to save features, and the min max values.

    Two feature directories are supported (`feature_save_dir1` and
    `feature_save_dir2`); each feature is written as a `.npy` file named
    after the source file. Target directories are created on demand so a
    first run does not fail when they do not exist yet.
    """

    def __init__(self, feature_save_dir1, feature_save_dir2, min_max_values_save_dir):
        self.feature_save_dir1 = feature_save_dir1
        self.feature_save_dir2 = feature_save_dir2
        self.min_max_values_save_dir = min_max_values_save_dir

    def save_feature1(self, feature, file_path):
        """Save `feature` under `feature_save_dir1`; return the path written."""
        return self._save_feature(feature, self._generate_save_path1(file_path))

    def save_feature2(self, feature, file_path):
        """Save `feature` under `feature_save_dir2`; return the path written."""
        return self._save_feature(feature, self._generate_save_path2(file_path))

    def save_min_max_values(self, min_max_values):
        """Pickle `min_max_values` to `min_max_values.pkl` in the min/max dir."""
        self._ensure_dir(self.min_max_values_save_dir)
        save_path = os.path.join(self.min_max_values_save_dir,
                                 "min_max_values.pkl")
        self._save(min_max_values, save_path)

    @staticmethod
    def _ensure_dir(directory):
        # An empty string means "current directory": nothing to create, and
        # os.makedirs("") would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)

    @classmethod
    def _save_feature(cls, feature, save_path):
        # Shared implementation of save_feature1 / save_feature2.
        cls._ensure_dir(os.path.dirname(save_path))
        np.save(save_path, feature)
        return save_path

    @staticmethod
    def _save(data, save_path):
        with open(save_path, "wb") as f:
            pickle.dump(data, f)

    @staticmethod
    def _build_save_path(save_dir, file_path):
        # Keep only the file's base name (including its original extension)
        # and append ".npy", e.g. "note.wav" -> "note.wav.npy".
        file_name = os.path.split(file_path)[1]
        return os.path.join(save_dir, file_name + ".npy")

    def _generate_save_path1(self, file_path):
        return self._build_save_path(self.feature_save_dir1, file_path)

    def _generate_save_path2(self, file_path):
        return self._build_save_path(self.feature_save_dir2, file_path)


class PreprocessingPipeline:
    """PreprocessingPipeline processes audio files in a directory, applying
    the following steps to each file:
        1- load a file
        2- pad the signal (if necessary)
        3- extracting log spectrogram (and phase spectrogram) from signal
        4- normalise the log spectrogram
        5- save the normalised log spectrogram and the raw phase spectrogram
    Storing the min max values for all the log spectrograms.

    The collaborators (`loader`, `padder`, `extractor`, `normaliser`,
    `saver`) start as None and must be assigned before calling `process`.
    """

    def __init__(self):
        self.padder = None
        self.extractor = None
        self.normaliser = None
        self.saver = None
        # Maps the save path of each normalised log spectrogram to the
        # {"min", "max"} of the un-normalised log spectrogram.
        self.min_max_values = {}
        self._loader = None
        self._num_expected_samples = None

    @property
    def loader(self):
        return self._loader

    @loader.setter
    def loader(self, loader):
        # Assigning a loader also fixes the target signal length used to
        # decide whether padding is needed.
        self._loader = loader
        self._num_expected_samples = int(loader.sample_rate * loader.duration)

    def process(self, audio_files_dir):
        """Process every file found under `audio_files_dir` (recursively),
        then save the collected min/max values through the saver.

        Progress and an estimated remaining time are printed after each file.
        """
        # Collect all paths up front so the total is known. os.walk yields
        # one `files` list per directory, so comparing a global counter with
        # a single directory's length gives a wrong (even negative) ETA as
        # soon as more than one directory is visited.
        file_paths = [
            os.path.join(root, file)
            for root, _, files in os.walk(audio_files_dir)
            for file in files
        ]
        total_files = len(file_paths)

        start_time = time.time()
        for num_files_written, file_path in enumerate(file_paths, start=1):
            self._process_file(file_path)
            print(f"Processed file {file_path}")
            elapsed_time = time.time() - start_time
            avg_time_per_file = elapsed_time / num_files_written
            remaining_files = total_files - num_files_written
            eta = avg_time_per_file * remaining_files
            print(f"ETA: {eta:.2f} seconds")

        self.saver.save_min_max_values(self.min_max_values)

    def _process_file(self, file_path):
        signal = self.loader.load(file_path)
        if self._is_padding_necessary(signal):
            signal = self._apply_padding(signal)
        log_spec, phase_spec = self.extractor.extract(signal)

        # Magnitude: normalised before saving; the pre-normalisation min/max
        # are recorded below so the normalisation can be undone later.
        norm_feature = self.normaliser.normalise(log_spec)
        save_path = self.saver.save_feature1(norm_feature, file_path)

        # Phase: saved as extracted, without normalisation.
        self.saver.save_feature2(phase_spec, file_path)
        self._store_min_max_value(save_path, log_spec.min(), log_spec.max())

    def _is_padding_necessary(self, signal):
        # Only signals shorter than sample_rate * duration get padded.
        return len(signal) < self._num_expected_samples

    def _apply_padding(self, signal):
        # Missing samples are appended at the end of the signal.
        num_missing_samples = self._num_expected_samples - len(signal)
        padded_signal = self.padder.right_pad(signal, num_missing_samples)
        return padded_signal

    def _store_min_max_value(self, save_path, min_val, max_val):
        self.min_max_values[save_path] = {
            "min": min_val,
            "max": max_val
        }

def generate_log_spectrogram_and_save(SAMPLE_RATE, DURATION, MONO, FRAME_SIZE,
                                      HOP_LENGTH, SPECTROGRAMS_SAVE_DIR,
                                      phase_save_dir, MIN_MAX_VALUES_SAVE_DIR,
                                      files_dir=None):
    """
    Build a PreprocessingPipeline and run it over a directory of audio files,
    saving normalised log spectrograms, phase spectrograms and min/max values.

    Parameters:
    SAMPLE_RATE: sample rate passed to the Loader
    DURATION: duration passed to the Loader
    MONO (bool): mono flag passed to the Loader
    FRAME_SIZE (int): STFT frame size (n_fft) for the extractor
    HOP_LENGTH (int): STFT hop length for the extractor
    SPECTROGRAMS_SAVE_DIR (str): directory for the normalised log spectrograms
    phase_save_dir (str): directory for the phase spectrograms
    MIN_MAX_VALUES_SAVE_DIR (str): directory for the min/max pickle
    files_dir (str, optional): directory containing the audio files. When
        omitted, the module-level global FILES_DIR is used, which is how this
        function originally located its input.

    Raises:
    NameError: if `files_dir` is omitted and no global FILES_DIR is defined
    """
    if files_dir is None:
        # Backward-compatible fallback: earlier callers set a global
        # FILES_DIR before calling instead of passing the directory in.
        try:
            files_dir = FILES_DIR
        except NameError:
            raise NameError(
                "pass files_dir or define a module-level FILES_DIR before "
                "calling generate_log_spectrogram_and_save"
            ) from None

    preprocessing_pipeline = PreprocessingPipeline()
    preprocessing_pipeline.loader = Loader(SAMPLE_RATE, DURATION, MONO)
    preprocessing_pipeline.padder = Padder()
    preprocessing_pipeline.extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
    # Log spectrograms are normalised into [0, 1].
    preprocessing_pipeline.normaliser = MinMaxNormaliser(0, 1)
    preprocessing_pipeline.saver = Saver(SPECTROGRAMS_SAVE_DIR, phase_save_dir,
                                         MIN_MAX_VALUES_SAVE_DIR)

    preprocessing_pipeline.process(files_dir)


def save_ndarray_as_npy_files(array, save_path, file_name):
    """
    Write `array` to `<save_path>/<file_name>.npy` using np.save.

    Parameters:
    array: data to store
    save_path (str): directory to write into
    file_name (str): base name of the file, without the '.npy' extension
    """
    npy_name = file_name + '.npy'
    np.save(os.path.join(save_path, npy_name), array)

def move_acoustic_wav_to_new_folder(input_folder, output_folder):
    """
    Move every "keyboard_acoustic*.wav" file from `input_folder` into
    `output_folder`, printing each file as it is handled.

    Only ran once to move the acoustic files to another directory.
    """
    for filename in os.listdir(input_folder):
        is_acoustic_wav = (filename.startswith("keyboard_acoustic")
                           and filename.endswith(".wav"))
        if not is_acoustic_wav:
            continue

        # Same file name, different parent directory.
        source = os.path.join(input_folder, filename)
        print(source)
        destination = os.path.join(output_folder, filename)

        shutil.move(source, destination)
        print(f"Processed file {filename}")

# Preprocessing and storing the data as .npy files for use in another file



In [None]:
# Driver cell: each stage below is gated by a boolean flag. Later stages read
# variables created by earlier ones (x_train, log_file_names, label_train), so
# switching an earlier flag off without those variables already defined will
# raise a NameError.

# Move acoustic sound samples to a new folder to prepare for generating spectrograms, if needed.
move_acoustic = False
if move_acoustic:
  move_acoustic_wav_to_new_folder(input_folder = '/content/drive/MyDrive/output',output_folder = '/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic wav files')

do_preprocess = True
if do_preprocess:

    # Generate spectrograms from the acoustic .wav files, if needed.
    generate_spectrograms = True
    if generate_spectrograms:
      n_fft = 2048
      HOP_LENGTH = 256
      DURATION = 3  # in seconds; only the first 3 seconds, skip the release part (last second)
      SAMPLE_RATE = 16000
      MONO = True

      #whole_spectrogram_save_dir = '/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic spectrograms'

      log_SPECTROGRAMS_SAVE_DIR = "/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic_log_spectrograms"
      PHASE_SPECTROGRAMS_SAVE_DIR = "/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic_phase_spectrograms"
      MIN_MAX_VALUES_SAVE_DIR = "/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic_log_MinMaxVals"
      # NOTE: FILES_DIR is not passed as an argument below;
      # generate_log_spectrogram_and_save reads it as a module-level global,
      # so it must be assigned before the call.
      FILES_DIR = "/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic wav files"

      generate_log_spectrogram_and_save(SAMPLE_RATE,DURATION,MONO,n_fft,HOP_LENGTH,log_SPECTROGRAMS_SAVE_DIR,PHASE_SPECTROGRAMS_SAVE_DIR,MIN_MAX_VALUES_SAVE_DIR)

    # Compile the spectrograms into a single array, keeping all data (sample_percentage=1).
    compile_spectrograms_into_3darray = True
    if compile_spectrograms_into_3darray:
        x_train_log,log_file_names = load_and_sample_npy_files(folder_path = "/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic_log_spectrograms", sample_percentage=1)
        # Get the phase data based on the file names of x_train_log; this ensures matching ordering.
        x_train_phase,_ = load_and_sample_npy_files(folder_path ="/content/drive/MyDrive/Variational_Auto_Encoder/Acoustic_phase_spectrograms", sample_percentage=1, file_names_criteria = log_file_names)

        # Encode the phase as its sine and cosine components, then stack
        # log-magnitude, sine and cosine along axis 3 (the last axis of the
        # 4-D arrays).
        x_train_sine = np.sin(x_train_phase)
        x_train_cos = np.cos(x_train_phase)

        x_train = np.concatenate((x_train_log, x_train_sine, x_train_cos), axis=3)


        # Each example is stated to be 256x251 per channel and needs to be
        # 256x256, so axis 2 is zero-padded by 5 at its end.
        # NOTE(review): with n_fft=2048, hop 256 and 3 s at 16 kHz the saved
        # spectrograms would not obviously be 256x251 — confirm this shape
        # (and the pad width of 5) against load_and_sample_npy_files.
        x_train = np.pad(x_train, ((0, 0), (0, 0), (0, 5), (0, 0)), mode='constant')

    # Get the pitch and velocity values for each training example.
    do_get_pitch_and_velocity = True
    if do_get_pitch_and_velocity:
        field_names = [file_name[:-8] for file_name in log_file_names]  # [:-8] drops the last 8 characters; presumably the ".wav.npy" suffix of the saved feature files — verify
        pitch, velocity = get_pitch_and_velocity(json_path='/content/drive/MyDrive/Variational_Auto_Encoder/examples.json', file_names=field_names)
        # Row 0 holds the pitches, row 1 the velocities.
        label_train = np.vstack((pitch,velocity))

    # Save the training ndarray as .npy. Also save the pitch/velocity labels and the file names.
    save_train_and_label_and_filenames = True
    if save_train_and_label_and_filenames:
        save_ndarray_as_npy_files(x_train, save_path='/content/drive/MyDrive/Variational_Auto_Encoder/training data', file_name='x_train_all_acoustic')
        save_ndarray_as_npy_files(label_train, save_path='/content/drive/MyDrive/Variational_Auto_Encoder/training data', file_name='pitch_and_velocity_train_all_acoustic')
        save_ndarray_as_npy_files(log_file_names, save_path='/content/drive/MyDrive/Variational_Auto_Encoder/training data', file_name='file_names_train_all_acoustic')

  0%|          | 15/8068 [01:01<9:14:06,  4.13s/it]


KeyboardInterrupt: ignored