## Anomaly Detection on Financial Support for each Strategic Priority

In [1]:
# Library imports
import pandas as pd
import numpy as np
import os
from pathlib import Path

In [2]:
# Input data lives in ../outputs/data_output relative to the working directory;
# anchor it to an absolute, resolved path.
CURRENT_DIR = Path.cwd().resolve()
DATA_BASE_PATH = CURRENT_DIR.parent.joinpath("outputs", "data_output")

In [3]:
# Directory that receives this notebook's model outputs; created on demand
# (including missing parents) and left untouched if it already exists.
output_dir = os.path.join(os.pardir, "outputs", "model_output")
os.makedirs(output_dir, exist_ok=True)

In [4]:
# Load the cleaned financial data.
# Read through DATA_BASE_PATH (defined in the path-configuration cell) instead of
# repeating the "../outputs/data_output" literal, so the input location has a
# single source of truth and is an absolute path.
df = pd.read_csv(DATA_BASE_PATH / "Financial_Cleaned.csv")

In [5]:
# Step 1: Ratio features
# For each year 2020-2025, derive three ratios between the Expenditure,
# Available and Required columns. `epsilon` is added to every denominator so it
# is never exactly zero when the underlying value is zero.
# NOTE(review): when a denominator is 0 and the numerator is not, the ratio
# becomes numerator / 1e-6 (a very large number, not inf) — confirm this is the
# intended treatment before the values reach the scaler.
epsilon = 1e-6
ratio_specs = [
    # (column suffix, numerator column, denominator column)
    ("Exp_per_Req", "Expenditure", "Required"),
    ("Exp_per_Avail", "Expenditure", "Available"),
    ("Avail_per_Req", "Available", "Required"),
]
for year in range(2020, 2026):
    for suffix, numerator, denominator in ratio_specs:
        df[f"{year}_{suffix}"] = df[f"{year} {numerator}"] / (df[f"{year} {denominator}"] + epsilon)

In [6]:
# Step 2: Select features
# Per-year raw amounts, in year-major order (Required, Available, Expenditure).
raw_cols = []
for yr in range(2020, 2026):
    for measure in ["Required", "Available", "Expenditure"]:
        raw_cols.append(f"{yr} {measure}")

# Totals columns.
agg_cols = [
    'Total required resources',
    'Total available resources',
    'Total expenditure resources',
]

# Every column whose name contains "_per_" (the ratio features).
ratio_cols = [name for name in df.columns if "_per_" in name]

# Numeric feature matrix: +/-inf is treated as missing, and missing becomes 0.
selected = df[raw_cols + agg_cols + ratio_cols]
finite_only = selected.replace([np.inf, -np.inf], np.nan)
num_features = finite_only.fillna(0)

In [7]:
# One-hot encode the categorical descriptors.
# Missing values are bucketed as "Unknown" first; drop_first=True drops the
# first level of each column from the resulting indicator set.
cat_cols = ['Country', 'Region', 'Theme', 'SP_Label', 'SDG Goals', 'Agencies']
categorical = df[cat_cols].fillna("Unknown")
df_cat = pd.get_dummies(categorical, drop_first=True)

In [8]:
# Assemble the full design matrix: numeric features first, then the one-hot
# indicator columns, placed side by side.
feature_blocks = [num_features, df_cat]
X = pd.concat(feature_blocks, axis="columns")

In [9]:
from sklearn.preprocessing import StandardScaler

# Step 3: Standardization
# Learn the scaling parameters from X, then apply them to the same matrix.
scaler = StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)

In [10]:
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor

# Fit three unsupervised detectors on the standardized features and store each
# model's labels in a column named "Anomaly_<model>" (-1 = anomaly, 1 = normal).

# Detectors fitted in the loop below, keyed by the suffix of their output column.
anomaly_models = {}

# Model 1: Isolation Forest (seeded so repeated runs give the same labels)
anomaly_models["IsolationForest"] = IsolationForest(contamination=0.05, random_state=42)

# Model 2: One-Class SVM
anomaly_models["OneClassSVM"] = OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')

# Model 3: Local Outlier Factor — fitted and labelled in a single fit_predict
# call, kept outside the dictionary so its column is created first.
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
df['Anomaly_LOF'] = lof.fit_predict(X_scaled)

# Fit the remaining models
for name, model in anomaly_models.items():
    df[f'Anomaly_{name}'] = model.fit_predict(X_scaled)

# Compare Anomaly Detection Results
# Match on the "Anomaly_" *prefix*: a substring test would also pick up
# 'SP_Anomaly_Flag' (holding "Yes"/"No") whenever this cell is re-run after that
# column exists, adding a meaningless row to the report.
print("Anomaly Counts:")
for col in df.columns:
    if col.startswith("Anomaly_"):
        n_anomalies = (df[col] == -1).sum()
        print(f"{col}: {n_anomalies}")

Anomaly Counts:
Anomaly_LOF: 130
Anomaly_IsolationForest: 130
Anomaly_OneClassSVM: 290


In [11]:
from sklearn.metrics import silhouette_score

# Score each detector's labelling of X_scaled with the silhouette coefficient,
# using the anomaly/normal labels as the cluster assignment.
# NOTE(review): silhouette is a clustering-separation metric; confirm it is the
# intended criterion for comparing anomaly detectors.
print("\nSilhouette Scores:")
for col in df.columns:
    # Prefix match only: 'SP_Anomaly_Flag' also contains "Anomaly_" and would be
    # scored as if it were a fourth model when this cell is re-run after that
    # column has been added.
    if col.startswith("Anomaly_"):
        try:
            labels = df[col]
            score = silhouette_score(X_scaled, labels)
            print(f"{col}: {score:.4f}")
        except Exception as e:
            # Best effort: report the failure for this column and keep going
            # with the remaining columns.
            print(f"{col}: Could not compute ({e})")


Silhouette Scores:
Anomaly_LOF: 0.1227
Anomaly_IsolationForest: -0.0085
Anomaly_OneClassSVM: 0.1079


In [12]:
from sklearn.neighbors import LocalOutlierFactor

# Build the final flag from a LOF detector fitted in this cell, so the cell
# does not rely on labels computed in an earlier cell.
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
lof_labels = lof.fit_predict(X_scaled)

# Translate the detector's labels: -1 -> "Yes" (anomalous), 1 -> "No".
flag_names = {-1: "Yes", 1: "No"}
df['SP_Anomaly_Flag'] = pd.Series(lof_labels, index=df.index).map(flag_names)

In [13]:
# Persist the frame, including every anomaly column added above, without the
# row index.
anomaly_csv_path = os.path.join(output_dir, "anomaly_detection.csv")
df.to_csv(anomaly_csv_path, index=False)