In [1]:
import subprocess
import os
import time
from typing import List, TypedDict, Dict
from typing_extensions import Annotated

os.environ["ZHIPUAI_API_KEY"] = "d9ae4d81e22cb9483f5b4d875ba2d1c1.0QNkBJCwDX7rLdI9"
# 用于并行执行 agent 任务
from concurrent.futures import ThreadPoolExecutor, as_completed

# 导入 LangGraph 相关组件
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.callbacks.manager import CallbackManager
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
# 初始化ChatZhipuAI模型
llm = ChatZhipuAI(
    model="glm-4-plus",
    temperature=0.5,
    streaming=True,
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])
)

In [None]:

# =============================================================================
# 定义整体状态数据结构
# =============================================================================
# 初始状态中包含：
# - pcap_file：待分析的 pcap 文件路径；
# - log_files：节点1生成的 .log 文件列表；
# - txt_files：节点2生成的 .txt 文件列表；
# - agent_tasks：经过筛选和拆分后产生的任务列表，每个任务对应一个 agent 的处理工作，
#       任务示例格式：{"agent_id": int, "source_txt": str}
# - messages 字段由 add_messages 管理（但每个 agent在处理时会使用独立的对话内存）
class State(TypedDict):
    pcap_file: str
    log_files: List[str]
    txt_files: List[str]
    agent_tasks: List[Dict]  # 每个任务为 {"agent_id": int, "source_txt": str}
    messages: Annotated[List[dict], add_messages]  # 全局对话历史（此处仅作占位，不用于 agent 间共享）

# =============================================================================
# 创建状态图构建器
# =============================================================================
graph_builder = StateGraph(State)

In [None]:

###############################################################################
# 节点1：调用第一个 shell 脚本，利用 zeek 分析指定 pcap 文件，
#         生成多个 .log 文件（模拟数据）
###############################################################################
def analyze_pcap(state: State) -> State:
    print("【节点1】调用 sh 脚本运行 zeek 分析 pcap 文件...")
    # TODO: 根据实际情况调用 shell 脚本，例如：
    # subprocess.run(["sh", "analyze_pcap.sh", state["pcap_file"]], check=True)
    #
    # 模拟生成 .log 文件列表
    state["log_files"] = ["capture1.log", "capture2.log", "capture3.log"]
    print(f"生成的 log 文件：{state['log_files']}")
    return state

graph_builder.add_node("analyze_pcap", analyze_pcap)
graph_builder.set_entry_point("analyze_pcap")

In [None]:
###############################################################################
# 节点2：调用第二个 shell 脚本，将 .log 文件转换为 .txt 文件，
#         并删除每个文件的前 n 行与最后 m 行（模拟数据）
###############################################################################
def convert_logs(state: State) -> State:
    print("【节点2】调用 sh 脚本转换 .log 文件为 .txt 文件，并删除前 n 行与最后 m 行...")
    # TODO: 遍历 state["log_files"]，调用转换脚本，例如：
    # subprocess.run(["sh", "convert_logs.sh", log_file, str(n), str(m)], check=True)
    #
    # 模拟转换：假设部分转换后的 txt 文件有效（文件名包含 "useful"），部分无效
    simulated_txt = []
    for log in state["log_files"]:
        # 模拟转换结果：将 capture1.log、capture3.log 转换为有用的文件
        if "capture1" in log or "capture3" in log:
            simulated_txt.append(log.replace(".log", "_useful.txt"))
        else:
            simulated_txt.append(log.replace(".log", "_useless.txt"))
    state["txt_files"] = simulated_txt
    print(f"转换得到的 txt 文件：{state['txt_files']}")
    return state

graph_builder.add_node("convert_logs", convert_logs)
graph_builder.add_edge("analyze_pcap", "convert_logs")

In [None]:
###############################################################################
# 节点3：筛选有效的 txt 文件，并基于每个有效文件生成任务列表
#         （注意：一个 txt 文件可能生成多个任务，即对应多个 agent）
###############################################################################
def filter_and_split_tasks(state: State) -> State:
    print("【节点3】筛选有效 txt 文件，并生成 agent 任务列表...")
    agent_tasks = []
    agent_id_counter = 1
    for txt in state["txt_files"]:
        # 仅选取文件名中包含 "useful" 的文件
        if "useful" in txt:
            # TODO: 根据实际需求，可能对同一个 txt 文件生成多个任务
            # 例如：如果文件内容包含多个独立数据块，则每个数据块对应一个任务
            # 这里模拟：对于每个有效文件，随机生成 1~2 个任务
            # 模拟：若文件名中含 "capture1", 生成 2 个任务；否则 1 个任务
            num_tasks = 2 if "capture1" in txt else 1
            for _ in range(num_tasks):
                task = {"agent_id": agent_id_counter, "source_txt": txt}
                agent_tasks.append(task)
                print(f"生成任务：{task}")
                agent_id_counter += 1
    state["agent_tasks"] = agent_tasks
    print(f"共计生成 {len(agent_tasks)} 个 agent 任务")
    return state

graph_builder.add_node("filter_and_split_tasks", filter_and_split_tasks)
graph_builder.add_edge("convert_logs", "filter_and_split_tasks")

In [None]:
###############################################################################
# 节点4：并行处理所有 agent 任务
#         每个任务独立进行：
#           ① 对应的 txt 文件处理生成 y1.txt；
#           ② 将 y1.txt 切分为每 30 行一个数据块；
#           ③ 针对每个数据块调用 llm.invoke（带独立记忆），问答结果写入 answer_agent_x.txt；
#           ④ 动态显示处理进度。
###############################################################################
def process_agent_task(task: Dict) -> None:
    agent_id = task["agent_id"]
    source_txt = task["source_txt"]
    agent_name = f"agent_{agent_id}"
    print(f"[{agent_name}] 开始处理任务，源文件：{source_txt}")

    # ① 模拟对源 txt 进行处理，生成 y1.txt（实际请在此处实现字段提取逻辑）
    y1_file = f"{agent_name}_y1.txt"
    # 模拟读取源文件内容（实际中应读取 source_txt 文件）
    simulated_content = "\n".join([f"Line {i+1} from {source_txt}" for i in range(150)])
    # TODO: 在此处处理 simulated_content 保留特定字段，生成 y1_file 内容
    y1_content = simulated_content  # 模拟处理后结果
    with open(y1_file, "w", encoding="utf-8") as f:
        f.write(y1_content)
    print(f"[{agent_name}] 处理生成 {y1_file}")

    # ② 按每 30 行切分 y1.txt 内容为多个数据块
    lines = y1_content.splitlines()
    data_chunks = ["\n".join(lines[i:i+30]) for i in range(0, len(lines), 30)]
    num_chunks = len(data_chunks)
    print(f"[{agent_name}] 切分为 {num_chunks} 个数据块")

    # ③ 针对每个数据块进行 LLM 问答，每个 agent 使用独立的对话记忆
    conversation_memory = []  # 独立的对话记忆列表
    answer_file = f"answer_{agent_name}.txt"
    with open(answer_file, "w", encoding="utf-8") as f_out:
        for idx, chunk in enumerate(data_chunks):
            if idx == 0:
                prompt = f"语句A: {chunk}"
            else:
                prompt = f"语句B: {chunk}"
            # 添加用户提问到独立记忆中
            conversation_memory.append({"role": "user", "content": prompt})
            # 调用 LLM 进行问答，使用 invoke 方法传入当前对话记忆
            response = llm.invoke(conversation_memory)
            # 将 LLM 响应也加入对话记忆
            conversation_memory.append({"role": "assistant", "content": response})
            # 将回答写入答案文件
            f_out.write(response + "\n")
            # 动态显示进度
            print(f"[{agent_name}] 已处理数据块 {idx+1}/{num_chunks}")
            f_out.flush()
            time.sleep(0.1)  # 模拟处理延时
    print(f"[{agent_name}] 问答结果已保存至 {answer_file}")

def process_agents_parallel(state: State) -> State:
    print("【节点4】并行处理所有 agent 任务...")
    tasks = state["agent_tasks"]
    # 使用线程池并行执行每个 agent 任务
    with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        futures = {executor.submit(process_agent_task, task): task for task in tasks}
        for future in as_completed(futures):
            # 如果任务内部抛出异常，可在此捕获处理
            try:
                future.result()
            except Exception as e:
                task = futures[future]
                print(f"[agent_{task['agent_id']}] 任务处理异常：{e}")
    print("所有 agent 任务均已处理完成。")
    return state

graph_builder.add_node("process_agents_parallel", process_agents_parallel)
graph_builder.add_edge("filter_and_split_tasks", "process_agents_parallel")

###############################################################################
# 设置流程结束点
###############################################################################
graph_builder.set_finish_point(END)

###############################################################################
# 编译并运行整个流程图（接入 MemorySaver 检查点，用于 LLM 调用记忆，如有需要）
###############################################################################
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

if __name__ == "__main__":
    # 初始化状态，仅需提供 pcap 文件路径，后续流程中各字段逐步填充
    initial_state: State = {"pcap_file": "input.pcap"}
    graph.run(initial_state)
