# Creating Risk Context from Anomaly Signals with an Alert Object:

### Imports:

In [28]:
import os
import numpy as np
import pandas as pd

### Alert Object Builder Class:

In [24]:
class AlertObjectBuilder:
    """
    Converts raw reconstruction error outputs into contextual, SOC-ready alert objects. Can operat on Autoencoder errors,
    Isolation Forest scores, or hybridd risk metrics.
    """
    
    def __init__(self, percentile_thresholds: dict | None=None, top_k: int=3) -> None:
        """
        Initializing the alert object builder.
        
        Args:
            percentile_thresholds: The risk band thresholds
            top_k: Number of top contributing features to extract
            
        Returns:
            None:
        """
        if percentile_thresholds is None:
            percentile_thresholds = {
                "LOW": 80,
                "MEDIUM": 90,
                "HIGH": 95,
                "CRITICAL": 100
            }
        
        self.percentile_thresholds = percentile_thresholds
        self.top_k = top_k
        self.baseline_errors = None
        
    
    def fit_baseline(self, reconstruction_errors: np.ndarray) -> None:
        """
        Stores the baseline reconstruction error distribution.
        
        Args:
            reconstruction_errors: Array of baseline reconstruction errors.
            
        Returns:
            None:
        """
        self.baseline_errors = np.sort(reconstruction_errors)
        
    
    def compute_percentile(self, error: float) -> float:
        """
        Computes the percentile rank of an error value.
        
        Args:
            error: The error value to rank
            
        Returns:
            float: The percentile the error value falls within (0-100)
        """
        if self.baseline_errors is None:
            raise ValueError("Baseline distribution not fitted.")
        
        # Calculating the percentile
        percentile = (np.searchsorted(self.baseline_errors, error)) / len(self.baseline_errors) * 100
        return percentile
    
    
    def assign_risk_band(self, percentile: float) -> str:
        """
        Assigns risk bands based on the provided percentile.
        
        Args:
            percentile: The assigned percentile of an error value
            
        Returns:
            str: The risk band level 
        """
        for label, thresh in self.percentile_thresholds.items():
            if percentile <= thresh:
                return label
                
        return "CRITICAL"
        
    
    def extract_top_contributors(self, row: pd.Series) -> list:
        """
        Extracts the top-K contributing factors from a row.
        
        Args:
            row: A row consisting of metadata and recontruction error data
            
        Returns:
            list: A list of the form: (feature_name, contribution_value)
        """
        # Extracting the contribution-related columns
        contribution_cols = [col for col in row.index if col.startswith("contribution_")]
        contributions = row[contribution_cols].sort_values(ascending=False)
        
        # Finding the top-K contributing features
        top_features = contributions.head(self.top_k)
        
        top_contributors = [(feature.replace("contribution_", ""), value) for feature, value in top_features.items()]
        return top_contributors
        
        
    def build_alert_from_row(self, row: pd.Series) -> dict:
        """
        Builds an alert dictionary for a single sample.
        
        Args:
            row: A row consisting of metadata and recontruction error data
            
        Returns:
            dict: A structured alert object
        """
        # Creating percential assignment and risk band
        error = row["total_reconstruction_error"]
        percentile = self.compute_percentile(error)
        risk_band = self.assign_risk_band(percentile)
        top_features = self.extract_top_contributors(row)
        
        # Creating explanation statement
        explanation = (
            f"Behavior falls in the {percentile:.2f}th percentile of reconstruction deviation. Primary contributors: "\
            + ", ".join([f"{feat} ({val:.2f})" for feat, val in top_features])
        )
        
        # Creating alert dictionary
        alert = {
            "user": row["user"],
            "pc": row["pc"],
            "day": row["day"],
            "percentile_rank": percentile,
            "risk_band": risk_band,
            "top_contributors": top_features,
            "explanation": explanation
        }
        
        return alert
        
        
    def build_alert_df(self, explanation_df: pd.DataFrame) -> pd.DataFrame:
        """
        Generates an alert DataFrame from a reconstruction error explanation DataFrame.
        
        Args:
            explanation_df: The reconstuction error explanation DataFrame
            
        Returns:
            pd.DataFrame: An alert-ready structured DataFrame
        """
        alerts = []
        
        for _, row in explanation_df.iterrows():
            alert = self.build_alert_from_row(row)
            alerts.append(alert)
            
        return pd.DataFrame(alerts)

### Importing the Reconstruction Error Table:

In [25]:
# Building the path with os.path.join so the separator matches the host OS; a literal
# backslash path only resolves on Windows. The file name spelling is kept as stored on disk.
reconstruction_err_table = pd.read_csv(
    os.path.join("explainability", "reconstruction_error", "reconstuction_error_table_1.csv")
)

### Creating the Alert Object Table:

In [26]:
# Alerts list the two largest contributors; the full error column serves as the baseline
builder = AlertObjectBuilder(top_k=2)
builder.fit_baseline(
    reconstruction_errors=reconstruction_err_table["total_reconstruction_error"].to_numpy()
)

In [27]:
# One alert record is produced per row of the reconstruction error table
alert_df = builder.build_alert_df(explanation_df=reconstruction_err_table)

In [30]:
# Bare expression at the end of the cell: the notebook renders the alert table below
alert_df

Unnamed: 0,user,pc,day,percentile_rank,risk_band,top_contributors,explanation
0,aab0162,pc-6599,2010-01-04,73.234835,LOW,"[(emails_sent, 0.3487802168561951), (off_hours...",Behavior falls in the 73.23th percentile of re...
1,aab0162,pc-6599,2010-01-05,68.489343,LOW,"[(emails_sent, 0.2719235722699175), (unique_re...",Behavior falls in the 68.49th percentile of re...
2,aab0162,pc-6599,2010-01-06,80.836852,MEDIUM,"[(emails_sent, 0.2582589955049517), (unique_re...",Behavior falls in the 80.84th percentile of re...
3,aab0162,pc-6599,2010-01-07,70.978400,LOW,"[(emails_sent, 0.2865364371162134), (off_hours...",Behavior falls in the 70.98th percentile of re...
4,aab0162,pc-6599,2010-01-08,73.272093,LOW,"[(emails_sent, 0.2967255021625118), (attacheme...",Behavior falls in the 73.27th percentile of re...
...,...,...,...,...,...,...,...
1564764,zzo2997,pc-3120,2011-05-24,84.764780,MEDIUM,"[(emails_sent, 0.3716305364512766), (attacheme...",Behavior falls in the 84.76th percentile of re...
1564765,zzo2997,pc-3120,2011-05-25,85.404299,MEDIUM,"[(emails_sent, 0.3048735621158884), (unique_re...",Behavior falls in the 85.40th percentile of re...
1564766,zzo2997,pc-3120,2011-05-26,85.404427,MEDIUM,"[(emails_sent, 0.3053542066212134), (unique_re...",Behavior falls in the 85.40th percentile of re...
1564767,zzo2997,pc-3120,2011-05-27,85.259486,MEDIUM,"[(unique_recipients, 0.2881979501416273), (ema...",Behavior falls in the 85.26th percentile of re...


### Saving the Alert Object Table:

In [None]:
# Defining save path (joined with os.path.join so it resolves on any OS, not only Windows)
save_path = os.path.join("explainability", "alert_table")
# exist_ok=True makes re-running the cell safe when the directory already exists
os.makedirs(save_path, exist_ok=True)

In [33]:
# Writing the alert table to CSV; index=False leaves the DataFrame index out of the file
alert_table_path = os.path.join(save_path, "alert_table_1.csv")
alert_df.to_csv(alert_table_path, index=False)