<a href="https://colab.research.google.com/github/alleyibrahim/modus/blob/main/EDmodel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Convert audio to MIDI files

In [None]:
from google.colab import drive
import zipfile
import os

# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Archive location on Drive and the folder it is unpacked into
zip_path = '/content/drive/MyDrive/DEAMds/archive.zip'
extract_dir = '/content/drive/MyDrive/DEAMds'

# Unpack every member of the archive into extract_dir
with zipfile.ZipFile(zip_path) as archive:
    archive.extractall(path=extract_dir)

# Leave the directory listing as the cell output to confirm the extraction
os.listdir(extract_dir)


In [None]:
pip install basic-pitch


In [None]:
pip install pydub

In [None]:
pip install librosa

In [None]:
pip install pretty_midi

In [None]:
import os
from basic_pitch.inference import predict_and_save

# Input MP3 folder, output MIDI folder and the Basic Pitch model file on Drive
input_audio_dir = '/content/drive/MyDrive/DEAMds/DEAM_audio/MEMD_audio'
output_midi_dir = '/content/drive/MyDrive/DEAMds/ProcessedFiles/midi_files'
model_path = '/content/drive/MyDrive/DEAMds/ProcessedFiles/basic_pitch/nmp.tflite'

# Make sure the output folder exists before anything is written into it
os.makedirs(output_midi_dir, exist_ok=True)

# Sorted so the files are processed (and logged) in the same order on every run
audio_files = sorted(f for f in os.listdir(input_audio_dir) if f.endswith('.mp3'))

print(f"Found {len(audio_files)} MP3 files in {input_audio_dir}")

for audio_file in audio_files:
    input_audio_path = os.path.join(input_audio_dir, audio_file)
    output_directory = output_midi_dir  # Output directory for all files

    # The MIDI-feature cell later looks for "<song_id>_basic_pitch.mid"; if that
    # file is already present the song was converted by a previous run, so skip
    # it instead of redoing the (slow) inference.
    song_stem = os.path.splitext(audio_file)[0]
    expected_midi_path = os.path.join(output_directory, f"{song_stem}_basic_pitch.mid")
    if os.path.exists(expected_midi_path):
        print(f"Skipping {input_audio_path}: {expected_midi_path} already exists")
        continue

    try:
        print(f"Predicting MIDI for {input_audio_path}")

        # Only the MIDI file is written; audio rendering, raw model outputs
        # and note-event CSVs are switched off.
        predict_and_save(
            [input_audio_path],  # List of input audio paths
            output_directory,    # Output directory for saving
            save_midi=True,
            sonify_midi=False,
            save_model_outputs=False,
            save_notes=False,
            model_or_model_path=model_path
        )
        print(f"Converted {input_audio_path} to MIDI in {output_directory}")

    except Exception as e:
        # Best effort: one unreadable file should not stop the whole batch
        print(f"Error processing {input_audio_path}: {e}")

print("Conversion process completed.")


# Combine features and annotations

In [None]:
import pandas as pd
import os

def process_and_merge_batch(batch_files, features_dir, static_annotations_df, dynamic_valence_df, dynamic_arousal_df):
    """Load one batch of feature CSVs and attach the annotations for each song.

    Args:
        batch_files: File names (not paths) of the feature CSVs in this batch.
        features_dir: Directory that contains the files in ``batch_files``.
        static_annotations_df: Annotation frame with a string ``song_id`` column.
        dynamic_valence_df: Annotation frame with a string ``song_id`` column.
        dynamic_arousal_df: Annotation frame with a string ``song_id`` column.

    Returns:
        DataFrame with the rows of all feature files stacked, tagged with the
        ``song_id`` taken from the file name (without extension) and
        inner-joined on ``song_id`` with the three annotation frames in turn.
    """
    def _read_tagged(file_name):
        # The file name without its extension is the song identifier
        frame = pd.read_csv(os.path.join(features_dir, file_name))
        frame['song_id'] = os.path.splitext(file_name)[0]
        return frame

    stacked = pd.concat([_read_tagged(file_name) for file_name in batch_files], ignore_index=True)

    # The annotation frames key on string ids, so the feature side must match
    stacked['song_id'] = stacked['song_id'].astype(str)

    # Static annotations first, then dynamic valence, then dynamic arousal
    merged = stacked
    for annotations in (static_annotations_df, dynamic_valence_df, dynamic_arousal_df):
        merged = merged.merge(annotations, on='song_id', how='inner')

    return merged

def load_and_merge_features_in_batches(features_dir, static_annotations_df, dynamic_valence_df, dynamic_arousal_df, max_files=500, batch_size=100, output_file='/content/drive/MyDrive/DEAMds/ProcessedFiles/combined_features_annotations.csv'):
    """Merge feature CSVs with their annotations batch by batch into one CSV file.

    Args:
        features_dir: Directory holding one feature CSV per song.
        static_annotations_df: Static annotation frame, forwarded to ``process_and_merge_batch``.
        dynamic_valence_df: Dynamic valence frame, forwarded to ``process_and_merge_batch``.
        dynamic_arousal_df: Dynamic arousal frame, forwarded to ``process_and_merge_batch``.
        max_files: Upper bound on the number of feature files processed
            (``None`` processes all of them).
        batch_size: Number of feature files merged and written per batch.
        output_file: Destination CSV. The first batch overwrites it, later
            batches are appended without a header row.

    Returns:
        ``output_file``. Nothing is written when ``features_dir`` contains no
        ``.csv`` files.
    """
    # os.listdir returns entries in arbitrary order; sort so the "first
    # max_files" selection is the same on every run and machine.
    feature_files = sorted(f for f in os.listdir(features_dir) if f.endswith('.csv'))[:max_files]

    for batch_index, start in enumerate(range(0, len(feature_files), batch_size)):
        batch_files = feature_files[start:start + batch_size]
        merged_batch_df = process_and_merge_batch(batch_files, features_dir, static_annotations_df, dynamic_valence_df, dynamic_arousal_df)

        # Only the first batch (re)creates the file and writes the header
        is_first_batch = batch_index == 0
        merged_batch_df.to_csv(
            output_file,
            index=False,
            mode='w' if is_first_batch else 'a',
            header=is_first_batch,
        )

        del merged_batch_df  # Free up memory before the next batch is loaded

    return output_file

# Locations of the per-song feature CSVs and of the annotation CSVs on Drive
features_dir = '/content/drive/MyDrive/DEAMds/features/features'
static_annotations_1_path = '/content/drive/MyDrive/DEAMds/DEAM_Annotations/annotations/annotations_averaged_per_song/song_level/static_annotations_averaged_songs_1_2000.csv'
static_annotations_2_path = '/content/drive/MyDrive/DEAMds/DEAM_Annotations/annotations/annotations_averaged_per_song/song_level/static_annotations_averaged_songs_2000_2058.csv'
dynamic_valence_path = '/content/drive/MyDrive/DEAMds/DEAM_Annotations/annotations/annotations_averaged_per_song/dynamic/valence.csv'
dynamic_arousal_path = '/content/drive/MyDrive/DEAMds/DEAM_Annotations/annotations/annotations_averaged_per_song/dynamic/arousal.csv'

# The static (song-level) annotations come in two files; stack them into one frame
static_annotations_1 = pd.read_csv(static_annotations_1_path)
static_annotations_2 = pd.read_csv(static_annotations_2_path)
static_annotations_df = pd.concat([static_annotations_1, static_annotations_2], ignore_index=True)

# Dynamic annotations: one file each for valence and arousal
dynamic_valence_df = pd.read_csv(dynamic_valence_path)
dynamic_arousal_df = pd.read_csv(dynamic_arousal_path)

# The feature side derives song_id from file names (strings), so every
# annotation frame gets a string song_id as well.
for annotations in (static_annotations_df, dynamic_valence_df, dynamic_arousal_df):
    annotations['song_id'] = annotations['song_id'].astype(str)

# Merge features and annotations batch-wise, capped at 500 feature files
output_file = load_and_merge_features_in_batches(
    features_dir,
    static_annotations_df,
    dynamic_valence_df,
    dynamic_arousal_df,
    max_files=500,
)

print(f"Combined features and annotations for the first 500 files saved to {output_file}")


Combined features and annotations for the first 500 files saved to /content/drive/MyDrive/DEAMds/ProcessedFiles/combined_features_annotations.csv


# Data Preprocessing

In [None]:
import pandas as pd

# Load the combined dataframe
combined_csv_path = '/content/drive/MyDrive/DEAMds/ProcessedFiles/combined_features_annotations.csv'
combined_df = pd.read_csv(combined_csv_path)

# Drop columns, then rows, that consist only of null values
combined_df = combined_df.dropna(axis=1, how='all').dropna(axis=0, how='all')

# Identify time-series columns based on the naming pattern "sample_"
time_series_cols = [col for col in combined_df.columns if col.startswith('sample_')]

# Identify non-time-series columns
non_time_series_cols = [col for col in combined_df.columns if col not in time_series_cols]

# Handle missing values in time-series data using forward fill, then backward fill.
# .ffill()/.bfill() replace the deprecated fillna(method=...) spelling.
# NOTE(review): the fill runs down the rows, so a gap is filled from the
# neighbouring row, whichever song that row belongs to -- confirm this is intended.
combined_df[time_series_cols] = combined_df[time_series_cols].ffill().bfill()

# Handle missing values in other data by replacing with 0
combined_df[non_time_series_cols] = combined_df[non_time_series_cols].fillna(0)

# Verify if all null values are handled
print(f"Remaining null values:\n{combined_df.isnull().sum().sum()}")

# NOTE(review): with the save below commented out, the cleaned frame only lives
# in memory; the MIDI-feature cell re-reads combined_features_annotations.csv.
# Save cleaned DataFrame to a new CSV file
# save_path_cleaned = '/content/drive/MyDrive/DEAMds/ProcessedFiles/combined_annotations_and_features_cleaned.csv'
# combined_df.to_csv(save_path_cleaned, index=False)

# print(f"Cleaned and combined DataFrame saved to {save_path_cleaned}")


  combined_df[time_series_cols] = combined_df[time_series_cols].fillna(method='ffill').fillna(method='bfill')


Remaining null values:
0


In [None]:
!pip install miditoolkit numpy pandas scikit-learn tensorflow


Collecting miditoolkit
  Downloading miditoolkit-1.0.1-py3-none-any.whl.metadata (4.9 kB)
Downloading miditoolkit-1.0.1-py3-none-any.whl (24 kB)
Installing collected packages: miditoolkit
Successfully installed miditoolkit-1.0.1


# Extract MIDI features and merge them with the training features


In [None]:
import miditoolkit
import numpy as np

def extract_midi_features(midi_file):
    """Summarise the first instrument track of a MIDI file as a flat feature dict.

    Args:
        midi_file: Path of the MIDI file, passed straight to ``miditoolkit.MidiFile``.

    Returns:
        dict with 22 entries, in this order: mean/std of pitch, note duration
        and velocity; ``tempo`` (first tempo change, 120 when the file has
        none); ``note_density`` (note count divided by the largest note end
        time); mean/std of the interval between consecutive pitches; and
        ``pitch_class_0`` .. ``pitch_class_11`` (note counts per pitch modulo 12).
        Every statistic is 0 when there are no notes.

    Notes:
        Only ``instruments[0]`` is read; any further tracks are ignored.
        Durations and the density denominator are in whatever unit
        ``note.start``/``note.end`` use (presumably MIDI ticks -- confirm).
    """
    midi_obj = miditoolkit.MidiFile(midi_file)

    # A file may contain no instrument track at all; indexing [0] would raise
    # IndexError, so treat that case like a track without notes.
    notes = midi_obj.instruments[0].notes if midi_obj.instruments else []

    # Largest note end time, used as the length of the piece
    total_duration = max(note.end for note in notes) if notes else 0

    # Per-note raw values
    pitches = [note.pitch for note in notes]
    durations = [note.end - note.start for note in notes]
    velocities = [note.velocity for note in notes]
    tempo = midi_obj.tempo_changes[0].tempo if midi_obj.tempo_changes else 120  # Default tempo
    pitch_classes = [pitch % 12 for pitch in pitches]

    # Derived descriptors: 12 unit-width bins for pitch classes 0..11
    pitch_class_histogram = np.histogram(pitch_classes, bins=np.arange(13))[0]
    note_density = len(notes) / total_duration if total_duration > 0 else 0
    melodic_intervals = np.diff(pitches) if len(pitches) > 1 else []

    def _mean(values):
        # np.mean of an empty sequence is NaN (with a warning); report 0 instead
        return np.mean(values) if len(values) > 0 else 0

    def _std(values):
        return np.std(values) if len(values) > 0 else 0

    features = {
        'mean_pitch': _mean(pitches),
        'std_pitch': _std(pitches),
        'mean_duration': _mean(durations),
        'std_duration': _std(durations),
        'mean_velocity': _mean(velocities),
        'std_velocity': _std(velocities),
        'tempo': tempo,
        'note_density': note_density,
        'mean_melodic_interval': _mean(melodic_intervals),
        'std_melodic_interval': _std(melodic_intervals),
    }

    # One column per pitch class
    for i in range(12):
        features[f'pitch_class_{i}'] = pitch_class_histogram[i]

    return features


In [None]:
midi_dir = '/content/drive/MyDrive/DEAMds/ProcessedFiles/midi_files'
combined_csv_path = '/content/drive/MyDrive/DEAMds/ProcessedFiles/combined_features_annotations.csv'
# NOTE(review): this re-reads the combined CSV from disk, so the null handling
# done in the "Data Preprocessing" cell (never saved) is not carried over.
combined_df = pd.read_csv(combined_csv_path)

# Extract features for each song that has a converted MIDI file
midi_features_list = []
missing_song_ids = []
for song_id in combined_df['song_id'].unique():
    midi_file_path = os.path.join(midi_dir, f"{song_id}_basic_pitch.mid")
    if not os.path.exists(midi_file_path):
        missing_song_ids.append(song_id)
        continue
    features = extract_midi_features(midi_file_path)
    features['song_id'] = song_id
    midi_features_list.append(features)

# Make dropped songs visible: the inner merge below removes their rows
print(f"Extracted MIDI features for {len(midi_features_list)} songs; "
      f"{len(missing_song_ids)} songs had no MIDI file in {midi_dir}")

# An empty list would produce a frame without a 'song_id' column and make the
# merge fail with an unhelpful KeyError; fail early with the actual cause.
if not midi_features_list:
    raise FileNotFoundError(
        f"No '<song_id>_basic_pitch.mid' files found in {midi_dir} for the songs in {combined_csv_path}"
    )

midi_features_df = pd.DataFrame(midi_features_list)
midi_features_df.to_csv('/content/drive/MyDrive/DEAMds/ProcessedFiles/midi_features_new.csv', index=False)

# Merge with existing combined_df based on song_id (songs without MIDI features are dropped)
combined_df = pd.merge(combined_df, midi_features_df, on='song_id', how='inner')
combined_df.to_csv('/content/drive/MyDrive/DEAMds/ProcessedFiles/midi_f_a_f_new.csv', index=False)

# Clean And Prepare Data

In [None]:
# Step 5: Clean and Prepare Data
import pandas as pd
import numpy as np
import gc

# Load the merged table of audio features, annotations and MIDI features
# (midi_f_a_f_new.csv, written by the MIDI feature-extraction cell)
combined_csv_path = '/content/drive/MyDrive/DEAMds/ProcessedFiles/midi_f_a_f_new.csv'
combined_df = pd.read_csv(combined_csv_path)


In [None]:
# Strip surrounding whitespace from the column names so that later cells can
# select columns by their exact name; the numeric conversion itself happens below.
combined_df.columns = combined_df.columns.str.strip()

def convert_to_numeric_chunked(df, chunk_size=100000):
    """Convert every column of ``df`` to a numeric dtype, ``chunk_size`` rows at a time.

    Values that cannot be parsed as numbers become NaN (``errors='coerce'``).

    Args:
        df: Input frame; it is not modified.
        chunk_size: Number of rows converted per step, to bound the size of
            the temporary copy.

    Returns:
        A new DataFrame with the same rows, index and columns as ``df`` and
        numeric columns. An empty input yields an empty copy.
    """
    numeric_dfs = []
    for start in range(0, len(df), chunk_size):
        # Slice by position: label-based .loc[i:j] only lines up with row
        # positions for a default RangeIndex and would silently skip rows
        # for any other index.
        chunk = df.iloc[start:start + chunk_size].apply(pd.to_numeric, errors='coerce')
        numeric_dfs.append(chunk)
        del chunk
        gc.collect()

    # pd.concat raises on an empty list, which is what a 0-row frame produces
    if not numeric_dfs:
        return df.copy()

    return pd.concat(numeric_dfs)

# Convert the whole table and persist it for the modelling cells
combined_df_numeric = convert_to_numeric_chunked(combined_df)
combined_df_numeric.to_csv('/content/drive/MyDrive/DEAMds/ProcessedFiles/final_df_new_try2.csv', index=False)
# Free the unconverted frame. NOTE: after this line the cell cannot be re-run
# on its own (combined_df no longer exists, which raises NameError); re-run the
# loading cell above first.
del combined_df
gc.collect()


NameError: name 'combined_df' is not defined

# Model (RNN)

## Train test split

In [None]:
import pandas as pd
import numpy as np
import gc

# Step 6: Train New Model
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Load cleaned DataFrame
final_csv_path = '/content/drive/MyDrive/DEAMds/ProcessedFiles/final_df_new_try2.csv'
final_df = pd.read_csv(final_csv_path)

# Targets predicted by the model
label_columns = ['valence_mean', 'valence_std', 'arousal_mean', 'arousal_std']

# Features: every column except the song identifier and the targets.
# Missing values are replaced by 0.
X_df = final_df.drop(columns=['song_id'] + label_columns).fillna(0)
X = X_df.values

Y = final_df[label_columns].values

# Split BEFORE scaling so that the test rows do not influence the scaler.
# NOTE(review): rows are split at random, so rows with the same song_id can end
# up in both sets -- consider a split grouped by song_id if that matters here.
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# Fit the scaler on the training rows only, then apply the same transform to the test rows
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)



## Model development

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Masking, SimpleRNN, Dense
from tensorflow.keras.models import Model

# Input: one "time step" per feature column, one value per step,
# i.e. each row of X_train is fed to the RNN as a sequence of scalars
inputs = Input(shape=(X_train.shape[1], 1))

# Skip steps whose value is exactly 0.0
# NOTE(review): the inputs are standardised features, so 0.0 is not a padding
# marker here -- confirm that masking is really wanted.
x = Masking(mask_value=0.0)(inputs)

# Two stacked SimpleRNN layers; the first returns the full sequence so the
# second can consume it, the second returns only its final 64-dim state.
# The explicit layer names are looked up again by the feature-extractor cell.
x = SimpleRNN(64, return_sequences=True, name='simple_rnn_24')(x)
x = SimpleRNN(64, name='simple_rnn_25')(x)

# Linear output layer with 4 units, one per target column
outputs = Dense(4, name='dense_12')(x)

# Create the model
model = Model(inputs, outputs)

# Compile with Adam and mean squared error (required before fit/evaluate)
model.compile(optimizer='adam', loss='mse')


In [None]:
# Append a trailing axis of size 1 so each row becomes (n_features, 1),
# matching the Input shape of the RNN
X_train_rnn = X_train[..., np.newaxis]
X_test_rnn = X_test[..., np.newaxis]

# Fit for 10 epochs; Keras holds out 20% of the training rows for validation
history = model.fit(
    X_train_rnn,
    y_train,
    epochs=10,
    batch_size=1024,
    validation_split=0.2,
)

# MSE on the held-out test split
loss = model.evaluate(X_test_rnn, y_test)


Epoch 1/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m56s[0m 661ms/step - loss: 5.9284 - val_loss: 0.8450
Epoch 2/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 635ms/step - loss: 0.7713 - val_loss: 0.8361
Epoch 3/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 634ms/step - loss: 0.7404 - val_loss: 0.6362
Epoch 4/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 636ms/step - loss: 0.7090 - val_loss: 0.6379
Epoch 5/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 637ms/step - loss: 0.6194 - val_loss: 0.6084
Epoch 6/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 637ms/step - loss: 0.5986 - val_loss: 0.5315
Epoch 7/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 636ms/step - loss: 0.5595 - val_loss: 0.5226
Epoch 8/10
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 635ms/step - loss: 0.5190 - val_loss: 0.4888
Epoch 9/10
[1m79/79[0m [32m━━

In [None]:
# Save the trained initial model
model.save('/content/drive/MyDrive/grandpa_model.keras')


In [None]:
model.summary()

## Extract features to transfer knowledge to second model

In [None]:
from tensorflow.keras.models import Model

# Define the feature extractor model
layer_name = 'simple_rnn_25'
feature_extractor = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)

# Now, the feature_extractor model will output the embeddings from the 'simple_rnn_25' layer


In [None]:
# Run the training inputs (X_train_rnn, built in the RNN training cell) through
# the feature extractor to get the outputs of the 'simple_rnn_25' layer.
# NOTE(review): X_train_features is not consumed by any later cell visible in
# this notebook -- confirm whether it is still needed.
X_train_features = feature_extractor.predict(X_train_rnn)

# One row per training sample; the layer has 64 units
print(X_train_features.shape)  # Should print something like (number_of_samples, 64)


[1m3122/3122[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m377s[0m 121ms/step
(99894, 64)


# Model 2 (DNN)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras import backend as K

# Output activation that keeps every prediction at or above a floor value
def custom_activation(x, floor=0.37):
    """Clamp ``x`` from below at ``floor`` while keeping a usable gradient.

    The returned values are exactly ``maximum(x, floor)``. A plain ``maximum``
    has zero gradient wherever ``x < floor``, so an output that starts below
    the floor can never be trained back up. The clamp correction is therefore
    wrapped in ``stop_gradient``: the forward value is unchanged, while the
    backward pass treats the activation as the identity.

    Args:
        x: Pre-activation tensor.
        floor: Smallest value the activation may return. The default 0.37 is
            the smallest target value reported for the training/test labels
            later in this notebook.

    Returns:
        Tensor of the same shape as ``x`` with all values >= ``floor``.
    """
    clamped = K.maximum(x, floor)
    return x + K.stop_gradient(clamped - x)

# Small fully connected network, built layer by layer
second_model = Sequential()
second_model.add(Input(shape=(22,)))  # Input shape matches the number of MIDI features
second_model.add(Dense(32, activation='relu'))  # First hidden layer
second_model.add(Dense(16, activation='relu'))  # Second hidden layer
second_model.add(Dense(4, activation=custom_activation))  # One output per target, custom activation



In [None]:
# Try to copy Dense weights from the RNN model into the DNN, pairing the layers
# position by position (the last layer of each model is excluded).
# NOTE: every layer of `model` before its output layer is an input, Masking or
# SimpleRNN layer, so with the models defined above no pair passes the Dense
# check. The loop reports what it did instead of silently doing nothing.
transferred_layers = 0
for complex_layer, simpler_layer in zip(model.layers[:-1], second_model.layers[:-1]):
    if not (isinstance(complex_layer, Dense) and isinstance(simpler_layer, Dense)):
        continue

    source_weights = complex_layer.get_weights()
    source_shapes = [w.shape for w in source_weights]
    target_shapes = [w.shape for w in simpler_layer.get_weights()]

    # set_weights raises on a shape mismatch; skip such pairs with a message
    if source_shapes != target_shapes:
        print(f"Skipping {complex_layer.name} -> {simpler_layer.name}: "
              f"weight shapes differ ({source_shapes} vs {target_shapes})")
        continue

    simpler_layer.set_weights(source_weights)
    transferred_layers += 1

if transferred_layers == 0:
    print("Warning: no weights were transferred; second_model keeps its initial weights.")
else:
    print(f"Transferred weights for {transferred_layers} layer(s).")


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# MIDI descriptors used as model inputs, in the order the network receives them
midi_feature_columns = [
    'mean_pitch', 'std_pitch', 'mean_duration', 'std_duration',
    'mean_velocity', 'std_velocity', 'tempo', 'note_density',
    'mean_melodic_interval', 'std_melodic_interval',
] + [f'pitch_class_{i}' for i in range(12)]
target_columns = ['valence_mean', 'valence_std', 'arousal_mean', 'arousal_std']

X = final_df[midi_feature_columns].values
Y = final_df[target_columns].values

# Both arrays must have the same number of rows
print(f"Features shape: {X.shape}")
print(f"Labels shape: {Y.shape}")

# 80/20 train/test split with a fixed seed.
# NOTE: this rebinds X_train/X_test/y_train/y_test, which previously held the
# data of the RNN model.
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# Hold out 20% of the remaining training rows for validation
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)


Features shape: (124868, 22)
Labels shape: (124868, 4)


In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once the validation loss has not improved for 5 epochs and restore the
# weights of the best epoch
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# MSE loss, with MAE reported as an additional metric
second_model.compile(optimizer='adam', loss='mse', metrics=['mae'])

# Train on the MIDI features, validating on the explicit validation split
second_model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=1024,
    callbacks=[early_stopping],
)

# Despite the variable names, these are the metrics on the held-out TEST split
val_loss, val_mae = second_model.evaluate(X_test, y_test)
print(f"Test Loss: {val_loss}, Test MAE: {val_mae}")


Epoch 1/50
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 21ms/step - loss: 183.3622 - mae: 4.2765 - val_loss: 10.8303 - val_mae: 2.7016
Epoch 2/50
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 10.8371 - mae: 2.6986 - val_loss: 10.8179 - val_mae: 2.6985
Epoch 3/50
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 10.8668 - mae: 2.7010 - val_loss: 10.8037 - val_mae: 2.6954
Epoch 4/50
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 10.8459 - mae: 2.6962 - val_loss: 10.7927 - val_mae: 2.6922
Epoch 5/50
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 10.8018 - mae: 2.6876 - val_loss: 10.7718 - val_mae: 2.6857
Epoch 6/50
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 10.7972 - mae: 2.6854 - val_loss: 10.7677 - val_mae: 2.6835
Epoch 7/50
[1m79/79[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - 

# Further testing and analysis of the second model

**Checking mean absolute error**

In [None]:
from sklearn.metrics import mean_absolute_error

# Predict on the test set
y_pred = second_model.predict(X_test)

# Mean Absolute Error averaged over all four targets
mae = mean_absolute_error(y_test, y_pred)
print(f"Mean Absolute Error: {mae}")

# Per-target MAE. The columns follow the order used to build Y:
# 0 = valence_mean, 1 = valence_std, 2 = arousal_mean, 3 = arousal_std.
# (Column 1 is valence_std, so the arousal mean must be read from column 2.)
VALENCE_MEAN_COL = 0
AROUSAL_MEAN_COL = 2
mae_valence = mean_absolute_error(y_test[:, VALENCE_MEAN_COL], y_pred[:, VALENCE_MEAN_COL])
mae_arousal = mean_absolute_error(y_test[:, AROUSAL_MEAN_COL], y_pred[:, AROUSAL_MEAN_COL])

print(f"Mean Absolute Error for Valence Mean: {mae_valence}")
print(f"Mean Absolute Error for Arousal Mean: {mae_arousal}")


[1m781/781[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step
Mean Absolute Error: 1.0579183609795504
Mean Absolute Error for Valence Mean: 0.7257738621257614
Mean Absolute Error for Arousal Mean: 1.0361462504863548


## Checking actual vs. predicted min/max

In [None]:
# Smallest and largest value the model outputs on the test split (over all four targets)
y_pred = second_model.predict(X_test)
print(f"Predicted Min: {y_pred.min()}, Predicted Max: {y_pred.max()}")


[1m781/781[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 985us/step
Predicted Min: 0.3700000047683716, Predicted Max: 8.11032485961914


In [None]:
import numpy as np

# Work on array copies of the targets
y_train_values = np.array(y_train)
y_test_values = np.array(y_test)

# Overall smallest/largest target value (across all target columns) per split
train_min, train_max = y_train_values.min(), y_train_values.max()
test_min, test_max = y_test_values.min(), y_test_values.max()

print(f"Training data range: Min = {train_min}, Max = {train_max}")
print(f"Test data range: Min = {test_min}, Max = {test_max}")


Training data range: Min = 0.37, Max = 8.1
Test data range: Min = 0.37, Max = 8.1


In [None]:
from tensorflow.keras.models import save_model

# Destination of the trained DNN on Drive (native .keras format)
model_save_path = '/content/drive/MyDrive/DEAMds/complete_model.keras'

# Save the model.
# NOTE(review): the output layer uses the Python function custom_activation;
# loading this file elsewhere presumably requires passing that function via
# custom_objects -- confirm.
second_model.save(model_save_path)
