# AI-Powered Security Agent

## Workflow for Handling Alerts

<img src="Workflow_for_Handling_Alerts.png" style="border: 2px solid black; display: block; margin: auto;" width="1500">


## Workflow for OpenAI Analysis and Priority Determination using Agentic Tools

<img src="Workflow_for_open_ai_analysis.png" style="border: 2px solid black; display: block; margin: auto;" width="1200">


# Step 1: Install Required Libraries


In [3]:
!pip install openai


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


# Step 2: Set Up OpenAI API Key

In [12]:
import openai
import getpass
import os

#openai.api_key = getpass.getpass("Enter your OpenAI API key: ")
os.environ["OPENAI_API_KEY"] = getpass.getpass()


 ········


# Step 3: Initialize SQLite Database


In [5]:
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect("soc_alerts.db")
cursor = conn.cursor()

# Create a table for storing alerts
cursor.execute('''
CREATE TABLE IF NOT EXISTS alerts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    alert_data TEXT,
    analysis TEXT,
    response_plan TEXT,
    response_mode TEXT, -- manual, semi_autonomous, or autonomous
    status TEXT -- pending, resolved
)
''')
conn.commit()

print("SQLite database and table initialized.")


SQLite database and table initialized.


# Step 4: Simulate Receiving an Alert


In [6]:
import json

def receive_alert(alert_data):
    """
    Simulate receiving an alert and insert it into the SQLite database.
    """
    cursor.execute(
        "INSERT INTO alerts (alert_data, status) VALUES (?, ?)", 
        (json.dumps(alert_data), "pending")
    )
    conn.commit()
    return cursor.lastrowid

# Example alert data
alert = {
    "alert_type": "suspicious_login",
    "details": "Login attempt from an unrecognized IP address."
}

# Insert the alert into the database
alert_id = receive_alert(alert)
print(f"Alert received and stored in the database with ID: {alert_id}")


Alert received and stored in the database with ID: 1


# Step 5: Analyze the Alert

In [13]:
from openai import OpenAI
import json

# Initialize the OpenAI client
client = OpenAI()

# Define tools (functions) for the OpenAI model
tools = [
    {
        "type": "function",
        "function": {
            "name": "analyze_alert",
            "parameters": {
                "type": "object",
                "properties": {
                    "alert_data": {"type": "string"},
                },
                "required": ["alert_data"],
            },
        },
    }
]

# Alert data to analyze
alert_data = {
    "alert_type": "suspicious_login",
    "details": "Login attempt from an unrecognized IP address.",
}

# Call OpenAI to analyze the alert
completion = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a SOC AI assistant."},
        {"role": "user", "content": f"Analyze this alert: {json.dumps(alert_data)}"},
    ],
    tools=tools,
)

# Extract and print the tool calls (function responses)
tool_calls = completion.choices[0].message.tool_calls
print("Tool Calls Output:")
print(tool_calls)


Tool Calls Output:
[ChatCompletionMessageToolCall(id='call_M70PjMOvENk8FDtvpnW3QAgm', function=Function(arguments='{"alert_data":"{\\"alert_type\\": \\"suspicious_login\\", \\"details\\": \\"Login attempt from an unrecognized IP address.\\"}"}', name='analyze_alert'), type='function')]


In [14]:
# Extract the tool call from the output
tool_call = completion.choices[0].message.tool_calls[0]  # First tool call

# Parse the arguments from the function call
function_arguments = json.loads(tool_call.function.arguments)

# Extract and print the alert data analyzed
alert_data_analyzed = json.loads(function_arguments["alert_data"])

print("Analyzed Alert Data:")
print(json.dumps(alert_data_analyzed, indent=2))


Analyzed Alert Data:
{
  "alert_type": "suspicious_login",
  "details": "Login attempt from an unrecognized IP address."
}


# Step 6: Generate a Response Plan and Handle Response Modes

In [15]:
# Define updated tools for generating response plans
tools = [
    {
        "type": "function",
        "function": {
            "name": "analyze_alert",
            "parameters": {
                "type": "object",
                "properties": {
                    "alert_data": {"type": "string"}
                },
                "required": ["alert_data"]
            },
        },
    }
]

# Simulate analyzing the alert and generating a response plan
completion = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a SOC AI assistant."},
        {"role": "user", "content": f"Analyze this alert: {json.dumps(alert_data)}"}
    ],
    tools=tools,
)

# Extract and parse the tool call arguments
tool_call = completion.choices[0].message.tool_calls[0]
function_arguments = json.loads(tool_call.function.arguments)

# Simulated response plan
response_plan = f"""
Root Cause Analysis:
Alert Type: {function_arguments['alert_data']}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.
"""

print("Generated Response Plan:")
print(response_plan)


Generated Response Plan:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.



In [16]:
# Choose the response mode: manual, semi_autonomous, or autonomous
response_mode = "manual"  

if response_mode == "manual":
    print("Manual Mode:")
    print(response_plan)
    input("Press Enter once you’ve executed the response.")
elif response_mode == "semi_autonomous":
    execute = input("Execute response plan? (yes/no): ").strip().lower()
    if execute == "yes":
        print("Executing response...")
        print(response_plan)
elif response_mode == "autonomous":
    print("Autonomously executing response...")
    print(response_plan)


Manual Mode:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.



Press Enter once you’ve executed the response. 


# Step 7: Save the Response Plan and Update Alert Status in SQLite


In [19]:
def save_analysis_and_response(alert_id, analysis, response_plan, response_mode):
    """
    Save analysis, response plan, and response mode to the SQLite database and update the alert status.
    """
    cursor.execute(
        "UPDATE alerts SET analysis = ?, response_plan = ?, response_mode = ?, status = ? WHERE id = ?",
        (analysis, response_plan, response_mode, "resolved", alert_id)
    )
    conn.commit()
    print(f"Analysis and response plan saved for Alert ID: {alert_id}")

# Example Analysis
analysis = "The alert was caused by an unrecognized IP login, indicating possible credential compromise."

# Save with manual mode
save_analysis_and_response(alert_id, analysis, response_plan, "manual")


Analysis and response plan saved for Alert ID: 1


### Verify the Update


In [20]:
updated_alert = get_alert_from_db(alert_id)
print("Updated Alert Record with Analysis and Response Mode:")
print(updated_alert)


Updated Alert Record with Analysis and Response Mode:
(1, '{"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}', 'The alert was caused by an unrecognized IP login, indicating possible credential compromise.', '\nRoot Cause Analysis:\nAlert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}\n\nResponse Plan:\n1. Verify the user\'s login location and IP address.\n2. Notify the user about suspicious activity.\n3. Temporarily lock the account if deemed necessary.\n', 'manual', 'resolved')


# Step 8: Implement Semi-Autonomous and Autonomous Modes

In [21]:
def process_alert(alert_id, response_mode):
    """
    Process the alert based on the selected response mode: manual, semi_autonomous, or autonomous.
    """
    # Retrieve the alert details
    cursor.execute("SELECT response_plan FROM alerts WHERE id = ?", (alert_id,))
    response_plan = cursor.fetchone()[0]
    
    print(f"Processing Alert ID: {alert_id} with Mode: {response_mode}")
    
    if response_mode == "manual":
        print("Manual Mode:")
        print(response_plan)
        input("Press Enter once you’ve executed the response.")
    elif response_mode == "semi_autonomous":
        print("Semi-Autonomous Mode:")
        print(response_plan)
        execute = input("Execute response plan? (yes/no): ").strip().lower()
        if execute == "yes":
            print("Executing response...")
            execute_response_plan(alert_id)
        else:
            print("Response execution skipped.")
    elif response_mode == "autonomous":
        print("Autonomous Mode:")
        print("Executing response automatically...")
        execute_response_plan(alert_id)
    else:
        print("Invalid response mode selected.")

def execute_response_plan(alert_id):
    """
    Simulate the execution of the response plan.
    """
    # Retrieve and display the response plan
    cursor.execute("SELECT response_plan FROM alerts WHERE id = ?", (alert_id,))
    response_plan = cursor.fetchone()[0]
    
    # Simulated execution
    print("Executing Response Plan:")
    print(response_plan)
    print("All steps completed successfully.")
    
    # Update alert status to resolved
    cursor.execute("UPDATE alerts SET status = ? WHERE id = ?", ("resolved", alert_id))
    conn.commit()
    print(f"Alert ID: {alert_id} marked as resolved.")


In [22]:
# Example: Process the alert in semi-autonomous mode
response_mode = "semi_autonomous"  # Change to "manual" or "autonomous" as needed
process_alert(alert_id, response_mode)


Processing Alert ID: 1 with Mode: semi_autonomous
Semi-Autonomous Mode:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.



Execute response plan? (yes/no):  yes


Executing response...
Executing Response Plan:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.

All steps completed successfully.
Alert ID: 1 marked as resolved.


# Step 9: Add Real-World Integrations


In [23]:
import requests

def notify_user(alert_id, user_email):
    """
    Simulate notifying the user via a mock API call.
    """
    api_url = "https://example.com/notify"
    payload = {"alert_id": alert_id, "user_email": user_email, "message": "Suspicious login detected."}

    try:
        response = requests.post(api_url, json=payload)
        print(f"Notification sent to {user_email}: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error notifying user: {e}")

def lock_account(user_id):
    """
    Simulate locking the user's account via a mock API call.
    """
    api_url = "https://example.com/lock_account"
    payload = {"user_id": user_id}

    try:
        response = requests.post(api_url, json=payload)
        print(f"Account lock request sent for User ID {user_id}: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error locking account: {e}")

def execute_response_plan(alert_id, user_email, user_id):
    """
    Execute the response plan, including notifying the user and locking the account.
    """
    # Retrieve and display the response plan
    cursor.execute("SELECT response_plan FROM alerts WHERE id = ?", (alert_id,))
    response_plan = cursor.fetchone()[0]
    
    print("Executing Response Plan:")
    print(response_plan)

    # Notify the user
    notify_user(alert_id, user_email)
    
    # Lock the user's account if necessary
    lock_account(user_id)

    # Mark the alert as resolved in the database
    cursor.execute("UPDATE alerts SET status = ? WHERE id = ?", ("resolved", alert_id))
    conn.commit()
    print(f"Alert ID: {alert_id} marked as resolved.")


# Step 10: Test with Semi-Autonomous and Autonomous Modes


In [24]:
def process_alert(alert_id, response_mode, user_email=None, user_id=None):
    """
    Process the alert based on the selected response mode: manual, semi_autonomous, or autonomous.
    """
    # Retrieve the alert details
    cursor.execute("SELECT response_plan FROM alerts WHERE id = ?", (alert_id,))
    response_plan = cursor.fetchone()[0]
    
    print(f"Processing Alert ID: {alert_id} with Mode: {response_mode}")
    
    if response_mode == "manual":
        print("Manual Mode:")
        print(response_plan)
        input("Press Enter once you’ve executed the response.")
    elif response_mode == "semi_autonomous":
        print("Semi-Autonomous Mode:")
        print(response_plan)
        execute = input("Execute response plan? (yes/no): ").strip().lower()
        if execute == "yes":
            print("Executing response...")
            execute_response_plan(alert_id, user_email, user_id)
        else:
            print("Response execution skipped.")
    elif response_mode == "autonomous":
        print("Autonomous Mode:")
        print("Executing response automatically...")
        execute_response_plan(alert_id, user_email, user_id)
    else:
        print("Invalid response mode selected.")


# Step 11: Test the Full Workflow


In [25]:
# Example alert details
alert_id = 1 
user_email = "user@example.com"
user_id = 12345

# Process the alert in semi-autonomous  mode
response_mode = "semi_autonomous" 
process_alert(alert_id, response_mode, user_email=user_email, user_id=user_id)


Processing Alert ID: 1 with Mode: semi_autonomous
Semi-Autonomous Mode:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.



Execute response plan? (yes/no):  yes


Executing response...
Executing Response Plan:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.

Notification sent to user@example.com: 405
Account lock request sent for User ID 12345: 405
Alert ID: 1 marked as resolved.


# Step 12: Add Logging for Execution Tracking


In [26]:
import logging

# Configure logging
logging.basicConfig(
    filename="soc_alerts.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)


In [27]:
def notify_user(alert_id, user_email):
    """
    Simulate notifying the user via a mock API call.
    """
    api_url = "https://example.com/notify"
    payload = {"alert_id": alert_id, "user_email": user_email, "message": "Suspicious login detected."}

    try:
        response = requests.post(api_url, json=payload)
        logging.info(f"Notification sent to {user_email} for Alert ID {alert_id}: Status {response.status_code}")
        print(f"Notification sent to {user_email}: {response.status_code}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error notifying user {user_email} for Alert ID {alert_id}: {e}")
        print(f"Error notifying user: {e}")


In [28]:
def lock_account(user_id):
    """
    Simulate locking the user's account via a mock API call.
    """
    api_url = "https://example.com/lock_account"
    payload = {"user_id": user_id}

    try:
        response = requests.post(api_url, json=payload)
        logging.info(f"Account lock request sent for User ID {user_id}: Status {response.status_code}")
        print(f"Account lock request sent for User ID {user_id}: {response.status_code}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error locking account for User ID {user_id}: {e}")
        print(f"Error locking account: {e}")


In [29]:
def process_alert(alert_id, response_mode, user_email=None, user_id=None):
    """
    Process the alert based on the selected response mode: manual, semi_autonomous, or autonomous.
    """
    # Retrieve the alert details
    cursor.execute("SELECT response_plan FROM alerts WHERE id = ?", (alert_id,))
    response_plan = cursor.fetchone()[0]
    
    logging.info(f"Processing Alert ID {alert_id} with Mode: {response_mode}")
    print(f"Processing Alert ID: {alert_id} with Mode: {response_mode}")
    
    if response_mode == "manual":
        logging.info(f"Manual Mode activated for Alert ID {alert_id}.")
        print("Manual Mode:")
        print(response_plan)
        input("Press Enter once you’ve executed the response.")
    elif response_mode == "semi_autonomous":
        logging.info(f"Semi-Autonomous Mode activated for Alert ID {alert_id}.")
        print("Semi-Autonomous Mode:")
        print(response_plan)
        execute = input("Execute response plan? (yes/no): ").strip().lower()
        if execute == "yes":
            logging.info(f"User confirmed execution for Alert ID {alert_id}.")
            print("Executing response...")
            execute_response_plan(alert_id, user_email, user_id)
        else:
            logging.info(f"User skipped execution for Alert ID {alert_id}.")
            print("Response execution skipped.")
    elif response_mode == "autonomous":
        logging.info(f"Autonomous Mode activated for Alert ID {alert_id}. Executing automatically.")
        print("Autonomous Mode:")
        print("Executing response automatically...")
        execute_response_plan(alert_id, user_email, user_id)
    else:
        logging.error(f"Invalid response mode {response_mode} for Alert ID {alert_id}.")
        print("Invalid response mode selected.")


In [30]:
# Example alert details
alert_id = 1 
user_email = "user@example.com"
user_id = 12345

# Process the alert in semi-autonomous mode
response_mode = "semi_autonomous" 
process_alert(alert_id, response_mode, user_email=user_email, user_id=user_id)

Processing Alert ID: 1 with Mode: semi_autonomous
Semi-Autonomous Mode:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.



Execute response plan? (yes/no):  yes


Executing response...
Executing Response Plan:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.

Notification sent to user@example.com: 405
Account lock request sent for User ID 12345: 405
Alert ID: 1 marked as resolved.


In [None]:
!cat soc_alerts.log

# Step 14: Autonomous resolution and Add Notification for Alert Resolution


In [32]:
def notify_resolution(alert_id, user_email):
    """
    Notify stakeholders about the resolution of an alert via a mock API call.
    """
    api_url = "https://example.com/notify_resolution"
    payload = {
        "alert_id": alert_id,
        "user_email": user_email,
        "message": f"Alert ID {alert_id} has been resolved successfully."
    }

    try:
        response = requests.post(api_url, json=payload)
        logging.info(f"Resolution notification sent to {user_email} for Alert ID {alert_id}: Status {response.status_code}")
        print(f"Resolution notification sent to {user_email}: {response.status_code}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error notifying resolution for Alert ID {alert_id}: {e}")
        print(f"Error notifying resolution: {e}")


In [33]:
def execute_response_plan(alert_id, user_email, user_id):
    """
    Execute the response plan, including notifying the user, locking the account, and notifying resolution.
    """
    # Retrieve and display the response plan
    cursor.execute("SELECT response_plan FROM alerts WHERE id = ?", (alert_id,))
    response_plan = cursor.fetchone()[0]
    
    print("Executing Response Plan:")
    print(response_plan)

    # Notify the user
    notify_user(alert_id, user_email)
    
    # Lock the user's account if necessary
    lock_account(user_id)

    # Mark the alert as resolved in the database
    cursor.execute("UPDATE alerts SET status = ? WHERE id = ?", ("resolved", alert_id))
    conn.commit()
    logging.info(f"Alert ID {alert_id} marked as resolved.")
    print(f"Alert ID: {alert_id} marked as resolved.")

    # Notify resolution
    notify_resolution(alert_id, user_email)


In [34]:
# Example alert details
alert_id = 1  
user_email = "user@example.com"
user_id = 12345

# Process the alert in autonomous mode for testing
response_mode = "autonomous"
process_alert(alert_id, response_mode, user_email=user_email, user_id=user_id)


Processing Alert ID: 1 with Mode: autonomous
Autonomous Mode:
Executing response automatically...
Executing Response Plan:

Root Cause Analysis:
Alert Type: {"alert_type": "suspicious_login", "details": "Login attempt from an unrecognized IP address."}

Response Plan:
1. Verify the user's login location and IP address.
2. Notify the user about suspicious activity.
3. Temporarily lock the account if deemed necessary.

Notification sent to user@example.com: 405
Account lock request sent for User ID 12345: 405
Alert ID: 1 marked as resolved.
Resolution notification sent to user@example.com: 405


In [35]:
!cat soc_alerts.log

2024-12-31 16:07:02,355 - INFO - Processing Alert ID 1 with Mode: semi_autonomous
2024-12-31 16:07:02,356 - INFO - Semi-Autonomous Mode activated for Alert ID 1.
2024-12-31 16:07:05,954 - INFO - User confirmed execution for Alert ID 1.
2024-12-31 16:07:06,159 - INFO - Notification sent to user@example.com for Alert ID 1: Status 405
2024-12-31 16:07:06,443 - INFO - Account lock request sent for User ID 12345: Status 405
2024-12-31 16:08:37,958 - INFO - Processing Alert ID 1 with Mode: autonomous
2024-12-31 16:08:37,959 - INFO - Autonomous Mode activated for Alert ID 1. Executing automatically.
2024-12-31 16:08:38,249 - INFO - Notification sent to user@example.com for Alert ID 1: Status 405
2024-12-31 16:08:38,351 - INFO - Account lock request sent for User ID 12345: Status 405
2024-12-31 16:08:38,352 - INFO - Alert ID 1 marked as resolved.
2024-12-31 16:08:38,547 - INFO - Resolution notification sent to user@example.com for Alert ID 1: Status 405


# Step 15: Add More Agentic Tools


In [None]:
# Mock server 

# from flask import Flask, request, jsonify

# app = Flask(__name__)

# @app.route('/notify', methods=['POST'])
# def notify():
#     data = request.json
#     return jsonify({"status": "success", "message": "Notification sent"}), 200

# @app.route('/lock_account', methods=['POST'])
# def lock_account():
#     data = request.json
#     return jsonify({"status": "success", "message": "Account locked"}), 200

# if __name__ == "__main__":
#     app.run(port=5000)

# python serv.py

In [110]:
tools.append(
    {
        "type": "function",
        "function": {
            "name": "prioritize_alerts",
            "parameters": {
                "type": "object",
                "properties": {
                    "alert_data": {"type": "string"},
                    "priority": {"type": "string", "enum": ["high", "low"]}  
                },
                "required": ["alert_data", "priority"] 
            }
        }
    }
)


In [111]:
refined_prompt = (
    "You are a SOC AI assistant. High-priority alerts include critical breaches, "
    "ransomware attacks, multiple failed login attempts on admin consoles, or any event that poses "
    "an immediate risk to sensitive systems or data. Respond with a JSON object in this format: "
    '{"alert_data": "...", "priority": "high"} or {"alert_data": "...", "priority": "low"}.'
)


def prioritize_alert(alert_data):
    """
    Use OpenAI agentic tool to prioritize an alert, with explicit schema and logging for missing fields.
    """
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": refined_prompt},
            {"role": "user", "content": f"Prioritize this alert: {json.dumps(alert_data)}"}
        ],
        tools=tools
    )

    # Log the full agent response
    print("Full Agent Response:", completion)

    # Extract tool calls
    tool_calls = completion.choices[0].message.tool_calls
    if not tool_calls:
        print("Error: No tool call generated by the agent.")
        return "low"

    # Parse the priority from the response
    tool_call = tool_calls[0]
    print("Agent Raw Response:", tool_call.function.arguments)
    response_data = json.loads(tool_call.function.arguments)
    priority = response_data.get("priority")
    if not priority:
        print("Error: Missing 'priority' in response. Defaulting to 'low'.")
        return "low"
    print(f"Alert Priority: {priority}")
    return priority


In [112]:
tools.append(
    {
        "type": "function",
        "function": {
            "name": "escalate_alert",
            "parameters": {
                "type": "object",
                "properties": {
                    "alert_id": {"type": "integer"},
                    "priority": {"type": "string"}
                },
                "required": ["alert_id", "priority"]
            }
        }
    }
)


In [113]:
def escalate_alert(alert_id, priority):
    """
    Use OpenAI agentic tool to escalate a high-priority alert.
    """
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a SOC escalation assistant."},
            {"role": "user", "content": f"Escalate this alert with ID {alert_id} and priority {priority}."}
        ],
        tools=tools
    )
    tool_call = completion.choices[0].message.tool_calls[0]
    escalation_steps = json.loads(tool_call.function.arguments).get("escalation_steps", [])
    print(f"Escalation Steps for Alert ID {alert_id}:")
    for step in escalation_steps:
        print(f"- {step}")
    return escalation_steps


# Step 16: Integrate Agentic Tools into Workflow


In [73]:
def notify_user(alert_id, user_email):
    """
    Simulate notifying the user via a mock API call.
    """
    payload = {"alert_id": alert_id, "user_email": user_email, "message": "Unauthorized access attempt detected."}
    response = requests.post(NOTIFY_API_URL, json=payload)
    if response.status_code == 200:
        print(f"Notification sent to {user_email}: {response.status_code}")
    else:
        print(f"Failed to notify {user_email}: {response.status_code}, Details: {response.text}")

def lock_account(user_id):
    """
    Simulate locking the user's account via a mock API call.
    """
    payload = {"user_id": user_id}
    response = requests.post(LOCK_ACCOUNT_API_URL, json=payload)
    if response.status_code == 200:
        print(f"Account lock request sent for User ID {user_id}: {response.status_code}")
    else:
        print(f"Failed to lock account for User ID {user_id}: {response.status_code}, Details: {response.text}")


In [74]:
def execute_response_plan(alert_id, user_email, user_id):
    """
    Execute the response plan, including notifying the user and locking the account.
    """
    # Retrieve the response plan from the database
    cursor.execute("SELECT response_plan FROM alerts WHERE id = ?", (alert_id,))
    response_plan = cursor.fetchone()[0]
    
    print("Executing Response Plan:")
    print(response_plan)

    # Notify the user
    notify_user(alert_id, user_email)

    # Lock the user's account
    lock_account(user_id)

    # Mark the alert as resolved in the database
    cursor.execute("UPDATE alerts SET status = ? WHERE id = ?", ("resolved", alert_id))
    conn.commit()
    print(f"Alert ID: {alert_id} marked as resolved.")


In [84]:
def process_alert(alert_id, response_mode, user_email=None, user_id=None):
    """
    Process the alert with prioritization and escalation based on response mode.
    """
    # Retrieve the alert details
    cursor.execute("SELECT alert_data, response_plan FROM alerts WHERE id = ?", (alert_id,))
    alert_data, response_plan = cursor.fetchone()
    alert_data = json.loads(alert_data)

    # Generate the response plan if missing
    if response_plan is None:
        analysis = "The alert was caused by unauthorized access attempt to a secure area."
        response_plan = """
        Root Cause Analysis:
        Alert Type: Unauthorized Access

        Response Plan:
        1. Verify the unauthorized access attempt.
        2. Escalate to security personnel if necessary.
        3. Lock the affected resources to prevent further access.
        """
        save_analysis_and_response(alert_id, analysis, response_plan, response_mode)

    # Determine the alert priority
    priority = prioritize_alert(alert_data)
    print(f"Alert Priority: {priority}")

    # Handle high-priority alerts with escalation
    if priority == "high":
        escalation_steps = escalate_alert(alert_id, priority)
        print("Escalation Steps Executed:")
        for step in escalation_steps:
            print(f"- {step}")
        logging.info(f"Escalation steps executed for Alert ID {alert_id}: {escalation_steps}")

    # Continue processing based on response mode
    print(f"Processing Alert ID: {alert_id} with Mode: {response_mode}")

    if response_mode == "manual":
        print("Manual Mode:")
        print(response_plan)
        input("Press Enter once you’ve executed the response.")
    elif response_mode == "semi_autonomous":
        print("Semi-Autonomous Mode:")
        print(response_plan)
        execute = input("Execute response plan? (yes/no): ").strip().lower()
        if execute == "yes":
            print("Executing response...")
            execute_response_plan(alert_id, user_email, user_id)
        else:
            print("Response execution skipped.")
    elif response_mode == "autonomous":
        print("Autonomous Mode:")
        print("Executing response automatically...")
        execute_response_plan(alert_id, user_email, user_id)
    else:
        print("Invalid response mode selected.")


# Step 17: Test the Enhanced Workflow with high and low priority issues


In [67]:
NOTIFY_API_URL = "http://localhost:5000/notify"
LOCK_ACCOUNT_API_URL = "http://localhost:5000/lock_account"


In [117]:
alert = {
    "alert_type": "unauthorized_access",
    "details": "Unauthorized access attempt to a secure area."
}
receive_alert(alert)


19

In [118]:
alert_id = 19  
process_alert(alert_id, "autonomous", user_email="security@example.com", user_id=67890)


Analysis and response plan saved for Alert ID: 19
Full Agent Response: ChatCompletion(id='chatcmpl-AkhTRe7auao64MwNogQ3AEdnFy6K4', choices=[Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, refusal=None, role='assistant', audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_IombfDZ0TatjjQx8KNtswZYo', function=Function(arguments='{"alert_data":"{\\"alert_type\\": \\"unauthorized_access\\", \\"details\\": \\"Unauthorized access attempt to a secure area.\\"}"}', name='analyze_alert'), type='function')]))], created=1735693565, model='gpt-4o-2024-08-06', object='chat.completion', service_tier=None, system_fingerprint='fp_d28bcae782', usage=CompletionUsage(completion_tokens=38, prompt_tokens=185, total_tokens=223, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_toke

In [115]:
alert = {
    "alert_type": "critical_breach",
    "details": "Unauthorized access detected on the admin console with multiple failed attempts in the past 5 minutes."
}
receive_alert(alert)


18

In [116]:
alert_id = 18 
process_alert(alert_id, "autonomous", user_email="security@example.com", user_id=67890)


Analysis and response plan saved for Alert ID: 18
Full Agent Response: ChatCompletion(id='chatcmpl-AkhSzu7ltno8mPovH1q3OoaviQsfR', choices=[Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, refusal=None, role='assistant', audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_7be6S74U8J6iBPFUA98f6dKh', function=Function(arguments='{"alert_data":"{\\"alert_type\\": \\"critical_breach\\", \\"details\\": \\"Unauthorized access detected on the admin console with multiple failed attempts in the past 5 minutes.\\"}","priority":"high"}', name='prioritize_alerts'), type='function')]))], created=1735693537, model='gpt-4o-2024-08-06', object='chat.completion', service_tier=None, system_fingerprint='fp_d28bcae782', usage=CompletionUsage(completion_tokens=54, prompt_tokens=195, total_tokens=249, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected

In [121]:
alert = {
    "alert_type": "critical_breach",
    "details": "Unauthorized access detected on the admin console with multiple failed attempts in the past 5 minutes."
}
receive_alert(alert)


21

In [124]:
alert_id = 21 
process_alert(alert_id, "autonomous", user_email="security@example.com", user_id=67890)


Full Agent Response: ChatCompletion(id='chatcmpl-AkiwCWjFJeL04CKkdyjeOt4rUYAdO', choices=[Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, refusal=None, role='assistant', audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_jdo8s3MGE0Ihj5LRs0oyoT6L', function=Function(arguments='{"alert_data":"{\\"alert_type\\": \\"critical_breach\\", \\"details\\": \\"Unauthorized access detected on the admin console with multiple failed attempts in the past 5 minutes.\\"}","priority":"high"}', name='prioritize_alerts'), type='function')]))], created=1735699192, model='gpt-4o-2024-08-06', object='chat.completion', service_tier=None, system_fingerprint='fp_d28bcae782', usage=CompletionUsage(completion_tokens=54, prompt_tokens=195, total_tokens=249, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=Promp