In [None]:
#cell 0

!pip3 install numpy
!pip3 install matplotlib
!pip3 install tensorflow
!pip3 install mido
!pip3 install tqdm

#if data/piano_scale.wav does not exist, download it from https://ances.ai/nnss/piano_scale.wav with python requests
import requests
import os
if not os.path.exists('data/piano_scale.wav'):
    r = requests.get('http://ances.ai/nnss/piano_scale.wav')
    with open('data/piano_scale.wav', 'wb') as f:
        f.write(r.content)
print('done')     


In [None]:
#cell 1

import scipy.io.wavfile
import numpy as np
import pickle
import matplotlib.pyplot as plt
import matplotlib
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import *
from tensorflow.keras import backend as K
from sklearn.model_selection import train_test_split
import IPython.display as ipd
import soundfile as sf
from tensorflow.keras.optimizers import RMSprop, Adam, SGD, schedules
from tensorflow.keras.callbacks import LambdaCallback, ModelCheckpoint
import json
import os
import mido
from tqdm import tqdm

# Ask TensorFlow's native logging to be as quiet as possible.
# NOTE(review): tensorflow is already imported above, so this may be set too
# late to silence its start-up messages -- TODO confirm; if so, move this
# assignment before `import tensorflow`.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# Select the `notebook` matplotlib backend for the figures below.
%matplotlib notebook

# Default size for every figure created in this notebook.
matplotlib.rcParams['figure.figsize'] = [9, 7]

# The returned device list is discarded: this is not the last expression of
# the cell, so nothing is displayed.
tf.config.list_physical_devices("GPU")

# Turn on memory growth for every GPU TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Eager execution is left enabled; uncomment to switch to graph mode.
# tf.compat.v1.disable_eager_execution()

def to_np(tensor):
    """Return the value of a TensorFlow tensor as a NumPy array.

    Objects exposing a callable ``numpy()`` (eager tensors) are converted
    directly; ``Tensor.eval`` is not available for them while eager execution
    is enabled, which is the default in this notebook. Anything else is
    evaluated in a temporary TF1-style session, which is closed afterwards.

    Args:
        tensor: tensor-like object to convert.

    Returns:
        Whatever ``tensor.numpy()`` or ``tensor.eval(...)`` returns.
    """
    as_numpy = getattr(tensor, "numpy", None)
    if callable(as_numpy):
        return as_numpy()
    # Graph-mode fallback; the context manager releases the session.
    with tf.compat.v1.Session() as session:
        return tensor.eval(session=session)

# Enable TensorFlow's v1-compat experimental `output_all_intermediates` switch.
# NOTE(review): presumably required by the LSTM layers used further down --
# TODO confirm it is still needed.
tf.compat.v1.experimental.output_all_intermediates(True)

Definitions of the functions and global variables we need.

In [None]:
#cell 2

def get_notes(midifile):
    """Extract the notes of a MIDI file with their timing in seconds.

    The tempo is taken from the ``set_tempo`` messages of track 0 (the last
    one wins) and the note events from track 1, so the file must contain at
    least two tracks.

    Args:
        midifile: path (or anything ``mido.MidiFile`` accepts) of the MIDI file.

    Returns:
        A list of dicts, in note-on order, with the keys ``'note'``,
        ``'velocity'`` and ``'start'`` (seconds), plus ``'end'`` (seconds)
        once the matching note-off has been seen. A note that is never
        released has no ``'end'`` key.
    """
    mid = mido.MidiFile(midifile, clip=True)
    ticks_per_beat = mid.ticks_per_beat

    # Standard MIDI default (120 BPM) for files without a set_tempo message;
    # without it `tempo` would be unbound below.
    tempo = 500000
    for msg in mid.tracks[0]:
        if msg.type == "set_tempo":
            tempo = msg.tempo

    time_cum = 0
    notes = []

    for msg in mid.tracks[1]:
        # Message times are deltas in ticks; accumulate to absolute ticks.
        time_cum += msg.time
        if msg.type not in ("note_on", "note_off"):
            continue
        t = mido.tick2second(time_cum, ticks_per_beat, tempo)
        if msg.type == "note_on" and msg.velocity > 0:
            notes.append({'note': msg.note, 'velocity': msg.velocity, 'start': t})
        else:
            # note_off, or note_on with velocity 0 (the MIDI convention for a
            # release): close every still-open note of this pitch.
            for open_note in notes:
                if open_note['note'] == msg.note and 'end' not in open_note:
                    open_note['end'] = t
    return notes

def merge(chunks,offset):
    """Overlap-add a sequence of audio chunks into one signal.

    Each chunk after the first overlaps the end of the signal built so far by
    ``2*offset`` samples: the overlapping samples are summed and the rest of
    the chunk is appended.

    Args:
        chunks: sequence (list or 2-D array) of chunks along the first axis.
        offset: half of the overlap length, in samples.

    Returns:
        A new array with the merged signal. The input is not modified. An
        empty input gives an empty array, and ``offset == 0`` is a plain
        concatenation.
    """
    overlap = 2*offset
    pieces = [np.asarray(chunk) for chunk in chunks]
    if not pieces:
        return np.zeros((0,))

    first = pieces[0]
    total = first.shape[0] + sum(piece.shape[0]-overlap for piece in pieces[1:])
    # Allocate the output once instead of re-concatenating on every step.
    audio = np.zeros((total,)+first.shape[1:], dtype=np.result_type(*pieces))
    audio[:first.shape[0]] = first

    pos = first.shape[0]
    for piece in pieces[1:]:
        fresh = piece.shape[0]-overlap
        if overlap:
            audio[pos-overlap:pos] += piece[:overlap]
        audio[pos:pos+fresh] = piece[overlap:]
        pos += fresh
    return audio

def ances_normalization(s,power=1):
    """Raise the magnitude of complex values to `power`, keeping their phase.

    Args:
        s: complex scalar or array.
        power: exponent applied to the magnitude (1 leaves it unchanged).

    Returns:
        ``|s|**power * exp(1j*angle(s))``, with the same shape as `s`.
    """
    phase = np.angle(s)
    magnitude = np.power(np.abs(s), power)
    unit_phasor = np.exp(phase*1j)
    return magnitude*unit_phasor


def generate_files(audio, file_prefix="X", shuffle=True):
    """Cut `audio` into chunks, take their STFT and pickle them in batches.

    The audio is first upsampled by FACTOR with linear interpolation. Chunks
    of ``FRAME_LENGTH + FRAME_STEP*(STEPS-1)`` samples are taken every
    ``STEPS*FRAME_STEP`` samples; each gives a spectrogram of STEPS frames,
    of which the first BINS_LIMIT bins are kept. Every BATCH_LIMIT
    spectrograms are magnitude-compressed with exponent ``1/POWER_RATE`` and
    written to ``DATA_FOLDER + file_prefix + NNNN + ".data"``. A final,
    incomplete batch is not written.

    Args:
        audio: 1-D array of samples.
        file_prefix: prefix of the output file names.
        shuffle: visit the chunks in a fixed pseudo-random order (seed 0)
            instead of sequentially. Two signals of the same length therefore
            get the same order.
    """
    len_chunk=FRAME_LENGTH+(FRAME_STEP*(STEPS-1))
    #interpolate audio
    audio = np.interp(np.arange(0, len(audio), 1/FACTOR), np.arange(0, len(audio)), audio)

    hop=STEPS*FRAME_STEP
    # Audio shorter than one chunk yields no chunks instead of a negative count.
    n_chunks=max((audio.shape[0]-len_chunk)//hop, 0)
    if shuffle:
        # A private legacy generator seeded with 0 gives the same permutation
        # as seeding the global one, without resetting global random state.
        order = np.random.RandomState(0).permutation(n_chunks)
    else:
        order = np.arange(0, n_chunks)
    starts = order*hop

    batch = []
    part=0
    end=MAX_FILES*BATCH_LIMIT
    for a in tqdm(starts[:end]):
        chunk=audio[a:a+len_chunk]
        if chunk.shape[0]<len_chunk:
            break
        spectrogram=tf.signal.stft(chunk,frame_length=FRAME_LENGTH,frame_step=FRAME_STEP, window_fn=tf.signal.hann_window).numpy().astype(np.complex64)
        batch.append(spectrogram[:,:BINS_LIMIT])
        if len(batch)>=BATCH_LIMIT:
            data=ances_normalization(np.array(batch),1/POWER_RATE)
            path=DATA_FOLDER+file_prefix+'{:04d}'.format(part)+".data"
            # Close the file deterministically; these batches are large.
            with open(path, "wb") as out_file:
                pickle.dump(data, out_file, protocol=4)
            batch=[]
            part+=1

def encode_notes(notes):
    """Render a list of notes as a synthetic 1-D control signal.

    For every note, two sinusoidal terms are added over the note's sample
    range: a sweep that depends only on the time since the note started, and
    a tone whose rate depends on the MIDI note number. The summed signal is
    finally clipped from above at 1.

    Args:
        notes: non-empty list of dicts with ``'note'``, ``'start'`` and
            ``'end'`` (times in seconds), as produced by ``get_notes``.

    Returns:
        1-D float array of ``(int(latest end) + 1) * SAMPLERATE`` samples.

    Raises:
        ValueError: if `notes` is empty.
        KeyError: if a note has no ``'end'``.
    """
    if len(notes)==0:
        raise ValueError("encode_notes() needs at least one note")

    len_encode = int(max(note['end'] for note in notes))
    encoding=np.zeros((len_encode*SAMPLERATE+SAMPLERATE))
    for note in tqdm(notes):
        start=int(note['start']*SAMPLERATE)
        end=int(note['end']*SAMPLERATE)
        # Sample index relative to the start of the note.
        time=np.arange(0.,end-start,1.)

        s=1.1*np.sin(np.pi*time*(1-time/(SAMPLERATE*5)))*1.1
        s+=1.1*np.sin(np.pi*time*(note['note'])/(127*1.5))*1.1
        encoding[start:end]+=s

    encoding[encoding>1]=1 #to add some harmonics
    return encoding


# Upsampling factor applied to the audio in generate_files(); also scales FRAME_LENGTH.
FACTOR=8
# Step between consecutive STFT frames, in (upsampled) samples.
FRAME_STEP=1
# STFT frame length, in (upsampled) samples.
FRAME_LENGTH=2048*FACTOR
# STFT frames per chunk: a chunk spans FRAME_LENGTH+FRAME_STEP*(STEPS-1) samples.
STEPS=400
# Number of spectrograms stored in each pickled .data file.
BATCH_LIMIT=640
# Only the first BINS_LIMIT STFT bins of every frame are kept.
BINS_LIMIT=1024
# At most MAX_FILES*BATCH_LIMIT chunks are processed per generate_files() call.
MAX_FILES=400
# Spectrogram magnitudes are stored raised to the power 1/POWER_RATE.
POWER_RATE=8
# Samples per second used by encode_notes() to turn note times into sample indices.
SAMPLERATE=44100
# Folder of the generated .data files; joined by string concatenation, so keep the trailing slash.
DATA_FOLDER="2TB/synt/"
# Batch size handed to the training and validation generators.
BATCH_SIZE=64
    

To generate the spectrograms, run the code below. This can require a lot of disk space (about 2 TB in my case).

In [None]:
#cell 3

# Notes of the test piece and of the training scale.
bach_notes=get_notes('data/bach_test_short.mid')
escala_notes=get_notes('data/scale.mid')

# Piano recording of the scale: the training target.
with open('data/piano_scale.wav', 'rb') as f:
    piano, sr = sf.read(f)

print('Encoding scale') 
encoding=encode_notes(escala_notes)

# The X_ and Y_ files are paired only because generate_files() shuffles both
# signals with the same seed, and that gives the same order only when both
# have the same number of samples. Trim both to their common length.
n_common=min(encoding.shape[0], piano.shape[0])
encoding=encoding[:n_common]
piano=piano[:n_common]

print('Encoding Bach test')
encoding_test=encode_notes(bach_notes)

#cleaning folder
# Create the output folder on a first run so that listing it cannot fail.
os.makedirs(DATA_FOLDER, exist_ok=True)
for filename in os.listdir(DATA_FOLDER):
    if "X_" in filename or "Y_" in filename:
        os.remove(DATA_FOLDER+filename)

print("Generating X_test files...")
generate_files(encoding_test, file_prefix="X_test_", shuffle=False)
print("Generating X_ files...")
generate_files(encoding, file_prefix="X_", shuffle=True)
print("Generating Y_ files...")
generate_files(piano, file_prefix="Y_", shuffle=True)


In [None]:
# Sanity check: load one generated batch of test spectrograms.
test_batch_path = DATA_FOLDER+"X_test_0001.data"
with open(test_batch_path, "rb") as file:
    X_test = pickle.load(file)

# Magnitude of item 15 of the batch, transposed so that the frequency bins
# run along the vertical axis.
image = np.transpose(np.squeeze(np.abs(X_test[15])))
plt.imshow(image, origin='lower')
plt.show()

Once the data is generated, I have to restart the kernel to free memory, re-run the import and definition cells (cells 1 and 2), and then jump straight to cell 4.

In [None]:
#cell 4

def _list_batch_ids():
    """Return the sorted ids ('0000', '0001', ...) of the Y_ files in DATA_FOLDER."""
    return sorted(f.replace('.data', '').replace('Y_', '')
                  for f in os.listdir(DATA_FOLDER)
                  if f.endswith('.data') and f.startswith('Y_'))


def _load_pair(batch_id):
    """Load the X_ (input) and Y_ (target) arrays stored under `batch_id`."""
    # pickle executes code on load: only open files generated by this notebook.
    with open(DATA_FOLDER+"X_"+batch_id+".data", "rb") as file:
        X = pickle.load(file)
    with open(DATA_FOLDER+"Y_"+batch_id+".data", "rb") as file:
        Y = pickle.load(file)
    return X, Y


def _iter_batches(X, Y, batch_size):
    """Yield aligned (X, Y) slices of at most `batch_size` items."""
    for offset in range(0, X.shape[0], batch_size):
        yield X[offset:offset+batch_size], Y[offset:offset+batch_size]


def train_generator(batch_size=4):
    """Endlessly yield (X, Y) training batches.

    Every generated file except the last one (in sorted order) is used; the
    last one is reserved for `test_generator`.

    Raises:
        FileNotFoundError: on first use, if fewer than two Y_ files exist.
            Without this check the loop would spin forever without yielding.
    """
    files = _list_batch_ids()
    if len(files) < 2:
        raise FileNotFoundError("need at least two Y_*.data files in "+DATA_FOLDER)
    while True:
        for p in files[:-1]:
            X, Y = _load_pair(p)
            yield from _iter_batches(X, Y, batch_size)


def test_generator(batch_size=4):
    """Endlessly yield (X, Y) validation batches from the last generated file.

    Raises:
        FileNotFoundError: on first use, if no Y_ file exists.
    """
    files = _list_batch_ids()
    if not files:
        raise FileNotFoundError("no Y_*.data files found in "+DATA_FOLDER)
    # The validation file never changes, so it is read only once.
    X, Y = _load_pair(files[-1])
    while True:
        yield from _iter_batches(X, Y, batch_size)

#I know, it's slow as hell and inefficient. I'll fix it later. Calculate STFT and ISTFT consumes a lot of GPU memory
def testor(batch_size=1,frame_step=1):
    """Run the model over every X_test_ file and rebuild the audio signal.

    Each batch of test spectrograms is passed through the global `model`,
    the magnitude compression is undone (power POWER_RATE), the result is
    converted back to audio with the inverse STFT, and all chunks are joined
    with `merge`.

    Args:
        batch_size: number of spectrograms per prediction call.
        frame_step: currently unused; the global FRAME_STEP is what is used.

    Returns:
        1-D array with the reconstructed audio.

    Raises:
        FileNotFoundError: if DATA_FOLDER holds no X_test_ files.
    """
    files = sorted(f for f in os.listdir(DATA_FOLDER) if f.endswith('.data') and f.startswith('X_test_'))
    if not files:
        raise FileNotFoundError("no X_test_*.data files found in "+DATA_FOLDER)

    # The synthesis window depends only on constants: build it once.
    window_fn = tf.signal.inverse_stft_window_fn(FRAME_STEP, forward_window_fn=tf.signal.hann_window)

    rec=[]
    for filename in files:
        with open(DATA_FOLDER+filename, "rb") as handle:
            X_test = pickle.load(handle)

        for offset in range(0, X_test.shape[0], batch_size):
            pred = model.predict_on_batch(X_test[offset:offset+batch_size])
            # Inverse of the 1/POWER_RATE compression applied by generate_files().
            pred = ances_normalization(pred,POWER_RATE)
            pred = tf.signal.inverse_stft(pred, frame_length=FRAME_LENGTH,
                                                  frame_step=FRAME_STEP,
                                                  window_fn=window_fn).numpy()
            rec.extend(pred)

    rec = np.array(rec)
    return merge(rec.copy(),int(FRAME_LENGTH//2))

#here the model

# Complex spectrogram in, complex spectrogram out. The number of bins is tied
# to BINS_LIMIT so the network always matches the generated data.
in1 = Input(shape=(None,BINS_LIMIT), dtype=tf.complex64)
# The recurrent layers work on real numbers: feed real and imaginary parts
# side by side.
re = tf.math.real(in1)
im = tf.math.imag(in1)
co = Concatenate(axis=-1)([re,im])
# Two stacked bidirectional LSTMs that keep the time axis.
lstm = Bidirectional(LSTM(1000, return_sequences=True))(co)
lstm = Bidirectional(LSTM(1000, return_sequences=True))(lstm)
# Separate linear heads for the real and imaginary parts of the output.
re = Dense(BINS_LIMIT, activation="linear", name="real")(lstm)
im = Dense(BINS_LIMIT, activation="linear", name="imag")(lstm)
out = tf.complex(re, im)

model = Model(inputs=in1, outputs=out)

opt = Adam(learning_rate=0.0002)
model.compile(loss='mean_squared_error', optimizer=opt) 

model.summary()

# Uncomment to resume from previously saved weights.
#model.load_weights("weights/d00000097.h5", by_name=False)

In [None]:
#cell 5

# Text file that receives one line of metrics per epoch.
data_epoch = "train0.txt"

#if os.path.exists(data_epoch):
#    os.remove(data_epoch)

def on_epoch_end(epoch, logs=None):
    """Keras epoch-end hook: log the metrics and periodically render the test.

    When `epoch` is a multiple of 10 the test audio is rendered with
    `testor()` and pickled to ``audio_test/<epoch>.data``. On every epoch one
    line is appended to the `data_epoch` file.

    Args:
        epoch: index of the epoch that just finished.
        logs: metrics dict supplied by Keras (may be None).
    """
    if epoch % 10 == 0:
        print("Rendering the test")
        pred = testor()
        # Create the output folder on demand and close the file explicitly.
        os.makedirs("audio_test", exist_ok=True)
        with open("audio_test/"+str(epoch)+".data", "wb") as out_file:
            pickle.dump(pred, out_file, protocol=4)

    # NOTE(review): str() followed by json.dumps() stores the dict's repr as a
    # single JSON string, not a JSON object. Kept as is so that existing log
    # files and whatever reads them stay compatible.
    with open(data_epoch, 'a') as the_file:
        the_file.write(json.dumps(str(logs))+'\n')


# Hook that logs the metrics and periodically renders the test audio.
print_callback = LambdaCallback(on_epoch_end=on_epoch_end)

# Weight snapshots, written every 200 batches under the current epoch's name.
mc = ModelCheckpoint('weights/d{epoch:08d}.h5',
                     save_freq=200, save_weights_only=True)

train_batches = train_generator(batch_size=BATCH_SIZE)
validation_batches = test_generator(batch_size=BATCH_SIZE)

fit_options = dict(
    validation_data=validation_batches,
    # One epoch covers MAX_FILES-1 training files of BATCH_LIMIT items each;
    # one validation pass covers a single file.
    steps_per_epoch=(MAX_FILES-1)*BATCH_LIMIT//BATCH_SIZE,
    validation_steps=BATCH_LIMIT//BATCH_SIZE,
    epochs=1000,
    # Epoch numbering starts at 100.
    initial_epoch=100,
    callbacks=[print_callback, mc],
)

history = model.fit(train_batches, **fit_options)

In [None]:
# Load one batch of training inputs to eyeball a prediction.
with open(DATA_FOLDER+"X_0001.data", "rb") as file:
    X = pickle.load(file)
    
# Number of spectrograms sent through the model at once.
step = 64

pred = model.predict_on_batch(X[:step])

# Magnitude of the first predicted spectrogram, with the bins on the vertical axis.
image = np.abs(pred[0]).squeeze().T
plt.imshow(image,origin='lower')
plt.show()
