In [None]:
import csv
import re

def parse_and_generate_csv(input_filename, nodes_csv_filename, edges_csv_filename):
    """Convert a dependency-graph text file into a nodes CSV and an edges CSV.

    Input lines starting with ``N:`` are parsed as
    ``N: <id> "<name>" [<key>=<value>, ...];`` and written to the nodes file
    with the columns ``id, name, body, kind, prop, path``.  Input lines
    starting with ``E:`` are parsed as
    ``E: <source> <target> [<key>=<value>, ...];`` and written to the edges
    file with the columns ``source, target, weight``.  Attributes missing from
    a line are written as empty strings.  Lines that do not match either
    pattern are skipped silently.

    Attribute values may be bare tokens (``kind=cnst``) or double-quoted
    strings (``path="A.B.C"``).  Quoted values are kept in full, so a dotted
    path is not cut at its first ``.``.

    Args:
        input_filename: Path of the text file to read.
        nodes_csv_filename: Path of the nodes CSV to create (overwritten).
        edges_csv_filename: Path of the edges CSV to create (overwritten).

    Raises:
        OSError: If any of the three files cannot be opened.
    """
    node_line = re.compile(r'N:\s+(\d+)\s+"([^"]+)"\s+\[([^\]]+)\]\s*;')
    edge_line = re.compile(r'E:\s+(\d+)\s+(\d+)\s+\[([^\]]+)\]\s*;')
    # key="anything but a quote"  OR  key=bare-token-up-to-comma/whitespace.
    attribute = re.compile(r'(\w+)=(?:"([^"]*)"|([^",\s]+))')

    def parse_attributes(text):
        # Exactly one of (quoted, bare) is non-empty for each match, except
        # for an empty quoted value, where both are "" and "" is the result.
        return {key: quoted or bare for key, quoted, bare in attribute.findall(text)}

    node_columns = ("body", "kind", "prop", "path")

    with open(input_filename, "r") as input_file, \
            open(nodes_csv_filename, "w", newline="") as nodes_csv, \
            open(edges_csv_filename, "w", newline="") as edges_csv:
        nodes_writer = csv.writer(nodes_csv)
        edges_writer = csv.writer(edges_csv)

        nodes_writer.writerow(["id", "name", "body", "kind", "prop", "path"])
        edges_writer.writerow(["source", "target", "weight"])

        for line in input_file.read().strip().split("\n"):
            if line.startswith("N:"):
                match = node_line.match(line)
                if match:
                    node_id, node_name, raw_attributes = match.groups()
                    attributes = parse_attributes(raw_attributes)
                    nodes_writer.writerow(
                        [node_id, node_name]
                        + [attributes.get(column, "") for column in node_columns]
                    )

            elif line.startswith("E:"):
                match = edge_line.match(line)
                if match:
                    source_id, target_id, raw_attributes = match.groups()
                    attributes = parse_attributes(raw_attributes)
                    edges_writer.writerow(
                        [source_id, target_id, attributes.get("weight", "")]
                    )

# Example usage: read the .dpd file and write the node and edge CSV files.
parse_and_generate_csv(
    input_filename="_build/default/catala.dpd",
    nodes_csv_filename="data/nodes.csv",
    edges_csv_filename="data/edges.csv",
)

In [None]:
import csv
import networkx as nx

def load_csv_to_networkx(nodes_csv_filename, edges_csv_filename):
    """Build a directed graph from a nodes CSV file and an edges CSV file.

    The first row of each file is treated as a header and skipped.  Every
    remaining non-empty nodes row must have exactly six fields
    (``id, name, body, kind, prop, path``); the id becomes the node key and
    the other five are stored as node attributes.  Every remaining non-empty
    edges row must have exactly three fields (``source, target, weight``).

    All values are kept as the strings read from the CSV.
    NOTE(review): ``weight`` is therefore a string too; convert it before
    using a weighted graph algorithm.

    Args:
        nodes_csv_filename: Path of the nodes CSV to read.
        edges_csv_filename: Path of the edges CSV to read.

    Returns:
        The populated ``nx.DiGraph``.

    Raises:
        OSError: If either file cannot be opened.
        ValueError: If a non-empty row has the wrong number of fields.
    """
    G = nx.DiGraph()

    # newline="" hands raw line endings to the csv module, as its
    # documentation asks for, so quoted fields with embedded newlines survive.
    with open(nodes_csv_filename, "r", newline="") as nodes_csv:
        nodes_reader = csv.reader(nodes_csv)
        next(nodes_reader, None)  # Skip the header; tolerate an empty file
        for row in nodes_reader:
            if not row:  # csv.reader yields [] for a blank line
                continue
            node_id, name, body, kind, prop, path = row
            G.add_node(node_id, name=name, body=body, kind=kind, prop=prop, path=path)

    with open(edges_csv_filename, "r", newline="") as edges_csv:
        edges_reader = csv.reader(edges_csv)
        next(edges_reader, None)  # Skip the header; tolerate an empty file
        for row in edges_reader:
            if not row:
                continue
            source, target, weight = row
            G.add_edge(source, target, weight=weight)

    return G

# Load the two CSV files into a directed graph.
graph = load_csv_to_networkx(
    nodes_csv_filename="data/nodes.csv",
    edges_csv_filename="data/edges.csv",
)

In [None]:
# The reachability computation below expects an acyclic graph.
assert nx.is_directed_acyclic_graph(graph)

# Node names from which reachability is computed.
roots = [
    "simulation_cred_sred",
    "simulation_sred_cred",
    "preservation",
    "progress",
    "cred_stack_drop",
    "correction_continuations",
    "correction_small_steps",
    "correctness",
    "cred_deterministic",
    "sred_deterministic",
]

# Reverse lookup from a node's "name" attribute to its id.  If several nodes
# share a name, the one visited last wins.
name_to_id = {name: node_id for node_id, name in graph.nodes(data="name")}

# u collects every root together with everything reachable from it.
u = set()
for root_name in roots:
    root_id = name_to_id[root_name]
    u.add(root_id)
    u.update(nx.algorithms.dag.descendants(graph, root_id))

# Name suffixes that are left out of the report
# (presumably auto-generated induction principles; confirm).
ignored_suffixes = ("_sind", "_rec", "_rect", "_ind")

# Attribute dicts of the nodes outside u, minus constructors, the ignored
# suffixes, and everything whose path is "sequences".
ns = [
    attrs
    for attrs in (graph.nodes[node_id] for node_id in set(graph.nodes()) - u)
    if attrs["kind"] != "construct"
    and not attrs["name"].endswith(ignored_suffixes)
    and attrs["path"] != "sequences"
]

# Print the remaining nodes as "<path>.<name>", sorted alphabetically.
for qualified_name in sorted(".".join((attrs["path"], attrs["name"])) for attrs in ns):
    print(qualified_name)
