In [None]:
# import TensorFlow
import tensorflow as tf

# Report the installed TensorFlow version so results can be tied to a release.
print(tf.__version__)
# List the GPUs TensorFlow can see; an empty list means no GPU is visible to it.
print(tf.config.list_physical_devices('GPU'))

2.15.0
[]


In [None]:
# load required libraries
import os
import librosa
import numpy as np
import tensorflow as tf
import pandas as pd
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.image import resize
from tensorflow.keras.models import load_model
# import wandb

In [None]:
from google.colab import drive
import sys

# Weights & Biases login is currently disabled (its import is commented out as well).
#wandb.login()

# Mount Google Drive in the Colab runtime; the data and model paths used in this notebook live under it.
drive.mount('/content/drive')
# Add the project folder on Drive to the module search path.
sys.path.append('/content/drive/MyDrive/ucph')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def convert_label(df):
  """Add an integer ``label`` column derived from the ``shot`` column.

  The frame is modified in place and also returned.

  Mapping: ``clear -> 0``, ``slice -> 1``, ``smash -> 2``. String values are
  matched case-insensitively and with surrounding whitespace removed, so
  ``" Smash"`` still resolves to ``2``. Anything that still cannot be mapped
  (other strings, ``None``, NaN) gets the sentinel ``-1`` and a warning is
  emitted that lists the offending values.

  Caution: ``-1`` is not a class id. One-hot encoders that index with the
  label treat ``-1`` as "last class", so filter those rows out before
  training or evaluating.

  Args:
    df: DataFrame with a ``shot`` column.

  Returns:
    The same DataFrame object, now carrying a ``label`` column.
  """
  import warnings

  lab2id = {"clear": 0, "slice": 1, "smash": 2}

  # A plain df['shot'].map(lab2id) leaves NaN for unmapped values (which also
  # turns the column into floats), hence the explicit lookup with a sentinel.
  def map_label(shot):
    key = shot.strip().lower() if isinstance(shot, str) else shot
    return lab2id.get(key, -1)  # return -1 if shot is not in lab2id

  # Apply the map_label function to each element of the 'shot' column
  df['label'] = df['shot'].apply(map_label)

  # Surface unmapped values instead of letting the sentinel pass silently.
  unknown = df.loc[df['label'] == -1, 'shot']
  if len(unknown) > 0:
    warnings.warn(
        f"convert_label: {len(unknown)} row(s) have a 'shot' value outside "
        f"{sorted(lab2id)} and were labelled -1: {unknown.unique().tolist()}",
        stacklevel=2,
    )
  return df

def add_mel_spectrogram(df):
  """Replace every entry of the ``audio`` column with its mel spectrogram.

  Each entry is converted to a NumPy array and passed to
  ``librosa.feature.melspectrogram`` with librosa's default settings. The
  column is overwritten in place, so the raw waveform is no longer available
  in `df` afterwards.

  Args:
    df: DataFrame with an ``audio`` column of array-like waveforms.

  Returns:
    The same DataFrame object with ``audio`` holding the spectrogram arrays.
  """
  def to_mel(samples):
    waveform = np.array(samples)
    return np.array(librosa.feature.melspectrogram(y=waveform))

  df["audio"] = df["audio"].apply(to_mel)
  return df

def _read_split(path, split, n_files):
  """Read ``{split}_0.json`` .. ``{split}_{n_files - 1}.json`` from `path` into one frame."""
  frames = [pd.read_json(os.path.join(path, f'{split}_{i}.json')) for i in range(n_files)]
  # Concatenate once; growing a frame inside a loop re-copies all earlier rows each time.
  return pd.concat(frames, ignore_index=True)


def _normalise_pose(pose, expected_shape=(62, 33, 4)):
  """Standardise one pose entry, or return NaN when it cannot be used.

  The entry must convert to an array of exactly `expected_shape`. Mean and
  standard deviation are taken over the first two axes, i.e. one statistic
  per position of the last axis.
  """
  try:
    pose = np.array(pose)
    if pose.shape != expected_shape:
      return np.nan
    mean = np.mean(pose, axis=(0, 1))
    std = np.std(pose, axis=(0, 1))
  except (TypeError, ValueError):
    # Ragged or non-numeric entries cannot be normalised; mark them for removal.
    return np.nan
  # A constant channel has std == 0; divide by 1 there so it becomes all zeros
  # rather than NaN/inf.
  std = np.where(std == 0, 1.0, std)
  return (pose - mean) / std


def _prepare_split(df, name):
  """Label, filter and featurise one split; `name` is only used in the log message."""
  # convert string labels to int
  df = convert_label(df)

  # convert_label marks shots it cannot map with -1. That is not a class id
  # (one-hot encoding would wrap it to the last class), so drop those rows.
  known = df['label'] != -1
  if not known.all():
    print(f"{name}: dropping {int((~known).sum())} sample(s) whose 'shot' has no class id")
    df = df.loc[known].copy()

  # create mel_spectrograms (overwrites the 'audio' column)
  df = add_mel_spectrogram(df)

  # normalise poses; entries that are malformed or of the wrong shape become NaN and are dropped
  df['pose'] = df['pose'].apply(_normalise_pose)
  return df.dropna(subset=['pose'])


def prepare_and_load_dataset(path='/content/drive/MyDrive/ucph/CCS2/data/', n_files=10):
  """Load the train and test JSON shards and turn them into model-ready frames.

  Steps, applied to each split independently:
    1. read ``train_{i}.json`` / ``test_{i}.json`` for ``i`` in ``range(n_files)``
       and concatenate them,
    2. shuffle the rows with a fixed seed (42),
    3. add the integer ``label`` column and drop rows without a known class,
    4. replace ``audio`` with its mel spectrogram,
    5. standardise ``pose`` and drop rows whose pose is not a (62, 33, 4) array.

  Args:
    path: Directory that holds the JSON shards.
    n_files: Number of shards per split.

  Returns:
    Tuple ``(train_df, test_df)``.
  """
  train_df = _read_split(path, 'train', n_files)
  test_df = _read_split(path, 'test', n_files)

  # shuffle datasets
  train_df = train_df.sample(frac=1, random_state=42)
  test_df = test_df.sample(frac=1, random_state=42)

  return _prepare_split(train_df, 'train'), _prepare_split(test_df, 'test')

def get_model_input_data(train_df, test_df, column_name):
  """Turn one feature column of the train/test frames into model inputs.

  Args:
    train_df: Training frame with `column_name` and ``label`` columns.
    test_df: Test frame with the same columns.
    column_name: Column whose per-row arrays are stacked into one array.

  Returns:
    Tuple ``(x_train, x_test, y_train_cat, y_test)``. The training labels are
    one-hot encoded over 3 classes; the test labels are returned as the raw
    integer ids.
  """
  def stacked(frame):
    # Each row holds one array; stacking adds the leading sample axis.
    return np.stack(frame[column_name].values)

  x_train = stacked(train_df)
  x_test = stacked(test_df)
  # Convert training labels to one-hot encoding
  y_train_cat = to_categorical(train_df["label"].values, num_classes=3)
  y_test = test_df["label"].values
  return x_train, x_test, y_train_cat, y_test

In [None]:
# Load, shuffle and featurise the train/test splits once; later cells reuse these frames.
train_df, test_df = prepare_and_load_dataset()

In [None]:
# Function to preprocess and classify one sample
def test_sample(sample, model):
    # Make predictions
    sample = sample.reshape(1, *sample.shape)
    predictions = model.predict(sample)

    # Get the class probabilities
    class_probabilities = predictions[0]

    # Get the predicted class index
    predicted_class_index = np.argmax(class_probabilities)
    return class_probabilities, predicted_class_index

# Load data
x_train_pose, x_test_pose, y_train_cat_pose, y_test_pose = get_model_input_data(train_df, test_df, 'pose')
x_train_audio, x_test_audio, y_train_cat_audio, y_test_audio = get_model_input_data(train_df, test_df, 'audio')

# Load the saved base models
audio_model = load_model('/content/drive/MyDrive/ucph/CCS2/models/audio_classification_model_2.keras')
pose_model = load_model('/content/drive/MyDrive/ucph/CCS2/models/pose_classification_model_1.keras')


def stack_base_model_probabilities(audio_inputs, pose_inputs, audio_clf, pose_clf):
    """Build ensemble features from the two base models' class probabilities.

    Both models are run once over the whole array (``predict`` batches
    internally) instead of once per sample.

    Args:
        audio_inputs: Array of audio features, one row per sample.
        pose_inputs: Array of pose features for the same samples, same order.
        audio_clf: Model applied to `audio_inputs`.
        pose_clf: Model applied to `pose_inputs`.

    Returns:
        Array whose rows are the audio probabilities followed by the pose
        probabilities of the same sample.

    Raises:
        ValueError: If the two inputs do not contain the same number of samples.
    """
    if audio_inputs.shape[0] != pose_inputs.shape[0]:
        raise ValueError(
            f"audio and pose inputs differ in length: "
            f"{audio_inputs.shape[0]} vs {pose_inputs.shape[0]}"
        )
    audio_probs = audio_clf.predict(audio_inputs)
    pose_probs = pose_clf.predict(pose_inputs)
    return np.concatenate((audio_probs, pose_probs), axis=1)


# Prepare Train data
X_train_new = stack_base_model_probabilities(x_train_audio, x_train_pose, audio_model, pose_model)

# Prepare Test Data
X_test_new = stack_base_model_probabilities(x_test_audio, x_test_pose, audio_model, pose_model)




In [None]:
from keras.models import Sequential

# fit our model indicating epoch and batch_size
def create_fit_model(X_train_fold, y_train_fold, X_val_fold, y_val_fold, fold, model_name='ensemble',
                     epochs=20, batch_size=32, model_dir='/content/drive/MyDrive/ucph/CCS2/models'):
  """Train, evaluate and save one small dense classifier for a CV fold.

  The network has a single hidden layer of 12 ReLU units followed by a softmax
  output. Input width and number of classes are taken from the training
  arrays rather than being hard-coded.

  Args:
    X_train_fold: 2-D array of training features.
    y_train_fold: 2-D array of one-hot training targets.
    X_val_fold: 2-D array of validation features.
    y_val_fold: 2-D array of one-hot validation targets.
    fold: Fold identifier; only used in the saved file name.
    model_name: Prefix of the saved file name.
    epochs: Number of training epochs.
    batch_size: Mini-batch size used for training.
    model_dir: Directory the ``.keras`` file is written to.

  Returns:
    The list returned by ``model.evaluate`` on the validation data, i.e.
    ``[val_loss, val_accuracy]`` for the metrics compiled here.
  """
  n_features = X_train_fold.shape[1]
  n_classes = y_train_fold.shape[1]

  # create model
  model = Sequential()
  model.add(Dense(12, input_dim=n_features, activation='relu'))
  model.add(Dense(n_classes, activation='softmax'))

  # Compile model
  model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

  model.fit(X_train_fold, y_train_fold, epochs=epochs, batch_size=batch_size,
            validation_data=(X_val_fold, y_val_fold))
  scores = model.evaluate(X_val_fold, y_val_fold, verbose=0)
  print(scores)

  # Save the model; build the path once so the printed and the written path cannot drift apart
  model_path = os.path.join(model_dir, f'{model_name}_classification_model_{fold}.keras')
  print(model_path)
  model.save(model_path)

  return scores
# train the model

In [None]:
from sklearn.model_selection import KFold

model_name = 'ensemble'
N_SPLITS = 10  # single source of truth for the number of folds

# X_train_new holds the stacked base-model probabilities, y_train_cat_pose the one-hot targets
kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=42)

# Training and validation indices for each fold
train_indices_list, val_indices_list = map(list, zip(*kf.split(X_train_new)))

# One [val_loss, val_accuracy] pair per fold, as returned by create_fit_model
scores = []

# use cross validation
for fold, (train_indices, val_indices) in enumerate(zip(train_indices_list, val_indices_list)):
    test_scores = create_fit_model(
        X_train_new[train_indices], y_train_cat_pose[train_indices],
        X_train_new[val_indices], y_train_cat_pose[val_indices],
        fold, model_name)
    scores.append(test_scores)

# find the best model among the folds -> best val accuracy
print(f"Model: {model_name}")
for entry in scores:
  print(entry)

# Highest validation accuracy wins; ties are broken by the lower validation loss
# (np.lexsort sorts by the last key first, and is stable for any remaining ties).
scores_arr = np.array(scores)
max_index = np.lexsort((scores_arr[:, 0], -scores_arr[:, -1]))[0]
print(f"Best model was found to be: {model_name}_classification_model_{max_index}")


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
[0.13052517175674438, 1.0]
/content/drive/MyDrive/ucph/CCS2/models/ensemble_classification_model_0.keras
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
[0.2247791588306427, 0.9454545378684998]
/content/drive/MyDrive/ucph/CCS2/models/ensemble_classification_model_1.keras
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
[0.17925259470939636, 1.0]
/content/drive/MyDrive/ucph/CCS2/models/ensemble_classif

In [None]:
from sklearn.metrics import f1_score, accuracy_score

# Define your class labels
classes = ['clear', 'slice', 'smash']

# test the best model for audio, pose and comb for test data
def test_model(model_file_name, X_test, y_test, model_name=None):
  """Load a saved model and print its metrics on a test set.

  Samples whose label is not a valid index into ``classes`` (for example a
  ``-1`` "unknown" sentinel) are skipped and their count is reported, because
  they would otherwise act as an extra class in the macro F1 and be one-hot
  encoded as the last class.

  Printed metrics:
    * macro F1 over the ids ``0 .. len(classes) - 1``,
    * one-vs-rest accuracy for every class,
    * overall accuracy,
    * "Mean Absolute Error": the fraction of misclassified samples,
    * "Relative Error": the probability the model gave to the true class,
      summed over the misclassified samples.

  Args:
    model_file_name: File name (without ``.keras``) inside the models folder.
    X_test: Array of model inputs, one row per sample.
    y_test: Integer class ids for the rows of `X_test`.
    model_name: Name shown in the report header. Defaults to the part of
      `model_file_name` before ``_classification_model_``.

  Raises:
    ValueError: If no sample with a valid label is left to evaluate.
  """
  if model_name is None:
    # Derive the display name locally instead of reading a global set in another cell.
    model_name = model_file_name.split('_classification_model_')[0]

  n_classes = len(classes)
  y_test = np.asarray(y_test)

  # Keep only samples that carry a real class id.
  valid = (y_test >= 0) & (y_test < n_classes)
  n_skipped = int((~valid).sum())
  if n_skipped:
    X_test, y_test = X_test[valid], y_test[valid]
  if y_test.shape[0] == 0:
    raise ValueError("test_model needs at least one sample with a valid class label")

  # Load the saved model
  model = load_model(f'/content/drive/MyDrive/ucph/CCS2/models/{model_file_name}.keras')

  # Predict the whole test set in one call rather than once per sample
  predictions_per_class = model.predict(X_test)
  predictions = np.argmax(predictions_per_class, axis=1)

  y_test_cat = to_categorical(y_test, num_classes=n_classes)  # Convert labels to one-hot encoding

  # Pin the label set so the average is always taken over exactly the known classes
  macro_f1 = f1_score(y_test, predictions, labels=list(range(n_classes)), average='macro')

  # Calculate accuracy for each individual class (one-vs-rest)
  class_accuracies = [accuracy_score(y_test == i, predictions == i) for i in range(n_classes)]

  # Calculate overall accuracy
  overall_accuracy = accuracy_score(y_test, predictions)

  # fraction of misclassified samples (reported as "Mean Absolute Error")
  misclassified = y_test != predictions
  mean_abs_error = np.mean(misclassified)

  # probability assigned to the true class, summed over the misclassified samples
  true_class_prob = np.sum(np.multiply(y_test_cat, predictions_per_class), axis=1)
  rel_error = np.sum(np.multiply(true_class_prob, misclassified))

  # print all the results
  print("-------------------------------------------------------------------------")
  print(f"{model_name} evaluation on test set")
  if n_skipped:
    print(f"Skipped {n_skipped} sample(s) whose label is not a valid class id")
  print(f"Macro F1 score: {macro_f1}")
  print("Accuracy for each individual class:")
  for i, accuracy in enumerate(class_accuracies):
      print(f"Class {classes[i]}: {accuracy}")
  print(f"Overall accuracy: {overall_accuracy}")
  print(f"Mean Absolute Error: {mean_abs_error}")
  print(f"Relative Error: {rel_error}")

# The fold number is hard-coded; update it if cross-validation reports a different best fold.
test_model('ensemble_classification_model_0',X_test_new, y_test_audio)

-------------------------------------------------------------------------
ensemble evaluation on test set
Macro F1 score: 0.7146094215861658
Accuracy for each individual class:
Class clear: 0.9827586206896551
Class slice: 0.9741379310344828
Class smash: 0.9482758620689655
Overall accuracy: 0.9396551724137931
Mean Absolute Error: 0.0603448275862069
Relative Error: 3.753638632595539
