In [1]:
import pandas as pd
import numpy as np
from datetime import datetime
import re
from sklearn.ensemble import IsolationForest
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
import joblib
import warnings
warnings.filterwarnings('ignore')

# Read the synthetic access-request records into a DataFrame
data = pd.read_csv(filepath_or_buffer='synthetic_access_data_10000.csv')

# Function to convert time_in_position to months
def convert_to_months(time_str):
    """Convert a free-form tenure value into a whole number of months.

    Recognised forms (case-insensitive, surrounding whitespace ignored):
      * "<n> year(s)" and/or "<n> month(s)", alone or combined, with
        optional decimals: "2 years" -> 24, "2 years 3 months" -> 27,
        "1.5 years" -> 18.
      * A bare number, taken to already be in months: "48", 48, 48.0.

    Args:
        time_str: Raw value; may be a string, a number, or missing
            (None / NaN).

    Returns:
        int: The month count, or 0 when the value is missing or cannot
        be parsed.
    """
    if pd.isna(time_str):
        return 0
    text = str(time_str).lower().strip()

    has_year = 'year' in text
    has_month = 'month' in text
    if has_year or has_month:
        number = r'(\d+(?:\.\d+)?)'
        year_match = re.search(number + r'\s*years?', text)
        month_match = re.search(number + r'\s*months?', text)
        if year_match or month_match:
            total = 0.0
            if year_match:
                total += float(year_match.group(1)) * 12
            if month_match:
                total += float(month_match.group(1))
            return int(round(total))

        # A unit word is present but no number sits directly in front of
        # it (e.g. "10+ years"): fall back to the first integer in the
        # text, and to 0 when there is no digit at all (e.g. "a year").
        first_number = re.search(r'\d+', text)
        if first_number is None:
            return 0
        value = int(first_number.group())
        return value * 12 if has_year else value

    try:
        # Going through float() lets "48.0" (how a float cell is
        # stringified) parse as 48 instead of being rejected.
        return int(float(text))
    except (ValueError, OverflowError):
        return 0  # Unparseable, NaN or infinite text

# Function to calculate days since a date
def calculate_days_since(date_str, current_date="2025-04-05"):
    """Return the number of days between a date and a reference date.

    Args:
        date_str: The earlier date, either a "YYYY-MM-DD" string or a
            datetime-like object (``datetime`` / ``pandas.Timestamp``).
            Missing values and the sentinels "Never" / "invalid_date"
            are treated as unknown.
        current_date: The reference date, in the same accepted forms.
            The fixed default keeps results reproducible between runs.

    Returns:
        int: ``current_date - date_str`` in days (negative when
        ``date_str`` lies after the reference date), or ``365 * 5`` when
        either value is unknown or cannot be interpreted.
    """
    fallback_days = 365 * 5  # Unknown dates are treated as five years old

    def _to_datetime(value):
        # pandas.Timestamp subclasses datetime, so this covers both.
        if isinstance(value, datetime):
            return value
        return datetime.strptime(str(value).strip(), "%Y-%m-%d")

    if pd.isna(date_str) or date_str == "Never" or date_str == "invalid_date":
        return fallback_days
    try:
        return (_to_datetime(current_date) - _to_datetime(date_str)).days
    except (ValueError, TypeError):
        # ValueError: text that is not a YYYY-MM-DD date.
        # TypeError: e.g. subtracting naive and timezone-aware datetimes.
        return fallback_days

# --- Feature engineering ---------------------------------------------------
# Tenure values are normalised to a month count.
data['time_in_position'] = data['time_in_position'].apply(convert_to_months)

# Every date column gets a companion "days_since_<column>" column.
for date_col in ('last_security_training', 'employee_join_date'):
    data[f'days_since_{date_col}'] = data[date_col].apply(calculate_days_since)

# --- Feature selection (6 categorical + 4 numerical = 10 features) ---------
categorical_features = [
    "user_role",
    "department",
    "employee_status",
    "resource_type",
    "resource_sensitivity",
    "request_reason",
]
numerical_features = [
    "time_in_position",
    "past_violations",
    "days_since_last_security_training",
    "days_since_employee_join_date",
]

# Model inputs and the approval label
X = data[categorical_features + numerical_features]
y = data["is_approved"]

# The detector is trained only on rows that were approved (is_approved == 1)
X_approved = X.loc[y == 1]

# --- Preprocessing ---------------------------------------------------------
# One-hot encode the categorical columns and min-max scale the numerical ones.
preprocessor = ColumnTransformer(
    transformers=[
        (
            'cat',
            OneHotEncoder(handle_unknown='ignore', sparse_output=False),
            categorical_features,
        ),
        ('num', MinMaxScaler(), numerical_features),
    ]
)

# Fit on the approved rows and encode them in one pass
X_approved_transformed = preprocessor.fit_transform(X_approved)

# --- Isolation Forest ------------------------------------------------------
# contamination=0.1: 10% of observations are considered anomalies.
# fit() returns the estimator itself, so the fitted model can be bound directly.
anomaly_model = IsolationForest(
    n_estimators=100,
    max_samples='auto',
    contamination=0.1,
    random_state=42,
).fit(X_approved_transformed)

# --- Persistence -----------------------------------------------------------
# The model and its preprocessor are written out as separate files.
joblib.dump(anomaly_model, 'anomaly_model.pkl')
joblib.dump(preprocessor, 'anomaly_preprocessor.pkl')
print("Anomaly model and preprocessor saved successfully")

# Function to predict anomalies
def predict_anomaly(request, preprocessor, model):
    """Score a single access request with a fitted anomaly detector.

    The request goes through the same feature engineering as the
    training data (tenure in months, ``days_since_*`` columns) before it
    is encoded and scored.

    Args:
        request: Mapping of raw request fields. It must contain
            "time_in_position", "last_security_training",
            "employee_join_date" and every other column named in the
            module-level ``categorical_features`` / ``numerical_features``
            lists.
        preprocessor: Fitted transformer exposing ``transform``.
        model: Fitted estimator exposing ``score_samples`` and
            ``predict``, where ``predict`` returns -1 for anomalies.

    Returns:
        dict: ``is_anomaly`` (bool), ``anomaly_score`` (float) and
        ``decision`` ("Anomaly" or "Normal").

    Raises:
        KeyError: If a required field is missing from ``request``.
    """
    # Convert request to a one-row DataFrame
    request_df = pd.DataFrame([request])

    # Normalise tenure as training does: text becomes a month count and a
    # missing value becomes 0, so NaN never reaches the model. Values that
    # are already numeric are passed through unchanged.
    tenure = request['time_in_position']
    if isinstance(tenure, str) or pd.isna(tenure):
        request_df['time_in_position'] = request_df['time_in_position'].apply(convert_to_months)

    # Convert dates to days_since
    for date_col in ['last_security_training', 'employee_join_date']:
        request_df[f'days_since_{date_col}'] = request_df[date_col].apply(calculate_days_since)

    # Select features in the column order the preprocessor was fitted with
    request_features = request_df[categorical_features + numerical_features]

    # Transform with preprocessor
    request_transformed = preprocessor.transform(request_features)

    # Score the request and classify it
    anomaly_score = model.score_samples(request_transformed)[0]
    flagged = model.predict(request_transformed)[0] == -1

    # Cast NumPy scalars to built-in types so the result can be serialised
    # (e.g. with json.dumps) without a custom encoder.
    return {
        'is_anomaly': bool(flagged),
        'anomaly_score': float(anomaly_score),
        'decision': "Anomaly" if flagged else "Normal"
    }

# First sample request
example_data = dict(
    user_role="Employee",
    department="Sales",
    employee_status="Full-time",
    resource_type="pdf",
    resource_sensitivity="confidential",
    request_reason="Routine check",
    time_in_position=48,  # months
    past_violations=90,
    last_security_training="2024-07-01",
    employee_join_date="2019-03-01",
)

# Second sample request, intended as a likely anomaly
anomaly_example = dict(
    user_role="Intern",
    department="Finance",
    employee_status="Temporary",
    resource_type="financial_report",
    resource_sensitivity="restricted",
    request_reason="Personal use",
    time_in_position=1,  # months
    past_violations=3,
    last_security_training="2024-02-01",
    employee_join_date="2025-03-01",
)

# Evaluate both requests in order and print the same three-line report for each
for heading, payload in (
    ("\nExample Request Evaluation:", example_data),
    ("\nSecond Example (Likely Anomaly) Evaluation:", anomaly_example),
):
    result = predict_anomaly(payload, preprocessor, anomaly_model)
    print(heading)
    print(f"Decision: {result['decision']}")
    print(f"Anomaly Score: {result['anomaly_score']:.4f}")
    print(f"Is Anomaly: {result['is_anomaly']}")

Anomaly model and preprocessor saved successfully

Example Request Evaluation:
Decision: Normal
Anomaly Score: -0.4942
Is Anomaly: False

Second Example (Likely Anomaly) Evaluation:
Decision: Normal
Anomaly Score: -0.5006
Is Anomaly: False
