# Chapter 11: SOC 분석 에이전트 (SOC Analyst Agent)

이 노트북에서는 보안 운영센터(SOC) 분석 에이전트 워크플로우를 다룹니다.

## 주요 내용
- SOC 분석 워크플로우
- DSPy 최적화
- 위협 분류 및 대응

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/TeeDDub/Building-Applications-with-AI-Agents/blob/main/notebook/ch11_soc_analyst_agent.ipynb)


## 1. 패키지 설치


In [None]:
!pip install -q langchain langchain-openai langgraph traceloop-sdk dspy-ai


## 2. API 키 설정


In [None]:
import os

try:
    from google.colab import userdata
    os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
    print("✅ Colab Secrets에서 API 키를 불러왔습니다.")
except:
    pass

if not os.getenv("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = "sk-your-api-key-here"
    print("⚠️ API 키를 직접 입력해주세요.")


## 3. soc_analyst_agent.py


SOC 분석가 에이전트의 도구 호출 워크플로우를 정의합니다.


In [None]:
from __future__ import annotations
"""
soc_analyst_agent.py
위협 조사, 사고 대응, 로그 분석 및 보안 모니터링을 처리하는 보안 운영 센터(SOC) 분석가 에이전트를 위한 LangGraph 워크플로우입니다.
"""
import os
import json
import operator
import builtins
from typing import Annotated, Sequence, TypedDict, Optional

from langchain.chat_models import init_chat_model
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain_core.messages.tool import ToolMessage
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

from langchain_core.tools import tool
from langgraph.graph import StateGraph, END

from traceloop.sdk import Traceloop
from src.common.observability.loki_logger import log_to_loki

os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317"
os.environ["OTEL_EXPORTER_OTLP_INSECURE"] = "true"

@tool
def lookup_threat_intel(indicator: str, type: str, **kwargs) -> str:
    """IP 주소, 파일 해시, URL 및 도메인에 대한 위협 인텔리전스를 조회합니다."""
    print(f"[TOOL] lookup_threat_intel(indicator={indicator}, type={type}, kwargs={kwargs})")
    log_to_loki("tool.lookup_threat_intel", f"indicator={indicator}, type={type}")
    return "threat_intel_retrieved"

@tool
def query_logs(query: str, log_index: str, **kwargs) -> str:
    """인증, 엔드포인트, 네트워크, 방화벽 및 DNS 시스템 전반에 걸쳐 보안 로그를 검색하고 분석합니다."""
    print(f"[TOOL] query_logs(query={query}, log_index={log_index}, kwargs={kwargs})")
    log_to_loki("tool.query_logs", f"query={query}, log_index={log_index}")
    return "log_query_executed"

@tool
def triage_incident(incident_id: str, decision: str, reason: str, **kwargs) -> str:
    """보안 사고를 실제 긍정(True Positive), 오탐지(False Positive) 또는 추가 조사를 위한 에스컬레이션으로 분류합니다."""
    print(f"[TOOL] triage_incident(incident_id={incident_id}, decision={decision}, reason={reason}, kwargs={kwargs})")
    log_to_loki("tool.triage_incident", f"incident_id={incident_id}, decision={decision}")
    return "incident_triaged"

@tool
def isolate_host(host_id: str, reason: str, **kwargs) -> str:
    """측면 이동을 방지하고 보안 사고를 억제하기 위해 손상된 호스트를 격리합니다."""
    print(f"[TOOL] isolate_host(host_id={host_id}, reason={reason}, kwargs={kwargs})")
    log_to_loki("tool.isolate_host", f"host_id={host_id}, reason={reason}")
    return "host_isolated"

@tool
def send_analyst_response(incident_id: str = None, message: str = None) -> str:
    """보안 분석, 사고 업데이트 또는 권장 사항을 이해 관계자에게 전송합니다."""
    print(f"[TOOL] send_analyst_response → {message}")
    log_to_loki("tool.send_analyst_response", f"incident_id={incident_id}, message={message}")
    return "analyst_response_sent"

TOOLS = [
    lookup_threat_intel, query_logs, triage_incident, isolate_host, send_analyst_response
]

llm = init_chat_model(model="gpt-5-mini", callbacks=[StreamingStdOutCallbackHandler()],  
    verbose=True).bind_tools(TOOLS)

class AgentState(TypedDict):
    incident: Optional[dict]  # 보안 사고 정보
    messages: Annotated[Sequence[BaseMessage], operator.add]

def call_model(state: AgentState):
    history = state["messages"]
    
    # 누락되거나 불완전한 사고 데이터를 우아하게 처리
    incident = state.get("incident", {})
    if not incident:
        incident = {"incident_id": "UNKNOWN", "severity": "medium", "status": "investigating", "analyst": "SOC_EVAL"}
    
    incident_json = json.dumps(incident, ensure_ascii=False)
    system_prompt = (
        "당신은 사이버 보안 사고 대응을 전문으로 하는 숙련된 보안 운영 센터(SOC) 분석가입니다.\n"
        "당신의 전문 분야는 다음과 같습니다:\n"
        "- 위협 인텔리전스 분석 및 침해 지표(IOC) 조사\n"
        "- 여러 시스템에 걸친 보안 로그 분석 및 상관관계 분석\n"
        "- 사고 분류 및 선별(triage) (실제 긍정/오탐지)\n"
        "- 악성코드 분석 및 위협 사냥(Threat Hunting)\n"
        "- 네트워크 보안 모니터링 및 이상 징후 탐지\n"
        "- 사고 봉쇄 및 대응 조정\n"
        "- SIEM/SOAR 플랫폼 운영\n"
        "\n"
        "조사 방법론:\n"
        "  1) 보안 경고를 분석하고 초기 지표를 수집합니다.\n"
        "  2) lookup_threat_intel을 사용하여 IP, 해시, URL 및 도메인을 조사합니다.\n"
        "  3) query_logs를 사용하여 증거를 찾기 위해 관련 로그 소스를 검색합니다.\n"
        "  4) triage_incident를 사용하여 결과를 실제 긍정/오탐지로 분류합니다.\n"
        "  5) 확산을 방지하기 위해 격리가 필요한 경우 isolate_host를 사용합니다.\n"
        "  6) 결과를 문서화하기 위해 send_analyst_response로 후속 조치를 취합니다.\n"
        "\n"
        "항상 신속한 위협 봉쇄와 정확한 사고 분류를 최우선으로 하세요.\n\n"
        f"사고(INCIDENT): {incident_json}"
    )

    full = [SystemMessage(content=system_prompt)] + history

    first: ToolMessage | BaseMessage = llm.invoke(full)
    messages = [first]

    if getattr(first, "tool_calls", None):
        for tc in first.tool_calls:
            print(first)
            print(tc['name'])
            fn = next(t for t in TOOLS if t.name == tc['name'])
            out = fn.invoke(tc["args"])
            messages.append(ToolMessage(content=str(out), tool_call_id=tc["id"]))

        second = llm.invoke(full + messages)
        messages.append(second)

    return {"messages": messages}

def construct_graph():
    g = StateGraph(AgentState)
    g.add_node("assistant", call_model)
    g.set_entry_point("assistant")
    return g.compile()

graph = construct_graph()

if __name__ == "__main__":
    Traceloop.init(disable_batch=True, app_name="soc_analyst_agent")
    example = {"incident_id": "INC-12345", "severity": "high", "type": "suspicious_login", "analyst": "J.Smith"}
    convo = [HumanMessage(content='경고가 발생했습니다: "IP 203.0.113.45에서 관리자 계정으로 의심스러운 로그인 시도." 어떻게 해야 하나요?')]
    result = graph.invoke({"incident": example, "messages": convo})
    for m in result["messages"]:
        print(f"{m.type}: {m.content}") 


## 4. optimize_soc_react_agent.py


DSPy로 SOC 리액트 모듈을 최적화합니다.


In [None]:
import dspy
dspy.configure(lm=dspy.LM("gpt-5-mini"))

def lookup_threat_intel(indicator: str) -> str:
    """모의: 인디케이터에 대한 위협 인텔리전스를 조회합니다."""
    return f"Mock intel for {indicator}: potentially malicious"

def query_logs(query: str) -> str:
    """모의: 보안 로그를 검색하고 분석합니다."""
    return f"Mock logs for '{query}': suspicious activity detected"

# 소수의 합성 테스트 케이스(경보 → 기대 응답)
# 실제로는 실제 로그에서 파생하거나 실패 사례에 주석을 달아 수집합니다.
# 더 나은 최적화를 위해 100개+를 목표로 하세요.
trainset = [
    dspy.Example(alert='''Suspicious login attempt from IP 203.0.113.45 to 
                 admin account.''',
                 response='''Lookup threat intel for IP, query logs for activity, 
                     triage as true positive, isolate host if malicious.''')
                     .with_inputs('alert'),
    dspy.Example(alert="Unusual file download from URL example.com/malware.exe.",
                 response='''Lookup threat intel for URL and hash, query logs 
                     for endpoint activity, triage as true positive, isolate 
                     host.''').with_inputs('alert'),
    dspy.Example(alert="High network traffic to domain suspicious-site.net.",
                 response='''Lookup threat intel for domain, query logs for 
                     network and firewall, triage as false positive if 
                     benign.''').with_inputs('alert'),
    dspy.Example(alert='''Alert: Potential phishing email with attachment 
                 hash abc123.''',
                 response='''Lookup threat intel for hash, query logs for email 
                     and endpoint, triage as true positive, send analyst 
                     response.''').with_inputs('alert'),
    dspy.Example(alert='''Anomaly in user behavior: multiple failed logins from 
                 new device.''',
                 response='''Query logs for authentication, lookup threat intel 
                     for device IP, triage as true positive if pattern matches 
                     attack.''').with_inputs('alert'),
]

# SOC 인시던트 처리를 위한 리액트 모듈 정의
react = dspy.ReAct("alert -> response", tools=[lookup_threat_intel, query_logs])

# 단순 지표를 사용하는 옵티마이저
# (예시를 위해 exact match 사용. 프로덕션에서는
# 시맨틱 유사도 같은 더 정교한 지표를 사용하세요)
tp = dspy.MIPROv2(metric=dspy.evaluate.answer_exact_match, auto="light", 
                  num_threads=24)
optimized_react = tp.compile(react, trainset=trainset)


## 5. optimize_threat_classifier.py


DSPy로 위협 분류 모델을 최적화합니다.


In [None]:
import dspy
dspy.configure(lm=dspy.LM("gpt-5-mini"))

# 위협 분류 작업을 위한 DSPy 시그니처 정의
class ThreatClassifier(dspy.Signature):
   """주어진 인디케이터(IP, URL, 해시 등)의 위협 수준을 
   'benign', 'suspicious', 'malicious' 중 하나로 분류합니다."""
   indicator: str = dspy.InputField(desc="IP 주소, URL, 파일 해시 등 분류할 인디케이터.")
   threat_level: str = dspy.OutputField(desc="분류된 위협 수준: 'benign', 'suspicious', 또는 'malicious'.")

# 사려 있는 분류를 위한 ChainOfThought 기반 DSPy 모듈
class ThreatClassificationModule(dspy.Module):
   def __init__(self):
       super().__init__()
       self.classify = dspy.ChainOfThought(ThreatClassifier)
  
   def forward(self, indicator):
       return self.classify(indicator=indicator)

# 최적화를 위한 합성/수기 주석 데이터셋(실무에서는 실제 SOC 로그에서 50~200+ 예시 사용)
# 각 예시는 인디케이터와 정답 위협 수준을 포함합니다.
trainset = [
   dspy.Example(indicator="203.0.113.45", 
       threat_level="suspicious").with_inputs('indicator'),  # 알려진 악성 IP
   dspy.Example(indicator="example.com/malware.exe", 
       threat_level="malicious").with_inputs('indicator'),  # 악성 URL
   dspy.Example(indicator="benign-site.net", 
       threat_level="benign").with_inputs('indicator'),  # 안전한 도메인
   dspy.Example(indicator="abc123def456", 
       threat_level="malicious").with_inputs('indicator'),  # 멀웨어 해시
   dspy.Example(indicator="192.168.1.1", 
       threat_level="benign").with_inputs('indicator'),  # 로컬 IP
   dspy.Example(indicator="obfuscated.url/with?params", 
       threat_level="suspicious").with_inputs('indicator'),  # 난독화 URL 엣지 케이스
   dspy.Example(indicator="new-attack-vector-hash789", 
       threat_level="malicious").with_inputs('indicator'),  # 새로운 위협
]

# 평가 지표(위협 수준의 exact match.
# 프로덕션에서는 시맨틱 매치나 커스텀 스코어러 권장)
def threat_match_metric(example, pred, trace=None):
   return example.threat_level.lower() == pred.threat_level.lower()

# 모듈 최적화(다양한 사례 처리를 위한 내부 프롬프트를 정제)
optimizer = dspy.BootstrapFewShotWithRandomSearch(metric=threat_match_metric, 
    max_bootstrapped_demos=4, max_labeled_demos=4)
optimized_module = optimizer.compile(ThreatClassificationModule(), 
                                     trainset=trainset)

# 도구에서의 사용 예: 최적화 후 classify_threat에 사용
def classify_threat(indicator: str) -> str:
   """최적화된 DSPy 모듈을 사용해 위협 수준을 분류합니다."""
   prediction = optimized_module(indicator=indicator)
   return prediction.threat_level
