<a href="https://colab.research.google.com/github/ACTP2002/EVIDENCE/blob/behavior_model/test_model_v2_json_input.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --no-cache-dir -U numpy pandas scipy scikit-learn joblib tensorflow shap h2o

Collecting numpy
  Downloading numpy-2.4.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (6.6 kB)
Collecting pandas
  Downloading pandas-3.0.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (79 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.5/79.5 kB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
Collecting scipy
  Downloading scipy-1.17.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.1/62.1 kB[0m [31m198.8 MB/s[0m eta [36m0:00:00[0m
Collecting scikit-learn
  Downloading scikit_learn-1.8.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (11 kB)
Collecting tensorflow
  Downloading tensorflow-2.20.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.5 kB)
Collecting h2o
  Downloading h2o-3.46.0.9-py2.py3-none-any.whl.metadata (2.1 kB)
Collecting tensorboard~=2.20.0 (from tensorflow)

In [None]:
import numpy as np
import pandas as pd
from scipy.stats import median_abs_deviation
import random
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

import h2o
from h2o.estimators import H2OExtendedIsolationForestEstimator
from h2o.estimators import H2OPrincipalComponentAnalysisEstimator

import tensorflow as tf
from tensorflow.keras import layers, models, callbacks

import shap
import joblib

import os
import json


In [None]:
class BehaviorInferenceEngine:
    """Behavioural anomaly scorer with SHAP-based, human-readable evidence.

    Inference pipeline (see ``predict``):

    1. ``_engineer`` derives monetary, temporal, access and geo features.
    2. An H2O Extended Isolation Forest produces a point anomaly score.
    3. An LSTM scores the most recent ``SEQ_LEN`` rows by reconstruction error.
    4. The detector scores are scaled and combined with stored weights; rows at
       or above the stored threshold are flagged as anomalies.
    5. An H2O surrogate model's contributions (SHAP values) are mapped through
       ``RISK_MAPPING`` into evidence records.

    All trained artifacts are written by ``save_assets`` and read back by
    ``load_assets``; call ``load_assets`` before ``predict``.
    """

    # Surrogate feature -> risk category and short template text. Features not
    # listed here are left out of the evidence list.
    RISK_MAPPING = {

        # Monetary Behavior
        "mod_z_score_abs": {
            "category": "Monetary Deviation",
            "template": "Transaction amount deviates from user's historical behavior."
        },
        "ewma_resid": {
            "category": "Monetary Deviation",
            "template": "Transaction differs from recent spending trend."
        },
        "net_flow_1d": {
            "category": "Liquidity Shift",
            "template": "Unusual daily net cash flow movement detected."
        },

        # Temporal Behavior
        "gap_log": {
            "category": "Temporal Anomaly",
            "template": "Transaction timing gap is inconsistent with prior activity."
        },

        # Access Risk
        "login_count_1h": {
            "category": "Access Risk",
            "template": "Abnormal login frequency observed."
        },
        "failed_login_ratio_1h": {
            "category": "Access Risk",
            "template": "Elevated failed login attempts detected."
        },
        "new_ip_1d": {
            "category": "Access Risk",
            "template": "Transaction initiated from a new IP address."
        },

        # Geographic Risk
        "is_cross_border": {
            "category": "Geolocation Risk",
            "template": "Transaction occurred outside user's residence country."
        }
    }

    # Length of the trailing window fed to the LSTM as one sequence. Must match
    # the input length the saved LSTM expects (presumably its training window).
    SEQ_LEN = 20

    def __init__(self, model_dir="behavior_assets"):
        """Configure the asset directory and the feature lists used at inference.

        Args:
            model_dir: directory holding the artifacts written by ``save_assets``.
        """
        self.model_dir = model_dir
        # Categorical detector inputs; converted to H2O factors before scoring.
        self.cat_features = ["currency", "channel", "event_type", "geo_country"]
        # Numeric inputs of the Extended Isolation Forest (with cat_features).
        self.num_features = ["mod_z_score_abs", "ewma_resid", "gap_log", "net_flow_1d", "login_count_1h", "failed_login_ratio_1h", "new_ip_1d", "is_cross_border"]
        # Per-step features of the LSTM sequence model.
        self.lstm_features = ["amount_abs", "gap_log", "amount_to_income_ratio", "net_flow_1d", "deposit_to_income_ratio", "mod_z_score_abs", "ewma_resid"]

    # ------------------------------------------------------------------
    # Helpers for SHAP-based explanations
    # ------------------------------------------------------------------
    def _classify_severity(self, score, is_anomaly):
        """Bucket ``score`` (final_score / threshold) into LOW / MEDIUM / HIGH.

        Non-anomalous rows are always LOW, whatever the score.
        """
        if score < 0.30 or not is_anomaly:
            return "LOW"
        if score < 0.70:
            return "MEDIUM"
        return "HIGH"

    def _apply_confidence(self, df, threshold, std):
        """Return per-row confidence (0.5 .. 1) in the anomaly/normal decision.

        A sigmoid of the distance between ``df["final_score"]`` and
        ``threshold`` with slope ``3 / std``: a score exactly at the threshold
        gives 0.5, and scores far from it on either side approach 1.0.

        Written as sigmoid(k * |diff|), which equals max(sigmoid(k*diff),
        1 - sigmoid(k*diff)) but never exponentiates a large positive number, so
        ``np.exp`` cannot overflow. Assumes ``std`` >= 0 (a standard deviation).

        Returns:
            numpy array aligned positionally with ``df``.
        """
        k = 3.0 / (std + 1e-9)  # 3.0 keeps the curve slightly smoother
        distance = np.abs(np.asarray(df["final_score"], dtype=float) - threshold)
        return 1.0 / (1.0 + np.exp(-k * distance))

    @staticmethod
    def _finite_float(value):
        """Coerce ``value`` to a finite float, or return None when impossible."""
        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            return None
        # NaN/inf cannot be shown meaningfully (and int(nan) would raise).
        return number if np.isfinite(number) else None

    def _generate_human_explanation(self, feature, value, shap_value):
        """Render a one-sentence explanation of one SHAP driver.

        Args:
            feature: surrogate feature name.
            value: the row's raw feature value; shown in the text when it is a
                finite number, otherwise the value-free wording is used.
            shap_value: the feature's contribution; > 0 reads "increased",
                anything else "reduced".

        Returns:
            The explanation string. Unknown features get a generic sentence.
        """
        direction = "increased" if shap_value > 0 else "reduced"
        number = self._finite_float(value)

        # feature -> (formatter used when the value is known, text otherwise).
        # Only the requested entry is formatted, so a value that one template
        # cannot render (e.g. int() of a non-finite number) never breaks another.
        templates = {
            "mod_z_score_abs": (
                lambda v: f"Transaction amount deviates significantly from user's normal behavior (Z-score={v:.2f}).",
                "Transaction amount deviates significantly from user's normal behavior.",
            ),
            "ewma_resid": (
                lambda v: f"Recent transaction amount differs from short-term trend (EWMA residual={v:.2f}).",
                "Recent transaction amount differs from short-term trend.",
            ),
            "gap_log": (
                lambda v: f"Transaction timing gap is unusual compared to prior activity (gap_log={v:.2f}).",
                "Transaction timing gap is unusual compared to prior activity.",
            ),
            "net_flow_1d": (
                lambda v: f"Daily net cash flow shift detected (net_flow_1d={v:.2f}).",
                "Daily net cash flow shift detected.",
            ),
            "login_count_1h": (
                lambda v: f"Abnormal login frequency in past hour (count={int(v)}).",
                "Abnormal login frequency in past hour.",
            ),
            "failed_login_ratio_1h": (
                lambda v: f"Elevated failed login attempts ratio ({v:.2f}).",
                "Elevated failed login attempts detected.",
            ),
            "new_ip_1d": (None, "New IP address detected in last 24h."),
            "is_cross_border": (None, "Transaction occurred outside user's residence country."),
        }

        if feature not in templates:
            return f"{feature} contributed to anomaly score and {direction} risk."

        with_value, without_value = templates[feature]
        if with_value is not None and number is not None:
            lead = with_value(number)
        else:
            lead = without_value
        return f"{lead} This {direction} anomaly risk."

    def _compute_shap_batch(self, df_input):
        """Return per-row SHAP contributions from the surrogate model.

        Args:
            df_input: DataFrame (or a single-row dict). All of its columns are
                uploaded to H2O; ``cat_features`` present are made factors.

        Returns:
            DataFrame with one row per input row and one column per
            contribution, with the ``BiasTerm`` column removed.
        """
        if isinstance(df_input, dict):
            df_input = pd.DataFrame([df_input])

        h2o_frame = h2o.H2OFrame(df_input)

        for col in self.cat_features:
            if col in df_input.columns:
                h2o_frame[col] = h2o_frame[col].asfactor()

        shap_val = self.surrogate.predict_contributions(h2o_frame)
        shap_df = shap_val.as_data_frame().drop(columns=["BiasTerm"])

        return shap_df

    def _convert_to_risk_evidence(self, feature, shap_value, raw_value):
        """Build one evidence record, or None if ``feature`` is not in RISK_MAPPING."""
        if feature not in self.RISK_MAPPING:
            return None

        risk_info = self.RISK_MAPPING[feature]
        direction = "increased" if shap_value > 0 else "reduced"

        explanation_text = self._generate_human_explanation(
            feature, raw_value, shap_value
        )

        return {
            "risk_category": risk_info["category"],
            "feature": feature,
            "impact": direction,
            "contribution": float(shap_value),
            "explanation": explanation_text
        }

    def _build_behavior_output(self, df_input, top_n=2, min_abs_contribution=1e-4):
        """Turn scored rows into BEHAVIOR detector results with SHAP evidence.

        Args:
            df_input: scored DataFrame (or one-row dict) with ``event_time``,
                ``txn_id``, ``user_id``, ``is_anomaly``, ``final_score`` and
                ``confidence_score`` plus the surrogate's input features.
            top_n: number of largest-|SHAP| drivers considered per row. Drivers
                not in RISK_MAPPING are then dropped, so fewer may be reported.
            min_abs_contribution: drivers with |SHAP| at or below this are
                treated as noise and ignored.

        Returns:
            One result dict if there is exactly one row, else a list of dicts.
            Requires ``self.threshold`` (set by ``load_assets``).
        """
        if isinstance(df_input, dict):
            df_input = pd.DataFrame([df_input])

        shap_df = self._compute_shap_batch(df_input)

        results = []

        for idx in range(len(df_input)):

            row = df_input.iloc[idx]
            shap_series = shap_df.iloc[idx]

            # Remove tiny noise
            shap_series = shap_series[shap_series.abs() > min_abs_contribution]

            # Sort by absolute contribution
            shap_series = shap_series.reindex(
                shap_series.abs().sort_values(ascending=False).index
            )

            # Top drivers
            top_features = shap_series.head(top_n)

            evidence_list = []

            for feature, shap_value in top_features.items():

                raw_value = row.get(feature, None)

                evidence = self._convert_to_risk_evidence(
                    feature, shap_value, raw_value
                )

                if evidence:
                    evidence_list.append(evidence)

            # -------- Highest impact signal --------
            if evidence_list:
                highest_signal = evidence_list[0]["risk_category"]
            else:
                highest_signal = "No significant anomaly drivers detected."

            # -------- Severity --------
            # NOTE(review): for rows produced by predict(), is_anomaly == 1 implies
            # final_score >= threshold, so (for a positive threshold) this ratio is
            # >= 1 whenever severity is not forced to LOW, and "MEDIUM" can never
            # be returned. Confirm the intended severity scale.
            anomaly_level = float(row["final_score"] / self.threshold)
            severity = self._classify_severity(anomaly_level, row["is_anomaly"])

            # -------- Output --------
            result = {
                "event_time": str(row["event_time"]),
                "txn_id": str(row["txn_id"]),
                "user_id": str(row["user_id"]),
                "is_anomaly": int(row["is_anomaly"]),
                "detector_type": "BEHAVIOR",
                "signal": highest_signal,
                "severity": severity,
                "confidence": str(row["confidence_score"]),
                "evidence": evidence_list
            }

            results.append(result)

        if len(results) == 1:
            return results[0]

        return results

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def _asset_path(self, name):
        """Return the path of asset ``name`` inside ``self.model_dir``."""
        return os.path.join(self.model_dir, name)

    def save_assets(self, eif, surrogate, lstm, seq_scaler, feature_scaler, cohort_stats, global_stats, threshold, weights, scores_std):
        """Persist every artifact that ``load_assets`` reads into ``self.model_dir``.

        H2O models are saved under their model ids. Those ids, the threshold,
        the ensemble weights and ``scores_std`` go into ``meta.pkl``.
        """
        os.makedirs(self.model_dir, exist_ok=True)
        h2o.save_model(model=eif, path=self.model_dir, force=True)
        h2o.save_model(model=surrogate, path=self.model_dir, force=True)
        lstm.save(self._asset_path("lstm_model.h5"))
        joblib.dump(seq_scaler, self._asset_path("seq_scaler.pkl"))
        joblib.dump(feature_scaler, self._asset_path("feature_scaler.pkl"))
        joblib.dump(cohort_stats, self._asset_path("cohort_stats.pkl"))
        joblib.dump(global_stats, self._asset_path("global_stats.pkl"))
        joblib.dump(
            {"threshold": threshold, "weights": weights, "eif_id": eif.model_id, "surr_id": surrogate.model_id, "scores_std": scores_std},
            self._asset_path("meta.pkl"),
        )

    def load_assets(self):
        """Connect to (or start) H2O and load every artifact written by ``save_assets``.

        NOTE(review): the ``.pkl`` files are unpickled via joblib, which can
        execute arbitrary code. Only load assets from a trusted location.
        """
        h2o.init()
        meta = joblib.load(self._asset_path("meta.pkl"))
        self.threshold = meta['threshold']
        self.weights = meta['weights']
        self.scores_std = meta['scores_std']
        self.eif = h2o.load_model(self._asset_path(meta['eif_id']))
        self.surrogate = h2o.load_model(self._asset_path(meta['surr_id']))
        self.lstm = tf.keras.models.load_model(
            self._asset_path("lstm_model.h5"),
            custom_objects={"mse": tf.keras.losses.MeanSquaredError()},
        )
        self.seq_scaler = joblib.load(self._asset_path("seq_scaler.pkl"))
        self.feature_scaler = joblib.load(self._asset_path("feature_scaler.pkl"))
        self.cohort_stats = joblib.load(self._asset_path("cohort_stats.pkl"))
        self.global_stats = joblib.load(self._asset_path("global_stats.pkl"))

    # ------------------------------------------------------------------
    # Feature engineering & scoring
    # ------------------------------------------------------------------
    def _engineer(self, df):
        """Derive model features from raw merged event records.

        Needs ``global_stats`` and ``cohort_stats`` (set by ``load_assets``).
        Returns a new DataFrame sorted by ``event_time``; ``df`` is not modified.

        NOTE(review): gaps, the rolling baseline and the EWMA run over the
        whole batch in time order without grouping by ``user_id``. The batch is
        therefore assumed to hold a single user's history; confirm with callers.
        """
        df = df.copy()
        df["event_time"] = pd.to_datetime(df["event_time"])
        df = df.sort_values("event_time")
        # Normalise categorical text (these also serve as cohort_stats merge keys).
        for c in ["currency", "channel", "residence_country", "geo_country", "event_type"]:
            if c in df.columns:
                df[c] = df[c].astype(str).str.strip().str.lower()

        # Financial ratios (1e-9 guards against division by zero).
        df["amount_abs"] = df["amount"].abs()
        df["amount_to_income_ratio"] = df["amount_abs"] / (df["declared_income"] + 1e-9)
        df["deposit_to_income_ratio"] = df["account_deposit"] / (df["declared_income"] + 1e-9)
        df["net_flow_1d"] = df["amount_in_1d"] - df["amount_out_1d"]

        # Location / access signals.
        df["is_cross_border"] = (df["residence_country"] != df["geo_country"]).astype(int)
        df["failed_login_ratio_1h"] = df["failed_login_1h"] / (df["login_count_1h"] + 1e-9)
        df["new_ip_1d"] = df["new_ip_1d"].fillna(0)
        df["geo_change_1d"] = df["geo_change_1d"].fillna(0)

        # Gap to the previous event; missing gaps (e.g. the first row) use the stored median.
        df["gap_seconds"] = df["event_time"].diff().dt.total_seconds().fillna(self.global_stats['median_gap'])
        df["gap_log"] = np.log1p(df["gap_seconds"])

        # Rolling 15-event baseline. A one-value window has no spread, so the
        # global MAD is used instead. raw=True hands each window over as an ndarray.
        amount_window = df["amount_abs"].rolling(window=15, min_periods=1)
        df["user_median_15"] = amount_window.median()
        df["user_mad_15"] = amount_window.apply(
            lambda x: median_abs_deviation(x, scale='normal') if len(x) > 1 else self.global_stats['mad'],
            raw=True,
        )

        # Drop stale cohort columns so the merge does not create _x/_y suffixes.
        df = df.drop(columns=[c for c in ("cohort_median", "cohort_mad") if c in df.columns])

        df = df.merge(self.cohort_stats, on=["currency", "geo_country", "channel", "event_type"], how="left")
        # Baseline fallback chain: user window -> cohort -> global.
        b_med = df["user_median_15"].fillna(df["cohort_median"]).fillna(self.global_stats['median_amt'])
        b_mad = df["user_mad_15"].fillna(df["cohort_mad"]).fillna(self.global_stats['mad'])
        # NOTE(review): median_abs_deviation(scale='normal') already divides by
        # ~0.6745, so multiplying by 0.6745 again looks like double scaling. It
        # is kept unchanged because the trained scalers/models expect exactly
        # this feature definition.
        df["mod_z_score_abs"] = (0.6745 * (df["amount_abs"] - b_med) / (b_mad + 1e-9)).abs()
        df["ewma_resid"] = (df["amount_abs"] - df["amount_abs"].ewm(span=8).mean()).abs()
        return df

    def predict(self, raw_json):
        """Score a JSON batch of events and return behaviour evidence.

        Args:
            raw_json: JSON string accepted by ``pd.DataFrame`` after
                ``json.loads`` (e.g. a list of event records as produced by
                ``EventMerger.merge``).

        Returns:
            A single result dict when the batch has one row, otherwise a list
            of result dicts (see ``_build_behavior_output``).
        """
        # 1. Parse & engineer.
        input_df = pd.DataFrame(json.loads(raw_json))
        feat_df = self._engineer(input_df)

        # 2. Point score (Extended Isolation Forest).
        h2o_fr = h2o.H2OFrame(feat_df[self.cat_features + self.num_features])
        for c in self.cat_features:
            h2o_fr[c] = h2o_fr[c].asfactor()
        feat_df["iforest_score"] = self.eif.predict(h2o_fr)["anomaly_score"].as_data_frame().iloc[:, 0].values

        # 3. Sequence score (LSTM reconstruction error on the trailing window).
        scaled_lstm = self.seq_scaler.transform(feat_df[self.lstm_features].fillna(0))
        # Every row without its own sequence score gets the stored lstm_median
        # fallback. This is the same value used when the batch is shorter than
        # one window, and it avoids a silent 0 ("perfect reconstruction") from
        # the fillna below.
        feat_df["lstm_score"] = self.global_stats['lstm_median']
        if len(scaled_lstm) >= self.SEQ_LEN:
            seq = np.array([scaled_lstm[-self.SEQ_LEN:]])
            reconstruction = self.lstm.predict(seq, verbose=0)
            feat_df.loc[feat_df.index[-1], "lstm_score"] = np.mean((seq - reconstruction) ** 2)

        # 4. Ensemble: scale detector scores and take the weighted sum.
        detector_cols = ["mod_z_score_abs", "ewma_resid", "iforest_score", "lstm_score"]
        n_vals = self.feature_scaler.transform(feat_df[detector_cols].fillna(0))
        feat_df["final_score"] = np.sum([self.weights[c] * n_vals[:, i] for i, c in enumerate(detector_cols)], axis=0)
        feat_df["is_anomaly"] = (feat_df["final_score"] >= self.threshold).astype(int)
        feat_df["confidence_score"] = self._apply_confidence(feat_df, self.threshold, self.scores_std)

        # 5. SHAP & formatting.
        return self._build_behavior_output(feat_df)


In [None]:
import json


class EventMerger:
    """Flatten auth / profile / status / transaction JSON events into one record.

    ``merge`` emits the record as a JSON list holding one object, which is the
    batch format ``BehaviorInferenceEngine.predict`` parses.
    """

    def __init__(self, auth_file, profile_file, status_file, txn_file):
        """Load the four event payloads from JSON files on disk."""
        self.auth = self._load_json(auth_file)
        self.profile = self._load_json(profile_file)
        self.status = self._load_json(status_file)
        self.txn = self._load_json(txn_file)

    @staticmethod
    def _load_json(file_path):
        """Read and parse a JSON file."""
        # JSON text is UTF-8 (RFC 8259); don't depend on the platform locale.
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _safe_number(value):
        """Return ``value`` as a float, or None if it is missing or not numeric."""
        if value is None:
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _section(payload, key):
        """Return ``payload[key]`` if it is a dict; a missing/null/non-dict section yields ``{}``."""
        section = payload.get(key) if isinstance(payload, dict) else None
        return section if isinstance(section, dict) else {}

    @staticmethod
    def _lower_text(value):
        """Lower-case a text field; a missing or null value becomes ``""``.

        ``str(None).lower()`` would give the literal ``"none"``. For country
        fields that makes the downstream residence-vs-geo comparison flag
        cross-border activity falsely.
        """
        return "" if value is None else str(value).lower()

    def merge(self):
        """Return the merged event as a pretty-printed JSON list with one object."""
        txn_data = self._section(self.txn, "data")
        kyc = self._section(self.profile, "kyc")
        account = self._section(self.profile, "account")
        geo = self._section(self._section(self.auth, "data"), "geo")
        status_txn = self._section(self.status, "txn")
        status_auth = self._section(self.status, "auth")
        status_network = self._section(self.status, "network")

        output = {
            "user_id": self.txn.get("user_id"),

            # Transaction fields
            "txn_id": txn_data.get("txn_id"),
            "event_time": self.txn.get("event_time"),
            "event_type": self.txn.get("event_type"),
            "amount": txn_data.get("amount"),
            "currency": self._lower_text(txn_data.get("currency")),
            "channel": txn_data.get("channel"),

            # Profile fields
            "declared_income": self._safe_number(kyc.get("income")),
            "account_deposit": self._safe_number(account.get("account_deposit")),
            "residence_country": self._lower_text(kyc.get("residence_country")),

            # Auth / Geo fields
            "geo_country": self._lower_text(geo.get("country")),

            # Status - Transaction Metrics
            "amount_in_1d": status_txn.get("amount_in_1d"),
            "amount_out_1d": status_txn.get("amount_out_1d"),

            # Status - Auth Metrics
            "login_count_1h": status_auth.get("login_count_1h"),
            "failed_login_1h": status_auth.get("failed_login_1h"),
            "new_ip_1d": bool(status_auth.get("new_ip_1d")),
            "geo_change_1d": bool(status_network.get("geo_change_1d"))
        }
        return json.dumps([output], indent=4)




In [None]:
# Load the trained behaviour model exactly as a freshly started inference
# server would: construct the engine, then read its saved artifacts.
prod_engine = BehaviorInferenceEngine()
prod_engine.load_assets()

# Sample event payloads, keyed by the EventMerger parameter they feed.
SAMPLE_EVENT_FILES = {
    "auth_file": "auth_sample.json",
    "profile_file": "profile_sample.json",
    "status_file": "status_sample.json",
    "txn_file": "transaction_sample.json",
}

# Merge the four events into the single-record JSON batch the engine consumes.
merger = EventMerger(**SAMPLE_EVENT_FILES)
json_anomaly_test = merger.merge()
print(json_anomaly_test)

# Score the merged event and pretty-print the explanation payload.
result_json = prod_engine.predict(json_anomaly_test)
print(json.dumps(result_json, indent=2))

Checking whether there is an H2O instance running at http://localhost:54321. connected.


0,1
H2O_cluster_uptime:,17 mins 31 secs
H2O_cluster_timezone:,Etc/UTC
H2O_data_parsing_timezone:,UTC
H2O_cluster_version:,3.46.0.9
H2O_cluster_version_age:,2 months and 20 days
H2O_cluster_name:,H2O_from_python_unknownUser_saamqs
H2O_cluster_total_nodes:,1
H2O_cluster_free_memory:,3.100 Gb
H2O_cluster_total_cores:,2
H2O_cluster_allowed_cores:,2




[
    {
        "user_id": "u_001",
        "txn_id": "T-883192",
        "event_time": "2026-02-07T09:15:12.345Z",
        "event_type": "buy",
        "amount": 1083.21,
        "currency": "usd",
        "channel": "web",
        "declared_income": 5000.0,
        "account_deposit": 52000.0,
        "residence_country": "my",
        "geo_country": "my",
        "amount_in_1d": 0,
        "amount_out_1d": 50000,
        "login_count_1h": 1,
        "failed_login_1h": 0,
        "new_ip_1d": true,
        "geo_change_1d": false
    }
]
Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
extendedisolationforest prediction progress: |███████████████████████████████████| (done) 100%





Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
contributions progress: |████████████████████████████████████████████████████████| (done) 100%
{
  "event_time": "2026-02-07 09:15:12.345000+00:00",
  "txn_id": "T-883192",
  "user_id": "u_001",
  "is_anomaly": 1,
  "detector_type": "BEHAVIOR",
  "signal": "Liquidity Shift",
  "severity": "HIGH",
  "confidence": "0.986558766168058",
  "evidence": [
    {
      "risk_category": "Liquidity Shift",
      "feature": "net_flow_1d",
      "impact": "increased",
      "contribution": 0.1242170557379722,
      "explanation": "Daily net cash flow shift detected (net_flow_1d=-50000.00). This increased anomaly risk."
    },
    {
      "risk_category": "Monetary Deviation",
      "feature": "ewma_resid",
      "impact": "reduced",
      "contribution": -0.0055617871694266,
      "explanation": "Recent transaction amount differs from short-term trend (EWMA residual=0.00). This reduced anomaly risk."
    


