In [1]:
import json

def extract_event_trace_from_joern(graph_json):
    """
    Extract a stepwise, indented event trace from a Joern graph.json export.

    The first METHOD node found in ``graph_json['nodes']`` is taken as the
    function of interest. Its BLOCK child (reached through an AST edge) is
    walked depth-first along AST edges, and one trace line is emitted per
    LOCAL, CALL, IDENTIFIER, LITERAL, FIELD_IDENTIFIER, CONTROL_STRUCTURE
    and RETURN node. Nesting under control structures is rendered with four
    spaces of indentation per level.

    Node ids are compared after ``str()`` normalisation, so graphs whose
    ids are integers, strings, or a mix of both (nodes vs. edges) all work.
    Labels are compared after stripping surrounding double quotes.

    Args:
        graph_json (dict): Loaded Joern graph.json object with a ``nodes``
            list (dicts carrying ``id`` and ``label``) and an ``edges`` list
            (dicts carrying ``src``, ``dst`` and ``label``).

    Returns:
        List[str]: Stepwise event trace, formatted for symbolic learning.

    Raises:
        ValueError: If the graph has no METHOD node, or the METHOD node has
            no BLOCK child reachable through an AST edge.
        KeyError: If an AST edge points at a node id missing from ``nodes``,
            or a LOCAL node lacks ``TYPE_FULL_NAME`` / ``NAME``.
    """
    # Operators whose CALL nodes are reported as "condition:" events.
    condition_operators = frozenset({
        "<operator>.conditional",
        "<operator>.logicalNot",
        "<operator>.notEquals",
        "<operator>.equals",
        "<operator>.and",
        "<operator>.or",
    })

    # Helper to clean field values.
    # NOTE: str.strip('"') removes *every* leading/trailing double quote, not
    # just one wrapping pair, so code that itself ends in a quote is shortened.
    def clean(field):
        return field.strip('"') if isinstance(field, str) else str(field)

    nodes = {str(n['id']): n for n in graph_json['nodes']}

    # Index the AST edges once (parent id -> ordered child ids) instead of
    # rescanning the whole edge list for every visited node.
    ast_children = {}
    for edge in graph_json['edges']:
        if clean(edge['label']) == 'AST':
            ast_children.setdefault(str(edge['src']), []).append(str(edge['dst']))

    # Helper to get AST children, in the order their edges appear in the file.
    def get_ast_children(node_id):
        return ast_children.get(str(node_id), [])

    # Find the main METHOD node (function): the first one in the node list.
    method = next(
        (n for n in nodes.values() if clean(n['label']) == 'METHOD'),
        None,
    )
    if method is None:
        raise ValueError("No METHOD node found in the graph.")

    # Find the AST root for the function body (first BLOCK child of the METHOD).
    body_id = None
    for child_id in get_ast_children(method['id']):
        child = nodes.get(child_id)
        if child is not None and clean(child['label']) == 'BLOCK':
            body_id = child_id
            break
    # Compare against None explicitly: an id such as 0 is falsy but valid.
    if body_id is None:
        raise ValueError("No BLOCK node (function body) found in the graph.")

    trace = []

    def emit(indent, text):
        # One trace line, indented four spaces per nesting level.
        trace.append("    " * indent + text)

    def walk_control_structure(node_id, code, indent):
        # CONTROL_STRUCTURE nodes choose which children to descend into
        # themselves; the generic child recursion is skipped for them.
        cs_code = code.lower()
        children = get_ast_children(node_id)
        if cs_code.startswith('if'):
            # Expected child order: condition, then-branch, optional else-branch.
            if len(children) >= 2:
                cond_id, then_id = children[0], children[1]
                # Clean the condition text like every other emitted CODE value.
                cond_code = clean(nodes[cond_id].get('CODE', '')).strip()
                emit(indent, f"if: {cond_code}")
                walk_ast(then_id, indent + 1)
                if len(children) > 2:
                    emit(indent, "else")
                    walk_ast(children[2], indent + 1)
            else:
                emit(indent, f"if: {code}")
        elif cs_code.startswith('else if'):
            # Must be tested before the plain 'else' prefix below.
            emit(indent, f"else if: {code}")
        elif cs_code.startswith('else'):
            emit(indent, "else")
            if children:
                walk_ast(children[0], indent + 1)
        elif cs_code.startswith('for'):
            emit(indent, f"for: {code}")
            # Only the last child (taken to be the loop body) is descended into.
            if children:
                walk_ast(children[-1], indent + 1)
        elif cs_code.startswith('while'):
            emit(indent, f"while: {code}")
            if children:
                walk_ast(children[-1], indent + 1)
        elif cs_code.startswith('switch'):
            emit(indent, f"switch: {code}")
            # Skip the first child (the switch expression), walk the rest.
            for child_id in children[1:]:
                walk_ast(child_id, indent + 1)
        elif cs_code.startswith('return'):
            emit(indent, f"return: {code}")
        else:
            emit(indent, f"control: {code}")

    def walk_ast(node_id, indent=0):
        node = nodes[str(node_id)]
        label = clean(node['label'])
        code = clean(node.get('CODE', ''))

        # --- Variable declarations ---
        if label == "LOCAL":
            emit(indent, f"declare {clean(node['TYPE_FULL_NAME'])} {clean(node['NAME'])}")

        # --- Assignment, calls, field access, operators ---
        elif label == "CALL":
            mfn = clean(node.get('METHOD_FULL_NAME', ''))
            if mfn == "<operator>.assignment":
                emit(indent, f"assign: {code}")
            elif mfn == "<operator>.indirectFieldAccess":
                emit(indent, f"field_access: {code}")
            elif mfn in condition_operators:
                emit(indent, f"condition: {code}")
            elif mfn.startswith("<operator>"):
                emit(indent, f"operator: {mfn} ({code})")
            else:  # Regular function call
                emit(indent, f"call: {mfn} ({code})")

        # --- Variable usage ---
        elif label == "IDENTIFIER":
            tname = clean(node.get("TYPE_FULL_NAME", ""))
            emit(indent, f"use {code} as {tname}")

        # --- Constants/literals ---
        elif label == "LITERAL":
            emit(indent, f"literal: {code}")

        # --- Field identifiers ---
        elif label == "FIELD_IDENTIFIER":
            field_name = clean(node.get('CANONICAL_NAME', ''))
            emit(indent, f"field: {field_name}")

        # --- Control structures: if, else, loops, etc ---
        elif label == "CONTROL_STRUCTURE":
            walk_control_structure(node_id, code, indent)
            return

        # --- Return statements ---
        elif label == "RETURN":
            emit(indent, f"return: {code}")

        # --- Recursively process children at the same indentation level ---
        for child_id in get_ast_children(node_id):
            walk_ast(child_id, indent)

    walk_ast(body_id)
    return trace


diverse

In [2]:
import pandas as pd

# Load the "diverse" test split; the row index selects the Joern sample directory.
df = pd.read_csv("diverse_test.csv")


def get_graph_path(idx):
    """Return the path of the Joern graph.json exported for row index ``idx``."""
    return f"/mnt/c/Users/user01/PycharmProjects/PythonProject2/joern_output_diverse/sample_{idx}/json/graph.json"


def _load_event_trace(path):
    """Read one graph.json and return its event trace.

    The file is opened in a ``with`` block so the handle is closed after each
    row, and in binary mode so ``json.load`` detects the UTF-8/16/32 encoding
    itself instead of relying on the locale default.
    """
    with open(path, "rb") as fh:
        return extract_event_trace_from_joern(json.load(fh))


df["graph_path"] = df.index.map(get_graph_path)

df["event_trace"] = df["graph_path"].apply(_load_event_trace)

# quoting=1 is csv.QUOTE_ALL: every field is quoted in the output file.
df.to_csv("diverse_with_event_trace_graph.csv", index=False, quoting=1)

bigvul

In [4]:
import pandas as pd

# Load the Big-Vul test split; the row index selects the Joern sample directory.
df = pd.read_csv("big_vultest.csv")


def get_graph_path(idx):
    """Return the path of the Joern graph.json exported for row index ``idx``."""
    return f"/mnt/c/Users/user01/PycharmProjects/PythonProject2/joern_output_big/sample_{idx}/json/graph.json"


def _load_event_trace(path):
    """Read one graph.json and return its event trace.

    The file is opened in a ``with`` block so the handle is closed after each
    row, and in binary mode so ``json.load`` detects the UTF-8/16/32 encoding
    itself instead of relying on the locale default.
    """
    with open(path, "rb") as fh:
        return extract_event_trace_from_joern(json.load(fh))


df["graph_path"] = df.index.map(get_graph_path)

df["event_trace"] = df["graph_path"].apply(_load_event_trace)

# quoting=1 is csv.QUOTE_ALL: every field is quoted in the output file.
df.to_csv("big_vul_with_event_trace.csv", index=False, quoting=1)

djuliet

In [5]:
import pandas as pd

# Load the D-Juliet test split; the row index selects the Joern sample directory.
df = pd.read_csv("djuliet_test.csv")


def get_graph_path(idx):
    """Return the path of the Joern graph.json exported for row index ``idx``."""
    return f"/mnt/c/Users/user01/PycharmProjects/PythonProject2/joern_output_djuliet/sample_{idx}/json/graph.json"


def _load_event_trace(path):
    """Read one graph.json and return its event trace.

    The file is opened in a ``with`` block so the handle is closed after each
    row, and in binary mode so ``json.load`` detects the UTF-8/16/32 encoding
    itself instead of relying on the locale default.
    """
    with open(path, "rb") as fh:
        return extract_event_trace_from_joern(json.load(fh))


df["graph_path"] = df.index.map(get_graph_path)

df["event_trace"] = df["graph_path"].apply(_load_event_trace)

# quoting=1 is csv.QUOTE_ALL: every field is quoted in the output file.
df.to_csv("djuliet_with_event_trace.csv", index=False, quoting=1)

cve-fixes

In [6]:
import pandas as pd

# Load the CVE-fixes test split (JSON); the row index selects the Joern sample directory.
df = pd.read_json("test_512.json")


def get_graph_path(idx):
    """Return the path of the Joern graph.json exported for row index ``idx``."""
    return f"/mnt/c/Users/user01/PycharmProjects/PythonProject2/joern_output_fun/sample_{idx}/json/graph.json"


def _load_event_trace(path):
    """Read one graph.json and return its event trace.

    The file is opened in a ``with`` block so the handle is closed after each
    row, and in binary mode so ``json.load`` detects the UTF-8/16/32 encoding
    itself instead of relying on the locale default.
    """
    with open(path, "rb") as fh:
        return extract_event_trace_from_joern(json.load(fh))


df["graph_path"] = df.index.map(get_graph_path)

df["event_trace"] = df["graph_path"].apply(_load_event_trace)

# quoting=1 is csv.QUOTE_ALL: every field is quoted in the output file.
df.to_csv("cvefixes_with_event_trace.csv", index=False, quoting=1)

mix_vul

In [7]:
import pandas as pd

# Load the mixed-vulnerability test split; the row index selects the Joern sample directory.
df = pd.read_csv("mix_test_vultest.csv")


def get_graph_path(idx):
    """Return the path of the Joern graph.json exported for row index ``idx``."""
    return f"/mnt/c/Users/user01/PycharmProjects/PythonProject2/joern_output_mix/sample_{idx}/json/graph.json"


def _load_event_trace(path):
    """Read one graph.json and return its event trace.

    The file is opened in a ``with`` block so the handle is closed after each
    row, and in binary mode so ``json.load`` detects the UTF-8/16/32 encoding
    itself instead of relying on the locale default.
    """
    with open(path, "rb") as fh:
        return extract_event_trace_from_joern(json.load(fh))


df["graph_path"] = df.index.map(get_graph_path)

df["event_trace"] = df["graph_path"].apply(_load_event_trace)

# quoting=1 is csv.QUOTE_ALL: every field is quoted in the output file.
df.to_csv("mixvul_with_event_trace.csv", index=False, quoting=1)

reveal (with event trace)

In [8]:
import pandas as pd

# Load the ReVeal test split; the row index selects the Joern sample directory.
df = pd.read_csv("Reveal_vultest.csv")


def get_graph_path(idx):
    """Return the path of the Joern graph.json exported for row index ``idx``."""
    return f"/mnt/c/Users/user01/PycharmProjects/PythonProject2/joern_output_reveal/sample_{idx}/json/graph.json"


def _load_event_trace(path):
    """Read one graph.json and return its event trace.

    The file is opened in a ``with`` block so the handle is closed after each
    row, and in binary mode so ``json.load`` detects the UTF-8/16/32 encoding
    itself instead of relying on the locale default.
    """
    with open(path, "rb") as fh:
        return extract_event_trace_from_joern(json.load(fh))


df["graph_path"] = df.index.map(get_graph_path)

df["event_trace"] = df["graph_path"].apply(_load_event_trace)

# quoting=1 is csv.QUOTE_ALL: every field is quoted in the output file.
df.to_csv("reveal_with_event_trace.csv", index=False, quoting=1)