In [None]:
import numpy as np
np.int = int
from pathlib import Path
from tqdm import tqdm
import pretty_midi
import pickle
import random
from collections import defaultdict
from sklearn.preprocessing import LabelEncoder


### Checking for Duplicate Files

In [42]:


# Composers to scan and the dataset root that holds one sub-folder per composer
target_composers = ['Bach', 'Beethoven', 'Chopin', 'Mozart']
dataset_folder = Path('../data/midi/archive/midiclassics')

# Scan results:
#   dataset        -> [(file_path, composer)], first occurrence of each filename only
#   seen_filenames -> {filename: path of first occurrence}
#   duplicates     -> [(filename, duplicate_path, original_path)]
dataset, seen_filenames, duplicates = [], {}, []

for composer in target_composers:
    composer_folder = dataset_folder / composer

    for file_path in composer_folder.rglob('*'):
        # Skip anything that is not a MIDI file (extension compared case-insensitively)
        if file_path.suffix.lower() not in ['.mid', '.midi']:
            continue

        fname = file_path.name
        if fname in seen_filenames:
            # A file with this name was already recorded (the lookup is shared by all
            # composers): log the clash and keep only the first occurrence
            duplicates.append((fname, str(file_path), seen_filenames[fname]))
            continue

        seen_filenames[fname] = str(file_path)
        dataset.append((str(file_path), composer))

# Summary
print(f"Total unique MIDI files: {len(dataset)}")
print(f"Total duplicates found: {len(duplicates)}\n")

if not duplicates:
    print("No duplicates found.")
else:
    print("Duplicate files found (filename, duplicate path, original path):\n")
    for fname, dup_path, orig_path in duplicates:
        print(f"{fname}\n  DUP: {dup_path}\n  ORIG: {orig_path}\n")


Total unique MIDI files: 1630
Total duplicates found: 0

No duplicates found.


### MIDI Parsing, Cleaning, and Feature Engineering

In [None]:
# Compatibility shim for NEWER NumPy releases: the deprecated `np.int` alias was removed
# (NumPy 1.24+), so it is restored here when missing. Presumably required by a dependency
# that still references `np.int` (e.g. an older pretty_midi) — TODO confirm.
# Note: the import cell already assigns `np.int = int` unconditionally, so once that cell
# has run this guard is a no-op.
if not hasattr(np, 'int'):
    np.int = int
    
# Composer names; each one is a sub-folder of the dataset root and doubles as the class label
target_composers = ['Bach', 'Beethoven', 'Chopin', 'Mozart']

# We’re making a function that takes in the file path of a MIDI file and gives us back a numeric array of features
def midi_to_feature_array(midi_path):
    """Turn one MIDI file into a per-note feature matrix.

    Parameters
    ----------
    midi_path : str or Path
        Location of the MIDI file. It is converted with ``str()`` because
        ``pretty_midi.PrettyMIDI`` is given a string here, not a path object.

    Returns
    -------
    np.ndarray or None
        ``float32`` array of shape ``(n_notes, 6)`` whose columns are
        ``[dt, interval, pc, dur_ratio, velocity, program]``; rows are ordered by
        (start time, pitch). ``None`` is returned when the file cannot be parsed
        (the error is printed), when it has no non-drum notes, or when no note is
        longer than 1e-4 seconds.
    """
    # Corrupted or wrongly formatted files are reported and skipped
    try:
        midi = pretty_midi.PrettyMIDI(str(midi_path))
    except Exception as err:
        print(f"Error reading {midi_path}: {err}")
        return None

    # --- preprocessing: collect raw note fields from every pitched instrument ---
    # Drum tracks are dropped (we want pitched notes). Each row holds:
    #   start (s), end (s), MIDI pitch (60 = middle C), velocity, instrument program
    rows = [
        [note.start, note.end, note.pitch, note.velocity, inst.program]
        for inst in midi.instruments
        if not inst.is_drum
        for note in inst.notes
    ]
    if not rows:
        return None  # nothing useful in this file

    # Consistent note order: by start time first, then by pitch for simultaneous notes
    rows.sort(key=lambda row: (row[0], row[2]))
    notes = np.array(rows, dtype=np.float32)

    starts = notes[:, 0]
    ends = notes[:, 1]
    pitch = notes[:, 2]
    vel = notes[:, 3]
    prog = notes[:, 4]
    dur = ends - starts  # how long each note lasts

    # --- preprocessing: drop ultra-short notes ---
    # Notes of 1e-4 s or less are treated as conversion noise / input glitches; the whole
    # row (start, pitch, velocity, program) is discarded together with the duration.
    keep = dur > 1e-4
    if not keep.any():
        return None
    starts = starts[keep]
    dur = dur[keep]
    pitch = pitch[keep]
    vel = vel[keep]
    prog = prog[keep]

    # --- feature engineering ---
    # dt: gap between this onset and the previous one (0 for the first note),
    # log1p-compressed so long pauses do not dominate tiny gaps
    dt = np.log1p(np.diff(starts, prepend=starts[0]))

    # interval: pitch change from the previous note (+ up, - down), clipped to
    # +/- 24 semitones (two octaves) to tame rare extreme jumps
    interval = np.clip(np.diff(pitch, prepend=pitch[0]), -24, 24)

    # pc: pitch class 0..11 (0 = C ... 11 = B); the octave is ignored
    pc = pitch % 12

    # dur_ratio: duration relative to the time until the next onset (staccato/legato feel).
    # For the last note the "next onset" is taken to be the note's own end. The gap is
    # floored at 1e-3 so the division below never hits zero.
    next_onset = np.append(starts[1:], starts[-1] + dur[-1])
    time_to_next = np.clip(next_onset - starts, 1e-3, None)
    # Alternatives that were tried: raw `dur`, and np.clip(dur / time_to_next, 0., 3.);
    # the log1p form gave a slight (not significant) improvement over both.
    dur_ratio = np.log1p(dur / time_to_next)

    # Why raw pitch is not a column: adding it dropped test accuracy from 0.8296 to 0.7996.
    # The 6-feature, pitch-class-based set was cleaner and simpler. The 7-feature file
    # (balanced_chunks_seq70_7features.pkl) is not "worse code", but it changes the learning
    # problem: more noise/features without compensating changes (tuning, capacity,
    # normalization strategies).
    columns = (dt, interval, pc, dur_ratio, vel, prog)
    return np.column_stack(columns).astype(np.float32)
    


# Look through a root folder to load every MIDI file for that composer to convert them into feature arrays using midi_to_feature_array
def load_midi_dataset(root_dir, target_composers):
    """Parse every MIDI file of the requested composers into feature arrays.

    Parameters
    ----------
    root_dir : str or Path
        Folder containing one sub-folder per composer.
    target_composers : iterable of str
        Composer names; each must match a sub-folder name under ``root_dir``.

    Returns
    -------
    dict
        ``{composer: [feature_array, ...]}`` with one array per successfully parsed
        MIDI file, as returned by ``midi_to_feature_array`` (one row per note, columns
        ``[dt, interval, pc, dur_ratio, vel, prog]``). Composers whose folder is
        missing are reported with a warning and left out of the dict. Files for which
        ``midi_to_feature_array`` returns ``None`` are skipped.
    """
    root_dir = Path(root_dir)  # Path gives us .rglob() for recursive file discovery
    midi_suffixes = {".mid", ".midi"}
    data_dict = {}  # composer name -> list of feature arrays (one per MIDI file)

    for composer in target_composers:
        composer_folder = root_dir / composer
        if not composer_folder.exists():
            print(f"Warning: Folder not found for {composer}")
            continue

        # Recursively collect MIDI files. The extension is compared case-insensitively
        # (same rule as the duplicate-check cell) so files such as "X.MID" are not
        # silently skipped on case-sensitive filesystems, and the list is sorted so
        # the processing order does not depend on the filesystem.
        midi_files = sorted(
            path for path in composer_folder.rglob("*")
            if path.is_file() and path.suffix.lower() in midi_suffixes
        )

        pieces = []  # processed pieces for this composer
        # tqdm wraps the loop to show a progress bar
        for file_path in tqdm(midi_files, desc=f"Processing {composer}"):
            arr = midi_to_feature_array(file_path)
            if arr is not None:  # None = unreadable file or no usable notes
                pieces.append(arr)

        data_dict[composer] = pieces
        print(f"Loaded {len(pieces)} pieces for {composer}")
    return data_dict




In [47]:
# Root directory of all our MIDI data (one sub-folder per composer)
dataset_folder = Path("../data/midi/archive/midiclassics")
# Run the preprocessing + feature-engineering functions defined above.
# Result: dict {composer: [feature array per successfully parsed piece]}
parsed_data = load_midi_dataset(dataset_folder, target_composers)

Processing Bach: 100%|██████████| 1024/1024 [00:30<00:00, 33.30it/s]


Loaded 1024 pieces for Bach


Processing Beethoven:   4%|▍         | 9/213 [00:00<00:06, 31.79it/s]

Error reading ..\data\midi\archive\midiclassics\Beethoven\Anhang 14-3.mid: Could not decode key with 3 flats and mode 255


Processing Beethoven: 100%|██████████| 213/213 [00:34<00:00,  6.23it/s]


Loaded 212 pieces for Beethoven


Processing Chopin: 100%|██████████| 136/136 [00:07<00:00, 19.28it/s]


Loaded 136 pieces for Chopin


Processing Mozart:  61%|██████    | 156/257 [00:15<00:06, 14.74it/s]

Error reading ..\data\midi\archive\midiclassics\Mozart\Piano Sonatas\Nueva carpeta\K281 Piano Sonata n03 3mov.mid: Could not decode key with 2 flats and mode 2


Processing Mozart: 100%|██████████| 257/257 [00:26<00:00,  9.77it/s]

Loaded 256 pieces for Mozart





### Transposition Augmentation, Sliding-Window Chunking (seq_len=70, stride=35), and Class Balancing (Downsample to Smallest Class)

In [48]:

# region Data Augmentation
# Without augmentation we got the evaluation results below; in the confusion matrix about 20% of the chunks were confused with one another.
#             precision recall    f1-score   support
#Bach         0.82      0.90      0.86       525
#Beethoven    0.64      0.59      0.62       525
#Chopin       0.77      0.86      0.81       525
#Mozart       0.67      0.58      0.62       525
# Why?
# We went for this function because Beethoven and Mozart had fewer or less varied training examples, and in the test results they were often misclassified compared to Bach or Chopin.
# The model struggled to correctly identify Beethoven and Mozart pieces — likely because it didn’t see enough diverse examples during training.

## chunks are arrays where column index 2 is the pitch class (values 0–11)
def transpose_chunk(chunk, semitones):
    """Return a copy of ``chunk`` transposed by ``semitones`` (pitch-class column only).

    Shifting every note by the same number of semitones keeps the musical pattern and
    only changes the key, so it is used purely as data augmentation.

    Parameters
    ----------
    chunk : np.ndarray
        2-D array with one row per note; column index 2 holds the pitch class (0-11).
    semitones : int
        Shift to apply; may be negative.

    Returns
    -------
    np.ndarray
        New array of the same shape and dtype. The input is not modified.
    """
    out = chunk.copy()  # work on a copy so the original chunk is never modified

    # Rows that are entirely zero are padding ("no note"). They must stay all-zero,
    # otherwise the shift would give them a non-zero pitch class and they could no
    # longer be recognised and masked as padding.
    real_rows = np.any(out != 0, axis=1)

    # Wrap with mod 12 so the result stays a valid pitch class, e.g. (10 + 4) mod 12 = 2
    out[real_rows, 2] = np.mod(out[real_rows, 2] + semitones, 12)
    return out

def maybe_augment_chunks(chunks, label,
                         aug_per_chunk=1,
                         semis_choices=(-4, -2, 2, 4),
                         targets=("Beethoven", "Mozart")):
    """Add transposed copies of every chunk when ``label`` is an augmentation target.

    Parameters
    ----------
    chunks : list
        Chunks belonging to a single piece.
    label : str
        Composer of the piece.
    aug_per_chunk : int
        Number of transposed copies to create per original chunk. One is enough to
        help the under-represented composers without exploding the dataset size;
        raise it to 2 or 3 if results stay poor.
    semis_choices : sequence of int
        Candidate shifts in semitones. Small transpositions keep phrases in a
        believable range, and offering both up and down shifts avoids biasing the
        data toward higher or lower keys.
    targets : sequence of str
        Composers whose chunks get augmented.

    Returns
    -------
    list
        ``chunks`` itself (unchanged) when ``label`` is not a target or
        ``aug_per_chunk <= 0``; otherwise a new list in which each original chunk is
        immediately followed by its ``aug_per_chunk`` transposed copies.
    """
    # Nothing to do for non-target composers or when augmentation is switched off
    if label not in targets:
        return chunks
    if aug_per_chunk <= 0:
        return chunks

    augmented = []
    for original in chunks:
        augmented.append(original)  # the original is always kept
        # each copy uses an independently drawn random shift (same shape, new key)
        augmented.extend(
            transpose_chunk(original, random.choice(semis_choices))
            for _ in range(aug_per_chunk)
        )
    return augmented


# endregion Data Augmentation


# We already parsed the MIDI files (load_midi_dataset)
# We extracted the engineered features dt, interval, pc, dur_ratio, velocity, program
# The cell above produced a parsed_data dictionary that looks like this: { "Bach": [piece1_array, piece2_array, …], "Beethoven": ....}

# This function takes the parsed data and cuts it into fixed-length chunks 
def create_balanced_chunks(data_dict,
                           seq_len=70,
                           stride=35,
                           min_real_notes=50,
                           max_chunks_per_piece=20,
                           aug_per_chunk=1,
                           aug_semis=(-4, -2, 2, 4),
                           aug_targets=("Beethoven", "Mozart"),
                           seed=None):
    """Cut parsed pieces into fixed-length chunks, augment, cap and class-balance them.

    Parameters
    ----------
    data_dict : dict
        ``{composer: [piece_array, ...]}``; each piece is a 2-D array, one row per note.
    seq_len : int
        Rows per chunk. 70 was chosen from EDA and model tests: 100 means more padding
        for short pieces (Bach has some very short ones), 50 carries too little
        musical structure.
    stride : int
        Step between consecutive window starts. 35 gives 50% overlap (rows 0-69,
        35-104, ...): no overlap may miss patterns, a tiny stride yields many
        near-identical chunks.
    min_real_notes : int
        Minimum number of real (non-zero) rows a chunk needs in order to be kept.
    max_chunks_per_piece : int
        Cap (applied after augmentation) so one piece cannot dominate the dataset.
    aug_per_chunk, aug_semis, aug_targets
        Forwarded to ``maybe_augment_chunks``: number of transposed copies per chunk,
        candidate semitone shifts, and the composers to augment.
    seed : int or None
        When given, the global ``random`` generator is seeded first so augmentation,
        capping, downsampling and shuffling are reproducible. ``None`` (default)
        leaves the generator untouched.

    Returns
    -------
    list of (np.ndarray, str)
        Shuffled ``(chunk, composer)`` pairs, with every composer downsampled to the
        size of the smallest class. Empty list when ``data_dict`` yields no chunks.

    Notes
    -----
    NOTE(review): overlapping windows of one piece are shuffled together here; if a
    later train/test split is done per chunk, near-identical chunks can land on both
    sides — confirm how the split is made.
    """
    if seed is not None:
        random.seed(seed)

    # defaultdict: each composer's bucket starts as an empty list on first access
    all_chunks = defaultdict(list)

    for composer, pieces in data_dict.items():
        for piece in pieces:
            n_notes = len(piece)    # number of rows (notes) in this piece
            chunks_for_piece = []   # this piece's chunks, before joining the composer bucket

            if n_notes < seq_len:
                # Short piece: keep it only if it has enough real notes, zero-padded up
                # to seq_len (all-zero rows mean "no note" and can be masked later).
                # The padding uses the piece's dtype so padded chunks do not silently
                # become float64 while windowed chunks keep the piece's dtype.
                if n_notes >= min_real_notes:
                    padding = np.zeros((seq_len - n_notes, piece.shape[1]), dtype=piece.dtype)
                    chunks_for_piece.append(np.vstack([piece, padding]))
            else:
                # Sliding window over the piece; a trailing partial window is not emitted
                for start in range(0, n_notes - seq_len + 1, stride):
                    chunk = piece[start:start + seq_len]
                    # count rows that contain at least one non-zero value
                    if np.count_nonzero(np.any(chunk != 0, axis=1)) >= min_real_notes:
                        chunks_for_piece.append(chunk)

            # Extra transposed copies for the augmentation targets
            chunks_for_piece = maybe_augment_chunks(
                chunks_for_piece, composer,
                aug_per_chunk=aug_per_chunk,
                semis_choices=aug_semis,
                targets=aug_targets
            )

            # Cap the number of chunks contributed by a single piece
            if len(chunks_for_piece) > max_chunks_per_piece:
                chunks_for_piece = random.sample(chunks_for_piece, max_chunks_per_piece)

            all_chunks[composer].extend(chunks_for_piece)

    # Nothing to balance (empty input, or no composer had any piece)
    if not all_chunks:
        return []

    # Class balancing: downsample every composer to the smallest class size
    min_count = min(len(chunks) for chunks in all_chunks.values())
    if min_count == 0:
        empty = [name for name, chunks in all_chunks.items() if not chunks]
        print(f"Warning: no chunks produced for {empty}; balanced dataset will be empty")

    balanced_chunks = []
    for composer, chunks in all_chunks.items():
        sampled = random.sample(chunks, min_count)
        balanced_chunks.extend((chunk, composer) for chunk in sampled)  # (features, label)

    # Mix composers and chunks
    random.shuffle(balanced_chunks)
    return balanced_chunks



# -------------------------------------
# call to run the whole preprocessing + augmentation + balancing pipeline we just broke down
# Run the whole chunking + augmentation + balancing pipeline on the parsed pieces.
# Result: list of (chunk, composer) pairs.
balanced_dataset = create_balanced_chunks(
    parsed_data, 
    seq_len=70, stride=35, min_real_notes=50, max_chunks_per_piece=20,
    aug_per_chunk=1, aug_semis=(-4, -2, 2, 4), aug_targets=("Beethoven", "Mozart")
)


# Save to disk so the whole preprocessing pipeline does not have to be re-run every time.
pkl_name = "../models/RNN/balanced_chunks_seq70.pkl"
# Create the output folder if needed; otherwise open() fails on a fresh checkout
Path(pkl_name).parent.mkdir(parents=True, exist_ok=True)
with open(pkl_name, "wb") as f:
    pickle.dump(balanced_dataset, f)
print(f"Balanced dataset saved to {pkl_name} with {len(balanced_dataset)} chunks.")


Balanced dataset saved to ../models/RNN/balanced_chunks_seq70.pkl with 10152 chunks.


### Feature Distribution Sanity Check (min/median/95th/max for dt, interval, pc, dur_ratio, velocity, program)

In [58]:
# Peek at distributions (sanity check).
# Load the file that the chunking cell writes ("../models/RNN/..."), so the statistics
# describe the dataset produced by this notebook rather than a copy stored elsewhere.
# pickle can execute arbitrary code on load: only open pickles you created yourself.
with open("../models/RNN/balanced_chunks_seq70.pkl", "rb") as f:
    balanced_dataset = pickle.load(f)

# Stack all chunks into one (total_rows, n_features) array
stacked = np.vstack([c for c,_ in balanced_dataset])
names = ["dt","interval","pc","dur_ratio","velocity","program"]
assert stacked.shape[1] == len(names), "feature count does not match the column names"
for i,n in enumerate(names):
    col = stacked[:,i]
    col = col[np.isfinite(col)]  # ignore NaN/inf so the statistics stay defined
    print(f"{n:<9} min={col.min():.4f} p50={np.median(col):.4f} "
          f"p95={np.percentile(col,95):.4f} max={col.max():.4f}")
# p50 = the median
# p95 = the 95th percentile
# If max ≫ p95, we've got outliers.
# dt: time gap to the previous note (seconds, log1p-scaled). Smaller = faster notes; larger = pauses.
# interval: pitch change from previous note (semitones, clipped [-24, 24]). + up, - down.
# pc: pitch class 0–11 (C=0 … B=11). Octave ignored (note name only).
# dur_ratio: log1p(duration / time until the next note starts). Worked slightly better than raw duration or a hard-clipped ratio, and avoids hard-cap saturation.
# velocity: how hard the note is played (MIDI 1–127). Bigger = louder.
# program: MIDI instrument program (0–127). 0 = Acoustic Grand Piano.

dt        min=0.0000 p50=0.0038 p95=0.3483 max=2.3840
interval  min=-24.0000 p50=3.0000 p95=21.0000 max=24.0000
pc        min=0.0000 p50=5.0000 p95=11.0000 max=11.0000
dur_ratio min=0.0052 p50=3.8501 p95=6.7548 max=10.3669
velocity  min=1.0000 p50=74.0000 p95=116.0000 max=127.0000
program   min=0.0000 p50=0.0000 p95=68.0000 max=120.0000


### Chunk Count by Composer (post-balancing)

In [51]:


# Count the number of chunks per composer.
# The labels are taken directly from `balanced_dataset` ((chunk, composer) pairs) so this
# cell works on a fresh kernel run top-to-bottom instead of relying on a `y` variable
# that no earlier cell defines.
chunk_labels = [composer for _, composer in balanced_dataset]
unique_labels, counts = np.unique(chunk_labels, return_counts=True)

for label, count in zip(unique_labels, counts):
    print(f"{label}: {count} chunks")


Bach: 2538 chunks
Beethoven: 2538 chunks
Chopin: 2538 chunks
Mozart: 2538 chunks
