<a href="https://colab.research.google.com/github/ZsofiaK/masterthesis/blob/main/Implementation/Pipeline/Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Classification (binary action recognition)

## Setting up requirements

In [1]:
# Setting up name dictionary for later ease of use.
dataset_dict = {'fishClips' : 'Fish clips'}

In [2]:
# Set the dataset, model and embedding specifics for the classification.

dataset_name = 'fishClips'

dataset_dir = dataset_dict[dataset_name]

frame_selection_method = 'motionAbsdiff_10'

embedding_method = 'default'

clf_name = 'ShallowNetwork'

val_score = 'f1_score'  # Score to use during cross-validation.

input_size = 3840   # Size of the input vectors (embeddings).

seed = 23   # For reproducability in pseudo-randomness.

In [3]:
# Setting up parameters for cross validation.

# Number of folds to use.
cv_folds = 3

# Parameter grids to use for the models.
param_grid = { 'ShallowNetwork': {
    'layers': [0, 1, 2],
    'dropout_rate': [0.0, 0.25, 0.5],
    'learning_rate': [0.01, 0.001, 0.0001],
    'pos_threshold' : [0.1, 0.15, 0.2, 0.5],  # Threshold for turning sigmoid prediction to binary label.
    'epochs': [50],
    'batch_size': [32]
}
}

In [4]:
# Setting up folder to save outputs.
import os

output_dir = 'Output'
os.makedirs(output_dir, exist_ok=True)

In [5]:
# Mount Drive.
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
# Specify data source
data_source = f"/content/drive/My Drive/UvA/M Thesis/Data/{dataset_dir}"

# Specify directory to copy to
data_dir = f"/content/{dataset_dir}"

In [7]:
# Copy the folder to destination
import shutil
shutil.copytree(data_source, data_dir)

'/content/Fish clips'

## Loading data

In [9]:
# Import libraries
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from sklearn.utils.class_weight import compute_class_weight
from IPython.display import clear_output

In [10]:
# Load video embeddings and labels
embeddings_dir = f'{data_dir}/Embeddings/{frame_selection_method}/{embedding_method}'
clips_csv_path = f'{data_dir}/clips.csv'

clips_df = pd.read_csv(clips_csv_path)

nr_clips = len(clips_df)
progress = 0

X = []  # Embeddings
y = []  # Labels
video_names = []  # Video names for saving predictions

not_found_embeddings = []

for index, row in clips_df.iterrows():
    video_name = row['video'].replace('.mp4', '')
    label = row['label']

    embedding_path = os.path.join(embeddings_dir, f"{video_name}.npy")

    if os.path.exists(embedding_path):
        embedding = np.load(embedding_path)

        X.append(embedding)
        y.append(label)

        video_names.append(video_name)

    else:
      not_found_embeddings.append(video_name)

    progress += 1

    clear_output(wait=True)
    print(f'Number of videos: {nr_clips}')
    print(f'Progress: {progress/nr_clips * 100:.2f}%')

Number of videos: 220
Progress: 100.00%


In [11]:
# Check if all embeddings were successfully loaded.

if len(not_found_embeddings) > 0:
  print('Failed to find embeddings for:')

  for video in not_found_embeddings:
    print(video)

else:
  print('Success! All embeddings read.')

Success! All embeddings read.


In [12]:
# Converting to numpy.
X = np.array(X)
y = np.array(y)

In [13]:
# Create train-test split
X_train, X_test, y_train, y_test, video_names_train, video_names_test = train_test_split(
    X, y, video_names, test_size=0.2, stratify=y, random_state=seed)

In [14]:
# Handle class imbalance through class weights
class_weights = compute_class_weight('balanced', classes=np.unique(y), y=y)
class_weights_dict = {i: class_weights[i] for i in range(len(class_weights))}

## Cross-validate model

In [15]:
## FUNCTIONS FOR MODEL CROSS-VALIDATION.

import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import Adam
from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.metrics import recall_score, accuracy_score, f1_score
from IPython.display import clear_output

def create_shallow_network(input_dim, hidden_layers, dropout_rate, learning_rate):
    model = Sequential()

    # Dense input layer with ReLu
    model.add(Dense(10, input_dim=input_dim, activation='relu'))

    # Dense hidden layers with ReLu and dropout
    for _ in range(hidden_layers):
        model.add(Dense(128, activation='relu'))
        model.add(Dropout(dropout_rate))

    # Dense output layer with sigmoid activation
    model.add(Dense(1, activation='sigmoid'))

    optimizer = Adam(learning_rate=learning_rate)

    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    return model

def crossval_shallow_network(n_splits, input_dim, X, y, params_grid, \
                             val_score = 'accuracy', verbose=False):

  kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

  # Set up best result tracker for grid search loop
  best_score = 0

  best_params = {}

  # Calculate number of runs for progress monitoring.
  total_runs = 1

  for params in params_grid.values():
    total_runs *= len(params)

  total_runs *= n_splits

  progress = 0

  # Grid search loop
  for hidden_layers in params_grid['layers']:
      for dropout_rate in params_grid['dropout_rate']:
          for learning_rate in params_grid['learning_rate']:

            scores = {threshold : [] for threshold in params_grid['pos_threshold']}

            for train_index, test_index in kf.split(X):
                X_train, X_test = X[train_index], X[test_index]
                y_train, y_test = y[train_index], y[test_index]

                # Create and fit the model
                model = create_shallow_network(input_dim, hidden_layers, \
                                                dropout_rate, learning_rate)

                model.fit(X_train, y_train, epochs=10, verbose=0)

                # Sigmoid output
                y_pred_raw = model.predict(X_test)

                # Predicted labels based on sigmoid output
                for threshold in params_grid['pos_threshold']:
                  y_pred = (y_pred_raw > threshold).astype(int).squeeze()

                  # Calculate performance based on selected score
                  if val_score == 'accuracy':
                    score = accuracy_score(y_test, y_pred)

                  elif val_score == 'recall':
                    score = recall_score(y_test, y_pred)

                  elif val_score == 'f1_score':
                    score = f1_score(y_test, y_pred)

                  else:
                    print(f'ERROR: unexpected validation score {val_score}.')
                    print('Select one of: accuracy, recall, f1_score')

                  scores[threshold].append(score)

                  progress += 1

                  clear_output(wait=True)
                  print(f'Cross-validation progress: {progress / total_runs * 100:.2f}%')

                # Best average score across all folds
                best_threshold = max(scores, key=lambda thr: np.mean(scores[thr]))
                best_average_score = np.mean(scores[best_threshold])

                # Check if current model settings beat the current best
                if best_average_score > best_score:
                    best_score = best_average_score
                    best_params = {'hidden_layers': hidden_layers,
                                  'dropout_rate': dropout_rate,
                                  'learning_rate': learning_rate,
                                  'pos_threshold': best_threshold}

  # Print best parameters and their score
  if verbose:
    print(f"\nBest Score: {best_score:.4f}")
    print("Best Parameters:", best_params)

  return best_params

In [16]:
# Setting up model dictionary for ease of reuse.
# Only models which do not need cross-validation are included in this.
# Models with cross-validation are handled on an individual basis.

models_dict = {
    'LogisticRegression' : LogisticRegression(max_iter=1000, class_weight=class_weights_dict),
    'SVC-Linear' : SVC(class_weight='balanced', kernel='linear')
    }

# Models which do not need cross-validation.
no_cross_val = list(models_dict.keys())

In [17]:
# Selecting model (with potential cross validation).
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

# If model requires cross-validation.
if clf_name not in no_cross_val:
  if clf_name == 'ShallowNetwork':
    best_params = crossval_shallow_network(cv_folds, input_size, X_train, \
                                           y_train, param_grid['ShallowNetwork'],\
                                           val_score = val_score, verbose=True)

    model = create_shallow_network(input_size, best_params['hidden_layers'], \
                                 best_params['dropout_rate'], best_params['learning_rate'])

  # Saving best hyperparameters for the model.
  best_params_df = pd.DataFrame([best_params])

  params_output_path = os.path.join(output_dir, 'Parameters')

  os.makedirs(params_output_path, exist_ok=True)

  params_csv_path = os.path.join(output_dir, f'Parameters/params_{dataset_name}_{frame_selection_method}_{embedding_method}_{clf_name}.csv')

  best_params_df.to_csv(params_csv_path, index=False)

# If model does not require cross-validation.
else:
  model = models_dict[clf_name]

Cross-validation progress: 100.00%

Best Score: 0.3784
Best Parameters: {'hidden_layers': 0, 'dropout_rate': 0.0, 'learning_rate': 0.0001, 'pos_threshold': 0.2}


In [18]:
# Train model on the training set.
model.fit(X_train, y_train)



<keras.src.callbacks.History at 0x7b9ea927ca60>

In [19]:
# Make predictions
if clf_name == 'ShallowNetwork':
  y_pred = (model.predict(X_test) > best_params['pos_threshold']).astype(int).squeeze()

else:
  y_pred = model.predict(X_test)



In [20]:
# Prepare DataFrame with video names and predictions.
predictions_df = pd.DataFrame({
    'video': video_names_test,
    'prediction': y_pred
})

# Create predictions output directory.
pred_output_path = os.path.join(output_dir, 'Predictions')
os.makedirs(params_output_path, exist_ok=True)

# Save predictions.
predictions_csv_path = os.path.join(output_dir, f'pred_{dataset_name}_{frame_selection_method}_{embedding_method}_{clf_name}.csv')
predictions_df.to_csv(predictions_csv_path, index=False)

In [21]:
# Producing classification report.
report = classification_report(y_test, y_pred, target_names=['No attack', 'Attack'])

print(report)

              precision    recall  f1-score   support

   No attack       0.76      0.74      0.75        35
      Attack       0.10      0.11      0.11         9

    accuracy                           0.61        44
   macro avg       0.43      0.43      0.43        44
weighted avg       0.63      0.61      0.62        44



In [23]:
# Copy files to Drive.
for file in os.listdir(output_dir):
  source_dir = os.path.join(output_dir, file)

  if '_' in file:
    dataset = file.split('_')[1]
    data_dir_name = dataset_dict[dataset]

    drive_output_dir = f"/content/drive/My Drive/UvA/M Thesis/Data/{data_dir_name}/Predictions"

    if not os.path.exists(drive_output_dir):
      os.makedirs(drive_output_dir)

    destination = f'{drive_output_dir}/{file}'

    shutil.copy(source_dir, destination)