## Balanced Random Forest (BRF) Model Training

In [None]:
# Path configuration
import os

# Root directory of the project; every folder below lives underneath it.
BASE_PATH = "./data/"

# --- SUB-FOLDERS OF BASE_PATH (created below if they do not exist yet) ---

# Extracted features
RESULT_FEATURES = os.path.join(BASE_PATH, "result_features/")

# CNN activations
ACTIVATION_PATH = os.path.join(BASE_PATH, "cnn_activations/")

# BRF model folder and its two sub-folders
BRF_MODEL = os.path.join(BASE_PATH, "brf_model/")
# Optuna optimization results (hyperparameter tuning)
OPTUNA_PATH = os.path.join(BRF_MODEL, "optuna/")
# Results on test and validation data
RESULT_PATH = os.path.join(BRF_MODEL, "results/")

# Create the whole folder tree up front; exist_ok=True makes re-running harmless.
for path in (
    BASE_PATH,
    BRF_MODEL,
    RESULT_FEATURES,
    ACTIVATION_PATH,
    OPTUNA_PATH,
    RESULT_PATH,
):
    os.makedirs(path, exist_ok=True)


### 1. Dataset Summary and Class Distribution

In [None]:
import pandas as pd

# Read the feature table (semicolon-separated CSV)
features = os.path.join(RESULT_FEATURES, "image_features_elipse.csv")
data = pd.read_csv(features, sep=';')

# Overall number of rows per class label (Non-Animals = 0, Animals = 1)
class_counts = data['class'].value_counts()

# Rows: dataset split, columns: class label, cells: number of samples
class_distribution = (
    data.groupby(['dataset', 'class'])
    .size()
    .unstack(fill_value=0)
)

# Positional rename: relies on the table having exactly two class columns, ordered 0 then 1
class_distribution.columns = ['Count of Non-Animals (0)', 'Count of Animals (1)']

# The grand total must be taken before the 'Total' column is appended.
# All percentages below are relative to this grand total (all splits together).
total_samples = class_distribution.sum().sum()
class_distribution['Total'] = class_distribution.sum(axis=1)
for count_column, percent_column in (
    ('Count of Non-Animals (0)', 'Non-Animals (0)[%]'),
    ('Count of Animals (1)', 'Animals (1)[%]'),
    ('Total', 'Total [%]'),
):
    class_distribution[percent_column] = (class_distribution[count_column] / total_samples) * 100

# Report the per-split table followed by the overall class counts
print(class_distribution)
print("\nCount of Non-Animals (0):", class_counts.get(0, 0))
print("Count of Animals (1):    ", class_counts.get(1, 0))

print("Column names:")
print(data.columns, data.shape)

### 2. Hyperparameter Optimization with Cross-Validation (Optuna)

In [None]:
import numpy as np
import pandas as pd
import optuna
from optuna.samplers import TPESampler
from imblearn.ensemble import BalancedRandomForestClassifier
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.metrics import classification_report, roc_auc_score, cohen_kappa_score
import pickle
import os
import time

# Wall-clock reference for the elapsed-time report at the end of this cell
start_time = time.time()

feature_mapping_file = os.path.join(RESULT_FEATURES, 'feature_mapping.txt')

# Load the four activation arrays.
# NOTE(review): the files are read from RESULT_FEATURES although ACTIVATION_PATH
# is defined in the path configuration — confirm this location is intended.
(
    activations_layer_1,
    activations_layer_2,
    activations_layer_3,
    activations_layer_4,
) = (
    np.load(os.path.join(RESULT_FEATURES, file_name))
    for file_name in (
        'activations_layer_1_cutouts.npy',
        'activations_layer_2_cutouts.npy',
        'activations_layer_3_cutouts.npy',
        'activations_layer_4_cutouts.npy',
    )
)


# Function to compute mean activations
def calculate_mean_activation(activations):
    """Average an activation array over its axes 1 and 2.

    Used instead of flattening, so each sample keeps only the averaged values
    along the remaining axes.

    Args:
        activations: Array with at least three dimensions; axis 0 is kept
            (presumably the sample axis, with axes 1 and 2 being the spatial
            ones — TODO confirm against the activation extraction code).

    Returns:
        Array with axes 1 and 2 reduced away by their mean.
    """
    averaged_axes = (1, 2)
    return np.mean(activations, axis=averaged_axes)

# Reduce every activation array to its mean over axes 1 and 2
(
    activations_layer_1_mean,
    activations_layer_2_mean,
    activations_layer_3_mean,
    activations_layer_4_mean,
) = map(
    calculate_mean_activation,
    (activations_layer_1, activations_layer_2, activations_layer_3, activations_layer_4),
)

# Classical features come from the semicolon-separated feature CSV
file_path = os.path.join(RESULT_FEATURES, 'image_features_elipse.csv')
csv_data = pd.read_csv(file_path, sep=';')

classical_columns = [
    'Major Axes', 'Minor Axes', 'Std PCA 1', 'Std PCA 2',
    'Eccentricity', 'Circumference', 'Area',
    'Median Difference', 'Max Difference', 'Min Difference',
    'Remaining Temp Range', 'Ellipse Temp Range',
]
csv_features = csv_data[classical_columns]

# Feature matrix: mean activations of layers 1-4 first, classical features last
X = np.hstack([
    activations_layer_1_mean,
    activations_layer_2_mean,
    activations_layer_3_mean,
    activations_layer_4_mean,
    csv_features,
])
# Target class (1 - animal, 0 - other)
y = csv_data['class'].values

# Only the rows flagged 'train' are needed for the Optuna search
train_rows = csv_data['dataset'] == 'train'
X_train = X[train_rows]
y_train = y[train_rows]

def objective(trial):
    """Evaluate one Optuna trial of the Balanced Random Forest.

    Samples a hyperparameter set from the trial, then scores a
    BalancedRandomForestClassifier built from it with stratified 5-fold
    cross-validation on X_train / y_train.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Mean ROC AUC over the five folds.
    """
    # Suggestions are requested in a fixed order; random_state is pinned, not tuned.
    sampled_params = dict(
        n_estimators=trial.suggest_int('n_estimators', 50, 1500, step=50),
        max_depth=trial.suggest_int('max_depth', 10, 50, step=5),
        min_samples_split=trial.suggest_int('min_samples_split', 2, 50),
        min_samples_leaf=trial.suggest_int('min_samples_leaf', 1, 15),
        max_samples=trial.suggest_float('max_samples', 0.5, 1.0, step=0.1),
        sampling_strategy=trial.suggest_categorical(
            'sampling_strategy', ['auto', 'not minority', 'not majority', 'all']
        ),
        replacement=trial.suggest_categorical('replacement', [False, True]),
        class_weight=trial.suggest_categorical(
            'class_weight', [None, 'balanced', 'balanced_subsample']
        ),
        random_state=42,
    )

    classifier = BalancedRandomForestClassifier(**sampled_params)
    # Same seeded fold layout for every trial, so trials are compared on identical splits
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_aucs = cross_val_score(
        classifier, X_train, y_train, cv=folds, scoring='roc_auc', n_jobs=-1
    )
    return np.mean(fold_aucs)

# Run Optuna study.
# The study is stored in an SQLite file inside OPTUNA_PATH; load_if_exists=True
# reuses a study with the same name in that storage if one already exists.
storage_url = "sqlite:///" + os.path.join(OPTUNA_PATH, "optuna_results.db")
study = optuna.create_study(
    study_name="BRF_optimization",
    storage=storage_url,
    sampler=TPESampler(),
    direction="maximize",
    load_if_exists=True,
)
study.optimize(objective, n_trials=200, show_progress_bar=True)

# Save study results (pickled Study object)
pickle_path = os.path.join(OPTUNA_PATH, "optuna_search_results.pkl")
with open(pickle_path, "wb") as f:
    pickle.dump(study, f)

# (Optional) Save trial details to a semicolon-separated CSV
df_trials = study.trials_dataframe()
trials_csv_path = os.path.join(OPTUNA_PATH, "optuna_trials_summary.csv")
df_trials.to_csv(trials_csv_path, index=False, sep=';')

elapsed_minutes = (time.time() - start_time) / 60
print(f"\nOptuna optimization completed. Best parameters:\n{study.best_params}")
print(f"Elapsed time [min]: {elapsed_minutes:.2f}")

print(f"Results saved successfully at: {OPTUNA_PATH}")

### 3. Train BRF Model with Selected Parameters

In [None]:
import numpy as np
import pandas as pd
from imblearn.ensemble import BalancedRandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer
from sklearn.model_selection import StratifiedKFold
import pickle
import os
import time

# Wall-clock reference for the execution-time report at the end of this cell
start_time = time.time()

feature_mapping_file = os.path.join(RESULT_FEATURES, 'feature_mapping.txt')

# Load activation data (one .npy file per layer, all located in RESULT_FEATURES)
activation_files = (
    'activations_layer_1_cutouts.npy',
    'activations_layer_2_cutouts.npy',
    'activations_layer_3_cutouts.npy',
    'activations_layer_4_cutouts.npy',
)
(
    activations_layer_1,
    activations_layer_2,
    activations_layer_3,
    activations_layer_4,
) = [np.load(os.path.join(RESULT_FEATURES, file_name)) for file_name in activation_files]


def calculate_mean_activation(activations):
    """Return the mean of `activations` taken over axes 1 and 2.

    Axis 0 and any axes after axis 2 are preserved in the result.
    """
    reduced = np.mean(activations, axis=(1, 2))
    return reduced

# Compute mean activations (one reduced array per layer)
(
    activations_layer_1_mean,
    activations_layer_2_mean,
    activations_layer_3_mean,
    activations_layer_4_mean,
) = [
    calculate_mean_activation(layer_activations)
    for layer_activations in (
        activations_layer_1,
        activations_layer_2,
        activations_layer_3,
        activations_layer_4,
    )
]

# Load CSV with features (semicolon-separated)
file_path = os.path.join(RESULT_FEATURES, 'image_features_elipse.csv')
csv_data = pd.read_csv(file_path, sep=';')

selected_columns = [
    'Major Axes',
    'Minor Axes',
    'Std PCA 1',
    'Std PCA 2',
    'Eccentricity',
    'Circumference',
    'Area',
    'Median Difference',
    'Max Difference',
    'Min Difference',
    'Remaining Temp Range',
    'Ellipse Temp Range',
]
csv_features = csv_data[selected_columns]

# Combine all data into a single feature set:
# mean activations of layers 1-4 side by side, followed by the CSV features
feature_parts = [
    activations_layer_1_mean,
    activations_layer_2_mean,
    activations_layer_3_mean,
    activations_layer_4_mean,
    csv_features,
]
X = np.hstack(feature_parts)
y = csv_data['class'].values

print(f"Number of features: {X.shape[1]}")

# Prepare training, validation, and test sets
def _select_split(split_name):
    """Return the (X, y) rows whose 'dataset' column equals `split_name`."""
    split_mask = (csv_data['dataset'] == split_name).to_numpy()
    return X[split_mask], y[split_mask]


# Explicit assignments (rather than injecting names through locals()) keep the
# six variables visible to readers and linters and work in any enclosing scope.
X_train, y_train = _select_split('train')
X_val, y_val = _select_split('val')
X_test, y_test = _select_split('test')

# Train the model with the selected hyperparameters.
# random_state is pinned (same seed as the Optuna objective) so that the saved
# model and the prediction files below are reproducible across runs.
rf_model = BalancedRandomForestClassifier(
    n_estimators=800,
    min_samples_split=2,
    sampling_strategy='not majority',
    min_samples_leaf=1,
    max_samples=0.6,
    max_depth=35,
    class_weight=None,
    replacement=False,
    random_state=42,
)
rf_model.fit(X_train, y_train)

# Predict probabilities; column 1 of predict_proba is taken as the score for class 1
y_test_proba = rf_model.predict_proba(X_test)[:, 1]
y_val_proba = rf_model.predict_proba(X_val)[:, 1]

# Save the model (pickle)
model_file_path = os.path.join(BRF_MODEL, "BRF_model.sav")
with open(model_file_path, 'wb') as model_file:
    pickle.dump(rf_model, model_file)

# Save test predictions: the original CSV rows of the split plus a 'predicted_proba' column
test_data = csv_data[csv_data['dataset'] == 'test'].copy()
test_data['predicted_proba'] = y_test_proba
test_data.to_csv(os.path.join(RESULT_PATH, "BRF_predictions_test_set.csv"), index=False)

# Save validation predictions in the same layout
val_data = csv_data[csv_data['dataset'] == 'val'].copy()
val_data['predicted_proba'] = y_val_proba
val_data.to_csv(os.path.join(RESULT_PATH, "BRF_predictions_val_set.csv"), index=False)

end_time = time.time()
print(f"Execution time: {(end_time - start_time)/60:.2f} min")


### 4. Optimal Decision Threshold Selection

In [None]:
from sklearn.model_selection import StratifiedKFold
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import cohen_kappa_score, precision_score, recall_score, f1_score, accuracy_score

X_val_array, y_val_array = X_val, y_val

# Candidate decision thresholds: 0.0, 0.1, ..., 0.9
thresholds = np.arange(0, 1, 0.1)

# One accumulator per metric, one slot per threshold
kappa_scores, precision_scores, recall_scores, f1_scores, accuracy_scores = (
    np.zeros_like(thresholds) for _ in range(5)
)

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# (accumulator, metric function, extra keyword arguments), evaluated in this order
metric_table = (
    (kappa_scores, cohen_kappa_score, {}),
    (precision_scores, precision_score, {'zero_division': 0}),
    (recall_scores, recall_score, {'zero_division': 0}),
    (f1_scores, f1_score, {'zero_division': 0}),
    (accuracy_scores, accuracy_score, {}),
)

# Only the held-out indices of each fold are used: the already-trained model is
# scored on five stratified subsets of the validation data (nothing is refitted).
for _, fold_idx in cv.split(X_val_array, y_val_array):
    fold_labels = y_val_array[fold_idx]
    fold_proba = rf_model.predict_proba(X_val_array[fold_idx])[:, 1]

    for slot, cutoff in enumerate(thresholds):
        fold_pred = (fold_proba >= cutoff).astype(int)
        for totals, metric, extra_kwargs in metric_table:
            totals[slot] += metric(fold_labels, fold_pred, **extra_kwargs)

# Average metrics over folds (in-place, so the named arrays above are updated)
n_folds = cv.get_n_splits()
for totals, _, _ in metric_table:
    totals /= n_folds

# Plot every averaged metric as a function of the decision threshold
plt.figure(figsize=(10, 5))
for curve_values, curve_label in (
    (kappa_scores, "Cohen's Kappa"),
    (precision_scores, "Precision"),
    (recall_scores, "Recall"),
    (f1_scores, "F1-score"),
    (accuracy_scores, "Accuracy"),
):
    plt.plot(thresholds, curve_values, label=curve_label)

plt.title("Cross-Validated Metrics vs. Threshold")
plt.xlabel("Threshold")
plt.ylabel("Metric value")

# Tick marks every 0.1 on both axes
axis_ticks = np.arange(0, 1.05, 0.1)
plt.xticks(axis_ticks)
plt.yticks(axis_ticks)

plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


### 5. Model Evaluation: Metrics and Threshold Curves

In [None]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    classification_report, roc_auc_score, average_precision_score, cohen_kappa_score,
    confusion_matrix, roc_curve, auc, precision_recall_curve
)

# Reload the prediction files written by the training cell
test_data = pd.read_csv(os.path.join(RESULT_PATH, "BRF_predictions_test_set.csv"))
val_data = pd.read_csv(os.path.join(RESULT_PATH, "BRF_predictions_val_set.csv"))

# Data: true labels and predicted probabilities per split
y_test, y_test_proba = test_data['class'].values, test_data['predicted_proba'].values
y_val, y_val_proba = val_data['class'].values, val_data['predicted_proba'].values

# Decision threshold applied to the predicted probabilities of both splits
threshold = 0.4
y_test_pred = (y_test_proba >= threshold).astype(int)
y_val_pred = (y_val_proba >= threshold).astype(int)

# REPORTS: thresholded metrics use the hard predictions, AUC metrics use the probabilities
for heading, true_labels, probabilities, hard_predictions in (
    ("TEST SET", y_test, y_test_proba, y_test_pred),
    ("VALIDATION SET", y_val, y_val_proba, y_val_pred),
):
    print(f"\n{heading} (threshold = {threshold})")
    print(classification_report(true_labels, hard_predictions))
    print(f"ROC AUC: {roc_auc_score(true_labels, probabilities):.4f}")
    print(f"PR AUC: {average_precision_score(true_labels, probabilities):.4f}")
    print(f"Cohen's Kappa: {cohen_kappa_score(true_labels, hard_predictions):.4f}")

# Confusion Matrix - TEST (rows: actual class, columns: predicted class)
class_labels = ['Non-Animal', 'Animal']
conf_matrix = confusion_matrix(y_test, y_test_pred)

plt.figure(figsize=(5, 4))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_labels,
    yticklabels=class_labels,
)
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix - Test Set")
plt.tight_layout()
plt.show()

# ROC Curve - TEST
fpr, tpr, _ = roc_curve(y_test, y_test_proba)
roc_auc = auc(fpr, tpr)
roc_label = f'ROC curve (AUC = {roc_auc:.2f})'

plt.figure(figsize=(5, 4))
plt.plot(fpr, tpr, label=roc_label, color='darkorange')
# Diagonal reference line from (0, 0) to (1, 1)
plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve - Test Set")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# Precision-Recall Curve - TEST & VAL
precision_val, recall_val, _ = precision_recall_curve(y_val, y_val_proba)
ap_val = average_precision_score(y_val, y_val_proba)

precision_test, recall_test, _ = precision_recall_curve(y_test, y_test_proba)
ap_test = average_precision_score(y_test, y_test_proba)

plt.figure(figsize=(5, 4))
# Validation curve first, then test, so legend order and colours stay the same
for recall_values, precision_values, curve_label in (
    (recall_val, precision_val, f'Validation Set (AP = {ap_val:.2f})'),
    (recall_test, precision_test, f'Test Set (AP = {ap_test:.2f})'),
):
    plt.plot(recall_values, precision_values, label=curve_label)

plt.title("Precision-Recall Curve")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.legend(loc='lower left')
plt.grid(True)
plt.tight_layout()
plt.show()


### 6. Feature Importance Analysis

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Load feature names from file (one name per line)
feature_mapping_file = os.path.join(RESULT_FEATURES, "feature_mapping.txt")

with open(feature_mapping_file, 'r') as f:
    # Skip blank lines so stray empty lines do not turn into phantom feature names
    feature_names = [line.strip() for line in f if line.strip()]

# Get feature importances from the model
importances = rf_model.feature_importances_

# Fail early with a readable message: the bar plot needs exactly one label per
# importance value, otherwise matplotlib raises an opaque broadcasting error.
if len(feature_names) != len(importances):
    raise ValueError(
        f"{feature_mapping_file} contains {len(feature_names)} feature names, "
        f"but the model reports {len(importances)} feature importances."
    )

# Rescale so the plotted values sum to 1 (left untouched if the sum is zero)
total_importance = np.sum(importances)
normalized_importances = importances / total_importance if total_importance != 0 else importances

# Create vertical bar plot; the figure widens with the number of features
plt.figure(figsize=(max(12, len(feature_names) * 0.4), 8))
bars = plt.bar(range(len(normalized_importances)), normalized_importances, tick_label=feature_names)
plt.ylabel('Importance')
plt.title('Feature Importance')
plt.xticks(rotation=90)

# Add percentage values just above each bar
for bar, value in zip(bars, normalized_importances):
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2, height + 0.005, f'{value:.2%}', ha='center', va='bottom', rotation=90)

plt.grid(axis='y', linestyle='-', alpha=0.7)
# Head-room above the tallest bar for the rotated percentage labels
plt.ylim(0, max(normalized_importances) + 0.03)
plt.tight_layout()
plt.show()