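This notebook trains a label-conditioned DCGAN: a generator and a discriminator that both receive a class label, used here to produce 128x256 piano-roll matrices.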

In [7]:
import numpy as np 
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim
import music21
import matplotlib as plt
import torchvision.utils as vutils

import torch.backends.cudnn as cudnn
# Release cached GPU memory that PyTorch's allocator is holding but not using.
torch.cuda.empty_cache()
cudnn.benchmark = True  # Let cuDNN auto-tune its algorithms for this hardware

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# Every later cell moves models and tensors to this `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)


NameError: name '_C' is not defined

In [6]:
BATCH_SIZE = 64  # Samples per batch; also used by the DataLoader and for fixed_noise
EPOCHS = 100  # NOTE(review): the training cell sets its own num_epochs and does not read this
NOISE_DIM= 100  # Length of the latent noise vector fed to the generator
NUM_CLASSES = 18  # Number of distinct class labels used for conditioning
BETA1 = 0.5 # beta1 hyperparameter for the Adam optimizers
LRD = 0.0001 # Discriminator learning rate; might need to adjust
LRG = 0.002  # Generator learning rate
EMBEDDING_DIM = 50  # Size of the learned label embedding

In [3]:


import torch
import torch.nn as nn

NUM_CLASSES = 18  # Set this to your actual number of classes
EMBEDDING_DIM = 50  # Example embedding dimension; adjust as needed
NOISE_DIM = 100  # Example noise dimension; adjust as needed

class Generator(nn.Module):
    """Label-conditioned DCGAN generator.

    A class label is embedded and projected to a single 8x16 feature map; the
    latent noise vector is projected to 128 feature maps of the same size. The
    129 maps are concatenated and upsampled by four stride-2 transposed
    convolutions to a (N, 1, 128, 256) output squashed into [0, 1] by a sigmoid.
    """

    def __init__(self):
        super().__init__()

        # Embedding for the labels: (N,) -> (N, EMBEDDING_DIM)
        self.label_emb = nn.Embedding(NUM_CLASSES, EMBEDDING_DIM)

        # Dense layer turning the label embedding into one 8x16 map: (N, 128)
        self.fc_label = nn.Linear(EMBEDDING_DIM, 8 * 16)

        # Dense layer turning the noise into 128 maps of 8x16: (N, 128 * 8 * 16)
        self.fc_noise = nn.Linear(NOISE_DIM, 128 * 8 * 16)

        # The original code wrote nn.ReLU(0.2); the first positional argument of
        # nn.ReLU is `inplace`, so 0.2 acted as a truthy in-place flag and never
        # as a negative slope. Spelled out explicitly here; behaviour is unchanged.
        self.model = nn.Sequential(
            nn.ConvTranspose2d(129, 128, kernel_size=4, stride=2, padding=1),  # Upsample to 16x32
            nn.BatchNorm2d(128), nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),   # Upsample to 32x64
            nn.BatchNorm2d(64), nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),    # Upsample to 64x128
            nn.BatchNorm2d(32), nn.ReLU(inplace=True),
            nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1)      # Upsample to 128x256
        )

        # The custom initialisation below is deliberately left disabled, as in
        # the original code.
        #self._initialize_weights()

    def forward(self, noise, labels):
        """Generate a batch of outputs.

        :param noise: float tensor of shape (N, NOISE_DIM)
        :param labels: integer tensor of shape (N,) with values in [0, NUM_CLASSES)
        :returns: float tensor of shape (N, 1, 128, 256) with values in [0, 1]
        """
        # Step 1: embed the labels and project them to one 8x16 map per sample.
        # The batch dimension is inferred (-1) instead of being tied to the
        # global BATCH_SIZE, so any batch size works.
        label_map = self.fc_label(self.label_emb(labels)).view(-1, 1, 8, 16)  # (N, 1, 8, 16)

        # Step 2: project the latent noise to 128 maps of 8x16.
        noise_maps = self.fc_noise(noise).view(-1, 128, 8, 16)  # (N, 128, 8, 16)

        # Step 3: concatenate along the channel axis and upsample.
        x = torch.cat((noise_maps, label_map), dim=1)  # (N, 129, 8, 16)
        x = self.model(x)

        # Step 4: sigmoid so every output value lies in [0, 1].
        return torch.sigmoid(x)

    def _initialize_weights(self) -> None:
        """Scaled-down Kaiming init for conv/linear layers, near-one init for batch norm.

        Transposed convolutions are included: the generator has no plain
        ``nn.Conv2d`` layers, so checking only for ``nn.Conv2d`` would leave
        every convolutional layer at its default initialisation.
        """
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d, nn.Linear)):
                nn.init.kaiming_normal_(m.weight)
                m.weight.data *= 0.1
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.normal_(m.weight, 1.0, 0.02)
                m.weight.data *= 0.1
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)


# Instantiate the model
# Derive the demo sizes from the constants Generator() itself reads, so this
# smoke test cannot drift out of sync with the model definition.
latent_dim = NOISE_DIM
num_classes = NUM_CLASSES
model = Generator()

# Example input (noise and labels)
batch_size = 64
noise = torch.randn(batch_size, latent_dim)  # (N, latent_dim)
labels = torch.randint(0, num_classes, (batch_size,))  # (N,)

# Forward pass
output = model(noise, labels)
# Fail loudly if the architecture stops producing the expected output shape.
assert output.shape == (batch_size, 1, 128, 256), output.shape
print(output.shape)  # Should be (batch_size, 1, 128, 256)




Label embedding layer:  torch.Size([1, 8, 16, 1])
Noise embedding layer:  torch.Size([1, 128, 8, 16])
After concatenation:  torch.Size([1, 129, 8, 16])
First layer:  torch.Size([1, 128, 16, 32])
Second layer:  torch.Size([1, 64, 32, 64])
Third layer:  torch.Size([1, 32, 64, 128])
Final layer:  torch.Size([1, 1, 128, 256])


In [4]:
import torch
import torch.nn as nn

class Discriminator(nn.Module):
    """Label-conditioned DCGAN discriminator.

    The class label is embedded and projected to one extra image plane of the
    same height and width as the input, concatenated with the input along the
    channel axis, and reduced by six stride-2 convolutions. The remaining
    spatial map is averaged and passed through a sigmoid to give one
    probability per sample.

    :param in_shape: (channels, height, width) of the input images
    :param num_classes: number of distinct class labels
    """

    def __init__(self, in_shape=(1, 128, 256), num_classes=NUM_CLASSES):
        super().__init__()

        channels, height, width = in_shape
        # Remembered so forward() can reshape the label plane without relying
        # on hard-coded sizes.
        self.in_shape = (channels, height, width)

        self.label_emb = nn.Embedding(num_classes, EMBEDDING_DIM)

        # Projects the label embedding to one full-resolution plane (H * W values)
        self.fc_label = nn.Linear(EMBEDDING_DIM, height * width)

        # Shape comments below assume the default in_shape of (1, 128, 256).
        self.model = nn.Sequential(
            nn.Conv2d(channels + 1, 128, kernel_size=3, stride=2, padding=1),  # (2, 128, 256) -> (128, 64, 128)
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=2, padding=1),  # (128, 64, 128) -> (128, 32, 64)
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),
            nn.Conv2d(128, 64, kernel_size=3, stride=2, padding=1),   # (128, 32, 64) -> (64, 16, 32)
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1),    # (64, 16, 32) -> (64, 8, 16)
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),
            nn.Conv2d(64, 32, kernel_size=3, stride=2, padding=1),    # (64, 8, 16) -> (32, 4, 8)
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),
            nn.Conv2d(32, 1, kernel_size=3, stride=2, padding=1)      # (32, 4, 8) -> (1, 2, 4)
        )

        # The custom initialisation below is deliberately left disabled, as in
        # the original code.
        #self._initialize_weights()

    def forward(self, pr, labels):
        """Score a batch of images.

        :param pr: float tensor of shape (N, C, H, W) matching ``in_shape``
        :param labels: integer tensor of shape (N,) with values in [0, num_classes)
        :returns: float tensor of shape (N, 1) with values in [0, 1]
        """
        _, height, width = self.in_shape

        # Step 1: embed the labels and project them to one image-sized plane.
        # The batch dimension is inferred (-1) rather than tied to the global
        # BATCH_SIZE, and H/W come from in_shape rather than literals.
        label_plane = self.fc_label(self.label_emb(labels)).view(-1, 1, height, width)

        # Step 2: concatenate image and label plane along the channel dimension
        x = torch.cat((pr, label_plane), dim=1)

        # Step 3: apply the convolutional stack
        x = self.model(x)

        # Step 4: global average pooling over height and width, then sigmoid
        out = torch.mean(x, dim=[2, 3])
        return torch.sigmoid(out)

    def _initialize_weights(self) -> None:
        """Scaled-down Kaiming init for conv/linear layers, near-one init for batch norm."""
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.kaiming_normal_(m.weight)
                m.weight.data *= 0.1
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.normal_(m.weight, 1.0, 0.02)
                m.weight.data *= 0.1
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
# Example of usage: a quick shape check of the discriminator on random data.
# Note this demo builds the model with 10 classes rather than NUM_CLASSES.
discriminator = Discriminator(in_shape=(1, 128, 256), num_classes=10)

# Example inputs: random image and label
batch_size = 64
# Despite its name, `output` here is the random *input* image batch.
output = torch.randn(batch_size, 1, 128, 256)  # Random image input
labels = torch.randint(0, 10, (batch_size,))  # Random labels (10 classes)

# Forward pass
output1 = discriminator(output, labels)

print(output1.shape)  # Output should be (batch_size, 1)


Label embedding:  torch.Size([1, 1, 128, 256])
Concatenation:  torch.Size([1, 2, 128, 256])
First layer:  torch.Size([1, 128, 64, 128])
Second layer:  torch.Size([1, 128, 32, 64])
Flatten  torch.Size([1, 262144])
Final:  torch.Size([1, 1])
torch.Size([1, 1])
tensor([[0.5015]], grad_fn=<SigmoidBackward0>)


In [None]:
from torch.utils.data import DataLoader, TensorDataset

# NOTE(review): torch.load unpickles the file, which can execute arbitrary code.
# Only load files you created or trust (consider weights_only=True where the
# installed PyTorch version supports it).
# Presumably inputs_seq holds piano rolls shaped (num_samples, 128, 256) and
# labels_seq one integer class per sample; the training loop adds a channel
# dimension and casts the labels to int. TODO confirm against the data files.
inputs_seq = torch.load("Input_tensors.pt")
labels_seq = torch.load("Labels_tensors.pt")
dataset = TensorDataset(inputs_seq, labels_seq)

# Split into shuffled batches of BATCH_SIZE; the last batch may be smaller.
batch_size = BATCH_SIZE
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# Check that the data is loaded correctly
print("Number of input pianorolls: ", len(dataset))
print("Length of dataloader: ", len(dataloader))


In [None]:
# Custom weight initialisation, applied to ``netG`` and ``netD`` via ``Module.apply``
def weights_init(m):
    """Re-initialise one module in place, chosen by its class name.

    Any module whose class name contains ``Conv`` (so transposed convolutions
    too) gets weights drawn from N(0, 0.02). ``BatchNorm2d`` modules get
    weights drawn from N(1, 0.02) and a zero bias. Every other module is left
    untouched.
    """
    layer_type = type(m).__name__
    if 'Conv' in layer_type:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        return
    if 'BatchNorm2d' in layer_type:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, val=0)

In [None]:
# Build both networks and move them to the selected device (GPU if available)
netG = Generator().to(device)
netD = Discriminator().to(device)


# Binary cross-entropy on the discriminator's sigmoid output
criterion = nn.BCELoss()
# Target values for the loss: real samples use 0.95 rather than 1.0, fakes use 0
real_label = 0.95
fake_label = 0

# One batch of noise created once and reused for the periodic generator snapshots
fixed_noise = torch.randn(BATCH_SIZE, NOISE_DIM,  device=device)

# Define the optimizers (separate learning rates for G and D, shared BETA1)
optimizerG = optim.Adam(netG.parameters(), lr=LRG, betas=(BETA1, 0.999))
optimizerD = optim.Adam(netD.parameters(), lr=LRD, betas=(BETA1, 0.999))

# Re-initialise conv and batch-norm weights. weights_init writes into the
# existing parameter tensors, so running it after the optimizers were created
# still affects the parameters they hold.
netG.apply(weights_init)
netD.apply(weights_init)

print("Generator model: ", netG)
print("Discriminator model: ", netD)

In [None]:

# Lists to keep track of progress
output_list = []   # generator snapshots on fixed_noise, moved to the CPU
G_losses = []      # generator loss per processed batch
D_losses = []      # discriminator loss (real + fake) per processed batch
iters = 0          # number of processed (full-size) batches so far

num_epochs = 30

# Debugging notes
# ---------------
# The data is being loaded correctly, but the model keeps diverging and does
# not train properly.
#   * D(x) should start close to 1 (predicting everything as real) before
#     settling around 0.5.
#   * D(G(z)) should start around 0 (everything picked as fake) before
#     settling around 0.5.
# Both sitting at 0.5 means the discriminator can no longer tell real from
# fake, i.e. the generator has been trained sufficiently.
# Suspected cause: errors in the data or in the data normalisation process.

print("Starting Training Loop...")
# For each epoch (driven by num_epochs so the loop, the progress printout and
# the final-snapshot check below all agree on the epoch count)
for epoch in range(num_epochs):
    # For each batch in the dataloader
    for i, data in enumerate(dataloader):

        ############################
        # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
        ###########################
        ## Train with all-real batch

        # Format batch: float32 with an explicit channel dimension -> (N, 1, H, W)
        real_cpu = data[0].to(device).float()   # ensure tensor is float32
        real_cpu = real_cpu.unsqueeze(1)

        b_size = real_cpu.size(0)

        # Skip a short final batch; fixed_noise below is BATCH_SIZE wide and is
        # paired with this batch's labels.
        if b_size != BATCH_SIZE:
            continue

        real_label_from_data = data[1].to(device).int()     # Generator and discriminator expect integer labels

        # Target tensor filled with the "real" value to pass to the loss function
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
        netD.zero_grad()
        # Forward pass real batch through D
        output = netD(real_cpu, real_label_from_data).view(-1)

        # Calculate loss on all-real batch
        errD_real = criterion(output, label)
        # Calculate gradients for D in backward pass
        errD_real.backward()
        D_x = output.mean().item()

        ## Train with all-fake batch
        # Generate batch of latent vectors
        noise = torch.randn(b_size, NOISE_DIM, dtype=torch.float, device=device)

        # Generate fake image batch with G, conditioned on the real batch's labels
        fake = netG(noise, real_label_from_data)
        label.fill_(fake_label)
        # Classify all fake batch with D (detached so no gradient reaches G here)
        output = netD(fake.detach(), real_label_from_data).view(-1)
        # Calculate D's loss on the all-fake batch
        errD_fake = criterion(output, label)
        # Calculate the gradients for this batch, accumulated (summed) with previous gradients
        errD_fake.backward()
        D_G_z1 = output.mean().item()
        # Compute error of D as sum over the fake and the real batches
        errD = errD_real + errD_fake
        # Update D
        optimizerD.step()

        ############################
        # (2) Update G network: maximize log(D(G(z)))
        ###########################

        netG.zero_grad()
        label.fill_(real_label)  # fake labels are real for generator cost
        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = netD(fake, real_label_from_data).view(-1)
        # Calculate G's loss based on this output
        errG = criterion(output, label)
        # Calculate gradients for G
        errG.backward()
        D_G_z2 = output.mean().item()
        # Update G
        optimizerG.step()

        # Output training stats
        if i % 5 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, num_epochs, i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        # Save Losses for plotting later
        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Check how the generator is doing by saving G's output on fixed_noise.
        # NOTE(review): the snapshot is conditioned on the current batch's
        # labels, so successive snapshots differ in labels as well as weights.
        if (iters % 10 == 0) or ((epoch == num_epochs-1) and (i == len(dataloader)-1)):
            with torch.no_grad():
                fake = netG(fixed_noise, real_label_from_data).detach().cpu()   # Move back to CPU so it can be used.

            output_list.append(fake)

        iters += 1

In [None]:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
# Plot both loss curves on one set of axes using the explicit Axes interface.
fig, ax = plt.subplots(figsize=(10, 5))
ax.set_title("Generator and Discriminator Loss During Training")
ax.plot(G_losses, label="G")
ax.plot(D_losses, label="D")
ax.set_xlabel("iterations")
ax.set_ylabel("Loss")
ax.legend()
plt.show()

In [None]:
import music21 as m21
import os
OUTPUT_FILES = "training_files"
TIME_STEP = 0.25


def piano_roll_to_notes(matrix, step_duration=TIME_STEP, threshold=0.51, pitch_range=(21, 95)):
    """
    Extracts note events from a piano roll matrix.
    :params matrix: 2D piano roll (rows are MIDI pitches, columns are time steps); a numpy
                    array, nested sequence or torch tensor
    :params step_duration: the length, in quarter notes, of one column of the matrix
    :params threshold: cells strictly greater than this value count as 'on'
    :params pitch_range: inclusive (lowest, highest) MIDI pitch to keep
    :returns a list of (midi_pitch, offset, duration) tuples sorted by offset, then pitch
    :raises ValueError: if the matrix is not two-dimensional
    """
    # Accept torch tensors without importing torch here
    if hasattr(matrix, "detach"):
        matrix = matrix.detach().cpu().numpy()
    active = np.asarray(matrix) > threshold
    if active.ndim != 2:
        raise ValueError("piano roll must be 2D, got %d dimension(s)" % active.ndim)

    rows, cols = active.shape
    lowest, highest = pitch_range
    events = []

    # Walk each pitch row independently and merge consecutive 'on' cells into one note
    for midi_pitch in range(max(lowest, 0), min(highest, rows - 1) + 1):
        onset = None
        for col in range(cols):
            if active[midi_pitch, col]:
                if onset is None:
                    onset = col
            elif onset is not None:
                events.append((midi_pitch, onset * step_duration, (col - onset) * step_duration))
                onset = None
        # A note still sounding in the final column ends at the end of the roll
        if onset is not None:
            events.append((midi_pitch, onset * step_duration, (cols - onset) * step_duration))

    events.sort(key=lambda event: (event[1], event[0]))
    return events


def convert_stream(matrix, format="midi", file_name='output.mid',filepath=OUTPUT_FILES, step_duration=TIME_STEP):
    """
    Converts the piano roll matrix back into a music 21 stream. Writes this stream to a midi file.
    :params matrix: 2D piano roll matrix
    :params format: format file type to write the stream
    :params file_name: the file name of the output file
    :params filepath: the output path of the directory holding the output files
    :params step_duration: the size of the step on the x axis of the piano roll matrix
    :returns None.
    """
    new_stream = m21.stream.Stream()

    # Every event becomes its own note, so notes sharing an offset (chords) are all kept
    for midi_pitch, offset, duration in piano_roll_to_notes(matrix, step_duration=step_duration):
        note = m21.note.Note(midi_pitch)
        note.quarterLength = duration
        new_stream.insert(offset, note)

    # Makes the directory if it doesn't exist
    os.makedirs(filepath, exist_ok=True)

    # Writes the stream to the output path in the requested format
    path = os.path.join(filepath, file_name)
    new_stream.write(format, fp=path)



import matplotlib.pyplot as plt


# Number of generator snapshots collected during training
print(len(output_list))

# Walk the snapshots from the most recent one down to index 41 (index 40 and
# below are skipped), writing each as a MIDI file and showing it as an image.
# NOTE(review): `i` is the snapshot index, not a class label, despite the
# "Class" prefix in the file name.
for i in range(len(output_list) - 1, 40, -1):
    filename = "Class" + str(i) + ".mid"
    
    # Visualize the first generated sample
    
    # First sample of the snapshot batch, first (only) channel -> 2D matrix
    sample = output_list[i][0][0].cpu()  # Remove batch dimension
    convert_stream(sample, file_name=filename)
    plt.imshow(sample.detach().numpy(), cmap='gray')  # Use cmap='gray' for single-channel images
    plt.axis('off')  # Turn off axis
    plt.show()
# Output shape should be (batch_size, 1, 128, 256)


# If you want to visualize or save the generated samples, you can process them further
# For example, if you are using matplotlib to visualize:




