## Preprocessing

In [1]:
# Select the GPU before TensorFlow is imported: order devices by PCI bus ID and
# expose only the device with index 2 to this process.
# NOTE(review): the index "2" is specific to the machine this was run on.
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
from tensorflow.python.client import device_lib
# list the devices TensorFlow can see (shown as the cell output)
device_lib.list_local_devices()

[name: "/cpu:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 6826729921972381734, name: "/gpu:0"
 device_type: "GPU"
 memory_limit: 11323454260
 locality {
   bus_id: 1
 }
 incarnation: 2787711032108987066
 physical_device_desc: "device: 0, name: Tesla K40c, pci bus id: 0000:08:00.0"]

In [2]:
import numpy as np

In [3]:
# directory holding the audio files and the csv file that lists them
data_dir = "data"
# csv file name; data_dir is prepended later if it is not already present
dataf_name = "data.csv"

# directory the captured cell outputs are written to
log_dir = "logs"
# directory the trained model and the generated audio are written to
result_dir = "results"

### Identify Audio Files

Obtain audio file names and tempos from a csv.

The audio files and the csv file are expected to be in the directory `data/`.  
The csv file is expected to have no header, with the names of the audio files in the first column  
and the tempo (in bpm) of each corresponding audio file in the second column.

In [4]:
# prepend directory to dataf_name if not present
if dataf_name.split('/')[0] != data_dir:
    dataf_name = data_dir + '/' + dataf_name

# read the (file name, tempo) rows as strings; np.atleast_2d keeps a csv with a
# single row two-dimensional so the column indexing below does not fail
audiofs = np.atleast_2d(np.genfromtxt(dataf_name, delimiter=',', dtype='unicode'))

audiof_names = []
for audiof_name in audiofs[:, 0]:
    # prepend directory to track name if not present
    if audiof_name.split('/')[0] != data_dir:
        audiof_name = data_dir + '/' + audiof_name
    # append file extension (.wav) to track name if not present
    if not audiof_name.endswith(".wav"):
        audiof_name = audiof_name + ".wav"
    audiof_names.append(audiof_name)

# column 0: path of the audio file, column 1: its tempo (still a string)
audiofs = np.stack((audiof_names, audiofs[:,1]), axis=1)

### Import Audio

In [5]:
from scipy.io import wavfile
from scipy.interpolate import interp1d
from scipy.signal import stft, istft

In [6]:
%%capture --no-stderr import_audio
samp_rate = 44100 # treat processed data as if sampled at 44.1 kHz
mid_tempo = int(np.median(audiofs[:,1].astype(int))) # median tempo of dataset
samp_per_beat = int(60*samp_rate/mid_tempo) # (60sec/min)*(Hz/bpm) = cycle/beat

clips = []
for [audiof_name, tempo] in audiofs:
    rate, audiof = wavfile.read(audiof_name)
    
    # convert stereo to mono
    if audiof.shape[1] == 2:
        # type conversions to minimize risk of overflow
        audiof = (audiof.astype(int).sum(axis=1) // 2).astype(int)
        
    tempo = int(tempo) # cast from string
    # normalize tempo to the median of dataset
    if tempo != mid_tempo:
        audio_length = audiof.shape[0] / rate
        intervals_old = np.linspace(0, audio_length, audiof.shape[0])
        intervals_new = np.linspace(0, audio_length, int(audiof.shape[0]*tempo/mid_tempo))
        # construct interpolation and resample
        audiof = (interp1d(intervals_old, audiof)(intervals_new)).astype(int)
        
    # produce up to 5 different random 16-beat clips from audiof
    num_clips = 5
    len_clip = 16*samp_per_beat
    max_num_clips = int(audiof.shape[0]/len_clip)
    # array containing starting indices of clips
    pos_clips = (np.random.choice(max_num_clips,
                                  size=min(max_num_clips, num_clips),
                                  replace=False)
                 *len_clip).astype(int)
    for index_clip, pos_clip in enumerate(pos_clips):
        clip = audiof[pos_clip:pos_clip+len_clip]
        # write clip for potential debugging
        wavfile.write("clips/{}{}.wav".format(audiof_name.split('/')[-1].split('.')[0], index_clip), samp_rate, clip)
        # apply short-time Fourier transform to clip's PCM
        transformed = stft(clip)[2].T
        # separate real and imag components and concatenate
        transformed = np.concatenate((transformed.real, transformed.imag), axis=1)
        clips.append(transformed)
    print("Imported " + audiof_name)

# convert list to numpy array and shuffle order of clips
clips = np.array(clips)
np.random.shuffle(clips)

# normalize data
clips_means = np.mean(clips, axis=0)
clips_deviations = np.std(clips, axis=0)
clips = np.nan_to_num((clips - clips_means) / clips_deviations)

%store clips_means
%store clips_deviations
%store clips



Don't worry about the `true_divide` `RuntimeWarning`; it's handled with `np.nan_to_num()`.

In [7]:
# persist the output captured from the import cell;
# open() does not create missing directories, so make sure log_dir exists
os.makedirs(log_dir, exist_ok=True)
with open("{}/import_audio".format(log_dir), 'w') as log:
    log.write(import_audio.stdout)

## Build Network

In [None]:
# dataset dimensions: clips is indexed as (sample, timestep, feature)
num_samples, num_timesteps, num_features = clips.shape

### Keras Implementation for Proof of Concept

In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout, TimeDistributed

Using TensorFlow backend.


In [None]:
# Train the network to predict the next timestep of the same clip: the input is
# every frame but the last, the target is the same clip shifted one frame ahead.
# (Rolling along axis 0 would instead pair each clip with an unrelated clip,
# because the clips are shuffled along that axis.)
X = clips[:, :-1]
y = clips[:, 1:]

In [None]:
# two stacked LSTM layers, each followed by dropout, then a dense layer applied
# at every timestep that maps back to the feature dimension
model = Sequential([
    LSTM(2048, input_shape=(None, num_features), return_sequences=True, activation='tanh'),
    Dropout(0.5),
    LSTM(1024, return_sequences=True, activation='tanh'),
    Dropout(0.5),
    TimeDistributed(Dense(num_features)),
])
# NOTE(review): 'accuracy' is reported alongside a mean-squared-error loss;
# confirm it is a meaningful metric for this regression target
model.compile(loss='mean_squared_error', optimizer='rmsprop', metrics=['accuracy'])

In [None]:
%%capture --no-stderr fit
history = model.fit(X, y, batch_size=1, epochs=100)
model.save("{}/model.h5".format(result_dir))
%store history

In [None]:
# persist the output captured from the training cell;
# open() does not create missing directories, so make sure log_dir exists
os.makedirs(log_dir, exist_ok=True)
with open("{}/fit".format(log_dir), 'w') as log:
    log.write(fit.stdout)

## Generate Audio

In [None]:
def generate_audio(model, seed, length):
    """Generate a sequence of frames by feeding the model its own predictions.

    Parameters
    ----------
    model : object with a ``predict`` method
        Called with the frames generated so far, as an array of shape
        (1, t, len(seed)); the last timestep of its first output becomes the
        next frame.
    seed : 1-D sequence of numbers
        First frame of the generated sequence.
    length : int
        Total number of frames to generate, the seed included.

    Returns
    -------
    numpy.ndarray
        Array of shape (1, length, len(seed)) holding the generated frames.
    """
    output = np.zeros((1, length, len(seed)))
    prediction = seed
    for i in range(length):
        output[0, i] = prediction
        # a prediction made after the final frame would never be used, so skip
        # that call (it is the most expensive one: it runs on the full sequence)
        if i + 1 < length:
            prediction = model.predict(output[:, :i+1, :])[0][-1]
        print("{}/{}".format(i+1, length), end="\r")
    return output

In [None]:
%%capture --no-stderr generate
# select random timestep from random sample as seed
seed = clips[np.random.randint(num_samples), np.random.randint(num_timesteps)] # I should add a bit of noise, too
result_transformed = generate_audio(model, seed, num_timesteps)
# denormalize data
result_denormalized = (result_transformed * clips_deviations) + clips_means
%store result_transformed

In [None]:
# persist the output captured from the generation cell;
# open() does not create missing directories, so make sure log_dir exists
os.makedirs(log_dir, exist_ok=True)
with open("{}/generate".format(log_dir), 'w') as log:
    log.write(generate.stdout)

## Cleanup

In [None]:
(real, imag) = np.split(result_denormalized[0], 2, axis=1)
result = istft((real + 1j*imag).T)[1].round().astype(np.int16)
%store result
wavfile.write("{}/result.wav".format(result_dir), samp_rate, result)

## Retrieval

In [None]:
# restore the variables previously saved with %store
%store -r
from keras.models import load_model

# bare names: evaluating each one raises NameError if it was not restored
clips_means
clips_deviations
clips
# the model is reloaded from the file written by model.save(), not from %store
model = load_model("{}/model.h5".format(result_dir))
history
result_transformed
# last expression of the cell, so its value is shown as the cell output
result

In [None]:
# could the "music" generated be attributable to the denormalization process?
# to check, push uniform noise spanning the same value range as the generated
# output through the same denormalization and inverse transform
result_min = result_transformed.min()
result_max = result_transformed.max()
result_range = result_max - result_min
rand_transformed = (np.random.random(result_transformed.shape) * result_range) + result_min

rand_denormalized = (rand_transformed * clips_deviations) + clips_means

(rand_real, rand_imag) = np.split(rand_denormalized[0], 2, axis=1)
rand_float = istft((rand_real + 1j*rand_imag).T)[1].round()
# clamp to the 16-bit PCM range before casting: astype(np.int16) on its own
# wraps out-of-range samples around instead of saturating them
pcm16 = np.iinfo(np.int16)
rand_result = np.clip(rand_float, pcm16.min, pcm16.max).astype(np.int16)

# wavfile.write does not create missing directories
os.makedirs(result_dir, exist_ok=True)
wavfile.write("{}/rand_result.wav".format(result_dir), samp_rate, rand_result)