In [None]:
import pandas as pd
from openai import OpenAI
from langgraph.graph import StateGraph, START, END
from datetime import datetime
from typing import Dict, List, Any, Optional, TypedDict
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import os
import random
import re

import logging
from typing import List, Dict
from collections import defaultdict
from scipy.stats import dirichlet
from openai import OpenAI  # Ensure OpenAI library is installed

In [None]:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

In [3]:
# Define file paths for the CERT dataset
logon_file = "r4.1/logon.csv"
file_access_file = "r4.1/file.csv"
email_file = "r4.1/email.csv"
device_file = "r4.1/device.csv"
http_file = "r4.1/http.csv"
psychometric_file = "r4.1/psychometric.csv"
ground_truth_file = "answers/r4.1-1.csv"

# Load dataset
logon_df = pd.read_csv(logon_file)
file_access_df = pd.read_csv(file_access_file)
email_df = pd.read_csv(email_file)
device_df = pd.read_csv(device_file)
http_df = pd.read_csv(http_file)
psychometric_df = pd.read_csv(psychometric_file)
ground_truth_df = pd.read_csv(ground_truth_file, names=["log_type", "id", "date", "user", "pc", "activity_or_url", "content"], header=None)

# Define the CERT data dictionary
data_dict = {
    'logon': logon_df,
    'file_access': file_access_df,
    'email': email_df,
    'device': device_df,
    'http': http_df
}

In [4]:
# Print the shape and first few rows of each DataFrame to verify
for key, df in data_dict.items():
    print(f"Dataset: {key}, Shape: {df.shape}")
    print(df.head())


Dataset: logon, Shape: (899118, 5)
                         id                 date     user       pc activity
0  {X0W9-Q2DW16EI-1074QDVQ}  01/02/2010 05:02:50  WCR0044  PC-9174    Logon
1  {C2O4-Z2RH12FQ-9176MUEL}  01/02/2010 05:19:09  WCR0044  PC-9174   Logoff
2  {U1J8-P4HX02EV-5327GONH}  01/02/2010 06:22:11  WCR0044  PC-5494    Logon
3  {F1N9-G4ZL24LA-8747VGHG}  01/02/2010 06:33:00  LRG0155  PC-0450    Logon
4  {Y1Q0-U9BN24NB-1906LMVT}  01/02/2010 06:42:00  RHM0148  PC-8152    Logon
Dataset: file_access, Shape: (414556, 6)
                         id                 date     user       pc  \
0  {Q5M2-N0JR22AA-1727ALDU}  01/02/2010 05:15:35  WCR0044  PC-9174   
1  {O2W1-I9JA58XQ-7973YYAK}  01/02/2010 05:16:04  WCR0044  PC-9174   
2  {O0T0-H0OD25UT-0238OGNR}  01/02/2010 05:16:25  WCR0044  PC-9174   
3  {Y5P3-S3DI98OH-7978PRAW}  01/02/2010 05:16:28  WCR0044  PC-9174   
4  {C5T9-R4OO02RR-0266RBKQ}  01/02/2010 05:16:29  WCR0044  PC-9174   

       filename                                

In [13]:
class UncertaintyAwareRLToolBuilderAgent:
    def __init__(self, api_key, learning_rate=0.1, discount_factor=0.9, exploration_rate=0.1, max_retries=10, num_actions=3):
        self.api_key = api_key
        self.client = OpenAI(api_key=api_key)
        self.tools = {}
        self.q_table = defaultdict(lambda: defaultdict(lambda: 1e-3))  # Initialize Q-values to a small positive value
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.max_retries = max_retries
        self.num_actions = num_actions  # Number of available actions
        self.uncertainties = {"vacuity": [], "dissonance": [], "entropy": []}

    def choose_action(self, state: str) -> int:
        """Choose an action based on the state and Q-table using Dirichlet probabilities."""
        action_probabilities = self.get_action_probabilities(state)
        
        # Handle invalid probabilities
        if np.any(np.isnan(action_probabilities)):
            logging.error(f"Action probabilities contain NaN for state '{state}'. Defaulting to uniform probabilities.")
            action_probabilities = np.full(self.num_actions, 1 / self.num_actions)
        
        action = np.random.choice(range(self.num_actions), p=action_probabilities)
        logging.info(f"Chosen action with probabilities {action_probabilities}: {action}")
        return action

    def get_action_probabilities(self, state: str) -> List[float]:
        """Compute action probabilities using Dirichlet distribution."""
        action_counts = [self.q_table[state].get(a, 0) + 1e-3 for a in range(self.num_actions)]
        
        # Ensure all parameters are positive for the Dirichlet distribution
        if any(count <= 0 for count in action_counts):
            logging.warning(f"Invalid Dirichlet parameters for state '{state}': {action_counts}. Fixing to minimum value.")
            action_counts = [max(count, 1e-3) for count in action_counts]
        
        probabilities = dirichlet.rvs(action_counts, size=1).flatten()
        
        # Normalize probabilities to ensure they sum to 1
        probabilities = probabilities / probabilities.sum() if probabilities.sum() > 0 else np.full(self.num_actions, 1 / self.num_actions)
        
        return probabilities


    def execute_with_retry(self, func, df, max_retries=3, **kwargs):
        """
        Executes a function with retry mechanism. Fixes known issues like index mismatch.
        """
        attempt = 0
        while attempt < max_retries:
            try:
                return func(df, **kwargs)
            except ValueError as ve:
                logging.error(f"Execution attempt {attempt + 1} failed: {ve}")
                if "incompatible index of inserted column" in str(ve):
                    df = df.reset_index(drop=True)  # Reset index to fix the mismatch
                    logging.info("Index mismatch detected. Resetting dataframe index and retrying.")
                else:
                    logging.error(f"Unhandled ValueError: {ve}")
                    raise ve
            except Exception as e:
                logging.error(f"Execution attempt {attempt + 1} failed: {e}")
            attempt += 1
        logging.error(f"Function execution failed after {max_retries} attempts.")
        return None

    def update_uncertainty_measures(self, action_probabilities: List[float]):
        """Update vacuity, dissonance, and entropy measures."""
        vacuity = 1 - sum(action_probabilities) / self.num_actions
        dissonance = sum(
            abs(a_i - a_j)
            for i, a_i in enumerate(action_probabilities)
            for j, a_j in enumerate(action_probabilities)
            if i != j
        ) / (2 * self.num_actions)
        entropy = -sum(p * np.log2(p) if p > 0 else 0 for p in action_probabilities) / np.log2(self.num_actions)

        self.uncertainties["vacuity"].append(vacuity)
        self.uncertainties["dissonance"].append(dissonance)
        self.uncertainties["entropy"].append(entropy)

        logging.info(f"Updated uncertainty measures: Vacuity={vacuity}, Dissonance={dissonance}, Entropy={entropy}")

    def generate_tool_code(self, task_name: str, requirements: str, prompt_style: str, columns: List[str], custom_prompt: str = None) -> str:
        """
        Generate code for a specific tool using GPT.
        Accepts a custom prompt if provided.
        """
        prompt = custom_prompt or f"""
        Generate a complete, well-formatted Python function named '{task_name}' to detect {requirements}.
        Ensure column names ({', '.join(columns)}) are validated, with detailed error handling.
        """

        for attempt in range(self.max_retries):
            try:
                response = self.client.chat.completions.create(
                    model="gpt-4",
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=2000
                )
                raw_text = response.choices[0].message.content.strip()
                code = self.extract_code(raw_text)
                if code and self.is_code_valid(code):
                    logging.info(f"Generated code after {attempt + 1} attempt(s)")
                    return code
            except Exception as e:
                logging.error(f"OpenAI API error: {e}")
                continue

            logging.warning(f"Retry {attempt + 1}/{self.max_retries}: Syntax issue, retrying...")
        return None

    @staticmethod
    def extract_code(response_text: str) -> str:
        """Extract Python code block from GPT response."""
        code_match = re.search(r"```python\n(.*?)\n```", response_text, re.DOTALL)
        return code_match.group(1) if code_match else response_text

    @staticmethod
    def is_code_valid(code: str) -> bool:
        """Check if the generated code is syntactically valid."""
        try:
            compile(code, "<string>", "exec")
            return True
        except SyntaxError as e:
            logging.error(f"Syntax issue: {e}")
            return False

    def evaluate_tool(self, tool_name: str):
        tool_func = self.tools.get(tool_name)
        if tool_func:
            test_data = self._get_test_data(tool_name)  # Use the defined method
            try:
                return self.execute_with_retry(tool_func, test_data)
            except Exception as e:
                logging.error(f"Error during evaluation: {e}")
                return -5
        logging.error(f"Tool {tool_name} not found.")
        return -10

    def create_tool(self, task_name: str, requirements: str, columns: List[str]):
        """Main function to create and evaluate a tool."""
        state = task_name
        action = self.choose_action(state)

        tool_code = self.generate_tool_code(task_name, requirements, "default_prompt", columns)
        if tool_code is None:
            logging.error(f"Code generation failed for {task_name} after maximum retries.")
            self.update_q_table(state, action, -10)
            return

        try:
            # Debug: Print the generated code
            print("Generated Code:")
            print(tool_code)
            
            exec(tool_code, globals())
            func_name = task_name.replace(' ', '_')
            # Debug: Check if the function exists in globals
            print(f"Looking for function: {func_name} in globals.")
            self.tools[task_name] = globals().get(func_name)
            
            if self.tools[task_name] is None:
                logging.error(f"Function {func_name} was not created successfully.")
                self.update_q_table(state, action, -10)
                return

            reward = self.evaluate_tool(func_name)
            self.update_q_table(state, action, reward)
        except Exception as e:
            logging.error(f"Tool generation failed for {task_name}: {e}")
        self.update_q_table(state, action, -10)

    def update_q_table(self, state: str, action: int, reward: float):
        """Update Q-table based on the RL update rule, ensuring non-negative Q-values."""
        current_q = self.q_table[state][action]
        best_future_q = max(self.q_table[state].values(), default=0)
        new_q = max(
            current_q + self.learning_rate * (reward + self.discount_factor * best_future_q - current_q),
            1e-3  # Ensure Q-values are at least 1e-3
        )
        self.q_table[state][action] = new_q
        logging.info(f"Q-table updated: State={state}, Action={action}, Q-value={new_q}")

    def _get_test_data(self, tool_name: str):
        # Example test data based on expected input
        return pd.DataFrame({
            "user": ["WCR0044", "LRG0155"],
            "date": ["2024-01-02 05:02:50", "2024-01-02 06:33:00"],
            "pc": ["PC-9174", "PC-0450"],
            "activity": ["Logon", "Logoff"],
            "content": ["Login successful", "User logged out"]
        })


In [None]:


# Initialize logging
logging.basicConfig(level=logging.INFO)

# Assume `UncertaintyAwareRLToolBuilderAgent` is already defined and tested.

# Decomposer Agent
class DecomposerAgent:
    def __init__(self, task_name: str):
        self.task_name = task_name

    def decompose_task(self) -> Dict[str, str]:
        log_types = ['logon', 'psychometric', 'file_access', 'email', 'device']
        subtasks = {log_type: f"{self.task_name}_{log_type}" for log_type in log_types}
        logging.info(f"Decomposed task '{self.task_name}' into subtasks: {list(subtasks.values())}")
        return subtasks


# Executor Node
class ExecutorNode:
    def __init__(self, tool_name: str, tool_func):
        self.tool_name = tool_name
        self.tool_func = tool_func

    def execute(self, df: pd.DataFrame, *args, **kwargs):
        if not callable(self.tool_func):
            logging.error(f"Tool {self.tool_name} is not callable.")
            return None
        try:
            return self.tool_func(df, *args, **kwargs)
        except Exception as e:
            logging.error(f"Error executing tool {self.tool_name}: {e}")
            return None


# Anomaly Aggregator Agent
class AnomalyAggregatorAgent:
    def aggregate(self, results: Dict[str, pd.DataFrame]):
        aggregated_results = {}
        for log_type, result in results.items():
            if isinstance(result, pd.DataFrame):
                if result.empty:
                    aggregated_results[log_type] = {
                        "anomalies": None,
                        "reasons": f"No anomalies detected (empty DataFrame) for {log_type}"
                    }
                else:
                    aggregated_results[log_type] = {
                        "anomalies": result,
                        "reasons": f"Flagged {len(result)} suspicious entries for {log_type}"
                    }
            elif result is None:
                aggregated_results[log_type] = {
                    "anomalies": None,
                    "reasons": f"No anomalies detected or tool failed for {log_type}"
                }
            else:
                logging.warning(f"Unexpected result type for {log_type}: {type(result).__name__}")
                aggregated_results[log_type] = {
                    "anomalies": None,
                    "reasons": f"Tool returned an invalid result for {log_type}"
                }
        logging.info(f"Final Aggregated Anomalies:\n{aggregated_results}")
        return aggregated_results




# Initialize agents
task_name = "Detect_Suspicious_Activity"
decomposer_agent = DecomposerAgent(task_name)
anomaly_aggregator_agent = AnomalyAggregatorAgent()
rl_tool_builder_agent = UncertaintyAwareRLToolBuilderAgent(
    api_key= OPENAI_API_KEY,
    learning_rate=0.1,
    discount_factor=0.9,
    exploration_rate=0.1,
    max_retries=10,
    num_actions=3
)

data_sources = {
    "logon": pd.DataFrame([
        ["WCR0044", "2024-01-02 05:02:50", "PC-9174", "Logon", "Login successful"],
        ["WCR0044", "2024-01-02 06:15:30", "PC-9174", "Logon", "Login successful"],
        ["LRG0155", "2024-01-02 06:33:00", "PC-0450", "Logoff", "User logged out"],
        ["XTR0011", "2024-01-02 07:45:00", "PC-1234", "Logon", "Login successful"],
        ["FLX0909", "2024-01-02 08:15:00", "PC-9987", "Logon", "Multiple failed login attempts"],
    ], columns=["user", "date", "pc", "activity", "content"]),

    "psychometric": pd.DataFrame([
        {"user": "WCR0044", "trait": "openness", "value": 0.8},
        {"user": "LRG0155", "trait": "conscientiousness", "value": 0.6},
        {"user": "FLX0909", "trait": "neuroticism", "value": 0.9},
    ]),

    "file_access": pd.DataFrame([
        {"user": "WCR0044", "file": "confidential.pdf", "access_type": "read", "timestamp": "2024-01-02 05:15:00"},
        {"user": "LRG0155", "file": "project_plan.docx", "access_type": "write", "timestamp": "2024-01-02 06:40:00"},
        {"user": "FLX0909", "file": "budget.xlsx", "access_type": "delete", "timestamp": "2024-01-02 07:30:00"},
    ]),

    "email": pd.DataFrame([
        {"user": "LRG0155", "email_content": "Suspicious email content", "timestamp": "2024-01-02 06:50:00"},
        {"user": "FLX0909", "email_content": "Confidential project details", "timestamp": "2024-01-02 07:15:00"},
        {"user": "XTR0011", "email_content": "Unauthorized access attempt", "timestamp": "2024-01-02 08:00:00"},
    ]),

    "device": pd.DataFrame([
        {"user": "XTR0011", "device": "USB", "action": "connect", "timestamp": "2024-01-02 07:00:00"},
        {"user": "FLX0909", "device": "External HDD", "action": "disconnect", "timestamp": "2024-01-02 08:30:00"},
        {"user": "WCR0044", "device": "Printer", "action": "connect", "timestamp": "2024-01-02 09:00:00"},
    ]),
}

# Standardize column names to lowercase
for log_type, df in data_sources.items():
    data_sources[log_type].columns = df.columns.str.lower()

# Convert relevant columns to datetime format
datetime_columns = {
    "logon": ["date"],
    "file_access": ["timestamp"],
    "email": ["timestamp"],
    "device": ["timestamp"]
}
for log_type, cols in datetime_columns.items():
    for col in cols:
        if col in data_sources[log_type].columns:
            data_sources[log_type][col] = pd.to_datetime(data_sources[log_type][col], errors='coerce')


    
def safe_execute_tool(tool_func, df: pd.DataFrame, **kwargs):
    """
    Safely execute the tool function and ensure it returns a DataFrame.
    """
    try:
        result = tool_func(df, **kwargs)
        if not isinstance(result, pd.DataFrame):  # Ensure the result is a DataFrame
            raise ValueError(f"Tool did not return a valid DataFrame. Got {type(result).__name__} instead.")
        return result
    except Exception as e:
        logging.error(f"Unhandled exception during tool execution for {tool_func.__name__}: {e}")
        return pd.DataFrame()  # Return an empty DataFrame on error




In [33]:
# Updated prompt and schema specification in the pipeline
import logging
import pandas as pd
from typing import List, Dict

# Initialize logging
logging.basicConfig(level=logging.INFO)

# Define Input Schema and Objectives for Each Subtask
tool_specs = {
    "logon": {
        "required_columns": ["user", "date", "pc", "activity", "content"],
        "objective": "Identify suspicious logon activity outside working hours (8 AM to 6 PM) or invalid activity types.",
        "example_input": {
            "user": ["WCR0044", "LRG0155"],
            "date": ["2024-01-02 05:02:50", "2024-01-02 06:33:00"],
            "pc": ["PC-9174", "PC-0450"],
            "activity": ["Logon", "Logoff"],
            "content": ["Login successful", "User logged out"]
        },
        "example_output": "Filtered DataFrame containing rows with suspicious logon activities."
    },
    "file_access": {
        "required_columns": ["user", "file", "access_type", "timestamp"],
        "objective": "Detect file access events classified as suspicious, such as 'delete' operations.",
        "example_input": {
            "user": ["WCR0044", "LRG0155"],
            "file": ["confidential.pdf", "project_plan.docx"],
            "access_type": ["read", "delete"],
            "timestamp": ["2024-01-02 05:15:00", "2024-01-02 06:40:00"]
        },
        "example_output": "Filtered DataFrame with rows flagged for suspicious file access."
    },
    "email": {
        "required_columns": ["user", "email_content", "timestamp"],
        "objective": "Detect suspicious email content containing flagged keywords like 'password', 'urgent', or 'secret'.",
        "example_input": {
            "user": ["WCR0044", "LRG0155"],
            "email_content": ["Request for urgent payment", "Hello!"],
            "timestamp": ["2024-01-02 05:15:00", "2024-01-02 06:40:00"]
        },
        "example_output": "Filtered DataFrame containing rows with suspicious email content."
    },
    "device": {
        "required_columns": ["user", "device", "action", "timestamp"],
        "objective": "Identify unusual device activity, such as connecting unauthorized devices.",
        "example_input": {
            "user": ["WCR0044", "XTR0011"],
            "device": ["USB", "Printer"],
            "action": ["connect", "disconnect"],
            "timestamp": ["2024-01-02 05:15:00", "2024-01-02 06:40:00"]
        },
        "example_output": "Filtered DataFrame containing rows with suspicious device activity."
    },
    "psychometric": {
        "required_columns": ["user", "trait", "value"],
        "objective": "Analyze psychometric traits to detect scores outside valid ranges or suspicious patterns.",
        "example_input": {
            "user": ["WCR0044", "LRG0155"],
            "trait": ["openness", "conscientiousness"],
            "value": [0.8, 1.2]
        },
        "example_output": "Filtered DataFrame highlighting users with psychometric anomalies."
    }
}


def validate_and_prepare_data(df: pd.DataFrame, required_columns: List[str]) -> pd.DataFrame:
    """
    Validate and prepare input data for the tool.
    Ensures all required columns exist and are in correct formats.
    """
    for col in required_columns:
        if col not in df.columns:
            logging.warning(f"Column '{col}' is missing in input data. Filling with NaN.")
            df[col] = None  # Add missing columns with NaN values

    # Ensure datetime columns are properly formatted
    for col in required_columns:
        if "date" in col or "timestamp" in col:
            try:
                df[col] = pd.to_datetime(df[col], errors='coerce')
                df[col] = df[col].fillna(pd.Timestamp.now())  # Correct assignment
            except Exception as e:
                logging.error(f"Error converting column '{col}' to datetime: {e}")
                df[col] = pd.Timestamp.now()  # Default to current time if conversion fails

    return df




def generate_tool_code(agent, task_name: str, requirements: str, log_type: str) -> str:
    """
    Generate tool code with a detailed prompt including schema, examples, and objectives.
    """
    spec = tool_specs[log_type]
    prompt = f"""
    Generate a robust Python function named '{task_name}'.
    Objective: {spec['objective']}
    Input Schema: The DataFrame must include these columns: {', '.join(spec['required_columns'])}.
    Example Input: {spec['example_input']}
    Expected Output: {spec['example_output']}
    Key Requirements:
    - Validate input schema with error handling for missing or invalid columns.
    - Handle edge cases, such as empty DataFrames or columns with incorrect datatypes.
    - Include detailed comments for each step.
    """

    return agent.generate_tool_code(task_name, requirements, "default_prompt", spec["required_columns"], prompt)


def execute_pipeline():
    subtasks = decomposer_agent.decompose_task()
    executor_nodes = {}
    results = {}

    # Generate tools for each subtask
    for log_type, subtask_name in subtasks.items():
        logging.info(f"Creating tool for subtask: {subtask_name}")
        spec = tool_specs.get(log_type, {})
        if not spec:
            logging.error(f"No specification found for log type: {log_type}")
            continue

        df = data_sources.get(log_type, pd.DataFrame())
        df = validate_and_prepare_data(df, spec["required_columns"])

        tool_code = generate_tool_code(rl_tool_builder_agent, subtask_name, spec["objective"], log_type)
        if tool_code is None:
            logging.error(f"Failed to generate tool for {subtask_name}.")
            continue

        try:
            exec(tool_code, globals())
            tool_func = globals().get(subtask_name)
            if tool_func:
                executor_nodes[log_type] = ExecutorNode(subtask_name, tool_func)
            else:
                logging.error(f"Function {subtask_name} not found in globals.")
        except Exception as e:
            logging.error(f"Error loading generated tool for {subtask_name}: {e}")

    # Execute the tools
    for log_type, executor_node in executor_nodes.items():
        df = data_sources.get(log_type, pd.DataFrame())
        results[log_type] = safe_execute_tool(executor_node.tool_func, df) if executor_node.tool_func else None

    # Aggregate results
    return anomaly_aggregator_agent.aggregate(results)



# Run the pipeline
final_results = execute_pipeline()

# Output the final aggregated anomalies
print("Final Aggregated Anomalies:")
print(final_results)


INFO:root:Decomposed task 'Detect_Suspicious_Activity' into subtasks: ['Detect_Suspicious_Activity_logon', 'Detect_Suspicious_Activity_psychometric', 'Detect_Suspicious_Activity_file_access', 'Detect_Suspicious_Activity_email', 'Detect_Suspicious_Activity_device']
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_logon
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_psychometric
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_file_access


      user              trait  value  is_suspicious
1  LRG0155  conscientiousness    1.2           True


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_email
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_device
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)
INFO:root:Final Aggregated Anomalies:
{'logon': {'anomalies':       user                date       pc activity           content
0  WCR0044 2024-01-02 05:02:50  PC-9174    Logon  Login successful
1  WCR0044 2024-01-02 06:15:30  PC-9174    Logon  Login successful
2  LRG0155 2024-01-02 06:33:00  PC-0450   Logoff   User logged out
3  XTR0011 2024-01-02 07:45:00  PC-1234    Logon  Login successful, 'reasons': 'Flagged 4 suspicious entries for logon'}, 'p

Final Aggregated Anomalies:
{'logon': {'anomalies':       user                date       pc activity           content
0  WCR0044 2024-01-02 05:02:50  PC-9174    Logon  Login successful
1  WCR0044 2024-01-02 06:15:30  PC-9174    Logon  Login successful
2  LRG0155 2024-01-02 06:33:00  PC-0450   Logoff   User logged out
3  XTR0011 2024-01-02 07:45:00  PC-1234    Logon  Login successful, 'reasons': 'Flagged 4 suspicious entries for logon'}, 'psychometric': {'anomalies': None, 'reasons': 'No anomalies detected (empty DataFrame) for psychometric'}, 'file_access': {'anomalies':       user         file access_type           timestamp
2  FLX0909  budget.xlsx      delete 2024-01-02 07:30:00, 'reasons': 'Flagged 1 suspicious entries for file_access'}, 'email': {'anomalies': None, 'reasons': 'No anomalies detected (empty DataFrame) for email'}, 'device': {'anomalies':       user device   action           timestamp
0  XTR0011    USB  connect 2024-01-02 07:00:00, 'reasons': 'Flagged 1 suspicious en

In [36]:
# Reload and validate data_dict
data_dict = {
    'logon': pd.read_csv(logon_file),
    'file_access': pd.read_csv(file_access_file),
    'email': pd.read_csv(email_file),
    'device': pd.read_csv(device_file),
    'http': pd.read_csv(http_file)
}

In [38]:
from tqdm import tqdm
import random

def load_and_subset_data(data_dict, subset_size=None):
    """
    Load and optionally subset data from the provided data dictionary.
    
    Args:
        data_dict (dict): Dictionary containing DataFrames for different log types.
        subset_size (int): Number of rows to select for each DataFrame. If None, uses the entire dataset.
    
    Returns:
        dict: Dictionary containing DataFrames (subset if specified).
    """
    subset_data = {}
    for log_type, df in tqdm(data_dict.items(), desc="Loading and Subsetting Data"):
        if subset_size:
            # Ensure reproducibility for subsets
            random.seed(42)
            subset_data[log_type] = df.sample(n=min(subset_size, len(df)), random_state=42)
        else:
            subset_data[log_type] = df
    return subset_data


def execute_pipeline_with_subset(data_dict, subset_size=None):
    """
    Execute the anomaly detection pipeline with an optional subset size.

    Args:
        data_dict (dict): Dictionary containing DataFrames for different log types.
        subset_size (int): Number of rows to use for each dataset. If None, uses the full dataset.

    Returns:
        dict: Final aggregated anomalies.
    """
    # Filter only the log types present in tool_specs
    filtered_data_dict = {k: v for k, v in data_dict.items() if k in tool_specs}
    
    # Load and subset data
    logging.info(f"Using subset size: {subset_size}")
    subset_data = load_and_subset_data(filtered_data_dict, subset_size)

    # Update data_sources with the subset data
    for log_type, df in subset_data.items():
        data_sources[log_type] = validate_and_prepare_data(df, tool_specs[log_type]["required_columns"])

    # Execute pipeline
    return execute_pipeline()



# Example usage
subset_size = 100  # Adjust subset size as needed
final_results = execute_pipeline_with_subset(data_dict, subset_size=subset_size)

# Output the results
print("Final Aggregated Anomalies:")
print(final_results)


INFO:root:Using subset size: 100
Loading and Subsetting Data: 100%|██████████| 4/4 [00:00<00:00, 33.89it/s]
INFO:root:Decomposed task 'Detect_Suspicious_Activity' into subtasks: ['Detect_Suspicious_Activity_logon', 'Detect_Suspicious_Activity_psychometric', 'Detect_Suspicious_Activity_file_access', 'Detect_Suspicious_Activity_email', 'Detect_Suspicious_Activity_device']
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_logon
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_psychometric
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)
INFO:root:Creating tool for subtask: Detect_Suspicious_Activity_file_access
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Generated code after 1 attempt(s)

Final Aggregated Anomalies:
{'logon': {'anomalies':                                            id     user       pc activity  \
date                                                                       
2010-02-02 19:00:00  {B2L2-P6OR27JE-6488PNDB}  EKM0667  PC-3339   Logoff   
2010-04-07 18:32:00  {C4J0-F6US63RL-9068VERA}  IDM0916  PC-5335   Logoff   
2010-10-05 19:10:00  {Y3D0-X6DR91KR-0063ZZIY}  OTH0763  PC-3402   Logoff   
2010-01-17 07:59:00  {Z9K1-O8DV87AO-8889VKAP}  DMR0116  PC-2025    Logon   
2011-04-07 07:46:00  {J5I5-W4EP35SP-4225IMEC}  ATC0286  PC-0132    Logon   
2011-01-06 19:14:00  {P8X0-Y8ID80MH-9818ONHM}  DSC0751  PC-3002   Logoff   
2010-03-07 07:56:00  {P1R5-J6AL55RO-9895ZMIX}  ECM0004  PC-3295    Logon   
2011-02-12 07:20:00  {C5E9-W6ZJ32HS-7974CRPT}  CMH0589  PC-2101    Logon   
2010-12-21 05:58:57  {X8F6-P4NO31WS-7025SBRQ}  WCR0044  PC-6836    Logon   
2010-04-30 18:30:00  {W9E8-M5JA13TM-7682CAUJ}  IPB0360  PC-0860   Logoff   
2010-01-06 01:16:42  {E6F2-X2CG40VM-

In [35]:
data_dict

{'user': ['WCR0044', 'LRG0155'],
 'trait': ['openness', 'conscientiousness'],
 'value': [0.8, 1.2]}