# RAD Security: CVE-Aware Analysis Agent

## AI Engineer Take-Home Exercise

This document outlines the architecture and implementation of an LLM-powered agent designed to analyze security incidents in the context of CVE data. The agent uses semantic search, retrieval-augmented generation, and structured agent tools to provide contextual prioritization of security vulnerabilities.

### Key Features

- **Semantic Search**: Searches over KEV, NVD, and historical incident records
- **Agent Tools**: Uses MCP (Model Context Protocol) for structured tool access
- **Contextual Prioritization**: Ranks CVEs based on their relevance to specific incidents
- **Historical Learning**: Builds a vector store of past analyses, used to normalize severity rankings across incidents
- **Persistence**: Stores analyses in SQLite for future reference

## Why This Architecture

The architecture is designed to address several key challenges in security incident analysis:

1. **Volume Challenge**: Security teams face thousands of CVEs and alerts daily
2. **Context Challenge**: Understanding the relationship between vulnerabilities and incidents requires contextual knowledge
3. **Expertise Challenge**: Security expertise is scarce and expensive
4. **Consistency Challenge**: Manual analysis leads to inconsistent prioritization

Our solution uses LLMs and semantic search to understand incident context, identify relevant CVEs, prioritize them based on impact, and generate human-readable explanations of the analysis.

## 1. Setup and Dependencies

Let's start by installing the required dependencies and setting up our environment.

**Why we do this:** Ensuring all required packages are available creates a reproducible environment. This setup step loads essential libraries for LangChain, LangGraph, OpenAI, FAISS vector storage, and Redis caching.

In [14]:
# Install requirements
%pip install -r requirements.txt

# Below is our requirements.txt content for reference
# aiohttp==3.8.5
# fastapi==0.100.0
# fastmcp==0.2.0
# httpx==0.25.0
# langchain==0.0.331
# langchain-community==0.0.11
# langchain-core==0.1.3
# langchain-mcp-adapters==0.0.3
# langchain-openai==0.0.2
# langgraph==0.0.16
# openai==1.1.2
# pydantic==2.4.2
# python-dotenv==1.0.0
# redis==4.6.0
# numpy==1.24.4
# faiss-cpu==1.7.4
# uvicorn==0.23.2
# sqlalchemy==2.0.19
# streamlit==1.26.0

Note: you may need to restart the kernel to use updated packages.


c:\Users\Dan Guilliams\OneDrive\Code Projects\MCP_Agents_RADSecurity\.venv\Scripts\python.exe: No module named pip


## 2. Start Redis (for Idempotency Cache)

We'll use Redis for request deduplication and caching. This ensures our system is idempotent and avoids redundant processing.

**Why we do this:** Redis provides fast, in-memory caching that helps us:
1. Deduplicate analysis requests (idempotency)
2. Cache expensive operations like semantic searches
3. Reduce API costs and latency by storing LLM responses
4. Ensure consistent behavior even with intermittent failures
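
The deduplication idea in item 1 can be sketched as follows. This is a minimal illustration, not the project's implementation: the `idempotency_key` helper and the `DedupCache` class are hypothetical names, and a plain dictionary stands in for Redis so the example runs without a server.

```python
import hashlib
import json


def idempotency_key(request: dict) -> str:
    """Derive a stable key from the request payload (hypothetical helper)."""
    # Sorting keys makes logically identical requests serialize identically.
    canonical = json.dumps(request, sort_keys=True, separators=(",", ":"))
    return "analysis:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class DedupCache:
    """In-memory stand-in for Redis; only get/set semantics are modelled."""

    def __init__(self):
        self._store = {}
        self.computations = 0

    def get_or_compute(self, request: dict, compute):
        key = idempotency_key(request)
        if key in self._store:
            return self._store[key]  # duplicate request: reuse the stored result
        self.computations += 1
        result = compute(request)
        self._store[key] = result
        return result


cache = DedupCache()
first = cache.get_or_compute(
    {"start_index": 0, "batch_size": 2},
    lambda r: f"analysis of {r['batch_size']} incidents",
)
# Same payload with a different key order: served from the cache.
second = cache.get_or_compute(
    {"batch_size": 2, "start_index": 0},
    lambda r: "recomputed",
)
print(first == second, cache.computations)
```

With a real Redis client the dictionary lookups would become `GET`/`SET` calls with an expiry, but the key derivation would stay the same.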

In [15]:
# Start Docker Service for Redis:
!docker run -d --name local-redis -p 6379:6379 redis:latest

docker: Error response from daemon: Conflict. The container name "/local-redis" is already in use by container "d7532e8e07eda16696991be756f16a983f49b82035ed57a286f0bc5f0aaafb22". You have to remove (or rename) that container to be able to reuse that name.

Run 'docker run --help' for more information


## 3. System Architecture Overview

Our system follows a layered architecture with distinct components handling specific responsibilities:

```
┌─────────────────────────────────┐
│          Client Layer           │
│ (Notebook, run_analysis.py)     │
└────────────────┬────────────────┘
                 │
┌────────────────▼────────────────┐
│        API Service Layer        │
│ (main_security_agent_server.py) │
└────────────────┬────────────────┘
                 │
┌────────────────▼────────────────┐
│        Agent Layer              │
│ (LangChain, LangGraph, ReAct)   │
└────────────────┬────────────────┘
                 │
┌────────────────▼────────────────┐
│       Tools Layer               │
│ (MCP Server, mcp_cve_server.py) │
└────────────────┬────────────────┘
                 │
┌────────────────▼────────────────┐
│       Storage Layer             │
│ (FAISS, Redis, SQLite)          │
└─────────────────────────────────┘
```

**Why this architecture:** 

1. **Separation of Concerns**: Each layer has a distinct responsibility
2. **Scalability**: Components can be scaled independently
3. **Resilience**: Failures in one layer don't cascade to others
4. **Maintainability**: Easier to update or replace individual components
5. **Testing**: Components can be tested in isolation

### Key Project Files and Their Roles

```
.
├── main_security_agent_server.py  # FastAPI server coordinating analysis
├── mcp_cve_server.py              # Tool server providing CVE search capabilities
├── run_analysis.py                # CLI script for batch processing
├── data/                          # Data storage
│   ├── incidents.json             # Input security incidents
│   ├── kev.json                   # Known Exploited Vulnerabilities
│   ├── nvd_subset.json            # National Vulnerability Database subset
│   └── vectorstore/               # FAISS vector indexes
├── setup/                         # Setup scripts
│   ├── download_cve_data.py       # Downloads CVE data
│   └── build_faiss_indexes.py     # Builds vector indexes
└── utils/                         # Utility functions
    ├── retrieval_utils.py         # Vector search functions
    ├── flatteners.py              # Text preprocessing for embeddings
    ├── prompt_utils.py            # Prompt generation
    ├── datastore_utils.py         # Database operations
    └── decorators.py              # Logging and caching
```

## 4. Data Ingestion & Preprocessing

In this section, we'll explore the input data and prepare it for analysis. We need to:

1. Load and inspect incident data
2. Retrieve and prepare CVE data (KEV and NVD)
3. Create flattened text representations for vector embedding

**Why we do this:** Proper data preparation is critical for effective semantic search. By flattening complex JSON structures into searchable text, we enable the embedding model to capture semantic relationships between incidents and vulnerabilities.

In [16]:
import json
from pathlib import Path
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Ensure we have an OpenAI API key
if not os.getenv("OPENAI_API_KEY"):
    raise ValueError(
        "Please set the OPENAI_API_KEY environment variable in your .env file"
    )

# Load incidents data
data_dir = Path('data')
with open(data_dir / 'incidents.json') as f:
    incidents = json.load(f)

# Display overview statistics
print(f"Total incidents: {len(incidents)}")
print("Fields in first incident:", list(incidents[0].keys()))

# Display first incident in pretty format
print("\nSample incident details:")
print(json.dumps(incidents[0], indent=2))

Total incidents: 39
Fields in first incident: ['incident_id', 'timestamp', 'title', 'description', 'affected_assets', 'observed_ttps', 'indicators_of_compromise', 'initial_findings']

Sample incident details:
{
  "incident_id": "INC-2023-08-01-001",
  "timestamp": "2023-08-01T09:15:00Z",
  "title": "Unauthorized Access Attempt on VPN Gateway",
  "description": "Multiple failed login attempts followed by a successful connection from an unusual geographic location on the main VPN gateway.",
  "affected_assets": [
    {
      "hostname": "vpn-gateway-01",
      "ip_address": "203.0.113.1",
      "os": "Cisco IOS XE",
      "installed_software": [
        {
          "name": "Cisco IOS XE",
          "version": "17.3.4a"
        }
      ],
      "role": "VPN Gateway"
    }
  ],
  "observed_ttps": [
    {
      "framework": "MITRE ATT&CK",
      "id": "T1110",
      "name": "Brute Force"
    },
    {
      "framework": "MITRE ATT&CK",
      "id": "T1078",
      "name": "Valid Accounts"
    

In [17]:
# Load KEV (Known Exploited Vulnerabilities) data
with open(data_dir / 'kev.json') as f:
    kev_data = json.load(f)
    
print(f"KEV entries: {len(kev_data.get('vulnerabilities', []))}")
print("\nSample KEV entry:")
print(json.dumps(kev_data.get('vulnerabilities', [])[0], indent=2))

KEV entries: 1351

Sample KEV entry:
{
  "cveID": "CVE-2023-38950",
  "vendorProject": "ZKTeco",
  "product": "BioTime",
  "vulnerabilityName": "ZKTeco BioTime Path Traversal Vulnerability",
  "dateAdded": "2025-05-19",
  "shortDescription": "ZKTeco BioTime contains a path traversal vulnerability in the iclock API that allows an unauthenticated attacker to read arbitrary files via supplying a crafted payload.",
  "requiredAction": "Apply mitigations per vendor instructions, follow applicable BOD 22-01 guidance for cloud services, or discontinue use of the product if mitigations are unavailable.",
  "dueDate": "2025-06-09",
  "knownRansomwareCampaignUse": "Unknown",
  "notes": "https://www.zkteco.com/en/Security_Bulletinsibs ; https://nvd.nist.gov/vuln/detail/CVE-2023-38950",
  "cwes": [
    "CWE-22"
  ]
}


In [18]:
# Load NVD (National Vulnerability Database) data
with open(data_dir / 'nvd_subset.json') as f:
    nvd_data = json.load(f)
    
print(f"NVD entries: {len(nvd_data)}")

# Display a sample NVD entry
sample_cve_id = list(nvd_data.keys())[0]
print(f"\nSample NVD entry ({sample_cve_id}):")
# Just show the description part to keep it manageable
desc = nvd_data[sample_cve_id].get("cve", {}).get("description", {}).get("description_data", [{}])[0].get("value", "")
print(desc)

NVD entries: 3062

Sample NVD entry (CVE-2025-0020):
Violation of Secure Design Principles, Hidden Functionality, Incorrect Provision of Specified Functionality vulnerability in ArcGIS (Authentication) allows Privilege Abuse, Manipulating Hidden Fields, Configuration/Environment Manipulation.

The ArcGIS client_credentials OAuth 2.0 API implementation does not adhere to the RFC/standards; This hidden (known and by-design, but undocumented) functionality enables a requestor (Referred to as client in RFC 6749) to request an, undocumented, custom token expiration from ArcGIS (Referred to as authorization server in RFC 6749).


## 5. Text Flattening for Vector Embedding

Before building our vector indexes, we need to convert the structured data into a flattened text format suitable for embedding. Let's examine our flattening strategies:

**Why we do this:** Embeddings work on raw text, but our data is in complex JSON structures. Flattening transforms these structures into searchable text while preserving the semantic meaning of the data.

In [19]:
from utils.flatteners import flatten_kev, flatten_nvd, flatten_incident
from langchain.docstore.document import Document

# Example KEV entry flattening
sample_kev = kev_data.get('vulnerabilities', [])[0]
doc_kev = flatten_kev(sample_kev)
print("Flattened KEV document:")
print(doc_kev.page_content[:200], "...")

# Example NVD entry flattening
sample_nvd = list(nvd_data.values())[0]
doc_nvd = flatten_nvd(sample_nvd)
print("\nFlattened NVD document:")
print(doc_nvd.page_content[:200], "...")

# Example Incident flattening
doc_inc = Document(
    page_content=flatten_incident(incidents[0]), 
    metadata={"incident_id": incidents[0]["incident_id"]}
)
print("\nFlattened Incident document:")
print(doc_inc.page_content[:200], "...")

Flattened KEV document:
CVE CVE-2023-38950
ZKTeco
BioTime
ZKTeco BioTime Path Traversal Vulnerability
ZKTeco BioTime contains a path traversal vulnerability in the iclock API that allows an unauthenticated attacker to read a ...

Flattened NVD document:
CVE CVE-2025-0020
Violation of Secure Design Principles, Hidden Functionality, Incorrect Provision of Specified Functionality vulnerability in ArcGIS (Authentication) allows Privilege Abuse, Manipulat ...

Flattened Incident document:
Unauthorized Access Attempt on VPN Gateway
Multiple failed login attempts followed by a successful connection from an unusual geographic location on the main VPN gateway.
Credential stuffing or brute  ...


Let's examine the flattening functions to understand how they work:

In [20]:
# utils/flatteners.py example implementation
def flatten_incident(incident: dict) -> str:
    """
    Flatten an incident into a text representation for embedding.
    
    Args:
        incident: The incident dict to flatten
        
    Returns:
        A string representation of the incident
    """
    # Start with the title and description
    text = f"{incident.get('title', '')}\n{incident.get('description', '')}\n"
    
    # Add initial findings
    text += f"{incident.get('initial_findings', '')}\n"
    
    # Add affected assets
    for asset in incident.get("affected_assets", []):
        text += f"Asset: {asset.get('hostname', '')} ({asset.get('ip_address', '')})\n"
        text += f"OS: {asset.get('os', '')}\n"
        text += f"Role: {asset.get('role', '')}\n"
        
        # Add installed software
        for sw in asset.get("installed_software", []):
            text += f"Software: {sw.get('name', '')} {sw.get('version', '')}\n"
    
    # Add TTPs (Tactics, Techniques, and Procedures)
    for ttp in incident.get("observed_ttps", []):
        text += f"TTP: {ttp.get('name', '')} ({ttp.get('id', '')})\n"
    
    # Add indicators of compromise
    for ioc in incident.get("indicators_of_compromise", []):
        text += f"IoC: {ioc.get('type', '')}: {ioc.get('value', '')} - {ioc.get('context', '')}\n"
    
    return text
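
The KEV flattener is not shown in this notebook. Judging only from the first 200 characters of the flattened KEV output printed earlier, it might look roughly like the sketch below. `flatten_kev_sketch` and `SimpleDoc` are hypothetical names (the latter stands in for LangChain's `Document`), and the real `flatten_kev` may include additional fields that the truncated preview does not reveal.

```python
from dataclasses import dataclass, field


@dataclass
class SimpleDoc:
    """Stand-in for LangChain's Document: text plus metadata."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def flatten_kev_sketch(entry: dict) -> SimpleDoc:
    """Join the main KEV fields into one newline-separated text block."""
    lines = [
        f"CVE {entry.get('cveID', '')}",
        entry.get("vendorProject", ""),
        entry.get("product", ""),
        entry.get("vulnerabilityName", ""),
        entry.get("shortDescription", ""),
    ]
    # Skip empty fields so missing data does not leave blank lines.
    text = "\n".join(line for line in lines if line)
    return SimpleDoc(page_content=text, metadata={"cve_id": entry.get("cveID", "")})


sample = {
    "cveID": "CVE-2023-38950",
    "vendorProject": "ZKTeco",
    "product": "BioTime",
    "vulnerabilityName": "ZKTeco BioTime Path Traversal Vulnerability",
    "shortDescription": (
        "ZKTeco BioTime contains a path traversal vulnerability in the iclock API "
        "that allows an unauthenticated attacker to read arbitrary files via "
        "supplying a crafted payload."
    ),
}
doc = flatten_kev_sketch(sample)
print(doc.page_content[:200], "...")
```

Keeping the CVE ID in the metadata is what lets search results report a `cve_id` alongside the matched text.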

## 6. Building Vector Indexes

Now we'll build FAISS vector indexes for efficient semantic search across our data sources. This process involves:

1. Initializing the OpenAI embeddings model
2. Creating FAISS indexes for KEV, NVD, and historical incident data
3. Setting up utilities for semantic search

**Why we do this:** Vector indexes enable fast similarity search over large datasets. Because document embeddings are precomputed and stored in FAISS indexes, only the query has to be embedded at search time; the index lookup itself takes milliseconds, and no document is re-embedded per query.
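
To make the "embed once, query many times" idea concrete, here is a toy index in plain Python. It is only an illustration under loud assumptions: a bag-of-words counter replaces the OpenAI embedding model, a brute-force scan replaces FAISS, and the document IDs and texts are made up.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy bag-of-words 'embedding' (stand-in for a learned embedding model)."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class ToyIndex:
    def __init__(self, docs: dict):
        # Document vectors are computed once, at build time.
        self.vectors = {doc_id: embed(text) for doc_id, text in docs.items()}

    def search(self, query: str, k: int = 2):
        # Only the query is embedded at search time.
        q = embed(query)
        scored = [(cosine(q, vec), doc_id) for doc_id, vec in self.vectors.items()]
        return sorted(scored, reverse=True)[:k]


index = ToyIndex({
    "doc-vpn": "vpn gateway authentication bypass",
    "doc-word": "word document malformed object pointer",
    "doc-path": "path traversal in web api",
})
results = index.search("unauthorized access on vpn gateway", k=2)
print(results[0])
```

A real index replaces the linear scan with an approximate or optimized nearest-neighbor structure, which is where the speedup on thousands of CVE records comes from.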

In [21]:
from utils.retrieval_utils import initialize_openai_embeddings, initialize_faiss_indexes

# Initialize OpenAI embeddings and FAISS indexes
initialize_openai_embeddings()
initialize_faiss_indexes()
print("Embeddings and FAISS indexes initialized successfully.")

23:49:16 INFO     [root] Initializing OpenAI embeddings...
23:49:16 INFO     [root] OpenAI embeddings initialized!
23:49:16 INFO     [root] initialize_openai_embeddings completed in 0.51s
23:49:16 INFO     [root] Loading KEV FAISS index...
23:49:16 INFO     [root] KEV FAISS index loaded!
23:49:16 INFO     [root] Loading NVD FAISS index...
23:49:16 INFO     [root] NVD FAISS index loaded!
23:49:16 INFO     [root] Loading Incident Analysis History FAISS index...
23:49:16 INFO     [root] Incident Analysis History FAISS index loaded!
23:49:16 INFO     [root] initialize_faiss_indexes completed in 0.04s
Embeddings and FAISS indexes initialized successfully.


Implementation details from `utils/retrieval_utils.py`:

In [22]:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

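from utils.retrieval_utils import DATA_DIR

# Module-level handles populated by the initializers below.
# They must exist before initialize_faiss_indexes() checks `embeddings is None`.
embeddings = None
KEV_FAISS = NVD_FAISS = INCIDENT_HISTORY_FAISS = None

def initialize_openai_embeddings():
    """
    Initialize the global OpenAI embeddings object for vector representations.
    """
    global embeddings
    embeddings = OpenAIEmbeddings()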

def initialize_faiss_indexes():
    """
    Initialize global FAISS vector indexes for different vulnerability databases.
    """
    global KEV_FAISS, NVD_FAISS, INCIDENT_HISTORY_FAISS
    if embeddings is None:
        initialize_openai_embeddings()

    # Load pre-built FAISS indexes
    KEV_FAISS = FAISS.load_local(DATA_DIR / "kev", embeddings,
                              allow_dangerous_deserialization=True)
    
    NVD_FAISS = FAISS.load_local(DATA_DIR / "nvd", embeddings,
                              allow_dangerous_deserialization=True)
    
    INCIDENT_HISTORY_FAISS = FAISS.load_local(DATA_DIR / "incident_analysis_history", 
                               embeddings, allow_dangerous_deserialization=True)

## 7. Testing Semantic Search

Let's test our vector indexes by performing semantic searches over the different data sources:

**Why we do this:** Verifying semantic search capabilities ensures that our system can effectively identify relevant CVEs and historical incidents. This helps validate our data preparation and embedding strategies.

In [23]:
from utils.retrieval_utils import _search, KEV_FAISS, NVD_FAISS

# Perform a semantic search using an incident title
query_text = incidents[0]['title']
print(f"Search query: {query_text}")

# Search KEV database
kev_results = _search(KEV_FAISS, query_text, k=3)
print("\nTop 3 KEV matches:")
for i, r in enumerate(kev_results, 1):
    print(f"{i}. {r['cve_id']} (score: {r['variance']:.3f})")
    print(f"   {r.get('preview', '')[:100]}...")

# Search NVD database
nvd_results = _search(NVD_FAISS, query_text, k=3)
print("\nTop 3 NVD matches:")
for i, r in enumerate(nvd_results, 1):
    print(f"{i}. {r['cve_id']} (score: {r['variance']:.3f})")
    print(f"   {r.get('preview', '')[:100]}...")

Search query: Unauthorized Access Attempt on VPN Gateway
23:49:18 DEBUG    [langchain_community.utils.math] Unable to import simsimd, defaulting to NumPy implementation. If you want to use simsimd please install with `pip install simsimd`.
23:49:18 DEBUG    [langchain_community.utils.math] Unable to import simsimd, defaulting to NumPy implementation. If you want to use simsimd please install with `pip install simsimd`.
23:49:18 DEBUG    [langchain_community.utils.math] Unable to import simsimd, defaulting to NumPy implementation. If you want to use simsimd please install with `pip install simsimd`.
23:49:18 INFO     [root] _search completed in 1.40s

Top 3 KEV matches:
1. CVE-2015-1187 (score: 1.897)
   CVE CVE-2015-1187 D-Link and TRENDnet Multiple Devices D-Link and TRENDnet Multiple Devices Remote C...
2. CVE-2006-2492 (score: 1.930)
   CVE CVE-2006-2492 Microsoft Word Microsoft Word Malformed Object Pointer Vulnerability Microsoft Wor...
3. CVE-2020-3452 (score: 1.905)
   CVE CVE-2

Let's also examine the core search function:

In [24]:
from typing import Dict, List, Optional


def _search(
    store: FAISS,
    query: str,
    k: int = 5,
    use_mmr: bool = True,
    lambda_mult: float = 0.7,
    fetch_k: Optional[int] = None
) -> List[Dict]:
    """
    Perform a semantic search on a given FAISS vector store.
    
    Args:
        store: The FAISS vector store to search
        query: The search query string
        k: Number of top results to return
        use_mmr: Use Maximal Marginal Relevance for diverse results
        lambda_mult: MMR trade-off between 0 and 1 (0 = maximum diversity, 1 = minimum diversity)
        fetch_k: Number of candidates to fetch before MMR re-ranking (defaults to 2*k)
        
    Returns:
        List of search results with metadata and scores
    """
    if use_mmr:
        # embed the query once
        vec = embeddings.embed_query(query)
        # if fetch_k not provided, default to 2*k
        fk = fetch_k or (2 * k)
        # call the vector-based MMR-with-scores method
        pairs = store.max_marginal_relevance_search_with_score_by_vector(
            vec, k=k, fetch_k=fk, lambda_mult=lambda_mult,
        )
    else:
        # direct text-based similarity search (score included)
        pairs = store.similarity_search_with_score(query, k=k)

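    # Format results
    out = []
    for doc, score in pairs:
        meta = doc.metadata.copy()
        # Raw FAISS score, stored under the key "variance". If the index was built with
        # LangChain's default L2 distance, lower values mean closer matches.
        meta["variance"] = float(score)
        # Collapse all whitespace (including newlines) and keep a short preview
        meta["preview"] = ' '.join(doc.page_content.split())[:120]
        out.append(meta)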

    return out
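
To show what the `use_mmr` and `lambda_mult` options trade off, here is a small, dependency-free sketch of Maximal Marginal Relevance. It is an illustration of the technique rather than LangChain's implementation, and the two-dimensional vectors are invented for the example. Each step picks the candidate that maximizes `lambda_mult * relevance - (1 - lambda_mult) * redundancy`, where redundancy is the highest similarity to anything already selected.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr(query_vec, doc_vecs, k=2, lambda_mult=0.7):
    """Return indices of k documents chosen by Maximal Marginal Relevance."""
    selected = []
    candidates = list(range(len(doc_vecs)))
    while candidates and len(selected) < k:
        best_idx, best_score = None, -math.inf
        for i in candidates:
            relevance = cosine(query_vec, doc_vecs[i])
            # Similarity to the closest already-selected document (0 if none yet).
            redundancy = max(
                (cosine(doc_vecs[i], doc_vecs[j]) for j in selected), default=0.0
            )
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = i, score
        selected.append(best_idx)
        candidates.remove(best_idx)
    return selected


query = [1.0, 0.0]
docs = [
    [1.0, 0.10],  # 0: most relevant
    [1.0, 0.12],  # 1: near-duplicate of document 0
    [0.6, 0.80],  # 2: less relevant but different
]
pure_relevance = mmr(query, docs, k=2, lambda_mult=1.0)
diverse = mmr(query, docs, k=2, lambda_mult=0.3)
print(pure_relevance, diverse)
```

With `lambda_mult=1.0` the near-duplicate is returned second; with a low value the dissimilar document displaces it. Because MMR orders results by this combined criterion and not by raw score, returned scores need not be sorted, which may explain the unsorted KEV scores in the output above.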

# 8. Agent Architecture and MCP Tools

In this section, we'll explore the core agent architecture and the MCP (Model Context Protocol) tools it uses to analyze security incidents and identify relevant CVEs.

## 8.1 MCP Server: Tool Definitions

Our agent uses a toolkit of specialized functions for incident analysis. These tools are defined in `mcp_cve_server.py` and exposed via the MCP protocol.

**Why we do this:** 
- MCP provides a standardized way for LLMs to interact with external tools
- Tools are defined with rich metadata (annotations) to guide the LLM
- The server handles caching, error handling, and logging consistently
- Tool definitions are separate from agent logic, enabling reuse
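
The `timing_metric` and `cache_result` decorators used on the tools live in `utils/decorators.py`, which is not reproduced here. The sketch below shows one way such decorators could work. The `_sketch` names are hypothetical, the cache is a per-function in-memory dictionary (the project may well back it with Redis), and the log format simply mirrors the "completed in 0.51s" lines seen in the cell output.

```python
import functools
import time


def timing_metric_sketch(func):
    """Report how long each call takes."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            print(f"{func.__name__} completed in {elapsed:.2f}s")
    return wrapper


def cache_result_sketch(ttl_seconds: float):
    """Cache results per argument tuple for ttl_seconds (arguments must be hashable)."""
    def decorator(func):
        store = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()
            hit = store.get(key)
            if hit is not None and now - hit[0] < ttl_seconds:
                return hit[1]  # still fresh: skip the expensive call
            value = func(*args, **kwargs)
            store[key] = (now, value)
            return value
        return wrapper
    return decorator


calls = []


@timing_metric_sketch
@cache_result_sketch(ttl_seconds=30)
def lookup(query: str, k: int = 5) -> list:
    calls.append(query)  # record real executions only
    return [f"{query}-{i}" for i in range(k)]


a = lookup("vpn gateway", k=2)
b = lookup("vpn gateway", k=2)  # served from the cache within the TTL
print(a == b, len(calls))
```

Placing the timing decorator outermost means cache hits are timed too, so a hit shows up as a near-zero duration in the logs.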

In [25]:
# Core tool definitions from mcp_cve_server.py
from typing import Any, Dict, List
from fastmcp import FastMCP
from mcp_cve_server import NVD_INDEX
from utils.decorators import timing_metric, cache_result
from utils.retrieval_utils import match_incident_to_cves, semantic_search_cves

mcp = FastMCP("cve")

@mcp.tool(annotations={
    "title": "Match Incident to CVEs using semantic search",
    "readOnlyHint": True,
    "destructiveHint": False,
    "idempotentHint": False,
    "openWorldHint": False
})
@timing_metric
@cache_result(ttl_seconds=30)  # cache identical incident queries for 30s
def match_incident_to_cves_tool(incident_id: str, k: int = 5, use_mmr: bool = True) -> dict:
    """
    Match an incident to potentially relevant CVEs using semantic search.
    
    Args:
        incident_id: The ID of the incident to match
        k: Maximum number of matches to return
        use_mmr: Whether to use MMR for diversity
        
    Returns:
        Dict containing matching CVEs from KEV and NVD databases
    """
    return match_incident_to_cves(incident_id, k, use_mmr)

@mcp.tool(
  annotations={
    "title": "Semantic Free-Form CVE Search",
    "readOnlyHint": True,
    "destructiveHint": False,
    "idempotentHint": False,
    "openWorldHint": False
  }
)
@timing_metric
@cache_result(ttl_seconds=30)  # cache identical free-form queries
def semantic_search_cves_tool(
    query: str,
    sources: List[str] = ["kev", "nvd", "historical"],
    k: int = 5,
    use_mmr: bool = False,
    lambda_mult: float = 0.7
) -> Dict[str, Any]:
    """
    Perform a semantic search for CVEs using a free-form query.
    Searches several indexes in a single call, which saves latency and tokens because the agent
    does not need a separate tool call for each FAISS index.
    
    Args:
        query: Free-form search query
        sources: Which databases to search ("kev", "nvd", "historical")
        k: Maximum number of results per source
        use_mmr: Whether to use MMR for diversity
        lambda_mult: Diversity parameter for MMR
        
    Returns:
        Dict containing search results from specified sources
    """
    return semantic_search_cves(query, sources, k, use_mmr, lambda_mult)

@mcp.tool(annotations={
    "title": "Search NVD Entries for a specific match for ALL words in the query",
    "readOnlyHint": True,
    "destructiveHint": False,
    "idempotentHint": False,
    "openWorldHint": False
})
@timing_metric
@cache_result(ttl_seconds=30)  # cache identical free-form queries
def search_nvd(query: str, limit: int = 10) -> list[dict]:
    """
    Return up to `limit` full CVE records whose fields match ALL words in `query`.
    Case-insensitive substring match over CVE ID, description, and any reference URLs.
    """
    qwords = query.lower().split()
    matches = []
    for cve_id, rec in NVD_INDEX.items():
        cve = rec.get("cve", {})
        # flatten searchable text: first description entry plus all reference URLs
        # (`or [{}]` guards against a missing or empty description list)
        desc_data = cve.get("description", {}).get("description_data") or [{}]
        desc = desc_data[0].get("value", "")
        refs = " ".join(
            r.get("url", "")
            for r in cve.get("references", {}).get("reference_data", [])
        )
        text = f"{cve_id} {desc} {refs}".lower()
        if all(w in text for w in qwords):
            # return the full record so the agent can inspect any fields
            matches.append(rec)
            if len(matches) >= limit:
                break
    return matches

23:49:19 INFO     [root] Initializing OpenAI embeddings...
23:49:20 INFO     [root] OpenAI embeddings initialized!
23:49:20 INFO     [root] initialize_openai_embeddings completed in 0.57s
23:49:20 INFO     [root] Loading KEV FAISS index...
23:49:20 INFO     [root] KEV FAISS index loaded!
23:49:20 INFO     [root] Loading NVD FAISS index...
23:49:20 INFO     [root] NVD FAISS index loaded!
23:49:20 INFO     [root] Loading Incident Analysis History FAISS index...
23:49:20 INFO     [root] Incident Analysis History FAISS index loaded!
23:49:20 INFO     [root] initialize_faiss_indexes completed in 0.05s
23:49:20 DEBUG    [mcp.server.lowlevel.server] Initializing server 'cve'
23:49:20 DEBUG    [mcp.server.lowlevel.server] Registering handler for ListToolsRequest
23:49:20 DEBUG    [mcp.server.lowlevel.server] Registering handler for CallToolRequest
23:49:20 DEBUG    [mcp.server.lowlevel.server] Registering handler for ListResourcesRequest
23:49:20 DEBUG    [mcp.server.lowlevel.server] Registeri

## 8.2 Prompt Engineering

The heart of our agent is the prompt that guides its reasoning. Let's examine our prompt engineering strategy:

**Why we do this:** Well-crafted prompts are critical for LLM performance. Our prompts are designed to:
- Provide clear instructions and context
- Include example formats for outputs
- Guide the agent to use appropriate tools at the right time
- Support structured JSON output via Pydantic models

In [26]:
# Prompt template from utils/prompt_utils.py
SYSTEM_TMPL = """
You are a CVE‐analysis assistant. Analyze the following incidents and provide structured analysis.

Incident Details:
{incident_details}

Batch FAISS matches (KEV/NVD):
{batch_faiss_results}

Historical FAISS‐anchoring context:
{historical_faiss_results}

{format_instructions}

Now, when I ask you to analyze incidents, use the KEV/NVD context to inform your severity rankings and the historical context to normalize your severity rankings.
"""

# Human query example
query = """
I need you to help me analyze some security incidents and rank their actual severity by identifying potential CVE connections and details.
For each incident:
1. Understand Incident Context: Reason about the affected assets, observed TTPs, and initial findings.
2. Identify Relevant CVEs: Determine which CVEs are potentially relevant based on the incident context.
3. Prioritize CVEs: Assess the risk and impact of relevant CVEs in the context of the specific incident.
4. Generate Analysis: Provide a brief, human-readable explanation of why certain CVEs are prioritized.
"""
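
The `generate_prompt` helper that fills this template is not shown in the notebook. As a rough sketch of what the assembly step might involve, the example below fills the four placeholders and returns role/content pairs. The function name, its parameters, the shortened template, and the sample data are all assumptions made for illustration.

```python
import json

# Shortened copy of the template's placeholder layout (assumption: same four fields).
TEMPLATE_SKETCH = (
    "Incident Details:\n{incident_details}\n\n"
    "Batch FAISS matches (KEV/NVD):\n{batch_faiss_results}\n\n"
    "Historical context:\n{historical_faiss_results}\n\n"
    "{format_instructions}"
)


def generate_prompt_sketch(query, incident_details, batch_faiss_results,
                           historical_faiss_results, format_instructions):
    """Fill the system template and pair it with the human query."""
    system_text = TEMPLATE_SKETCH.format(
        incident_details=json.dumps(incident_details, indent=2),
        batch_faiss_results=json.dumps(batch_faiss_results, indent=2),
        historical_faiss_results=json.dumps(historical_faiss_results, indent=2),
        # str.format does not re-parse substituted values, so braces inside
        # JSON schemas or format instructions are safe here.
        format_instructions=format_instructions,
    )
    return [("system", system_text), ("human", query.strip())]


messages = generate_prompt_sketch(
    query="Analyze the incidents and rank their severity.",
    incident_details=[{"incident_id": "INC-2023-08-01-001"}],
    batch_faiss_results={"results": []},
    historical_faiss_results={"results": []},
    format_instructions='Return JSON like {"incidents": []}',
)
print(messages[0][1][:60])
```

Serializing the context as indented JSON keeps the incident and match data unambiguous for the model, at the cost of some extra tokens.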

## 8.3 Pydantic Output Parsing

We use Pydantic models to define the structure of the agent's output:

**Why we do this:** Structured outputs ensure:
- Consistency in the format of analyses
- Validation of required fields
- Clear typing for downstream processing
- Enforced schema compliance

In [27]:
# Pydantic models from utils/prompt_utils.py
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser

class CVEInfo(BaseModel):
    """
    A Pydantic model for CVE information.
    This model defines the structure of the output from the CVE analysis.
    It includes fields for the CVE ID, summary, relevance, and risk level.
    """
    cve_id: str = Field(description="The CVE ID that is related to the incident")
    cve_summary: str = Field(description="A brief summary of the CVE and its relation to the incident")
    cve_relevance: float = Field(description="The estimated relevance level of the CVE match (0.0-1.0)")
    cve_risk_level: float = Field(description="The risk level of the CVE on a scale of 0.0-1.0")

class IncidentAnalysis(BaseModel):
    """
    A Pydantic model for incident analysis.
    This model defines the structure of the output from the incident analysis.
    It includes fields for the incident ID, summary, list of related CVEs, and the risk level of the incident.
    """
    incident_id: str = Field(description="The ID of the incident being analyzed")
    incident_summary: str = Field(description="A brief summary of the incident")
    cve_ids: list[CVEInfo] = Field(description="List of related CVEs and their details")
    incident_risk_level: float = Field(description="The risk level of the incident (0.0-1.0)")
    incident_risk_level_explanation: str = Field(description="An explanation of the rationale for the risk level assessment")

class IncidentAnalysisList(BaseModel):
    incidents: list[IncidentAnalysis] = Field(description="List of incident analyses")

# Initialize the parser
parser = PydanticOutputParser(pydantic_object=IncidentAnalysisList)
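
Note that the 0.0-1.0 ranges above appear only in the field descriptions, so a value such as 1.4 would still pass validation. Pydantic can enforce bounds in the model itself with `Field(..., ge=0.0, le=1.0)`. As a dependency-free illustration of the same check, the hypothetical helper below scans an already-parsed analysis dictionary for out-of-range scores; the sample data is invented.

```python
def out_of_range_scores(analysis: dict) -> list:
    """Return (path, value) pairs for scores outside the interval [0.0, 1.0]."""
    problems = []
    for i, incident in enumerate(analysis.get("incidents", [])):
        checks = [(f"incidents[{i}].incident_risk_level",
                   incident.get("incident_risk_level"))]
        for j, cve in enumerate(incident.get("cve_ids", [])):
            prefix = f"incidents[{i}].cve_ids[{j}]"
            checks.append((f"{prefix}.cve_relevance", cve.get("cve_relevance")))
            checks.append((f"{prefix}.cve_risk_level", cve.get("cve_risk_level")))
        for path, value in checks:
            # Missing or non-numeric values are reported as problems too.
            if not isinstance(value, (int, float)) or not 0.0 <= value <= 1.0:
                problems.append((path, value))
    return problems


sample_analysis = {
    "incidents": [
        {
            "incident_id": "INC-2023-08-01-001",
            "incident_risk_level": 0.8,
            "cve_ids": [
                {"cve_id": "CVE-0000-0001", "cve_relevance": 1.4, "cve_risk_level": 0.6},
            ],
        }
    ]
}
problems = out_of_range_scores(sample_analysis)
print(problems)
```

Checking ranges matters here because the scores are later compared across incidents; one out-of-scale value would distort the ranking.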

## 8.4 LangChain ReAct Agent

We use LangChain's ReAct agent pattern to orchestrate the analysis process:

**Why we do this:** The ReAct agent pattern combines:
- **Re**asoning: Understanding the task and formulating a plan
- **Act**ion: Using tools to gather information
- Observation: Processing the results of tool calls
- Generation: Producing a final analysis
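
The cycle listed above can be sketched as a plain loop. This toy is not LangGraph's implementation: a scripted function stands in for the LLM, the single tool returns a placeholder string, and all names are hypothetical.

```python
def scripted_model(history):
    """Stand-in for the LLM: picks the next step from the transcript so far."""
    if not any(step[0] == "observation" for step in history):
        # Reasoning step: no evidence gathered yet, so call a tool.
        return ("action", "search_cves", "Cisco IOS XE VPN brute force")
    return ("final", "Prioritize the CVEs returned by search_cves for the VPN gateway.")


# Hypothetical tool registry; the real agent loads its tools over MCP.
TOOLS = {"search_cves": lambda q: [f"placeholder match for '{q}'"]}


def react_loop(task, model, tools, max_steps=5):
    history = [("task", task)]
    for _ in range(max_steps):
        decision = model(history)
        if decision[0] == "final":
            # Generation: the model has enough context to answer.
            history.append(("final", decision[1]))
            return decision[1], history
        _, tool_name, tool_input = decision
        observation = tools[tool_name](tool_input)       # Action
        history.append(("action", tool_name, tool_input))
        history.append(("observation", observation))     # Observation
    raise RuntimeError("agent did not finish within max_steps")


answer, trace = react_loop("Analyze INC-2023-08-01-001", scripted_model, TOOLS)
print([step[0] for step in trace])
```

The `max_steps` guard reflects a practical concern with any ReAct agent: without a step limit, a model that keeps requesting tools would loop indefinitely.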

In [28]:
# Agent setup from main_security_agent_server.py
import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession, StdioServerParameters, stdio_client

from utils.prompt_utils import generate_prompt
from utils.retrieval_utils import batch_get_historical_context, batch_match_incident_to_cves

# Setup server parameters and model
server_parameters = StdioServerParameters(
    command="python",
    args=["mcp_cve_server.py"],
)
model = ChatOpenAI(model="gpt-4o-mini", openai_api_key=os.getenv("OPENAI_API_KEY"))

async def run_agent(query, start_index, batch_size):
    async with stdio_client(server_parameters) as (read, write):
        # Initialize client session
        async with ClientSession(read, write) as session:
            await session.initialize()
            
            # Load MCP tools and create ReAct agent
            tools = await load_mcp_tools(session)
            agent = create_react_agent(model, tools, name="CVE_Agent")
            
            # Prepare incident batch and historical context
            batch_faiss_results = batch_match_incident_to_cves(
                batch_size=batch_size,
                start_index=start_index,
                top_k=3
            )
            
            historical_results = batch_get_historical_context(
                incident_ids=[r["incident_id"] for r in batch_faiss_results["results"]],
                top_k=2
            )
            
            # Generate prompt with all context
            prompt_messages = generate_prompt(
                query=query,
                batch_faiss_results=batch_faiss_results,
                historical_faiss_results=historical_results
            )
            
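            # Execute agent. The LangGraph ReAct agent returns its final state as a dict
            # whose "messages" list ends with the model's final answer.
            result = await agent.ainvoke({"messages": prompt_messages})
            final_msg = result["messages"][-1]
            
            # Parse and validate results
            analysis = parser.parse(final_msg.content)
            
            # Assumption: token usage is read from the final message only (when the
            # installed langchain-core exposes `usage_metadata`), so earlier
            # tool-calling turns are not counted.
            full_response = {
                "messages": result["messages"],
                "usage_metadata": getattr(final_msg, "usage_metadata", None),
            }
            
            return analysis, full_response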

## 8.5 Running the Agent

Let's run the agent to analyze a batch of security incidents:

**Why we do this:** Running a complete analysis demonstrates the end-to-end workflow and validates our agent's ability to:
- Understand incident context
- Find relevant CVEs
- Assess risk levels
- Provide clear explanations

In [34]:
import asyncio
from utils.prompt_utils import AnalysisRequest

# Create a request to analyze a batch of incidents
async def analyze_incidents():
    request = AnalysisRequest(
        start_index=0,
        batch_size=2,
        request_id="demo-123",
        openai_api_key=os.getenv("OPENAI_API_KEY"),
        model_name="gpt-4o-mini"
    )
    
    # Run the analysis
    analysis, response = await run_agent(
        query=query,
        start_index=request.start_index,
        batch_size=request.batch_size
    )
    
    # Display the results
    print("Analysis Results:")
    for incident in analysis.incidents:
        print(f"\nIncident: {incident.incident_id}")
        print(f"Summary: {incident.incident_summary}")
        print(f"Risk Level: {incident.incident_risk_level}")
        print(f"Explanation: {incident.incident_risk_level_explanation}")
        print("\nRelevant CVEs:")
        for cve in incident.cve_ids:
            print(f"  - {cve.cve_id} (Relevance: {cve.cve_relevance}, Risk: {cve.cve_risk_level})")
            print(f"    {cve.cve_summary}")
    
    # Display usage metrics
    print("\nUsage Metrics:")
    print(f"Input tokens: {response['usage_metadata']['input_tokens']}")
    print(f"Output tokens: {response['usage_metadata']['output_tokens']}")
    print(f"Total tokens: {response['usage_metadata']['total_tokens']}")
    
    return analysis

# Run the analysis. asyncio.run() fails inside Jupyter, which already runs its own event loop; run this from a plain Python script if you want to explore.
analysis = asyncio.run(analyze_incidents())

NotImplementedError: 

# 9. Persistence and Data Management

This section covers how the system persists analysis results and manages data for continuous learning and reference.

## 9.1 SQLite Database

Our system uses SQLite for structured persistence of incident analyses. This provides a lightweight, file-based database that requires no external server.

**Why we do this:** Persistent storage enables:
- Historical reference of past analyses
- Audit trails for security review
- Query capabilities for reporting and dashboards
- Cross-referencing between incidents
- Continuous learning for the system

In [35]:
# Database initialization from utils/datastore_utils.py
import sqlite3
from sqlite3 import Error
import os
import json
from pathlib import Path
from datetime import datetime

DATA_DIR = Path("data")
DB_PATH = DATA_DIR / "incident_analysis.db"

def init_db():
    """Initialize the SQLite database with necessary tables."""
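    conn = None
    try:
        # Make sure the data directory exists before SQLite creates the file
        DATA_DIR.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(DB_PATH)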
        cursor = conn.cursor()
        
        # Create incidents table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS incidents (
            incident_id TEXT PRIMARY KEY,
            title TEXT,
            description TEXT,
            initial_findings TEXT,
            created_at TEXT,
            updated_at TEXT
        )
        ''')
        
        # Create analyses table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS analyses (
            analysis_id TEXT PRIMARY KEY,
            incident_id TEXT,
            analysis_json TEXT,
            model_name TEXT,
            created_at TEXT,
            FOREIGN KEY (incident_id) REFERENCES incidents (incident_id)
        )
        ''')
        
        # Create run_metadata table for tracking API usage
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS run_metadata (
            run_id TEXT PRIMARY KEY,
            request_id TEXT,
            model_name TEXT,
            input_tokens INTEGER,
            output_tokens INTEGER,
            total_tokens INTEGER,
            start_time TEXT,
            end_time TEXT,
            duration_seconds REAL
        )
        ''')
        
        conn.commit()
        print("Database initialized successfully")
    except Error as e:
        print(f"Error initializing database: {e}")
    finally:
        if conn:
            conn.close()
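
Once these tables exist, a common read path is fetching the most recent analysis for each incident. The sketch below assumes the `incidents` and `analyses` columns defined above and ISO-8601 `created_at` strings, which sort chronologically as text. The function name `latest_analysis_per_incident` and the sample rows are illustrative only.

```python
import json
import sqlite3


def latest_analysis_per_incident(conn):
    """Return the newest analysis row for every incident that has one."""
    rows = conn.execute(
        """
        SELECT i.incident_id, i.title, a.model_name, a.created_at, a.analysis_json
        FROM incidents AS i
        JOIN analyses AS a ON a.incident_id = i.incident_id
        WHERE a.created_at = (
            SELECT MAX(a2.created_at)
            FROM analyses AS a2
            WHERE a2.incident_id = i.incident_id
        )
        ORDER BY i.incident_id
        """
    ).fetchall()
    return [
        {
            "incident_id": row[0],
            "title": row[1],
            "model_name": row[2],
            "created_at": row[3],
            "analysis": json.loads(row[4]),
        }
        for row in rows
    ]


# Demo on an in-memory database with made-up rows
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE incidents (
        incident_id TEXT PRIMARY KEY, title TEXT, description TEXT,
        initial_findings TEXT, created_at TEXT, updated_at TEXT
    );
    CREATE TABLE analyses (
        analysis_id TEXT PRIMARY KEY, incident_id TEXT, analysis_json TEXT,
        model_name TEXT, created_at TEXT,
        FOREIGN KEY (incident_id) REFERENCES incidents (incident_id)
    );
    """
)
conn.execute(
    "INSERT INTO incidents VALUES (?, ?, ?, ?, ?, ?)",
    ("INC-1", "Demo incident", "", "", "2024-01-01T00:00:00", "2024-01-01T00:00:00"),
)
conn.executemany(
    "INSERT INTO analyses VALUES (?, ?, ?, ?, ?)",
    [
        ("a1", "INC-1", json.dumps({"incident_risk_level": "low"}), "gpt-4o-mini", "2024-01-01T10:00:00"),
        ("a2", "INC-1", json.dumps({"incident_risk_level": "high"}), "gpt-4o-mini", "2024-01-02T10:00:00"),
    ],
)
latest = latest_analysis_per_incident(conn)
```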


## 9.2 Saving Analysis Results

When the agent completes an analysis, we save the results to SQLite and also write a JSON backup:

In [36]:

def save_incident_and_analysis_to_sqlite_db(incident, analysis, model_name):
    """
    Save an incident and its analysis to the SQLite database.
    
    Args:
        incident: The incident dictionary
        analysis: The analysis dictionary (from Pydantic model)
        model_name: The name of the LLM used for analysis
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        
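        # Save incident info. The upsert (SQLite 3.24+) keeps the original
        # created_at when the incident already exists and refreshes the rest.
        now = datetime.now().isoformat()
        cursor.execute('''
        INSERT INTO incidents 
        (incident_id, title, description, initial_findings, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(incident_id) DO UPDATE SET
            title = excluded.title,
            description = excluded.description,
            initial_findings = excluded.initial_findings,
            updated_at = excluded.updated_at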
        ''', (
            incident.get('incident_id'),
            incident.get('title', ''),
            incident.get('description', ''),
            incident.get('initial_findings', ''),
            now,
            now
        ))
        
        # Save analysis
        analysis_id = f"{incident.get('incident_id')}_{now.replace(':', '-')}"
        cursor.execute('''
        INSERT INTO analyses
        (analysis_id, incident_id, analysis_json, model_name, created_at)
        VALUES (?, ?, ?, ?, ?)
        ''', (
            analysis_id,
            incident.get('incident_id'),
            json.dumps(analysis),
            model_name,
            now
        ))
        
        conn.commit()
        return True
    except Error as e:
        print(f"Error saving to database: {e}")
        return False
    finally:
        if conn:
            conn.close()




In addition to SQLite, we save a timestamped JSON backup of each analysis:

In [37]:
def save_incident_analysis_backup_json(incident_id, analysis_data):
    """Save a backup of analysis data as JSON."""
    backup_dir = DATA_DIR / "backups"
    backup_dir.mkdir(parents=True, exist_ok=True)
    
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_dir / f"analysis_{incident_id}_{timestamp}.json"
    
    with open(backup_path, 'w') as f:
        json.dump(analysis_data, f, indent=2)
    
    return backup_path
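
The backups are only useful if they can be read back. Below is a small sketch of restoring the newest backup for one incident, assuming the `analysis_{incident_id}_{timestamp}.json` naming used above. The `%Y%m%d_%H%M%S` timestamp sorts chronologically as text, so the lexicographically last file is the newest. `load_latest_backup` is a hypothetical helper, and the demo files are made up.

```python
import json
import tempfile
from pathlib import Path


def load_latest_backup(backup_dir, incident_id):
    """Return the parsed newest backup for `incident_id`, or None if none exist."""
    candidates = sorted(Path(backup_dir).glob(f"analysis_{incident_id}_*.json"))
    if not candidates:
        return None
    with open(candidates[-1], encoding="utf-8") as f:
        return json.load(f)


# Demo in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    older = tmp_dir / "analysis_INC-1_20240101_000000.json"
    newer = tmp_dir / "analysis_INC-1_20240102_000000.json"
    older.write_text(json.dumps({"incident_risk_level": "low"}), encoding="utf-8")
    newer.write_text(json.dumps({"incident_risk_level": "high"}), encoding="utf-8")

    restored = load_latest_backup(tmp_dir, "INC-1")
    missing = load_latest_backup(tmp_dir, "INC-404")
```

Note that the glob pattern would also match an incident ID that merely starts with the same prefix followed by an underscore, so this approach assumes incident IDs do not overlap in that way.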

## 9.3 FAISS Vector Index Updates

To support continuous learning, we update the FAISS vector index with new analyses:

**Why we do this:** Updating vector indexes enables:
- The system to learn from new analyses
- Improved results over time as more examples are added
- Reference to previous analyses when encountering similar incidents
- Consistency in risk evaluation by referring to precedents

In [39]:
from langchain.docstore.document import Document
from langchain_community.vectorstores import FAISS
from utils.flatteners import flatten_incident
from utils.retrieval_utils import get_incident

def add_incident_to_faiss_history_index(incident_id, analysis):
    """
    Add a completed incident analysis to the historical FAISS index.
    
    Args:
        incident_id: The ID of the analyzed incident
        analysis: The analysis dictionary from the agent (read with .get below)
    """
    global INCIDENT_HISTORY_FAISS, embeddings
    
    if INCIDENT_HISTORY_FAISS is None or embeddings is None:
        initialize_openai_embeddings()
        initialize_faiss_indexes()
    
    # Create a document from the incident
    flattened_text = flatten_incident(get_incident(incident_id))
    doc = Document(
        page_content=flattened_text,
        metadata={
            "incident_id": incident_id,
            "analysis_id": analysis.get("analysis_id", "unknown"),
            "created_at": datetime.now().isoformat()
        }
    )
    
    # Add to FAISS index
    INCIDENT_HISTORY_FAISS.add_documents([doc])
    
    # Save updated index
    index_path = DATA_DIR / "vectorstore" / "incident_analysis_history"
    INCIDENT_HISTORY_FAISS.save_local(index_path)
    
    return True

## 9.4 Usage Metadata Tracking

We track usage metadata to monitor performance and costs:

In [None]:
def save_run_metadata(
    request_id, 
    model_name, 
    input_tokens, 
    output_tokens, 
    start_time,
    end_time
):
    """Save metadata about an API run to track usage."""
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        
        run_id = f"{request_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        duration = (end_time - start_time).total_seconds()
        
        cursor.execute('''
        INSERT INTO run_metadata
        (run_id, request_id, model_name, input_tokens, output_tokens, 
         total_tokens, start_time, end_time, duration_seconds)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            run_id,
            request_id,
            model_name,
            input_tokens,
            output_tokens,
            input_tokens + output_tokens,
            start_time.isoformat(),
            end_time.isoformat(),
            duration
        ))
        
        conn.commit()
        return True
    except Error as e:
        print(f"Error saving run metadata: {e}")
        return False
    finally:
        if conn:
            conn.close()
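
With runs recorded this way, usage and cost can be summarized per model. The sketch below assumes the `run_metadata` columns defined in section 9.1. The function name `summarize_usage` and both price parameters are hypothetical: actual per-token prices vary by model and must be supplied by the caller. The sample rows are made up.

```python
import sqlite3


def summarize_usage(conn, usd_per_1k_input, usd_per_1k_output):
    """Aggregate run_metadata per model; prices are caller-supplied assumptions."""
    rows = conn.execute(
        """
        SELECT model_name,
               COUNT(*),
               SUM(input_tokens),
               SUM(output_tokens),
               AVG(duration_seconds)
        FROM run_metadata
        GROUP BY model_name
        ORDER BY model_name
        """
    ).fetchall()
    summary = []
    for model_name, runs, input_tokens, output_tokens, avg_seconds in rows:
        cost = (input_tokens / 1000) * usd_per_1k_input
        cost += (output_tokens / 1000) * usd_per_1k_output
        summary.append(
            {
                "model_name": model_name,
                "runs": runs,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "avg_duration_seconds": avg_seconds,
                "estimated_cost_usd": cost,
            }
        )
    return summary


# Demo on an in-memory database with made-up rows
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE run_metadata (
        run_id TEXT PRIMARY KEY, request_id TEXT, model_name TEXT,
        input_tokens INTEGER, output_tokens INTEGER, total_tokens INTEGER,
        start_time TEXT, end_time TEXT, duration_seconds REAL
    )
    """
)
conn.executemany(
    "INSERT INTO run_metadata VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [
        ("r1", "demo-1", "gpt-4o-mini", 1000, 200, 1200,
         "2024-01-01T00:00:00", "2024-01-01T00:00:02", 2.0),
        ("r2", "demo-2", "gpt-4o-mini", 3000, 800, 3800,
         "2024-01-01T00:01:00", "2024-01-01T00:01:04", 4.0),
    ],
)
usage = summarize_usage(conn, usd_per_1k_input=0.5, usd_per_1k_output=2.0)
```

Applying one price pair to every model is a simplification; a per-model price table would be needed when several models are in use.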

## 9.5 Caching Strategy

To optimize performance and reduce API costs, we implement a multi-level caching strategy:

**Why we do this:** Effective caching:
- Reduces redundant computation
- Minimizes API calls to OpenAI
- Improves response times
- Ensures consistent responses for identical queries
- Optimizes resource usage

In [None]:
# Redis-based caching decorator from utils/decorators.py
import functools
import json
import hashlib
import time
import redis
import os
import inspect
from typing import Callable, Any

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost")
redis_client = redis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)

def cache_result(ttl_seconds=3600):
    """
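    Cache function results in Redis.
    
    The wrapper is always a coroutine function, so decorated sync functions
    must also be awaited. The Redis client used here is synchronous, so cache
    reads and writes block the event loop while they run.
    
    Args:
        ttl_seconds: Time-to-live for cached results in seconds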
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Create a unique key from function name and arguments
            key_parts = [func.__name__]
            
            # Add positional args
            for arg in args:
                if isinstance(arg, (str, int, float, bool)):
                    key_parts.append(str(arg))
                else:
                    try:
                        key_parts.append(json.dumps(arg))
                    except:
                        # If we can't serialize, use object id as fallback
                        key_parts.append(str(id(arg)))
            
            # Add keyword args (sorted for consistency)
            for k in sorted(kwargs.keys()):
                v = kwargs[k]
                key_parts.append(k)
                if isinstance(v, (str, int, float, bool)):
                    key_parts.append(str(v))
                else:
                    try:
                        key_parts.append(json.dumps(v))
                    except:
                        key_parts.append(str(id(v)))
            
            # Create a hash of the key parts
            cache_key = hashlib.md5("_".join(key_parts).encode()).hexdigest()
            
            # Check if result is in cache
            cached = redis_client.get(cache_key)
            if cached is not None:
                try:
                    return json.loads(cached)
                except json.JSONDecodeError:
                    # If we can't deserialize, ignore cache
                    pass
            
            # Call the original function
            result = await func(*args, **kwargs) if inspect.iscoroutinefunction(func) else func(*args, **kwargs)
            
            # Store result in cache
            try:
                redis_client.setex(cache_key, ttl_seconds, json.dumps(result))
            except (TypeError, ValueError, redis.RedisError):
                # If we can't serialize or store the result, just return it
                pass
            
            return result
        return wrapper
    return decorator
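
The decorator above builds its key by joining stringified arguments with `_`, which lets different argument lists collide (for example `("a_b",)` and `("a", "b")`). As a hedged alternative, not the project's implementation, the sketch below serializes the arguments as one JSON document with sorted keys and hashes that. `make_cache_key` and the `cache` prefix are hypothetical names.

```python
import hashlib
import json


def make_cache_key(func_name, args, kwargs, prefix="cache"):
    """Build a namespaced cache key from a function name and its arguments."""
    payload = json.dumps(
        {"args": args, "kwargs": kwargs},
        sort_keys=True,  # keyword order does not change the key
        default=repr,    # non-JSON values fall back to their repr
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{prefix}:{func_name}:{digest}"


key_a = make_cache_key("search_cves", (1, "nginx"), {"top_k": 3, "source": "kev"})
key_b = make_cache_key("search_cves", (1, "nginx"), {"source": "kev", "top_k": 3})
key_c = make_cache_key("search_cves", ("a_b",), {})
key_d = make_cache_key("search_cves", ("a", "b"), {})
```

Here `key_a` and `key_b` are equal because keyword order is normalized, while `key_c` and `key_d` differ. Objects whose `repr` embeds a memory address still produce keys that are unstable across processes, so this only helps for arguments with a deterministic representation.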

## 9.6 Data Retention and Privacy

Our system implements data retention policies and privacy controls:

**Why we do this:** Proper data management ensures:
- Compliance with regulations (GDPR, CCPA, etc.)
- Protection of sensitive information
- Minimization of storage requirements
- Reduced exposure in the event of a data breach

Key privacy and retention strategies:
- Incident data is stored with role-based access controls
- PII is anonymized in vector embeddings
- Analysis results are encrypted in the database
- Automated purging of data based on configurable retention periods
- Audit logs for all access to sensitive data
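
As an illustration of the automated purging item, here is a minimal sketch of a retention job against the SQLite tables from section 9.1. It assumes `created_at` holds ISO-8601 strings written with one consistent time convention, so text comparison matches chronological order. `purge_expired` is a hypothetical function, the demo tables are trimmed to the columns it touches, and the rows are made up.

```python
import sqlite3
from datetime import datetime, timedelta


def purge_expired(conn, retention_days, now=None):
    """Delete analyses older than the retention window, then orphaned incidents."""
    now = now or datetime.now()
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    with conn:  # one transaction: commit on success, roll back on error
        deleted = conn.execute(
            "DELETE FROM analyses WHERE created_at < ?", (cutoff,)
        ).rowcount
        # Remove incidents that no longer have any analysis attached
        conn.execute(
            """
            DELETE FROM incidents
            WHERE NOT EXISTS (
                SELECT 1 FROM analyses
                WHERE analyses.incident_id = incidents.incident_id
            )
            """
        )
    return deleted


# Demo on an in-memory database
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE incidents (incident_id TEXT PRIMARY KEY, created_at TEXT);
    CREATE TABLE analyses (
        analysis_id TEXT PRIMARY KEY, incident_id TEXT, created_at TEXT
    );
    INSERT INTO incidents VALUES ('INC-OLD', '2024-01-01T00:00:00');
    INSERT INTO incidents VALUES ('INC-NEW', '2024-05-20T00:00:00');
    INSERT INTO analyses VALUES ('a1', 'INC-OLD', '2024-01-01T00:00:00');
    INSERT INTO analyses VALUES ('a2', 'INC-NEW', '2024-05-20T00:00:00');
    """
)
deleted_count = purge_expired(conn, retention_days=30, now=datetime(2024, 6, 1))
remaining_incidents = [
    row[0] for row in conn.execute("SELECT incident_id FROM incidents ORDER BY incident_id")
]
```

A full retention job would also need to cover the JSON backups and the FAISS index, which this sketch does not touch.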

## 9.7 Backup and Recovery

To ensure data durability, we implement backup and recovery procedures:

- Daily database backups
- Vector index snapshots
- Redundant storage for JSON backups
- Point-in-time recovery capability
- Automated recovery testing
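
For the database backup item, one option is Python's built-in `sqlite3.Connection.backup`, which copies a database while it remains usable. The sketch below is an assumption about how such a job could look, not the project's backup procedure; `backup_sqlite` and the file names are hypothetical.

```python
import os
import sqlite3
import tempfile


def backup_sqlite(source_path, dest_path):
    """Copy the SQLite database at source_path into dest_path."""
    src = sqlite3.connect(source_path)
    dst = sqlite3.connect(dest_path)
    try:
        src.backup(dst)  # online backup API, available since Python 3.7
    finally:
        dst.close()
        src.close()


# Demo: back up a throwaway database and read the copy
with tempfile.TemporaryDirectory() as tmp:
    src_path = os.path.join(tmp, "incident_analysis.db")
    dst_path = os.path.join(tmp, "incident_analysis.backup.db")

    conn = sqlite3.connect(src_path)
    conn.execute("CREATE TABLE incidents (incident_id TEXT PRIMARY KEY)")
    conn.execute("INSERT INTO incidents VALUES ('INC-1')")
    conn.commit()
    conn.close()

    backup_sqlite(src_path, dst_path)

    check = sqlite3.connect(dst_path)
    restored_ids = [row[0] for row in check.execute("SELECT incident_id FROM incidents")]
    check.close()
```

Reading the copy back, as the demo does, is also a simple form of recovery testing.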

By implementing comprehensive persistence strategies, our system ensures that valuable analysis results are preserved while maintaining performance, privacy, and compliance.

# 10. Conclusion

## 10.1 Summary

In this notebook, we've built a comprehensive system for analyzing security incidents in the context of CVE data. Our approach leverages:

1. **Semantic Search**: We use FAISS vector stores to find relevant CVEs and historical incidents
2. **LLM Reasoning**: We use GPT-4o-mini to understand incident context and assess risk
3. **Agent Tools**: We implement specialized tools via MCP for CVE search, incident analysis, and more
4. **Structured Output**: We enforce consistent output format via Pydantic schemas
5. **Persistence**: We store analyses in SQLite and update vector stores for continuous learning

The system represents a practical application of AI to a complex security workflow, demonstrating how LLMs can augment human analysts by:
- Reducing the cognitive load of analyzing thousands of potential CVEs
- Providing consistent risk assessments based on detailed context
- Generating clear explanations that link vulnerabilities to incidents
- Learning from historical analyses to normalize risk levels

## 10.2 Future Work

While the current system is functional, several enhancements could further improve its capabilities:

1. **Expanded Data Sources**: Integrate threat intelligence feeds, MITRE ATT&CK framework, and vendor security bulletins
2. **Automated Remediation Suggestions**: Generate specific remediation steps for identified vulnerabilities
3. **Multi-LLM Ensemble**: Use specialized models for different analysis stages to optimize cost/performance
4. **Interactive Investigation**: Add an agentic workflow for interactive incident investigation
5. **Temporal Analysis**: Incorporate time-series analysis of incidents to identify trends and campaigns
6. **Active Learning**: Implement feedback loops to improve risk scoring based on analyst input

## 10.3 Addressing Key Questions

Let's address the specific questions from the exercise:

### 1. Agent Architecture and Workflow

Our agent uses a ReAct pattern to orchestrate the analysis process. The LLM:
- First understands the incident details provided in the prompt
- Uses semantic search tools to find relevant CVEs
- Assesses the risk level of each CVE in the context of the incident
- Generates a structured analysis with explanations

### 2. Prompting Strategy

Our prompting strategy has three key components:
- Pre-loaded context (incidents, FAISS matches, historical context)
- Clear task instructions with specific steps
- Structured output format via Pydantic schema

### 3. Tool Interaction

The agent interacts with tools through MCP. It decides to use tools when:
- It needs to search for additional CVEs related to specific components
- It needs to verify details about a particular CVE
- It needs to access information about historical incidents

### 4. Context Window Management

We manage the context window by:
- Batching incidents to process a few at a time
- Pre-filtering FAISS results to the most relevant matches
- Using a compact, flattened text representation of incidents and CVEs
- Truncating descriptions and details to essential information
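
The truncation and pre-filtering steps above can be sketched as a simple budgeted packer. This is an illustration under stated assumptions, not the notebook's code: `truncate_text` and `pack_context` are hypothetical helpers, and the four-characters-per-token ratio is a rough heuristic standing in for a real tokenizer.

```python
def truncate_text(text, max_chars):
    """Clip text to max_chars, marking the cut with an ellipsis."""
    if len(text) <= max_chars:
        return text
    return text[: max_chars - 3].rstrip() + "..."


def pack_context(snippets, budget_tokens, chars_per_token=4, max_chars_per_snippet=400):
    """Keep snippets in ranked order until the estimated token budget is used up."""
    packed = []
    used = 0
    for snippet in snippets:
        clipped = truncate_text(snippet, max_chars_per_snippet)
        cost = -(-len(clipped) // chars_per_token)  # ceiling division
        if used + cost > budget_tokens:
            break  # snippets are assumed sorted by relevance, so stop here
        packed.append(clipped)
        used += cost
    return packed, used


clipped_example = truncate_text("abcdefghij", 8)
packed, used_tokens = pack_context(["a" * 40, "b" * 40, "c" * 40], budget_tokens=25)
```

Stopping at the first snippet that does not fit preserves ranking order; skipping it and trying shorter ones would pack more text at the cost of that ordering.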

### 5. Output and Explainability

Our agent produces:
- A structured JSON output with incident risk levels and related CVEs
- Detailed explanations for risk assessments
- Evidence linking CVEs to specific aspects of the incident
- Normalized risk scores based on historical context

### 6. Evaluation Metrics

We evaluate our system using:
- Semantic relevance of identified CVEs
- Risk assessment accuracy compared with expert assessments
- Explanation quality and actionability
- Tool usage efficiency
- Processing time and token usage
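
The CVE relevance metric can be made concrete with precision and recall at k against analyst-labeled CVEs. The sketch below is one possible scoring function, not the evaluation harness used in this notebook. `precision_recall_at_k` is a hypothetical name, and the CVE identifiers are placeholders rather than real results.

```python
def precision_recall_at_k(predicted, relevant, k):
    """Score the top-k predicted CVE IDs against a set of labeled relevant IDs."""
    top_k = predicted[:k]
    relevant_set = set(relevant)
    hits = sum(1 for cve_id in top_k if cve_id in relevant_set)
    precision = hits / len(top_k) if top_k else 0.0
    recall = hits / len(relevant_set) if relevant_set else 0.0
    return precision, recall


# Placeholder IDs: the agent's ranked output vs. an analyst's labels
predicted_cves = ["CVE-XXXX-0001", "CVE-XXXX-0002", "CVE-XXXX-0003"]
relevant_cves = {"CVE-XXXX-0001", "CVE-XXXX-0003", "CVE-XXXX-0004"}

precision_at_2, recall_at_2 = precision_recall_at_k(predicted_cves, relevant_cves, k=2)
precision_at_3, recall_at_3 = precision_recall_at_k(predicted_cves, relevant_cves, k=3)
```

The function assumes the predicted list contains no duplicate IDs; duplicates would be counted as separate hits.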

### 7. Production Challenges

Key challenges for production deployment include:
- Balancing model cost and performance
- Ensuring prompt engineering robustness
- Maintaining tool reliability
- Addressing safety and bias concerns
- Implementing comprehensive monitoring

## 10.4 Final Thoughts

This CVE analysis agent demonstrates the practical application of generative AI to cybersecurity operations. By combining semantic search, LLM reasoning, and specialized tools, we've created a system that can significantly enhance the efficiency and consistency of security incident analysis.

The architecture is designed to be modular, extensible, and adaptable to changing security landscapes. It represents a balance between automation and human oversight, providing valuable analysis while ensuring security professionals remain in control of critical decisions.

As threat landscapes continue to evolve, AI-assisted analysis will become increasingly valuable for security teams. This system provides a foundation that can be expanded and enhanced to address emerging security challenges.

In [None]:
# Thank you for reviewing this notebook!
print("Analysis complete!")