In [1]:
import numpy as np
import random
import sklearn
from sklearn import metrics, ensemble, preprocessing
from matplotlib import pyplot as plt
from scipy.stats import norm
import os
import datetime

# Print floats in fixed-point notation instead of scientific notation.
np.set_printoptions(suppress=True)
# Seed NumPy's and Python's global RNGs; both are used by the shuffling and
# data-corruption code further down.
np.random.seed(100)
random.seed(100)

In [2]:
import tensorflow as tf
from tensorflow import keras

# Report the TensorFlow build and the accelerators visible to it.
print(tf.__version__)
print("GPU available:", len(tf.config.list_physical_devices('GPU')))
print(tf.test.gpu_device_name())

# gpus = tf.config.experimental.list_physical_devices('GPU')
# tf.config.experimental.set_visible_devices(gpus[0], 'GPU')

# Seed TensorFlow's global RNG as well: NumPy and `random` are seeded in the
# first cell, but Keras weight initialisation draws from TensorFlow's RNG.
tf.random.set_seed(100)

# Run every @tf.function eagerly. train_step relies on this: it draws NumPy
# random numbers and creates a fresh tf.Variable on each call.
tf.config.run_functions_eagerly(True)

2.11.0
GPU available: 0



In [3]:
# Number of notes per sample (sequence length fed to the models).
NOTES_QTY = 16
# Features kept per note after prepare(): note, duration, velocity.
NOTE_PARAMS = 3
# Held-out samples per corruption level; mess() processes NOTES_QTY chunks of this size.
TEST_LEN = 10000

In [4]:
def prepare(dset):
    """Drop the unused per-note features and put the rest in model order.

    Removes indices 1 and 2 along axis 2, then exchanges columns 1 and 2 of
    what remains so that the layout becomes:

        0 - note
        1 - duration
        2 - velocity

    Returns a new array; ``dset`` itself is not modified.
    """
    trimmed = np.delete(dset, (1, 2), axis=2)

    # Fancy indexing on the right-hand side produces a copy, so the two
    # columns can be exchanged in a single assignment.
    trimmed[..., [1, 2]] = trimmed[..., [2, 1]]

    return trimmed

In [5]:
def normalize_train(dset):
    """Fit the feature scalers on ``dset`` and normalise it in place.

    Column 0 (note) goes through a ``RobustScaler`` and column 1 (duration)
    through a ``MaxAbsScaler``; each scaler is fitted on the slice
    ``dset[..., k]`` it then transforms. Column 2 (velocity) is divided by 127
    (presumably the MIDI velocity maximum - TODO confirm).

    Returns:
        ``(notes_scaler, duration_scaler)``, the fitted scalers, to be passed
        to ``normalize_test`` as ``params``.
    """
    notes_scaler = sklearn.preprocessing.RobustScaler()
    dset[..., 0] = notes_scaler.fit_transform(dset[..., 0])

    duration_scaler = sklearn.preprocessing.MaxAbsScaler()
    dset[..., 1] = duration_scaler.fit_transform(dset[..., 1])

    dset[..., 2] /= 127

    return notes_scaler, duration_scaler

def normalize_test(dset, params):
    """Normalise ``dset`` in place using already-fitted scalers.

    Args:
        dset: array whose last axis holds (note, duration, velocity).
        params: ``(notes_scaler, duration_scaler)`` as returned by
            ``normalize_train``.

    Nothing is returned; ``dset`` is modified directly.
    """
    notes_scaler, duration_scaler = params

    for column, scaler in ((0, notes_scaler), (1, duration_scaler)):
        dset[..., column] = scaler.transform(dset[..., column])

    # Velocity uses the same fixed divisor as in normalize_train.
    dset[..., 2] /= 127
    

In [6]:
def mess_n(lines, n):
    """Overwrite the velocity of ``n`` random notes in every sample, in place.

    A normal distribution is fitted to all velocities (column 2) in ``lines``.
    For each sample, ``n`` distinct note positions are chosen with
    ``random.sample`` and each chosen velocity is replaced by an independent
    draw from that fitted distribution.
    """
    velocities = lines[..., 2].flatten()
    mu, std = norm.fit(velocities)

    for sample in lines:
        for pos in random.sample(range(len(sample)), n):
            sample[pos, 2] = np.random.normal(mu, std)
            
def mess(test):
    """Return a corrupted copy of ``test`` with graded corruption levels.

    The copy is treated as NOTES_QTY consecutive chunks of TEST_LEN samples.
    Chunk number ``k`` (counting from 1) has ``k`` velocities per sample
    replaced by ``mess_n``. ``test`` itself is left unchanged.
    """
    corrupted = test.copy()

    for level in range(1, NOTES_QTY + 1):
        chunk_start = (level - 1) * TEST_LEN
        mess_n(corrupted[chunk_start:chunk_start + TEST_LEN], level)

    return corrupted

In [7]:
# Load the raw dataset and convert it to float so that the in-place scaling
# and division done during normalisation operate on floating-point values.
dset = np.load(r'C:\Users\mrshu\reps\music-style-performer\data\dset16.npy').astype(float)

In [8]:
# unique, counts = np.unique(dset[...,3], return_counts=True)
# plt.plot(unique, counts, '.-')
# plt.show()

In [9]:
# Reduce every note to (note, duration, velocity), then shuffle the samples in
# place along axis 0 so the train/test split that follows is random.
processed = prepare(dset)
np.random.shuffle(processed)

In [10]:
# Hold out the last NOTES_QTY * TEST_LEN shuffled samples for evaluation.
full_test_len = NOTES_QTY * TEST_LEN

# Both slices are views into `processed`, so the in-place normalisation below
# also rewrites `processed`. NOTE: re-running this cell without re-running the
# previous one would therefore normalise the data a second time.
train = processed[:-full_test_len] 
test = processed[-full_test_len:]
# Corrupt a copy of the test set before scaling, so the replacement velocities
# are drawn on the raw velocity scale.
test_messed = mess(test)

# Fit the scalers on the training data only and reuse them for both test sets.
params = normalize_train(train)
normalize_test(test, params)
normalize_test(test_messed, params)

print(f'train: {train.shape}, test: {test.shape}, messed: {test_messed.shape}')

train: (4912011, 16, 3), test: (160000, 16, 3), messed: (160000, 16, 3)


In [11]:
%matplotlib
# Overlay the normalised velocity distributions of the corrupted and the clean
# test sets.
plt.hist(test_messed[..., 2].flatten(), bins=32, label='messed')
plt.hist(test[..., 2].flatten(), bins=32, label='true', alpha=0.9)
plt.legend()
plt.show()

Using matplotlib backend: TkAgg


# Models

In [12]:
# Size of the latent vector produced by the encoder.
LATENT_DIM = 200
# Standard deviation of the zero-mean Gaussian noise added to the input in train_step.
I = 0.1

## Autoencoder

In [45]:
class Autoencoder(keras.Model):
    """Sequence autoencoder made of an LSTM encoder and an LSTM decoder.

    ``encoder`` maps a (NOTES_QTY, NOTE_PARAMS) sequence to a LATENT_DIM
    vector. ``decoder`` repeats that vector NOTES_QTY times and runs it
    through an LSTM that emits NOTE_PARAMS values per step.
    """

    def __init__(self):
        super().__init__()

        self.build_encoder()
        self.build_decoder()

    def build_encoder(self):
        """Create ``self.encoder``: sequence -> latent vector."""
        sequence_input = keras.layers.Input(shape=(NOTES_QTY, NOTE_PARAMS))
        to_latent = keras.layers.LSTM(LATENT_DIM, activation="tanh")
        self.encoder = keras.Sequential([sequence_input, to_latent])

    def build_decoder(self):
        """Create ``self.decoder``: latent vector -> sequence."""
        repeat_latent = keras.layers.RepeatVector(NOTES_QTY)
        to_sequence = keras.layers.LSTM(NOTE_PARAMS, return_sequences=True)
        self.decoder = keras.Sequential([repeat_latent, to_sequence])

    def call(self, x, training=True):
        """Encode ``x`` and decode it again, returning the reconstruction."""
        latent = self.encoder(x, training=training)
        return self.decoder(latent, training=training)
    

In [46]:
# Instantiate the autoencoder and smoke-test it: push one training sample
# through the full model and through each half, printing the output shapes.
autoencoder = Autoencoder()

test_out = autoencoder(np.expand_dims(train[0], axis=0), training=False)
print(test_out.shape)

en_test_out = autoencoder.encoder(np.expand_dims(train[0], axis=0), training=False)
print(en_test_out.shape)

de_test_out = autoencoder.decoder(en_test_out, training=False)
print(de_test_out.shape)

(1, 16, 3)
(1, 200)
(1, 16, 3)


In [35]:
# Optimizer for the encoder/decoder weights (generator update in train_step).
autoencoder_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

## Latent Discriminator

In [16]:
class LatentDisc(keras.Model):
    """Discriminator over latent vectors.

    Three ReLU dense layers (64, 32, 16 units) followed by a single sigmoid
    unit, so the output is one value in (0, 1) per input vector.
    """

    def __init__(self):
        super().__init__()

        # self.l1 = keras.layers.Dense(128, activation='relu')(self.input)
        self.l1 = keras.layers.Dense(64, activation='relu')
        self.l2 = keras.layers.Dense(32, activation='relu')
        self.l3 = keras.layers.Dense(16, activation='relu')
        self.outp = keras.layers.Dense(1, activation='sigmoid')

    def call(self, x, training=True):
        """Apply the dense stack to ``x``; ``training`` is accepted but unused."""
        for layer in (self.l1, self.l2, self.l3, self.outp):
            x = layer(x)
        return x

In [17]:
# Instantiate the latent discriminator and check its output shape on a dummy
# latent vector (this call also creates the layer weights).
ldisc = LatentDisc()

test_out = ldisc(np.ones(shape=(1, LATENT_DIM)), training=False)
print(test_out.shape)

(1, 1)


In [18]:
# Optimizer for the latent discriminator weights.
ldisc_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

## Visual Discriminator
(and classifier)

In [19]:
class VisualDisc(keras.Model):
    """Sequence-level discriminator / classifier emitting one probability.

    A single-unit LSTM summarises the input sequence and a sigmoid dense unit
    turns that summary into a value in (0, 1).

    The sigmoid head is required because the outputs are scored with
    ``BinaryCrossentropy(from_logits=False)``, which expects probabilities.
    A bare ``LSTM(1)`` uses tanh by default and emits values in [-1, 1];
    negative values are clipped by the loss and receive no gradient.

    NOTE: checkpoints written before this head was added do not contain
    weights for ``outp``.
    """

    def __init__(self):
        super().__init__()

        self.lstm = keras.layers.LSTM(1)
        # Maps the LSTM summary to a probability; named like LatentDisc's output layer.
        self.outp = keras.layers.Dense(1, activation='sigmoid')

    def call(self, x, training=True):
        """Return a (batch, 1) tensor of probabilities for the sequences in ``x``."""
        x = self.lstm(x, training=training)
        return self.outp(x)

In [20]:
# Instantiate the visual discriminator and check its output shape on one
# training sample.
vdisc = VisualDisc()

test_out = vdisc(train[0:1], training=False)
print(test_out.shape)

(1, 1)


In [21]:
# Optimizer for the visual discriminator weights.
vdisc_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [22]:
# The classifier reuses the VisualDisc architecture but is a separate instance
# with its own weights; check its output shape on one training sample.
classifier = VisualDisc()
test_out = classifier(train[0:1], training=False)
print(test_out.shape)

(1, 1)


In [23]:
# Optimizer for the classifier weights.
classifier_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Helpers

In [24]:
# Checkpoint bundle holding all four models together with their optimizers;
# train_loop saves it under `checkpoint_prefix` at the end of every epoch.
checkpoint_dir = r'C:\Users\mrshu\reps\music-style-performer\train_data\discriminator\checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(autoencoder_optimizer=autoencoder_optimizer,
                                ldisc_optimizer=ldisc_optimizer,
                                vdisc_optimizer=vdisc_optimizer,
                                classifier_optimizer=classifier_optimizer,
                                autoencoder=autoencoder,
                                ldisc=ldisc,
                                vdisc=vdisc,
                                classifier=classifier)

In [25]:
# Root directory for TensorBoard logs; also passed to the %tensorboard magic.
log_dir=r'C:\Users\mrshu\reps\music-style-performer\train_data\discriminator\logs/'

# Each kernel session writes to its own timestamped run directory under fit/.
summary_writer = tf.summary.create_file_writer(
    log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [26]:
# Binary cross-entropy on probabilities (not logits), used for the classifier
# and discriminator terms; mean squared error for the reconstruction term.
bce = tf.keras.losses.BinaryCrossentropy(from_logits=False)
mse = tf.keras.losses.MeanSquaredError()

In [27]:
# Plain SGD (no momentum) that train_step uses to move the random latent
# sample `l2` during informative-negative mining.
l2_optimizer = tf.keras.optimizers.legacy.SGD(learning_rate=0.01, momentum=0.0)

# Training

In [28]:
# Number of gradient steps applied to the random latent per training step
# (informative-negative mining).
INM_ITERS = 5
# Weight of the reconstruction (MSE) term in the autoencoder loss.
LAMBDA = 2
# NOTE(review): not referenced in the visible cells; train_loop is called with
# a literal epoch count instead.
EPOCHS = 1

In [47]:
def _real_fake_bce(fake_pred, real_pred):
    """Binary cross-entropy with label 0 for ``fake_pred`` and 1 for ``real_pred``.

    The labels are built with exactly the shape of the predictions. A flat
    ``[0, 1]`` label list against two stacked ``(1, 1)`` outputs has shapes
    ``(2,)`` vs ``(2, 1, 1)``; these broadcast to ``(2, 1, 2)``, scoring every
    prediction against both labels and driving the outputs toward 0.5.
    """
    preds = tf.concat([fake_pred, real_pred], axis=0)
    labels = tf.concat([tf.zeros_like(fake_pred), tf.ones_like(real_pred)], axis=0)
    return bce(labels, preds)


@tf.function
def train_step(x, step):
    """Run one adversarial training step on the batch ``x`` and log the losses.

    The step performs, in order: classifier update, latent/visual
    discriminator update, informative-negative mining on the random latent
    sample, and the autoencoder (generator) update.

    Args:
        x: batch of input sequences, e.g. ``train[i:i+1]``.
        step: integer step index used for the TensorBoard summaries.

    NOTE: this function draws NumPy random numbers and creates a new
    ``tf.Variable`` on every call, so it depends on
    ``tf.config.run_functions_eagerly(True)`` being active.
    """
    # Draw the noise from x.shape before converting x to a tensor.
    noise = np.random.normal(0, I, x.shape)
    x = tf.cast(x, tf.float32)
    noisy_x = x + tf.cast(noise, tf.float32)

    l1 = autoencoder.encoder(noisy_x)
    l2 = tf.Variable(np.random.uniform(-1, 1, (1, LATENT_DIM)), dtype=tf.float32)

    #
    # Classifier update: decoded random latent -> 0, reconstruction -> 1
    #

    with tf.GradientTape() as gt:
        l_classifier = _real_fake_bce(classifier(autoencoder.decoder(l2)),
                                      classifier(autoencoder.decoder(l1)))

    classifier_gradients = gt.gradient(l_classifier, classifier.trainable_variables)
    classifier_optimizer.apply_gradients(zip(classifier_gradients, classifier.trainable_variables))

    #
    # Discriminator update
    #

    with tf.GradientTape(persistent=True) as gt:
        # Latent: encoded sample -> 0, sample from the uniform prior -> 1.
        l_latent = _real_fake_bce(ldisc(l1), ldisc(l2))
        # Visual: decoded random latent -> 0, real input -> 1.
        l_visual = _real_fake_bce(vdisc(autoencoder.decoder(l2)), vdisc(x))

    ldisc_gradients = gt.gradient(l_latent, ldisc.trainable_variables)
    ldisc_optimizer.apply_gradients(zip(ldisc_gradients, ldisc.trainable_variables))

    vdisc_gradients = gt.gradient(l_visual, vdisc.trainable_variables)
    vdisc_optimizer.apply_gradients(zip(vdisc_gradients, vdisc.trainable_variables))

    # A persistent tape keeps its resources until it is released explicitly.
    del gt

    #
    # Informative-negative mining: only the latent sample l2 is updated
    #

    for _ in range(INM_ITERS):
        with tf.GradientTape() as gt:
            mined_score = classifier(autoencoder.decoder(l2))
            l_l2 = bce(tf.ones_like(mined_score), mined_score)

        l2_gradients = gt.gradient(l_l2, l2)
        l2_optimizer.apply_gradients([(l2_gradients, l2)])

    #
    # Generator update
    #

    with tf.GradientTape() as gt:
        # The encoder must run inside the tape. A latent computed outside it
        # is a constant, and the latent adversarial term would then give the
        # encoder no gradient.
        latent = autoencoder.encoder(noisy_x, training=True)
        reconstruction = autoencoder.decoder(latent, training=True)

        latent_score = ldisc(latent)
        visual_score = vdisc(autoencoder.decoder(l2))

        l_ae_latent = bce(tf.ones_like(latent_score), latent_score)
        l_ae_visual = bce(tf.ones_like(visual_score), visual_score)
        l_ae_mse = mse(x, reconstruction)

        total_ae_loss = l_ae_latent + l_ae_visual + LAMBDA * l_ae_mse

    ae_variables = autoencoder.trainable_variables
    ae_gradients = gt.gradient(total_ae_loss, ae_variables)
    autoencoder_optimizer.apply_gradients(zip(ae_gradients, ae_variables))

    #
    # Logging
    #

    with summary_writer.as_default():
        tf.summary.scalar('l_classifier', l_classifier, step=step)
        tf.summary.scalar('l_latent', l_latent, step=step)
        tf.summary.scalar('l_visual', l_visual, step=step)
        tf.summary.scalar('l_ae_latent', l_ae_latent, step=step)
        tf.summary.scalar('l_ae_visual', l_ae_visual, step=step)
        tf.summary.scalar('l_ae_mse', l_ae_mse, step=step)
                                          

In [30]:
# Launch TensorBoard inline on the summary log directory.
# NOTE(review): --host "0.0.0.0" binds to all network interfaces; confirm that
# exposing the dashboard beyond localhost is intended.
%reload_ext tensorboard
%tensorboard --logdir {log_dir} --host "0.0.0.0"

Reusing TensorBoard on port 6006 (pid 14800), started 0:11:01 ago. (Use '!kill 14800' to kill it.)

In [49]:
def train_loop(start, epochs, report_every=1):
    """Train for ``epochs`` epochs, one sample per step, saving a checkpoint per epoch.

    Args:
        start: index of the first epoch; use the number of epochs already
            completed when resuming.
        epochs: number of epochs to run.
        report_every: print a progress dot every this many steps (default 1,
            i.e. one dot per step).

    The step passed to ``train_step`` increases monotonically across epochs.
    Restarting it at 0 each epoch would make TensorBoard points from
    different epochs share the same step values.
    """
    for epoch in range(start, start + epochs):
        steps_per_epoch = len(train)

        for step in range(steps_per_epoch):
            global_step = epoch * steps_per_epoch + step
            train_step(train[step:step+1], global_step)

            if (step + 1) % report_every == 0:
                print('.', end='', flush=True)

        # Reorder the samples in place for the next epoch.
        np.random.shuffle(train)

        checkpoint.save(file_prefix=checkpoint_prefix)

In [50]:
# Train for 10 epochs starting from epoch 0.
train_loop(0, 10)

....

KeyboardInterrupt: 