# Extract movement data
Here we prepare the accelerometer data from a phone (represented in a csv file)

In [7]:
import pandas as pd
import numpy as np
import re
from scipy.signal import find_peaks

# I have given the bpm at the end of every movement file 
def extract_bpm_from_filename(filename):
    match = re.search(r'_(\d+)', filename)  
    if match:
        return int(match.group(1))
    else:
        raise ValueError("BPM not found in the filename.")

def prepare_velocity_profiles(filename, bars_per_segment=2):
    bpm = extract_bpm_from_filename(filename)
    seconds_per_beat = 60 / bpm
    beats_per_bar = 4  
    seconds_per_segment = seconds_per_beat * beats_per_bar * bars_per_segment

    data = pd.read_csv(filename)
    # Calculate absolute acceleration (using good ol' pythagorean)
    data['Acceleration'] = np.sqrt(data['Acceleration x (m/s^2)']**2 + 
                                   data['Acceleration y (m/s^2)']**2 + 
                                   data['Acceleration z (m/s^2)']**2)
    # Detect peaks in the absolute acceleration data (using a prebuilt library find_peaks)
    height_threshold = data['Acceleration'].max() / 4
    peaks, properties = find_peaks(data['Acceleration'], height=height_threshold, distance=50)
    peak_values = data['Acceleration'].iloc[peaks]

    # Normalize the peak values to match midi values
    max_velocity_observed = peak_values.max()
    normalized_velocities = (peak_values / max_velocity_observed) * 130

    # Initialize the matrix array
    total_time = data['Time (s)'].iloc[-1]
    number_of_segments = int(total_time // seconds_per_segment)
    matrices = np.zeros((number_of_segments, 22, 32))

    # Populate the first row of each matrix with normalized peak data
    for i, peak_time in enumerate(data['Time (s)'].iloc[peaks]):
        segment_index = int(peak_time // seconds_per_segment)
        if segment_index < number_of_segments:
            time_index = int(((peak_time % seconds_per_segment) / seconds_per_segment) * 32)
            if time_index < 32:
                matrices[segment_index][0][time_index] = normalized_velocities.iloc[i]  # Storing normalized velocity

    return matrices

# Example usage (MovementData/Data6/Accelerometer_120.csv is the same data from video: https://streamable.com/e/wp8jh7)
filepath = 'MovementData/Data6/Accelerometer_120.csv'
matrices = prepare_velocity_profiles(filepath, bars_per_segment=2)

print(matrices.shape)
print(matrices[2][0])  # Output the chosen row of matrice


(3, 22, 32)
[  0.         120.41345528   0.           0.         121.01503267
 111.94024624   0.           0.         111.0735851    0.
 111.00209205 112.99096655   0.         122.34145698   0.
   0.           0.         117.04931698   0.           0.
 115.4117872  122.93285616   0.         109.01335435   0.
 115.78457476   0.         106.27258509   0.          90.70400612
   0.           0.        ]


# Predict genre
Here we load the chosen models, and predict what genre the model believes the related rythm is. 

In [15]:
import numpy as np
import joblib
from tensorflow.keras.models import load_model

def predict_segments(matrices, model, scaler, pca_list, genres):
    predictions = []
    for matrix in matrices:
        # Flatten the matrix to match the expected input structure for scaling
        velocity_profile = matrix.flatten().reshape(1, -1)
        # Standardize the velocity profile
        standardized_profile = scaler.transform(velocity_profile)
        
        # Prepare for PCA transformation by reshaping into num_instruments x num_timesteps
        standardized_profile_reshaped = standardized_profile.reshape(-1, 22, 32)

        # Apply PCA transformation on each timestep across all instruments
        pca_transformed = np.hstack([pca_list[i].transform(standardized_profile_reshaped[:, :, i]) for i in range(len(pca_list))])
        
        # Reshape for model input
        pca_transformed = pca_transformed.reshape(1, -1, 1)
        
        # Make predictions
        prediction = model.predict(pca_transformed)
        predicted_class = np.argmax(prediction, axis=1)
        predicted_genre = genres[predicted_class[0]]
        confidence = prediction[0][predicted_class[0]]
        
        # Store results
        predictions.append((predicted_genre, confidence))
    
    return predictions

# Load models and preprocessing objects (these are premade, can make more in drumclassifier_PCA_CNN (just make sure to update genres to match))
model = load_model('models/pca_cnn/pca_cnn_model_10G.keras')
scaler = joblib.load('models/pca_cnn/scaler_10G.pkl')
pca_list = joblib.load('models/pca_cnn/pca_list_10G.pkl')  

matrices = prepare_velocity_profiles(filepath, bars_per_segment=2)
genres = ['latin',  'jazz',  'punk', 'afrobeat', 'hiphop', 'soul', 'funk', 'rock', 'country', 'reggae']

# Predict for all segments
segment_predictions = predict_segments(matrices, model, scaler, pca_list, genres)

# Output predictions ()
for i, (genre, confidence) in enumerate(segment_predictions):
    print(f"Segment {i+1}: Predicted genre - {genre}, Confidence - {confidence:.2f}")


Segment 1: Predicted genre - rock, Confidence - 0.32
Segment 2: Predicted genre - rock, Confidence - 0.33
Segment 3: Predicted genre - rock, Confidence - 0.34


# PCA_LDA_CNN

In [13]:
import numpy as np
from tensorflow.keras.models import load_model

def predict_segments(matrices, model, scaler, pca, lda, genres):
    predictions = []
    for matrix in matrices:
        # Reducing matrix by averaging over the 32 timesteps to get 22 features
        reduced_matrix = np.mean(matrix, axis=1).reshape(1, -1)

        # Standardize the velocity profile
        standardized_profile = scaler.transform(reduced_matrix)

        # Apply PCA transformation
        pca_transformed = pca.transform(standardized_profile)

        # Apply LDA transformation
        lda_transformed = lda.transform(pca_transformed)

        # Artificially extend the LDA output to match model input expectations
        lda_transformed_extended = np.tile(lda_transformed, (32, 1)).reshape(1, 32, 1)

        # Make predictions
        prediction = model.predict(lda_transformed_extended)
        predicted_class = np.argmax(prediction, axis=1)
        predicted_genre = genres[predicted_class[0]]
        confidence = prediction[0][predicted_class[0]]

        # Store results
        predictions.append((predicted_genre, confidence))

    return predictions


model = load_model('models/pca_lda_cnn/pca_lda_cnn_model_10G.keras') 
scaler = joblib.load('models/pca_lda_cnn/scaler_10G.pkl')  
pca = joblib.load('models/pca_lda_cnn/pca_model_10G.pkl')  
lda = joblib.load('models/pca_lda_cnn/lda_model_10G.pkl') 

# Using the updated function
segment_predictions = predict_segments(matrices, model, scaler, pca, lda, genres)

# Output predictions
for i, (genre, confidence) in enumerate(segment_predictions):
    print(f"Segment {i+1}: Predicted genre - {genre}, Confidence - {confidence:.2f}")



Segment 1: Predicted genre - rock, Confidence - 0.39
Segment 2: Predicted genre - rock, Confidence - 0.40
Segment 3: Predicted genre - rock, Confidence - 0.40
