From 03f7cfcf898c532bb3cd4a50a7d3058e5ee94756 Mon Sep 17 00:00:00 2001 From: Kenneth Belitzky Date: Sat, 9 May 2026 20:52:05 -0300 Subject: [PATCH] Add structure dependency graph command --- docs/cli-reference.md | 30 +++- docs/mcp-integration.md | 22 +++ structkit/commands/graph.py | 302 ++++++++++++++++++++++++++++++++++++ structkit/main.py | 2 + structkit/mcp_server.py | 62 ++++++++ tests/test_graph_command.py | 119 ++++++++++++++ 6 files changed, 536 insertions(+), 1 deletion(-) create mode 100644 structkit/commands/graph.py create mode 100644 tests/test_graph_command.py diff --git a/docs/cli-reference.md b/docs/cli-reference.md index 510e123..4567045 100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -9,7 +9,7 @@ The `struct` CLI allows you to generate project structures from YAML configurati **Basic Usage:** ```sh -structkit {info,validate,generate,vars,list,generate-schema,mcp,completion,init} ... +structkit {info,validate,generate,explain,vars,graph,list,generate-schema,mcp,completion,init} ... ``` ## Global Options @@ -199,6 +199,34 @@ structkit vars ./my-struct.yaml --json structkit vars python-basic --structures-path ~/custom-structures ``` + +### `graph` + +Visualize dependency relationships between structure definitions. The command follows nested structure references declared in `folders[].struct` or `folders[].structkit`, reports missing references, and detects cycles. + +**Usage:** + +```sh +structkit graph [-h] [-l LOG] [-c CONFIG_FILE] [-i LOG_FILE] [-s STRUCTURES_PATH] [--all] [--format {text,json,mermaid}] [structure_definition] +``` + +**Arguments:** + +- `structure_definition`: Built-in structure name, custom structure name, or local YAML file path. Local `.yaml` and `.yml` files can be passed directly, or with `file://`. +- `-s STRUCTURES_PATH, --structures-path STRUCTURES_PATH`: Path to custom structure definitions. Can be set via the `STRUCTKIT_STRUCTURES_PATH` environment variable. +- `--all`: Graph every available built-in and custom structure. +- `--format {text,json,mermaid}`: Output a human-readable tree, machine-readable JSON, or Mermaid flowchart syntax (default: `text`). + +Examples: + +```sh +structkit graph project/python +structkit graph terraform/apps/generic --format mermaid +structkit graph --all --format json +``` + +Mermaid output starts with `graph TD` and can be pasted into Markdown documentation that supports Mermaid diagrams. + ### `list` List available structures. diff --git a/docs/mcp-integration.md b/docs/mcp-integration.md index 693e0b5..1fb949b 100644 --- a/docs/mcp-integration.md +++ b/docs/mcp-integration.md @@ -124,6 +124,28 @@ Lint one or more structure YAML files or structure names for quality and safety - `lint_all` (optional): Lint all bundled contrib structures (default: false). - `output` (optional): Output format - "text" or "json" (default: "text"). + +### 7. graph_structure +Visualize structure dependencies from `folders[].struct` references as text, JSON, or Mermaid. The tool reports nested dependencies, missing references, and cycles. + +```json +{ + "name": "graph_structure", + "arguments": { + "structure_definition": "project/python", + "structures_path": "/path/to/custom/structures", + "graph_all": false, + "output": "mermaid" + } +} +``` + +**Parameters:** +- `structure_definition` (optional): Structure name or local YAML file to graph. Required unless `graph_all` is true. +- `structures_path` (optional): Custom path to structure definitions. +- `graph_all` (optional): Graph all available structures (default: false). +- `output` (optional): Output format - "text", "json", or "mermaid" (default: "text"). + ## Usage ### Starting the MCP Server (FastMCP stdio / http / sse) diff --git a/structkit/commands/graph.py b/structkit/commands/graph.py new file mode 100644 index 0000000..4ec40b5 --- /dev/null +++ b/structkit/commands/graph.py @@ -0,0 +1,302 @@ +import json +import os +import re +from typing import Any, Dict, List, Optional, Set, Tuple + +import yaml + +from structkit.commands import Command +from structkit.completers import structures_completer + + +class GraphCommand(Command): + """Visualize dependency relationships between StructKit structures.""" + + def __init__(self, parser): + super().__init__(parser) + parser.description = "Visualize structure dependencies from folders[].struct references" + structure_arg = parser.add_argument('structure_definition', nargs='?', type=str, help='Structure name or local YAML file to graph') + structure_arg.completer = structures_completer + parser.add_argument( + '-s', + '--structures-path', + type=str, + help='Path to structure definitions (env: STRUCTKIT_STRUCTURES_PATH)', + default=os.getenv('STRUCTKIT_STRUCTURES_PATH', None) + ) + parser.add_argument('--all', action='store_true', help='Graph all available built-in and custom structures') + parser.add_argument( + '--format', + choices=['text', 'json', 'mermaid'], + default='text', + help='Output format (default: text)' + ) + parser.set_defaults(func=self.execute) + + def execute(self, args): + if not args.all and not args.structure_definition: + self.parser.error('provide a structure name, a YAML file, or --all') + if args.all and args.structure_definition: + self.parser.error('provide either a structure name/YAML file or --all, not both') + + graph = self.build_graph( + structure_definition=args.structure_definition, + structures_path=args.structures_path, + all_structures=args.all, + ) + print(self.format_graph(graph, args.format)) + + def build_graph( + self, + structure_definition: Optional[str] = None, + structures_path: Optional[str] = None, + all_structures: bool = False, + ) -> Dict[str, Any]: + roots = self._list_available_structures(structures_path) if all_structures else [structure_definition] + graph = { + 'roots': roots, + 'nodes': [], + 'edges': [], + 'missing': [], + 'cycles': [], + } + state = { + 'node_names': set(), + 'edge_keys': set(), + 'missing_keys': set(), + 'cycle_keys': set(), + } + + for root in roots: + if root: + self._walk(root, structures_path, graph, state, []) + + return graph + + def format_graph(self, graph: Dict[str, Any], output_format: str = 'text') -> str: + if output_format == 'json': + return json.dumps(graph, indent=2) + if output_format == 'mermaid': + return self._format_mermaid(graph) + return self._format_text(graph) + + def _walk( + self, + structure_definition: str, + structures_path: Optional[str], + graph: Dict[str, Any], + state: Dict[str, Set[Any]], + stack: List[str], + ): + node_name = self._display_name(structure_definition) + config, source = self._load_yaml_config(structure_definition, structures_path) + + if node_name not in state['node_names']: + graph['nodes'].append({ + 'name': node_name, + 'source': source, + 'missing': config is None, + }) + state['node_names'].add(node_name) + + if config is None: + return + if not isinstance(config, dict): + return + + if node_name in stack: + self._add_cycle(stack + [node_name], graph, state) + return + + next_stack = stack + [node_name] + for dependency, folder in self._iter_dependencies(config): + dep_name = self._display_name(dependency) + edge_key = (node_name, dep_name, folder) + if edge_key not in state['edge_keys']: + graph['edges'].append({'from': node_name, 'to': dep_name, 'folder': folder}) + state['edge_keys'].add(edge_key) + + if dep_name in next_stack: + self._add_cycle(next_stack + [dep_name], graph, state) + continue + + dep_config, dep_source = self._load_yaml_config(dependency, structures_path) + if dep_config is None: + if dep_name not in state['node_names']: + graph['nodes'].append({'name': dep_name, 'source': dep_source, 'missing': True}) + state['node_names'].add(dep_name) + missing_key = (node_name, dep_name) + if missing_key not in state['missing_keys']: + graph['missing'].append({'from': node_name, 'to': dep_name, 'folder': folder, 'source': dep_source}) + state['missing_keys'].add(missing_key) + continue + + self._walk(dependency, structures_path, graph, state, next_stack) + + def _add_cycle(self, cycle: List[str], graph: Dict[str, Any], state: Dict[str, Set[Any]]): + key = tuple(cycle) + if key not in state['cycle_keys']: + graph['cycles'].append(cycle) + state['cycle_keys'].add(key) + + def _iter_dependencies(self, config: Dict[str, Any]) -> List[Tuple[str, str]]: + dependencies = [] + for item in config.get('folders', []) or []: + if not isinstance(item, dict): + continue + for folder, content in item.items(): + if not isinstance(content, dict): + continue + structs = content.get('struct') or content.get('structkit') + if isinstance(structs, str): + structs = [structs] + if not isinstance(structs, list): + continue + for nested_struct in structs: + if isinstance(nested_struct, str) and nested_struct: + dependencies.append((nested_struct, str(folder))) + return dependencies + + def _load_yaml_config(self, structure_definition: str, structures_path: Optional[str]): + file_path = self._resolve_structure_path(structure_definition, structures_path) + if not file_path or not os.path.exists(file_path): + return None, file_path + try: + with open(file_path, 'r') as f: + return yaml.safe_load(f) or {}, file_path + except (OSError, yaml.YAMLError) as exc: + self.logger.error(f"❗ Failed to load {file_path}: {exc}") + return None, file_path + + def _resolve_structure_path(self, structure_definition: str, structures_path: Optional[str]) -> Optional[str]: + if structure_definition.startswith('file://'): + return structure_definition[7:] + if structure_definition.endswith(('.yaml', '.yml')): + return structure_definition + + this_file = os.path.dirname(os.path.realpath(__file__)) + contribs_path = os.path.join(this_file, '..', 'contribs') + candidates = [] + if structures_path: + candidates.append(os.path.join(structures_path, f'{structure_definition}.yaml')) + candidates.append(os.path.join(structures_path, f'{structure_definition}.yml')) + candidates.append(os.path.join(contribs_path, f'{structure_definition}.yaml')) + candidates.append(os.path.join(contribs_path, f'{structure_definition}.yml')) + for candidate in candidates: + if os.path.exists(candidate): + return candidate + return candidates[0] if candidates else None + + def _list_available_structures(self, structures_path: Optional[str]) -> List[str]: + this_file = os.path.dirname(os.path.realpath(__file__)) + contribs_path = os.path.join(this_file, '..', 'contribs') + paths = [] + if structures_path: + paths.append(structures_path) + paths.append(contribs_path) + + names = set() + for path in paths: + if not path or not os.path.exists(path): + continue + for root, _, files in os.walk(path): + for file_name in files: + if not file_name.endswith(('.yaml', '.yml')): + continue + rel = os.path.relpath(os.path.join(root, file_name), path) + rel = os.path.splitext(rel)[0] + names.add(rel) + return sorted(names) + + def _display_name(self, structure_definition: str) -> str: + if structure_definition.startswith('file://'): + return structure_definition[7:] + return structure_definition + + def _format_text(self, graph: Dict[str, Any]) -> str: + children: Dict[str, List[Dict[str, str]]] = {} + for edge in graph['edges']: + children.setdefault(edge['from'], []).append(edge) + + lines = ['Dependency graph:'] + for root in graph['roots']: + if not root: + continue + root_name = self._display_name(root) + lines.extend(self._format_text_node(root_name, children, set(), '')) + + if graph['missing']: + lines.append('') + lines.append('Missing references:') + for missing in graph['missing']: + lines.append(f" - {missing['from']} -> {missing['to']} (folder: {missing['folder']})") + + if graph['cycles']: + lines.append('') + lines.append('Cycles:') + for cycle in graph['cycles']: + lines.append(f" - {' -> '.join(cycle)}") + + if not graph['missing'] and not graph['cycles']: + lines.append('') + lines.append('No missing references or cycles detected.') + + return '\n'.join(lines) + + def _format_text_node(self, node: str, children: Dict[str, List[Dict[str, str]]], stack: Set[str], prefix: str) -> List[str]: + lines = [node] if not prefix else [f'{prefix}{node}'] + if node in stack: + lines[-1] += ' (cycle)' + return lines + + next_stack = set(stack) + next_stack.add(node) + edges = children.get(node, []) + for index, edge in enumerate(edges): + is_last = index == len(edges) - 1 + lines.extend(self._format_text_child(edge['to'], children, next_stack, '', is_last)) + return lines + + def _format_text_child( + self, + node: str, + children: Dict[str, List[Dict[str, str]]], + stack: Set[str], + prefix: str, + is_last: bool, + ) -> List[str]: + connector = '└── ' if is_last else '├── ' + line = f'{prefix}{connector}{node}' + if node in stack: + return [line + ' (cycle)'] + + next_stack = set(stack) + next_stack.add(node) + edges = children.get(node, []) + lines = [line] + child_prefix = prefix + (' ' if is_last else '│ ') + for index, edge in enumerate(edges): + lines.extend(self._format_text_child(edge['to'], children, next_stack, child_prefix, index == len(edges) - 1)) + return lines + + def _format_mermaid(self, graph: Dict[str, Any]) -> str: + node_ids = {node['name']: f'n{index}' for index, node in enumerate(graph['nodes'])} + lines = ['graph TD'] + if not graph['nodes']: + return '\n'.join(lines) + + for node in graph['nodes']: + label = self._escape_mermaid_label(node['name']) + lines.append(f' {node_ids[node["name"]]}["{label}"]') + + for edge in graph['edges']: + lines.append(f' {node_ids[edge["from"]]} --> {node_ids[edge["to"]]}') + + missing_nodes = [node_ids[node['name']] for node in graph['nodes'] if node.get('missing')] + if missing_nodes: + lines.append(' classDef missing fill:#ffe6e6,stroke:#cc0000,color:#660000') + lines.append(f' class {",".join(missing_nodes)} missing') + return '\n'.join(lines) + + def _escape_mermaid_label(self, value: str) -> str: + return re.sub(r'"', r'\\"', value) diff --git a/structkit/main.py b/structkit/main.py index cc4cd09..f272ff8 100644 --- a/structkit/main.py +++ b/structkit/main.py @@ -11,6 +11,7 @@ from structkit.commands.lint import LintCommand from structkit.commands.list import ListCommand from structkit.commands.search import SearchCommand +from structkit.commands.graph import GraphCommand from structkit.commands.generate_schema import GenerateSchemaCommand from structkit.commands.mcp import MCPCommand from structkit.logging_config import configure_logging @@ -42,6 +43,7 @@ def get_parser(): ExplainCommand(subparsers.add_parser('explain', help='Explain structure resolution without generating files')) ListCommand(subparsers.add_parser('list', help='List available structures')) SearchCommand(subparsers.add_parser('search', help='Search available structures by keyword')) + GraphCommand(subparsers.add_parser('graph', help='Visualize structure dependencies')) GenerateSchemaCommand(subparsers.add_parser('generate-schema', help='Generate JSON schema for available structures')) MCPCommand(subparsers.add_parser('mcp', help='MCP (Model Context Protocol) support')) diff --git a/structkit/mcp_server.py b/structkit/mcp_server.py index bc62804..2f5c905 100644 --- a/structkit/mcp_server.py +++ b/structkit/mcp_server.py @@ -9,6 +9,7 @@ 5. Inspecting structure variables 6. Explaining structure resolution without generating files 7. Linting structure definitions for quality and safety issues +8. Visualizing structure dependency graphs """ import asyncio import logging @@ -24,6 +25,7 @@ from structkit.commands.lint import LintCommand from structkit.commands.vars import VarsCommand from structkit.commands.explain import ExplainCommand +from structkit.commands.graph import GraphCommand from structkit import __version__ @@ -268,6 +270,26 @@ def _get_structure_vars_logic( finally: sys.stdout = old + def _graph_structure_logic( + self, + structure_definition: Optional[str] = None, + structures_path: Optional[str] = None, + graph_all: bool = False, + output: str = "text", + ) -> str: + if not graph_all and not structure_definition: + return "Error: structure_definition is required unless graph_all is true" + + import argparse + dummy_parser = argparse.ArgumentParser() + graph_command = GraphCommand(dummy_parser) + graph = graph_command.build_graph( + structure_definition=structure_definition, + structures_path=structures_path, + all_structures=graph_all, + ) + return graph_command.format_graph(graph, output) + def _lint_structure_logic( self, @@ -399,6 +421,27 @@ async def generate_structure( return result + @self.app.tool(name="graph_structure", description="Visualize folders[].struct dependencies as text, JSON, or Mermaid") + async def graph_structure( + structure_definition: Optional[str] = None, + structures_path: Optional[str] = None, + graph_all: bool = False, + output: str = "text", + ) -> str: + self.logger.debug( + "MCP request: graph_structure args=%s", + { + "structure_definition": structure_definition, + "structures_path": structures_path, + "graph_all": graph_all, + "output": output, + }, + ) + result = self._graph_structure_logic(structure_definition, structures_path, graph_all, output) + preview = result if len(result) <= 1000 else result[:1000] + f"... [truncated {len(result)-1000} chars]" + self.logger.debug(f"MCP response: graph_structure len={len(result)} preview=\n{preview}") + return result + @self.app.tool(name="lint_structure", description="Lint structure YAML files for quality and safety issues") async def lint_structure( targets: Optional[List[str]] = None, @@ -550,6 +593,25 @@ def __init__(self, content): return MockResult([MockContent(result_text)]) + async def _handle_graph_structure(self, params: Dict[str, Any]): + """Compatibility method for tests that expect MCP-style responses.""" + result_text = self._graph_structure_logic( + params.get('structure_definition'), + params.get('structures_path'), + params.get('graph_all', False), + params.get('output', 'text'), + ) + + class MockContent: + def __init__(self, text): + self.text = text + + class MockResult: + def __init__(self, content): + self.content = content + + return MockResult([MockContent(result_text)]) + async def _handle_lint_structure(self, params: Dict[str, Any]): """Compatibility method for tests that expect MCP-style responses.""" result_text = self._lint_structure_logic( diff --git a/tests/test_graph_command.py b/tests/test_graph_command.py new file mode 100644 index 0000000..44d25b5 --- /dev/null +++ b/tests/test_graph_command.py @@ -0,0 +1,119 @@ +import argparse +import asyncio +import json +import os + +import yaml + +from structkit.commands.graph import GraphCommand +from structkit.mcp_server import StructMCPServer + + +def write_structure(base, name, folders=None): + path = base / f"{name}.yaml" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.safe_dump({'folders': folders or []})) + return path + + +def build_command(): + parser = argparse.ArgumentParser() + return GraphCommand(parser) + + +def test_graph_single_dependency(tmp_path): + write_structure(tmp_path, 'app', [{'lib': {'struct': 'library'}}]) + write_structure(tmp_path, 'library') + + graph = build_command().build_graph('app', structures_path=str(tmp_path)) + + assert graph['roots'] == ['app'] + assert {'from': 'app', 'to': 'library', 'folder': 'lib'} in graph['edges'] + assert graph['missing'] == [] + assert graph['cycles'] == [] + + +def test_graph_multiple_dependencies(tmp_path): + write_structure(tmp_path, 'app', [{'deps': {'struct': ['api', 'web']}}]) + write_structure(tmp_path, 'api') + write_structure(tmp_path, 'web') + + graph = build_command().build_graph('app', structures_path=str(tmp_path)) + + assert {'from': 'app', 'to': 'api', 'folder': 'deps'} in graph['edges'] + assert {'from': 'app', 'to': 'web', 'folder': 'deps'} in graph['edges'] + + +def test_graph_nested_dependencies(tmp_path): + write_structure(tmp_path, 'app', [{'service': {'struct': 'api'}}]) + write_structure(tmp_path, 'api', [{'database': {'struct': 'db'}}]) + write_structure(tmp_path, 'db') + + graph = build_command().build_graph('app', structures_path=str(tmp_path)) + + assert {'from': 'app', 'to': 'api', 'folder': 'service'} in graph['edges'] + assert {'from': 'api', 'to': 'db', 'folder': 'database'} in graph['edges'] + + +def test_graph_missing_dependency(tmp_path): + write_structure(tmp_path, 'app', [{'missing': {'struct': 'does-not-exist'}}]) + + graph = build_command().build_graph('app', structures_path=str(tmp_path)) + + assert graph['missing'][0]['from'] == 'app' + assert graph['missing'][0]['to'] == 'does-not-exist' + assert any(node['name'] == 'does-not-exist' and node['missing'] for node in graph['nodes']) + + +def test_graph_cycle_detection(tmp_path): + write_structure(tmp_path, 'a', [{'b-folder': {'struct': 'b'}}]) + write_structure(tmp_path, 'b', [{'a-folder': {'struct': 'a'}}]) + + graph = build_command().build_graph('a', structures_path=str(tmp_path)) + + assert ['a', 'b', 'a'] in graph['cycles'] + + +def test_graph_local_yaml_json_and_mermaid(tmp_path): + root = tmp_path / 'root.yaml' + root.write_text(yaml.safe_dump({'folders': [{'child': {'struct': 'leaf'}}]})) + write_structure(tmp_path, 'leaf') + + command = build_command() + graph = command.build_graph(str(root), structures_path=str(tmp_path)) + json_text = command.format_graph(graph, 'json') + mermaid = command.format_graph(graph, 'mermaid') + + data = json.loads(json_text) + assert data['roots'] == [str(root)] + assert {'from': str(root), 'to': 'leaf', 'folder': 'child'} in data['edges'] + assert mermaid.startswith('graph TD') + assert '-->' in mermaid + + +def test_graph_all_lists_available_structures(tmp_path): + write_structure(tmp_path, 'one', [{'two-folder': {'struct': 'two'}}]) + write_structure(tmp_path, 'two') + + graph = build_command().build_graph(structures_path=str(tmp_path), all_structures=True) + + assert 'one' in graph['roots'] + assert 'two' in graph['roots'] + assert {'from': 'one', 'to': 'two', 'folder': 'two-folder'} in graph['edges'] + + +def test_mcp_graph_structure_logic_and_handler(tmp_path): + write_structure(tmp_path, 'app', [{'lib': {'struct': 'library'}}]) + write_structure(tmp_path, 'library') + server = StructMCPServer() + + json_text = server._graph_structure_logic('app', structures_path=str(tmp_path), output='json') + data = json.loads(json_text) + assert {'from': 'app', 'to': 'library', 'folder': 'lib'} in data['edges'] + + result = asyncio.run(server._handle_graph_structure({ + 'structure_definition': 'app', + 'structures_path': str(tmp_path), + 'output': 'mermaid', + })) + assert result.content[0].text.startswith('graph TD')