In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
from tensorflow.keras.layers import Input, Dense,Conv1D, MaxPooling1D, UpSampling1D, Flatten, Reshape
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.python.ops import math_ops
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tqdm import tqdm
from scipy.io import wavfile
# from google.colab import drive
from tensorflow.keras import regularizers
# from google.colab import drive

In [None]:
def cut(arr, length):
  idx = len(arr)%length
  out = []
  while(idx+length <= len(arr)):
    out.append(arr[idx:idx+length])
    idx += length
  return np.array(out)

def loadSong(fName):
  fs, data = wavfile.read(inpathTrain + fName)
  if data.ndim > 1:
    mono_data = np.mean(data, axis=1)
  else:
    mono_data = data

  return mono_data.astype('int16')

def loadSongCut(fName):
  data = loadSong(fName)
  data = cut(data, snippitLength)
  scaler[fName] = MinMaxScaler()
  #data = quadratic_scaler(data, 5)
  data = scaler[fName].fit_transform(data)

  return train_test_split(data, test_size=0.3, random_state=42)

def digital_decibel(x):
  if (x>0):
    decibels = 1/ (10 * np.log10(x/1))
  else:
    decibels = 1 / (-1*  10 * np.log10(-x/1))
  return decibels

def quadratic_scaler(x, n):
  v = [i**n for i in x]
  return v

In [None]:
def snipLoss(y_true, y_pred):
  snipWeight = tf.convert_to_tensor([int(np.cosh(x)) for x in range(-5, 5, snippitLength)], dtype='float32')

  loss = math_ops.squared_difference(y_true,y_pred)
  loss = math_ops.Mul(x = loss,y = snipWeight)
  loss = math_ops.log1p(loss)
  return loss

def si_snr2(y_true, y_pred):
  # Remove extra dimensions
  y_true = tf.squeeze(y_true)
  y_pred = tf.squeeze(y_pred)

  # Compute the scaling factor
  scale = tf.reduce_sum(y_true * y_pred) / tf.reduce_sum(y_pred * y_pred)

  # Compute the estimated source and the target source
  est_source = scale * y_pred
  target_source = y_true

  # Compute the noise source
  noise_source = est_source - target_source

  # Compute the SI-SNR
  numerator = tf.reduce_sum(target_source * est_source, axis=-1)
  denominator = tf.reduce_sum(noise_source * noise_source, axis=-1)
  si_snr = 10 * tf.math.log(numerator / denominator + 1e-8) / tf.math.log(10.0)

  # Return the average SI-SNR
  return tf.reduce_mean(si_snr)

def si_snr(target, estimate):
  # target and estimate are tensors of shape (batch_size, time_steps)
  # compute the dot product of target and estimate along the time axis
  dot = tf.reduce_sum(target * estimate, axis=-1, keepdims=True)
  # compute the energy of target along the time axis
  energy = tf.reduce_sum(target ** 2, axis=-1, keepdims=True)
  # compute the scaled target
  scaled_target = dot * target / energy
  # compute the noise
  noise = estimate - scaled_target
  # compute the SI-SNR in decibels
  si_snr = 10 * tf.math.log(tf.reduce_sum(scaled_target ** 2, axis=-1) / tf.reduce_sum(noise ** 2, axis=-1)) / tf.math.log(10.0)
  # return the SI-SNR tensor of shape (batch_size,)
  return si_snr

In [None]:
# paths
# drive.mount('/content/drive')
# inpathTrain = "/content/drive/MyDrive/Machine Learning/Autoencoder/train_data/"
# inpathOut = "/content/drive/MyDrive/Machine Learning/Autoencoder/output/"
inpathTrain = "/mnt/e/data/SynologyDrive/Uni/mSem02/Machine Learning/Project_Audio_Autoencoder/musicnet_midis/BOT/Mixdown/output/"
inpathOut =   "/home/martin/martin_user_data/jupyter_notebooks/autoencoder_ml/output/"
fileNames = os.listdir(inpathTrain)
scaler = {}

# global variables
samplerate = 44_100
snippitLength = 64

loss_fct = snipLoss

total_num_songs = len(fileNames)
TrainSongs = 0.25
numTrainSongs = int(total_num_songs*0.7*TrainSongs)
numTrainSongs = 20
numTestSongs = int(total_num_songs*0.3*TrainSongs)
numTestSongs = 3
numHyperTrainSongs = 5


# Hyperparameters
compression_ratio = 0.5
latentSize = int(compression_ratio*snippitLength)
# latentSize = 50
numDense = 3
numConvLayer = 2
numConv = 8


output_wav_name = f'snln={snippitLength}_cmpr={compression_ratio}_loss={loss_fct}_songs={numTrainSongs}'

In [None]:
####################################
#####  plot history

def plot_loss(ax, network_history):
    loss = np.concatenate([network_history[key].history['loss'] for key in network_history.keys()])
    val_loss = np.concatenate([network_history[key].history['val_loss'] for key in network_history.keys()])

    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    ax.set_title('Loss')
    ax.plot(loss, label='Training')
    ax.plot(val_loss, label='Validation')
    ax.legend()

def plot_si_snr(ax, network_history):
    si_snr = np.concatenate([network_history[key].history['si_snr'] for key in network_history.keys()])
    val_si_snr = np.concatenate([network_history[key].history['val_si_snr'] for key in network_history.keys()])

    ax.set_xlabel('Epochs')
    ax.set_ylabel('SI_SNR')
    ax.set_title('SI-SNR')
    ax.plot(si_snr, label='Training')
    ax.plot(val_si_snr, label='Validation')
    ax.legend()

def plot_history(network_history, name):
    fig, ax = plt.subplots(1, 2, figsize=(12, 6), sharex=True, sharey=False)

    plot_loss(ax[0], network_history)
    plot_si_snr(ax[1], network_history)

    plt.tight_layout()
    plt.savefig(name)
    # plt.show()
    plt.clf()


In [None]:
def buildModel(compression_ratio = 0.5, numDense = 3, numConv = 8,numConvLayer = 2, loss_fct = snipLoss):

  latentSize = int(compression_ratio*snippitLength)
  physical_devices = tf.config.experimental.list_physical_devices('GPU')
  for i in physical_devices:
      tf.config.experimental.set_memory_growth(i, True)
  tf.device('/device:GPU:0')

  input = Input(shape=(snippitLength,1))
  x = input

  # Convolutional part of encoder
  for i in range(numConvLayer):
    x = Conv1D(numConv, 5, activation='relu', padding='same')(x)
    x = MaxPooling1D(2, padding = 'same')(x)

  convShape = x.shape
  # calculate flatten dimension
  flsize = 1
  for i in x.shape:
    if(i != None):
      flsize*= i

  print(f'{latentSize} {flsize}')

  x = Flatten()(x)
  convShape2 = x.shape


  # Dense part of encoder
  denses = [int(i) for i in np.linspace(flsize, latentSize, numDense)]
  for i in denses[1:]:
    x = Dense(i, activation='relu')(x)

  encoded = x


  # Dense part of decoder
  x = encoded
  for i in denses[::-1][1:]:
    if(numConvLayer == 0 and i == snippitLength):
      x = Dense(i, activation='sigmoid')(x)
    else:
      x = Dense(i, activation='relu')(x)

  if(numConvLayer == 0):
    decoded = x

  x = Reshape(convShape[1:])(x)

  # Convolutional part of decoder
  for i in range(numConvLayer):
    x = Conv1D(numConv,5, activation='relu', padding='same')(x)
    x = UpSampling1D(2)(x)
  if(numConvLayer != 0):
    decoded = Conv1D(1,5, activation='sigmoid', padding='same')(x)

  autoencoder = Model(input, decoded)
  autoencoder = Model(input, Flatten()(decoded))

  autoencoder.compile(optimizer='adam', loss=loss_fct, metrics=[si_snr])

  autoencoder.summary()
  return autoencoder

In [None]:
param_space = {'compression_ratio' : np.linspace(0.1,0.9,5),
               'numDense' : [int(i) for i in np.linspace(2, 4, 3)],
               'numConv' : [8,16],
               'numConvLayer' : np.linspace(0,2, 3, dtype = int)}

# param_space = {'compression_ratio' : np.linspace(0.1,0.9,5),
#                'numDense' : [2],
#                'numConv' : [8],
#                'numConvLayer' : [0, 1]}



import itertools
value_combis = itertools.product(*[v for v in param_space.values()])
param_combis = []
for combi in value_combis:
  param_combis.append({key: value for key, value in zip(param_space.keys(), combi)})
len(param_combis)

In [None]:
#Hyperparameter grid search
search_results = []


for hyperParamSet in tqdm(param_combis):
  autoencoder = buildModel(hyperParamSet['compression_ratio'],
                           hyperParamSet['numDense'],
                           hyperParamSet['numConv'],
                           hyperParamSet['numConvLayer'])

  autoencoder = buildModel()
  histories = {}
  for filename_train in tqdm(fileNames[:numHyperTrainSongs]):
    Xt, Xv = loadSongCut(filename_train)
    histories[filename_train] = autoencoder.fit(Xt, Xt,
                epochs=5,
                batch_size=2**10,
                shuffle=True,
                validation_data=(Xv, Xv))

  plot_history(histories, f'HyperParOpt, compression_ratio ={hyperParamSet["compression_ratio"]:.1f}, numDense ={hyperParamSet["numDense"]}, numConv = {hyperParamSet["numConv"]}, numConvLayer = {hyperParamSet["numConvLayer"]}.pdf')

  loss = []
  val_loss = []
  for key in histories.keys():
    loss.append(histories[key].history['loss'])
    val_loss.append(histories[key].history['val_loss'])

  loss = np.concatenate(loss)
  val_loss = np.concatenate(val_loss)


  train_si_snr = []
  val_si_snr = []
  for key in histories.keys():
    train_si_snr.append(histories[key].history['si_snr'])
    val_si_snr.append(histories[key].history['val_si_snr'])

  train_si_snr = np.concatenate(train_si_snr)
  val_si_snr = np.concatenate(val_si_snr)

  best_val_epoch    = np.argmax(val_si_snr)
  best_val_si_snr      = np.max(val_si_snr)
  best_val_loss     = np.min(val_loss)

  best_train_si_snr      = np.max(train_si_snr)
  best_train_loss     = np.min(loss)

  search_results.append({
        **hyperParamSet,
        'best_val_epoch': best_val_epoch,
        'best_val_si_snr': best_val_si_snr,
        'best_val_loss': best_val_loss,
        'best_train_si_snr': best_train_si_snr,
        'best_train_loss': best_train_loss
    })



In [None]:
import json

results = [{k: int(v) if isinstance(v, np.int64) else v for k, v in d.items()} for d in search_results]

with open('searchResults.json', 'w') as file:
    json.dump(results, file, indent = '')

In [None]:
####################################
#####  evaluate test songs

# numTestSongs = 2

def evaluateTestSongs(autoencoder):
  index = 0
  test_evaluated = []
  for songname in tqdm(reversed(fileNames[-numTestSongs:])):
      # songname = '1727_schubert_op114_2.wav'
      print(songname)
      song = loadSong(songname)
      # wavfile.write(inpathOut + songname + '.wav', samplerate, song)
      songSnip = cut(song, snippitLength)

      songSnip_transformed = MinMaxScaler().fit_transform(songSnip)
      test_loss, test_si_snr = autoencoder.evaluate(songSnip_transformed, songSnip_transformed, verbose=2)

      test_evaluated.append([songname, test_loss, test_si_snr])


  return test_evaluated

# test_loss, test_si_snr = autoencoder.evaluate(songSnips_transformed, songSnips_transformed, verbose=2)
# print(test_evaluated)

In [None]:
####################################
#####  predict unknown song

def plotWave(autoencoder, name, compression_ratio):
  exampleSong = fileNames[-1]
  # exampleSong = '1727_schubert_op114_2.wav'
  orig = loadSong(exampleSong)
  origSnip = cut(orig, snippitLength)
  orig = np.concatenate(origSnip)

  if(exampleSong in scaler.keys()):
    scaler_Example = scaler[exampleSong]
    origSnip_transformed = scaler_Example.transform(origSnip)
  else:
    scaler_Example = MinMaxScaler()
    origSnip_transformed = scaler_Example.fit_transform(origSnip)

  # autoencode song
  a = autoencoder.predict(origSnip_transformed)
  a = a.reshape(-1, snippitLength)
  XpredSnip = scaler_Example.inverse_transform(a)

  silence = np.zeros((1,snippitLength), dtype = 'int16')
  a = scaler_Example.transform(silence)
  a = autoencoder.predict(a)
  a = a.reshape(-1, snippitLength)
  Xsilence = scaler_Example.inverse_transform(a)[0]

  # remove noise generated by silence
  XpredSnip = [i-Xsilence for i in XpredSnip]
  Xpred = np.concatenate(XpredSnip).astype('int16')

  test_loss, test_si_snr = autoencoder.evaluate(origSnip_transformed, origSnip_transformed)

  output_wav_name = f'snln={snippitLength}_cmpr={compression_ratio:.1f}_loss={loss_fct}_songs={numTrainSongs}_SNR={test_si_snr:.1f}.wav'
  wavfile.write(inpathOut + output_wav_name, samplerate, Xpred)
  print(f"file saved: {output_wav_name}")

  plt.plot(orig, linewidth = 0.1)
  plt.plot(orig-Xpred, linewidth = 0.1)
  plt.savefig(name + "whole.pdf")
  plt.clf()
  ####################################
  #####  see difference in waveform detailed
  nrows = 2
  ncols = 6
  snips = [0, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000]

  fig, ax = plt.subplots(nrows, ncols, figsize=(6*ncols, 6*nrows), sharey = True, sharex = True)
  s = 0
  for i in range(nrows):
    for j in range(ncols):
      ax[i][j].plot(origSnip[snips[s]], linewidth = 0.5, c = 'b')
      ax[i][j].plot(XpredSnip[snips[s]], linewidth = 0.5, c = 'r')
      s +=1
  plt.savefig(name + "snip.pdf")
  plt.clf()



In [None]:
param_space = {'compression_ratio' : np.linspace(0.1,0.9,5),
               'numDense' : [2],
               'numConv' : [8],
               'numConvLayer' : [0, 1]}

In [None]:
# Find best topology across different compression ratios and build/train 10 models across compression ratios with that topology
import csv
parSet_sum = {}

for item in search_results:
    numDense = item['numDense']
    numConv = item['numConv']
    numConvLayer = item['numConvLayer']
    best_val_si_snr = item['best_val_si_snr']

    key = (numDense, numConv, numConvLayer)
    if key in parSet_sum:
       parSet_sum[key] += best_val_si_snr
    else:
        parSet_sum[key] = best_val_si_snr

keys = [k for k in parSet_sum.keys()]
si_snr_sum = [parSet_sum[k] for k in keys]
bestParSet = keys[np.argmax(si_snr_sum)]


compression_rates = np.linspace(0.1,0.9,9)
for c in compression_rates:
  autoencoder = buildModel(c, bestParSet[0],bestParSet[1],bestParSet[2])

  histories = {}
  numTrainSongs = 20
  for filename_train in tqdm(fileNames[:numTrainSongs]):
    Xt, Xv = loadSongCut(filename_train)
    histories[filename_train] = autoencoder.fit(Xt, Xt,
                epochs=5,
                batch_size=2**10,
                shuffle=True,
                validation_data=(Xv, Xv))
  plot_history(histories, f'BestSet, compression_ratio ={c:.1f}.pdf')

  autoencoder.save(outputpdf + 'modelCompressionRate:{c:.1f}.keras')
  testPerformance = evaluateTestSongs(autoencoder)
  with open(f'modelCompressionRate:{c:.1f}Performance.csv', 'w', newline='') as file:
    # Create a CSV writer
    writer = csv.writer(file)

    # Write the data row by row
    for row in testPerformance:
        writer.writerow(row)
  plotWave(autoencoder, f'modelCompressionRate:{c:.1f}wave_', c)




In [None]:
# #numTrainSongs =
# batch_size = 2**10
# epochs = 5
# histories = {}
# for fn in tqdm(fileNames[:numTrainSongs]):
#   Xt, Xv = loadSongCut(fn)
#   Xt = np.array(Xt)
#   Xv = np.array(Xv)
#   histories[fn] = autoencoder.fit(Xt, Xt,
#                 epochs=epochs,
#                 batch_size=batch_size,
#                 shuffle=True,
#                 validation_data=(Xv, Xv))

In [None]:
# plot_history(histories, 'test')