In [None]:
## data: pop music in midi, jazz in midi
## Model: GAN

In [None]:
!unzip /content/drive/MyDrive/PJCMidi.zip -d "/content/drive/My Drive/PJCMidi"

Module

In [None]:
!pip install pretty_midi
import datetime
import numpy as np
import copy
import tensorflow as tf
import numpy as np
import pretty_midi


def set_piano_roll_to_instrument(piano_roll, instrument, velocity=100, tempo=120.0, beat_resolution=16):
    """Convert a piano roll into notes and append them to ``instrument``.

    Args:
        piano_roll: 3-D array. The first two axes are flattened into one time
            axis; the last axis must have 128 entries because it is
            concatenated with 128-wide zero rows below.
        instrument: object with a ``notes`` list (a ``pretty_midi.Instrument``
            in this file); it is mutated in place.
        velocity: velocity assigned to every generated note.
        tempo: tempo in beats per minute used to turn time steps into seconds.
        beat_resolution: number of time steps per beat.

    Returns:
        None. ``instrument.notes`` is extended and sorted by start time.
    """
    # Seconds covered by one time step ("pixel") of the roll
    tpp = 60.0 / tempo / float(beat_resolution)
    # Minimum note length in seconds: a quarter of one beat
    threshold = 60.0 / tempo / 4
    # Four beats per entry of the first axis
    # NOTE(review): presumably each first-axis entry is one 4-beat phrase - confirm
    phrase_end_time = 60.0 / tempo * 4 * piano_roll.shape[0]
    # Flatten the first two axes into a single time axis: (time, pitch)
    piano_roll = piano_roll.reshape((piano_roll.shape[0] * piano_roll.shape[1], piano_roll.shape[2]))
    # Pad one silent row before and after so notes touching either edge still
    # produce an onset/offset in the difference below
    piano_roll_diff = np.concatenate((np.zeros((1, 128), dtype=int), piano_roll, np.zeros((1, 128), dtype=int)))
    # Positive entries mark rises (onsets), negative entries mark falls (offsets).
    # The cast to int truncates, so fractional values below 1 count as silence.
    piano_roll_search = np.diff(piano_roll_diff.astype(int), axis=0)

    # Iterate through all 128 pitches
    for note_num in range(128):
        # Onset and offset times (seconds) for this pitch
        start_idx = (piano_roll_search[:, note_num] > 0).nonzero()
        start_time = list(tpp * (start_idx[0].astype(float)))
        end_idx = (piano_roll_search[:, note_num] < 0).nonzero()
        end_time = list(tpp * (end_idx[0].astype(float)))

        # Working copies that are pruned while the originals are iterated
        temp_start_time = [i for i in start_time]
        temp_end_time = [i for i in end_time]

        for i in range(len(start_time)):
            # Skip onsets already pruned, and the last onset (nothing follows it)
            if start_time[i] in temp_start_time and i != len(start_time) - 1:
                t = []
                current_idx = temp_start_time.index(start_time[i])
                # Collect later notes that both start and end within
                # ``threshold`` seconds of the current onset
                for j in range(current_idx + 1, len(temp_start_time)):
                    if temp_start_time[j] < start_time[i] + threshold and temp_end_time[j] <= start_time[i] + threshold:
                        t.append(j)
                # Removes len(t) consecutive entries starting at position t[0].
                # NOTE(review): this matches the indices in ``t`` only when they
                # are consecutive - confirm that is always the case.
                for _ in t:
                    temp_start_time.pop(t[0])
                    temp_end_time.pop(t[0])

        start_time = temp_start_time
        end_time = temp_end_time
        # zip stops at the shorter list, so ``duration`` has min(len) entries
        duration = [pair[1] - pair[0] for pair in zip(start_time, end_time)]

        # Guard: drop trailing onsets that have no matching offset
        if len(end_time) < len(start_time):
            d = len(start_time) - len(end_time)
            start_time = start_time[:-d]
        # Iterate through all the remaining notes
        for idx in range(len(start_time)):
            if duration[idx] >= threshold:
                # Long enough: use the detected start and end times as they are
                note = pretty_midi.Note(velocity=velocity, pitch=note_num, start=start_time[idx], end=end_time[idx])
                # Add the note to the Instrument object
                instrument.notes.append(note)
            else:
                # Too short: stretch the note to ``threshold`` seconds, but never
                # past the end of the phrase
                if start_time[idx] + threshold <= phrase_end_time:
                    note = pretty_midi.Note(velocity=velocity, pitch=note_num, start=start_time[idx],
                                            end=start_time[idx] + threshold)
                else:
                    note = pretty_midi.Note(velocity=velocity, pitch=note_num, start=start_time[idx],
                                            end=phrase_end_time)
                # Add the note to the Instrument object
                instrument.notes.append(note)
    # Sort the notes by their start time
    instrument.notes.sort(key=lambda note: note.start)


def write_piano_roll_to_midi(piano_roll, filename, program_num=0, is_drum=False, velocity=100,
                             tempo=120.0, beat_resolution=16):
    """Write a single piano roll to ``filename`` as a one-instrument MIDI file.

    Args:
        piano_roll: piano roll passed unchanged to
            ``set_piano_roll_to_instrument``.
        filename: destination path handed to ``PrettyMIDI.write``.
        program_num: MIDI program number of the instrument.
        is_drum: whether the instrument is a drum track.
        velocity: velocity given to every note.
        tempo: initial tempo of the file, in beats per minute.
        beat_resolution: time steps per beat in ``piano_roll``.
    """
    song = pretty_midi.PrettyMIDI(initial_tempo=tempo)
    track = pretty_midi.Instrument(program=program_num, is_drum=is_drum)

    # Fill the (still detached) instrument with notes, then attach it.
    set_piano_roll_to_instrument(piano_roll, track, velocity, tempo, beat_resolution)
    song.instruments.append(track)

    song.write(filename)


def write_piano_rolls_to_midi(piano_rolls, program_nums=None, is_drum=None, filename='test.mid', velocity=100,
                              tempo=120.0, beat_resolution=24):
    """Write several piano rolls to one multi-instrument MIDI file.

    Args:
        piano_rolls: sequence of piano rolls, one per instrument.
        program_nums: MIDI program number per piano roll. ``None`` means
            program 0 for every roll.
        is_drum: drum flag per piano roll. ``None`` means ``False`` for
            every roll.
        filename: destination path handed to ``PrettyMIDI.write``.
        velocity: velocity given to every note.
        tempo: initial tempo of the file, in beats per minute.
        beat_resolution: time steps per beat in each piano roll.

    Returns:
        ``False`` (after printing an error) when the three sequences differ in
        length; ``None`` once the file has been written.
    """
    # Fill in the defaults BEFORE validating: calling len() on None would
    # raise TypeError, and the defaults must match the number of rolls.
    if program_nums is None:
        program_nums = [0] * len(piano_rolls)
    if is_drum is None:
        is_drum = [False] * len(piano_rolls)

    if len(piano_rolls) != len(program_nums) or len(piano_rolls) != len(is_drum):
        print("Error: piano_rolls and program_nums have different sizes...")
        return False

    # Create a PrettyMIDI object
    midi = pretty_midi.PrettyMIDI(initial_tempo=tempo)
    # One Instrument per input piano roll
    for roll, program, drum in zip(piano_rolls, program_nums, is_drum):
        instrument = pretty_midi.Instrument(program=program, is_drum=drum)
        set_piano_roll_to_instrument(roll, instrument, velocity, tempo, beat_resolution)
        midi.instruments.append(instrument)
    # Write out the MIDI data
    midi.write(filename)


# Newly added helper functions for CycleGAN
class ImagePool(object):
    """Fixed-size history buffer of two-element samples.

    Until ``maxsize`` samples have been stored, every sample is stored and
    returned unchanged. After that, each call has a 50% chance of swapping
    the incoming sample's two halves into randomly chosen slots and returning
    the halves that were stored there.
    """

    def __init__(self, maxsize=50):
        self.images = []
        self.num_img = 0
        self.maxsize = maxsize

    def __call__(self, image):
        """Return ``image`` or a pair of previously stored halves."""
        # A non-positive capacity disables the pool entirely.
        if self.maxsize <= 0:
            return image

        # Still filling up: remember the sample and pass it through.
        if self.num_img < self.maxsize:
            self.images.append(image)
            self.num_img += 1
            return image

        # Pool is full: pass the sample through unless the coin flip says swap.
        if not np.random.rand() > 0.5:
            return image

        # Swap element 0 with a random slot ...
        slot = int(np.random.rand() * self.maxsize)
        old_first = copy.copy(self.images[slot])[0]
        self.images[slot][0] = image[0]

        # ... and element 1 with another, independently drawn slot.
        slot = int(np.random.rand() * self.maxsize)
        old_second = copy.copy(self.images[slot])[1]
        self.images[slot][1] = image[1]

        return [old_first, old_second]


def load_npy_data(npy_data):
    """Load a pair of ``.npy`` files and stack them as two channels.

    Args:
        npy_data: indexable whose first two items are paths accepted by
            ``np.load``.

    Returns:
        Float array of shape ``(rows, cols, 2)``; channel 0 holds the first
        file and channel 1 the second.
    """
    # Multiplying by 1. promotes whatever was stored to floating point.
    loaded = [np.load(npy_data[which]) * 1. for which in (0, 1)]
    # Give each array a trailing channel axis of length one.
    channels = [arr.reshape(arr.shape[0], arr.shape[1], 1) for arr in loaded]
    return np.concatenate(channels, axis=2)


def save_midis(bars, file_path, tempo=80.0):
    """Pad a batch of bars on axis 2 and write it out as a MIDI file.

    Args:
        bars: 4-D array. Axis 2 is padded with 24 zeros before and 20 zeros
            after; axis 3 is split into one piano roll per index.
        file_path: destination path of the MIDI file.
        tempo: tempo in beats per minute passed to the writer.
    """

    def _zeros(width):
        # Zero block matching ``bars`` on every axis except axis 2.
        return np.zeros((bars.shape[0], bars.shape[1], width, bars.shape[3]))

    widened = np.concatenate((_zeros(24), bars, _zeros(20)), axis=2)
    # Regroup the leading axes into chunks of 64 time steps.
    widened = widened.reshape(-1, 64, widened.shape[2], widened.shape[3])

    n_chunks, n_steps, n_pitches, n_channels = (widened.shape[0], widened.shape[1],
                                                widened.shape[2], widened.shape[3])
    rolls = [widened[:, :, :, channel].reshape(n_chunks, n_steps, n_pitches)
             for channel in range(n_channels)]

    # Single-track version: exactly one program number and one drum flag are
    # supplied, so the writer only accepts a single channel.
    # (A multi-track variant previously used program_nums=[33, 0, 25, 49, 0]
    # and is_drum=[False, True, False, False, False].)
    write_piano_rolls_to_midi(piano_rolls=rolls,
                              program_nums=[0],
                              is_drum=[False],
                              filename=file_path,
                              tempo=tempo,
                              beat_resolution=4)


def get_now_datetime():
    """Return the current local date as a ``'YYYY-MM-DD'`` string."""
    stamp = datetime.datetime.now()
    return str(stamp.strftime('%Y-%m-%d'))


def to_binary(bars, threshold=0.0):
    """Turn velocity values into booleans.

    An entry is ``True`` only if it equals the maximum along the last axis
    and is strictly greater than ``threshold``.
    """
    last_axis_peak = tf.reduce_max(bars, axis=-1, keepdims=True)
    is_peak = tf.equal(bars, last_axis_peak)
    above_threshold = (bars > threshold)
    return tf.logical_and(is_peak, above_threshold)


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Model

In [None]:
import glob
import time
import numpy as np
import tensorflow as tf
from collections import namedtuple

def abs_criterion(pred, target):
    """Return the mean absolute difference between ``pred`` and ``target``."""
    residual = pred - target
    return tf.reduce_mean(tf.abs(residual))


def mae_criterion(pred, target):
    """Return the mean of the squared difference between ``pred`` and ``target``.

    NOTE: despite the ``mae`` in the name, this is a mean *squared* error.
    """
    residual = pred - target
    return tf.reduce_mean(residual ** 2)


def sce_criterion(logits, labels):
    """Return the mean sigmoid cross-entropy between ``logits`` and ``labels``."""
    per_element = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits, labels=labels)
    return tf.reduce_mean(per_element)


def softmax_criterion(logits, labels):
    """Return the mean softmax cross-entropy between ``logits`` and ``labels``."""
    per_example = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels)
    return tf.reduce_mean(per_example)


def padding(x, p=3):
    """Reflect-pad axes 1 and 2 of a 4-D tensor by ``p`` on each side."""
    untouched = [0, 0]
    both_sides = [p, p]
    return tf.pad(x, [untouched, both_sides, both_sides, untouched], "REFLECT")

class InstanceNorm(tf.keras.layers.Layer):
    """Instance normalisation with a learnable per-channel scale and offset.

    Each sample is normalised over axes 1 and 2 independently for every
    channel, then multiplied by ``scale`` and shifted by ``offset``.

    Args:
        epsilon: small constant added to the variance for numerical stability.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, epsilon=1e-5, **kwargs):
        super(InstanceNorm, self).__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, input_shape):
        # Weights are created here, once, via add_weight so that Keras tracks
        # them. Creating tf.Variable objects inside call() leaves them out of
        # trainable_variables (hence never updated by the optimizer) and
        # re-creates them on every invocation.
        channels = int(input_shape[-1])
        self.scale = self.add_weight(
            name='SCALE',
            shape=(channels,),
            initializer=tf.random_normal_initializer(1., 0.02),
            trainable=True
        )
        self.offset = self.add_weight(
            name='OFFSET',
            shape=(channels,),
            initializer='zeros',
            trainable=True
        )

    def call(self, x):
        # Per-sample, per-channel statistics over axes 1 and 2
        mean, variance = tf.nn.moments(x, axes=[1, 2], keepdims=True)
        inv = tf.math.rsqrt(variance + self.epsilon)
        normalized = (x - mean) * inv
        return self.scale * normalized + self.offset

    def get_config(self):
        # Lets the layer be re-created from its config when a model is saved.
        config = super(InstanceNorm, self).get_config()
        config.update({'epsilon': self.epsilon})
        return config


class ResNetBlock(tf.keras.layers.Layer):
    """Residual block: two (reflect-pad, conv, instance-norm) stages plus a skip.

    Args:
        dim: number of filters of both convolutions.
        k_init: kernel initializer for both convolutions.
        ks: kernel size of both convolutions.
        s: stride of both convolutions.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, dim, k_init, ks=3, s=1, **kwargs):
        super(ResNetBlock, self).__init__(**kwargs)
        self.dim = dim
        self.k_init = k_init
        self.ks = ks
        self.s = s
        # Reflect padding that compensates a 'valid' convolution; for ks = 3, p = 1
        self.p = (ks - 1) // 2
        self.padding = "valid"

        # Sublayers are created once here and stored as attributes so that
        # Keras tracks their weights. Instantiating them inside call() leaves
        # the convolution kernels out of the enclosing model's
        # trainable_variables and builds fresh layers on every invocation.
        self.conv_1 = self._make_conv()
        self.norm_1 = InstanceNorm()
        self.conv_2 = self._make_conv()
        self.norm_2 = InstanceNorm()

    def _make_conv(self):
        """Build one bias-free convolution with this block's settings."""
        return tf.keras.layers.Conv2D(
            filters=self.dim,
            kernel_size=self.ks,
            strides=self.s,
            padding=self.padding,
            kernel_initializer=self.k_init,
            use_bias=False
        )

    def call(self, x):
        # First stage: pad -> conv -> norm -> ReLU
        y = tf.pad(x, [[0, 0], [self.p, self.p], [self.p, self.p], [0, 0]], "REFLECT")
        y = self.conv_1(y)
        y = self.norm_1(y)
        y = tf.nn.relu(y)

        # Second stage: pad -> conv -> norm
        y = tf.pad(y, [[0, 0], [self.p, self.p], [self.p, self.p], [0, 0]], "REFLECT")
        y = self.conv_2(y)
        y = self.norm_2(y)

        # Skip connection, then ReLU on the sum
        return tf.nn.relu(y + x)

def build_discriminator(options, name='Discriminator'):
    """Build the convolutional discriminator as a Keras functional model.

    Args:
        options: object exposing ``time_step``, ``pitch_range``, ``output_nc``
            and ``df_dim``.
        name: name given to the returned model.

    Returns:
        ``tf.keras.Model`` mapping a ``(time_step, pitch_range, output_nc)``
        input to a single-channel score map.
    """
    weight_init = tf.random_normal_initializer(0., 0.02)

    def conv7(filters, strides, layer_name):
        # Every convolution here is 7x7, 'same'-padded and bias-free.
        return tf.keras.layers.Conv2D(filters=filters,
                                      kernel_size=7,
                                      strides=strides,
                                      padding='same',
                                      kernel_initializer=weight_init,
                                      use_bias=False,
                                      name=layer_name)

    roll_in = tf.keras.Input(shape=(options.time_step,
                                    options.pitch_range,
                                    options.output_nc))

    # Stride-2 stage 1: halves both spatial axes, df_dim channels
    h = conv7(options.df_dim, 2, 'CONV2D_1')(roll_in)
    h = tf.keras.layers.LeakyReLU(alpha=0.2)(h)

    # Stride-2 stage 2: halves again, df_dim * 4 channels
    h = conv7(options.df_dim * 4, 2, 'CONV2D_2')(h)
    h = InstanceNorm()(h)
    h = tf.keras.layers.LeakyReLU(alpha=0.2)(h)

    # Stride-1 projection to one score channel (no activation)
    score_map = conv7(1, 1, 'CONV2D_3')(h)

    return tf.keras.Model(inputs=roll_in, outputs=score_map, name=name)


def build_generator(options, name='Generator', num_res_blocks=10):
    """Build the encoder / residual / decoder generator as a Keras model.

    Args:
        options: object exposing ``time_step``, ``pitch_range``, ``output_nc``
            and ``gf_dim``. ``input_nc`` is used for the input channel count
            when present; otherwise ``output_nc`` is used.
        name: name given to the returned model.
        num_res_blocks: number of ``ResNetBlock`` layers between the encoder
            and the decoder.

    Returns:
        ``tf.keras.Model`` mapping a ``(time_step, pitch_range, input_nc)``
        input to a sigmoid-activated ``output_nc``-channel output.
    """
    initializer = tf.random_normal_initializer(0., 0.02)
    # Settings shared by every (transposed) convolution below
    common = dict(kernel_initializer=initializer, use_bias=False)

    # The input carries input_nc channels; fall back to output_nc for options
    # objects that do not define input_nc.
    input_nc = getattr(options, 'input_nc', options.output_nc)
    inputs = tf.keras.Input(shape=(options.time_step,
                                   options.pitch_range,
                                   input_nc))

    # Reflect-pad by 3 so the 7x7 'valid' convolution keeps the spatial size
    x = tf.keras.layers.Lambda(padding, name='PADDING_1')(inputs)
    x = tf.keras.layers.Conv2D(filters=options.gf_dim, kernel_size=7, strides=1,
                               padding='valid', name='CONV2D_1', **common)(x)
    x = InstanceNorm()(x)
    x = tf.keras.layers.ReLU()(x)

    # Two stride-2 convolutions: each halves the spatial axes and doubles channels
    x = tf.keras.layers.Conv2D(filters=options.gf_dim * 2, kernel_size=3, strides=2,
                               padding='same', name='CONV2D_2', **common)(x)
    x = InstanceNorm()(x)
    x = tf.keras.layers.ReLU()(x)

    x = tf.keras.layers.Conv2D(filters=options.gf_dim * 4, kernel_size=3, strides=2,
                               padding='same', name='CONV2D_3', **common)(x)
    x = InstanceNorm()(x)
    x = tf.keras.layers.ReLU()(x)

    # Residual trunk at the lowest resolution
    for _ in range(num_res_blocks):
        x = ResNetBlock(dim=options.gf_dim * 4, k_init=initializer)(x)

    # Two stride-2 transposed convolutions undo the down-sampling
    x = tf.keras.layers.Conv2DTranspose(filters=options.gf_dim * 2, kernel_size=3, strides=2,
                                        padding='same', name='DECONV2D_1', **common)(x)
    x = InstanceNorm()(x)
    x = tf.keras.layers.ReLU()(x)

    x = tf.keras.layers.Conv2DTranspose(filters=options.gf_dim, kernel_size=3, strides=2,
                                        padding='same', name='DECONV2D_2', **common)(x)
    x = InstanceNorm()(x)
    x = tf.keras.layers.ReLU()(x)

    # Reflect-pad again and project to output_nc channels with a sigmoid
    x = tf.keras.layers.Lambda(padding, name='PADDING_2')(x)
    outputs = tf.keras.layers.Conv2D(filters=options.output_nc, kernel_size=7, strides=1,
                                     padding='valid', activation='sigmoid',
                                     name='CONV2D_4', **common)(x)

    return tf.keras.Model(inputs=inputs,
                          outputs=outputs,
                          name=name)


class CycleGAN(object):
    """One generator / one discriminator GAN translating domain A to domain B.

    The networks are built from the module-level ``options`` object, which
    must therefore be defined before this class is instantiated.
    """

    def __init__(self):

        self._build_model()
        # self.generator = build_generator

        print("initialize model...")

    def _build_model(self):
        """Create the generator, the discriminator and their Adam optimizers."""
        # Base learning rate; train() derives the per-epoch rate from it.
        self.lr = 0.0002
        self.batch_size = 16

        # Generator
        self.g = build_generator(options, name='generator')

        # Discriminator
        self.d = build_discriminator(options, name='discriminator')

        # Discriminator and Generator Optimizer
        self.d_optimizer = tf.keras.optimizers.Adam(self.lr, beta_1=0.5)
        self.g_optimizer = tf.keras.optimizers.Adam(self.lr, beta_1=0.5)

    def train(self, epoch, time_step, batch_size, sigma_d=0.0):
        """Train the generator and discriminator on the two ``.npy`` folders.

        Args:
            epoch: total number of epochs.
            time_step: epoch index from which the learning rate decays
                linearly towards zero; values >= ``epoch`` disable the decay.
                NOTE(review): the name suggests the piano-roll time dimension,
                but the decay schedule is what this value controls - confirm
                the intended meaning with the caller.
            batch_size: number of A/B file pairs per training step.
            sigma_d: standard deviation of the absolute Gaussian noise added
                to the discriminator inputs; the default 0.0 adds no noise.
        """
        self.batch_size = batch_size
        # Data from domain A and B
        dataA = glob.glob("/content/drive/MyDrive/PJCMidi/JP_P/train/*.*")
        dataB = glob.glob("/content/drive/MyDrive/PJCMidi/JP_J/train/*.*")

        start_time = time.time()

        for e in range(epoch):

            # Shuffle training data
            np.random.shuffle(dataA)
            np.random.shuffle(dataB)

            # Get the proper number of batches
            batch_idxs = min(len(dataA), len(dataB)) // batch_size

            # Linear decay from the base rate once e reaches time_step. The
            # rate is derived from the unchanged base self.lr (so it does not
            # compound), uses (epoch - e) so it stays positive, and is pushed
            # into both optimizers so that it actually takes effect.
            if e < time_step:
                current_lr = self.lr
            else:
                current_lr = self.lr * (epoch - e) / (epoch - time_step)
            self.g_optimizer.learning_rate = current_lr
            self.d_optimizer.learning_rate = current_lr

            for idx in range(batch_idxs):

                # To feed real_data
                batch_files = list(zip(dataA[idx * self.batch_size:(idx + 1) * self.batch_size],
                                       dataB[idx * self.batch_size:(idx + 1) * self.batch_size]))
                batch_samples = [load_npy_data(batch_file) for batch_file in batch_files]
                batch_samples = np.array(batch_samples).astype(np.float32)  # batch, time, pitch, 2
                real_A, real_B = batch_samples[:, :, :, 0], batch_samples[:, :, :, 1]
                real_A = tf.expand_dims(real_A, -1)
                real_B = tf.expand_dims(real_B, -1)

                # Noise for the discriminator inputs, shaped like one channel
                # of the actual batch instead of relying on a caller argument.
                noise_shape = batch_samples.shape[:3] + (1,)
                gaussian_noise = np.abs(np.random.normal(0,
                                                         sigma_d,
                                                         noise_shape)).astype(np.float32)

                # Only the forward pass and the losses are recorded.
                with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:

                    fake_B = self.g(real_A, training=True)
                    DB_real = self.d(real_B + gaussian_noise,
                                     training=True)
                    DB_fake = self.d(fake_B + gaussian_noise,
                                     training=True)

                    # Generator loss
                    g_loss = mae_criterion(DB_fake, tf.ones_like(DB_fake))

                    # Discriminator loss
                    d_B_loss_real = mae_criterion(DB_real, tf.ones_like(DB_real))
                    d_B_loss_fake = mae_criterion(DB_fake, tf.zeros_like(DB_fake))
                    d_loss = (d_B_loss_real + d_B_loss_fake) / 2

                # Gradients are taken outside the tape contexts so the tapes
                # do not record the gradient computation, and non-persistent
                # tapes release their resources after a single gradient() call.
                g_gradients = gen_tape.gradient(target=g_loss,
                                                sources=self.g.trainable_variables)
                d_gradients = disc_tape.gradient(target=d_loss,
                                                 sources=self.d.trainable_variables)

                # Apply the gradients to the optimizer
                self.g_optimizer.apply_gradients(zip(g_gradients,
                                                     self.g.trainable_variables))
                self.d_optimizer.apply_gradients(zip(d_gradients,
                                                     self.d.trainable_variables))

                print('=================================================================')
                print(("Epoch: [%2d] [%4d/%4d] time: %4.4f D_loss: %6.2f, G_loss: %6.2f" %
                       (e, idx, batch_idxs, time.time() - start_time, d_loss, g_loss)))

    def generate_song(self, name):
        """Run the generator on a stored song and save the result.

        Args:
            name: base file name (without extension) inside the ``popMidis``
                Drive folder.
        """
        # NOTE(review): np.load expects NumPy-format data; confirm that the
        # '.midi' file at this path really holds a saved array.
        song = np.load('/content/drive/MyDrive/popMidis/'+name+'.midi')

        transfer = self.g(song, training=False)

        # The generator ends in a sigmoid, and the MIDI writer casts the roll
        # to int, which would truncate those fractional activations to 0
        # (silence). Binarise first so active cells become exactly 1.
        transfer_binary = tf.cast(to_binary(transfer, 0.5), tf.float32).numpy()

        save_midis(transfer_binary, '/content/drive/MyDrive/popMidis/'+name+'-jazz.midi', 127)
        # The raw (non-binarised) generator output is kept as an array.
        np.save('/content/drive/MyDrive/popMidis/'+name+'-jazz.midi', transfer)


In [None]:
# Hyper-parameters shared by the generator and discriminator builders.
OPTIONS = namedtuple('OPTIONS', ['batch_size',
                                 'time_step',
                                 'input_nc',
                                 'output_nc',
                                 'pitch_range',
                                 'gf_dim',
                                 'df_dim'])
# Keyword construction keeps every value next to its field name, so a
# reordered field list cannot silently mis-assign values.
options = OPTIONS(batch_size=128,
                  time_step=64,
                  input_nc=1,
                  output_nc=1,
                  pitch_range=84,
                  gf_dim=64,
                  df_dim=64)

# Stand-alone copies of both networks, built only for inspection.
g = build_generator(options)
d = build_discriminator(options)
# summary() prints the table itself and returns None, so it is not wrapped
# in print() (which would emit an extra "None" line).
g.summary()

# CycleGAN builds its own networks from the module-level ``options``.
model = CycleGAN()
# 30 epochs, decay-start / time_step argument 64, batch size 16
model.train(30, 64, 16)

Model: "Generator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 64, 84, 1)]       0         
                                                                 
 PADDING_1 (Lambda)          (None, 70, 90, 1)         0         
                                                                 
 CONV2D_1 (Conv2D)           (None, 64, 84, 64)        3136      
                                                                 
 instance_norm (InstanceNorm  (None, 64, 84, 64)       0         
 )                                                               
                                                                 
 re_lu (ReLU)                (None, 64, 84, 64)        0         
                                                                 
 CONV2D_2 (Conv2D)           (None, 32, 42, 128)       73728     
                                                         



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch: [26] [ 304/ 701] time: 8683.5002 D_loss:   0.12, G_loss:   0.42
Epoch: [26] [ 305/ 701] time: 8683.9788 D_loss:   0.13, G_loss:   0.83
Epoch: [26] [ 306/ 701] time: 8684.4198 D_loss:   0.09, G_loss:   0.69
Epoch: [26] [ 307/ 701] time: 8684.8989 D_loss:   0.13, G_loss:   0.49
Epoch: [26] [ 308/ 701] time: 8685.3509 D_loss:   0.17, G_loss:   1.24
Epoch: [26] [ 309/ 701] time: 8685.8280 D_loss:   0.14, G_loss:   0.39
Epoch: [26] [ 310/ 701] time: 8686.2977 D_loss:   0.12, G_loss:   0.75
Epoch: [26] [ 311/ 701] time: 8686.7521 D_loss:   0.15, G_loss:   0.70
Epoch: [26] [ 312/ 701] time: 8687.2323 D_loss:   0.13, G_loss:   0.70
Epoch: [26] [ 313/ 701] time: 8687.6897 D_loss:   0.13, G_loss:   0.74
Epoch: [26] [ 314/ 701] time: 8688.1731 D_loss:   0.11, G_loss:   0.52
Epoch: [26] [ 315/ 701] time: 8688.6584 D_loss:   0.17, G_loss:   0.70
Epoch: [26] [ 316/ 701] time: 8689.1237 D_loss:   0.15, G_loss:   0.74
Epoch: [26] 

AttributeError: ignored