In [None]:
"""
code_data_network.py  ──  Build & visualise “scripts ↔ datasets” networks
--------------------------------------------------------------------------

• Reads ignore-patterns from  .nwignore   (git-style glob rules)
• Reads dataset metadata   from  dmp.json
• Recognises source files   *.py, *.R, *.m, *.do, *.sas   anywhere inside src/
• Creates a DiGraph with
      – script   nodes        (type='script')
      – dataset  *hub* nodes  (type='ds')      coloured by data_type
      – data-file nodes       (type='file')    same colour / marker as hub
• Edges:
      dataset-hub ──► each data-file in that dataset
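      script      ──► data-file   if basename(data-file) is referenced in script
      script      ──► dataset-hub if the last path component of the hub is
                                  referenced and none of its files were linked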
• Legend shows one entry per dataset (shape) *grouped* by colour (=data_type)
"""

from __future__ import annotations

import json
import itertools
import fnmatch
import os
import re
from pathlib import Path
from typing import Dict, List, Tuple

import matplotlib.pyplot as plt
import networkx as nx


# -------------------------------------------------------------------------
# CONSTANTS
# -------------------------------------------------------------------------
# File suffixes treated as scripts.  Candidates are compared using their
# lower-cased suffix, so the match is case-insensitive (".R" == ".r").
SCRIPT_EXTS = {".py", ".r", ".m", ".do", ".sas"}       # case-insensitive
# Fill colour per dataset ``data_type``.  Types that are not listed here are
# drawn in grey ("#cccccc") by the graph builder.
DEFAULT_COLOURS = {                                    # data_type ▸ colour
    "00_raw":      "#64b5f6",   # blue
    "01_interim":  "#ffb74d",   # orange
    "02_processed": "#81c784"   # green
}
# Marker shapes handed out in a cycle, restarted for every data_type, so that
# datasets sharing a colour can still be told apart.
SHAPES = ["o", "s", "^", "v", "D", "p", "h", "8"]      # Matplotlib markers


# -------------------------------------------------------------------------
# Ignore-file helpers
# -------------------------------------------------------------------------
def load_ignore_patterns(ignore_path: str | Path) -> List[str]:
    """Read glob patterns from an ignore file.

    Each line is stripped of surrounding whitespace; blank lines and lines
    starting with ``#`` are dropped.  A missing file (or a path that is not a
    regular file) yields an empty list rather than an error.
    """
    ignore_file = Path(ignore_path)
    if not ignore_file.is_file():
        return []

    stripped = (raw.strip()
                for raw in ignore_file.read_text(encoding="utf-8").splitlines())
    return [entry for entry in stripped
            if entry and not entry.startswith("#")]


def is_ignored(path: str | Path,
               patterns: List[str],
               base_dir: str | Path) -> bool:
    """Return True if *path* matches any of the ignore *patterns*.

    The path is resolved and expressed relative to *base_dir* (POSIX
    separators).  A path that lies outside *base_dir* is matched by its
    absolute POSIX form instead of raising ``ValueError``.

    Pattern rules:
      * a pattern without ``* ? [ ] /`` matches any single path component
        (e.g. ``.git`` ignores everything below a ``.git`` folder);
      * any pattern is also tried with ``fnmatch`` against the whole
        relative path (``*`` may span ``/``);
      * a pattern ending in ``/`` only matches directories: an ancestor
        directory of *path*, or *path* itself if it is an existing directory.
    """
    target = Path(path).resolve()
    try:
        rel = target.relative_to(Path(base_dir).resolve()).as_posix()
    except ValueError:
        # Outside base_dir (absolute data path, symlink target, ...):
        # fall back to the absolute path so matching still works.
        rel = target.as_posix()

    components = rel.split("/")
    target_is_dir = target.is_dir()

    for pat in patterns:
        dir_only = pat.endswith("/")
        core = pat.rstrip("/")
        if not core:                      # pattern was only slashes
            continue
        literal = all(c not in core for c in "*?[]/")

        if dir_only:
            # Every component except the last is necessarily a directory;
            # the last one counts only when the path itself is a directory.
            dir_parts = components if target_is_dir else components[:-1]
            if literal and core in dir_parts:
                return True
            if fnmatch.fnmatch(rel, core + "/*"):
                return True
            if target_is_dir and fnmatch.fnmatch(rel, core):
                return True
            continue

        # A. literal "component"  (e.g. ".git")
        if literal and core in components:
            return True
        # B. fnmatch against the entire relative path
        if fnmatch.fnmatch(rel, core):
            return True
    return False


# -------------------------------------------------------------------------
# Script discovery  (handles nested module folders)
# -------------------------------------------------------------------------
def collect_scripts(src_dir: str | Path,
                    ignore_patterns: List[str],
                    base_dir: str | Path) -> List[str]:
    """Return the script files below *src_dir*, ordered by their step prefix.

    Directories matching *ignore_patterns* are pruned from the walk, and
    ignored files are skipped.  A file counts as a script when its
    lower-cased suffix is in ``SCRIPT_EXTS``.

    Ordering: a leading ``sNN_`` / ``sNN-`` on the file name (or, failing
    that, on the parent folder name) gives the primary key ``NN``; files
    without such a prefix sort last (key 999).  Ties are broken by path so
    the result does not depend on the filesystem's directory order.
    """
    scripts: List[str] = []

    for root, dirs, files in os.walk(src_dir):
        # prune ignored directories in place so os.walk does not descend
        dirs[:] = [d for d in dirs
                   if not is_ignored(Path(root, d), ignore_patterns, base_dir)]
        for fn in files:
            full = Path(root, fn)
            # cheap suffix test first; is_ignored() resolves the path
            if full.suffix.lower() not in SCRIPT_EXTS:
                continue
            if is_ignored(full, ignore_patterns, base_dir):
                continue
            scripts.append(str(full))

    prefix_re = re.compile(r"s(\d{2})[_\-]")

    # order by prefix  s00_*, s01_*, … or folder prefix
    def _prefix(p: str) -> int:
        location = Path(p)
        for cand in (location.name.lower(), location.parent.name.lower()):
            hit = prefix_re.match(cand)
            if hit:
                return int(hit.group(1))
        return 999

    return sorted(scripts, key=lambda p: (_prefix(p), p))


# -------------------------------------------------------------------------
# Dataset loader
# -------------------------------------------------------------------------
def load_datasets(json_path: str | Path) -> List[dict]:
    """Parse the dataset metadata file and return its decoded JSON content.

    Raises:
        FileNotFoundError: if *json_path* is not an existing regular file.
    """
    source = Path(json_path)
    if source.is_file():
        return json.loads(source.read_text(encoding="utf-8"))
    raise FileNotFoundError(f"No datasets file at {json_path}")


# -------------------------------------------------------------------------
# Build graph
# -------------------------------------------------------------------------
def build_script_data_graph(src_dir: str = "src",
                            dataset_json: str = "dmp.json",
                            ignore_file: str = ".nwignore") -> nx.DiGraph:
    """Assemble the directed script/dataset graph.

    Nodes (kind stored in the ``type`` attribute):
      * ``script`` – every source file returned by ``collect_scripts``;
      * ``ds``     – one hub per entry of *dataset_json*, keyed by the
                     resolved ``destination`` path;
      * ``file``   – one node per non-ignored path in the entry's
                     ``data_files``; it shares colour/marker with its hub.

    Edges:
      * hub    ──> file   for every file node of that dataset;
      * script ──> file   when the file's basename occurs in the script text;
      * script ──> hub    when the last component of the hub path occurs in
                          the script text and no file of that hub was linked.

    Ignore patterns are evaluated relative to the current working directory.
    A script that cannot be read is treated as empty text.
    """
    project_root = Path().resolve()
    patterns = load_ignore_patterns(Path(ignore_file))

    graph = nx.DiGraph()

    # 1. scripts ----------------------------------------------------------
    script_paths = collect_scripts(src_dir, patterns, project_root)
    for script_path in script_paths:
        graph.add_node(script_path, type="script")

    # 2. datasets + files -------------------------------------------------
    # data_type → colour (unknown types fall back to grey below)
    palette: Dict[str, str] = DEFAULT_COLOURS.copy()
    # each data_type walks through SHAPES independently
    markers_by_type: Dict[str, itertools.cycle] = {}
    # bookkeeping used when linking scripts
    files_of_hub: Dict[str, List[str]] = {}
    hub_of_file: Dict[str, str] = {}

    for record in load_datasets(dataset_json):
        kind = record.get("data_type", "unknown")
        title = record.get("data_name", "unnamed")
        if kind not in markers_by_type:
            markers_by_type[kind] = itertools.cycle(SHAPES)
        marker = next(markers_by_type[kind])

        # attributes shared by the hub and all of its files
        style = {
            "colour": palette.get(kind, "#cccccc"),
            "shape": marker,
            "data_type": kind,
        }

        hub = str(Path(record["destination"]).resolve())
        graph.add_node(hub, label=title, type="ds", **style)
        files_of_hub[hub] = []

        for listed in record.get("data_files", []):
            resolved = Path(listed).resolve()
            if is_ignored(resolved, patterns, project_root):
                continue
            data_file = str(resolved)
            graph.add_node(data_file, label=resolved.name, type="file", **style)
            graph.add_edge(hub, data_file)
            files_of_hub[hub].append(data_file)
            hub_of_file[data_file] = hub

    # 3. script → file / hub edges  (avoid double-linking same dataset)
    # ------------------------------------------------------------------
    for script_path in script_paths:
        try:
            source_text = Path(script_path).read_text(encoding="utf-8",
                                                      errors="ignore")
        except OSError:
            source_text = ""

        linked_hubs: set[str] = set()     # hubs already reached through a file

        # A) individual data files mentioned by basename
        for data_file, hub in hub_of_file.items():
            if Path(data_file).name in source_text:
                graph.add_edge(script_path, data_file)
                linked_hubs.add(hub)

        # B) hubs mentioned by name, unless one of their files was linked
        for hub in files_of_hub:
            if hub in linked_hubs:
                continue
            if Path(hub).name in source_text:
                graph.add_edge(script_path, hub)

    return graph


# -------------------------------------------------------------------------
# Drawing utilities
# -------------------------------------------------------------------------
def plot_script_data_graph(G: nx.DiGraph) -> None:
    """Visualise the graph with scripts (x=0), dataset hubs (x=1), files (x=2).

    Every node must carry a ``type`` attribute ('script', 'ds' or 'file');
    'ds' and 'file' nodes must also carry ``colour``, ``shape`` and ``label``.
    Scripts and hubs are stacked in sorted order.  Files are stacked hub by
    hub, in the order their hub → file edges were added, so the layout is the
    same on every run.  One legend entry is drawn per (colour, shape) pair,
    labelled with the first node of that style (a hub, when one exists).

    The figure is shown with ``plt.show()``; nothing is returned.
    """

    # --- ordering -------------------------------------------------------
    scripts = sorted(n for n, d in G.nodes(data=True) if d["type"] == "script")
    ds_hubs = sorted(n for n, d in G.nodes(data=True) if d["type"] == "ds")
    files = sorted(n for n, d in G.nodes(data=True) if d["type"] == "file")

    # keep files ordered under their datasets (appearance order).
    # successors() preserves edge insertion order, unlike the unordered set
    # returned by nx.descendants(); `placed` prevents a file shared by two
    # hubs from being listed (and drawn) twice.
    file_set = set(files)
    placed: set[str] = set()
    ordered_files: List[str] = []
    for ds in ds_hubs:
        for child in G.successors(ds):
            if child in file_set and child not in placed:
                placed.add(child)
                ordered_files.append(child)
    # append any stray files not connected to hub (shouldn’t happen)
    for f in files:
        if f not in placed:
            placed.add(f)
            ordered_files.append(f)

    # --- layout ---------------------------------------------------------
    pos: Dict[str, Tuple[int, int]] = {}
    for i, s in enumerate(scripts):
        pos[s] = (0, -i)
    for i, ds in enumerate(ds_hubs):
        pos[ds] = (1, -i)
    for i, f in enumerate(ordered_files):
        pos[f] = (2, -i)

    # --- draw nodes -----------------------------------------------------
    tallest = max(len(scripts), len(ds_hubs), len(ordered_files))
    plt.figure(figsize=(12, tallest * 0.35 + 1))

    # scripts
    nx.draw_networkx_nodes(G, pos,
                           nodelist=scripts,
                           node_color="#9fa8da",
                           node_shape="s",
                           label="Scripts")

    # datasets + file nodes grouped by (colour, shape); grouping once keeps
    # this linear in the number of nodes.  Hubs come first so that the hub
    # supplies the legend label of its style group.
    by_style: Dict[Tuple[str, str], List[str]] = {}
    for node in ds_hubs + ordered_files:
        attrs = G.nodes[node]
        by_style.setdefault((attrs["colour"], attrs["shape"]), []).append(node)

    for (colour, shape), members in by_style.items():
        nx.draw_networkx_nodes(G, pos,
                               nodelist=members,
                               node_color=colour,
                               node_shape=shape,
                               label=G.nodes[members[0]]["label"])

    # --- edges & labels -------------------------------------------------
    nx.draw_networkx_edges(G, pos,
                           arrowstyle="-|>",
                           arrowsize=12)

    # scripts have no 'label' attribute: fall back to the file name
    labels = {n: G.nodes[n].get("label", Path(n).name) for n in G.nodes}
    nx.draw_networkx_labels(G, pos, labels, font_size=8)

    plt.axis("off")
    plt.legend(title="Datasets (shape per dataset; colour per data_type)",
               scatterpoints=1, fontsize=8)
    plt.tight_layout()
    plt.show()


# -------------------------------------------------------------------------
# CLI helper
# -------------------------------------------------------------------------
def main(argv: List[str] | None = None) -> None:
    """Command-line entry point: build the graph and plot it.

    Args:
        argv: argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Unrecognised arguments are ignored instead of aborting.  A notebook
    kernel puts its own ``-f <connection file>`` into ``sys.argv``, which
    made a strict ``parse_args()`` exit with an error when this function was
    called from a cell.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Build & plot Script ↔ Dataset network")
    parser.add_argument("--src",  default="src",  help="source code directory")
    parser.add_argument("--data", default="dmp.json",
                        help="dmp.json path")
    parser.add_argument("--ignore", default=".nwignore",
                        help="ignore file with glob patterns")
    args, _unknown = parser.parse_known_args(argv)

    G = build_script_data_graph(src_dir=args.src,
                                dataset_json=args.data,
                                ignore_file=args.ignore)

    # An edgeless graph would only show disconnected columns of nodes.
    if G.number_of_edges():
        plot_script_data_graph(G)
    else:
        print("No links found between scripts and datasets.")



In [None]:
  # Run the command-line entry point when this cell is executed.
  main()