# Mount and import

In [None]:
# Mount Google Drive so the project folders below become reachable from the Colab VM.
from google.colab import drive
drive.mount('/content/drive')

# Base folder paths (all three live under the same "Deep Realbook" project directory).
# Each path ends with '/' because later cells build file paths by plain string concatenation.
data_folder = '/content/drive/My Drive/Colab Notebooks/MARG/Deep Realbook/data/'
models_folder = '/content/drive/My Drive/Colab Notebooks/MARG/Deep Realbook/models/'
utils_folder = '/content/drive/My Drive/Colab Notebooks/MARG/Deep Realbook/utils/'

# Make the project's helper modules (utils.py, config.py) importable.
# Must run before the `import utils, config` line below.
import sys
sys.path.append(utils_folder)

import importlib
import utils, config

# Re-import both modules so edits made on Drive are picked up without restarting the kernel.
# Names previously bound with `from utils import ...` are NOT refreshed by reload;
# re-run the importing cell to rebind them.
importlib.reload(utils)  # Reload the module after making changes
importlib.reload(config)  # Reload the module after making changes (last expression: its repr is displayed)

Mounted at /content/drive


<module 'config' from '/content/drive/My Drive/Colab Notebooks/MARG/Deep Realbook/utils/config.py'>

# Preprocess

## Chord sequences

In [None]:
from utils import extract_chords_from_json

# Build the path to the playlist export inside data_folder and parse it.
file_name = 'playlist.json'
json_file_path = data_folder + file_name
# chord_sequences is used below as a mapping: song -> chord sequence.
# Presumably each sequence is a list of chord tokens with '|' as the bar separator
# (a later cell filters out '|') — confirm against utils.extract_chords_from_json.
chord_sequences = extract_chords_from_json(json_file_path)

# Print the chord sequences (one line per song; this dumps the entire dataset)
for song, chords in chord_sequences.items():
    print(f"{song}: {chords}")


In [None]:
import re
from collections import Counter

# Specific tension pairs to compare.
# Each entry is (tension_a, tension_b); the corpus frequency of both members is
# looked up and reported side by side further down.
# NOTE(review): the pairs appear to be tensions that share the same pitch-class
# shape up to transposition (e.g. '-7' vs '6') — confirm the selection criterion.
specific_tension_pairs = [
    ('', '-#5'),
    ('9b5', '9#5'),
    ('7#11', '7b9b5'),
    ('-b6', '^7'),
    ('-7', '6'),
    ('7b9#9', '13b9'),
    ('13sus', '-11'),
    ('69', '9sus'),
    ('7b9sus', '-69'),
    ('13', '^13'),
    ('sus', '2'),
    ('-6', 'h7'),
    ('7b9#5', 'h9')
]

# Function to extract the tension (quality/extension suffix) from a chord symbol
def extract_tensions(chord):
    """Return the tension of ``chord`` as a list with at most one element.

    The tension is everything that follows the root note (a letter A-G with an
    optional ``b``/``#`` accidental), up to but excluding a slash-bass suffix.
    It is returned whole, so compound tensions such as ``'7b9b5'`` or
    ``'13sus'`` are not split into fragments, the root's accidental is never
    mistaken for a tension, and a plain triad yields the empty tension ``''``.

    Args:
        chord: A chord symbol such as ``'C-7'``, ``'Bb13sus'`` or ``'D-7/C'``.

    Returns:
        ``[tension]`` when the symbol starts with a valid root, otherwise ``[]``
        (e.g. for the no-chord token ``'NC'``). A list is returned so callers
        can keep using ``list.extend`` on the result.
    """
    root = re.match(r"[A-G][b#]?", chord)
    if root is None:
        # No recognisable root (e.g. 'NC'): there is no tension to report.
        return []
    # Drop the root, then cut off an optional '/<bass>' suffix.
    tension = chord[root.end():].split("/", 1)[0]
    return [tension]

# Tally how often each tension occurs across every chord sequence
def count_tensions_in_chords(chord_sequences):
    """Count tensions over all songs.

    Args:
        chord_sequences: Mapping whose values are iterables of chord tokens;
            the bar symbol ``'|'`` is skipped.

    Returns:
        A ``collections.Counter`` keyed by the strings produced by
        ``extract_tensions`` for every non-bar token.
    """
    return Counter(
        tension
        for sequence in chord_sequences.values()
        for chord in sequence
        if chord != '|'  # Exclude bar symbols
        for tension in extract_tensions(chord)
    )

# Count tensions over the whole corpus loaded into chord_sequences (a Counter keyed by tension string)
all_tensions_count = count_tensions_in_chords(chord_sequences)

# Look up the corpus frequency of both members of each tension pair
def compare_tension_frequencies(tension_count, pairs):
    """Pair up the occurrence counts of two tensions.

    Args:
        tension_count: Mapping from tension string to its count (anything with
            a ``dict``-style ``get``); missing tensions count as 0.
        pairs: Iterable of 2-item, hashable pairs ``(tension1, tension2)``.

    Returns:
        Dict mapping each pair to ``(count_of_tension1, count_of_tension2)``.
    """
    return {
        pair: (tension_count.get(first, 0), tension_count.get(second, 0))
        for pair in pairs
        for first, second in [pair]  # unpack the two members of the pair
    }

# Compare frequencies for the specified tension pairs:
# result maps (tension1, tension2) -> (count1, count2)
tension_pair_comparisons = compare_tension_frequencies(all_tensions_count, specific_tension_pairs)

# Display the results (bare last expression, so the notebook renders the dict)
tension_pair_comparisons


{('', '-#5'): (0, 15),
 ('9b5', '9#5'): (14, 17),
 ('7#11', '7b9b5'): (343, 0),
 ('-b6', '^7'): (58, 9674),
 ('-7', '6'): (17237, 2023),
 ('7b9#9', '13b9'): (0, 0),
 ('13sus', '-11'): (0, 318),
 ('69', '9sus'): (66, 0),
 ('7b9sus', '-69'): (0, 25),
 ('13', '^13'): (297, 3),
 ('sus', '2'): (0, 0),
 ('-6', 'h7'): (1019, 2560),
 ('7b9#5', 'h9'): (0, 7)}

## Vector representations

In [None]:
from utils import create_vector_representation

# Convert the chord sequences of every song into their vector form.
# The result is iterated with .items() below, i.e. it is a mapping song -> vectors.
vector_representations = create_vector_representation(chord_sequences)

# Print the vectors of every song (this dumps the entire dataset)
for song, vectors in vector_representations.items():
    print(f"{song}: {vectors}")

## Circular representations

In [None]:
from utils import generate_sequence_tokens

# Derive the circular token sequences from the vector representations computed earlier
# (the input is `vector_representations`; the result is a mapping song -> tokens).
circular_representations = generate_sequence_tokens(vector_representations)

# Print the circular tokens of every song (this dumps the entire dataset)
for song, circulars in circular_representations.items():
    print(f"{song}: {circulars}")

## Mapped results

In [None]:
from config import tension_intervals_reduced
from utils import chord_to_vector, map_vectors_to_categories, get_lexicographically_smallest_rotation

from itertools import product

# Roots and tension labels used to enumerate chord transitions.
# The first chord is always rooted on 'C'; the second chord ranges over all twelve roots.
first_root = 'C'
other_roots = ['C', 'Db', 'D', 'Eb', 'E', 'F', 'Gb', 'G', 'Ab', 'A', 'Bb', 'B']
tensions = list(tension_intervals_reduced.keys())

# Every (C + tension1, root2 + tension2) combination.
# Iteration order: tension1 outermost, then root2, then tension2.
chord_to_chord_pairs = [
    (first_root + tension1, root2 + tension2)
    for tension1 in tensions
    for root2 in other_roots
    for tension2 in tensions
]

# Transitions into, out of, and between the no-chord token 'NC' (chord side rooted on 'C')
nc_pairs = (
    [(first_root + tension, 'NC') for tension in tensions]
    + [('NC', first_root + tension) for tension in tensions]
    + [('NC', 'NC')]
)

# Full list of transitions to map
all_chord_pairs = chord_to_chord_pairs + nc_pairs

# For each transition: vectorise both chords, map the vector pair to categories,
# and keep the lexicographically smallest rotation of that category sequence.
mapped_results = [
    (
        first_chord,
        next_chord,
        get_lexicographically_smallest_rotation(
            map_vectors_to_categories(
                chord_to_vector(first_chord),
                chord_to_vector(next_chord),
            )
        ),
    )
    for first_chord, next_chord in all_chord_pairs
]

# Show a small sample of the results
print("First few mapped results:")
for example in mapped_results[:5]:
    print(example)


First few mapped results:
('C', 'C', ['A', 'A', 'A', 'A', 'D', 'A', 'A', 'A', 'D', 'A', 'A', 'D'])
('C', 'C7#11', ['A', 'A', 'A', 'D', 'A', 'C', 'D', 'A', 'A', 'C', 'A', 'D'])
('C', 'C-7', ['A', 'A', 'C', 'A', 'D', 'A', 'A', 'C', 'B', 'A', 'A', 'D'])
('C', 'C13#9', ['A', 'A', 'C', 'D', 'A', 'A', 'D', 'A', 'C', 'C', 'A', 'D'])
('C', 'C69', ['A', 'A', 'D', 'A', 'C', 'A', 'A', 'D', 'A', 'C', 'A', 'D'])


# Save and load preprocessed data

In [None]:
import json

# Persist every preprocessing artefact as a JSON file inside data_folder
outputs = [
    ('chord_sequences.json', chord_sequences),
    ('vector_representations.json', vector_representations),
    ('circular_representations.json', circular_representations),
    ('mapped_results.json', mapped_results),
]

for filename, payload in outputs:
    with open(data_folder + filename, 'w') as file:
        json.dump(payload, file)

In [None]:
import json

def _read_json(filename):
    """Load and return the JSON document stored as ``filename`` inside data_folder."""
    with open(data_folder + filename, 'r') as file:
        return json.load(file)


# Restore the preprocessing artefacts written by the save cell.
# JSON has no tuple type, so tuples that were saved come back as lists.
chord_sequences = _read_json('chord_sequences.json')
vector_representations = _read_json('vector_representations.json')
circular_representations = _read_json('circular_representations.json')
mapped_results = _read_json('mapped_results.json')

# GPT-2 fine-tuning

## Import and device

In [None]:
# Upgrade `accelerate` to the newest release in the Colab VM.
# NOTE(review): the install is unpinned (the recorded run resolved to 0.27.2), so a later
# run may pull a different version; consider pinning for reproducibility.
!pip install accelerate -U

import torch
from tokenizers import Tokenizer, models, trainers
from transformers import GPT2LMHeadModel, PreTrainedTokenizerFast, TextDataset, DataCollatorForLanguageModeling, Trainer, TrainingArguments, pipeline

# Check if CUDA is available; `device` is reused when moving the model and building the pipeline
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

Collecting accelerate
  Downloading accelerate-0.27.2-py3-none-any.whl (279 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/280.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m276.5/280.0 kB[0m [31m9.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m280.0/280.0 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: accelerate
Successfully installed accelerate-0.27.2
Using device: cuda


In [None]:
# Select which per-song representation the tokenizer and the training files are built from.
representation_to_use = chord_sequences

## Tokenizer

In [None]:
# Collect every distinct token that occurs in any song of the chosen representation
unique_sequences = set()

for sequences in representation_to_use.values():
    unique_sequences.update(sequences)

# Write the tokens one per line. The set is sorted first because set iteration
# order for strings changes between interpreter runs; sorting keeps the tokenizer
# training file identical from run to run.
with open("unique_sequences.txt", "w") as file:
    for sequence in sorted(unique_sequences):
        file.write(sequence + "\n")

In [None]:
# Initialize a tokenizer with BPE; unknown input maps to the "[UNK]" token
tokenizer = Tokenizer(models.BPE(unk_token="[UNK]"))
# BPE trainer with the special tokens reserved in the vocabulary.
# NOTE: the name `trainer` is rebound to a transformers.Trainer in a later cell.
trainer = trainers.BpeTrainer(special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"])

# Train the tokenizer on the file of unique tokens written by the previous cell
tokenizer.train(["unique_sequences.txt"], trainer)

# Save the tokenizer to models_folder; the same path is passed as `tokenizer_file` when it is reloaded
tokenizer.save(models_folder + "custom_tokenizer")

## Config

In [None]:
from config import validation_set

def _write_sequences(path, sequence_map):
    """Write one song per line to ``path``, its tokens joined by single spaces."""
    with open(path, "w") as file:
        file.writelines(" ".join(sequences) + "\n" for sequences in sequence_map.values())


# Hold out the songs named in validation_set; every other song is used for training
train_sequences = {
    song: tokens
    for song, tokens in representation_to_use.items()
    if song not in validation_set
}
validation_sequences = {song: representation_to_use[song] for song in validation_set}

# Plain-text files for the training and validation datasets
_write_sequences("training_sequences.txt", train_sequences)
_write_sequences("validation_sequences.txt", validation_sequences)

In [None]:
# Load your custom tokenizer (the file saved by the tokenizer-training cell) and
# register '[PAD]' as its padding token.
tokenizer = PreTrainedTokenizerFast(tokenizer_file=models_folder + "custom_tokenizer")
tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# Load GPT-2 model (pretrained "gpt2" weights) and resize its token embeddings to the
# custom vocabulary size.
# NOTE(review): the recorded generation warning falls back to eos_token_id 50256, so the
# model config apparently still carries GPT-2's original special-token ids, which lie
# outside the resized vocabulary — consider setting pad/eos ids from the tokenizer.
model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)
model.resize_token_embeddings(len(tokenizer))

# Prepare datasets for training and validation from the text files written by the
# split cell; block_size=512 is the length (in tokens) requested for each example.
train_dataset = TextDataset(
    tokenizer=tokenizer,
    file_path="training_sequences.txt",
    block_size=512
)
validation_dataset = TextDataset(
    tokenizer=tokenizer,
    file_path="validation_sequences.txt",
    block_size=512
)
# mlm=False selects causal (next-token) language modelling rather than masked LM
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=False
)

# Update TrainingArguments to include evaluation during training.
# Checkpoints go to models_folder every 1,000 steps and only the 3 newest are kept.
training_args = TrainingArguments(
    output_dir=models_folder + "gpt2-text-reps_custom_token",
    overwrite_output_dir=True,
    num_train_epochs=100,
    per_device_train_batch_size=4,
    save_steps=1_000,
    save_total_limit=3,
    evaluation_strategy="epoch",  # Evaluate each epoch
)

# Instantiate Trainer with validation dataset.
# NOTE: this rebinds `trainer` (previously the BPE trainer) and is itself replaced by the
# Trainer created in the next cell, which adds the save-on-best callback.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]



In [None]:
from transformers import TrainerCallback

class SaveOnBestValidationLossCallback(TrainerCallback):
    """Save the model and tokenizer whenever the validation loss hits a new minimum.

    After every evaluation the reported ``eval_loss`` is compared with the best
    value seen so far by this callback instance; on a strict improvement the
    model and tokenizer are written to ``args.output_dir``.
    """

    def __init__(self):
        # Start at +inf so the first reported eval_loss always counts as an improvement.
        self.best_loss = float('inf')

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        """Persist model and tokenizer if the latest ``eval_loss`` is the best so far.

        Args:
            args: Training arguments; ``args.output_dir`` is the save location.
            state: Trainer state; its ``log_history`` is the fallback source of
                ``eval_loss`` when ``metrics`` is not supplied.
            control: Trainer control object (not modified).
            metrics: Evaluation metrics passed by the Trainer, if any.
            **kwargs: Extra objects passed by the Trainer; ``model`` and
                ``tokenizer``/``processing_class`` are used when present.
        """
        # Prefer the metrics handed to this event; fall back to the last log entry.
        current_loss = (metrics or {}).get('eval_loss')
        if current_loss is None and state.log_history:
            current_loss = state.log_history[-1].get('eval_loss')

        # `is None` (not truthiness) so a legitimate loss of 0.0 is not ignored;
        # `not <` (rather than `>=`) so a NaN loss never counts as an improvement.
        if current_loss is None or not current_loss < self.best_loss:
            return
        self.best_loss = current_loss

        # Save the objects the Trainer is actually training. The notebook-level
        # globals are only a fallback, because later cells rebind `model`.
        model_to_save = kwargs.get('model')
        if model_to_save is None:
            model_to_save = model
        tokenizer_to_save = kwargs.get('tokenizer')
        if tokenizer_to_save is None:
            tokenizer_to_save = kwargs.get('processing_class')
        if tokenizer_to_save is None:
            tokenizer_to_save = tokenizer

        model_to_save.save_pretrained(args.output_dir)
        tokenizer_to_save.save_pretrained(args.output_dir)

# Add the callback to your trainer.
# This re-creates `trainer` with the same model, arguments, collator and datasets as the
# previous cell, plus the save-on-best-validation-loss callback; it is the Trainer that
# the training cell below runs.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    callbacks=[SaveOnBestValidationLossCallback()]
)

## Train

In [None]:
# Run fine-tuning (up to num_train_epochs, with an evaluation after every epoch).
trainer.train()

# Save the fine-tuned model (the weights as they are when training returns).
# NOTE: this line only runs if trainer.train() finishes; the recorded run was stopped
# with KeyboardInterrupt, so it did not execute there. The tokenizer is not saved here.
model.save_pretrained(models_folder + "gpt2-text-reps_custom_token_val")

Epoch,Training Loss,Validation Loss
1,No log,0.794945
2,No log,0.703922
3,No log,0.663019
4,0.865800,0.628382
5,0.865800,0.605317
6,0.865800,0.58828
7,0.865800,0.578638
8,0.659400,0.568519
9,0.659400,0.562137
10,0.659400,0.562696


KeyboardInterrupt: 

In [None]:
# Number of examples the Trainer iterates over in one epoch.
# This must be the dataset length, not the number of lines in training_sequences.txt:
# the dataset is built with block_size=512, so its examples are token blocks rather
# than songs. (The recorded loss log is consistent with this: assuming the Trainer's
# default logging interval of 500 steps, the first training loss at epoch 4 and the
# second at epoch 8 imply roughly 125-143 steps per epoch.)
total_examples = len(train_dataset)

# Now calculate the epoch
batch_size = training_args.per_device_train_batch_size  # From the training configuration
checkpoint_step = 2000  # The step number in your checkpoint name
# Ceiling division: the last, possibly smaller, batch still counts as one step.
# Assumes a single device and no gradient accumulation — TODO confirm for other setups.
steps_per_epoch = -(-total_examples // batch_size)
estimated_epoch = checkpoint_step / steps_per_epoch
print(f"Checkpoint was approximately at the end of epoch: {estimated_epoch:.2f}")


Checkpoint was approximately at the end of epoch: 6.22


## Inference and decoding

In [None]:
# Replace with your specific checkpoint path
checkpoint_path = models_folder + "gpt2-text-reps_custom_token/checkpoint-2000"  # the trailing number is the training step of the checkpoint

# Load the model from the checkpoint (rebinds the global `model` used during training)
model = GPT2LMHeadModel.from_pretrained(checkpoint_path)

from transformers import PreTrainedTokenizerFast

# The tokenizer is NOT reloaded here: the line below is disabled, so the cells that follow
# rely on the `tokenizer` object already created in the training section of this session.
#tokenizer = PreTrainedTokenizerFast.from_pretrained(checkpoint_path)

# Move the model to the GPU when available; as the last expression, the model repr is displayed
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(1402, 768)
    (wpe): Embedding(1024, 768)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-11): 12 x GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D()
          (c_proj): Conv1D()
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D()
          (c_proj): Conv1D()
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=768, out_features=1402, bias=False)
)

In [None]:
# Create a text generation pipeline from the checkpoint model and the in-session tokenizer,
# placed on `device` (the GPU when one is available, otherwise the CPU)
generator = pipeline('text-generation', model=model, tokenizer=tokenizer, device=device)

In [None]:
# Define a starting prompt
prompt = "C-7 F7"  # Example starting sequence

# The length you want for the generated part (in tokens)
desired_length = 400

# Calculate the total length including the prompt, because max_length counts prompt tokens too
total_length = len(tokenizer.encode(prompt)) + desired_length

# Generate three continuations of the prompt.
# pad_token_id is passed explicitly: without it generation falls back to the model
# config's eos_token_id (50256 in the recorded run), which is outside the custom vocabulary.
generation_outputs = generator(
    prompt,
    max_length=total_length,
    num_return_sequences=3,
    pad_token_id=tokenizer.pad_token_id,
)

# Keep only the generated text of each returned candidate
generated_sequences = [output['generated_text'] for output in generation_outputs]

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


In [None]:
# Show the raw generated texts (a list with one string per returned candidate)
print(generated_sequences)

['C-7 F7 | Bb- 7 Eb 7b9 | Ab ^ 7 | Ab- 7 Db 7 | F 9 | Bb- 7 | Eb7#11 D 9 | G- 7 C 7 | F 6 D7b9\n A 7 | G- 7 C 7 | C 7 | G- 7 C 7 | A 7 | Bb- 7 Eb 7 | Ab ^ 7 | A- 7 Ab- 7 Db 7 | G- 7 C 7 | F- 7 Bb 7 | G- 7 D 7#9 | Db 7 C 7 | C- 7 F 7 | F- 7 Bb 7 | Eb ^ 7 | A- 7 Ab- 7 Db 7 | G- 7 C 7 | A 7 | Bb- 7 Eb 7 | Ab ^ 7 | A- 7 Ab- 7 Db 7 | G- 7 C 7 | A 7 | Bb- 7 Eb 7 | Ab ^ 7 | Ab- 7 Db 7 | F 9 | Bb- 7 | Eb7#11 D 9 | G- 7 C 7 | F6\n F- 7 F# o 7 | G- 7 | G- 7 D 7#9 | A- 7 D 7 | F- 7 | Bb 7 | Eb ^ 7 E 7 | Eb ^ 7 | C^ 7 | F- 7 Bb 7 | Eb ^ 7 | E o 7 | F- 7 Db 7 | C- 7 B 7 | Bb- 7 | A h 7 D7b9 | G- 7 | F# o 7 | G- 7 D 7#9', 'C-7 F7 | Bb 6 Bb 7# 5 | Eb ^ 7 F7/ Bb | A 7 Ab 7#11 | Bb 6 D 7 | G- 7 C 7 | G- 7 C 7 | Bb 7 Eb- 7 | D 7 G7b9 | C- 7 F 7 | Bb 6 F7\n Bb 6 | D-7/ C | G / D | G / D | G / D | E 7sus Bb / D | A- 7 | D 7sus | Bb 7#9 | Bb 6 | D-7/ C | G / D | G / D | G / D | G / D | A- 7 | D 7sus | Bb 7#9 | Bb 6 | G- 7 F# 7b13 | E 7sus D 7 | G- 7 F# 7b13 G- 7 | Bb 6 Ab 7 | Bb 6 C 7 | F 6 E- 7 A 7 | D- 7

In [None]:
# Re-space every generated text: drop all spaces, then put a single space after
# each 'E' or 'F' character, and trim whitespace at both ends.
rearranged_sequences = []

for sequence in generated_sequences:
    compact = sequence.replace(" ", "")
    respaced = "".join(
        symbol + ' ' if symbol in ('E', 'F') else symbol
        for symbol in compact
    )
    rearranged_sequences.append(respaced.strip())

# rearranged_sequences now contains the modified sequences

# Process and transpose the generated sequences.
# NOTE(review): `process_and_transpose_sequences` is not imported in any visible cell;
# presumably it lives in utils — add the import or this cell fails on a fresh kernel.
# It returns two parallel lists: per-sequence (first_chord, second_chord) pairs and
# per-sequence lists of string tokens (they are joined with ' ' below).
all_transposed_sequences, all_final_sequences = process_and_transpose_sequences(rearranged_sequences, mapped_results)

# Print the final transposed chord stream for each generated sequence (one pair per line)
for sequence_index, transposed_chords in enumerate(all_transposed_sequences):
    print(f"Transposed Sequence {sequence_index + 1}:")
    for first_chord, second_chord in transposed_chords:
        print(f"{first_chord} {second_chord}")
    print()  # Print a new line for separation between sequences

# Print the final sequence as a stream of first chords with bar tokens
for sequence_index, final_sequence in enumerate(all_final_sequences):
    print(f"Final Chord Stream {sequence_index + 1}: {' '.join(final_sequence)}")
    print()  # Print a new line for separation between sequences

# Miscellaneous: output formatting and token-length analysis

In [None]:
# Render each transposed sequence as "<chord>.|<chord>.|...|": the first chord of
# every pair followed by a dot, with '|' between entries and one trailing '|'.
formatted_chord_sequences = [
    "|".join(f"{pair[0]}." for pair in transposed_chords) + "|"
    for transposed_chords in all_transposed_sequences
]

# Print the formatted chord sequences, numbered from 1
for number, formatted in enumerate(formatted_chord_sequences, start=1):
    print(f"Formatted Sequence {number}: {formatted}")


Formatted Sequence 1: C-7.|F7.|C7.|A7.|D7.|D-7.|G7.|G-7.|C7.|C-7.|F7.|Bb^7.|F-7.|Bbh7.|Ebh7.|Ab7b9.|Db-7.|Gb7.|Ab-7.|Dbh7.|Gb7b9.|B-7.|E-7.|Dh7.|D-7.|G7.|A-7.|G-7.|F-7.|Bb7.|C-7.|Bb-7.|Eb7.|F-7.|Eb-7.|Db-7.|Gb7.|Db7.|Bb7.|Eb7.|Eb-7.|Ab7.|Db^7.|Ab-7.|
Formatted Sequence 2: C-7.|F7.|Bb-7.|Bb-7.|Eb7.|F-7.|F-7.|Bb7.|C-7.|Co7.|C^7#5.|E^7.|Gb^7.|Eb7.|F^7.|A-7.|D7.|G^7.|G^7.|B-7.|E7.|A^7.|A^7.|Db-7.|Gb7.|Ab-7.|E7.|A-7.|A-7.|D7.|E-7.|E-7.|A7.|B^7.|Eb-7.|Ab7.|Bb-7.|Ab7.|Gb^7.|Gb^7.|Bb-7.|
Formatted Sequence 3: C-7.|F7.|F7.|Bb7.|Bb7.|Eb7.|Eb7.|Ab7.|Ab7.|Db7.|Db7.|Gb7.|Ab-7.|Ab9#5.|Db^7.|C-7.|F7.|F7.|Bb7.|Bb7.|Eb7.|Ab7.|Ab7.|Db7.|Db7.|Gb7.|B7.|B7.|E7.|E7.|A7.|A7.|D7.|D7.|G7.|G7.|C7.|F7.|G^7.|G^7.|Co7.|Co7.|Db-7.|B-7.|A-7.|B-7.|E7.|Gbh7.|B7b9.|E-7.|


In [None]:
import matplotlib.pyplot as plt

# Path to your file (one sequence per line).
# NOTE(review): no visible cell writes "chord_sequences.txt" and the recorded run failed
# with FileNotFoundError; the per-song files written by this notebook are
# "training_sequences.txt" and "validation_sequences.txt" — point this at the intended file.
file_path = "chord_sequences.txt"

# Initialize a list to store the lengths
sequence_lengths = []

# Read and tokenize each line in the file
with open(file_path, 'r', encoding='utf-8') as file:
    for line in file:
        tokens = tokenizer.encode(line.strip(), add_special_tokens=False)
        sequence_lengths.append(len(tokens))

# Fail with a clear message instead of an opaque max()/division error on an empty file
if not sequence_lengths:
    raise ValueError(f"No sequences found in {file_path!r}; nothing to analyse.")

# You can now analyze the sequence_lengths list to determine the optimal block size
print("Token lengths of sequences:", sequence_lengths)

# Example analysis: Find the maximum length
max_length = max(sequence_lengths)
print("Maximum token length:", max_length)

# Example analysis: Find the average length
average_length = sum(sequence_lengths) / len(sequence_lengths)
print("Average token length:", average_length)

# Plotting a histogram of the sequence lengths
plt.figure(figsize=(10, 6))
plt.hist(sequence_lengths, bins=20, edgecolor='black')
plt.title("Histogram of Token Lengths of Sequences")
plt.xlabel("Token Length")
plt.ylabel("Frequency")
plt.grid(True)
plt.show()

# Optional: Calculating the (population) variance.
# Reuses average_length instead of recomputing the mean for every element.
variance = sum((x - average_length) ** 2 for x in sequence_lengths) / len(sequence_lengths)
print(f"Variance of Token Lengths: {variance}")

FileNotFoundError: [Errno 2] No such file or directory: 'chord_sequences.txt'