In [None]:
!pip3 install torch matplotlib tqdm livelossplot "pypianoroll>=1.0.2"

In [None]:
from IPython.display import clear_output
from IPython.display import Audio as audio
from ipywidgets import interact, IntSlider

import os
import os.path
import random
from pathlib import Path

import numpy as np
import matplotlib.pyplot as plt
import torch
import pypianoroll
from pypianoroll import Multitrack, Track
from tqdm import tqdm
from livelossplot import PlotLosses
from livelossplot.outputs import MatplotlibPlot

In [None]:
# --- Data representation ---
n_tracks = 5  # number of tracks
n_pitches = 72  # number of pitches
lowest_pitch = 24  # MIDI note number of the lowest pitch
n_samples_per_song = 8  # number of samples to extract from each song in the dataset
n_measures = 4  # number of measures per sample
beat_resolution = 4  # temporal resolution of a beat (in time steps)
programs = [0, 0, 25, 33, 48]  # program number for each track
is_drums = [True, False, False, False, False]  # drum indicator for each track
track_names = ['Drums', 'Piano', 'Guitar', 'Bass', 'Strings']  # name of each track
tempo = 100 # tempo written into the rendered multitracks, in beats per minute (BPM)

# --- Training ---
batch_size = 4  # mini-batch size used by the DataLoader and for latent sampling
latent_dim = 128  # length of the latent vector fed to the generator
n_steps = 10000  # number of training steps to run

# --- Sampling / preview ---
sample_interval = 100 # interval to run the sampler (in steps)
n_samples = 4  # number of fixed latent vectors used for the periodic previews

In [None]:
# Validate the configuration first, so that a bad value fails here rather
# than after derived quantities have already been computed from it.
assert 24 % beat_resolution == 0, (
    "beat_resolution must be a factor of 24 (the beat resolution used in "
    "the source dataset)."
)
assert len(programs) == len(is_drums) and len(programs) == len(track_names), (
    "Lengths of programs, is_drums and track_names must be the same."
)
assert len(programs) == n_tracks, (
    "programs, is_drums and track_names must each have n_tracks entries."
)

# Number of time steps in one measure (four beats per measure).
measure_resolution = 4 * beat_resolution

# Constant tempo curve covering `n_samples` clips of `n_measures` measures
# laid end to end, which is the length of the preview multitracks built from
# the generator output. (Previously hard-coded as 4 * 4 measures, which is the
# same value for the default n_samples = 4 and n_measures = 4.)
tempo_array = np.full((n_samples * n_measures * measure_resolution, 1), tempo)

In [None]:
# Root directory of the piano-roll files; songs live in nested
# sub-directories derived from their id (see msd_id_to_dirs).
dataset_root = Path('/kaggle/input/lpd-5-cleansed/lpd_5/lpd_5_cleansed')

# Gather the song ids listed (one per line) in every file below the "amg"
# directory. A set removes ids that appear in more than one list.
unique_ids = set()
for dirname, _, filenames in os.walk("/kaggle/input/lakh-piano-roll/amg"):
    for filename in filenames:
        filepath = os.path.join(dirname, filename)
        if not os.path.isfile(filepath):
            continue
        with open(filepath) as f:
            unique_ids.update(line.rstrip() for line in f)

# A blank line would produce an empty id, which cannot be mapped to a
# directory (msd_id_to_dirs indexes characters 2-4 of the id).
unique_ids.discard('')

# Sort so that the song order does not depend on string hashing and is the
# same on every run.
id_list = sorted(unique_ids)

In [None]:
def msd_id_to_dirs(msd_id):
    """Return the relative directory in which the files of a song are stored.

    The characters at positions 2, 3 and 4 of ``msd_id`` each become one
    directory level, and the full id is the innermost directory.

    Raises:
        IndexError: if ``msd_id`` has fewer than five characters.
    """
    nested_levels = [msd_id[position] for position in (2, 3, 4)]
    return os.path.join(*nested_levels, msd_id)

In [None]:
# Load and plot the beginning of one example song.
# Ids that have been used here: 'TRQAOWZ128F93000A4', 'TREVDFX128E07859E0'
song_dir = str(dataset_root / msd_id_to_dirs('TREVDFX128E07859E0'))
first_entry = os.listdir(song_dir)[0]
multitrack = pypianoroll.load(song_dir + '/' + first_entry)

# Keep only the first 12 * 96 time steps.
# NOTE(review): 96 is presumably one 4/4 measure at the source dataset's
# resolution of 24 steps per beat -- confirm.
multitrack.trim(end=12 * 96)

axs = multitrack.plot()
plt.gcf().set_size_inches((16, 8))

# Vertical separators every 96 steps (at 96, 192, ..., 11 * 96), shifted by
# half a step so they fall between two columns of the plot.
separator_positions = [96 * k - 0.5 for k in range(1, 12)]
for ax in axs:
    for position in separator_positions:
        ax.axvline(position, color='k', linestyle='-', linewidth=1)
plt.show()

In [None]:
# Build the training set: for every song, cut up to `n_samples_per_song`
# randomly placed windows of `n_measures` measures out of its binarized
# piano roll.
data = []
for msd_id in tqdm(id_list):
    # Load the first file found in the song's directory.
    song_dir = dataset_root / msd_id_to_dirs(msd_id)
    multitrack = pypianoroll.load(song_dir / os.listdir(song_dir)[0])

    # Binarize and downsample to the configured resolution.
    multitrack.binarize()
    multitrack.set_resolution(beat_resolution)

    # Stack the tracks into one boolean array and keep only the configured
    # pitch range (last axis).
    pianoroll = (multitrack.stack() > 0)
    pianoroll = pianoroll[:, :, lowest_pitch:lowest_pitch + n_pitches]

    # Choose the start measures of the windows.
    n_total_measures = multitrack.get_max_length() // measure_resolution
    candidate = n_total_measures - n_measures
    if candidate <= 0:
        # Too short to sample from. Without this guard a song that is exactly
        # n_measures long asks np.random.choice for one sample out of zero
        # candidates, which raises ValueError.
        continue
    target_n_samples = min(
        n_total_measures // n_measures, n_samples_per_song, candidate)

    for idx in np.random.choice(candidate, target_n_samples, False):
        start = idx * measure_resolution
        end = (idx + n_measures) * measure_resolution
        segment = pianoroll[:, start:end]
        # Skip the windows in which some track has too few notes. The sum is
        # taken per entry of the first axis of *this window*; checking the
        # whole song instead (as before) made the test independent of the
        # window and let windows with silent tracks through.
        if (segment.sum(axis=(1, 2)) < 10).any():
            continue
        data.append(segment)

if not data:
    raise ValueError(
        "No training samples were collected; check id_list and the dataset "
        "paths."
    )

random.shuffle(data)
data = np.stack(data)
print(f"Successfully collect {len(data)} samples from {len(id_list)} songs")
print(f"Data shape : {data.shape}")

In [None]:
# Plot the first four training samples side by side.

# Join the four samples along axis 1 once; the result is indexed by track
# inside the loop below.
preview_roll = np.concatenate(data[:4], 1)

# Pad the pitch axis back to 128 columns: `lowest_pitch` empty columns below
# the modelled range and the remainder above it.
pitch_padding = ((0, 0), (lowest_pitch, 128 - lowest_pitch - n_pitches))

tracks = []
for idx, (program, is_drum, track_name) in enumerate(zip(programs, is_drums, track_names)):
    pianoroll = np.pad(preview_roll[idx], pitch_padding)
    tracks.append(
        Track(name=track_name, program=program, is_drum=is_drum, pianoroll=pianoroll)
    )
multitrack = Multitrack(tracks=tracks, tempo=tempo_array, resolution=beat_resolution)

axs = multitrack.plot()
plt.gcf().set_size_inches((16, 8))
for ax in axs:
    for x in range(measure_resolution, 4 * 4 * measure_resolution, measure_resolution):
        # Every fourth measure line uses matplotlib's default line style;
        # the others are drawn as explicit solid lines of width 2.
        on_fourth_measure = x % (measure_resolution * 4) == 0
        line_style = {} if on_fourth_measure else {'linestyle': '-', 'linewidth': 2}
        ax.axvline(x - 0.5, color='k', **line_style)
plt.show()

In [None]:
# Convert the collected samples to a float32 tensor and wrap them in a
# DataLoader. Each batch is a one-element tuple because TensorDataset wraps a
# single tensor.
data = torch.as_tensor(data, dtype=torch.float32)
dataset = torch.utils.data.TensorDataset(data)

# shuffle: new sample order on every pass; drop_last: discard a trailing
# batch that is smaller than batch_size.
loader_options = dict(batch_size=batch_size, drop_last=True, shuffle=True)
data_loader = torch.utils.data.DataLoader(dataset, **loader_options)

In [None]:
class GeneraterBlock(torch.nn.Module):
    """Generator building block: ConvTranspose3d -> BatchNorm3d -> ReLU.

    Args:
        in_dim: number of input channels.
        out_dim: number of output channels.
        kernel: kernel size of the transposed convolution.
        stride: stride of the transposed convolution.
    """

    def __init__(self, in_dim, out_dim, kernel, stride):
        super().__init__()
        self.transconv = torch.nn.ConvTranspose3d(
            in_channels=in_dim,
            out_channels=out_dim,
            kernel_size=kernel,
            stride=stride,
        )
        self.batchnorm = torch.nn.BatchNorm3d(num_features=out_dim)

    def forward(self, x):
        """Upsample ``x``, batch-normalise it and apply ReLU."""
        normalised = self.batchnorm(self.transconv(x))
        return torch.relu(normalised)

In [None]:
class Generator(torch.nn.Module):
    """Map latent vectors to multitrack piano rolls.

    A trunk of four blocks shared by all tracks is followed by two private
    blocks per track. With the kernel/stride pairs below, the three trailing
    dimensions grow from (1, 1, 1) to (4, 16, 72):

        transconv0: (1, 1, 1) -> (4, 1, 1)
        transconv1:           -> (4, 4, 1)
        transconv2:           -> (4, 4, 4)
        transconv3:           -> (4, 4, 6)
        transconv4[i]:        -> (4, 16, 6)
        transconv5[i]:        -> (4, 16, 72)

    The final ``view`` therefore requires ``n_measures * measure_resolution``
    to equal 4 * 16 and ``n_pitches`` to equal 72.
    """

    def __init__(self):
        super().__init__()
        # Trunk shared by all tracks.
        self.transconv0 = GeneraterBlock(latent_dim, 256, (4, 1, 1), (4, 1, 1))
        self.transconv1 = GeneraterBlock(256, 128, (1, 4, 1), (1, 4, 1))
        self.transconv2 = GeneraterBlock(128, 64, (1, 1, 4), (1, 1, 4))
        self.transconv3 = GeneraterBlock(64, 32, (1, 1, 3), (1, 1, 1))
        # Private branches, one pair of blocks per track.
        per_track_step = [
            GeneraterBlock(32, 16, (1, 4, 1), (1, 4, 1)) for _ in range(n_tracks)
        ]
        self.transconv4 = torch.nn.ModuleList(per_track_step)
        per_track_pitch = [
            GeneraterBlock(16, 1, (1, 1, 12), (1, 1, 12)) for _ in range(n_tracks)
        ]
        self.transconv5 = torch.nn.ModuleList(per_track_pitch)

    def forward(self, x):
        """Return a tensor of shape (batch, n_tracks, time steps, n_pitches)."""
        shared = x.view(-1, latent_dim, 1, 1, 1)
        for trunk_block in (
            self.transconv0, self.transconv1, self.transconv2, self.transconv3
        ):
            shared = trunk_block(shared)

        # Run each track's private branch on the shared features.
        track_outputs = []
        for step_block, pitch_block in zip(self.transconv4, self.transconv5):
            track_outputs.append(pitch_block(step_block(shared)))

        # Every branch outputs one channel, so concatenating on the channel
        # axis yields one channel per track.
        stacked = torch.cat(track_outputs, 1)
        return stacked.view(-1, n_tracks, n_measures * measure_resolution, n_pitches)

In [None]:
class LayerNorm(torch.nn.Module):
    """Normalise each sample over all of its non-batch elements.

    Every sample (index along dimension 0) is shifted by its own mean and
    divided by its own standard deviation plus ``eps``. When ``affine`` is
    true, a learnable per-channel scale ``gamma`` (initialised uniformly in
    [0, 1)) and shift ``beta`` (initialised to zero) are applied along
    dimension 1.

    Args:
        n_features: size of dimension 1 of the expected input.
        eps: constant added to the standard deviation before dividing.
        affine: whether to learn and apply ``gamma`` and ``beta``.
    """

    def __init__(self, n_features, eps=1e-5, affine=True):
        super().__init__()
        self.n_features, self.affine, self.eps = n_features, affine, eps
        if not self.affine:
            return
        self.gamma = torch.nn.Parameter(torch.empty(n_features).uniform_())
        self.beta = torch.nn.Parameter(torch.zeros(n_features))

    def forward(self, x):
        """Return the normalised (and optionally scaled and shifted) input."""
        per_sample = x.view(x.size(0), -1)
        # Shape (batch, 1, ..., 1) so the statistics broadcast over a sample.
        stat_shape = [-1] + [1] * (x.dim() - 1)
        mean = per_sample.mean(1).view(*stat_shape)
        std = per_sample.std(1).view(*stat_shape)
        normalised = (x - mean) / (std + self.eps)
        if not self.affine:
            return normalised
        # Shape (1, channels, 1, ..., 1) so the parameters broadcast per channel.
        channel_shape = [1, -1] + [1] * (x.dim() - 2)
        scale = self.gamma.view(*channel_shape)
        shift = self.beta.view(*channel_shape)
        return scale * normalised + shift

In [None]:
class DiscriminatorBlock(torch.nn.Module):
    """Discriminator building block: Conv3d -> LayerNorm -> leaky ReLU.

    Args:
        in_dim: number of input channels.
        out_dim: number of output channels.
        kernel: kernel size of the convolution.
        stride: stride of the convolution.
    """

    def __init__(self, in_dim, out_dim, kernel, stride):
        super().__init__()
        # The attribute is called `transconv` although it holds an ordinary
        # (non-transposed) convolution; the name is kept because it is part
        # of the module's parameter names.
        self.transconv = torch.nn.Conv3d(
            in_channels=in_dim,
            out_channels=out_dim,
            kernel_size=kernel,
            stride=stride,
        )
        self.layernorm = LayerNorm(out_dim)

    def forward(self, x):
        """Convolve ``x``, layer-normalise it and apply leaky ReLU."""
        normalised = self.layernorm(self.transconv(x))
        return torch.nn.functional.leaky_relu(normalised)

In [None]:
class Discriminator(torch.nn.Module):
    """Score multitrack piano rolls with one unbounded value per sample.

    Each track first passes through two private blocks; the resulting
    features are concatenated on the channel axis and reduced by a shared
    stack of blocks to a 256-dimensional vector, which a linear layer maps
    to the score.

    With the kernel/stride pairs below, the three trailing dimensions shrink
    from (4, 16, 72) to (1, 1, 1):

        conv0[i]: (4, 16, 72) -> (4, 16, 6)
        conv1[i]:             -> (4, 4, 6)
        conv2:                -> (4, 4, 4)
        conv3:                -> (4, 4, 1)
        conv4:                -> (4, 1, 1)
        conv5:                -> (3, 1, 1)
        conv6:                -> (1, 1, 1)

    so ``n_measures``, ``measure_resolution`` and ``n_pitches`` must be 4, 16
    and 72 for the final ``view(-1, 256)`` to yield one row per sample.
    """

    def __init__(self):
        super().__init__()
        # Private per-track branches.
        self.conv0 = torch.nn.ModuleList([
            DiscriminatorBlock(1, 16, (1, 1, 12), (1, 1, 12)) for _ in range(n_tracks)
        ])
        self.conv1 = torch.nn.ModuleList([
            DiscriminatorBlock(16, 16, (1, 4, 1), (1, 4, 1)) for _ in range(n_tracks)
        ])
        # Shared stack. Its input is the channel-wise concatenation of the
        # n_tracks branches with 16 channels each, so the width follows
        # n_tracks rather than a hard-coded 5.
        self.conv2 = DiscriminatorBlock(16 * n_tracks, 64, (1, 1, 3), (1, 1, 1))
        self.conv3 = DiscriminatorBlock(64, 64, (1, 1, 4), (1, 1, 4))
        self.conv4 = DiscriminatorBlock(64, 128, (1, 4, 1), (1, 4, 1))
        self.conv5 = DiscriminatorBlock(128, 128, (2, 1, 1), (1, 1, 1))
        self.conv6 = DiscriminatorBlock(128, 256, (3, 1, 1), (3, 1, 1))
        self.dense = torch.nn.Linear(256, 1)

    def forward(self, x):
        """Return a (batch, 1) tensor of scores for the piano rolls in ``x``."""
        # Split the time axis into (measure, step within measure).
        x = x.view(-1, n_tracks, n_measures, measure_resolution, n_pitches)
        # x[:, [i]] keeps the track axis as a singleton channel dimension.
        x = [conv(x[:, [i]]) for i, conv in enumerate(self.conv0)]
        x = torch.cat([conv(x_) for x_, conv in zip(x, self.conv1)], 1)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.conv6(x)

        x = x.view(-1, 256)
        x = self.dense(x)
        return x

In [None]:
def compute_gradient_penalty(discriminator, real_samples, fake_samples):
    """Return the gradient penalty of ``discriminator`` on random interpolates.

    For every sample a random point on the segment between the real and the
    fake sample is drawn; the penalty is the mean over the batch of
    ``(||grad||_2 - 1) ** 2``, where ``grad`` is the gradient of the
    discriminator output with respect to that point.

    Args:
        discriminator: callable mapping a batch of samples to scores.
        real_samples: tensor whose first dimension is the batch.
        fake_samples: tensor with the same shape and device as
            ``real_samples``.

    Returns:
        A scalar tensor that is still attached to the autograd graph, so the
        caller can back-propagate through it.
    """
    # One mixing coefficient per sample, created on the samples' own device
    # (not "CUDA if available") and broadcastable over any number of trailing
    # dimensions.
    alpha_shape = [real_samples.size(0)] + [1] * (real_samples.dim() - 1)
    alpha = torch.rand(alpha_shape, device=real_samples.device)

    interpolates = alpha * real_samples + (1 - alpha) * fake_samples
    interpolates = interpolates.requires_grad_(True)
    d_interpolates = discriminator(interpolates)

    # create_graph=True keeps the gradient computation differentiable, which
    # is required to back-propagate through the penalty.
    gradients = torch.autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=torch.ones_like(d_interpolates),
        create_graph=True,
        retain_graph=True,
    )[0]
    gradients = gradients.reshape(gradients.size(0), -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty

In [None]:
def train_one_step(d_optimizer, g_optimizer, real_samples):
    """Run one discriminator update followed by one generator update.

    Uses the module-level ``discriminator`` and ``generator``.

    Args:
        d_optimizer: optimizer over the discriminator parameters.
        g_optimizer: optimizer over the generator parameters.
        real_samples: one batch of real piano rolls.

    Returns:
        ``(d_loss, g_loss)`` as detached scalar tensors. ``d_loss`` is the
        sum of the real and fake discriminator terms, without the gradient
        penalty.
    """
    # Draw one latent vector per real sample and put both on the device the
    # networks live on.
    device = next(discriminator.parameters()).device
    real_samples = real_samples.to(device)
    latent = torch.randn(real_samples.size(0), latent_dim).to(device)

    # === Discriminator update ===
    d_optimizer.zero_grad()
    prediction_real = discriminator(real_samples)
    d_loss_real = -torch.mean(prediction_real)
    d_loss_real.backward()

    # The fake batch is detached here so this pass sends no gradient into
    # the generator.
    fake_samples = generator(latent)
    prediction_fake_d = discriminator(fake_samples.detach())
    d_loss_fake = torch.mean(prediction_fake_d)
    d_loss_fake.backward()

    gradient_penalty = 10.0 * compute_gradient_penalty(
        discriminator, real_samples.detach(), fake_samples.detach())
    gradient_penalty.backward()

    d_optimizer.step()

    # === Generator update ===
    # This backward pass also leaves gradients on the discriminator; they are
    # cleared by d_optimizer.zero_grad() at the start of the next call.
    g_optimizer.zero_grad()
    prediction_fake_g = discriminator(fake_samples)
    g_loss = -torch.mean(prediction_fake_g)
    g_loss.backward()
    g_optimizer.step()

    # Detach the reported losses: callers only log them, and returning them
    # attached would keep autograd history reachable from every running
    # average built on top of them.
    return (d_loss_real + d_loss_fake).detach(), g_loss.detach()

In [None]:
# Instantiate the two networks.
discriminator = Discriminator()
generator = Generator()

# Report the number of trainable parameters of each network.
n_params_g = sum(p.numel() for p in generator.parameters() if p.requires_grad)
print("Number of parameters in G: {}".format(n_params_g))
n_params_d = sum(p.numel() for p in discriminator.parameters() if p.requires_grad)
print("Number of parameters in D: {}".format(n_params_d))

# Create optimizers (same Adam settings for both networks)
adam_options = dict(lr=0.001, betas=(0.5, 0.9))
d_optimizer = torch.optim.Adam(discriminator.parameters(), **adam_options)
g_optimizer = torch.optim.Adam(generator.parameters(), **adam_options)

# Fixed latent vectors: the same ones are fed to the generator at every
# preview.
sample_latent = torch.randn(n_samples, latent_dim)

# Move everything to the GPU when one is available.
if torch.cuda.is_available():
    discriminator, generator, sample_latent = (
        discriminator.cuda(), generator.cuda(), sample_latent.cuda()
    )

# Generated previews archived at selected steps, keyed by a running index.
history_samples = {}

# Live loss tracker.
liveloss = PlotLosses(outputs=[MatplotlibPlot(cell_size=(6,2))])

# Global training-step counter, read and advanced by the training loop.
step = 0

In [None]:
# Main training loop. Steps 0..n_steps inclusive are executed (n_steps + 1
# updates) so that a preview is also taken at step == n_steps.

if len(data_loader) == 0:
    # With drop_last=True a dataset smaller than one batch yields no batches,
    # and the loop below would spin forever without advancing `step`.
    raise ValueError(
        "data_loader yields no batches; collect at least batch_size samples "
        "before training."
    )

progress_bar = tqdm(total=n_steps + 1, initial=step, ncols=80, mininterval=1)

# Steps at which the preview is archived in `history_samples`.
snapshot_steps = {
    0, sample_interval, 10 * sample_interval, 100 * sample_interval, n_steps
}

# Geometry of the preview plot: `n_samples` clips of `n_measures` measures
# laid end to end, with the pitch axis padded back to 128 columns.
preview_length = n_samples * n_measures * measure_resolution
pitch_padding = ((0, 0), (lowest_pitch, 128 - lowest_pitch - n_pitches))

# Continue numbering after the snapshots that already exist, so re-running
# this cell to resume an interrupted run does not overwrite earlier ones.
count = len(history_samples)

while step <= n_steps:
    for real_samples in data_loader:
        generator.train()
        d_loss, g_loss = train_one_step(d_optimizer, g_optimizer, real_samples[0])

        # Plain Python floats: the running averages must not carry autograd
        # history (or device tensors) from one step to the next.
        d_loss, g_loss = float(d_loss), float(g_loss)

        if step > 0:
            running_d_loss = 0.05 * d_loss + 0.95 * running_d_loss
            running_g_loss = 0.05 * g_loss + 0.95 * running_g_loss
        else:
            running_d_loss, running_g_loss = 0.0, 0.0
        # NOTE(review): the value is only recorded here; nothing in this cell
        # asks liveloss to draw it -- confirm whether a draw call is missing.
        liveloss.update({'negative_critic_loss': -running_d_loss})

        progress_bar.set_description_str(
            "(d_loss={: 8.6f}, g_loss={: 8.6f})".format(d_loss, g_loss))

        if step % sample_interval == 0:
            # Generate the preview from the fixed latent vectors. No gradients
            # are needed for it.
            generator.eval()
            with torch.no_grad():
                samples = generator(sample_latent).cpu().numpy()
            if step in snapshot_steps:
                history_samples[count] = samples
                count = count + 1

            clear_output(True)

            # (sample, track, time, pitch) -> (track, all samples in time, pitch)
            samples = samples.transpose(1, 0, 2, 3).reshape(n_tracks, -1, n_pitches)
            tracks = []
            for idx, (program, is_drum, track_name) in enumerate(
                zip(programs, is_drums, track_names)
            ):
                # Binarize at 0.5 and pad the pitch axis back to 128 columns.
                pianoroll = np.pad(samples[idx] > 0.5, pitch_padding)
                tracks.append(
                    Track(
                        name=track_name,
                        program=program,
                        is_drum=is_drum,
                        pianoroll=pianoroll
                    )
                )
            m = Multitrack(
                tracks=tracks,
                tempo=tempo_array,
                resolution=beat_resolution
            )
            axs = m.plot()
            plt.gcf().set_size_inches((16, 8))
            for ax in axs:
                for x in range(measure_resolution, preview_length, measure_resolution):
                    if x % (measure_resolution * n_measures) == 0:
                        # Boundary between two generated samples.
                        ax.axvline(x - 0.5, color='k')
                    else:
                        # Boundary between two measures of one sample.
                        ax.axvline(x - 0.5, color='k', linestyle='-', linewidth=1)
            plt.show()

        step += 1
        progress_bar.update(1)
        if step > n_steps:
            break

progress_bar.close()

In [None]:
# Plot every archived preview.
#
# The training loop archives a preview at each distinct step of the list
# below that is actually reached (at most n_steps) and sampled (a multiple of
# sample_interval), numbering them 0, 1, 2, ... in the order they occur.
# Entries can coincide (e.g. n_steps == 100 * sample_interval), so fewer than
# five previews may exist; iterate over what was stored instead of assuming
# five.
steps = sorted({
    s
    for s in (0, sample_interval, 10 * sample_interval, 100 * sample_interval, n_steps)
    if s <= n_steps and s % sample_interval == 0
})

# The loop variable is deliberately not called `step`: that name is the
# training loop's global step counter and must not be overwritten here.
for snapshot_idx in sorted(history_samples):
    step_label = steps[snapshot_idx] if snapshot_idx < len(steps) else "?"
    print(f"Step={step_label}")

    # (sample, track, time, pitch) -> (track, all samples in time, pitch)
    samples = history_samples[snapshot_idx].transpose(1, 0, 2, 3).reshape(
        n_tracks, -1, n_pitches)
    tracks = []
    for idx, (program, is_drum, track_name) in enumerate(zip(programs, is_drums, track_names)):
        # Binarize at 0.5 and pad the pitch axis back to 128 columns.
        pianoroll = np.pad(
            samples[idx] > 0.5,
            ((0, 0), (lowest_pitch, 128 - lowest_pitch - n_pitches))
        )
        tracks.append(
            Track(
                name=track_name,
                program=program,
                is_drum=is_drum,
                pianoroll=pianoroll,
            )
        )
    m = Multitrack(tracks=tracks, tempo=tempo_array, resolution=beat_resolution)
    axs = m.plot()
    for ax in axs:
        for x in range(
            measure_resolution,
            n_samples * n_measures * measure_resolution,
            measure_resolution
        ):
            if x % (measure_resolution * n_measures) == 0:
                # Boundary between two generated samples.
                ax.axvline(x - 0.5, color='k')
            else:
                # Boundary between two measures of one sample.
                ax.axvline(x - 0.5, color='k', linestyle='-', linewidth=1)
    plt.gcf().set_size_inches((16, 8))
    plt.show()

In [None]:
# Build a Multitrack from the most recently archived preview.
#
# Snapshots are keyed 0, 1, 2, ... in the order they were taken, so the
# largest key is the latest one. Index 4 was hard-coded before, which raises
# KeyError whenever fewer than five snapshots were stored (e.g. when
# n_steps == 100 * sample_interval).
latest_snapshot = max(history_samples)

# (sample, track, time, pitch) -> (track, all samples in time, pitch)
samples = history_samples[latest_snapshot].transpose(1, 0, 2, 3).reshape(
    n_tracks, -1, n_pitches)
tracks = []
for idx, (program, is_drum, track_name) in enumerate(zip(programs, is_drums, track_names)):
    # Binarize at 0.5 and pad the pitch axis back to 128 columns.
    pianoroll = np.pad(
        samples[idx] > 0.5,
        ((0, 0), (lowest_pitch, 128 - lowest_pitch - n_pitches))
    )
    tracks.append(
        Track(
            name=track_name,
            program=program,
            is_drum=is_drum,
            pianoroll=pianoroll,
        )
    )
generated_multitrack = pypianoroll.Multitrack(tracks=tracks, tempo=tempo_array, resolution=beat_resolution)
generated_multitrack.plot()

In [None]:
from pypianoroll import Multitrack
from pypianoroll import load as midi_load
import tensorflow as tf
# Export the generated multitrack: save it as .npz, load that file back, and
# write the reloaded object as a MIDI file.
# NOTE(review): the save/load round trip looks redundant, but it may be
# deliberate (e.g. so the tracks come back in a form `write` can export) --
# confirm before simplifying it.
# NOTE(review): `tf` (imported above) is not used in the visible cells.
generated_multitrack.save('/kaggle/working/out.npz')
m1 = midi_load('/kaggle/working/out.npz')
m1.write('/kaggle/working/final_out_10000.mid')

In [None]:
# Parse the MIDI file written above back into `mt` for the metric cells below.
mt = pypianoroll.read("/kaggle/working/final_out_10000.mid")

In [None]:
# Empty-beat rate of the generated piece.
# The metric takes a piano-roll array and its beat resolution, not a
# Multitrack (and `torch.Tensor(mt)` cannot convert one), so merge all tracks
# into a single boolean (time, pitch) roll first. The resolution is taken from
# `mt` itself because the file was re-read from MIDI and need not use the
# training resolution.
merged_roll = mt.stack().any(axis=0)
# The metric reshapes the roll beat by beat; drop a trailing partial beat.
whole_beats_length = len(merged_roll) - len(merged_roll) % mt.resolution
pypianoroll.empty_beat_rate(merged_roll[:whole_beats_length], mt.resolution)

In [None]:
# Number of distinct pitches used anywhere in the generated piece.
# The metric takes a piano-roll array, not a Multitrack, so merge all tracks
# into a single boolean (time, pitch) roll first.
pypianoroll.n_pitches_used(mt.stack().any(axis=0))

In [None]:
import pretty_midi
# Load the exported MIDI file with pretty_midi and build one 2-D array that
# holds the piano rolls of all instruments.
gm = pretty_midi.PrettyMIDI('/kaggle/working/final_out_10000.mid')

# One piano roll per instrument, sampled with fs=100.
piano_roll = [instrument.get_piano_roll(fs=100) for instrument in gm.instruments]

# Longest roll along axis 1 (0 when there are no instruments).
maximum = max((roll.shape[1] for roll in piano_roll), default=0)
print(maximum)

sizes = [roll.shape for roll in piano_roll]
print(sizes)

# Zero-pad every shorter roll at the end of axis 1 up to the common length,
# printing its length before, the amount added, and its length after.
for position, roll in enumerate(piano_roll):
    current_length = roll.shape[1]
    if current_length >= maximum:
        continue
    print(current_length)
    diff = maximum - current_length
    print(diff)
    piano_roll[position] = np.pad(
        roll, ((0, 0), (0, diff)), mode='constant', constant_values=0)
    print(piano_roll[position].shape[1])

sizes = [roll.shape for roll in piano_roll]
print(sizes)

# Stack the rolls along axis 0, then swap the two axes.
# NOTE(review): presumably axis 0 of each roll is pitch and axis 1 is time, so
# the result would be (time, instruments * pitches) -- confirm.
concatenated_piano_roll = np.concatenate(piano_roll, axis=0)
transpose_piano_roll = concatenated_piano_roll.T

In [None]:
# Polyphonic rate of the stacked roll built in the previous cell.
# NOTE(review): the second argument (2) is presumably the threshold on the
# number of simultaneous pitches -- confirm against the pypianoroll docs.
pypianoroll.polyphonic_rate(transpose_piano_roll,2)

In [None]:
# Pitch range of the stacked roll.
# NOTE(review): the columns of `transpose_piano_roll` are the rows of all
# instruments' rolls stacked one after another, so column indices are
# presumably not plain MIDI pitch numbers -- confirm the result means what is
# intended.
pypianoroll.pitch_range(transpose_piano_roll)

In [None]:
# Pitch range of the stacked roll, tuple variant of the metric.
# NOTE(review): computed on the same stacked array as the previous cell, so
# the same caveat about column indices applies -- confirm.
pypianoroll.pitch_range_tuple(transpose_piano_roll)

In [None]:
# Play an MP3 from an attached input dataset; the file is not produced by
# this notebook.
# NOTE(review): the file name says 100000, whereas the MIDI written by this
# notebook is final_out_10000.mid -- confirm this is the intended rendering.
audio("/kaggle/input/generated-music/final_out_100000.mp3")