In [8]:
!pip install wave
!pip install pydub
!pip install scipy
import tensorflow as tf
from tensorflow import keras
from keras.utils import plot_model
import time
import numpy as np
import wave
import pydub
from pydub import AudioSegment
from pydub.utils import make_chunks
from pydub.playback import play
import os
from os import path
from scipy.io.wavfile import read
from scipy.io.wavfile import write

Collecting wave
  Downloading https://files.pythonhosted.org/packages/df/33/5a06e0c47a147b2683876ba7c576fad13e92b0b16755eb431e56c341e0cf/Wave-0.0.2.tar.gz
Building wheels for collected packages: wave
  Building wheel for wave (setup.py) ... [?25l[?25hdone
  Stored in directory: /root/.cache/pip/wheels/8c/2e/ad/d96151afb1fdccf126346b26eabb91fec3c5ce5cbee7287fbf
Successfully built wave
Installing collected packages: wave
Successfully installed wave-0.0.2
Collecting pydub
  Downloading https://files.pythonhosted.org/packages/79/db/eaf620b73a1eec3c8c6f8f5b0b236a50f9da88ad57802154b7ba7664d0b8/pydub-0.23.1-py2.py3-none-any.whl
Installing collected packages: pydub
Successfully installed pydub-0.23.1


Using TensorFlow backend.


In [0]:
# load in data
def loadSongs():
  
  pathContent = os.listdir()
  mp3List = []
  wavList = []
  
  for s in pathContent:
    if(s.find('.mp3') >= 0):
      mp3List.append(s)
    if(s.find('.wav') >= 0):
      wavList.append(s)
  
  fullSongs = []
  
  for path in mp3List:
    dest = path.replace('.mp3', '.wav')
    sound = AudioSegment.from_mp3(path)
    
    # conver file to mono
    sound = sound.set_channels(1)
    sound.export(dest, format = 'wav')
    
    fullSongs.append(dest)
    
  for path in wavList:
    fullSongs.append(path)
  
  return fullSongs

In [0]:
# stride = 1 for stateless, stride = lookback for stateful
def createDataset(songList, lookback, predict, stride, validationSplit):
  
  s = 'seg'
  e = '.wav'
  
  x_train = []
  y_train = []
  
  x_test = []
  y_test = []
  
  totalSegs = 0
  validationSplit = validationSplit * 100
  
  for path in songList:
    
    wav = read(path)
    arr = wav[1]
    
    #wav = AudioSegment.from_wav(path)
    #arr = wav.get_array_of_samples()
    #arr = np.array(arr)
    
    # TODO lookback + 1 or not?
    for i in range(lookback, len(arr) - predict, stride):
      hist = arr[i - lookback : i]
      future = arr[i : i + predict]
      if(totalSegs % validationSplit == 0):
        x_test.append(hist)
        y_test.append(future)
      else:
        x_train.append(hist)
        y_train.append(future)
      
      totalSegs = totalSegs + 1
      
      
  print('Dataset created')
  
  return x_train, y_train, x_test, y_test

In [0]:
def createEncoderDataset(songList, size, stride):
  
  splices = []
  
  for path in songList:
    
    wav = read(path)
    arr = wav[1]
    
    for i in range(size, len(arr), stride):
      splice = arr[i - size : i]
      splices.append(splice)
      
  return splices

In [0]:
def lstmAutoencoder(inputSize, outputSize):
  
  model = keras.models.Sequential()
  
  # encoder
  
  model.add(keras.layers.Conv1D(32, 3, activation = 'relu', padding = 'same', input_shape = (inputSize, 1)))
  model.add(keras.layers.Conv1D(32, 3, activation = 'relu', padding = 'same'))
  model.add(keras.layers.MaxPooling1D(pool_size = 2))
  model.add(keras.layers.Conv1D(64, 3, activation = 'relu', padding = 'same'))
  model.add(keras.layers.Conv1D(64, 3, activation = 'relu', padding = 'same'))
  model.add(keras.layers.MaxPooling1D(pool_size = 2))
  model.add(keras.layers.Conv1D(128, 3, activation = 'relu', padding = 'same'))
  model.add(keras.layers.Conv1D(128, 3, activation = 'relu', padding = 'same'))
  model.compile(optimizer = 'adam', loss = 'mse', metrics = ['accuracy'])
  plot_model(model, show_shapes = True, to_file='autoencoder.png')
  
  return model

In [0]:
def lstmEncoder(inputSize, outputSize):
  model = keras.models.Sequential()
  model.add(keras.layers.CuDNNLSTM(64, return_sequences = True, input_shape = (inputSize, 1)))
  model.add(keras.layers.TimeDistributed(keras.layers.Dense(outputSize)))
  return model

In [0]:
def lstmDecoder(inputSize, outputSize):
  model = keras.models.Sequential()
  model.add(keras.layers.Dense(outputSize))

In [0]:
def lstmGen(lookback, predict):
  model = keras.models.Sequential()
  model.add(keras.layers.CuDNNLSTM(128, return_sequences = True, input_shape = (lookback, predict)))
  model.add(keras.layers.CuDNNLSTM(128, return_sequences = False))
  model.add(keras.layers.Dense(predict, activation = 'tanh'))
  model.compile(loss = 'mse', optimizer = 'adam', metrics = ['accuracy'])
  plot_model(model, show_shapes = True, to_file='generator.png')
  return model

In [0]:
def lstmDisc(lookback, predict):
  model = keras.models.Sequential()
  model.add(keras.layers.CuDNNLSTM(128, return_sequences = True, input_shape = (lookback, predict)))
  model.add(keras.layers.CuDNNLSTM(128, return_sequences = False))
  model.add(keras.layers.Dense(1))
  model.compile(loss = 'binary_crossentropy', optimizer = 'adam', metrics = ['accuracy'])
  plot_model(model, show_shapes = True, to_file='discriminator.png')
  return model

In [0]:
def lstm(lookback, predict):
  model = keras.models.Sequential()
  model.add(keras.layers.Conv1D(128, 3, input_shape = [lookback, predict]))
  model.add(keras.layers.MaxPooling1D())
  model.add(keras.layers.CuDNNLSTM(128, return_sequences = True))
  model.add(keras.layers.Conv1D(64, 5))
  model.add(keras.layers.MaxPooling1D())
  model.add(keras.layers.CuDNNLSTM(128, return_sequences = True))
  model.add(keras.layers.Conv1D(32, 5))
  model.add(keras.layers.MaxPooling1D())
  model.add(keras.layers.CuDNNLSTM(128, return_sequences = False))
  model.add(keras.layers.Dense(predict, 'tanh'))
  model.summary()
  model.compile(loss = 'mean_squared_error', optimizer = 'adam', metrics = ['accuracy'])
  plot_model(model, show_shapes = True, to_file='lstm.png')
  return model

In [0]:
def trainGan(epochs):
  for epoch in range(epochs):
    
    # first, generate images
    idx = np.random.randint(0, rpn_x_train.shape[0], batch_size)
    true = np.array(rpn_x_train[idx])
    print(true.shape)
    true = np.expand_dims(true, axis = 3)
    noise = np.random.normal(0, 1, (batch_size, lookback, predict))
    fake = genSequences(gen, batch_size, lookback, noise, printDiag = False)
    
    fake = np.array(fake)
    print(fake.shape)
    
    validLabel = np.ones((batch_size, 1))
    fakeLabel = np.zeros((batch_size, 1))
    
    #print(validLabel.shape)
    #print(fakeLabel.shape)
    #print(true.shape)
    #print(fake.shape)
    
    # train discriminator
    
    d_loss_real = disc.train_on_batch(true, validLabel)
    d_loss_fake = disc.train_on_batch(fake, fakeLabel)
    d_loss = .5 * (np.add(d_loss_real, d_loss_fake))
    
    # train generator
    noise_tr = np.random.normal(0, 1, (batch_size, lookback))
    noise_tr = np.expand_dims(noise_tr, axis = 3)
    
    disc.trainable = False
    g_loss = gen.train_on_batch(noise_tr, validLabel)
    
    disc.trainable = True

In [0]:
def trainAutoencoder(model, x_train, epochs, validationSplit):
  model.fit(x_train, x_train, epochs = epochs, validation_split = validationSplit)

In [0]:
def genSequences(gen, numOfSequences, size, noise, printDiag = False):
  
  sequences = []
  
  printIndex = size / 10
  
  for i in range(numOfSequences):
    
    currSequence = []
    currNoise = np.array(noise)
    
    if(printDiag):
      print("Generating Sequence ", i)
      start = time.time()
    
    for j in range(size):
    
      prediction = gen.predict(np.array(currNoise))
      
      currSequence.append(prediction[0])
      currNoise = np.roll(currNoise, -1)
      currNoise[currNoise.shape[0] - 1] = prediction[0]
      
      if(j % printIndex == 0 and printDiag):
        diff = time.time() - start
        start = time.time()
        print(j, " of ", size, " in ", diff)
    
    sequences.append(currSequence)
    
  return sequences

In [0]:
def arrToWav(dir, samples):
  
  samples = np.array(samples)
  
  write(dir, 44100, samples)

In [0]:
def saveModel(model, name):
  
  model_json = model.to_json()
  jsonName = name + '.json'
  h5Name = name + '.h5'
  with open(jsonName, "w") as json_file:
    json_file.write(model_json)
    
  # serialize weights to HDF5
  model.save_weights(h5Name)
  print("Saved model to disk")

In [0]:
#segNumber = loadSongs()
#print(segNumber)

#lookback = 44100
lookback = 10
predict = 1
validationSplit = .2
batch_size = 3
normalizer = 32768
spliceSize = lookback * 2

songList = loadSongs()
x_train, y_train, x_test, y_test = createDataset(songList, lookback, predict, lookback, validationSplit)
encoderSplices = createEncoderDataset(songList, spliceSize, spliceSize // 2)

rpn_x_train = np.array(x_train)
rpn_y_train = np.array(y_train)

rpn_x_test = np.array(x_test)
rpn_y_test = np.array(y_test)

rpn_splices = np.array(encoderSplices)

#normalize values; wav is 16 bit signed, so values are between + and - 2^15, so divide by 2^15
rpn_x_train = rpn_x_train / normalizer
rpn_y_train = rpn_y_train / normalizer

rpn_x_test = rpn_x_test / normalizer
rpn_y_test = rpn_y_test / normalizer

rpn_splices = rpn_splices / normalizer

print(rpn_x_train.shape)
print(rpn_y_train.shape)

print(rpn_x_test.shape)
print(rpn_y_test.shape)

print(rpn_splices.shape)

Dataset created
(8378, 10)
(8378, 1)
(441, 10)
(441, 1)
(8818, 20)


In [0]:
gen = lstmGen(lookback, predict)
disc = lstmDisc(lookback, predict)
trainGan(10)

In [0]:
saveModel(gen, 'gen')
saveModel(disc, 'disc')

In [0]:
noise = np.random.rand(lookback, 1)
testSeq = genSequences(gen, 1, 44100, np.expand_dims(noise, axis = 3), printDiag = True)

In [0]:
print(rpn_x_train.shape)
print(rpn_y_train.shape)

x_sub = np.expand_dims(rpn_x_train, axis = 3)

model = lstmGen(lookback, predict)
model.fit(x_sub, rpn_y_train, epochs = 100, validation_split = 0.2)

In [0]:
numOfSequences = 1;
size = 4410

model.summary()
noise = rpn_x_train[2]
noise = np.expand_dims(noise, axis = 1)
noise = np.expand_dims(noise, axis = 0)
print(noise.shape)
gens = genSequences(model, numOfSequences, size, noise, printDiag = True)
#gens = gens * 32768
#longSeq = genSequences(model, numOfSequences, size * 6, noise)

In [0]:
saveModel(model, 'lstm')

Saved model to disk


In [0]:
squeezed = np.squeeze(gens)
arrToWav('test2.wav', squeezed)

In [9]:
ae = lstmAutoencoder(1000, 500)
#trainAutoencoder(ae, np.expand_dims(rpn_x_train, axis = 3), 10, .2)

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Use tf.cast instead.
