# In [None]:
# Import required libraries
import pandas as pd
import numpy as np
from sklearn.svm import SVC, SVR
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, confusion_matrix

# Seed NumPy's global RNG so the random ID selection further down is repeatable.
np.random.seed(42)

# Read the sensor recordings into a DataFrame.
df = pd.read_csv('/content/modified_output.csv')

# **1. Create 'Person_ID' Column**

# Every individual is taken to occupy one contiguous run of this many rows.
rows_per_individual = 4500

# Integer-divide the row label by the run length, then shift to 1-based IDs.
df['Person_ID'] = df.index // rows_per_individual + 1

# Print the distinct IDs as a visual sanity check (nothing is enforced here).
print(f"Unique Person_IDs: {df['Person_ID'].unique()}")

# **2. Create 'Health_Status' Column**

# IDs up to and including 12 are labelled 'Healthy'; all others 'Patient'.
df['Health_Status'] = np.where(df['Person_ID'].le(12), 'Healthy', 'Patient')

# **3. Randomly Split Individuals into Training and Testing Sets**

# Distinct person IDs in each health group.
healthy_ids = df.loc[df['Health_Status'] == 'Healthy', 'Person_ID'].unique()
patient_ids = df.loc[df['Health_Status'] == 'Patient', 'Person_ID'].unique()

# How many individuals of each group go into the training set.
# NOTE(review): earlier comments here spoke of 9 healthy / 3 patient training
# IDs, but the code has always drawn 12 and 0. With 12 healthy individuals
# that puts every healthy person in training and every patient in testing
# (the healthy test set is then empty). The values are kept as they were;
# confirm which split is intended before changing them.
n_train_healthy = 12
n_train_patient = 0

# Draw the healthy training IDs without replacement; the rest are for testing.
train_healthy_ids = np.random.choice(healthy_ids, size=n_train_healthy, replace=False)
test_healthy_ids = np.setdiff1d(healthy_ids, train_healthy_ids)

# Draw the patient training IDs without replacement; the rest are for testing.
train_patient_ids = np.random.choice(patient_ids, size=n_train_patient, replace=False)
test_patient_ids = np.setdiff1d(patient_ids, train_patient_ids)

# Combine IDs for training and testing
train_ids = np.concatenate([train_healthy_ids, train_patient_ids])
test_ids = np.concatenate([test_healthy_ids, test_patient_ids])

print(f"Training Healthy IDs: {train_healthy_ids}")
print(f"Training Patient IDs: {train_patient_ids}")
print(f"Testing Healthy IDs: {test_healthy_ids}")
print(f"Testing Patient IDs: {test_patient_ids}")

# Row-level boolean masks: a row belongs to the set its individual was put in.
train_mask = df['Person_ID'].isin(train_ids)
test_mask = df['Person_ID'].isin(test_ids)

# **4. Define Feature Columns and Target Variables**

# Raw sensor columns used as model inputs.
relevant_columns = [
    "Body_Temperature",
    "Heart_Rate_Sensor1",
    "Heart_Rate_Sensor2",
    "SpO2",
    "ECG",
]

# **Feature Engineering for Heart Rate**

# Row-wise mean of the two heart-rate sensors.
df['Average_Heart_Rate'] = df[['Heart_Rate_Sensor1', 'Heart_Rate_Sensor2']].mean(axis=1)

# Absolute disagreement between the two heart-rate sensors.
df['Heart_Rate_Difference'] = (df['Heart_Rate_Sensor1'] - df['Heart_Rate_Sensor2']).abs()

# **5. Add Rolling Mean and Std. Dev. for ECG per Individual**

# Number of consecutive rows in each rolling window.
window_size = 5

# One grouped rolling view of ECG, shared by both statistics. Grouping by
# Person_ID keeps each window inside a single individual's rows.
ecg_rolling = df.groupby('Person_ID')['ECG'].rolling(window=window_size, min_periods=1)

# Dropping the Person_ID index level restores the original row labels so the
# results align with df on assignment.
df['ECG_Rolling_Mean'] = ecg_rolling.mean().reset_index(level=0, drop=True)
df['ECG_Rolling_Std'] = ecg_rolling.std().reset_index(level=0, drop=True)

# The std of a one-row window is NaN; fill such gaps from the next valid row.
df['ECG_Rolling_Std'] = df['ECG_Rolling_Std'].bfill()


# **Update the list of relevant columns to include new features**

engineered_features = [
    'Average_Heart_Rate',
    'Heart_Rate_Difference',
    'ECG_Rolling_Mean',
    'ECG_Rolling_Std',
]
relevant_columns_extended = [*relevant_columns, *engineered_features]

# **6. Define Features and Target Variables**

# Feature matrix plus the three label columns read from the CSV.
X = df[relevant_columns_extended].copy()
Y_anomaly, Y_point, Y_contextual = (
    df[label_column].copy() for label_column in ("anomaly", "first", "second")
)

# **Create Training and Testing Datasets Based on Individuals**

# Select rows with the per-individual masks and renumber them from 0 so the
# features and every label series of a split share the same index.
X_train = X.loc[train_mask].reset_index(drop=True)
X_test = X.loc[test_mask].reset_index(drop=True)

Y_train = Y_anomaly.loc[train_mask].reset_index(drop=True)
Y_test = Y_anomaly.loc[test_mask].reset_index(drop=True)

Y_point_train = Y_point.loc[train_mask].reset_index(drop=True)
Y_point_test = Y_point.loc[test_mask].reset_index(drop=True)

Y_contextual_train = Y_contextual.loc[train_mask].reset_index(drop=True)
Y_contextual_test = Y_contextual.loc[test_mask].reset_index(drop=True)

# **7. Scale the Data**

# Learn the min/max of each feature from the training rows only, then apply
# that same transform to both splits.
scaler = MinMaxScaler()
scaler.fit(X_train)

X_train_scaled = pd.DataFrame(scaler.transform(X_train), columns=relevant_columns_extended)
X_test_scaled = pd.DataFrame(scaler.transform(X_test), columns=relevant_columns_extended)

# **8. Function to Detect Anomalies (Both Point and Contextual)**

def detect_anomalies(X_train_scaled, X_test_scaled, svm_predictions, relevant_columns,
                     engineered_features, X_test_original=None):
    """Label every test row as normal (0), point anomaly (1) or contextual anomaly (2).

    The labelling runs in two stages:

    1. Any row whose unscaled sensor reading falls outside a fixed threshold
       range is marked as a point anomaly, whatever the SVM predicted.
    2. Rows the SVM predicted as anomalous (``svm_predictions == 1``) that
       were not caught in stage 1 are checked with one SVR per feature, each
       predicting that feature from all the others. A row whose absolute
       residual exceeds the 95th percentile of the training residuals for at
       least two features is marked contextual; otherwise it is marked as a
       point anomaly.

    Args:
        X_train_scaled: Scaled training features (DataFrame) containing every
            column in ``relevant_columns + engineered_features``.
        X_test_scaled: Scaled test features (DataFrame) with the same columns.
        svm_predictions: One initial prediction per test row; 1 means anomalous.
        relevant_columns: Names of the raw sensor feature columns.
        engineered_features: Names of the derived feature columns.
        X_test_original: Unscaled test features, row-for-row in the same order
            as ``X_test_scaled``. Must contain 'Heart_Rate_Sensor1',
            'Heart_Rate_Sensor2', 'Body_Temperature', 'SpO2' and 'ECG'.
            If omitted, the module-level ``X_test`` is used, which is what
            this function always read before the parameter existed.

    Returns:
        A copy of ``X_test_scaled`` with two added columns:
        'Initial_Prediction' (the SVM predictions) and 'Anomaly_Type'.

    Raises:
        ValueError: If the unscaled and scaled test frames differ in row count.
    """
    if X_test_original is None:
        # Backward compatibility: fall back to the global unscaled test frame.
        X_test_original = X_test
    if len(X_test_original) != len(X_test_scaled):
        raise ValueError(
            "X_test_original and X_test_scaled must have the same number of rows"
        )

    X_test_adjusted = X_test_scaled.copy()
    X_test_adjusted['Initial_Prediction'] = svm_predictions
    X_test_adjusted['Anomaly_Type'] = 0  # Initialize as normal

    # Indices predicted as anomalies by SVM
    anomaly_indices = X_test_adjusted[X_test_adjusted['Initial_Prediction'] == 1].index

    # Stage 1: fixed (low, high) ranges on the unscaled readings. A value
    # strictly below `low` or strictly above `high` is a point anomaly.
    point_thresholds = {
        'Heart_Rate_Sensor1': (55, 110),
        'Heart_Rate_Sensor2': (55, 110),
        'Body_Temperature': (93, 110),
        'SpO2': (92, 100),
        'ECG': (450, 700),
    }
    point_mask = np.zeros(len(X_test_original), dtype=bool)
    for column, (low, high) in point_thresholds.items():
        readings = X_test_original[column]
        point_mask |= ((readings < low) | (readings > high)).to_numpy()

    # Apply the mask by position so the result does not depend on the
    # unscaled frame carrying the same index labels as the scaled one.
    point_anomalies_indices = X_test_adjusted.index[point_mask]
    X_test_adjusted.loc[point_anomalies_indices, 'Anomaly_Type'] = 1  # Point anomalies

    # Stage 2 candidates: SVM-flagged rows not already explained by a threshold.
    remaining_indices = anomaly_indices.difference(point_anomalies_indices)
    if remaining_indices.empty:
        # Nothing to check, so skip training the per-feature regressors.
        return X_test_adjusted

    all_features = list(relevant_columns) + list(engineered_features)
    candidates = X_test_scaled.loc[remaining_indices]
    violation_counts = np.zeros(len(remaining_indices), dtype=int)

    for feature in all_features:
        regression_features = [f for f in all_features if f != feature]

        # Model this feature as a function of all the other features.
        reg = SVR(kernel='rbf')
        reg.fit(X_train_scaled[regression_features], X_train_scaled[feature])

        # Baseline: 95th percentile of the absolute residuals on training data.
        train_residuals = np.abs(
            reg.predict(X_train_scaled[regression_features])
            - X_train_scaled[feature].to_numpy()
        )
        baseline = np.percentile(train_residuals, 95)

        # Score all candidate rows in one batched call instead of row by row.
        candidate_residuals = np.abs(
            reg.predict(candidates[regression_features])
            - candidates[feature].to_numpy()
        )
        violation_counts += candidate_residuals > baseline

    # Two or more violated features -> contextual (2); otherwise point (1).
    X_test_adjusted.loc[remaining_indices, 'Anomaly_Type'] = np.where(
        violation_counts >= 2, 2, 1
    )

    return X_test_adjusted

# **9. Train the SVM Model**

# RBF-kernel classifier fitted on the scaled training features against the
# overall anomaly labels; `fit` returns the estimator itself.
svm_model = SVC(kernel='rbf', random_state=42).fit(X_train_scaled, Y_train)

# **10. Get Initial SVM Predictions**

svm_predictions = svm_model.predict(X_test_scaled)

# **11. Detect Anomalies Using SVM Predictions, Thresholds, and SVR-Based Contextual Detection**

results = detect_anomalies(
    X_train_scaled=X_train_scaled,
    X_test_scaled=X_test_scaled,
    svm_predictions=svm_predictions,
    relevant_columns=relevant_columns,
    engineered_features=engineered_features,
)

# **12. Get Final Predictions**

# Turn the 0/1/2 anomaly type into three binary indicator series.
anomaly_type = results['Anomaly_Type']
final_point_predictions = anomaly_type.eq(1).astype(int)
final_contextual_predictions = anomaly_type.eq(2).astype(int)
final_overall_predictions = (final_point_predictions | final_contextual_predictions).astype(int)

# **13. Calculate Accuracies**

# Each indicator is scored against its matching label column.
accuracy_overall = accuracy_score(y_true=Y_test, y_pred=final_overall_predictions)
accuracy_point = accuracy_score(y_true=Y_point_test, y_pred=final_point_predictions)
accuracy_contextual = accuracy_score(y_true=Y_contextual_test, y_pred=final_contextual_predictions)

# **14. Print Accuracies**

print("\nAccuracy Scores:")
print(f"Overall Anomaly Detection Accuracy: {accuracy_overall:.4f}")
print(f"Point Anomaly Detection Accuracy: {accuracy_point:.4f}")
print(f"Contextual Anomaly Detection Accuracy: {accuracy_contextual:.4f}")


