In [1]:
from Generator import Generator
from test_backendclient import TestBackendClient
from typing import List
import logging
import sys

from models import ScenarioCreate

# 기존 핸들러들 제거 (중복 방지)
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# 로깅 설정 추가 (콘솔 + 파일)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),  # 노트북 출력으로 로그 표시
    ]
)

# 루트 로거 레벨 설정
logger = logging.getLogger()
logger.setLevel(logging.INFO)



generator = Generator()
backend_client = TestBackendClient()

def load_artifacts(task_id: str) -> List:
    job_id = "test+job_id"
    artifacts = backend_client.load_artifacts(task_id, job_id)
    return artifacts

def pretty_print_scenario(scenario: ScenarioCreate):
    """시나리오 객체를 받아 가독성 좋은 보고서 형태로 출력합니다."""
    
    print("="*80)
    print(f"📜 시나리오 분석 보고서: {scenario.name}")
    print("="*80)
    
    print("\n[ 보고서 개요 ]")
    print(f"  - {scenario.description}")
    
    print("\n[ 식별 정보 ]")
    print(f"  - Job ID: {scenario.job_id}")
    print(f"  - Task ID: {scenario.task_id}")
    
    print("\n[ 재구성된 공격 단계 (Timeline) ]")
    print("-" * 80)
    
    if not scenario.steps:
        print("  (분석된 단계가 없습니다.)")
    else:
        # 시간 순서대로 정렬 (이미 정렬되어 있지만 안전장치)
        sorted_steps = sorted(scenario.steps, key=lambda s: s.order_no)
        
        for step in sorted_steps:
            # datetime 객체를 보기 좋은 문자열로 포맷팅
            timestamp_str = step.timestamp.strftime('%Y-%m-%d %H:%M:%S') if step.timestamp else ""
            
            # 아티팩트 ID 리스트를 콤마로 구분된 문자열로 변환
            artifacts_str = ", ".join(step.artifact_ids)
            
            print(f"\n▶ Step {step.order_no}: [{timestamp_str}]")
            print(f"  - 내용: {step.description}")
            print(f"  - 연관 아티팩트: [{artifacts_str}]")
    
    print("\n" + "="*80)

2025-10-10 21:25:57,145 - test_backendclient - INFO - Initialized TestBackendClient


In [2]:
from dotenv import load_dotenv
from typing import Annotated, List

from typing_extensions import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage, AnyMessage

load_dotenv("../.env")

llm_sm = init_chat_model("google_genai:gemini-2.0-flash", temperature=0)
llm_lg = init_chat_model("google_genai:gemini-2.5-pro", temperature=0)

In [3]:
from typing_extensions import TypedDict
from langchain_core.messages import SystemMessage, HumanMessage, AnyMessage, ToolMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
    artifacts: list[dict]
    # 최종 보고서가 저장될 전용 필드
    final_report: ScenarioCreate | None

In [4]:
from pydantic import BaseModel, Field
from langchain_core.tools import tool
import json
from langchain_tavily import TavilySearch
from langgraph.prebuilt import ToolNode

def artifacts_to_text(artifacts) -> str:
    # 아티팩트 리스트를 문자열로 변환
    formatted_artifacts = []
    for artifact in artifacts:
        # data 필드의 모든 키-값 쌍을 문자열로 변환합니다.
        # json.dumps를 사용하면 중첩된 구조도 깔끔하게 표현할 수 있습니다.
        data_details = json.dumps(artifact.get('data', {}), ensure_ascii=False)
        
        # 각 아티팩트의 핵심 정보를 하나의 문자열로 결합합니다.
        formatted_string = (
            f"유형: {artifact.get('artifact_type', 'N/A')}, "
            f"요약: {artifact.get('summary', 'N/A')}, "
            f"수집 시각: {artifact.get('collected_at', 'N/A')}, "
            f"세부 데이터: {data_details}"
        )
        formatted_artifacts.append(formatted_string)
    
    # 잘 가공된 문자열 리스트를 하나로 합칩니다.
    artifacts_text = "\n- ".join(formatted_artifacts)
    return artifacts_text

# analyze_artifacts 함수의 입력 타입을 Pydantic으로 정의
class AnalysisInput(BaseModel):
    user_requirements: str = Field(description="사용자가 요청한 분석 요구사항입니다. 보고서 작성의 가이드라인이 됩니다.")
    artifacts_to_analyze: list[dict] = Field(description="분석할 전체 아티팩트 데이터 목록입니다.")

@tool(args_schema=AnalysisInput)
def analyze_artifacts_tool(user_requirements: str, artifacts_to_analyze: list[dict]) -> dict:
    """
    주어진 아티팩트 목록과 사용자 요구사항을 바탕으로,
    공격 시나리오를 상세히 분석하고 구조화된 보고서를 생성합니다.
    이 도구는 최종 보고서를 생성할 준비가 되었을 때 마지막에 사용해야 합니다.
    """
    print(f"--- 🛠️ Tool: analyze_artifacts_tool 실행 ---")
    
    # 아티팩트 텍스트 변환 로직 (기존 analyze_artifacts 함수 내용 활용)
    artifacts_text = artifacts_to_text(artifacts_to_analyze)
    
    # 구조화된 출력을 위한 LLM 호출
    structured_llm = llm_lg.with_structured_output(ScenarioCreate)
    
    prompt = f"""당신은 세계 최고의 디지털 포렌식 전문가입니다.
다음 사용자 요구사항과 아티팩트 데이터를 기반으로 공격 시나리오를 재구성하고 상세 보고서를 생성해주세요.

[사용자 요구사항]:
{user_requirements}

[분석 대상 아티팩트 데이터]:
{artifacts_text}

반드시 요구사항을 철저히 준수하여 최종 보고서를 작성해야 합니다.
"""
    
    # LLM을 호출하여 ScenarioCreate 객체 생성
    analysis_result = structured_llm.invoke(prompt)
    return analysis_result.model_dump(mode='json') # type: ignore

def parse_tool_output(state: AgentState) -> dict:
    """
    가장 마지막 메시지가 analyze_artifacts_tool의 결과인지 확인하고,
    맞다면 그 내용을 final_report 필드에 저장합니다.
    """
    last_message = state["messages"][-1]
    
    # 마지막 메시지가 ToolMessage이고, 이름이 일치하는지 확인
    if isinstance(last_message, ToolMessage) and last_message.name == "analyze_artifacts_tool":
        print("--- 📝 Parsing analysis tool output into state ---")
        
        # ToolMessage의 content는 도구가 반환한 dict 형태의 결과입니다.
        report_data_str = last_message.content
        report_data_dict = json.loads(report_data_str) # type: ignore
        
        # 이제 dict 데이터를 ScenarioCreate 객체로 변환할 수 있습니다.
        final_report_obj = ScenarioCreate(**report_data_dict)
        
        # final_report 필드를 업데이트하여 반환
        return {"final_report": final_report_obj}
    
    # 해당되지 않으면 아무것도 변경하지 않음
    return {}

# --- 웹 검색 도구 ---
search_tool = TavilySearch(max_results=3)

# --- 최종 도구 리스트 ---
tool_list = [analyze_artifacts_tool, search_tool]
tool_node = ToolNode(tool_list)
llm_with_tools = llm_lg.bind_tools(tool_list)

In [5]:
def agent_reasoner(state: AgentState):
    """
    에이전트의 '생각(Reason)'을 담당하는 핵심 노드.
    상황을 판단하고, 도구를 사용할지, 아니면 최종 답변을 할지 결정합니다.
    또한 사용한 툴로 생성된 결과물이 적절한지 검토 합니다.
    """
    print("--- 🤔 Agent: 생각 중... ---")
    
    # 에이전트에게 역할과 목표를 부여하는 시스템 메시지
    system_prompt = f"""당신은 주어진 아티팩트들을 분석하여 보안 사고 보고서를 작성하는 전문 분석가입니다.
당신은 'analyze_artifacts_tool'이라는 강력한 분석 도구를 사용할 수 있습니다.
다음은 당신이 사용할 수 있는 전체 아티팩트 목록입니다. (총 {len(state['artifacts'])}개)
{artifacts_to_text(state["artifacts"])}

사용자의 최종 목표는 아티팩트 분석을 통해 상세한 시나리오 보고서를 받는 것입니다.
스스로 계획을 세우고, 필요하다면 웹 검색으로 추가 정보를 얻거나, 최종적으로 'analyze_artifacts_tool'을 호출하여 보고서를 생성하세요.
분석 도구를 호출할 때는 사용자의 초기 요구사항과 전체 아티팩트 목록을 인자로 전달해야 합니다.
작업을 완료하기 전에 최종 결과물을 검토하고, 내용이 적절한지 판단합니다.
모든 작업이 완료되면, 사용자에게 최종 보고서가 생성되었음을 알리고 작업을 마칩니다.
"""
    
    # 현재 대화 내용과 시스템 메시지를 결합
    messages_with_system_prompt = [SystemMessage(content=system_prompt)] + state["messages"]
    
    response = llm_with_tools.invoke(messages_with_system_prompt)
    return {"messages": [response]}

In [6]:
from langgraph.graph import StateGraph
from langgraph.graph import START, END
from langgraph.prebuilt import tools_condition

# %% [4. 새로운 에이전틱 그래프 구성]

graph_builder = StateGraph(AgentState)

# 노드 추가
graph_builder.add_node("agent", agent_reasoner)
graph_builder.add_node("tools", tool_node)
graph_builder.add_node("parser", parse_tool_output) # 새로운 파서 노드 추가

# 엣지 연결 (ReAct 루프)
graph_builder.add_edge(START, "agent")

# 에이전트의 판단에 따라 분기
graph_builder.add_conditional_edges(
    "agent",
    lambda state: "tools" if state["messages"][-1].tool_calls else END,
    {
        "tools": "tools",
        END: END
    }
)

# [핵심 변경] 도구 실행 후, 바로 에이전트로 돌아가는 대신 파서 노드를 먼저 거치도록 설정
graph_builder.add_edge("tools", "parser")
graph_builder.add_edge("parser", "agent") # 파서가 끝난 후 에이전트로 돌아가서 결과 확인 및 마무리

agent_graph = graph_builder.compile()

In [7]:
# from IPython.display import Image, display
# from langchain_core.runnables.graph import MermaidDrawMethod # 이 부분을 임포트합니다.

# # draw_method를 PYPPETEER로 지정하여 로컬에서 렌더링
# display(Image(agent_graph.get_graph().draw_mermaid_png(
#     draw_method=MermaidDrawMethod.PYPPETEER
# )))

In [8]:
#[5. 그래프 실행]

# 사용자 초기 질문 (요구사항)
query = """다음 요구사항을 바탕으로 분석을 진행하고 최종 보고서를 생성해주세요.

[요구사항]:
- description은 중학생도 이해할 수 있도록 쉬운 단어와 명료한 문장으로 작성해주세요.
- 보고서는 일반 회사 직원을 대상으로 작성되어야 합니다.
- 사실관계에 기반하여 분석하고, 당신의 의견은 명확히 '의견:' 또는 '추정:'이라고 표시해주세요.
- 모든 아티팩트를 분석에 포함할 필요는 없으며, 핵심적인 증거 위주로 구성해주세요.
- 분석 결과, 특별히 의심되는 정황이 없다면 보고서에 명확히 기술해주세요.
"""

# 아티팩트 로드
artifacts = load_artifacts("test_artifacts") # 실제 데이터 로드


# 초기 상태 설정
initial_state = {
    "messages": [HumanMessage(content=query)],
    "artifacts": artifacts
}

# 스트리밍으로 에이전트의 작업 과정 확인
for chunk in agent_graph.stream(initial_state, stream_mode="values"):
    last_message = chunk["messages"][-1]
    if isinstance(last_message, ToolMessage):
        print("\n--- 🔧 Tool 실행 결과 ---")
        print(f"Tool: {last_message.name}")
        # 결과가 길 수 있으므로 일부만 출력
        print(f"Output (shortened): {str(last_message.content)[:500]}...")
    else:
        last_message.pretty_print()

2025-10-10 21:25:57,821 - test_backendclient - INFO - [Test] load_artifacts called for task test_artifacts, job test+job_id

다음 요구사항을 바탕으로 분석을 진행하고 최종 보고서를 생성해주세요.

[요구사항]:
- description은 중학생도 이해할 수 있도록 쉬운 단어와 명료한 문장으로 작성해주세요.
- 보고서는 일반 회사 직원을 대상으로 작성되어야 합니다.
- 사실관계에 기반하여 분석하고, 당신의 의견은 명확히 '의견:' 또는 '추정:'이라고 표시해주세요.
- 모든 아티팩트를 분석에 포함할 필요는 없으며, 핵심적인 증거 위주로 구성해주세요.
- 분석 결과, 특별히 의심되는 정황이 없다면 보고서에 명확히 기술해주세요.

--- 🤔 Agent: 생각 중... ---

네, 알겠습니다. 제공된 아티팩트 목록과 요구사항을 바탕으로 보안 사고 분석을 시작하겠습니다.

분석 계획은 다음과 같습니다.

1.  **초기 침투 경로 파악:** 공격자가 시스템에 처음 접근한 방법을 확인합니다. (예: 웹 다운로드, 메신저 파일 등)
2.  **악성 행위 분석:** 시스템 내부에서 어떤 악의적인 활동이 있었는지 파악합니다. (예: 악성 파일 실행, 명령어 실행, 증거 인멸 시도 등)
3.  **정보 유출 정황 확인:** 내부 데이터가 외부로 유출되었는지 증거를 찾습니다. (예: 메신저 파일 전송, USB 연결 등)
4.  **종합 및 결론:** 위의 분석 내용을 종합하여 공격 시나리오를 재구성하고, 최종 보고서 생성을 위해 `analyze_artifacts_tool`을 호출합니다.

이제 분석을 시작하겠습니다. 아티팩트들을 시간 순서대로 재구성하여 공격의 흐름을 파악했습니다.

- **(의심) 초기 침투 (04:55:59):** 사용자가 웹 브라우저를 통해 의심스러운 웹사이트(`https://malicious-site.com/file.exe`)에 접속하여 `malicious_

In [9]:
# 최종 상태 확인
final_state = agent_graph.invoke(initial_state)

# final_state에서 최종 보고서 추출
final_report_object = final_state.get('final_report')

# 다른 함수(pretty_print_scenario)에 결과 전달
if final_report_object:
    print("\n\n--- 최종 보고서 출력 ---")
    pretty_print_scenario(final_report_object)
else:
    print("최종 보고서가 생성되지 않았습니다.")

--- 🤔 Agent: 생각 중... ---
--- 🛠️ Tool: analyze_artifacts_tool 실행 ---
--- 📝 Parsing analysis tool output into state ---
--- 🤔 Agent: 생각 중... ---


--- 최종 보고서 출력 ---
📜 시나리오 분석 보고서: 악성 파일 다운로드 및 내부 정보 유출 공격 시나리오

[ 보고서 개요 ]
  - 이 보고서는 사용자의 컴퓨터에서 발생한 사이버 공격 의심 정황을 분석한 결과입니다. 분석 결과, 외부 공격자가 악성코드를 사용하여 내부 정보를 외부로 유출하고, 그 흔적을 지우려고 시도한 것으로 보입니다. 각 단계별 상세 분석 내용은 아래와 같습니다.

[ 식별 정보 ]
  - Job ID: J01
  - Task ID: T01

[ 재구성된 공격 단계 (Timeline) ]
--------------------------------------------------------------------------------

▶ Step 1: [2025-09-23 04:55:59]
  - 내용: 공격의 시작은 사용자가 인터넷을 사용하던 중 악성코드가 포함된 파일을 다운로드하면서 시작된 것으로 보입니다. 2025년 9월 23일 오전 4시 55분경, 사용자는 'Suspicious Website'라는 제목의 의심스러운 웹사이트에 방문했으며, 'malicious_file.exe'라는 파일을 컴퓨터에 저장했습니다. 의견: 공격자가 악성 파일을 유포하기 위해 의심스러운 웹사이트를 미리 만들어 놓은 것으로 추정됩니다.
  - 연관 아티팩트: [BROWSER_HISTORY_ARTIFACT_ID, FILE_DOWNLOAD_ARTIFACT_ID]

▶ Step 2: [2025-09-23 04:56:11]
  - 내용: 다운로드된 악성 파일 또는 유사한 악성 프로그램('malicious_app.exe')이 2025년 9월 23일 오전 4시 56분경 컴퓨터에서 실행되었습니다. 또한, 공격자는 