# MusicAutoencoder

This is the complete package:
- read a song
- train the model
- compress it
- decompress it
- see a visual representation of the sound wave of the prediction and the original (target)
- write the result of it to your hard drive

input song `input_song.flac`
output song `result_song.flac`

In [None]:
import tensorflow.keras as keras
import tensorflow.keras.losses as klosses
import tensorflow.keras.layers as layers
import tensorflow.keras.backend as k_backend
from tensorflow.keras.callbacks import TensorBoard
import soundfile as sf
import numpy as np
import numpy.random
import matplotlib.pyplot as plt
import random
import itertools
import time
import os

import tensorflow as tf
tf.get_logger().setLevel('ERROR')

numpy.random.seed(4)
random.seed(9)

In [2]:
# Print every compute device TensorFlow reports for this machine.
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 16647775245130559814
, name: "/device:XLA_CPU:0"
device_type: "XLA_CPU"
memory_limit: 17179869184
locality {
}
incarnation: 15667858463253559945
physical_device_desc: "device: XLA_CPU device"
]


In [3]:
# Load the input file; sf.read returns the sample array and its sample rate.
song_data, samplerate = sf.read("input_song.flac")

# A 2**20-sample excerpt, used further down for the hyperparameter search.
song_segment = song_data[2**20: 2**21]
# Display both shapes as the cell result.
(song_data.shape, song_segment.shape)

((11648640, 2), (1048576, 2))

In [4]:
def normalize(data):
  """Scale ``data`` linearly into the range [0, 1].

  Args:
    data: numpy array of numeric samples (any shape, must not be empty).

  Returns:
    A tuple ``(scaled, offset, scale)`` with
    ``scaled == (data - offset) / scale``. Passing the tuple to
    ``denormalize`` restores the original values.

    For constant input (max == min) ``scale`` is reported as 1 and
    ``scaled`` is all zeros, so the round trip stays exact instead of
    producing NaN through a division by zero.
  """
  offset = data.min()
  shifted = data - offset
  scale = shifted.max()
  if scale == 0:
    # All samples are equal; any non-zero scale reconstructs them exactly.
    scale = type(scale)(1)
  return (shifted / scale, offset, scale)

def denormalize(data_tuple):
  """Map normalized samples back to their original value range.

  Args:
    data_tuple: ``(scaled, offset, scale)`` in the layout returned by
      ``normalize``.

  Returns:
    ``scaled * scale + offset``.
  """
  scaled, offset, scale = data_tuple
  rescaled = scaled * scale
  return rescaled + offset

def compression_exponent(factor):
  """Return the smallest integer n with 2**n >= factor.

  Exact powers of two map to their own exponent (4 -> 2); everything else
  is rounded up to the next power of two (5 -> 3).

  Args:
    factor: positive number, the requested compression factor.

  Returns:
    The exponent as a numpy integer (``np.int_``).

  Raises:
    ValueError: if ``factor`` is not a positive number (this also
      rejects NaN), because log2 is undefined there.
  """
  if not factor > 0:
    raise ValueError("compression factor must be positive, got %r" % (factor,))
  return np.int_(np.ceil(np.log2(np.float64(factor))))

def generate_data_for_training(song_data, compression_factor, window_width, stride, batch_size=8):
  """Create an endless batch generator of sliding windows over ``song_data``.

  ``window_width`` is first rounded up to the next multiple of
  ``2 ** compression_exponent(compression_factor)``. Windows start every
  ``stride`` samples; a window that runs past the end of the data is
  zero-padded to the full width. After the last window the generator
  starts over from the first one, so one pass equals one epoch.

  Args:
    song_data: 2-D numpy array indexed as (sample, channel).
    compression_factor: compression factor of the model being trained.
    window_width: requested number of samples per window.
    stride: distance in samples between consecutive window starts.
    batch_size: number of windows per batch; the last batch of a pass
      may be smaller.

  Returns:
    ``(generator, steps_per_epoch)``. Each generated item is
    ``(batch, batch)`` (input and target are the same array) with shape
    ``(windows_in_batch, window_width, channels, 1)``.

  Raises:
    ValueError: if ``batch_size`` is smaller than 1, or if the data is
      too short to produce a single window. Without this check the
      generator would loop forever without yielding anything.
  """
  if batch_size < 1:
    raise ValueError("batch_size must be at least 1, got %r" % (batch_size,))

  comp_fac = 2 ** compression_exponent(compression_factor)
  # Round the window width up to the next multiple of comp_fac.
  window_width += (comp_fac - (window_width % comp_fac)) % comp_fac
  channel_count = song_data.shape[1]
  window_starts = range(0, song_data.shape[0] - window_width + stride, stride)
  if len(window_starts) == 0:
    raise ValueError(
        "song_data with %d samples is too short for window_width=%d and stride=%d"
        % (song_data.shape[0], window_width, stride))

  def padded_window(start):
    """Cut one window out of song_data, zero-padding it at the end if needed."""
    segment = song_data[start : start + window_width]
    missing = window_width - segment.shape[0]
    return np.concatenate([segment, np.zeros((missing, segment.shape[1]))])

  def generator():
    while True:
      # A fresh lazy pass over all windows; one pass is one epoch.
      windows = map(padded_window, window_starts)
      while True:
        batch = list(itertools.islice(windows, batch_size))
        if not batch:
          break
        targets = np.stack(batch).reshape((len(batch), window_width, channel_count, 1))
        yield (targets, targets)

  return (generator(), int(np.ceil(len(window_starts) / batch_size)))
  

def predict(data, model, compression_factor, overlap = 2**16, segment_size = 2**18):
  """Run ``data`` through ``model`` segment by segment and stitch the output.

  The data is cut into consecutive segments of ``segment_size`` samples.
  Each segment is extended by up to ``overlap`` samples of context on
  both sides (less at the edges of the data), zero-padded to a multiple
  of ``2 ** compression_exponent(compression_factor)``, and predicted on
  its own. Only the part of each prediction that belongs to the segment
  itself is kept, so the result has one output row per input row.

  The kept part is located from the context that was actually prepended.
  This keeps the result correct when the data fits into a single
  segment, when ``overlap`` is 0, and when a segment near the end has
  less than ``overlap`` samples of right-hand context.

  Args:
    data: 2-D numpy array indexed as (sample, channel).
    model: object whose ``predict`` accepts an array of shape
      ``(1, samples, channels, 1)`` and returns one of the same layout.
    compression_factor: compression factor the model was built for.
    overlap: number of context samples added on each side of a segment.
    segment_size: number of samples predicted per segment.

  Returns:
    2-D numpy array holding the stitched prediction; an empty array of
    ``data``'s shape if ``data`` has no samples.
  """
  comp_fac = 2 ** compression_exponent(compression_factor)

  def transform_data_for_model(song_data):
    """Zero-pad a segment to a multiple of comp_fac and add batch/channel axes."""
    padding_size = (comp_fac - (song_data.shape[0] % comp_fac)) % comp_fac
    nd = np.concatenate([song_data, np.zeros((padding_size, song_data.shape[1]))])
    return nd.reshape((1, nd.shape[0], nd.shape[1], 1))

  def transform_data_from_model(model_data, length):
    """Revert transform_data_for_model: drop the extra axes and the padding."""
    # Cutting at the segment's real length removes the zero padding. It
    # also guards against a model that returns more samples than it was
    # given (assumption: possible when the model's total stride does not
    # divide the padded length).
    trimmed = model_data[0][:length]
    return trimmed.reshape((trimmed.shape[0], trimmed.shape[1]))

  if data.shape[0] == 0:
    return np.zeros(data.shape)

  pieces = []
  for start in range(0, data.shape[0], segment_size):
    context_begin = max(0, start - overlap)
    context_end = min(start + segment_size + overlap, data.shape[0])
    segment = data[context_begin:context_end]
    raw_prediction = model.predict(transform_data_for_model(segment))
    prediction = transform_data_from_model(raw_prediction, segment.shape[0])
    # Number of context samples that precede the segment's own samples.
    lead = start - context_begin
    pieces.append(prediction[lead : lead + segment_size])
  return np.concatenate(pieces)

def evaluate(data, model, compression_factor, overlap = 2**16, segment_size = 2**18):
  """Return the mean squared error between ``data`` and its reconstruction.

  The reconstruction is obtained from ``predict`` with the same
  ``overlap`` and ``segment_size``; the squared differences are summed
  and divided by the number of elements in ``data``.
  """
  reconstruction = predict(data, model, compression_factor, overlap, segment_size)
  squared_error = np.square(reconstruction - data)
  return squared_error.sum() / data.size

In [5]:
def create_encoder_decoder(data_shape, compression_factor, activation_en='elu', activation_de='elu', optimizer='adam', kernel_width_en=256, kernel_width_de=64, channel_count_en=16, channel_count_de=16, regularizer=None):
  """Build the encoder and decoder halves of the convolutional autoencoder.

  With ``n = compression_exponent(compression_factor)`` the encoder is:
  one stride-1 convolution, ``n // 2`` stride-4 convolutions, one more
  strided convolution (stride 8 if ``n`` is odd, stride 4 otherwise) and
  a final linear convolution with 4 output channels. The decoder mirrors
  the strides with transposed convolutions and ends in a linear
  convolution with 1 output channel.

  Args:
    data_shape: input shape passed to the first layer; ``data_shape[1]``
      is also used as the second kernel dimension of every layer.
    compression_factor: determines the number of strided layers.
    activation_en: activation of the strided encoder layers.
    activation_de: activation of the transposed decoder layers.
    optimizer: accepted but not used inside this function.
    kernel_width_en: first kernel dimension of the encoder layers.
    kernel_width_de: first kernel dimension of the decoder layers.
    channel_count_en: filters of the hidden encoder layers.
    channel_count_de: filters of the hidden decoder layers.
    regularizer: kernel regularizer for the encoder layers, except the
      final 4-channel one.

  Returns:
    Tuple ``(encoder, decoder)`` of uncompiled ``keras.Sequential`` models.
  """
  layer_count = compression_exponent(compression_factor)
  stride4_layers = layer_count // 2
  # An odd exponent gets its remaining factor of two from one stride-8 layer.
  extra_stride = (8, 1) if layer_count % 2 == 1 else (4, 1)
  kernel_en = (kernel_width_en, data_shape[1])
  kernel_de = (kernel_width_de, data_shape[1])

  def downsample(stride):
    """One strided encoder convolution."""
    return layers.Conv2D(channel_count_en, kernel_en, strides = stride, activation=activation_en, padding='same', kernel_regularizer=regularizer)

  def upsample(stride):
    """One strided transposed decoder convolution."""
    return layers.Conv2DTranspose(channel_count_de, kernel_de, strides=stride, activation=activation_de, padding='same')

  encoder = keras.Sequential(name = "encoder")
  encoder.add(layers.Conv2D(channel_count_en, kernel_en, activation='linear', padding='same', kernel_regularizer=regularizer, input_shape=data_shape))
  for _ in range(stride4_layers):
    encoder.add(downsample((4, 1)))
  encoder.add(downsample(extra_stride))
  encoder.add(layers.Conv2D(4, kernel_en, activation = 'linear', padding='same'))

  decoder = keras.Sequential(name = "decoder")
  decoder.add(upsample(extra_stride))
  for _ in range(stride4_layers):
    decoder.add(upsample((4, 1)))
  decoder.add(layers.Conv2D(1, kernel_de, activation='linear', padding='same'))

  return (encoder, decoder)

def create_model(data_shape, activation_en='elu', activation_de='elu', optimizer='adam', kernel_width_en=256, kernel_width_de=64, channel_count_en=16, channel_count_de=16, compression_factor=8, loss='mse', regularizer = None):
  """Build and compile the autoencoder (encoder followed by decoder).

  Args:
    data_shape: input shape handed to ``create_encoder_decoder``.
    activation_en: activation of the strided encoder layers.
    activation_de: activation of the transposed decoder layers.
    optimizer: optimizer used to compile the model.
    kernel_width_en: kernel width of the encoder layers.
    kernel_width_de: kernel width of the decoder layers.
    channel_count_en: filters of the hidden encoder layers.
    channel_count_de: filters of the hidden decoder layers.
    compression_factor: compression factor the model is built for.
    loss: loss used to compile the model.
    regularizer: kernel regularizer forwarded to the encoder. It used to
      be accepted here but silently dropped.

  Returns:
    A compiled ``keras.Sequential`` named "autoencoder" that reports the
    mean squared error as an additional metric.
  """
  encoder, decoder = create_encoder_decoder(
      data_shape,
      compression_factor,
      activation_en=activation_en,
      activation_de=activation_de,
      optimizer=optimizer,
      kernel_width_en=kernel_width_en,
      kernel_width_de=kernel_width_de,
      channel_count_en=channel_count_en,
      channel_count_de=channel_count_de,
      regularizer=regularizer)
  model = keras.Sequential(name = "autoencoder")
  model.add(encoder)
  model.add(decoder)
  model.compile(
          loss=loss,
          optimizer=optimizer,
          metrics=[keras.metrics.MeanSquaredError()])
  return model

In [6]:
# Normalize the whole song and the excerpt independently; each keeps its own
# (MIN, DIV) pair so that it can be denormalized later.
(normalized_data, MIN, DIV) = normalize(song_data) 
(normalized_mock_data, MIN_mock, DIV_mock) = normalize(song_segment)

In [7]:
# Normalization must leave the shape of the song array unchanged.
normalized_data.shape

(11648640, 2)

In [8]:
# Compression factor used for every model built in this notebook.
compression_factor = 4

# Create all combinations of the hyperparameters one would want to test.

activation_ens = ['selu', 'tanh', 'softplus']
activation_des = activation_ens
channel_size_ens = [16, 32]
channel_size_des = channel_size_ens
kernel_size_ens = [32, 64]
kernel_size_des = kernel_size_ens
optimizers = ['adam']
losses = ['mae', 'msle']


# The tuple order here must match the unpacking order in test_model.
parameters = list(itertools.product(activation_ens, activation_des, channel_size_ens, channel_size_des, kernel_size_ens, kernel_size_des, optimizers, losses))
len(parameters)

72

In [11]:
def test_model(params, data, compression_factor):
  """Train a fresh autoencoder for one hyperparameter combination.

  Args:
    params: 8-tuple ``(activation_en, activation_de, channel_size_en,
      channel_size_de, kernel_size_en, kernel_size_de, optimizer, loss)``.
    data: 2-D training data indexed as (sample, channel).
    compression_factor: compression factor the model is built for.

  Returns:
    Tuple ``(model, history)``: the trained model and the object
    returned by ``model.fit``.
  """
  (act_en, act_de, channels_en, channels_de, kernel_en, kernel_de, optimizer, loss) = params

  input_shape = (None, data.shape[1], 1)
  model = create_model(input_shape,
                       activation_en = act_en,
                       activation_de = act_de,
                       optimizer = optimizer,
                       kernel_width_en = kernel_en,
                       kernel_width_de = kernel_de,
                       channel_count_en = channels_en,
                       channel_count_de = channels_de,
                       compression_factor = compression_factor,
                       loss = loss)

  # Windows of 2**16 samples, started every 2**14 samples, in batches of 4.
  training_setup = generate_data_for_training(data, compression_factor, 2 ** 16, 2 ** 14, 4)
  batches = training_setup[0]
  steps = training_setup[1]
  history = model.fit(batches, steps_per_epoch = steps, epochs = 16, verbose = 0)

  return (model, history)
# Iterate through all combinations of hyperparameters: train each one on the
# excerpt, score it with evaluate() and record (history, params, loss).
# NOTE: `model`, `history` and `loss` are module-level names that are rebound
# on every iteration; after the loop they refer to the last combination.
results = []
for i, params in enumerate(parameters):
  start = time.time()
  model, history = test_model(params, normalized_mock_data, compression_factor)
  loss = evaluate(normalized_mock_data, model, compression_factor)
  results.append((history, params, loss))
  print(i, "\tneeded", time.time() - start, "seconds\t", params, "\tloss:", loss)

Train for 16 steps
Epoch 1/16
Epoch 2/16
Epoch 3/16
Epoch 4/16
Epoch 5/16
Epoch 6/16
Epoch 7/16
Epoch 8/16
Epoch 9/16
Epoch 10/16
Epoch 11/16
Epoch 12/16
Epoch 13/16
Epoch 14/16
Epoch 15/16
Epoch 16/16
0 	needed 767.7499876022339 seconds	 ('selu', 'selu', 16, 16, 32, 32, 'adam', 'mae') 	loss: 0.0038651970906773843


In [12]:
# Order the recorded runs from lowest to highest evaluation loss, then list them.
results.sort(key=lambda entry: entry[2])
for res in results:
  print(res)

(<tensorflow.python.keras.callbacks.History object at 0x7f7720412610>, ('selu', 'selu', 16, 16, 32, 32, 'adam', 'mae'), 0.0038651970906773843)


In [13]:
# Create a model with the best hyperparameter combination found.
# results is sorted by ascending loss, so results[0] is the best run and
# element [1] of it is the parameter tuple.
(activation_en,
 activation_de,
 channel_size_en,
 channel_size_de,
 kernel_size_en,
 kernel_size_de,
 optimizer,
 loss) = results[0][1]

# This is a new, untrained model; the one trained during the search is not reused.
model = create_model((None, normalized_mock_data.shape[1], 1),
                            activation_en = activation_en,
                            activation_de = activation_de,
                            optimizer = optimizer,
                            kernel_width_en = kernel_size_en,
                            kernel_width_de = kernel_size_de,
                            channel_count_en = channel_size_en,
                            channel_count_de = channel_size_de,
                            compression_factor = compression_factor,
                            loss = loss)
model.summary()

Model: "autoencoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
encoder (Sequential)         (None, None, 2, 4)        37940     
_________________________________________________________________
decoder (Sequential)         (None, None, 2, 1)        21537     
Total params: 59,477
Trainable params: 59,477
Non-trainable params: 0
_________________________________________________________________


In [None]:
# Train the model on the whole song:
# first on smaller windows, then on larger ones for speed.
data_generator_small, steps_per_epoch_small = generate_data_for_training(normalized_data, compression_factor, 2 ** 18, 2 ** 16, 4)
data_generator_big, steps_per_epoch_big = generate_data_for_training(normalized_data, compression_factor, 2 ** 20, 2 ** 18, 4)
# Model.fit accepts Python generators directly (test_model already relies on
# this); fit_generator is deprecated.
history_small = model.fit(data_generator_small, steps_per_epoch = steps_per_epoch_small, epochs = 16)
history_big = model.fit(data_generator_big, steps_per_epoch = steps_per_epoch_big, epochs = 48)

Train for 44 steps
Epoch 1/16
Epoch 2/16
Epoch 3/16
Epoch 4/16
Epoch 5/16
Epoch 6/16
Epoch 7/16
Epoch 8/16
Epoch 9/16
Epoch 10/16
Epoch 11/16
Epoch 12/16
Epoch 13/16
Epoch 14/16
Epoch 15/16
Epoch 16/16
Train for 11 steps
Epoch 1/48


In [None]:
# Plot the loss of the final training run (small-window phase followed by the
# big-window phase). `history` is NOT used here: it is the leftover of the
# last hyperparameter-search run, not of the model trained above.
plt.plot(history_small.history['loss'] + history_big.history['loss'])

In [None]:
# Predict the normalized excerpt and transform the prediction back into the
# original feature space, using the excerpt's own normalization parameters.
result = denormalize((predict(normalized_mock_data, model, compression_factor), MIN_mock, DIV_mock))

In [None]:
# `result` is denormalized, so it must be compared with the original excerpt
# (song_segment), not with its normalized version.
abs_diff = np.sum(np.abs(result - song_segment))
# Mean absolute difference per sample value.
rel_diff = abs_diff / result.size
(abs_diff, rel_diff)

In [None]:
# Plot an extract (2**12 samples of the first channel) of the prediction and
# of the original sound file on top of each other.
plt.figure(figsize=(160, 9))
offset = 100_000
plt.plot(result[offset:offset + 2**12, :1], label='Prediction')
plt.plot(song_segment[offset:offset + 2**12, :1], label='Target')
# Without a legend the labels above are never shown.
plt.legend()

In [1]:
# Write the reconstructed excerpt to disk at the sample rate of the input file.
# Needs `sf`, `result` and `samplerate` from the earlier cells; the recorded
# NameError presumably comes from running this cell in a fresh kernel.
sf.write("result_song.flac", result, samplerate)

NameError: name 'sf' is not defined