In [3]:
import os
os.chdir("/home/david/Documents/projects/app-local-rag-repo/")

In [4]:
import ast
import os
from typing import Dict, List, Optional, Set
from pathlib import Path

# Node Definitions

In [5]:
from typing import List, Optional, Set

class Node:
    def __init__(self, name: str, path: str, parent: Optional['Node'] = None):
        self.name = name.strip()
        self.path = path.strip()  # Full relative path (e.g., "folder1/folder2/script.py")
        self.parent = parent      # Parent node in the hierarchy
        self.children: List[Node] = []

    def add_child(self, node: 'Node'):
        if node not in self.children:
            self.children.append(node)
            node.parent = self  # Set parent when adding child

    def __repr__(self):
        return f"{self.__class__.__name__}({self.name}, {self.path})"


class FolderNode(Node):
    def __init__(self, name: str, path: str, parent: Optional['Node'] = None):
        super().__init__(name, path, parent)

    def __setattr__(self, key, value):
        if isinstance(value, Node):
            self.__dict__[key] = value
        else:
            super().__setattr__(key, value)

    def add_child(self, node: 'Node'):
        super().add_child(node)
        # Dynamically add an attribute for easy access
        if isinstance(node, FolderNode) or isinstance(node, ScriptNode):
            setattr(self, node.name, node)


class ScriptNode(Node):
    def __init__(self, name: str, path: str, parent: Optional['Node'] = None):
        super().__init__(name, path, parent)
        # New fields for tracking dependencies/aliases
        self.script_dependencies: List['ScriptNode'] = []
        self.class_dependencies: List['ClassNode'] = []
        self.function_dependencies: List['FunctionNode'] = []
        # Map alias -> fully qualified node path (e.g. "CA" -> "folder/script1.py::classA")
        self.aliases: Dict[str, str] = {}

class FunctionNode(Node):
    def __init__(self, name: str, path: str, parent: Optional['Node'] = None):
        super().__init__(name, path, parent)

class ClassNode(Node):
    def __init__(self, name: str, path: str, parent: Optional['Node'] = None):
        super().__init__(name, path, parent)
        self.aliases: Set[str] = set()  # Aliases from imports (e.g., "ClassA" for "classAmpere")

class MethodNode(Node):
    def __init__(self, name: str, path: str, parent: Optional['Node'] = None, is_static: bool = False):
        super().__init__(name, path, parent)
        self.is_static = is_static
        self.dependencies: List[Node] = []  # Method call dependencies

    def add_dep(self, node: 'Node'):
        if node not in self.dependencies:
            self.dependencies.append(node)

# Folder

In [6]:
class FolderScriptBuilder:
    """Builds folder and script nodes from the filesystem, with global_path as root."""
    def __init__(self, global_path: str):
        self.global_path = global_path

    def build(self) -> Node:
        root = FolderNode("root", ".", None)  # Root node represents global_path
        # Set the root name to the last part of the global path
        global_path = Path(self.global_path)
        root.name = global_path.parts[-1]

        nodes: Dict[str, Node] = {"root": root}  # Path "" maps to root

        for root_dir, _, filenames in os.walk(self.global_path):
            rel_root = os.path.relpath(root_dir, self.global_path)
            parent = root

            if rel_root != ".":
                parts = rel_root.split("/")
                for i, part in enumerate(parts):
                    folder_path = "/".join(parts[:i + 1])
                    if folder_path not in nodes:
                        folder_node = FolderNode(part, folder_path)
                        nodes[folder_path] = folder_node
                        parent.add_child(folder_node)
                    parent = nodes[folder_path]

            for fname in filenames:
                if fname.endswith(".py"):
                    rel_path = os.path.join(rel_root, fname) if rel_root != "." else fname
                    script_node = ScriptNode(fname, rel_path)
                    nodes[rel_path] = script_node
                    parent.add_child(script_node)

        return root, nodes

In [7]:
# global_path = "/home/david/Documents/glovo/machine-learning-platform/widget_framework"
global_path = "/home/david/Documents/projects/app-local-rag-repo/dummy-folder"

In [8]:
fsb = FolderScriptBuilder(global_path)
root, nodes = fsb.build()

In [9]:
nodes

{'root': FolderNode(dummy-folder, .),
 'script2.py': ScriptNode(script2.py, script2.py),
 'script1.py': ScriptNode(script1.py, script1.py),
 'folder2': FolderNode(folder2, folder2),
 'folder1': FolderNode(folder1, folder1),
 'folder1/folder1_1': FolderNode(folder1_1, folder1/folder1_1),
 'folder1/folder1_1/script1_1A.py': ScriptNode(script1_1A.py, folder1/folder1_1/script1_1A.py),
 'folder1/folder1_1/script1_1B.py': ScriptNode(script1_1B.py, folder1/folder1_1/script1_1B.py),
 'folder1/folder1_2': FolderNode(folder1_2, folder1/folder1_2)}

In [10]:
root.folder1.folder1_1.children

[ScriptNode(script1_1A.py, folder1/folder1_1/script1_1A.py),
 ScriptNode(script1_1B.py, folder1/folder1_1/script1_1B.py)]

# Class Mapper

In [11]:
class ScriptAnalyzer:
    def __init__(self, nodes: Dict[str, Node], global_path: str, query_folder: Optional[str] = None):
        self.nodes = nodes
        self.global_path = Path(global_path)
        self.query_folder = self.global_path / query_folder if query_folder else self.global_path
        self.ast_cache = {}
        self._build_ast()

    def _build_ast(self):
        for path, node in list(self.nodes.items()):
            if isinstance(node, ScriptNode):
                full_path = self.global_path / node.path
                if full_path.exists() and full_path.is_relative_to(self.query_folder):
                    with open(full_path, "r") as file:
                        self.ast_cache[path] = ast.parse(file.read())

    def analyze(self):
        script_nodes = list(self.nodes.items())
        for path, node in script_nodes:
            if isinstance(node, ScriptNode) and path in self.ast_cache:
                tree = self.ast_cache[path]
                self._process_script(node, tree)

    def _process_script(self, script_node: ScriptNode, tree: ast.Module):
        for ast_node in tree.body:
            if isinstance(ast_node, ast.ClassDef):
                self._process_class(script_node, ast_node)
            elif isinstance(ast_node, ast.FunctionDef):
                self._process_function(script_node, ast_node)

    def _process_class(self, script_node: ScriptNode, ast_node: ast.ClassDef):
        class_path = f"{script_node.path}::{ast_node.name}"
        class_node = ClassNode(ast_node.name, class_path, script_node)
        self.nodes[class_path] = class_node
        script_node.add_child(class_node)

        for method in ast_node.body:
            if isinstance(method, ast.FunctionDef):
                method_path = f"{class_path}::{method.name}"
                method_node = MethodNode(method.name, method_path, class_node)
                self.nodes[method_path] = method_node
                class_node.add_child(method_node)

    def _process_function(self, script_node: ScriptNode, ast_node: ast.FunctionDef):
        function_path = f"{script_node.path}::{ast_node.name}"
        function_node = FunctionNode(ast_node.name, function_path, script_node)
        self.nodes[function_path] = function_node
        script_node.add_child(function_node)

In [12]:
global_path = "/home/david/Documents/projects/app-local-rag-repo/dummy-folder"
fsb = FolderScriptBuilder(global_path)
root, nodes = fsb.build()
script_analyzer = ScriptAnalyzer(
    nodes=nodes, 
    global_path=global_path
)
script_analyzer.analyze()

In [13]:
root

FolderNode(dummy-folder, .)

In [14]:
root.folder1.folder1_1.children[1].children

[FunctionNode(indep_fun_11B, folder1/folder1_1/script1_1B.py::indep_fun_11B),
 ClassNode(Class11B, folder1/folder1_1/script1_1B.py::Class11B)]

In [15]:
class11aa = root.folder1.folder1_1.children[0].children[0]
classa2 =   root.folder1.folder1_1.children[0].children[1]

In [16]:
class11aa.path

'folder1/folder1_1/script1_1A.py::Class11AA'

In [17]:
class11aa.children

[MethodNode(__init__, folder1/folder1_1/script1_1A.py::Class11AA::__init__),
 MethodNode(method1, folder1/folder1_1/script1_1A.py::Class11AA::method1),
 MethodNode(method2, folder1/folder1_1/script1_1A.py::Class11AA::method2)]

In [18]:
classa2.path

'folder1/folder1_1/script1_1A.py::ClassA2'

# Import Analyzer

In [19]:
class ImportAnalyzer:
    def __init__(self, global_path: str, nodes: Dict[str, Node], ast_cache: Dict[str, ast.Module], query_folder: Optional[str] = None):
        self.global_path = Path(global_path)
        self.nodes = nodes
        self.ast_cache = ast_cache
        self.query_folder = self.global_path / query_folder if query_folder else self.global_path

    def analyze(self, script_path: Optional[str] = None):
        if script_path:
            script_node = self.nodes.get(script_path)
            if isinstance(script_node, ScriptNode) and script_path in self.ast_cache:
                tree = self.ast_cache[script_path]
                self._process_imports(script_node, tree)
        else:
            for path, node in self.nodes.items():
                full_path = self.global_path / path
                if isinstance(node, ScriptNode) and path in self.ast_cache and full_path.is_relative_to(self.query_folder):
                    tree = self.ast_cache[path]
                    self._process_imports(node, tree)


    def _process_imports(self, script_node: ScriptNode, tree: ast.Module):
        for stmt in tree.body:
            if isinstance(stmt, ast.Import):
                self._handle_import(script_node, stmt)
            elif isinstance(stmt, ast.ImportFrom):
                self._handle_import_from(script_node, stmt)

    def _handle_import(self, script_node: ScriptNode, stmt: ast.Import):
        """
        Example:
            import script2
            import script2 as s2
            import script1, script2 as s2
        """
        for alias in stmt.names:
            module_name = alias.name
            as_name = alias.asname or module_name

            script_path = self._find_script_path(module_name, base_path=script_node.path)
            if script_path and script_path in self.nodes and isinstance(self.nodes[script_path], ScriptNode):
                target_node: ScriptNode = self.nodes[script_path]
                
                # Ensure no duplicate dependencies using a set for paths
                if target_node not in set(script_node.script_dependencies):
                    script_node.script_dependencies.append(target_node)
                    script_node.aliases[as_name] = script_path

    def _handle_import_from(self, script_node: ScriptNode, stmt: ast.ImportFrom):
        """
        Example:
            from script1 import classA, functionB as fB
            from . import something
            from .script2 import SomeClass
        """
        module_name = stmt.module or ""
        level = stmt.level
        from_script_path = self._find_script_path(module_name, level, script_node.path)

        if not from_script_path or from_script_path not in self.nodes:
            return

        from_script_node = self.nodes[from_script_path]
        if not isinstance(from_script_node, ScriptNode):
            return

        for alias in stmt.names:
            imported_name = alias.name
            as_name = alias.asname or imported_name

            fq_path = f"{from_script_path}::{imported_name}"
            if fq_path in self.nodes:
                # If it's a ClassNode
                if isinstance(self.nodes[fq_path], ClassNode):
                    class_node = self.nodes[fq_path]
                    script_node.class_dependencies.append(class_node)
                    script_node.aliases[as_name] = fq_path
                # If it's a FunctionNode
                elif isinstance(self.nodes[fq_path], FunctionNode):
                    func_node = self.nodes[fq_path]
                    script_node.function_dependencies.append(func_node)
                    script_node.aliases[as_name] = fq_path
            else:
                # If no symbol match is found, treat as entire script import
                if from_script_node not in set(script_node.script_dependencies):
                    script_node.script_dependencies.append(from_script_node)
                    # script_node.aliases[as_name] = from_script_path

    def _find_script_path(self, module_name: str, level: int = 0, base_path: str = "") -> Optional[str]:
        if not module_name and level == 0:
            return None

        # Convert module name using dot notation to a relative script path
        relative_path = Path(module_name.replace(".", "/")).with_suffix(".py")
        
        # If relative path exists in nodes, return it
        return str(relative_path) if str(relative_path) in self.nodes else None

In [114]:
# global_path = "/home/david/Documents/projects/app-local-rag-repo/dummy-folder"
global_path = "/home/david/Documents/glovo/machine-learning-platform/"
query_folder = "widget_framework"

# 0) Build the folder and script nodes
fsb = FolderScriptBuilder(global_path)
root, nodes = fsb.build()

# 1) Build Script/Class/Function/Method nodes
script_analyzer = ScriptAnalyzer(
    nodes=nodes, 
    global_path=global_path,
    query_folder=query_folder
)
script_analyzer.analyze()

# 2) Build the import relationships
import_analyzer = ImportAnalyzer(
    global_path=global_path, 
    nodes=nodes, 
    ast_cache=script_analyzer.ast_cache,
    query_folder=query_folder
)
import_analyzer.analyze("widget_framework/api.py")

In [115]:
vars(nodes["widget_framework/api.py"])

{'name': 'api.py',
 'path': 'widget_framework/api.py',
 'parent': FolderNode(widget_framework, widget_framework),
 'children': [FunctionNode(inference, widget_framework/api.py::inference),
  FunctionNode(production_app, widget_framework/api.py::production_app),
  FunctionNode(local_app, widget_framework/api.py::local_app)],
 'script_dependencies': [ScriptNode(constants.py, widget_framework/constants.py)],
 'class_dependencies': [ClassNode(ContractInput, widget_framework/src/utils.py::ContractInput),
  ClassNode(Utils, widget_framework/src/utils.py::Utils),
  ClassNode(WidgetBuilder, widget_framework/src/widget_builder.py::WidgetBuilder)],
 'function_dependencies': [],
 'aliases': {'ContractInput': 'widget_framework/src/utils.py::ContractInput',
  'Utils': 'widget_framework/src/utils.py::Utils',
  'WidgetBuilder': 'widget_framework/src/widget_builder.py::WidgetBuilder'}}

In [22]:
vars(nodes["widget_framework/api.py"])["class_dependencies"]

[ClassNode(ContractInput, widget_framework/src/utils.py::ContractInput),
 ClassNode(Utils, widget_framework/src/utils.py::Utils),
 ClassNode(WidgetBuilder, widget_framework/src/widget_builder.py::WidgetBuilder)]

In [23]:
script_api = root.widget_framework.children[4]
fun_infe, fun_app, fun_local = script_api.children

In [24]:
vars(script_api)

{'name': 'api.py',
 'path': 'widget_framework/api.py',
 'parent': FolderNode(widget_framework, widget_framework),
 'children': [FunctionNode(inference, widget_framework/api.py::inference),
  FunctionNode(production_app, widget_framework/api.py::production_app),
  FunctionNode(local_app, widget_framework/api.py::local_app)],
 'script_dependencies': [ScriptNode(constants.py, widget_framework/constants.py)],
 'class_dependencies': [ClassNode(ContractInput, widget_framework/src/utils.py::ContractInput),
  ClassNode(Utils, widget_framework/src/utils.py::Utils),
  ClassNode(WidgetBuilder, widget_framework/src/widget_builder.py::WidgetBuilder)],
 'function_dependencies': [],
 'aliases': {'ContractInput': 'widget_framework/src/utils.py::ContractInput',
  'Utils': 'widget_framework/src/utils.py::Utils',
  'WidgetBuilder': 'widget_framework/src/widget_builder.py::WidgetBuilder'}}

In [40]:
script_api.class_dependencies

[ClassNode(ContractInput, widget_framework/src/utils.py::ContractInput),
 ClassNode(Utils, widget_framework/src/utils.py::Utils),
 ClassNode(WidgetBuilder, widget_framework/src/widget_builder.py::WidgetBuilder)]

In [43]:
classnode_contract = script_api.class_dependencies[0]
classnode_contract

ClassNode(ContractInput, widget_framework/src/utils.py::ContractInput)

In [44]:
classnode_contract.children

[MethodNode(ensure_experiment_string_is_non_empty, widget_framework/src/utils.py::ContractInput::ensure_experiment_string_is_non_empty),
 MethodNode(convert_keys_to_int, widget_framework/src/utils.py::ContractInput::convert_keys_to_int),
 MethodNode(set_flag_allowlist_provided, widget_framework/src/utils.py::ContractInput::set_flag_allowlist_provided)]

In [47]:
classnode_contract.children[0]

MethodNode(ensure_experiment_string_is_non_empty, widget_framework/src/utils.py::ContractInput::ensure_experiment_string_is_non_empty)

In [143]:
[xx for xx in nodes if xx.startswith("widget_framework")]

['widget_framework',
 'widget_framework/contract_negotiation.py',
 'widget_framework/data_loader.py',
 'widget_framework/precomputed_setup.py',
 'widget_framework/constants.py',
 'widget_framework/api.py',
 'widget_framework/project_types.py',
 'widget_framework/mapper.py',
 'widget_framework/__init__.py',
 'widget_framework/deployment',
 'widget_framework/deployment/kubernetes',
 'widget_framework/deployment/kubernetes/base',
 'widget_framework/deployment/kubernetes/stage',
 'widget_framework/deployment/kubernetes/stage/resources',
 'widget_framework/deployment/kubernetes/prod',
 'widget_framework/deployment/kubernetes/prod/resources',
 'widget_framework/notebooks',
 'widget_framework/notebooks/debug',
 'widget_framework/notebooks/debug/utils_debug.py',
 'widget_framework/notebooks/debug/api',
 'widget_framework/notebooks/debug/api/archive',
 'widget_framework/notebooks/debug/widget_builder',
 'widget_framework/notebooks/debug/data',
 'widget_framework/notebooks/debug/competing_carous

In [146]:
nodes["widget_framework/src/utils.py::Utils::read_avro_contract"]

MethodNode(read_avro_contract, widget_framework/src/utils.py::Utils::read_avro_contract)

In [150]:
script_wb = nodes["widget_framework/src/widget_builder.py"]
vars(script_wb)

{'name': 'widget_builder.py',
 'path': 'widget_framework/src/widget_builder.py',
 'parent': FolderNode(src, widget_framework/src),
 'children': [ClassNode(WidgetBuilder, widget_framework/src/widget_builder.py::WidgetBuilder)],
 'script_dependencies': [],
 'class_dependencies': [],
 'function_dependencies': [],
 'aliases': {}}

In [152]:
class_wb = script_wb.children[0]
vars(class_wb)

{'name': 'WidgetBuilder',
 'path': 'widget_framework/src/widget_builder.py::WidgetBuilder',
 'parent': ScriptNode(widget_builder.py, widget_framework/src/widget_builder.py),
 'children': [MethodNode(__init__, widget_framework/src/widget_builder.py::WidgetBuilder::__init__),
  MethodNode(preprocess_input, widget_framework/src/widget_builder.py::WidgetBuilder::preprocess_input),
  MethodNode(postprocess_output, widget_framework/src/widget_builder.py::WidgetBuilder::postprocess_output),
  MethodNode(run, widget_framework/src/widget_builder.py::WidgetBuilder::run),
  MethodNode(pretty_output, widget_framework/src/widget_builder.py::WidgetBuilder::pretty_output)],
 'aliases': set()}

# YAML tree Dep

In [160]:
import yaml

def node_to_structure(node):
    # For a FolderNode, build a dict keyed by child name.
    if isinstance(node, FolderNode):
        result = {}
        for child in node.children:
            child_struct = node_to_structure(child)
            if child_struct is not None:
                result[child.name] = child_struct
        return result if result else None

    # For a ScriptNode, consider its children.
    # For classes, use the class name as key and its methods as list.
    # For top-level functions, collect them into a "functions" list.
    elif isinstance(node, ScriptNode):
        result = {}
        functions = []
        for child in node.children:
            if isinstance(child, ClassNode):
                child_struct = node_to_structure(child)
                if child_struct is not None:
                    result[child.name] = child_struct
            elif isinstance(child, FunctionNode):
                functions.append(child.name)
        if functions:
            result["functions"] = functions
        return result if result else None

    # For a ClassNode, return a list of its method names.
    elif isinstance(node, ClassNode):
        methods = [child.name for child in node.children if isinstance(child, MethodNode)]
        return methods if methods else None

    # We don't expect to call this on leaf FunctionNodes or MethodNodes.
    return None

# Assuming 'root' is the root node built by FolderScriptBuilder
structure = {root.name: node_to_structure(root)}
yaml_output = yaml.dump(structure, sort_keys=False)

# Write to a YAML file
output_path = "file-structure.yaml"
with open(output_path, "w") as file:
    file.write(yaml_output)


In [164]:
nodes["widget_framework/src/widget_builder.py"].children[0].children

[MethodNode(__init__, widget_framework/src/widget_builder.py::WidgetBuilder::__init__),
 MethodNode(preprocess_input, widget_framework/src/widget_builder.py::WidgetBuilder::preprocess_input),
 MethodNode(postprocess_output, widget_framework/src/widget_builder.py::WidgetBuilder::postprocess_output),
 MethodNode(run, widget_framework/src/widget_builder.py::WidgetBuilder::run),
 MethodNode(pretty_output, widget_framework/src/widget_builder.py::WidgetBuilder::pretty_output)]

In [None]:
structure

In [158]:
tree_dict['machine-learning-platform']["widget_framework"]

{'contract_negotiation.py': {'ContractNegotiation': {'__init__': {},
   'create_raw_input_data_from_contract_input': {},
   'create_contract_output_from_output_data': {},
   'create_contract_input_from_raw_input_data': {}}},
 'data_loader.py': {'WidgetFrameworkDataLoader': {'__init__': {},
   'load_training_raw_features': {},
   'load_prediction_raw_features': {}}},
 'precomputed_setup.py': {'parse_input': {}},
 'constants.py': {},
 'api.py': {'inference': {'__init__': {}},
  'production_app': {},
  'local_app': {}},
 'project_types.py': {},
 'mapper.py': {'WidgetFrameworkMapper': {'map_request_to_keys': {},
   'map_values_to_response': {},
   'map_raw_input_data_to_keys': {}}},
 '__init__.py': {},
 'deployment': {'kubernetes': {'base': {},
   'stage': {'resources': {}},
   'prod': {'resources': {}}}},
 'notebooks': {'debug': {'utils_debug.py': {'DebugUtils': {'ping': {},
     'post': {},
     'pretty': {},
     'pretty_full': {},
     'pretty_exclude': {},
     'save_pickle': {},
    

In [177]:
import ast
import os
from collections import defaultdict

class InnerDependencyAnalyzer(ast.NodeVisitor):
    def __init__(self, module_path, nodes):
        """
        :param module_path: A string like "widget_framework.src.widget_builder"
        :param nodes: Dictionary of file nodes (from your FolderScriptBuilder)
        """
        self.module_path = module_path
        self.nodes = nodes
        # Map alias -> full import string (e.g. "Utils" -> "widget_framework.src.utils::Utils")
        self.imports = {}
        # Dependency tree: key = fully qualified function/method name, 
        # value = list of tuples (lineno, dependency string)
        self.deps = defaultdict(list)
        self.current_class = None    # current class name, if inside one
        self.current_function = None # fully qualified current function (or method)
        self.var_mapping = {}        # mapping from variable names to dependency strings

    def visit_Import(self, node):
        for alias in node.names:
            asname = alias.asname or alias.name
            self.imports[asname] = alias.name
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        module = node.module  # e.g., "widget_framework.src.utils"
        for alias in node.names:
            asname = alias.asname or alias.name
            full = f"{module}::{alias.name}"
            self.imports[asname] = full
        self.generic_visit(node)

    def visit_ClassDef(self, node):
        prev_class = self.current_class
        self.current_class = node.name
        self.generic_visit(node)
        self.current_class = prev_class

    def visit_FunctionDef(self, node):
        # Save the current variable mapping for this function scope.
        old_var_mapping = self.var_mapping.copy()
        if self.current_class:
            func_qualified = f"{self.module_path}::{self.current_class}::{node.name}"
        else:
            func_qualified = f"{self.module_path}::{node.name}"
        prev_function = self.current_function
        self.current_function = func_qualified
        self.generic_visit(node)
        self.current_function = prev_function
        self.var_mapping = old_var_mapping

    def visit_Assign(self, node):
        # Capture variable assignments from constructor calls.
        if isinstance(node.value, ast.Call):
            dep = self._resolve_call(node.value)
            if dep:
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        self.var_mapping[target.id] = dep
        self.generic_visit(node)

    def visit_Call(self, node):
        dep = self._resolve_call(node)
        if dep and self.current_function:
            if self._dependency_exists(dep):
                if not any(existing_dep == dep for _, existing_dep in self.deps[self.current_function]):
                    self.deps[self.current_function].append((node.lineno, dep))
        self.generic_visit(node)

    def _resolve_call(self, node):
        """
        Resolve a call expression into a dependency string.
        Returns strings in forms such as:
          - For self.method: "<module_path>::<current_class>::<method>"
          - For imported alias call: "<import_module>::<ImportedClass/Module>::<attribute>"
          - For local function: "<module_path>::<function>"
          - For variable method calls: "<mapped_dependency>::<method>"
        """
        if isinstance(node.func, ast.Attribute):
            attr_chain = []
            curr = node.func
            while isinstance(curr, ast.Attribute):
                attr_chain.insert(0, curr.attr)
                curr = curr.value
            if isinstance(curr, ast.Name):
                base = curr.id
                if base == "self" and self.current_class:
                    if attr_chain:
                        return f"{self.module_path}::{self.current_class}::{attr_chain[0]}"
                elif base in self.imports:
                    imported = self.imports[base]  # e.g., "widget_framework.src.utils::Utils"
                    if attr_chain:
                        return f"{imported}::{attr_chain[0]}"
                    return imported
                elif base in self.var_mapping:
                    if attr_chain:
                        return f"{self.var_mapping[base]}::{attr_chain[0]}"
                    return self.var_mapping[base]
                else:
                    if attr_chain:
                        return f"{self.module_path}::{base}.{attr_chain[0]}"
                    return f"{self.module_path}::{base}"
        elif isinstance(node.func, ast.Name):
            name = node.func.id
            if name in self.imports:
                return self.imports[name]
            return f"{self.module_path}::{name}"
        return None

    def _dependency_exists(self, dep):
        """
        Checks whether the dependency exists in the structure (nodes).
        Given a dependency string like:
          widget_framework.src.widget_builder::WidgetBuilder::preprocess_input
        we convert it into a node key by:
          1. Converting the first part to a file path (replace dots with os.sep, add ".py")
          2. Appending the remaining parts with "::"
        Only if the resulting key exists in nodes do we keep the dependency.
        """
        parts = dep.split("::")
        if not parts:
            return False
        file_key = parts[0].replace(".", os.sep) + ".py"
        if len(parts) > 1:
            file_key += "::" + "::".join(parts[1:])
        return file_key in self.nodes

    def get_dependencies(self):
        """
        Returns a dict mapping function names to a list of (lineno, dependency) tuples.
        """
        return dict(self.deps)

def analyze_inner_dependencies(global_path, script_rel_path, nodes):
    """
    Given the global_path and the script_rel_path (relative to global_path),
    analyze its inner dependencies.
    Returns a dict mapping fully qualified function/method names to a list of
    (lineno, dependency string) tuples.
    """
    script_path = os.path.join(global_path, script_rel_path)
    module_path = script_rel_path.replace(os.sep, ".").replace(".py", "")
    with open(script_path, "r") as f:
        content = f.read()
    tree = ast.parse(content)
    analyzer = InnerDependencyAnalyzer(module_path, nodes)
    analyzer.visit(tree)
    return analyzer.get_dependencies()

# === Example usage ===
if __name__ == "__main__":
    # Input parameters:
    global_path = "/home/david/Documents/glovo/machine-learning-platform/"
    script_rel_path = "widget_framework/api.py"

    # Assume that 'nodes' has been built previously using your FolderScriptBuilder.
    # For example:
    #   fsb = FolderScriptBuilder(global_path)
    #   root, nodes = fsb.build()
    #   script_analyzer = ScriptAnalyzer(nodes, global_path, query_folder="widget_framework")
    #   script_analyzer.analyze()
    
    dependencies = analyze_inner_dependencies(global_path, script_rel_path, nodes)
    
    # Now you can sort the dependencies by line number for each function before printing.
    for func, dep_tuples in dependencies.items():
        sorted_deps = sorted(dep_tuples, key=lambda tup: tup[0])
        print(f"{func}:")
        for lineno, dep in sorted_deps:
            print(f"  Line {lineno}: {dep}")


widget_framework.api::inference:
  Line 26: widget_framework.src.widget_builder::WidgetBuilder
  Line 33: widget_framework.src.widget_builder::WidgetBuilder::run
widget_framework.api::production_app:
  Line 47: widget_framework.src.utils::Utils::read_avro_contract
  Line 51: widget_framework.api::inference
widget_framework.api::local_app:
  Line 74: widget_framework.src.utils::Utils::read_avro_contract
  Line 78: widget_framework.api::inference


In [181]:
nodes["widget_framework/api.py::inference"]

FunctionNode(inference, widget_framework/api.py::inference)

In [178]:
dependencies['widget_framework.api::inference']

[(26, 'widget_framework.src.widget_builder::WidgetBuilder'),
 (33, 'widget_framework.src.widget_builder::WidgetBuilder::run')]

In [169]:
nodes["widget_framework/src/widget_builder.py::WidgetBuilder::preprocess_input"]

MethodNode(preprocess_input, widget_framework/src/widget_builder.py::WidgetBuilder::preprocess_input)

# Code Analyzer

In [116]:
import ast
from typing import List, Dict

class StatementResolver:
    """
    Recursively examines a single AST statement (and nested statements) to discover 
    calls or instantiations referencing nodes in our 'nodes' dictionary.

    The main entry point is 'resolve(stmt)', which returns a list of Node objects 
    in the sequential order encountered.
    """

    def __init__(self, nodes: Dict[str, Node], script_node: ScriptNode):
        self.nodes = nodes
        self.script_node = script_node
        # You could store more context here if needed, e.g. a 'self_assignments' dict for classes.

    def resolve(self, statement: ast.stmt) -> List[Node]:
        """
        Analyzes the given statement, returns a list of Node dependencies in the order discovered.
        We'll do a depth-first scan but preserve statement order.
        """
        self.found_dependencies: List[Node] = []
        self._visit_node(statement)
        return self.found_dependencies

    # -----------------------------------------------------------------------
    # Generic visitors
    # -----------------------------------------------------------------------
    def _visit_node(self, node: ast.AST):
        """
        Recursively visits an AST node, dispatching specialized logic 
        for different statement/expr types.
        """
        if isinstance(node, ast.Assign):
            self._visit_assign(node)
        elif isinstance(node, ast.Return):
            self._visit_return(node)
        elif isinstance(node, ast.Expr):
            self._visit_expr(node)
        elif isinstance(node, ast.For):
            self._visit_for(node)
        elif isinstance(node, ast.With):
            self._visit_with(node)
        # ... handle more statement types as needed
        # e.g. if, while, try, etc.

        # For any node, we should still check children to catch nested calls.
        for child in ast.iter_child_nodes(node):
            if not isinstance(child, ast.stmt):  
                # e.g. expressions inside the node
                self._visit_expr_node(child)

    def _visit_expr_node(self, node: ast.AST):
        """
        Visits an expression (sub-node) that might contain calls, attributes, etc.
        We'll do a direct check for calls or keep drilling down.
        """
        if isinstance(node, ast.Call):
            self._handle_call(node)
        # Recursively check further children in case there are more nested calls
        for grandchild in ast.iter_child_nodes(node):
            self._visit_expr_node(grandchild)

    # -----------------------------------------------------------------------
    # Specific node visitors
    # -----------------------------------------------------------------------
    def _visit_assign(self, node: ast.Assign):
        """
        e.g. contracts = Utils.read_avro_contract()
        We'll check the right-hand side for calls or references to known nodes.
        """
        # The right-hand side might be a call, an attribute call, 
        # or something nested. Let's walk it:
        self._visit_expr_node(node.value)

    def _visit_return(self, node: ast.Return):
        """
        Example:
            return Utils.function(arg1, arg2)
            return some_call(another_call(...))
        """
        if node.value:
            self._visit_expr_node(node.value)

    def _visit_expr(self, node: ast.Expr):
        """
        Usually something like an expression statement:
          e.g. self.tracer.trace("xxx")
        We'll dig into node.value if it's a call, attribute, etc.
        """
        self._visit_expr_node(node.value)

    def _visit_for(self, node: ast.For):
        """
        If we have a for loop:
            for item in some_list:
                do_something(item)
        The 'some_list' might be a function call or class instantiation. 
        Also the loop body can have statements. 
        We'll check node.iter, then the body.
        """
        self._visit_expr_node(node.iter)
        for stmt in node.body:
            self._visit_node(stmt)
        # Optionally also handle node.orelse

    def _visit_with(self, node: ast.With):
        """
        A with statement can have an 'item' that is a call, e.g.:
            with self.tracer.trace('x'):
                ...
        We'll check each 'item' for calls, then the body.
        """
        for item in node.items:
            if item.context_expr:
                self._visit_expr_node(item.context_expr)
        for stmt in node.body:
            self._visit_node(stmt)

    # -----------------------------------------------------------------------
    # Handling calls
    # -----------------------------------------------------------------------
    def _handle_call(self, call_node: ast.Call):
        """
        We've encountered a function/method/class call like:
            WidgetBuilder(...),
            self.something(...),
            alias(...).
        We'll see if we can map it to a node in 'self.nodes'. 
        If it's a class node, we'll link to its __init__ method instead of returning the class node.
        """
        callee_node = call_node.func
        dep_node = self._resolve_callee_node(callee_node)
        
        # If we resolved a dependency, add it to found_dependencies
        if dep_node:
            self.found_dependencies.append(dep_node)

        # Also examine call arguments for nested calls, which might themselves reference known functions/classes
        for arg in call_node.args:
            self._visit_expr_node(arg)
        for kw in call_node.keywords:
            self._visit_expr_node(kw.value)

    def _resolve_callee_node(self, callee: ast.expr) -> Optional[Node]:
        """
        Attempt to map 'callee' to a known Node in self.nodes.
        If it's a ClassNode, redirect to __init__ if available.
        """
        if isinstance(callee, ast.Name):
            # e.g. 'WidgetBuilder(...)'
            name = callee.id
            # Check if it's an alias in script_node
            if name in self.script_node.aliases:
                fq_path = self.script_node.aliases[name]
                return self._resolve_class_or_function(fq_path)
            else:
                # Possibly a local function in the same script
                candidate_path = f"{self.script_node.path}::{name}"
                return self._resolve_class_or_function(candidate_path)

        elif isinstance(callee, ast.Attribute):
            # e.g. 'WidgetBuilder.run(...)' or 'self.xyz(...)'
            return self._resolve_attribute_callee(callee)

        return None
    
    def _resolve_class_or_function(self, fq_path: str) -> Optional[Node]:
        """
        If fq_path points to a ClassNode, return its __init__ MethodNode if it exists.
        Otherwise, if it points to a FunctionNode/MethodNode, return that.
        If there's no __init__ or it's not recognized, return None or skip.
        """
        dep_node = self.nodes.get(fq_path)
        if not dep_node:
            return None

        # If it's a class, see if there's an __init__ method
        if isinstance(dep_node, ClassNode):
            init_path = f"{dep_node.path}::__init__"
            if init_path in self.nodes:
                # Return the constructor method node
                return self.nodes[init_path]
            else:
                return None  # skip returning the raw class
        else:
            # If it's already a function or method node, we can return it directly
            return dep_node

    def _resolve_attribute_callee(self, attr_node: ast.Attribute) -> Optional[Node]:
        """
        e.g. 'WidgetBuilder.run(...)' or 'alias.run(...)' or 'self.xyz(...)'
        We'll see if the 'base' is an alias or local function, and then combine with '.run'.
        """
        base = attr_node.value
        final_attr = attr_node.attr

        # If base is ast.Name, e.g. 'WidgetBuilder.run'
        if isinstance(base, ast.Name):
            base_name = base.id
            if base_name in self.script_node.aliases:
                fq_path = self.script_node.aliases[base_name]  # e.g. 'widget_framework/src/widget_builder.py::WidgetBuilder'
                # Check if there's a sub-method => e.g. 'WidgetBuilder::run'
                candidate_path = f"{fq_path}::{final_attr}"
                return self._resolve_class_or_function(candidate_path)
            elif base_name == "self":
                # Possibly calling a method in the same class => advanced logic goes here
                return None
            else:
                # Possibly a local function object stored in a variable => advanced logic
                return None

        # If base is another attribute or complex expression, you can expand the logic similarly.
        return None


In [117]:
import ast
from typing import List, Dict, Optional, Union

class CodeAnalyzer:
    """
    Analyzes ClassNodes and FunctionNodes by walking their AST statements.
    A new StatementResolver handles the heavy recursion for each statement.
    """

    def __init__(self, nodes: Dict[str, Node], ast_cache: Dict[str, ast.Module]):
        self.nodes = nodes
        self.ast_cache = ast_cache

    def analyze(self, node_path: Optional[str] = None):
        """
        Entry point:
          - If node_path is provided, and it's a ClassNode or FunctionNode,
            analyze only that node.
          - Otherwise, analyze all ClassNodes and FunctionNodes in the project.
        """
        if node_path:
            node = self.nodes.get(node_path)
            if isinstance(node, ClassNode):
                self._analyze_class(node)
            elif isinstance(node, FunctionNode):
                self._analyze_function(node)
            # else do nothing if not a ClassNode/FunctionNode
        else:
            # Analyze all relevant nodes
            for path, node in self.nodes.items():
                if isinstance(node, ClassNode):
                    self._analyze_class(node)
                elif isinstance(node, FunctionNode):
                    self._analyze_function(node)

    # -----------------------------------------------------------------------
    # Class analysis
    # -----------------------------------------------------------------------
    def _analyze_class(self, class_node: ClassNode):
        """
        For each MethodNode child, analyze it as if it were a function.
        A more advanced approach might unify 'self.attr' usage across methods.
        """
        method_nodes = [
            child for child in class_node.children
            if isinstance(child, MethodNode)
        ]
        for mnode in method_nodes:
            self._analyze_function(mnode)

    # -----------------------------------------------------------------------
    # Function (or method) analysis
    # -----------------------------------------------------------------------
    def _analyze_function(self, fn_node: Union[FunctionNode, MethodNode]):
        """
        1) Locate the ScriptNode that defines this function/method.
        2) Grab the AST for that script from ast_cache.
        3) Find the specific ast.FunctionDef (or ast.ClassDef -> method).
        4) For each statement in the function body, call _analyze_statement.
        5) Each _analyze_statement call returns discovered dependencies in order.
        6) We add them to fn_node's children.
        """
        script_node = self._find_script_node_for_function(fn_node)
        if not script_node:
            return

        module_ast = self.ast_cache.get(script_node.path)
        if not module_ast:
            return

        func_ast = self._find_function_ast(module_ast, fn_node)
        if not func_ast:
            return

        # For each statement, discover new dependencies
        for stmt in func_ast.body:
            # Let the statement resolver do the heavy-lifting
            dependencies_in_stmt = self._analyze_statement(stmt, script_node)
            # Then link them in sequential order
            for dep in dependencies_in_stmt:
                fn_node.add_child(dep)

    def _analyze_statement(
        self,
        stmt: ast.stmt,
        script_node: ScriptNode
    ) -> List[Node]:
        """
        Delegates the actual "deep" analysis of a single statement to StatementResolver.
        StatementResolver returns a list of discovered Node dependencies in the order they appear.
        """
        statement_resolver = StatementResolver(
            nodes=self.nodes,
            script_node=script_node
        )
        return statement_resolver.resolve(stmt)

    # -----------------------------------------------------------------------
    # Utility
    # -----------------------------------------------------------------------
    def _find_script_node_for_function(
        self,
        fn_node: Union[FunctionNode, MethodNode]
    ) -> Optional[ScriptNode]:
        """
        If fn_node.path == 'widget_framework/api.py::function_name',
        we parse out 'widget_framework/api.py' and find that ScriptNode in self.nodes.
        """
        script_part = fn_node.path.split("::")[0]
        possible = self.nodes.get(script_part)
        return possible if isinstance(possible, ScriptNode) else None

    def _find_function_ast(
        self,
        module_ast: ast.Module,
        fn_node: Union[FunctionNode, MethodNode]
    ) -> Optional[ast.FunctionDef]:
        """
        Finds the specific ast.FunctionDef for a function or method inside the given module_ast.
        If it's a method, we find the matching ast.ClassDef, then the method.
        """
        fn_name = fn_node.name
        parent_node = fn_node.parent
        if isinstance(fn_node, MethodNode) and isinstance(parent_node, ClassNode):
            class_name = parent_node.name
            # locate ast.ClassDef with that name
            for top_stmt in module_ast.body:
                if isinstance(top_stmt, ast.ClassDef) and top_stmt.name == class_name:
                    # find the method in this class
                    for sub_stmt in top_stmt.body:
                        if isinstance(sub_stmt, ast.FunctionDef) and sub_stmt.name == fn_name:
                            return sub_stmt
        else:
            # top-level function
            for top_stmt in module_ast.body:
                if isinstance(top_stmt, ast.FunctionDef) and top_stmt.name == fn_name:
                    return top_stmt
        return None


In [129]:
code_analyzer = CodeAnalyzer(nodes, script_analyzer.ast_cache)
code_analyzer.analyze(node_path="widget_framework/api.py::inference")

In [119]:
script_api = root.widget_framework.children[4]
script_api

ScriptNode(api.py, widget_framework/api.py)

In [120]:
script_api.children


[FunctionNode(inference, widget_framework/api.py::inference),
 FunctionNode(production_app, widget_framework/api.py::production_app),
 FunctionNode(local_app, widget_framework/api.py::local_app)]

In [128]:
nodes["widget_framework/src/utils.py::Utils::read_avro_contract"]

MethodNode(read_avro_contract, widget_framework/src/utils.py::Utils::read_avro_contract)

In [121]:
fun_inference = script_api.children[0]
fun_prod_app = script_api.children[1]
fun_prod_app

FunctionNode(production_app, widget_framework/api.py::production_app)

In [122]:
vars(fun_prod_app)

{'name': 'production_app',
 'path': 'widget_framework/api.py::production_app',
 'parent': ScriptNode(api.py, widget_framework/api.py),
 'children': []}

In [123]:
vars(fun_inference)

{'name': 'inference',
 'path': 'widget_framework/api.py::inference',
 'parent': ScriptNode(api.py, widget_framework/api.py),
 'children': [MethodNode(__init__, widget_framework/src/widget_builder.py::WidgetBuilder::__init__)]}

In [95]:
fun_read_avro = fun_prod_app.children[0]
fun_inference = fun_prod_app.children[1]

In [98]:
fun_inference

FunctionNode(inference, widget_framework/api.py::inference)

In [97]:
vars(fun_inference)

{'name': 'inference',
 'path': 'widget_framework/api.py::inference',
 'parent': FunctionNode(production_app, widget_framework/api.py::production_app),
 'children': []}