In [1]:
import random
from collections import deque

import numpy as np

import torch

In [16]:
# !pip install mido

!pip install pytorch_lightning

Collecting pytorch_lightning
  Downloading pytorch_lightning-2.5.2-py3-none-any.whl.metadata (21 kB)
Collecting torchmetrics>=0.7.0 (from pytorch_lightning)
  Downloading torchmetrics-1.7.3-py3-none-any.whl.metadata (21 kB)
Collecting lightning-utilities>=0.10.0 (from pytorch_lightning)
  Downloading lightning_utilities-0.14.3-py3-none-any.whl.metadata (5.6 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=2.1.0->pytorch_lightning)
  Downloadi

In [5]:
# File names of the MIDI pieces to tokenize; they differ only in the numeric part.
test_midis = [
    f'monteverdi_libri_dei_madrigali_{suffix}_(c)icking-archive.mid'
    for suffix in ('1_5', '1_10', '4_12', '4_13')
]

In [12]:
import os
from mido import MidiFile

# Placeholder locations; none of the visible cells read input_folder afterwards.
input_folder = 'path/to/your/midi/folder'
output_folder = 'path/to/your/output/folder'

# Create the output directory up front; an already-existing directory is not an error.
os.makedirs(name=output_folder, exist_ok=True)

def Midi_File_Input(mids):
    """Convert the messages of a MIDI file into a flat list of string tokens.

    Args:
        mids: Object exposing ``tracks``, an iterable of tracks, each of which
            is an iterable of messages with a ``type`` attribute (the notebook
            passes a ``mido.MidiFile``).

    Returns:
        list[str]: One token per recognised message, in track order and then
        message order. Message types without a formatter below are skipped.
    """
    # One formatter per supported message type. track_name messages are not
    # tokenized (an earlier formatter for them had been commented out).
    formatters = {
        'note_on': lambda m: f"<{m.type}_{m.channel}_{m.note}_{m.velocity}>",
        'note_off': lambda m: f"<{m.type}_{m.channel}_{m.note}_{m.velocity}>",
        'control_change': lambda m: f"<{m.type}_{m.channel}_{m.control}_{m.value}>",
        'program_change': lambda m: f"<{m.type}_{m.program}_>",
        'key_signature': lambda m: f"<{m.type}_{m.key}>",
    }
    return [
        formatters[message.type](message)
        for track in mids.tracks
        for message in track
        if message.type in formatters
    ]

# Per-song token lists (keyed by file name) and one flat token stream for training.
token_data = {}
all_tokens = []

# Tokenize every listed MIDI file and wrap it in song boundary markers.
for filename in test_midis:
    midi_path = f'/content/drive/MyDrive/Final Project Folder/Midi Files/{filename}'
    midi = MidiFile(midi_path)
    tokens = Midi_File_Input(midi)

    # The markers let a model see where one piece stops and the next begins.
    song_tokens = ['<song_start>', *tokens, '<song_end>']

    token_data[filename] = song_tokens
    all_tokens += song_tokens


In [13]:
from collections import Counter

# Flatten all tokens across all songs
all_tokens_flat = [token for tokens in token_data.values() for token in tokens]

# Token frequencies; iterating the Counter yields each distinct token once.
token_freq = Counter(all_tokens_flat)

# Assign IDs 0..N-1 in sorted token order so the mapping is deterministic.
vocab = {token: idx for idx, token in enumerate(sorted(token_freq))}

# Encode every song as a list of token IDs.
token_ids_data = {}

for filename, tokens in token_data.items():
    token_ids = [vocab[token] for token in tokens]
    token_ids_data[filename] = token_ids

# The padding token takes the next free ID. A hard-coded ID is only correct for
# one exact vocabulary size: with more tokens it collides with a real token,
# with fewer it falls outside range(len(vocab)), which is the size later used
# for the embedding table.
vocab["<pad>"] = len(vocab)

# Reverse map, built after "<pad>" is registered so every ID can be decoded.
inv_vocab = {idx: token for token, idx in vocab.items()}

In [14]:
vocab

{'<control_change_0_100_0>': 0,
 '<control_change_0_101_0>': 1,
 '<control_change_0_10_104>': 2,
 '<control_change_0_10_24>': 3,
 '<control_change_0_10_44>': 4,
 '<control_change_0_10_64>': 5,
 '<control_change_0_10_84>': 6,
 '<control_change_0_12_3>': 7,
 '<control_change_0_38_0>': 8,
 '<control_change_0_6_12>': 9,
 '<control_change_0_7_100>': 10,
 '<control_change_0_7_101>': 11,
 '<control_change_0_7_102>': 12,
 '<control_change_0_7_103>': 13,
 '<control_change_0_7_104>': 14,
 '<control_change_0_7_105>': 15,
 '<control_change_0_7_106>': 16,
 '<control_change_0_7_107>': 17,
 '<control_change_0_7_108>': 18,
 '<control_change_0_7_109>': 19,
 '<control_change_0_7_110>': 20,
 '<control_change_0_7_111>': 21,
 '<control_change_0_7_112>': 22,
 '<control_change_0_7_113>': 23,
 '<control_change_0_7_114>': 24,
 '<control_change_0_7_115>': 25,
 '<control_change_0_7_116>': 26,
 '<control_change_0_7_117>': 27,
 '<control_change_0_7_118>': 28,
 '<control_change_0_7_83>': 29,
 '<control_change_0_7_8

In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader

#from pytorch_lightning import
from pytorch_lightning import Trainer, LightningModule


class DecodeOTrans(LightningModule):
    """Embeds a token sequence and condenses it into one state vector per sequence.

    Pipeline used by ``encode_state``: token embedding -> positional encoding
    -> one masked self-attention call with a residual connection. The linear
    layer and the loss are created here but not used by ``encode_state``.
    """

    def __init__(self, d_model, max_length, num_tokens = 100):
        """Create the sub-modules.

        Args:
            d_model: Size of each token embedding vector.
            max_length: Forwarded to ``PositionalEncoding`` as ``max_length``.
            num_tokens: Vocabulary size; sets the number of embedding rows and
                the output width of the linear layer.
        """
        super().__init__()

        # Lookup table with one d_model-sized vector per vocabulary entry.
        self.we = nn.Embedding(num_embeddings=num_tokens, embedding_dim=d_model)

        # Position information applied on top of the token embeddings.
        self.pe = PositionalEncoding(d_model=d_model, max_length=max_length)

        self.self_attention = Attention(d_model=d_model)

        # Dense layer mapping a d_model-sized vector to one value per token.
        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)

        self.loss = nn.CrossEntropyLoss()

    def encode_state(self, token_ids):
        """Return one vector per input sequence.

        Args:
            token_ids: Tensor of token IDs, [batch_size, seq_len].

        Returns:
            Tensor [batch_size, d_model]: the attention-enriched representation
            at the last sequence position.
        """
        # [batch_size, seq_len, d_model]: embeddings with positional encoding applied.
        embedded = self.pe(self.we(token_ids))

        length = token_ids.shape[1]

        # Boolean [1, seq_len, seq_len] mask, True strictly above the diagonal
        # (presumably the positions Attention hides, i.e. later tokens - confirm
        # against the Attention implementation).
        lower_triangle = torch.ones((length, length), device=token_ids.device).tril()
        mask = lower_triangle.unsqueeze(0).eq(0)

        attended = self.self_attention(embedded, embedded, embedded, mask=mask)

        # Residual connection, then keep only the final position.
        # (Alternative that was considered: mean pooling over dim=1.)
        return (embedded + attended)[:, -1, :]

ModuleNotFoundError: No module named 'pytorch_lightning'

Agent RL + Trans (For Music GPT Adapted)

In [None]:
class MusicDQNAgent:
    """Epsilon-greedy Q-learning agent that picks the next music token.

    A state encoder turns a token sequence into a state vector; a Q-network
    maps that vector to one value per action. ``train_step`` performs a
    single-transition Bellman update of the Q-network.
    """

    def __init__(self, d_model, max_length, num_tokens, num_actions):
        """Build the encoder, the Q-network and the optimizer.

        Args:
            d_model: Dimensionality of each token vector / state vector.
            max_length: Maximum sequence length handed to the encoder.
            num_tokens: Number of unique tokens in the vocabulary.
            num_actions: Number of actions the agent can choose from.
        """
        # NOTE(review): TransformerStateEncoder and QNet are not defined in the
        # cells visible here; they must exist before this class is instantiated.
        self.encoder = TransformerStateEncoder(d_model, max_length, num_tokens)

        # Presumably QNet(input_size, hidden_size, output_size) - confirm.
        self.q_net = QNet(d_model, 128, num_actions)

        # Discount factor: how much future rewards count relative to immediate ones.
        self.gamma = 0.99

        # Exploration rate: probability of taking a random action in get_action.
        self.epsilon = 0.1

        # Replay memory of (state, action, reward, next_state, done) - disabled.
        # self.memory = deque(maxlen=10000)

        self.optimizer = torch.optim.Adam(
            list(self.encoder.parameters()) + list(self.q_net.parameters()),
            lr=1e-4,
        )

    def get_state(self, token_sequence):
        """Encode a token sequence into a state vector without tracking gradients.

        Args:
            token_sequence: Tensor of token IDs, [1, seq_len].

        Returns:
            The encoder output for that sequence ([1, d_model] per the original
            author's note).
        """
        # NOTE(review): because this runs under no_grad, states produced here
        # carry no graph, so train_step cannot update the encoder even though
        # its parameters are registered with the optimizer.
        with torch.no_grad():
            return self.encoder(token_sequence)

    def get_action(self, state_vector):
        """Choose an action index with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned
        (exploration), so a larger epsilon means MORE exploration. Otherwise
        the action with the highest predicted Q-value is returned.
        """
        if random.random() < self.epsilon:
            # randint is inclusive on both ends, hence the -1.
            return random.randint(0, self.q_net.linear2.out_features - 1)

        q_values = self.q_net(state_vector)
        return torch.argmax(q_values).item()

    # Experience replay (currently disabled):
    # def remember(self, state, action, reward, next_state, done):
    #     self.memory.append((state, action, reward, next_state, done))
    #
    # def train_short_memory(self, state, action, reward, next_state, done):
    #     self.train_step(state, action, reward, next_state, done)
    #
    # def train_long_memory(self, batch_size=64):
    #     if len(self.memory) < batch_size:
    #         return
    #     mini_batch = random.sample(self.memory, batch_size)
    #     for state, action, reward, next_state, done in mini_batch:
    #         self.train_step(state, action, reward, next_state, done)

    @staticmethod
    def compute_reward(predicted_token, target_token):
        """Return 1.0 when the prediction equals the target, otherwise -0.1."""
        return 1.0 if predicted_token == target_token else -0.1

    def train_step(self, state, action, reward, next_state, done):
        """Run one Q-learning update on a single transition.

        Args:
            state: State vector the action was taken from; row 0 is updated.
            action: Index of the action that was taken.
            reward: Scalar reward received for that action.
            next_state: State vector reached after the action.
            done: Truthy when the episode ended with this transition.
        """
        self.q_net.train()

        # Current predictions for every action in this state.
        q_values = self.q_net(state)

        # Detached copy: the target must not receive gradients.
        target = q_values.clone().detach()

        # Bellman target: reward + gamma * max_a' Q(next_state, a'); the
        # bootstrap term is dropped when the episode is over.
        with torch.no_grad():
            next_q = self.q_net(next_state)
            q_target = reward + self.gamma * torch.max(next_q) * (1 - int(done))

        # Only the entry of the action actually taken differs from the
        # prediction, so only that action contributes to the loss.
        target[0, action] = q_target

        # Mean squared error between prediction and target, then one optimizer step.
        loss = F.mse_loss(q_values, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()




In [None]:
# Longest sequence the agent may build; also handed to the encoder as max_length.
max_seq_len = 20

agent = MusicDQNAgent(d_model=128, max_length=max_seq_len, num_tokens=len(vocab), num_actions=len(vocab))
num_episodes = 500

# The tokenizing cell emits '<song_start>' / '<song_end>' markers; the
# vocabulary has no '<start>' / '<end>' entries, so those lookups would raise KeyError.
start_id = vocab["<song_start>"]
end_id = vocab["<song_end>"]

# NOTE(review): `target_sequence` was referenced without ever being defined.
# The first encoded song (which itself begins with '<song_start>') is used as
# the imitation target for now - confirm or replace with the intended target.
target_sequence = next(iter(token_ids_data.values()))

for episode in range(num_episodes):
    current_seq = [start_id]
    done = False

    while not done and len(current_seq) < max_seq_len:
        state_tensor = torch.tensor([current_seq], dtype=torch.long)
        state_vector = agent.get_state(state_tensor)

        action = agent.get_action(state_vector)

        next_seq = current_seq + [action]
        next_state_tensor = torch.tensor([next_seq], dtype=torch.long)
        next_state_vector = agent.get_state(next_state_tensor)

        # Dummy reward for now (customize later). The bounds check comes first
        # so that running past the end of the target yields -0.5 instead of an
        # IndexError.
        position = len(current_seq)
        if position >= len(target_sequence):
            reward = -0.5
        elif action == target_sequence[position]:
            reward = 1.0
        else:
            reward = -0.1

        done = action == end_id or len(next_seq) >= max_seq_len

        # Directly train with this single experience
        agent.train_step(state_vector, action, reward, next_state_vector, done)

        current_seq = next_seq


In [None]:
import torch

# Look the boundary markers up in the vocabulary instead of assuming IDs 0 and 1:
# IDs are assigned in sorted token order, so 0 and 1 belong to ordinary
# control_change tokens, not to the song markers.
start_token_id = vocab["<song_start>"]
end_token_id = vocab["<song_end>"]

# Begin with just the start token
generated_sequence = [start_token_id]

max_length = 20  # Maximum number of tokens to generate after the start token

for _ in range(max_length):
    # Convert to tensor: [1, seq_len]
    token_tensor = torch.tensor([generated_sequence], dtype=torch.long)

    # Get encoded state from transformer encoder
    state_vector = agent.get_state(token_tensor)

    # Choose next token (action).
    # NOTE(review): get_action is epsilon-greedy, so with agent.epsilon > 0 some
    # generated tokens are random; set agent.epsilon = 0 for greedy decoding.
    next_token = agent.get_action(state_vector)

    # Stop as soon as the end-of-song marker is produced
    if next_token == end_token_id:
        break

    # Add predicted token to sequence
    generated_sequence.append(next_token)

# Decode tokens back to symbols if needed
print("Generated token sequence:", generated_sequence)


Agent (Snakes Game)

In [None]:
# class Agent:
#   def __init__(self):
#     self.epsilon = 0 # Controls the randomness
#     self.gamma = 0 # discount rate
#     self.memort = deque(maxlen = MAX_MEMORY) #If we exceed this memory then it will automatically remove elements from the left

#     # TODO: model, trainer
#     pass
#   def get_state():

#     pass
#   def remember(self, state, action, reward, next_state, done): # It's likely we will need to change {done} to something linked to my project, such as reaching the final token
#     pass
#   def train_long_memory(self):
#     pass
#   def train_short_memory(self, state, action, reward, next_state, done):
#     pass
#   def get_action(self, state):
#     pass
#   def train():
#     plot_scores = []
#     plot_mean_scores = []
#     total_score = 0
#     record = 0
#     agent = Agent()
#     while True:
#       #Get old state
#       state_old = agent.get_state(game) #This is our previous state which will be used in my case for progressive training

#       #This will be the previous state
#       #so this will likely have to be adapted where we have append in a list the previous state

#       final_move = agent.get_action(state_old) #This is our action

#       reward, done, score = game.play_step(final_move)
#       state_new = agent.get_state(game)

#       #train short memory

#       agent.train_short_memory(state_old, final_move, reward, state_new, done)

#       #remember

#       agent.remember(state_old, final_move, reward, state_new, done)

#       if done:
#         #train long memory
#         game.reset()
#         agent.n_games += 1
#         agent.train_long_memory()

#         if score > record:
#           record = score
#           agent.model.save()

#         print('Game', agent.n_games, 'Score', score, 'Record:', record)

#     #This is currently how it's done in the Snake game we would need to adapt this for our needs
#     #I think in our adaptation this would be some form of validation variable.
#     pass