In [1]:
import os
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from google.colab import drive

# Mount Google Drive so the dataset archive stored there is reachable from the Colab VM.
drive.mount('/content/drive')

# Location of the zipped MIDI dataset on Drive and the local directory it is extracted into.
zip_path = '/content/drive/My Drive/AAI511_ML/midi_classic_music.zip'
extract_path = '/content/midi_classic_music/'

# Unzip the archive quietly (-q) into extract_path (-d); -n skips files that already
# exist there, so re-running this cell does not overwrite a previous extraction.
!unzip -q -n "{zip_path}" -d "{extract_path}"

Mounted at /content/drive


# Keep only the composers mentioned in the final project

Predictions should be made only for the composers listed below, so select just these composers from the dataset above:

1-Bach

2-Beethoven

3-Chopin

4-Mozart

In [2]:
import shutil

# Composer directories to keep; every other directory under extract_path is pruned below.
composers = ['Bach', 'Beethoven', 'Chopin', 'Mozart']

# Delete loose MIDI files that sit directly in the extraction root
# (they are not filed under any composer directory).
for item in os.listdir(extract_path):
    item_path = os.path.join(extract_path, item)
    if os.path.isfile(item_path) and item.endswith(('.mid', '.MID')):
        os.remove(item_path)
        print(f"Remove file at root: {item}")

midi_classic_path = extract_path + 'midiclassics'

# Same clean-up one level down, directly inside the 'midiclassics' directory.
for item in os.listdir(midi_classic_path):
    item_path = os.path.join(midi_classic_path, item)
    if os.path.isfile(item_path) and item.endswith(('.mid', '.MID')):
        os.remove(item_path)
        print(f"Remove file at midi_classic_path: {item}")

# remove rest of non-composer files.
# The walk is bottom-up, so each directory is judged when its parent is visited.
for root, dirs, files in os.walk(extract_path, topdown=False):
    for dir_name in dirs:
        if dir_name in composers:
            continue  # Keep this directory

        full_path = os.path.join(root, dir_name)

        # Directories nested inside a kept composer directory hold that composer's
        # own works. Without this check they were deleted, because their names are
        # not in `composers` and they contain no composer sub-directory themselves.
        relative_parts = os.path.relpath(full_path, extract_path).split(os.sep)
        if any(part in composers for part in relative_parts):
            continue

        # Keep ancestors of composer directories (e.g. 'midiclassics'): a directory
        # survives if a composer directory appears anywhere beneath it.
        contains_desired = any(
            composer in subdirs
            for _, subdirs, _ in os.walk(full_path)
            for composer in composers
        )

        if not contains_desired:
            shutil.rmtree(full_path)
            print(f"Removed: {full_path}")

Remove file at root: Tchaikovsky Lake Of The Swans Act 2 14mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 1 4mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 2 13mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 2 12mov.mid
Remove file at root: Rothchild Symphony Rmw12 2mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 1 7-8movs.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 1 5mov.mid
Remove file at root: Tchaicovsky Waltz of the Flowers.MID
Remove file at root: Rothchlid Symphony Rmw12 3mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 1 3mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 2 10mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 2 11mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 1 6mov.mid
Remove file at root: Tchaikovsky Lake Of The Swans Act 1 1mov.mid
Remove file at root: Sibelius Kuolema Vals op44.mid
Remove file at root: Tchaikovsky Lake Of The S

# Pre-processing

Convert the musical scores into a format suitable for deep learning models. This involves converting the musical scores into MIDI files and applying data augmentation techniques.

In [3]:
!pip install music21



In [None]:
# Previous single-process version, kept commented out for reference only: it was too slow and is superseded by the parallel implementation in the next cell.
# import music21
# import glob
# import numpy as np
# import tensorflow as tf
# import warnings
# from collections import Counter
# from tensorflow.keras.utils import to_categorical

# base_path_of_composers = "/content/midi_classic_music/midiclassics/"

# warnings.filterwarnings("ignore", category=music21.midi.translate.TranslateWarning)

# # Search for both lowercase '.mid' and uppercase '.MID'
# search_path_lower = f"{base_path_of_composers}/*.mid"
# search_path_upper = f"{base_path_of_composers}/*.MID"

# files_lower = glob.glob(search_path_lower, recursive=True)
# files_upper = glob.glob(search_path_upper, recursive=True)

# # Combine the two lists to get all files
# all_midi_files = files_lower + files_upper

# print(f"Found {len(all_midi_files)} MIDI files (.mid and .MID).")

# notes = []

# # start pre-processing tasks

# # Loop through each file
# for i, file in enumerate(all_midi_files):
#     try:
#         # --- OPTIMIZATION: Parse the file only ONCE ---
#         print(f"Processing file {i+1}/{len(all_midi_files)}: {file}")
#         midi = music21.converter.parse(file)

#         # --- OPTIMIZATION: Get a list of all note/chord objects once ---
#         # This is much faster than working with the full stream repeatedly.
#         notes_to_transpose = list(midi.flatten().notes)

#         # Now, loop through the transposition intervals and apply them to the extracted notes
#         for transpose_interval in range(-6, 6):
#             # Loop through the list of objects
#             for element in notes_to_transpose:
#                 # Transpose the single element (much faster than transposing the whole stream)
#                 transposed_element = element.transpose(transpose_interval)

#                 # Append the string representation to our master list
#                 if isinstance(transposed_element, music21.note.Note):
#                     notes.append(str(transposed_element.pitch))
#                 elif isinstance(transposed_element, music21.chord.Chord):
#                     notes.append('.'.join(str(n) for n in transposed_element.normalOrder))

#     except Exception as e:
#         print(f"Could not process {file}: {e}")

In [8]:
import music21
import glob
import numpy as np
import multiprocessing as mp
import tensorflow as tf
import warnings
from functools import partial
from collections import Counter
from tensorflow.keras.utils import to_categorical
import pickle

# Directory that is searched recursively for MIDI files by the pre-processing functions.
base_path_of_composers = "/content/midi_classic_music/midiclassics/"

# Silence music21's MIDI translation warnings while parsing.
# (This call used to appear twice in a row; one registration is enough.)
warnings.filterwarnings("ignore", category=music21.midi.translate.TranslateWarning)

def process_single_file(file_path, transpose_intervals=None):
    """Turn one MIDI file into a flat list of note/chord tokens, once per transposition.

    Args:
        file_path: Path of the MIDI file handed to ``music21.converter.parse``.
        transpose_intervals: Iterable of semitone offsets. For every offset the whole
            file is emitted again, shifted by that many semitones. Defaults to
            ``range(-6, 6)`` when None.

    Returns:
        ``(tokens, None)`` on success, where single notes are rendered as the string
        form of their transposed pitch and chords as their normal-order pitch classes
        (shifted modulo 12) joined by ``'.'``.
        ``([], message)`` when anything raises; ``message`` is ``str(exception)``.
    """
    intervals = range(-6, 6) if transpose_intervals is None else transpose_intervals

    try:
        # The expensive step: parse the file a single time.
        score = music21.converter.parse(file_path)

        # Reduce every element to the minimum needed for transposition:
        # (is_chord, pitch object) for notes, (is_chord, pitch-class list) for chords.
        extracted = []
        for item in score.flatten().notes:
            if isinstance(item, music21.note.Note):
                extracted.append((False, item.pitch))
            elif isinstance(item, music21.chord.Chord):
                extracted.append((True, item.normalOrder))

        # Replay the extracted material once per interval.
        tokens = []
        for shift in intervals:
            for is_chord, payload in extracted:
                if is_chord:
                    # Pitch classes wrap around the octave.
                    tokens.append('.'.join(str((pc + shift) % 12) for pc in payload))
                else:
                    tokens.append(str(payload.transpose(shift)))

        return tokens, None

    except Exception as exc:
        # Best effort: report the failure to the caller instead of aborting the batch.
        return [], str(exc)

def process_files_parallel(all_midi_files, num_processes=None, batch_size=50):
    """Parse MIDI files in worker processes and concatenate their note/chord tokens.

    Args:
        all_midi_files: Sequence of MIDI file paths; they are processed in this order.
        num_processes: Number of worker processes per pool. When None, one less than
            the CPU count is used, capped at 8 and never lower than 1.
        batch_size: Number of files handed to each pool; a fresh pool is created for
            every batch.

    Returns:
        A flat list with the tokens of every file that was processed successfully, in
        input order. Files whose processing reported an error contribute nothing; a
        summary of those errors (first five in full) is printed.
    """
    if num_processes is None:
        # cpu_count() - 1 is 0 on a single-core machine and mp.Pool rejects
        # processes=0 with a ValueError, so always keep at least one worker.
        num_processes = max(1, min(mp.cpu_count() - 1, 8))  # Leave one core free, max 8

    print(f"Using {num_processes} processes to process {len(all_midi_files)} files")

    all_notes = []
    errors = []

    # Process in batches to avoid memory issues
    for i in range(0, len(all_midi_files), batch_size):
        batch = all_midi_files[i:i+batch_size]
        print(f"Processing batch {i//batch_size + 1}/{(len(all_midi_files)-1)//batch_size + 1}")

        with mp.Pool(processes=num_processes) as pool:
            results = pool.map(process_single_file, batch)

        # pool.map preserves input order, so results line up with the batch's files.
        for file_path, (notes, error) in zip(batch, results):
            if error:
                errors.append(f"File {file_path}: {error}")
            else:
                all_notes.extend(notes)

        print(f"Processed {min(i+batch_size, len(all_midi_files))}/{len(all_midi_files)} files")

    if errors:
        print(f"Encountered {len(errors)} errors:")
        for error in errors[:5]:  # Show first 5 errors
            print(f"  {error}")
        if len(errors) > 5:
            print(f"  ... and {len(errors)-5} more")

    return all_notes

def optimized_preprocessing(base_path, cache_file="processed_notes.pkl"):
    """Collect note/chord tokens for every MIDI file under ``base_path``, with caching.

    Args:
        base_path: Directory searched recursively for ``.mid``/``.MID``/``.midi``/
            ``.MIDI`` files. A trailing path separator is optional.
        cache_file: Pickle file holding the result. If it already exists it is loaded
            and returned without touching ``base_path``; otherwise it is written after
            processing.

    Returns:
        The flat token list produced by ``process_files_parallel`` (or the cached copy).
    """
    # Check if we have cached results
    if os.path.exists(cache_file):
        print(f"Loading cached results from {cache_file}")
        # NOTE(review): pickle.load executes arbitrary code from the file; only point
        # cache_file at caches this notebook wrote itself.
        with open(cache_file, 'rb') as f:
            return pickle.load(f)

    # Build the patterns with os.path.join so a base_path without a trailing
    # separator still recurses into sub-directories.
    extensions = ("*.mid", "*.MID", "*.midi", "*.MIDI")
    search_patterns = [os.path.join(base_path, "**", ext) for ext in extensions]

    matched_files = set()  # a set removes paths matched by more than one pattern
    for pattern in search_patterns:
        matched_files.update(glob.glob(pattern, recursive=True))

    # Sort so the file order, and therefore the token stream and the cache built
    # from it, is identical on every run (set iteration order is not).
    all_midi_files = sorted(matched_files)
    print(f"Found {len(all_midi_files)} unique MIDI files")

    # Process files
    all_notes = process_files_parallel(all_midi_files)

    # Cache results
    print(f"Caching results to {cache_file}")
    with open(cache_file, 'wb') as f:
        pickle.dump(all_notes, f)

    print(f"Processed {len(all_notes)} total notes/chords")
    return all_notes

# Faster variant: fewer transpositions per file.
def ultra_fast_preprocessing(base_path, cache_file="processed_notes_fast.pkl"):
    """Collect note/chord tokens for every MIDI file under ``base_path``, quickly.

    Each file is emitted at five transpositions (-2 .. +2 semitones) instead of the
    default range used by ``process_single_file``.

    Args:
        base_path: Directory searched recursively for ``.mid``/``.MID`` files. A
            trailing path separator is optional.
        cache_file: Pickle file the result is written to. It is only written, never
            read: every call re-processes the files.

    Returns:
        A flat list with the tokens of every file that was processed successfully,
        ordered by sorted file path. Files that reported an error are skipped and
        their number is printed.
    """
    # Build the patterns with os.path.join so a base_path without a trailing
    # separator still recurses into sub-directories.
    search_patterns = [os.path.join(base_path, "**", ext) for ext in ("*.mid", "*.MID")]
    matched_files = set()  # a set removes paths matched by more than one pattern
    for pattern in search_patterns:
        matched_files.update(glob.glob(pattern, recursive=True))

    # Sort so the file order, and therefore the token stream, is identical on
    # every run (set iteration order is not).
    all_midi_files = sorted(matched_files)

    print(f"Found {len(all_midi_files)} files")

    # Use fewer transpositions for speed (you can adjust this)
    transpose_intervals = [-2, -1, 0, 1, 2]  # Instead of -6 to 6

    process_func = partial(process_single_file, transpose_intervals=transpose_intervals)

    all_notes = []
    failed_files = []
    batch_size = 100
    # cpu_count() - 1 is 0 on a single-core machine and mp.Pool rejects
    # processes=0 with a ValueError, so always keep at least one worker.
    num_processes = max(1, min(mp.cpu_count() - 1, 6))

    for i in range(0, len(all_midi_files), batch_size):
        batch = all_midi_files[i:i+batch_size]
        print(f"Processing batch {i//batch_size + 1}/{(len(all_midi_files)-1)//batch_size + 1}")

        with mp.Pool(processes=num_processes) as pool:
            results = pool.map(process_func, batch)

        # pool.map preserves input order, so results line up with the batch's files.
        for file_path, (notes, error) in zip(batch, results):
            if error:
                failed_files.append(file_path)
            else:
                all_notes.extend(notes)

    # Make skipped files visible instead of dropping them silently.
    if failed_files:
        print(f"Skipped {len(failed_files)} files that could not be processed")

    # Cache results
    with open(cache_file, 'wb') as f:
        pickle.dump(all_notes, f)

    print(f"Processed {len(all_notes)} total notes/chords")
    return all_notes

# Build the token stream with the fast variant (reduced set of transpositions);
# the author recommends it for a dataset of roughly 500 files.
notes = ultra_fast_preprocessing(base_path_of_composers)

# Quick sanity check: total number of tokens and the first ten of them.
print(f"Final dataset has {len(notes)} notes/chords")
print(f"Sample notes: {notes[:10]}")

Found 491 files
Processing batch 1/5
Processing batch 2/5
Processing batch 3/5
Processing batch 4/5
Processing batch 5/5
Processed 6742125 total notes/chords
Final dataset has 6742125 notes/chords
Sample notes: ['G#4', 'F5', 'C#5', 'G#4', 'C#3', 'F3', 'G#3', 'F4', 'C#4', 'G#4']


In [9]:
# Create Vocabulary and Convert Notes to Integers

# Count the occurrences of each unique note/chord token
note_counts = Counter(notes)
n_vocab = len(note_counts)
print(f"Vocabulary Size (unique notes/chords): {n_vocab}")

# Sorting the unique tokens makes the token -> integer mapping stable across runs
pitch_names = sorted(note_counts.keys())
note_to_int = {note: number for number, note in enumerate(pitch_names)}

sequence_length = 100  # Length of input sequences

# Encode the whole token stream once as a compact integer array. Building one
# Python list of 100 ints per pattern (as before) costs far more memory and time.
encoded_notes = np.array([note_to_int[token] for token in notes], dtype=np.int64)

# One pattern per position that still has a following token to predict
n_patterns = max(len(encoded_notes) - sequence_length, 0)
print(f"Total training patterns: {n_patterns}")

if n_patterns:
    # Zero-copy view: row i is encoded_notes[i : i + sequence_length]
    input_windows = np.lib.stride_tricks.sliding_window_view(
        encoded_notes, sequence_length
    )[:n_patterns]
else:
    # Fewer tokens than one full window plus its target: no patterns at all
    input_windows = np.empty((0, sequence_length), dtype=np.int64)

# The target of row i is the token that immediately follows its window
target_ids = encoded_notes[sequence_length:]

# NOTE(review): windows are cut from the single concatenated token stream, so some
# of them span the boundary between two files/transpositions. Confirm this is intended.

# Add the trailing feature axis expected by LSTM layers, (samples, time_steps, features),
# and normalize the input data to be between 0 and 1
network_input = input_windows[..., np.newaxis] / float(n_vocab)

# One-hot encode the output data
network_output = to_categorical(target_ids, num_classes=n_vocab)

# Guard against silent shape drift before the arrays are handed to a model
assert network_input.shape == (n_patterns, sequence_length, 1)
assert len(network_output) == n_patterns

print("\n--- Pre-processing Complete ---")
print(f"network_input shape: {network_input.shape}")
print(f"network_output shape: {network_output.shape}")

Vocabulary Size (unique notes/chords): 1617
Total training patterns: 6742025

--- Pre-processing Complete ---
network_input shape: (6742025, 100, 1)
network_output shape: (6742025, 1617)


In [11]:
# Persist the pre-processed arrays to Google Drive so later sessions can reuse
# them instead of repeating the pre-processing above.

# Ensure drive is mounted (the captured output shows this only prints a notice when it already is)
drive.mount('/content/drive')

# Create the target directory on Drive if it does not exist yet
os.makedirs('/content/drive/MyDrive/AAI511_ML', exist_ok=True)

# Destination archive inside the directory created above
save_path = '/content/drive/MyDrive/AAI511_ML/preprocessed_composer_data.npz'

print(f"\nSaving pre-processed data to {save_path}...")

# Write both arrays into one compressed .npz archive, stored under the keys
# 'network_input' and 'network_output'.
np.savez_compressed(
    save_path,
    network_input=network_input,
    network_output=network_output
)

print("Data saved")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

Saving pre-processed data to /content/drive/MyDrive/AAI511_ML/preprocessed_composer_data.npz...
Data saved
