In [None]:
import mido
from mido import Message, MidiFile, MidiTrack,MetaMessage
import string
import numpy as np
import pandas as pd
from midi_arr import *
import os
import csv


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

## Load data

In [None]:
# # label path     
# label_path = r'D:\BrownUnivercity\CS2470\final_proj\CS2470_final_project\data\label.csv'
# # Define the folder path
# folder_path = r'D:\BrownUnivercity\CS2470\final_proj\CS2470_final_project\data\test'
# # load data
# music,tag, align_length = get_music_data(folder_path, label_path)

In [None]:
# Load the music arrays and their labels.
# Neither helper is defined in this notebook; presumably both come from the
# `from midi_arr import *` star import in the first cell — TODO confirm.
music = load_music()
label = load_label()

In [None]:
# Sanity check: number of entries in `music` and the shape of the first entry.
print(len(music))
print(music[0].shape)


In [None]:
# Helper function
def one_hot_encode(labels, num_classes):
    """Convert 1-based integer class labels into a one-hot matrix.

    Args:
        labels: Sequence of integer class ids, expected in ``1..num_classes``.
        num_classes: Number of classes, i.e. the width of the returned matrix.

    Returns:
        np.ndarray of shape ``(len(labels), num_classes)`` (float64) in which
        row ``i`` holds a single 1 in column ``labels[i] - 1``.

    Raises:
        ValueError: If a label is smaller than 1.
        IndexError: If a label is larger than ``num_classes``.
    """
    one_hot_labels = np.zeros((len(labels), num_classes))
    for i, label in enumerate(labels):
        # Labels are 1-based. Without this guard a label of 0 (or a negative
        # one) would turn into a negative column index and be silently encoded
        # as one of the last classes instead of failing.
        if label < 1:
            raise ValueError(
                f"label {label!r} at position {i} is below 1; "
                f"labels must lie in 1..{num_classes}"
            )
        one_hot_labels[i, label - 1] = 1
    return one_hot_labels

# One-hot encode the loaded labels into 4 classes.
# The 4 is hard-coded here; it equals the `num_classes` value set in the
# hyperparameter cell further down.
labels = one_hot_encode(label,4)

In [None]:
# Inspect the one-hot label matrix (prints the array as returned by one_hot_encode).
print(labels)

## CVAE

In [None]:
class Encoder(nn.Module):
    """Conditional encoder of the CVAE.

    Maps an input vector together with its class vector to the parameters
    (mean and log-variance) of a diagonal Gaussian over the latent space.

    Args:
        input_size: Width of the flattened input ``x``.
        hidden_size: Width of the two hidden layers.
        latent_size: Dimensionality of the latent space.
        num_classes: Width of the class vector ``y`` concatenated to ``x``.
        dropout_prob: Dropout probability applied after the first hidden layer.
    """

    def __init__(self, input_size, hidden_size, latent_size, num_classes, dropout_prob=0.5):
        super(Encoder, self).__init__()
        self.fc1 = nn.Linear(input_size + num_classes, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3_mean = nn.Linear(hidden_size, latent_size)
        self.fc3_logvar = nn.Linear(hidden_size, latent_size)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x, y):
        """Encode a batch.

        Args:
            x: Tensor of shape ``(batch, input_size)``.
            y: Tensor of shape ``(batch, num_classes)`` with the class vector.

        Returns:
            Tuple ``(mean, logvar)``, each of shape ``(batch, latent_size)``.
        """
        # Concatenate input with class information
        x = torch.cat((x, y), dim=1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))

        # Return the raw linear outputs. They must stay continuous and attached
        # to the autograd graph: quantising them here (e.g. argmax -> one-hot
        # written into a new tensor) would cut every gradient path from the
        # loss back into the encoder and make the log-variance meaningless.
        mean = self.fc3_mean(x)
        logvar = self.fc3_logvar(x)
        return mean, logvar

class Decoder(nn.Module):
    """Conditional decoder of the CVAE.

    Takes a latent vector plus a class vector and produces a reconstruction
    whose elements are squashed by a sigmoid and rescaled to lie in [21, 108].

    Args:
        latent_size: Dimensionality of the latent vector ``z``.
        hidden_size: Width of the hidden layer.
        output_size: Width of the reconstruction.
        num_classes: Width of the class vector ``y`` concatenated to ``z``.
    """

    def __init__(self, latent_size, hidden_size, output_size, num_classes):
        super().__init__()
        # The first layer sees the latent code and the class vector side by side.
        self.fc1 = nn.Linear(latent_size + num_classes, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, z, y):
        """Decode a batch.

        Args:
            z: Tensor of shape ``(batch, latent_size)``.
            y: Tensor of shape ``(batch, num_classes)``.

        Returns:
            Float tensor of shape ``(batch, output_size)`` with values in
            [21, 108]; it is left unrounded so it stays differentiable.
        """
        conditioned = torch.cat([z, y], dim=1)
        hidden = F.relu(self.fc1(conditioned))
        squashed = torch.sigmoid(self.fc2(hidden))
        # Affine map from the sigmoid's (0, 1) range onto (21, 108).
        return squashed * 87 + 21

class CVAE(nn.Module):
    """Conditional variational autoencoder built from an Encoder and a Decoder.

    Args:
        input_size: Width of the flattened input.
        hidden_size: Hidden width shared by encoder and decoder.
        latent_size: Dimensionality of the latent space.
        output_size: Width of the reconstruction.
        num_classes: Width of the class vector used for conditioning.
    """

    def __init__(self, input_size, hidden_size, latent_size, output_size, num_classes):
        super().__init__()
        self.encoder = Encoder(input_size, hidden_size, latent_size, num_classes)
        self.decoder = Decoder(latent_size, hidden_size, output_size, num_classes)

    def reparameterize(self, mean, logvar):
        """Sample ``mean + eps * std`` with ``eps`` drawn from a standard normal.

        ``std`` is derived from the log-variance as ``exp(0.5 * logvar)``.
        """
        std = (0.5 * logvar).exp()
        noise = torch.randn_like(std)
        return mean + noise * std

    def forward(self, x, y):
        """Encode ``x`` conditioned on ``y``, sample a latent code and decode it.

        Both inputs are cast to float32 first.

        Returns:
            Tuple ``(reconstruction, mean, logvar)``.
        """
        inputs = x.to(torch.float32)
        condition = y.to(torch.float32)
        mean, logvar = self.encoder(inputs, condition)
        latent = self.reparameterize(mean, logvar)
        return self.decoder(latent, condition), mean, logvar



In [None]:
# Model hyperparameters.
# Each sample is flattened to 500 * 3 values; the generation and MIDI cells
# further down reshape to (-1, 3) and read each row as (note, velocity, time).
# The 500 is presumably the aligned number of events per piece — TODO confirm.
input_size = 500 * 3
hidden_size = 256
# latent_size equals num_classes: loss_function subtracts `mu` from the one-hot
# labels, which needs the two to have compatible shapes.
latent_size = 4
num_classes = 4
output_size = 500 * 3
cvae_model = CVAE(input_size, hidden_size, latent_size, output_size, num_classes)

In [None]:

# Define optimizer: Adam over every parameter of the CVAE (encoder and decoder).
# NOTE(review): 0.05 is well above Adam's default learning rate of 1e-3 —
# confirm this is intentional.
learning_rate = 0.05
optimizer = optim.Adam(cvae_model.parameters(), lr=learning_rate)

# Define loss function
def loss_function(recon_x, x, mu, logvar,labels):
    """CVAE training loss: reconstruction + KL divergence + conditional term.

    Args:
        recon_x: Decoder output, shape ``(batch, output_size)``.
        x: Target data with the same number of elements as ``recon_x``.
        mu: Latent means from the encoder, shape ``(batch, latent_size)``.
        logvar: Latent log-variances from the encoder, same shape as ``mu``.
        labels: Class vectors; must be broadcastable against ``mu``.

    Returns:
        Scalar tensor: summed squared reconstruction error, plus the summed KL
        divergence to a standard normal, plus the mean squared distance
        between ``labels`` and ``mu``.
    """
    # The decoder emits real-valued quantities, so reconstruction is a
    # regression problem. Cross-entropy would treat recon_x as class logits and
    # the raw target values as class probabilities, which is not a measure of
    # reconstruction error; use summed squared error instead.
    target = x.reshape(recon_x.shape).to(recon_x.dtype)
    reconstruction_loss = F.mse_loss(recon_x, target, reduction='sum')
    # KL divergence loss
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    # Conditional loss: pull the latent mean towards the one-hot class vector.
    conditional_loss = torch.mean((labels - mu).pow(2))

    return reconstruction_loss + KLD + conditional_loss

# Define your training function
def train(epoch,train_loader,log_interval):
    """Run one training epoch over ``train_loader``.

    Uses the module-level ``cvae_model``, ``optimizer``, ``loss_function`` and
    ``input_size``.

    Args:
        epoch: Epoch index, used only in the log output.
        train_loader: Iterable of ``(data, labels)`` batches; it must also
            expose ``dataset`` and ``__len__`` for the progress message.
        log_interval: Print a progress line every ``log_interval`` batches.

    Returns:
        float: Mean loss per sample over the epoch (0.0 for an empty loader).
    """
    cvae_model.train()
    train_loss = 0.0
    num_samples = 0
    for batch_idx, (data, labels) in enumerate(train_loader):
        # Flatten every sample to a vector of length input_size.
        data = data.view(-1, input_size).to(torch.float32)
        labels = labels.to(torch.float32)
        optimizer.zero_grad()
        recon_batch, mu, logvar = cvae_model(data, labels)
        loss = loss_function(recon_batch, data, mu, logvar,labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        train_loss += batch_loss
        num_samples += len(data)
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader),
                batch_loss / len(data)))

    # Report the accumulated loss so it is actually used.
    average_loss = train_loss / num_samples if num_samples else 0.0
    print('====> Epoch: {} Average loss: {:.6f}'.format(epoch, average_loss))
    return average_loss



In [None]:
# Convert the one-hot labels and the music data to tensors.
# Going through np.asarray keeps this cell safe to re-run (torch.tensor on an
# existing tensor warns) and, if `music` is a list of ndarrays, avoids the slow
# element-by-element path of torch.tensor.
label = torch.as_tensor(np.asarray(labels))
music = torch.as_tensor(np.asarray(music))

# data
dataset = TensorDataset(music, label)
# batch_size=1 is DataLoader's default, spelled out so the choice is visible.
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

# Set number of epochs and log interval
num_epochs = 15
log_interval = 10

# Train the model
for epoch in range(num_epochs):
    train(epoch, dataloader, log_interval)

In [None]:
# Shapes of the label and music tensors that back the training dataset.
print(f"label_shape:{label.shape}")
print(f"music_shape:{music.shape}")

In [None]:
# Generate with the decoder that was trained as part of `cvae_model`.
# (Constructing a new Decoder here would only give randomly initialised
# weights, so the output would be unrelated to the training run.)
decoder = cvae_model.decoder
decoder.eval()

# Sample one latent vector from a standard normal distribution (batch size 1).
latent_sample = torch.randn(1, latent_size)
# One-hot class vector to condition on; index 1 corresponds to label 2.
target_label = [0,1,0,0]
target_label_tensor = torch.tensor(target_label, dtype=torch.float32).unsqueeze(0)
# Pass the sampled latent vector through the decoder
with torch.no_grad():
    generated_data = decoder(latent_sample,target_label_tensor)

# Round to integers and regroup into rows of 3 values per event.
generated_data = generated_data.round().int()
generated_data = generated_data.reshape(-1,3)
generated_data = generated_data.tolist()
print(generated_data)

In [None]:
def arr2midi(events, output_file):
    """Write a sequence of events to a two-track MIDI file.

    Args:
        events: Iterable of indexable rows; elements 0, 1 and 2 of each row are
            used as note, velocity and time of a ``note_on`` message.
            Conversion stops at the first row whose three values are all 0.
        output_file: Destination passed to ``MidiFile.save``.
    """
    midi_file = mido.MidiFile()

    # Track 0 carries only the tempo meta message (500000 us per beat = 120 BPM).
    tempo_track = mido.MidiTrack()
    midi_file.tracks.append(tempo_track)
    tempo_track.append(mido.MetaMessage('set_tempo', tempo=500000))

    # Track 1 carries the note events.
    note_track = mido.MidiTrack()
    midi_file.tracks.append(note_track)

    for row in events:
        note, velocity, time = row[0], row[1], row[2]
        # An all-zero row ends the sequence.
        if all(value == 0 for value in (note, velocity, time)):
            break
        note_track.append(
            mido.Message('note_on', note=note, velocity=velocity, time=time)
        )

    midi_file.save(output_file)




In [None]:
# MIDI events / list format
# `generated_data` is the list of 3-element rows produced by the generation cell.
events = generated_data

# Output MIDI file name
output_file = "output_label2.mid"

# Reconstruct MIDI and save to file
arr2midi(events, output_file)

In [None]:
# Load the MIDI file
# This reads back the file name written by the export cell so its contents can be checked.
mid = mido.MidiFile('output_label2.mid')

# Iterate over all messages in all tracks
for i, track in enumerate(mid.tracks):
    # Print a header line per track, then every message it contains.
    print(f'Track {i}: {track.name}')
    for msg in track:
        print(msg)