In [2]:
import shap
import boto3
import json
import joblib
import numpy as np
from datetime import datetime
import time
import io
import logging
import pickle
import pandas as pd
import shap

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [3]:
# --- Logging ---------------------------------------------------------------
# Every later cell logs through the root logger at INFO level.
# NOTE(review): presumably the hosting environment forwards these records to
# CloudWatch -- no handler is attached here, so confirm where they end up.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Wall-clock reference; the final cell subtracts it to report total runtime.
start_time = time.time()

# --- S3 client and object locations ------------------------------------------
s3_client = boto3.client('s3')

bucket_name = 'zero-trust-ml-dataset'
model_key = 'models/svm_pipeline.pkl'
X_train_key, X_test_key = 'X_train_ext.csv', 'X_test_ext.csv'
y_train_key, y_test_key = 'y_train_ext.csv', 'y_test_ext.csv'

# --- Model -------------------------------------------------------------------
# Read the serialized pipeline into memory and deserialize it from an
# in-memory buffer, so nothing is written to local disk.
# NOTE(review): joblib.load unpickles the payload, which can execute arbitrary
# code -- only point this at a bucket whose contents are trusted.
model_obj = s3_client.get_object(Bucket=bucket_name, Key=model_key)
model_data = model_obj['Body'].read()
svm_pipeline = joblib.load(io.BytesIO(model_data))

logger.info("[Model] Model loaded directly from S3")

# Helper: read a CSV object stored in S3 straight into a DataFrame.
def load_csv_from_s3(bucket, key):
    """Fetch one CSV object from S3 and parse it with pandas.

    Parameters
    ----------
    bucket : str
        Name of the S3 bucket that holds the object.
    key : str
        Key of the CSV object inside ``bucket``.

    Returns
    -------
    pandas.DataFrame
        The object's contents parsed by ``pd.read_csv`` with default options.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    csv_stream = response['Body']
    return pd.read_csv(csv_stream)

# Pull the train/test splits from S3: feature frames first, then label frames
# (same download order as four individual calls).
X_train_bin, X_test_bin = (
    load_csv_from_s3(bucket_name, features_key)
    for features_key in (X_train_key, X_test_key)
)
y_train_bin, y_test_bin = (
    load_csv_from_s3(bucket_name, labels_key)
    for labels_key in (y_train_key, y_test_key)
)

logger.info(f"[Data] Datasets loaded from S3: X_train={X_train_bin.shape}, X_test={X_test_bin.shape}")

In [4]:
# Wrap the pipeline's probability output in a KernelExplainer. The background
# dataset is a single training row drawn by shap.sample.
background_data = shap.sample(X_train_bin, 1)
explainer = shap.KernelExplainer(svm_pipeline.predict_proba, background_data)

# Explain only the first row of the test frame.
sample_data = X_test_bin[:1]
shap_values = explainer.shap_values(sample_data)

logger.info(f"[SHAP] SHAP values for the sample: {shap_values}")

  0%|          | 0/1 [00:00<?, ?it/s]

In [5]:
# Fetch the four CSV splits from S3 again and rebind the frames.
# NOTE(review): this repeats the download done in the earlier data-loading
# cell with the same keys -- consider dropping one of the two.
X_train_bin, X_test_bin, y_train_bin, y_test_bin = map(
    lambda split_key: load_csv_from_s3(bucket_name, split_key),
    (X_train_key, X_test_key, y_train_key, y_test_key),
)

logger.info(f"[Data] Datasets loaded from S3: {X_train_bin.shape}, {X_test_bin.shape}")

In [6]:
# Simulated model probability that drives the threat score for this run.
simulated_probability = 0.9
# Affine rescale of the probability (0.1 + 0.9 * p), rounded to two decimals.
threat_level = round(0.1 + (0.9 * simulated_probability), 2)

# Guard: the rest of the notebook simulates a high-threat scenario.
assert threat_level > 0.8, "Simulated threat level is not high enough."

logger.info(f"[SHAP] Threat level calculated as: {threat_level} based on simulated probability of {simulated_probability}")

# Take the feature names from the columns of the frame that was explained, so
# this cell works on a fresh kernel instead of relying on a `feature_names`
# variable that no visible cell defines.
feature_names = list(X_test_bin.columns)

# Reduce the explainer output to one value per feature for the first sample
# and the first model output.
# NOTE(review): output index 0 is the first column of predict_proba -- confirm
# that this is the class the threat explanation is meant to describe.
if isinstance(shap_values, list):
    # Older SHAP releases: a list with one (n_samples, n_features) array per output.
    feature_impacts = np.asarray(shap_values[0])[0]
else:
    # Newer SHAP releases: a single array whose first axis is the sample.
    feature_impacts = np.asarray(shap_values)[0]
    if feature_impacts.ndim > 1:
        # (n_features, n_outputs) -> keep the first output only.
        feature_impacts = feature_impacts[:, 0]

assert len(feature_names) == len(feature_impacts), (
    f"Expected one SHAP value per feature, got {len(feature_impacts)} values "
    f"for {len(feature_names)} features."
)

logger.info(f"[SHAP] Explanation of threat level {threat_level}:")
for feature, feature_impact in zip(feature_names, feature_impacts):
    logger.info(f"[SHAP] Feature '{feature}' contributed {feature_impact:.3f} to the model prediction")

In [7]:
def policy_action(threat_score, mfa_threshold=0.5, revoke_threshold=0.8):
    """Map a numeric threat score to an access-control decision.

    Parameters
    ----------
    threat_score : float
        Score to classify; higher means more suspicious.
    mfa_threshold : float, optional
        Scores strictly below this value are allowed. Defaults to 0.5.
    revoke_threshold : float, optional
        Scores at or above ``mfa_threshold`` and strictly below this value
        require MFA. Defaults to 0.8.

    Returns
    -------
    str
        ``"Allow"``, ``"Require MFA"`` or ``"Revoke/Terminate Session"``.

    Notes
    -----
    A NaN score fails both comparisons and therefore falls through to
    ``"Revoke/Terminate Session"``, i.e. the function fails closed.
    """
    if threat_score < mfa_threshold:
        return "Allow"
    # Reaching this point already implies the score is not below mfa_threshold,
    # so only the upper bound needs checking.
    if threat_score < revoke_threshold:
        return "Require MFA"
    return "Revoke/Terminate Session"

# Translate the threat score into an access decision.
action = policy_action(threat_level)

# Assemble the event payload: score, user, model verdict for the first test
# row, the first entry of the SHAP output, and the resulting decision.
simulated_threat_data = dict(
    threat_level=threat_level,
    user_id='user123',
    prediction=svm_pipeline.predict(X_test_bin[:1])[0],
    shap_values=shap_values[0].tolist(),
    recommended_action=action,
)

logger.info(f"[SHAP] Simulated threat data: {simulated_threat_data}")





In [8]:
# Model verdict for the first row of the test frame.
prediction = svm_pipeline.predict(X_test_bin[:1])[0]

# Rebuild the event payload, this time reusing the stored prediction.
simulated_threat_data = dict(
    threat_level=threat_level,
    user_id='user123',
    prediction=prediction,
    shap_values=shap_values[0].tolist(),
    recommended_action=action,
)

logger.info(f"[SHAP] Simulated threat data: {simulated_threat_data}")




In [9]:
# Convert numpy values to built-in Python types so the payload is JSON-serializable
def convert_to_native_types(obj):
    """Recursively replace numpy values in ``obj`` with built-in Python types.

    Handles every numpy scalar type (``np.int32``, ``np.float32``,
    ``np.bool_`` and so on, not just ``np.int64`` / ``np.float64``), numpy
    arrays, and values nested inside lists, tuples and dicts.

    Parameters
    ----------
    obj : Any
        Value to convert. Containers are rebuilt; the input is not mutated.

    Returns
    -------
    Any
        ``obj`` with numpy scalars turned into Python scalars and arrays
        turned into (nested) lists. Lists stay lists, tuples come back as
        plain tuples, and dicts stay dicts with keys converted as well. Any
        other value is returned unchanged.
    """
    if isinstance(obj, np.generic):
        # Covers all numpy scalar types in one check.
        return obj.item()
    if isinstance(obj, np.ndarray):
        # tolist() already yields built-in scalars at every nesting level.
        return obj.tolist()
    if isinstance(obj, list):
        return [convert_to_native_types(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_to_native_types(item) for item in obj)
    if isinstance(obj, dict):
        # Keys are converted too: json.dumps rejects numpy integer keys.
        return {
            convert_to_native_types(key): convert_to_native_types(value)
            for key, value in obj.items()
        }
    return obj


# Client used below to invoke the Lambda function.
lambda_client = boto3.client('lambda')

# Swap numpy values in the payload for built-in types ahead of JSON encoding.
simulated_threat_data = convert_to_native_types(simulated_threat_data)


def invoke_lambda(threat_data):
    """Send ``threat_data`` to the ``External-test`` Lambda function.

    The payload is JSON-encoded and submitted with ``InvocationType='Event'``.
    The boto3 response is logged; nothing is returned.

    Parameters
    ----------
    threat_data : dict
        JSON-serializable payload describing the simulated threat.
    """
    invoke_kwargs = {
        'FunctionName': 'External-test',
        'InvocationType': 'Event',
        'Payload': json.dumps(threat_data),
    }
    result = lambda_client.invoke(**invoke_kwargs)
    logger.info(f"[SHAP] Lambda invoked: {result}")

# Hand the prepared payload to the Lambda function.
invoke_lambda(simulated_threat_data)

# Report the wall-clock time elapsed since `start_time` was recorded.
end_time = time.time()
elapsed_seconds = end_time - start_time
logger.info(f"[SHAP] Time for simulation: {elapsed_seconds:.2f} seconds")

# Record the local system time at which the run finished.
dt = datetime.now()
logger.info(f"[SHAP] Current system time: {dt}")