In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [2]:
# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 512 # what is the maximum context length for predictions?
max_iters = 2500
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 140
n_head = 4
n_layer = 4
dropout = 0.3
# ------------

In [3]:
print(device)

cpu


In [4]:
torch.manual_seed(1337)

<torch._C.Generator at 0x105684310>

In [5]:
with open('../finalAssignment_musicDataset/Modified Code/inputMelodiesAugmented.txt', 'r', encoding='utf-8') as f:
    text = f.read()

## Description of the Dataset - ./inputMelodiesAugmented.txt

In [6]:
with open('../finalAssignment_musicDataset/Modified Code/inputMelodiesAugmented.txt', 'r') as file:
    melodies = file.readlines()

In [7]:
melody_tokens = [melody.strip().split() for melody in melodies]

In [8]:
def parse_augmented_data(melody_tokens):
    pitches = []
    durations = []
    for token in melody_tokens:
        if token.startswith("R"):
            pitches.append("R")
            durations.append(int(token[2:-1]))
        else:
            pitch, duration = token.split("(")
            pitches.append(pitch)
            durations.append(int(duration[:-1]))
    return pitches, durations

In [9]:
parsed_data = []
for melody in melodies:
    melody_tokens = melody.strip().split()
    pitches, durations = parse_augmented_data(melody_tokens)
    parsed_data.append((pitches, durations))

In [10]:
pitch_vocab = sorted(set(pitch for pitches, _ in parsed_data for pitch in pitches))
duration_vocab = sorted(set(duration for _, durations in parsed_data for duration in durations))

pitch_to_id = {pitch: idx for idx, pitch in enumerate(pitch_vocab)}
id_to_pitch = {idx: pitch for pitch, idx in pitch_to_id.items()}

duration_to_id = {duration: idx for idx, duration in enumerate(duration_vocab)}
id_to_duration = {idx: duration for duration, idx in duration_to_id.items()}

pitch_vocab_size = len(pitch_vocab)
duration_vocab_size = len(duration_vocab)

In [11]:
def encode_sequence(parsed_data):
    pitch_encoded = [pitch_to_id[p] for p in parsed_data[0]]
    duration_encoded = [duration_to_id[d] for d in parsed_data[1]]
    return pitch_encoded, duration_encoded

encoded_data = [encode_sequence(data) for data in parsed_data]

In [12]:
def decode_sequence(pitch_output, duration_output):
    decoded_sequence = []
    pitch_output = pitch_output.cpu().numpy().flatten()
    duration_output = duration_output.cpu().numpy().flatten()

    for pitch_id, duration_id in zip(pitch_output, duration_output):
        pitch = id_to_pitch.get(pitch_id, "Unknown")
        duration = id_to_duration.get(duration_id, "Unknown")
        
        if pitch == "R":
            decoded_sequence.append(f"R({duration})")
        else:
            decoded_sequence.append(f"{pitch}({duration})")
    
    return decoded_sequence

In [13]:
pitch_data = torch.tensor([p for pitch_seq, _ in encoded_data for p in pitch_seq], dtype=torch.long)
duration_data = torch.tensor([d for _, duration_seq in encoded_data for d in duration_seq], dtype=torch.long)

In [14]:
n = int(0.9 * len(pitch_data))  # first 90% for training, rest for validation
train_pitch_data = pitch_data[:n]
val_pitch_data = pitch_data[n:]
train_duration_data = duration_data[:n]
val_duration_data = duration_data[n:]

In [15]:
def get_batch(split):
    data_pitch = train_pitch_data if split == 'train' else val_pitch_data
    data_duration = train_duration_data if split == 'train' else val_duration_data
    ix = torch.randint(len(data_pitch) - block_size, (batch_size,))
    
    pitch_batch = torch.stack([data_pitch[i:i+block_size] for i in ix])
    duration_batch = torch.stack([data_duration[i:i+block_size] for i in ix])

    pitch_target = torch.stack([data_pitch[i+1:i+block_size+1] for i in ix])
    duration_target = torch.stack([data_duration[i+1:i+block_size+1] for i in ix])
    
    return pitch_batch.to(device), duration_batch.to(device), (pitch_target.to(device), duration_target.to(device))

In [16]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

In [17]:
class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=True)
        self.query = nn.Linear(n_embd, head_size, bias=True)
        self.value = nn.Linear(n_embd, head_size, bias=True)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

In [18]:
class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [19]:
class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

In [20]:
class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [21]:
class GPTMelodyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.pitch_embedding = nn.Embedding(pitch_vocab_size, n_embd)
        self.duration_embedding = nn.Embedding(duration_vocab_size, n_embd)
        self.position_embedding = nn.Embedding(block_size, n_embd)

        self.pitch_weight = nn.Parameter(torch.ones(1))
        self.duration_weight = nn.Parameter(torch.ones(1))

        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)

        self.pitch_head = nn.Linear(n_embd, pitch_vocab_size)
        self.duration_head = nn.Linear(n_embd, duration_vocab_size)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, pitch_idx, duration_idx, targets=None):
        B, T = pitch_idx.shape
        pitch_emb = self.pitch_embedding(pitch_idx)
        duration_emb = self.duration_embedding(duration_idx)
        pos_emb = self.position_embedding(torch.arange(T, device=device))

        x = (self.pitch_weight * pitch_emb) + (self.duration_weight * duration_emb) + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)

        pitch_logits = self.pitch_head(x)
        duration_logits = self.duration_head(x)

        if targets is None:
            return pitch_logits, duration_logits, None

        pitch_target, duration_target = targets
        pitch_loss = F.cross_entropy(pitch_logits.view(-1, pitch_logits.size(-1)), pitch_target.view(-1))
        duration_loss = F.cross_entropy(duration_logits.view(-1, duration_logits.size(-1)), duration_target.view(-1))
        
        loss = 0.7 * pitch_loss + 0.3 * duration_loss
        return pitch_logits, duration_logits, loss

    def generate(self, context, max_new_tokens):
        pitch_idx = context.clone()
        duration_idx = context.clone()

        for _ in range(max_new_tokens):
            pitch_cond = pitch_idx[:, -block_size:]
            duration_cond = duration_idx[:, -block_size:]

            pitch_logits, duration_logits, _ = self(pitch_cond, duration_cond)

            pitch_logits = pitch_logits[:, -1, :]
            duration_logits = duration_logits[:, -1, :]

            pitch_probs = F.softmax(pitch_logits, dim=-1)
            duration_probs = F.softmax(duration_logits, dim=-1)

            pitch_next = torch.multinomial(pitch_probs, num_samples=1)
            duration_next = torch.multinomial(duration_probs, num_samples=1)

            pitch_idx = torch.cat((pitch_idx, pitch_next), dim=1)
            duration_idx = torch.cat((duration_idx, duration_next), dim=1)

        return pitch_idx, duration_idx

In [22]:
model = GPTMelodyModel()
model = model.to(device)

In [23]:
import math

optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=max_iters)

for iter in range(max_iters):
    if iter % eval_interval == 0 or iter == max_iters - 1:
        model.eval()
        with torch.no_grad():
            losses = {}
            perplexities = {}
            for split in ['train', 'val']:
                loss = torch.tensor([model(*get_batch(split))[-1].item() for _ in range(eval_iters)]).mean()
                losses[split] = loss
                perplexities[split] = math.exp(loss)
            print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, "
                 f"train perplexity {perplexities['train']:.4f}, val perplexity {perplexities['val']:.4f}")
        model.train()

    pitch_batch, duration_batch, targets = get_batch('train')
    pitch_logits, duration_logits, loss = model(pitch_batch, duration_batch, targets)
    
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

    scheduler.step()

    if iter % eval_interval == 0:
        print(f"Current Learning Rate: {scheduler.get_last_lr()[0]:.6f}")

step 0: train loss 5.2188, val loss 5.1922, train perplexity 184.7104, val perplexity 179.8593
Current Learning Rate: 0.000300
step 500: train loss 1.0176, val loss 1.0451, train perplexity 2.7667, val perplexity 2.8438
Current Learning Rate: 0.000271
step 1000: train loss 0.9134, val loss 0.9510, train perplexity 2.4928, val perplexity 2.5884
Current Learning Rate: 0.000196
step 1500: train loss 0.8706, val loss 0.9151, train perplexity 2.3884, val perplexity 2.4969
Current Learning Rate: 0.000103
step 2000: train loss 0.8509, val loss 0.8989, train perplexity 2.3418, val perplexity 2.4569
Current Learning Rate: 0.000029
step 2499: train loss 0.8453, val loss 0.8932, train perplexity 2.3288, val perplexity 2.4430


In [24]:
context = torch.zeros((1, block_size), dtype=torch.long, device=device)
pitch_output, duration_output = model.generate(context, max_new_tokens=500)

In [25]:
decoded_melody = decode_sequence(pitch_output, duration_output)
print("Decoded Melody:", " ".join(decoded_melody))

Decoded Melody: A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) A2(1) 

# Evaluation Metrics

## Jaccard Similarity

In [27]:
N = 5
max_new_tokens = 500

generated_sequences = []
for _ in range(N):
    context = torch.zeros((1, block_size), dtype=torch.long, device=device)
    pitch_output, duration_output = model.generate(context, max_new_tokens=max_new_tokens)
    pitch_output_sliced=pitch_output.squeeze()[250:]
    duration_output_sliced=pitch_output.squeeze()[250:]
    generated_sequences.append((pitch_output_sliced.tolist(), duration_output_sliced.tolist()))

In [28]:
def calculate_jaccard_similarity(seq1, seq2):
    set1, set2 = set(seq1), set(seq2)
    intersection = len(set1.intersection(set2))
    union = len(set1.union(set2))
    return intersection / union if union > 0 else 0

In [29]:
def calculate_average_jaccard_similarity(sequences):
    total_similarity = 0
    count = 0
    for i in range(len(sequences)):
        for j in range(i + 1, len(sequences)):
            similarity = calculate_jaccard_similarity(sequences[i], sequences[j])
            total_similarity += similarity
            count += 1
    return total_similarity / count if count > 0 else 0

In [32]:
print(f"Pitch Jaccard Similarity): {pitch_jaccard:.4f}")
print(f"Duration Jaccard Similarity): {duration_jaccard:.4f}")

Pitch Jaccard Similarity): 0.5565
Duration Jaccard Similarity): 0.5565
