<a href="https://colab.research.google.com/github/DRIP-AI-RESEARCH-JUNIOR/MUSIC_GENEARATION/blob/master/DCGanMusic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive inside the Colab VM; the copy cells that follow read
# from /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Copy the `midi` folder from Drive into /content (it is later added to
# sys.path, presumably because it holds the midi_utils helpers).
!cp -r /content/drive/My\ Drive/midi /content

In [3]:
# Copy the `Nottingham` folder from Drive into /content; the training set is
# read from Nottingham/train/.
!cp -r /content/drive/My\ Drive/Nottingham /content

In [1]:
%matplotlib inline
import os
import sys
import random
import math
sys.path.append('midi')
import torch.utils.data as data
from midi_utils import midiread, midiwrite
import matplotlib
matplotlib.use('Agg')
from matplotlib import pyplot as plt
import skimage.io as io
from IPython.display import FileLink

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.utils as vutils
import numpy as np
import torch.utils.data as data

# DATALOADER

In [3]:
def midi_filename_to_piano_roll(midi_filename):
    """Read a MIDI file and return its transposed piano roll as an on/off matrix.

    The file is parsed with ``midiread`` using ``dt=0.3``, the resulting
    ``piano_roll`` is transposed, and every strictly positive entry is
    overwritten with 1.

    Args:
        midi_filename: path of the MIDI file to read.

    Returns:
        The transposed piano roll with positive entries set to 1.
        Presumably laid out as (pitch, time) -- confirm against ``midiread``.
    """
    roll = midiread(midi_filename, dt=0.3).piano_roll.transpose()

    # Collapse every positive value to 1 so the roll only records on/off.
    roll[roll > 0] = 1

    return roll
 
 
def pad_piano_roll(piano_roll, max_length=132333, pad_value=0):
    """Left-pad a piano roll along its second axis to a fixed length.

    The roll is right-aligned inside an ``(88, max_length)`` array, so padding
    is inserted *before* the first column of ``piano_roll``.

    Args:
        piano_roll: array that broadcasts to shape ``(88, T)``.
        max_length: number of columns in the returned array.
        pad_value: value written into the padded columns.

    Returns:
        A new float ``numpy`` array of shape ``(88, max_length)``.

    Raises:
        ValueError: if ``piano_roll`` has more than ``max_length`` columns.
    """
    original_piano_roll_length = piano_roll.shape[1]

    if original_piano_roll_length > max_length:
        raise ValueError(
            'piano roll has {} columns but max_length is {}'.format(
                original_piano_roll_length, max_length))

    padded_piano_roll = np.zeros((88, max_length))
    padded_piano_roll[:] = pad_value

    # Use an explicit start index: the slice `-0:` would select the whole
    # array, which made a zero-length roll fail with a broadcast error.
    if original_piano_roll_length > 0:
        start = max_length - original_piano_roll_length
        padded_piano_roll[:, start:] = piano_roll

    return padded_piano_roll
 
 

In [4]:
 class NotesGenerationDataset(data.Dataset):
     """Dataset that serves every file of a folder as a padded piano roll.

     Each item is read with ``midi_filename_to_piano_roll``, padded to
     ``longest_sequence_length`` columns with ``pad_piano_roll``, transposed
     and returned as a float tensor of shape
     ``(1, longest_sequence_length, 88)``.

     Args:
         midi_folder_path: folder whose entries are all treated as MIDI files.
         longest_sequence_length: length every roll is padded to. Pass
             ``None`` to scan the whole folder once and use the longest roll.
     """

     def __init__(self, midi_folder_path, longest_sequence_length=1491):
         self.midi_folder_path = midi_folder_path
         self.longest_sequence_length = longest_sequence_length

         # os.listdir returns entries in arbitrary order; sort them so that an
         # index always maps to the same file across runs and machines.
         self.midi_full_filenames = [
             os.path.join(midi_folder_path, filename)
             for filename in sorted(os.listdir(midi_folder_path))
         ]

         if longest_sequence_length is None:
             self.update_the_max_length()

     def update_the_max_length(self):
         """Set ``longest_sequence_length`` to the longest roll in the folder.

         Every file is parsed once, so this can be slow on large folders.

         Raises:
             ValueError: if the folder contains no files.
         """
         if not self.midi_full_filenames:
             raise ValueError(
                 'no files found in {!r}; cannot infer the longest sequence'.format(
                     self.midi_folder_path))

         self.longest_sequence_length = max(
             midi_filename_to_piano_roll(filename).shape[1]
             for filename in self.midi_full_filenames
         )

     def __len__(self):
         """Return the number of files in the folder."""
         return len(self.midi_full_filenames)

     def __getitem__(self, index):
         """Load, pad and tensorise the roll stored at ``index``."""
         midi_full_filename = self.midi_full_filenames[index]

         piano_roll = midi_filename_to_piano_roll(midi_full_filename)

         # Pad so that every sample shares the same number of time steps.
         input_sequence_padded = pad_piano_roll(
             piano_roll, max_length=self.longest_sequence_length)

         # (88, T) -> (T, 88), then add a leading channel axis -> (1, T, 88).
         input_sequence_padded = input_sequence_padded.transpose()
         input_sequence_padded = torch.FloatTensor(input_sequence_padded).unsqueeze(0)

         return input_sequence_padded

In [5]:
# Training set: passing longest_sequence_length=None makes the dataset scan
# the folder once and pad every roll to the longest one it finds.
trainset = NotesGenerationDataset(
    'Nottingham/train/',
    longest_sequence_length=None,
)

# Shuffled mini-batches of 4; the last incomplete batch is dropped.
train_loader = data.DataLoader(
    trainset,
    batch_size=4,
    shuffle=True,
    drop_last=True,
)

In [None]:
# Sanity check: pull one batch from the loader and print its shape.
a = next(iter(train_loader))
print(a.shape)

torch.Size([8, 1, 1491, 88])


# UTILITY FUNCTIONS

In [6]:
def conv_cond_concat(x, y):
    """Tile a conditioning tensor over ``x`` and append it as extra channels.

    ``y`` is expanded to ``x``'s batch and spatial sizes (axes 0, 2 and 3)
    while keeping its own channel count, then concatenated to ``x`` on axis 1.

    Args:
        x: 4-D feature map.
        y: 4-D conditioning tensor expandable to ``x``'s batch/spatial sizes.

    Returns:
        Tensor with ``x.shape[1] + y.shape[1]`` channels.
    """
    tiled = y.expand(x.size(0), y.size(1), x.size(2), x.size(3))
    return torch.cat([x, tiled], dim=1)

def conv_prev_concat(x, y):
    """Concatenate a second feature map onto ``x`` along the channel axis.

    Both tensors must have identical sizes on every axis from index 2 onward.

    Args:
        x: 4-D feature map.
        y: 4-D feature map with the same spatial sizes as ``x``.

    Returns:
        Tensor with ``x.shape[1] + y.shape[1]`` channels.

    Raises:
        ValueError: if the spatial sizes differ. The previous version printed
            both shapes and returned ``None``, which only surfaced later as an
            unrelated error in the caller.
    """
    x_shapes = x.shape
    y_shapes = y.shape
    if x_shapes[2:] != y_shapes[2:]:
        raise ValueError(
            'conv_prev_concat: spatial sizes differ, x has {} but y has {}'.format(
                tuple(x_shapes[2:]), tuple(y_shapes[2:])))

    y2 = y.expand(x_shapes[0], y_shapes[1], x_shapes[2], x_shapes[3])

    return torch.cat((x, y2), 1)



def batch_norm_1d(x):
    """Normalise ``x`` with a freshly constructed ``nn.BatchNorm1d`` layer.

    A new layer is built on every call, so its affine parameters and running
    statistics are never shared between calls; the result is simply ``x``
    normalised with the statistics of the current batch.

    Args:
        x: tensor whose axis 1 holds the features to normalise.

    Returns:
        The normalised tensor, on the same device as ``x``.
    """
    x_shape = x.shape[1]
    batch_nor = nn.BatchNorm1d(x_shape, eps=1e-05, momentum=0.9, affine=True)
    # Follow the input's device instead of hard-coding CUDA, so the helper
    # also works for CPU tensors and on machines without a GPU.
    batch_nor = batch_nor.to(x.device)

    output = batch_nor(x)
    return output


def batch_norm_1d_cpu(x):
    """Placeholder for 1-D batch normalisation: returns ``x`` unchanged.

    Normalisation is intentionally disabled here; the function only exists so
    call sites keep the same shape as their ``batch_norm_1d`` counterparts.
    The former unused ``x.shape[1]`` lookup was dropped, as it made the no-op
    fail on inputs without a second axis.
    """
    return x





def batch_norm_2d(x):
    """Normalise ``x`` with a freshly constructed ``nn.BatchNorm2d`` layer.

    A new layer is built on every call, so its affine parameters and running
    statistics are never shared between calls; the result is simply ``x``
    normalised per channel with the statistics of the current batch.

    Args:
        x: tensor whose axis 1 holds the channels to normalise.

    Returns:
        The normalised tensor, on the same device as ``x``.
    """
    x_shape = x.shape[1]
    batch_nor = nn.BatchNorm2d(x_shape, eps=1e-05, momentum=0.9, affine=True)
    # Follow the input's device instead of hard-coding CUDA, so the helper
    # also works for CPU tensors and on machines without a GPU.
    batch_nor = batch_nor.to(x.device)
    output = batch_nor(x)
    return output


def batch_norm_2d_cpu(x):
    """Placeholder for 2-D batch normalisation: hands ``x`` back untouched.

    No normalisation is applied; the function keeps call sites shaped like
    their ``batch_norm_2d`` counterparts.
    """
    return x



def sigmoid_cross_entropy_with_logits(inputs,labels):
    """Binary cross-entropy between raw logits and targets, averaged to a scalar.

    Uses ``nn.BCEWithLogitsLoss`` with its default (mean) reduction, so the
    result is a 0-dim tensor rather than an element-wise loss.

    Args:
        inputs: raw (pre-sigmoid) scores.
        labels: target tensor with the same shape as ``inputs``.
    """
    criterion = nn.BCEWithLogitsLoss()
    return criterion(inputs, labels)



def reduce_mean(x):
    """Average ``x`` over axis 0, then average the result over its last axis."""
    averaged_over_first = x.mean(dim=0, keepdim=False)
    return averaged_over_first.mean(dim=-1, keepdim=False)


def reduce_mean_0(x):
    """Average ``x`` over axis 0, dropping that axis from the result."""
    return x.mean(dim=0, keepdim=False)


def l2_loss(x,y):
    """Half of the summed squared difference between ``x`` and ``y``."""
    summed_squared_error = nn.MSELoss(reduction='sum')
    return summed_squared_error(x, y) / 2



def lrelu(x, leak=0.2):
    """Leaky ReLU written as the element-wise maximum of ``x`` and ``leak * x``.

    Args:
        x: input tensor.
        leak: factor applied to ``x`` for the second operand of the maximum.
    """
    scaled = x * leak
    return torch.max(x, scaled)

# DISCRIMINATOR

In [7]:
# Global hyper-parameters. Several of these are shadowed by local values in
# later cells -- see the per-constant notes below.

# Batch size handed to train() by main(). The DataLoader cell passes its own
# literal batch_size=4, so keep the two in sync by hand.
batch_size = 4

# Leftover from an image-GAN template; not referenced by any visible cell.
# NOTE(review): candidate for removal.
image_size = 64

# Number of channels of a sample (piano rolls are single-channel). Used as the
# Generator's output channel count.
nc = 1

# Length of the latent vector z fed to the Generator.
nz = 100

# Base number of feature maps in the Generator.
ngf = 16

# Base number of feature maps in the Discriminator. The Discriminator class
# assigns its own local ndf = 16, so this global is not what it reads.
ndf = 16

# Number of training epochs. main() assigns its own local epochs = 50, so this
# value does not affect the visible training run.
epochs = 5

# Learning rate. main() hard-codes the optimizers' learning rates instead of
# reading this value.
lr = 0.0002

# Beta1 for Adam. main() passes the literal 0.5 instead of this name.
beta1 = 0.5

# Number of GPUs; not referenced by any visible cell.
ngpu = 1

In [8]:
class Discriminator(nn.Module):
    """Convolutional critic for single-channel 1491 x 88 piano rolls.

    Five strided convolutions (``self.layer``) shrink the input to a
    ``1 x 9 x 1`` map; a final ``(9, 1)`` convolution (``self.linear``)
    collapses it to one logit per sample.

    ``forward`` returns ``(probability, logit)`` where ``probability`` is the
    sigmoid of ``logit``; both have shape ``(batch, 1, 1, 1)`` for the input
    size above.
    """

    def __init__(self):
        super().__init__()
        in_channels = 1
        width = 16
        slope = 0.2

        # First stage has no batch norm; the remaining four share one pattern:
        # (in_channels, out_channels, kernel, stride, padding).
        stages = [
            (width, width * 2, (11, 3), 3, 1),
            (width * 2, width * 4, (7, 3), 3, 2),
            (width * 4, width * 8, (9, 3), 2, 1),
            (width * 8, 1, (7, 5), 4, 1),
        ]

        modules = [
            # input is 1 x 1491 x 88
            nn.Conv2d(in_channels, width, (11, 3), 2, 1, bias=False),
            nn.LeakyReLU(slope, inplace=True),
        ]
        for c_in, c_out, kernel, stride, padding in stages:
            modules.append(nn.Conv2d(c_in, c_out, kernel, stride, padding, bias=False))
            modules.append(nn.BatchNorm2d(c_out))
            modules.append(nn.LeakyReLU(slope, inplace=True))

        self.layer = nn.Sequential(*modules)
        # Acts as the final "linear" layer: a (9, 1) kernel over the 9 x 1 map.
        self.linear = nn.Conv2d(1, 1, (9, 1), 1, 0, bias=False)
        self.sig = nn.Sigmoid()

    def forward(self, input):
        logits = self.linear(self.layer(input))
        return self.sig(logits), logits

In [13]:
# Smoke test: run a random batch shaped like the padded piano rolls through an
# untrained Discriminator and print the shapes of both returned tensors.
inp = torch.randn(128,1,1491,88)
d = Discriminator()
out,x = d(inp)
print(out.shape,x.shape)

torch.Size([128, 1, 1, 1]) torch.Size([128, 1, 1, 1])


# GENERATOR

In [9]:
class Generator(nn.Module):
    """Transposed-convolution generator mapping a latent vector to a piano roll.

    Reads the module-level ``nz``, ``ngf`` and ``nc`` when constructed. For an
    input of shape ``(batch, nz, 1, 1)`` the spatial size grows as
    ``1x1 -> 9x1 -> 38x3 -> 81x6 -> 245x15 -> 742x44 -> 1491x88``, so the
    output has shape ``(batch, nc, 1491, 88)``.

    NOTE(review): the output passes through ``Tanh`` and therefore lies in
    (-1, 1), while the rolls produced by ``midi_filename_to_piano_roll`` are
    binarised to 0/1 -- confirm this range mismatch is intended.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, kernel, stride, padding) for every
        # ConvTranspose2d -> BatchNorm2d -> ReLU stage, with the spatial size
        # each stage produces.
        stages = [
            (nz, 1, (9, 1), 1, 0),                # -> 1 x 9 x 1
            (1, ngf * 8, (8, 5), 4, 1),           # -> (ngf*8) x 38 x 3
            (ngf * 8, ngf * 4, (9, 4), 2, 1),     # -> (ngf*4) x 81 x 6
            (ngf * 4, ngf * 2, (9, 4), 3, 2),     # -> (ngf*2) x 245 x 15
            (ngf * 2, ngf, (12, 4), 3, 1),        # -> ngf x 742 x 44
        ]

        modules = []
        for c_in, c_out, kernel, stride, padding in stages:
            modules.append(nn.ConvTranspose2d(c_in, c_out, kernel, stride, padding, bias=False))
            modules.append(nn.BatchNorm2d(c_out))
            modules.append(nn.ReLU(True))

        # Output stage: -> nc x 1491 x 88, squashed by Tanh.
        modules.append(nn.ConvTranspose2d(ngf, nc, (11, 4), 2, 1, bias=False))
        modules.append(nn.Tanh())

        self.main = nn.Sequential(*modules)

    def forward(self, input):
        return self.main(input)

In [None]:
# Smoke test: feed a batch of random latent vectors through an untrained
# Generator and print the output shape.
net_g = Generator()
z = torch.randn(128,100,1,1)
out = net_g(z)
print(out.shape)

torch.Size([128, 1, 1491, 88])


# testing model

# SAMPLE GENERATOR

In [None]:
class sample_generator(nn.Module):
    """Conditional generator that combines noise, a label and a previous roll.

    ``forward`` encodes ``prev_x`` with four convolutions, decodes the
    noise/label vector with two linear layers and four transposed
    convolutions, and at every decoder stage concatenates the label map and
    the matching encoder feature map.

    Args:
        pitch_range: width of the piano-roll axis collapsed by ``h0_prev`` and
            restored by ``h4``; also the output channel count of
            ``h1``..``h3``. The decoder's hard-coded 157 input channels equal
            128 + ``y_dim`` (13) + 16, so only the default of 128 is
            consistent with the rest of the architecture. It used to be read
            from an undefined global, which raised ``NameError`` on a fresh
            kernel.
    """

    def __init__(self, pitch_range=128):
        super(sample_generator, self).__init__()
        self.gf_dim   = 64
        self.y_dim   = 13
        self.n_channel = 256

        # Decoder: each stage doubles the first spatial axis (2 -> 4 -> 8 -> 16).
        self.h1      = nn.ConvTranspose2d(in_channels=157, out_channels=pitch_range, kernel_size=(2,1), stride=(2,2))
        self.h2      = nn.ConvTranspose2d(in_channels=157, out_channels=pitch_range, kernel_size=(2,1), stride=(2,2))
        self.h3      = nn.ConvTranspose2d(in_channels=157, out_channels=pitch_range, kernel_size=(2,1), stride=(2,2))
        self.h4      = nn.ConvTranspose2d(in_channels=157, out_channels=1, kernel_size=(1,pitch_range), stride=(1,2))

        # Encoder for the previous roll: collapse the pitch axis, then halve
        # the first spatial axis three times (16 -> 8 -> 4 -> 2).
        self.h0_prev = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(1,pitch_range), stride=(1,2))
        self.h1_prev = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=(2,1), stride=(2,2))
        self.h2_prev = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=(2,1), stride=(2,2))
        self.h3_prev = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=(2,1), stride=(2,2))

        # 113 = 100 noise dims + 13 label dims; 1037 = 1024 + 13 label dims.
        self.linear1 = nn.Linear(113,1024)
        self.linear2 = nn.Linear(1037,self.gf_dim*2*2*1)

    def forward(self, z, prev_x, y ,batch_size,pitch_range):
        """Generate one roll per sample.

        Shape notes below use B for the batch size and follow the original
        annotations, which assume a (B, 1, 16, 128) ``prev_x``.

        Args:
            z: noise, (B, 100).
            prev_x: previous roll fed to the encoder.
            y: label vector, (B, 13).
            batch_size: B, used to reshape ``y`` and the decoder seed.
            pitch_range: unused here; kept so existing calls keep working.

        Returns:
            Sigmoid-activated tensor, (B, 1, 16, 128).
        """
        # Encoder feature maps, kept for the skip connections below.
        h0_prev = lrelu(batch_norm_2d_cpu(self.h0_prev(prev_x)),0.2)   # (B, 16, 16, 1)
        h1_prev = lrelu(batch_norm_2d_cpu(self.h1_prev(h0_prev)),0.2)  # (B, 16, 8, 1)
        h2_prev = lrelu(batch_norm_2d_cpu(self.h2_prev(h1_prev)),0.2)  # (B, 16, 4, 1)
        h3_prev = lrelu(batch_norm_2d_cpu(self.h3_prev(h2_prev)),0.2)  # (B, 16, 2, 1)

        yb = y.view(batch_size,  self.y_dim, 1, 1)  # (B, 13, 1, 1)

        z = torch.cat((z,y),1)         # (B, 113)

        h0 = F.relu(batch_norm_1d_cpu(self.linear1(z)))    # (B, 1024)
        h0 = torch.cat((h0,y),1)   # (B, 1037)

        h1 = F.relu(batch_norm_1d_cpu(self.linear2(h0)))   # (B, 256)
        h1 = h1.view(batch_size, self.gf_dim * 2, 2, 1)     # (B, 128, 2, 1)
        h1 = conv_cond_concat(h1,yb)       # (B, 141, 2, 1)
        h1 = conv_prev_concat(h1,h3_prev)  # (B, 157, 2, 1)

        h2 = F.relu(batch_norm_2d_cpu(self.h1(h1)))  # (B, 128, 4, 1)
        h2 = conv_cond_concat(h2,yb)       # (B, 141, 4, 1)
        h2 = conv_prev_concat(h2,h2_prev)  # (B, 157, 4, 1)

        h3 = F.relu(batch_norm_2d_cpu(self.h2(h2)))  # (B, 128, 8, 1)
        h3 = conv_cond_concat(h3,yb)       # (B, 141, 8, 1)
        h3 = conv_prev_concat(h3,h1_prev)  # (B, 157, 8, 1)

        h4 = F.relu(batch_norm_2d_cpu(self.h3(h3)))  # (B, 128, 16, 1)
        h4 = conv_cond_concat(h4,yb)       # (B, 141, 16, 1)
        h4 = conv_prev_concat(h4,h0_prev)  # (B, 157, 16, 1)

        g_x = torch.sigmoid(self.h4(h4))   # (B, 1, 16, 128)

        return g_x


# TRAINING

In [10]:
def train(netG, netD, optimizerG, optimizerD,criterion ,epochs,dataloader, batch_size, nz, device=torch.device('cuda')):
  """Alternate discriminator and generator updates and print per-epoch averages.

  Losses are built from the discriminator's logits with
  ``sigmoid_cross_entropy_with_logits``; real samples use a smoothed target of
  0.9, fakes use 0 for the discriminator step and 1 for the generator step.

  Args:
    netG: generator; called with noise of shape (B, nz, 1, 1).
    netD: discriminator; must return ``(probabilities, logits)``.
    optimizerG: optimizer stepping ``netG``'s parameters.
    optimizerD: optimizer stepping ``netD``'s parameters.
    criterion: unused; kept so existing calls keep working.
    epochs: number of passes over ``dataloader``.
    dataloader: iterable of real batches; must support ``len()``.
    batch_size: unused; the size of each incoming batch is used instead.
    nz: length of the latent vector.
    device: device the batches and noise are placed on.

  Raises:
    ValueError: if ``dataloader`` yields no batches.
  """
  num_batches = len(dataloader)
  if num_batches == 0:
    raise ValueError('dataloader yields no batches; nothing to train on')

  netD.train()
  netG.train()

  for epoch in range(epochs):
    sum_lossD = 0.0
    sum_lossG = 0.0
    sum_D_x   = 0.0
    sum_D_G_z = 0.0

    # Iterate the loader that was passed in, not a notebook-level global.
    for data in dataloader:

      #############################################################
      # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
      #############################################################

      # train with real
      netD.zero_grad()
      real_cpu = data.to(device)
      current_batch = real_cpu.size(0)

      D, D_logits = netD(real_cpu)
      d_loss_real = reduce_mean(sigmoid_cross_entropy_with_logits(D_logits, 0.9*torch.ones_like(D)))
      # Each graph is back-propagated exactly once, so retain_graph is not
      # needed and would only keep activations alive longer.
      d_loss_real.backward()
      sum_D_x += D.mean().item()

      # train with fake (detached so no gradient reaches the generator)
      noise = torch.randn(current_batch, nz, 1, 1, device=device)
      fake = netG(noise)
      D_, D_logits_ = netD(fake.detach())
      d_loss_fake = reduce_mean(sigmoid_cross_entropy_with_logits(D_logits_, torch.zeros_like(D_)))
      d_loss_fake.backward()

      sum_lossD += (d_loss_real + d_loss_fake).item()
      optimizerD.step()

      #############################################
      # (2) Update G network: maximize log(D(G(z)))
      #############################################

      netG.zero_grad()
      # Fresh forward pass through the just-updated discriminator; fakes are
      # labelled as real for the generator's loss.
      D_, D_logits_ = netD(fake)
      errG = reduce_mean(sigmoid_cross_entropy_with_logits(D_logits_, torch.ones_like(D_)))
      errG.backward()

      # Accumulate plain floats: summing the tensor itself would keep every
      # iteration's autograd graph referenced until the end of the epoch.
      sum_lossG += errG.item()
      sum_D_G_z += D_.mean().item()
      optimizerG.step()

    average_lossD = sum_lossD / num_batches
    average_lossG = sum_lossG / num_batches
    average_D_x = sum_D_x / num_batches
    average_D_G_z = sum_D_G_z / num_batches

    print('==> Epoch: {} Average lossD: {:.10f} average_lossG: {:.10f},average D(x): {:.10f},average D(G(z)): {:.10f} '.format(
     epoch, average_lossD,average_lossG,average_D_x, average_D_G_z))
    torch.cuda.empty_cache()
  

In [11]:
def main():
  """Build both networks and their Adam optimizers, then train for 50 epochs.

  Relies on the notebook-level ``train_loader``, ``batch_size`` and ``nz``.
  """
  epochs = 50

  # Fall back to the CPU when no GPU is present instead of failing on .to().
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

  netG = Generator().to(device)
  netD = Discriminator().to(device)

  # Passed through only to satisfy train()'s signature.
  criterion = nn.BCELoss()

  # NOTE(review): the generator's learning rate is 10x the discriminator's --
  # confirm this imbalance is intended.
  optimizerD = optim.Adam(netD.parameters(), lr=0.001, betas=(0.5, 0.999))
  optimizerG = optim.Adam(netG.parameters(), lr=0.01, betas=(0.5, 0.999))
  train(netG, netD, optimizerG, optimizerD,criterion ,epochs,train_loader,batch_size, nz, device=device)

In [12]:
# Launch training; one summary line is printed per epoch.
main()

==> Epoch: 0 Average lossD: 0.8903129959 average_lossG: 1.1008552313,average D(x): 0.7327624298,average D(G(z)): 0.3446985123 
==> Epoch: 1 Average lossD: 0.4859435402 average_lossG: 2.0230648518,average D(x): 0.8898300408,average D(G(z)): 0.1384144361 
==> Epoch: 2 Average lossD: 0.3993730983 average_lossG: 2.9117200375,average D(x): 0.8949334318,average D(G(z)): 0.0566387063 
==> Epoch: 3 Average lossD: 0.3763580059 average_lossG: 3.5685777664,average D(x): 0.8950791486,average D(G(z)): 0.0311852932 
==> Epoch: 4 Average lossD: 0.3477070602 average_lossG: 4.1130576134,average D(x): 0.8989750846,average D(G(z)): 0.0164908592 
==> Epoch: 5 Average lossD: 0.3375416052 average_lossG: 4.6302423477,average D(x): 0.8999665827,average D(G(z)): 0.0098784929 
==> Epoch: 6 Average lossD: 0.3333091135 average_lossG: 5.0038037300,average D(x): 0.9001691486,average D(G(z)): 0.0067403686 
==> Epoch: 7 Average lossD: 0.3316283386 average_lossG: 5.3023233414,average D(x): 0.8997073311,average D(G(z))

# test

In [21]:
def train(netG, netD, optimizerG, optimizerD,criterion ,epochs,dataloader, batch_size, nz, device=torch.device('cuda')):
    img_list = []
    G_losses = []
    D_losses = []
    iters = 0
    real_label = 1.
    fake_label = 0.

    print("Starting Training Loop...")
    # For each epoch
    for epoch in range(num_epochs):
    # For each batch in the dataloader
        for i, data in enumerate(dataloader, 0):

        ############################
        # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
        ###########################
        ## Train with all-real batch
            netD.zero_grad()
        # Format batch
            real_cpu = data.to(device)
            b_size = real_cpu.size()[0]
            label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
        # Forward pass real batch through D
            output = netD(real_cpu).view(-1)
        # Calculate loss on all-real batch
            errD_real = criterion(output, label)
        # Calculate gradients for D in backward pass
            errD_real.backward()
            D_x = output.mean().item()

        ## Train with all-fake batch
        # Generate batch of latent vectors
            noise = torch.randn(b_size, nz, 1, 1, device=device)
        # Generate fake image batch with G
            fake = netG(noise)
            label.fill_(fake_label)
        # Classify all fake batch with D
            output = netD(fake.detach()).view(-1)
        # Calculate D's loss on the all-fake batch
            errD_fake = criterion(output, label)
        # Calculate the gradients for this batch
            errD_fake.backward()
            D_G_z1 = output.mean().item()
        # Add the gradients from the all-real and all-fake batches
            errD = errD_real + errD_fake
        # Update D
            optimizerD.step()

        ############################
        # (2) Update G network: maximize log(D(G(z)))
        ###########################
            netG.zero_grad()
            label.fill_(real_label)  # fake labels are real for generator cost
        # Since we just updated D, perform another forward pass of all-fake batch through D
            output = netD(fake).view(-1)
        # Calculate G's loss based on this output
            errG = criterion(output, label)
        # Calculate gradients for G
            errG.backward()
            D_G_z2 = output.mean().item()
        # Update G
            optimizerG.step()

        # Output training stats
            if i % 50 == 0:
                print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                      % (epoch, num_epochs, i, len(dataloader),
                         errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        # Save Losses for plotting later
            G_losses.append(errG.item())
            D_losses.append(errD.item())
