# Alert Prioritization Use Case
Automating the prioritization of critical alerts from a large volume of security alerts using a SageMaker endpoint.

**Note**: Update the `endpoint_name` and `aws_region` variables below to match your deployment.

## Configuration
Update these variables to match your SageMaker deployment:

In [None]:
# Deployment settings: edit both values so they match your own SageMaker setup.
aws_region = 'us-east-1'  # AWS region that hosts the endpoint
endpoint_name = 'foundation-sec-8b-endpoint'  # Name of the deployed SageMaker endpoint

# Echo the active settings so a wrong value is easy to spot before any call is made.
print(
    "Configuration:",
    f"Endpoint: {endpoint_name}",
    f"Region: {aws_region}",
    sep="\n",
)

In [None]:
import boto3
import json
import pandas as pd
from datetime import datetime
from IPython.display import display, Markdown

# Build the SageMaker runtime client for the configured region.
# No request is sent to the endpoint in this cell.
sagemaker_runtime = boto3.client(
    service_name='sagemaker-runtime',
    region_name=aws_region,
)

print(f"Connected to SageMaker endpoint: {endpoint_name}")

In [None]:
def invoke_endpoint(prompt, max_new_tokens=1024, temperature=0.3):
    """Send ``prompt`` to the configured SageMaker endpoint and return its text.

    Args:
        prompt: Text placed in the request's ``inputs`` field.
        max_new_tokens: Value forwarded as the ``max_new_tokens`` parameter.
        temperature: Value forwarded as ``temperature``; sampling
            (``do_sample``) is switched on only when it is greater than zero.

    Returns:
        The ``generated_text`` value of the response (taken from the first
        element when the response is a non-empty list), or ``str()`` of the
        decoded response when that key is absent or the shape is unexpected.
        If anything raises while calling or decoding, the error is printed
        and a string of the form ``"Error: <message>"`` is returned instead.
    """
    generation_params = {
        "max_new_tokens": max_new_tokens,
        "temperature": temperature,
        "do_sample": temperature > 0,
        "repetition_penalty": 1.2,
    }
    request = {"inputs": prompt, "parameters": generation_params}

    try:
        raw = sagemaker_runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType='application/json',
            Body=json.dumps(request),
        )
        decoded = json.loads(raw['Body'].read().decode())

        # The response may be a list of results or a single dict; pick
        # whichever object should carry the 'generated_text' key.
        if isinstance(decoded, list) and decoded:
            carrier = decoded[0]
        elif isinstance(decoded, dict):
            carrier = decoded
        else:
            return str(decoded)
        return carrier.get('generated_text', str(decoded))

    except Exception as exc:
        # Best-effort notebook helper: report the failure and hand back a
        # string so the calling cell can still display something.
        detail = str(exc)
        print(f"Error invoking endpoint: {detail}")
        return f"Error: {detail}"

# Smoke test: send one short prompt to confirm the endpoint responds.
test_response = invoke_endpoint(
    "Hello, can you help with cybersecurity?",
    max_new_tokens=50,
)
print("Test Response:", test_response, sep="\n")

## Sample Security Alerts
Here are mock security alerts to analyze:

In [None]:
# Mock alerts kept as one raw string: five JSON objects separated by commas.
# There are no enclosing [ ] brackets, so the text as a whole is not a valid
# JSON document, and every object spans several lines.
alerts = """
{
"alert_uuid": "3a4bd7e4-f8a2-4b7e-9d3b-7e2fa3f98b6c",
"timestamp": "2025-07-15T10:55:00Z",
"user": "security_admin",
"source_ip": "198.51.100.33",
"message": "The host at 198.51.100.33 has been permanently added to the ssh allowed list.",
"code": "SEC3007"
},
{
"alert_uuid": "d7e4f8a2-9b3c-4f1e-8a7d-2b6c9f1e3a4b",
"timestamp": "2025-07-15T10:05:33Z",
"user": "automation_bot",
"source_ip": "192.168.1.20",
"message": "Startup script backup_script.sh exited with error: Disk quota exceeded. System services failed to start, causing immediate operational outage.",
"code": "SYS2002"
},
{
"alert_uuid": "f1a6c8b9-d2b7-4c1e-9d3b-7e2fa3f98b6c",
"timestamp": "2025-07-15T10:15:00Z",
"user": "unknown",
"source_ip": "203.0.113.45",
"message": "The host at 203.0.113.45 has been added to the blocked list because of an SSH DOS attack.",
"code": "SEC3003"
},
{
"alert_uuid": "e2f1a6c8-b9d3-4c1e-9d3b-7e2fa3f98b6c",
"timestamp": "2025-07-15T10:35:00Z",
"user": "jdoe",
"source_ip": "198.51.100.22",
"message": "Account locked due to 5 failed login attempts. Last login attempt from 198.51.100.22.",
"code": "SEC3006"
},
{
"alert_uuid": "c8b9d3a4-b7e2-4f1e-9d3b-7e2fa3f98b6c",
"timestamp": "2025-07-15T10:30:12Z",
"user": "app_service",
"source_ip": "192.168.1.30",
"message": "An application fault occurred: segmentation fault in module auth_handler.",
"code": "APP4005"
}
"""

In [None]:
def make_prompt(alerts):
    """Build the prioritization prompt that embeds ``alerts`` under an ALERTS heading.

    Args:
        alerts: Alert text inserted verbatim into the prompt.

    Returns:
        The complete prompt string. Lines after the first keep the four-space
        indentation of the template literal.
    """
    prompt_template = """You are a security expert in charge of analyzing security alerts and prioritizing critical ones.
    
    Analyze the alerts below and prioritize which ones are critical.
    
    ## ALERTS
    {alerts}
    
    Provide your response with:
    - Which alerts are CRITICAL and why
    - Which alerts are HIGH priority and why
    - Recommended immediate actions for critical alerts
    - Summary of overall threat landscape
    
    Format your response with clear headings and bullet points.
    """
    return prompt_template.format(alerts=alerts)

## Alert Analysis

In [None]:
# Send every alert in one prompt and render the model's answer as Markdown.
print("=== COMPREHENSIVE ALERT ANALYSIS ===")
combined_prompt = make_prompt(alerts)
response = invoke_endpoint(
    combined_prompt,
    max_new_tokens=1500,
    temperature=0.1,
)
display(Markdown(response))

## Individual Alert Analysis
Let's analyze each alert individually for detailed insights:

In [None]:
import json as json_module


def _parse_alerts(raw_alerts):
    """Parse comma-separated JSON objects in ``raw_alerts`` into a list of dicts.

    The alert text is a sequence of multi-line JSON objects separated by
    commas with no enclosing brackets, so it is wrapped in ``[...]`` and
    parsed as one JSON array. Parsing line by line cannot work here because
    no single line holds a complete object.

    Args:
        raw_alerts: String containing zero or more JSON objects separated by
            commas. A trailing comma is tolerated.

    Returns:
        A list with one dict per alert, or an empty list if the text is empty
        or is not valid JSON (the parse error is printed, not raised).
    """
    body = raw_alerts.strip().rstrip(',')
    try:
        parsed = json_module.loads(f"[{body}]")
    except json_module.JSONDecodeError as exc:
        # Report the problem instead of silently producing zero alerts.
        print(f"Could not parse alerts: {exc}")
        return []
    # Keep only JSON objects; the loop below indexes alerts by key.
    return [item for item in parsed if isinstance(item, dict)]


# Parse individual alerts
alert_list = _parse_alerts(alerts)

print(f"Found {len(alert_list)} alerts to analyze individually\n")

# Analyze each alert
for i, alert in enumerate(alert_list):
    individual_prompt = f"""Analyze this single security alert:
    
    Alert Details:
    - UUID: {alert['alert_uuid']}
    - Time: {alert['timestamp']}
    - User: {alert['user']}
    - Source IP: {alert['source_ip']}
    - Code: {alert['code']}
    - Message: {alert['message']}
    
    Provide:
    1. Risk Level: Critical/High/Medium/Low
    2. Threat Category
    3. Potential Impact
    4. Immediate Actions Required
    
    Be concise but thorough."""

    response = invoke_endpoint(individual_prompt, max_new_tokens=300, temperature=0.1)

    print(f"=== Alert {i+1}: {alert['code']} ===")
    print(f"Message: {alert['message']}")
    print("\nAI Analysis:")
    print(response)
    print("-" * 80)
    print()

## Custom Alert Analysis
Add your own alerts here for analysis:

In [None]:
# Replace this text with the event you want assessed.
custom_alert = "Multiple failed SSH login attempts from IP 10.0.0.15 targeting admin accounts"

# Assemble the prompt piece by piece so its line breaks are explicit.
custom_prompt = (
    "Analyze this security event and provide a risk assessment:\n"
    "\n"
    f"Event: {custom_alert}\n"
    "\n"
    "Provide risk level, potential threats, and recommended actions."
)

custom_response = invoke_endpoint(
    custom_prompt,
    max_new_tokens=200,
    temperature=0.2,
)
print("=== Custom Alert Analysis ===")
print(f"Event: {custom_alert}")
print("\nAnalysis:")
print(custom_response)