In [None]:
import os
import subprocess

# Input root that is walked for MIDI files, and output root that receives the
# converted .abc files.
MIDI_DIR, OUTPUT_DIR = (
    "/Users/gurjeetkaur/Desktop/ml/lmd_full",
    "/Users/gurjeetkaur/Desktop/ml/abc_out",
)

# Make sure the output root exists before anything is written into it;
# exist_ok makes this safe to re-run.
os.makedirs(name=OUTPUT_DIR, exist_ok=True)

# Line prefixes that mark an ABC header field. Kept as a tuple so it can be
# passed directly to str.startswith().
ABC_HEADERS = (
    "X:",
    "T:",
    "M:",
    "L:",
    "Q:",
    "K:",
    "V:",
)

# Individual characters that may appear on a line for it to be accepted as
# tune body (note the trailing space inside the literal).
ALLOWED_MUSIC_CHARS = {ch for ch in "ABCDEFGabcdefgzZ|[]:0123456789,^_=()/' "}

# Lowercase fragments; a line whose lowercased text contains any of them is
# discarded by the conversion loop.
BANNED_SUBSTRINGS = set(
    [
        "%",
        "user",
        "calling",
        "track",
        "sharps",
        "flats",
        "major",
        "minor",
        ".mid",
        ".midi",
    ]
)

def is_pure_music_line(line: str) -> bool:
    """Tell whether ``line`` is made up solely of allowed music characters.

    Args:
        line: A single line of text to check.

    Returns:
        False as soon as a character outside ``ALLOWED_MUSIC_CHARS`` is
        found, True otherwise (which includes the empty string).
    """
    for ch in line:
        if ch not in ALLOWED_MUSIC_CHARS:
            return False
    return True


# Seconds a single midi2abc invocation may run before it is abandoned.
CONVERT_TIMEOUT_S = 10


def _clean_midi2abc_output(raw_abc):
    """Filter raw midi2abc text down to the lines that are kept.

    Args:
        raw_abc: The complete stdout text of one midi2abc run.

    Returns:
        A list of stripped lines: the first occurrence of each header field in
        ``ABC_HEADERS`` (with any ``T:`` field replaced by ``T:Unknown``)
        plus every line accepted by ``is_pure_music_line``. Empty when
        nothing survives the filters.
    """
    clean_lines = []
    seen_headers = set()

    for line in raw_abc.splitlines():
        line = line.strip()
        if not line:
            continue

        # BANNED_SUBSTRINGS holds lowercase fragments, so match on a
        # lowercased copy of the line.
        # NOTE(review): this check runs before the header check, so a header
        # line that also carries a banned fragment (e.g. a trailing "%"
        # comment) is dropped entirely - confirm that is intended.
        lower = line.lower()
        if any(bad in lower for bad in BANNED_SUBSTRINGS):
            continue

        if line.startswith(ABC_HEADERS):
            # Keep the colon in the tag so it compares equal to "T:" below
            # (a bare split(":")[0] would yield "T" and never match).
            tag = line.split(":", 1)[0] + ":"
            if tag not in seen_headers:
                clean_lines.append("T:Unknown" if tag == "T:" else line)
                seen_headers.add(tag)
            continue

        if is_pure_music_line(line):
            clean_lines.append(line)

    return clean_lines


processed = 0
success_count = 0
fail_count = 0

for root, _, files in os.walk(MIDI_DIR):
    # Mirror the input directory layout under OUTPUT_DIR.
    rel = os.path.relpath(root, MIDI_DIR)
    out_dir = os.path.join(OUTPUT_DIR, rel)
    os.makedirs(out_dir, exist_ok=True)

    for file in files:
        if not file.lower().endswith((".mid", ".midi")):
            continue

        midi_path = os.path.join(root, file)
        abc_path = os.path.join(out_dir, file.rsplit(".", 1)[0] + ".abc")

        try:
            # subprocess.run() kills and reaps the child itself when the
            # timeout expires, then raises TimeoutExpired, which is handled
            # by the generic handler below like any other failure.
            result = subprocess.run(
                ["midi2abc", midi_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                timeout=CONVERT_TIMEOUT_S,
            )

            clean_lines = _clean_midi2abc_output(result.stdout)

            if clean_lines:
                with open(abc_path, "w") as f:
                    f.write("\n".join(clean_lines))
                success_count += 1
            else:
                # Nothing usable survived the filters.
                fail_count += 1

        except Exception:
            # Best effort: one bad file (timeout, undecodable output, OS
            # error) must not abort the whole batch; count it and move on.
            fail_count += 1

        # Every attempted file is counted, so that
        # processed == success_count + fail_count.
        processed += 1
        if processed % 100 == 0:
            print(f"Processed {processed} files...")


print("\n===== SUMMARY =====")
print("Processed             :", processed)
print("Successfully converted:", success_count)
print("Failed conversions    :", fail_count)
print("====================")


Processed 100 files...
Processed 200 files...
Processed 300 files...
Processed 400 files...
Processed 500 files...
Processed 600 files...
Processed 700 files...
Processed 800 files...
Processed 900 files...
Processed 1000 files...
Processed 1100 files...
Processed 1200 files...
Processed 1300 files...
Processed 1400 files...
Processed 1500 files...
Processed 1600 files...
Processed 1700 files...
Processed 1800 files...
Processed 1900 files...
Processed 2000 files...
Processed 2100 files...
Processed 2200 files...
Processed 2300 files...
Processed 2400 files...
Processed 2500 files...
Processed 2600 files...
Processed 2700 files...
Processed 2800 files...
Processed 2900 files...
Processed 3000 files...
Processed 3100 files...
Processed 3200 files...
Processed 3300 files...
Processed 3400 files...
Processed 3500 files...
Processed 3600 files...
Processed 3700 files...
Processed 3800 files...
Processed 3900 files...
Processed 4000 files...
Processed 4100 files...
Processed 4200 files...
P

In [None]:
import os

INPUT_DIR = "/Users/gurjeetkaur/Desktop/ml/clean_midi"
OUTPUT_DIR = "/Users/gurjeetkaur/Desktop/ml/abc_out"

# Every .abc file below OUTPUT_DIR, in os.walk order.
abc_candidates = [
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk(OUTPUT_DIR)
    for name in names
    if name.endswith(".abc")
]
total_files = len(abc_candidates)

valid_files = []
invalid_files = []

for candidate in abc_candidates:
    with open(candidate, "r") as handle:
        token_count = len(handle.read().split())

    # A file with fewer than 10 whitespace-separated tokens (which covers
    # empty and whitespace-only files) is treated as invalid.
    bucket = invalid_files if token_count < 10 else valid_files
    bucket.append(candidate)

# Destructive step: the invalid files are deleted from disk.
for doomed in invalid_files:
    os.remove(doomed)


print(f"Total ABC files processed: {total_files}")
print(f"Valid files: {len(valid_files)}")
print(f"Removed {len(invalid_files)} invalid/empty files")

Total ABC files processed: 192819
Valid files: 182375
Removed 10444 invalid/empty files


In [None]:
import os
import hashlib


# Maximum number of whitespace-separated tokens written to one chunk file.
MAX_TOKENS = 1024
CLEANED_DIR = "/Users/gurjeetkaur/Desktop/ml/abc_cleaned"
os.makedirs(CLEANED_DIR, exist_ok=True)


def _write_chunk(file_hash, chunk_index, tokens):
    """Write ``tokens`` space-joined to ``<file_hash>_chunk<chunk_index>.abc``.

    The file is created in ``CLEANED_DIR``; an existing file of the same name
    is overwritten.
    """
    chunk_filename = f"{file_hash}_chunk{chunk_index}.abc"
    chunk_path = os.path.join(CLEANED_DIR, chunk_filename)
    with open(chunk_path, "w") as cf:
        cf.write(" ".join(tokens))


total_chunks = 0
files_processed = 0

# `valid_files` is the list built by the validation cell above.
for file_path in valid_files:
    files_processed += 1
    chunk_tokens = []
    chunk_index = 1

    # Hash the whole path rather than only the basename: valid_files comes
    # from a recursive walk, so two files with the same name in different
    # sub-directories would otherwise map to the same chunk filenames and
    # silently overwrite each other.
    file_hash = hashlib.md5(file_path.encode()).hexdigest()[:10]

    with open(file_path, "r") as f:
        for line in f:
            # str.split() with no argument already ignores surrounding
            # whitespace and yields [] for blank lines.
            chunk_tokens.extend(line.split())

            # Flush as many full chunks as the buffer currently holds.
            while len(chunk_tokens) >= MAX_TOKENS:
                _write_chunk(file_hash, chunk_index, chunk_tokens[:MAX_TOKENS])
                chunk_tokens = chunk_tokens[MAX_TOKENS:]
                chunk_index += 1
                total_chunks += 1

    # Whatever is left over becomes a final, shorter chunk.
    if chunk_tokens:
        _write_chunk(file_hash, chunk_index, chunk_tokens)
        total_chunks += 1


print(f"Total files processed: {files_processed}")
print(f"Total chunks created: {total_chunks}")


Total files processed: 182375
Total chunks created: 188371


In [None]:
import os
import json
import random
import re
from collections import Counter
# Directory that is scanned for .abc input, and directory that receives the
# vocabulary and the tokenized split files.
ABC_DIR = "/Users/gurjeetkaur/Desktop/ml/abc_out"
TOKEN_DIR = "/Users/gurjeetkaur/Desktop/ml/tokenized_note"
os.makedirs(name=TOKEN_DIR, exist_ok=True)

# One output file per split, all inside TOKEN_DIR.
TRAIN_PATH, VAL_PATH, TEST_PATH = (
    os.path.join(TOKEN_DIR, split_file)
    for split_file in ("train.txt", "val.txt", "test.txt")
)

# Probability of a line going to train / val; the remainder goes to test.
TRAIN_P, VAL_P = 0.98, 0.01

# One note or rest token:
#   note = optional accidental (^, ^^, _, __, =), a pitch letter A-G / a-g,
#          any number of octave marks (, or '), then an optional duration
#   rest = "z" followed by an optional duration
# A duration is an optional multiplier, optionally followed by one or more
# slashes and an optional divisor, so "2", "3/2", "/2", "/" and "//" are all
# kept as part of the token. (Requiring a digit before the slash would cut
# "A/2" down to "A" and "z3/2" down to "z3".)
NOTE_REGEX = re.compile(
    r"(?:\^+|_+|=)?[A-Ga-g][,']*\d*(?:/+\d*)?|z\d*(?:/+\d*)?"
)

def extract_notes(line):
    """Extract note/rest tokens from an ABC line.

    Args:
        line: One line of ABC text.

    Returns:
        A list with the matched text of every ``NOTE_REGEX`` match in
        ``line``, in left-to-right order; empty when nothing matches.
    """
    return [m.group() for m in NOTE_REGEX.finditer(line)]

# Seed for the train/val/test assignment so that re-running the cell on the
# same corpus reproduces the same split.
SPLIT_SEED = 42

# Collect every .abc file under ABC_DIR.
files = []
for root, _, fs in os.walk(ABC_DIR):
    for f in fs:
        if f.endswith(".abc"):
            files.append(os.path.join(root, f))

# os.walk order depends on the filesystem; sort so the seeded split below
# sees the lines in a stable order.
files.sort()

print("ABC files found:", len(files))

print("Building note-level vocabulary...")
counter = Counter()

# Pass 1: count every note/rest token in the corpus.
for path in files:
    with open(path, "r", errors="ignore") as r:
        for line in r:
            notes = extract_notes(line)
            if notes:
                counter.update(notes)

# OPTIONAL: prune rare notes (usually not needed)
MIN_COUNT = 1
kept_notes = [n for n, c in counter.items() if c >= MIN_COUNT]

# Ids follow the sorted token order; <UNK> takes the last id.
vocab = {note: i for i, note in enumerate(sorted(kept_notes))}
vocab["<UNK>"] = len(vocab)

with open(os.path.join(TOKEN_DIR, "vocab.json"), "w") as f:
    json.dump(vocab, f, indent=2)

print("Vocab size:", len(vocab))
print("Tokenizing (note-level)...")

total_tokens = 0
train_tokens = val_tokens = test_tokens = 0

unk_id = vocab["<UNK>"]
split_rng = random.Random(SPLIT_SEED)

# Pass 2: write each line's token ids to one of the three split files.
# The context manager closes all three files even if tokenization fails
# part-way through.
# NOTE(review): the split is drawn per line, not per file, so lines of one
# tune can land in different splits - confirm this is acceptable for
# evaluation.
with open(TRAIN_PATH, "w") as TRAIN, \
        open(VAL_PATH, "w") as VAL, \
        open(TEST_PATH, "w") as TEST:
    for path in files:
        with open(path, "r", errors="ignore") as r:
            for line in r:
                notes = extract_notes(line)
                if not notes:
                    continue

                ids = [str(vocab.get(n, unk_id)) for n in notes]
                out = " ".join(ids) + "\n"

                rnum = split_rng.random()
                if rnum < TRAIN_P:
                    TRAIN.write(out)
                    train_tokens += len(ids)
                elif rnum < TRAIN_P + VAL_P:
                    VAL.write(out)
                    val_tokens += len(ids)
                else:
                    TEST.write(out)
                    test_tokens += len(ids)

                total_tokens += len(ids)

print("\n===== TOKENIZATION COMPLETE =====")
print("Total tokens :", total_tokens)
print("Train tokens :", train_tokens)
print("Val tokens   :", val_tokens)
print("Test tokens  :", test_tokens)
print("================================")


ABC files found: 182375
Building note-level vocabulary...
Vocab size: 1734
Tokenizing (note-level)...

===== TOKENIZATION COMPLETE =====
Total tokens : 143325894
Train tokens : 140459152
Val tokens   : 1440188
Test tokens  : 1426554
