# AUTOSAR Classic ASW 분석용 LangGraph 파이프라인 (SADS/SUDS 지원)

이 노트북은 **C 소스 코드**를 입력으로 받아, 다음 항목을 **SWC 단위로 구조화**하여 **CSV**로 내보내는 LangGraph 파이프라인을 제공합니다.

1) C 코드 입력
2) 코드 전처리/분석
3) SWC별 RTE 인터페이스 추출
4) SWC별 함수 추출
5) SWC별 변수 추출
6) CSV 출력

> **정확도 원칙**: C 파싱/심볼 추출은 LLM이 아니라 **정적 분석(가능하면 libclang)** 기반으로 수행합니다. libclang을 쓸 수 없는 환경에서는 정규식 fallback을 제공하지만, 해당 결과는 정확도를 보장할 수 없습니다(검증 필요).

### 필요 라이브러리 설치

- LangGraph: 그래프 오케스트레이션
- pandas: CSV 출력
- (선택) clang python binding + 시스템 libclang: 정확한 AST 추출

> 회사/프로젝트 빌드 설정(Include path, Defines 등)이 있으면 libclang 정확도가 크게 올라갑니다.

In [14]:
!pip -q install langgraph langchain-core pandas pydantic

# (선택) libclang 기반 추출을 쓰려면 아래도 설치가 필요할 수 있습니다.
# 환경마다 설치 방법이 다르므로, 동작하지 않으면 fallback 모드로 진행하세요.
# !pip -q install clang==17.*


[notice] A new release of pip is available: 25.0.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


## STEP 1. 입력 정의 및 상태(State) 설계

LangGraph는 노드 간에 공유되는 **State**(dict-like)를 중심으로 동작합니다.

아래 State는 다음을 담습니다.
- 입력: 소스 파일(파일명→내용), 빌드 설정
- 중간 산출물: 전처리 결과, SWC 후보, 추출 결과(함수/변수/RTE)
- 출력: CSV 경로, 이슈(정확도/미해결 매핑 등)

In [None]:
from __future__ import annotations

import os
import re
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
from pydantic import BaseModel, Field

from langgraph.graph import StateGraph, END


In [16]:
class FunctionInfo(BaseModel):
    name: str
    signature: str = ""
    file: str = ""
    line: int = 0
    storage: str = ""          # static/global/unknown
    swc: str = ""
    evidence: str = ""
    confidence: str = "high"   # high/medium/low


class VariableInfo(BaseModel):
    name: str
    vartype: str = ""
    file: str = ""
    line: int = 0
    storage: str = ""          # global/static/unknown
    swc: str = ""
    evidence: str = ""
    confidence: str = "high"


class RteInterfaceInfo(BaseModel):
    api: str                   # e.g., Rte_Read_PpSpeed_Speed
    direction: str             # read/write/call/...
    port: str = ""             # best-effort
    data_element: str = ""     # best-effort
    callee: str = ""           # for Rte_Call
    caller_function: str = ""  # enclosing function best-effort
    file: str = ""
    line: int = 0
    swc: str = ""
    evidence: str = ""
    confidence: str = "high"


class PipelineState(BaseModel):
    # Inputs
    source_files: Dict[str, str] = Field(default_factory=dict)   # filename -> content
    build_config: Dict[str, Any] = Field(default_factory=dict)   # include_dirs/defines/flags/output_csv...

    # Intermediate artifacts
    preprocessed_files: Dict[str, str] = Field(default_factory=dict)
    swc_candidates: List[str] = Field(default_factory=list)
    issues: List[str] = Field(default_factory=list)

    # Extraction results
    functions: List[FunctionInfo] = Field(default_factory=list)
    variables: List[VariableInfo] = Field(default_factory=list)
    rte_interfaces: List[RteInterfaceInfo] = Field(default_factory=list)

    # Output
    csv_path: str = ""


## STEP 2. 유틸리티 및 추출 규칙 정의

### 2-1) RTE API 패턴
AUTOSAR Classic에서 일반적으로 사용되는 RTE API 접두어를 기준으로 추출합니다.

### 2-2) SWC 추정 규칙(보수적)
- `Rte_<Swc>.h` 같은 파일명에서 SWC를 추정
- 또는 상위 폴더명을 SWC 후보로 사용

> 프로젝트마다 규칙이 다르므로, 이 단계는 반드시 `evidence`와 `confidence`를 함께 기록합니다.

In [17]:
RTE_PATTERNS = [
    (r"\bRte_Read_([A-Za-z0-9_]+)\b", "read"),
    (r"\bRte_IRead_([A-Za-z0-9_]+)\b", "read"),
    (r"\bRte_Write_([A-Za-z0-9_]+)\b", "write"),
    (r"\bRte_IWrite_([A-Za-z0-9_]+)\b", "write"),
    (r"\bRte_IStatus_([A-Za-z0-9_]+)\b", "status"),
    (r"\bRte_Call_([A-Za-z0-9_]+)\b", "call"),
    (r"\bRte_IrvRead_([A-Za-z0-9_]+)\b", "irvread"),
    (r"\bRte_IrvWrite_([A-Za-z0-9_]+)\b", "irvwrite"),
    (r"\bRte_Prm_([A-Za-z0-9_]+)\b", "prm"),
    (r"\bRte_Mode_([A-Za-z0-9_]+)\b", "mode"),
    (r"\bRte_Switch_([A-Za-z0-9_]+)\b", "switch"),
]

COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
COMMENT_LINE = re.compile(r"//.*?$", re.MULTILINE)

def strip_comments(code: str) -> str:
    code = re.sub(COMMENT_BLOCK, "", code)
    code = re.sub(COMMENT_LINE, "", code)
    return code

def guess_swc_from_filename(path: str) -> Optional[str]:
    base = os.path.basename(path)
    m = re.match(r"Rte_([A-Za-z0-9_]+)\.(h|c)$", base)
    if m:
        return m.group(1)

    parts = path.replace("\\", "/").split("/")
    if len(parts) >= 2:
        parent = parts[-2]
        if re.match(r"^[A-Za-z][A-Za-z0-9_]*$", parent):
            return parent
    return None

def best_effort_parse_rte_name(api: str, direction: str) -> Tuple[str, str, str]:
    # Common pattern assumption:
    #   Rte_Read_<Port>_<DataElement>
    #   Rte_Write_<Port>_<DataElement>
    #   Rte_Call_<Port>_<Operation>
    # 프로젝트별 네이밍이 다르면 100% 보장할 수 없습니다.
    if "_" not in api:
        return "", "", ""
    tail = api[4:] if api.startswith("Rte_") else api  # drop "Rte_"
    chunks = tail.split("_", 1)
    if len(chunks) != 2:
        return "", "", ""
    rest = chunks[1]
    parts = rest.split("_")
    if len(parts) < 2:
        return "", "", ""
    port = parts[0]
    de_or_op = "_".join(parts[1:])
    if direction == "call":
        return port, "", de_or_op
    else:
        return port, de_or_op, ""

def line_number_at_offset(text: str, offset: int) -> int:
    return text.count("\n", 0, offset) + 1


## STEP 3. 심볼(함수/변수) 추출기

### 3-1) libclang 기반(권장)
- 빌드 설정(include/define/flags)이 있을수록 정확도가 올라갑니다.

### 3-2) 정규식 fallback(비권장)
- 전처리/매크로/typedef/조건부 컴파일이 많은 프로젝트에서는 정확도가 떨어질 수 있어 `confidence=low`로 표시합니다.

In [18]:
# fallback regex (정확도 보장 불가)
FUNC_DEF_REGEX = re.compile(
    r"""(?P<storage>\bstatic\b\s+)?(?P<rtype>[A-Za-z_][\w\s\*\(\)]*?)\s+
        (?P<name>[A-Za-z_]\w*)\s*\((?P<params>[^;]*?)\)\s*\{""",
    re.VERBOSE | re.MULTILINE
)

GLOBAL_VAR_REGEX = re.compile(
    r"""^(?P<storage>\bstatic\b\s+)?(?P<type>[A-Za-z_][\w\s\*]*?)\s+
        (?P<name>[A-Za-z_]\w*)\s*(=\s*[^;]+)?\s*;""",
    re.VERBOSE | re.MULTILINE
)

def extract_with_regex_fallback(preprocessed_files: Dict[str, str], issues: List[str]) -> Tuple[List[FunctionInfo], List[VariableInfo]]:
    functions: List[FunctionInfo] = []
    variables: List[VariableInfo] = []

    for path, code in preprocessed_files.items():
        for m in FUNC_DEF_REGEX.finditer(code):
            name = m.group("name")
            storage = "static" if m.group("storage") else "unknown"
            rtype = " ".join(m.group("rtype").split())
            params = " ".join(m.group("params").split())
            sig = f"{rtype} {name}({params})"
            line = line_number_at_offset(code, m.start())
            functions.append(FunctionInfo(
                name=name,
                signature=sig,
                file=path,
                line=line,
                storage=storage,
                confidence="low",
                evidence="regex fallback (function def pattern)"
            ))

        for m in GLOBAL_VAR_REGEX.finditer(code):
            name = m.group("name")
            vartype = " ".join(m.group("type").split())
            storage = "static" if m.group("storage") else "unknown"
            line = line_number_at_offset(code, m.start())
            variables.append(VariableInfo(
                name=name,
                vartype=vartype,
                file=path,
                line=line,
                storage=storage,
                confidence="low",
                evidence="regex fallback (global var pattern)"
            ))

    issues.append("Fallback(regex) 모드로 심볼을 추출했습니다. (정확도 보장 불가: 매크로/헤더/조건부 컴파일 영향)")
    return functions, variables


def try_extract_with_libclang(preprocessed_files: Dict[str, str], build_config: Dict[str, Any], issues: List[str]) -> Tuple[bool, List[FunctionInfo], List[VariableInfo]]:
    try:
        from clang.cindex import Index, TranslationUnit, Config, CursorKind, StorageClass
    except Exception:
        return False, [], []

    libclang_path = build_config.get("libclang_path")
    if libclang_path:
        try:
            Config.set_library_file(libclang_path)
        except Exception:
            issues.append(f"libclang_path 설정 실패: {libclang_path}")

    include_dirs: List[str] = build_config.get("include_dirs", [])
    defines: Dict[str, str] = build_config.get("defines", {})
    extra_flags: List[str] = build_config.get("extra_flags", [])

    clang_args: List[str] = []
    for inc in include_dirs:
        clang_args += ["-I", inc]
    for k, v in defines.items():
        if v is None or v == "":
            clang_args += [f"-D{k}"]
        else:
            clang_args += [f"-D{k}={v}"]
    clang_args += extra_flags

    idx = Index.create()
    functions: List[FunctionInfo] = []
    variables: List[VariableInfo] = []

    c_files = [p for p in preprocessed_files.keys() if p.endswith(".c")]
    if not c_files:
        c_files = list(preprocessed_files.keys())

    for path in c_files:
        code = preprocessed_files[path]
        unsaved = [(path, code)]
        try:
            tu = idx.parse(
                path,
                args=clang_args,
                unsaved_files=unsaved,
                options=TranslationUnit.PARSE_DETAILED_PROCESSING_RECORD
            )
        except Exception as e:
            issues.append(f"libclang parse 실패({path}): {e}")
            return False, [], []

        for d in tu.diagnostics:
            if d.severity >= 3:
                issues.append(f"libclang diagnostic({path}): {d}")

        def walk(cursor):
            nonlocal functions, variables
            for c in cursor.get_children():
                if c.kind == CursorKind.FUNCTION_DECL and c.is_definition():
                    loc = c.location
                    storage = "static" if c.storage_class == StorageClass.STATIC else "global"
                    sig = f"{c.result_type.spelling} {c.spelling}(" + ", ".join(
                        [f"{a.type.spelling} {a.spelling}".strip() for a in c.get_arguments()]
                    ) + ")"
                    functions.append(FunctionInfo(
                        name=c.spelling,
                        signature=sig,
                        file=str(loc.file) if loc.file else path,
                        line=loc.line or 0,
                        storage=storage,
                        confidence="high",
                        evidence="libclang AST"
                    ))

                if c.kind == CursorKind.VAR_DECL:
                    loc = c.location
                    storage = "static" if c.storage_class == StorageClass.STATIC else "global"
                    variables.append(VariableInfo(
                        name=c.spelling,
                        vartype=c.type.spelling,
                        file=str(loc.file) if loc.file else path,
                        line=loc.line or 0,
                        storage=storage,
                        confidence="high",
                        evidence="libclang AST"
                    ))
                walk(c)

        walk(tu.cursor)

    return True, functions, variables


## STEP 4. RTE 인터페이스 추출기

- 정규식으로 RTE API 호출 위치를 찾고
- `caller_function`은 **같은 파일에서 해당 라인 위에 있는 가장 가까운 함수 정의**를 기준으로 보수적으로 추정합니다.

> 정확한 enclosing function 추출은 AST 기반 call-site 분석이 필요하지만, 이 노트북에서는 파이프라인 가이드를 위해 best-effort로 구현합니다.

In [19]:
def find_enclosing_function_by_line(functions: List[FunctionInfo], file: str, line: int) -> str:
    candidates = [f for f in functions if f.file == file and f.line <= line]
    if not candidates:
        return ""
    return sorted(candidates, key=lambda x: x.line)[-1].name

def extract_rte_calls(preprocessed_files: Dict[str, str], functions: List[FunctionInfo]) -> List[RteInterfaceInfo]:
    rte_list: List[RteInterfaceInfo] = []
    for path, code in preprocessed_files.items():
        for pat, direction in RTE_PATTERNS:
            for m in re.finditer(pat, code):
                api = m.group(0)
                line = line_number_at_offset(code, m.start())
                port, de, callee = best_effort_parse_rte_name(api, direction)

                caller = find_enclosing_function_by_line(functions, path, line)
                conf = "high" if caller else "low"
                ev = f"regex match: {pat}"
                if not caller:
                    ev += " | caller function unresolved"

                rte_list.append(RteInterfaceInfo(
                    api=api,
                    direction=direction,
                    port=port,
                    data_element=de,
                    callee=callee,
                    caller_function=caller,
                    file=path,
                    line=line,
                    confidence=conf,
                    evidence=ev
                ))
    return rte_list


## STEP 5. LangGraph 노드(Node) 구현

요청하신 결과 순서(1~6)를 충족하도록 노드를 구성합니다.

> 내부 실행 순서는 `caller_function` 채움 때문에 **심볼 추출 → RTE 추출**이 더 안전합니다.

In [20]:
def node_preprocess(state: PipelineState) -> PipelineState:
    pre: Dict[str, str] = {}
    for path, code in state.source_files.items():
        norm = code.replace("\r\n", "\n").replace("\r", "\n")
        pre[path] = strip_comments(norm)
    state.preprocessed_files = pre
    return state


def node_swc_candidates(state: PipelineState) -> PipelineState:
    swcs = set()
    for path in state.preprocessed_files.keys():
        swc = guess_swc_from_filename(path)
        if swc:
            swcs.add(swc)
    state.swc_candidates = sorted(swcs)
    if not state.swc_candidates:
        state.issues.append("SWC 후보를 파일/경로 기반으로 추정하지 못했습니다. (SWC 매핑 정확도 저하 가능)")
    return state


def node_extract_symbols(state: PipelineState) -> PipelineState:
    ok, funcs, vars_ = try_extract_with_libclang(state.preprocessed_files, state.build_config, state.issues)
    if not ok:
        funcs, vars_ = extract_with_regex_fallback(state.preprocessed_files, state.issues)
    state.functions = funcs
    state.variables = vars_
    return state


def node_extract_rte(state: PipelineState) -> PipelineState:
    state.rte_interfaces = extract_rte_calls(state.preprocessed_files, state.functions)
    return state


def node_map_to_swc(state: PipelineState) -> PipelineState:
    def map_item(file_path: str) -> Tuple[str, str, str]:
        swc = guess_swc_from_filename(file_path) or ""
        if swc:
            return swc, "high", f"SWC inferred from path/filename: {file_path}"
        return "", "low", f"SWC unresolved for file: {file_path}"

    for f in state.functions:
        swc, conf, ev = map_item(f.file)
        f.swc = swc
        if f.confidence == "low" and conf == "high":
            f.confidence = "medium"
        elif f.confidence != "low":
            f.confidence = conf
        f.evidence += f" | {ev}"

    for v in state.variables:
        swc, conf, ev = map_item(v.file)
        v.swc = swc
        if v.confidence == "low" and conf == "high":
            v.confidence = "medium"
        elif v.confidence != "low":
            v.confidence = conf
        v.evidence += f" | {ev}"

    for r in state.rte_interfaces:
        swc, conf, ev = map_item(r.file)
        r.swc = swc
        if r.confidence == "low" and conf == "high":
            r.confidence = "medium"
        elif r.confidence != "low":
            r.confidence = conf
        r.evidence += f" | {ev}"

    unresolved = sum(1 for x in (state.functions + state.variables) if not x.swc)
    if unresolved:
        state.issues.append(f"{unresolved}개 심볼이 SWC에 결정적으로 매핑되지 않았습니다(규칙 기반).")
    return state


def node_export_csv(state: PipelineState) -> PipelineState:
    rows: List[Dict[str, Any]] = []

    for f in state.functions:
        rows.append({
            "swc": f.swc,
            "kind": "function",
            "name": f.name,
            "signature": f.signature,
            "scope": f.storage,
            "file": f.file,
            "line": f.line,
            "direction": "",
            "port": "",
            "data_element": "",
            "callee": "",
            "caller_function": "",
            "confidence": f.confidence,
            "evidence": f.evidence,
        })

    for v in state.variables:
        rows.append({
            "swc": v.swc,
            "kind": "variable",
            "name": v.name,
            "signature": v.vartype,
            "scope": v.storage,
            "file": v.file,
            "line": v.line,
            "direction": "",
            "port": "",
            "data_element": "",
            "callee": "",
            "caller_function": "",
            "confidence": v.confidence,
            "evidence": v.evidence,
        })

    for r in state.rte_interfaces:
        rows.append({
            "swc": r.swc,
            "kind": "rte_interface",
            "name": r.api,
            "signature": "",
            "scope": "",
            "file": r.file,
            "line": r.line,
            "direction": r.direction,
            "port": r.port,
            "data_element": r.data_element,
            "callee": r.callee,
            "caller_function": r.caller_function,
            "confidence": r.confidence,
            "evidence": r.evidence,
        })

    df = pd.DataFrame(rows)
    out = state.build_config.get("output_csv", "autosar_swc_extract.csv")
    df.to_csv(out, index=False, encoding="utf-8-sig")
    state.csv_path = out
    return state


def node_quality_report(state: PipelineState) -> PipelineState:
    low_or_med = 0
    for x in state.functions + state.variables:
        if x.confidence in ("low", "medium"):
            low_or_med += 1
    for x in state.rte_interfaces:
        if x.confidence in ("low", "medium"):
            low_or_med += 1

    if low_or_med:
        state.issues.append(f"총 {low_or_med}개 항목이 low/medium confidence 입니다. CSV 결과를 검증하세요.")

    if state.build_config.get("print_issues", True):
        print("[ISSUES]")
        for it in state.issues:
            print("-", it)
        print()

    return state


## STEP 6. 그래프(Graph) 구성 및 실행

In [21]:
def build_graph():
    g = StateGraph(PipelineState)

    g.add_node("preprocess", node_preprocess)
    g.add_node("swc_candidates", node_swc_candidates)
    g.add_node("extract_symbols", node_extract_symbols)
    g.add_node("extract_rte", node_extract_rte)
    g.add_node("map_to_swc", node_map_to_swc)
    g.add_node("export_csv", node_export_csv)
    g.add_node("quality_report", node_quality_report)

    g.set_entry_point("preprocess")
    g.add_edge("preprocess", "swc_candidates")
    g.add_edge("swc_candidates", "extract_symbols")
    g.add_edge("extract_symbols", "extract_rte")
    g.add_edge("extract_rte", "map_to_swc")
    g.add_edge("map_to_swc", "export_csv")
    g.add_edge("export_csv", "quality_report")
    g.add_edge("quality_report", END)

    return g.compile()


def run_pipeline(source_files: Dict[str, str], build_config: Optional[Dict[str, Any]] = None) -> PipelineState:
    build_config = build_config or {}
    graph = build_graph()
    init = PipelineState(source_files=source_files, build_config=build_config)
    return graph.invoke(init)


## STEP 7. 실행 예시

In [26]:
import os

base_dir = os.getcwd()

c_files = []
for root, dirs, files in os.walk(base_dir):
    for file in files:
        if file.endswith(".c"):
            c_files.append(os.path.join(root, file))

demo_cs = [open(c_file, "r", encoding="utf-8").read() for c_file in c_files]

final_state = run_pipeline(
    source_files={c_file: open(c_file, "r", encoding="utf-8").read() for c_file in c_files},
    build_config={
        "output_csv": "autosar_swc_extract.csv",
        "print_issues": True,
        # Optional for libclang:
        # "include_dirs": ["./include"],
        # "defines": {"SOME_MACRO": "1"},
        # "extra_flags": ["-std=c99"],
        # "libclang_path": "/path/to/libclang.so",
    }
)

print("CSV:", final_state["csv_path"])
pd.read_csv(final_state["csv_path"]).head(30)


[ISSUES]
- Fallback(regex) 모드로 심볼을 추출했습니다. (정확도 보장 불가: 매크로/헤더/조건부 컴파일 영향)
- 총 12개 항목이 low/medium confidence 입니다. CSV 결과를 검증하세요.

CSV: autosar_swc_extract.csv


Unnamed: 0,swc,kind,name,signature,scope,file,line,direction,port,data_element,callee,caller_function,confidence,evidence
0,DemoSwc,function,DemoSwc_MainFunction,void DemoSwc_MainFunction(void),unknown,c:\Users\Administrator\Desktop\ai_project\Asw\...,6,,,,,,medium,regex fallback (function def pattern) | SWC in...
1,DemoSwc,function,another_function,void another_function(void),unknown,c:\Users\Administrator\Desktop\ai_project\Asw\...,15,,,,,,medium,regex fallback (function def pattern) | SWC in...
2,DemoSwc,function,get_counter,int get_counter(void),unknown,c:\Users\Administrator\Desktop\ai_project\Asw\...,21,,,,,,medium,regex fallback (function def pattern) | SWC in...
3,DemoSwc,function,get_global,int get_global(void),static,c:\Users\Administrator\Desktop\ai_project\Asw\...,25,,,,,,medium,regex fallback (function def pattern) | SWC in...
4,HWIOP,function,HWIOPSwc_MainFunction,void HWIOPSwc_MainFunction(void),unknown,c:\Users\Administrator\Desktop\ai_project\Asw\...,6,,,,,,medium,regex fallback (function def pattern) | SWC in...
5,IVC_P,function,IVC_PSwc_MainFunction,void IVC_PSwc_MainFunction(void),unknown,c:\Users\Administrator\Desktop\ai_project\Asw\...,6,,,,,,medium,regex fallback (function def pattern) | SWC in...
6,DemoSwc,variable,g_counter,int,static,c:\Users\Administrator\Desktop\ai_project\Asw\...,3,,,,,,medium,regex fallback (global var pattern) | SWC infe...
7,DemoSwc,variable,g_global,int,unknown,c:\Users\Administrator\Desktop\ai_project\Asw\...,4,,,,,,medium,regex fallback (global var pattern) | SWC infe...
8,HWIOP,variable,g_counter,int,static,c:\Users\Administrator\Desktop\ai_project\Asw\...,3,,,,,,medium,regex fallback (global var pattern) | SWC infe...
9,HWIOP,variable,g_global,int,unknown,c:\Users\Administrator\Desktop\ai_project\Asw\...,4,,,,,,medium,regex fallback (global var pattern) | SWC infe...


## STEP 8. 실무 적용 가이드라인

### 정확도 향상을 위한 체크리스트
1) **빌드 설정 확보**: include path, defines, compiler flags
2) `compile_commands.json`이 있다면 이를 기반으로 TU를 구성하는 방식으로 확장
3) SWC 경계가 애매하면, 가능한 경우 **ARXML 연계**로 Port/DataElement/Operation을 확정

### 검증 전략(권장)
- `confidence`가 low/medium인 항목만 필터링해서 빠르게 리뷰
- RTE 호출은 프로젝트 규칙에 맞게 port/data element split 규칙을 보정

> 이 노트북은 “뼈대 + 동작 예시”입니다. 실제 코드베이스에 맞게 SWC 추정 규칙과 빌드 설정 입력을 강화하는 것이 핵심입니다.