In [None]:
import re
import openpyxl
from collections import deque
import os
import ssl
import certifi
import httpx
import openai
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from typing import Dict, List, Any, Optional, Tuple, Set

A1_REF_RE = re.compile(
    r"(?:(?P<sheet>'[^']+'|[A-Za-z0-9_ .-]+)\!)?"
    r"(?P<ref>\$?[A-Za-z]{1,3}\$?\d+(?::\$?[A-Za-z]{1,3}\$?\d+)?)"
)

def col_to_num(col: str) -> int:
    col = col.replace("$", "").upper()
    n = 0
    for ch in col:
        if "A" <= ch <= "Z":
            n = n * 26 + (ord(ch) - 64)
    return n

def num_to_col(n: int) -> str:
    s = []
    while n:
        n, r = divmod(n - 1, 26)
        s.append(chr(r + 65))
    return "".join(reversed(s))

def split_a1(cell: str) -> Tuple[str, str]:
    cell = cell.upper()
    i = 0
    while i < len(cell) and (cell[i] == "$" or cell[i].isalpha()):
        i += 1
    return cell[:i], cell[i:]

def expand_range(a: str, b: str) -> List[str]:
    col_a, row_a = split_a1(a)
    col_b, row_b = split_a1(b)
    c1, c2 = col_to_num(col_a), col_to_num(col_b)
    r1, r2 = int(row_a.replace("$", "")), int(row_b.replace("$", ""))
    out = []
    for c in range(min(c1, c2), max(c1, c2) + 1):
        for r in range(min(r1, r2), max(r1, r2) + 1):
            out.append(f"{num_to_col(c)}{r}")
    return out

def parse_formula_refs(formula: str, current_sheet: Optional[str]) -> List[Dict[str, str]]:
    items: List[Dict[str, str]] = []
    if not formula:
        return items
    for m in A1_REF_RE.finditer(formula):
        sheet = m.group("sheet")
        ref = m.group("ref").upper().replace("$", "")
        if sheet:
            sheet = sheet.strip("'")
        else:
            sheet = current_sheet
        if ":" in ref:
            a, b = ref.split(":")
            items.append({"type":"range","sheet":sheet,"ref":ref,"a":a,"b":b})
        else:
            items.append({"type":"cell","sheet":sheet,"ref":ref})
    return items

def open_wb_formulas(path: str):
    return openpyxl.load_workbook(path, data_only=False, read_only=True)

def open_wb_values(path: str):
    return openpyxl.load_workbook(path, data_only=True, read_only=True)

def _strip_abs(a1: str) -> str:
    return a1.replace("$", "").upper()

def get_formula_text(ws, coord: str) -> Optional[str]:
    c = ws[_strip_abs(coord)]
    if c is None:
        return None
    val = c.value
    if val is None:
        return None
    s = str(val)
    if getattr(c, "data_type", None) == "f" or s.startswith("="):
        return s[1:] if s.startswith("=") else s
    return None

def get_formula_with_eq(ws, coord: str) -> Optional[str]:
    f = get_formula_text(ws, coord)
    if f is None:
        return None
    return f if f.startswith("=") else "=" + f

def get_cell_value(ws, coord: str):
    c = ws[_strip_abs(coord)]
    return None if c is None else c.value

class ExcelAgentTools:
    def __init__(self, workbook_path: str, max_range: int = 150):
        self.path = workbook_path
        self.max_range = max_range
        self._wf = open_wb_formulas(workbook_path)
        self._wv = open_wb_values(workbook_path)

    # ---------- tools ----------
    def get_cell(self, sheet: str, coord: str) -> Dict:
        sheet = sheet
        coord = coord.upper()
        if sheet not in self._wf.sheetnames:
            return {"error": f"Sheet '{sheet}' not found"}
        ws_f = self._wf[sheet]
        ws_v = self._wv[sheet]
        f = get_formula_text(ws_f, coord)
        feq = get_formula_with_eq(ws_f, coord)
        val = get_cell_value(ws_v, coord)
        return {
            "node_id": f"{sheet}!{coord}",
            "sheet": sheet,
            "coord": coord,
            "is_formula": f is not None,
            "formula": f,
            "display_formula": feq,
            "value": val
        }

    def list_precedents(self, sheet: str, coord: str) -> Dict:
        """
        Lists direct precedents. Ranges are included as a single object (unexpanded) 
        for LLM translation, while single cells are listed individually.
        """
        if sheet not in self._wf.sheetnames:
            return {"error": f"Sheet '{sheet}' not found"}
        ws_f = self._wf[sheet]
        f = get_formula_text(ws_f, coord)
        if f is None:
            return {"precedents": []}  # static
        
        # Parse all references in the formula
        refs = parse_formula_refs(f, current_sheet=sheet)
        out = []
        
        for it in refs:
            # Normalize the sheet name for the node_id
            ref_sheet = it["sheet"]
            
            if it["type"] == "cell":
                out.append({
                    "node_id": f"{ref_sheet}!{_strip_abs(it['ref'])}", # Normalized coord
                    "sheet": ref_sheet,
                    "coord": it["ref"],
                    "type": "cell"  # Explicit type for clarity
                })
                
            else:
                range_ref = it["ref"]
                # Use the full, non-absolute range string for the node_id
                out.append({
                    "node_id": f"{ref_sheet}!{_strip_abs(range_ref)}", # Normalized range (N1037:Y1037)
                    "sheet": ref_sheet,
                    "coord": range_ref,
                    "type": "range",
                    "a": it["a"],
                    "b": it["b"],
                })

        return {"precedents": out}

    def close(self):
        try: self._wf.close()
        except: pass
        try: self._wv.close()
        except: pass

In [None]:
class FormulaTraversalAgent:
    def __init__(self, tools: ExcelAgentTools):
        self.tools = tools
        self.cache: Dict[str, Dict[str, Any]] = {} # Cache for already processed cells
        self.leaf_nodes: Set[str] = set() # To store final, static inputs
        self.processed: Set[str] = set() # To prevent infinite recursion

    def _get_or_fetch_cell(self, node_id: str, sheet: str, coord: str) -> Dict[str, Any]:
        """Fetch cell data or return from cache."""
        if node_id in self.cache:
            return self.cache[node_id]

        data = self.tools.get_cell(sheet, coord)
        self.cache[node_id] = data
        return data

    def traverse_formula_tree(self, start_sheet: str, start_coord: str) -> Dict[str, Any]:
        """
        Recursively traverses the formula dependencies starting from a target cell.
        Returns a dictionary representing the full dependency graph.
        """
        start_node_id = f"{start_sheet}!{start_coord.upper()}"
        queue = deque([start_node_id])
        graph: Dict[str, Dict[str, Any]] = {}
        
        # Initialize the starting node
        start_node_data = self._get_or_fetch_cell(start_node_id, start_sheet, start_coord)
        graph[start_node_id] = {**start_node_data, "children": []}
        
        self.processed.add(start_node_id)
        self.leaf_nodes.clear() # Reset for this run

        while queue:
            parent_id = queue.popleft()
            
            # Fetch cell data (from cache or live)
            parent_data = self._get_or_fetch_cell(
                parent_id, 
                graph[parent_id]["sheet"], 
                graph[parent_id]["coord"]
            )
            
            if not parent_data["is_formula"]:
                # If a node has no formula, it's a static leaf input
                self.leaf_nodes.add(parent_id)
                continue

            # List the immediate dependencies (children)
            precedents_result = self.tools.list_precedents(
                parent_data["sheet"], 
                parent_data["coord"]
            )
            
            # Ensure the current parent_id exists in the graph structure
            if parent_id not in graph:
                graph[parent_id] = {**parent_data, "children": []}

            for child in precedents_result["precedents"]:
                child_id = child["node_id"]
                child_type = child.get("type", "cell") # Default to 'cell' for safety

                # Add child to the parent's dependency list
                if child_id not in graph[parent_id]["children"]:
                    graph[parent_id]["children"].append(child_id)

                if child_id not in self.processed:
                    self.processed.add(child_id)
                    
                    if child_type == "range":
                        # If it's a range, treat it as a terminal node (leaf) 
                        # for the purpose of the dependency graph.
                        
                        # Create a minimal node for the graph
                        graph[child_id] = {
                            "sheet": child["sheet"], 
                            "coord": child["coord"],
                            "is_formula": False,  # Treat ranges as non-formula inputs
                            "children": [] 
                        }
                        self.leaf_nodes.add(child_id)
                        continue # Skip the rest of the loop and move to the next precedent
                   
                    # Fetch and initialize the child node in the graph (only for single cell refs)
                    child_data = self._get_or_fetch_cell(child_id, child["sheet"], child["coord"])
                    graph[child_id] = {**child_data, "children": []}
                    
                    # Only queue if the child itself is a formula
                    if child_data["is_formula"]:
                        queue.append(child_id)
                    else:
                        # Direct static leaf node found
                        self.leaf_nodes.add(child_id)


        return {"graph": graph, "leaf_nodes": list(self.leaf_nodes)}


In [None]:
def topological_sort_dependencies(dependency_tree: Dict[str, Any]) -> List[str]:

    graph: Dict[str, Dict[str, Any]] = dependency_tree['graph']
    
    # In-degree: Count of incoming edges (precedents) for each formula node.
    in_degree: Dict[str, int] = {}
    
    # Adjacency List: Map a node to the formula nodes that depend on it.
    # We only care about dependencies between formula nodes, as leaf nodes are the starting points.
    adj: Dict[str, List[str]] = {node_id: [] for node_id in graph.keys()}
    
    # Initialize all formula nodes (non-leaf nodes) to have an in-degree of 0
    formula_nodes = [node_id for node_id, data in graph.items() if data.get("is_formula")]
    for node_id in formula_nodes:
        in_degree[node_id] = 0

    # Build In-Degrees and Adjacency List
    for node_id in formula_nodes:
        # 'node_id' is the parent
        parent_children_ids = graph[node_id].get("children", [])
        
        for child_id in parent_children_ids:
            # Check if the child is also a formula node (i.e., not a leaf node)
            if child_id in in_degree: 
                # We track the dependency in the reverse direction for the sort:
                # The child must be calculated first, and the parent (node_id) depends on it.
                # 'adj' will map the dependency (child_id) to the node that uses its result (node_id).
                adj[child_id].append(node_id) 
                
                # Increment the in-degree of the parent node
                in_degree[node_id] += 1

    # Initialize Queue with Ready Nodes
    # Ready nodes are formula nodes whose in-degree is 0. 
    # This means they only depend on static leaf nodes or previously processed formulas.
    queue = deque([node_id for node_id in formula_nodes if in_degree[node_id] == 0])
    
    sorted_order: List[str] = []

    # Process Graph
    while queue:
        # Get the next formula node ready for calculation
        u = queue.popleft()
        sorted_order.append(u)

        # Iterate over all nodes that depend on the current node 'u'
        for v in adj.get(u, []):
            # Decrement the in-degree of the dependent node 'v'
            in_degree[v] -= 1
            
            # If the dependent node 'v' now has an in-degree of 0, 
            # it means all its prerequisites are met, and it's ready for calculation.
            if in_degree[v] == 0:
                queue.append(v)

    # Check for circular dependencies (don't believe I've seen any thus far)
    if len(sorted_order) != len(formula_nodes):
        raise ValueError(
            "Circular dependency detected in the Excel formula graph. "
            "Cannot produce a valid calculation order."
        )

    return sorted_order

In [None]:
def initialize_azure_client(division="ts", region="eastus2", api_version="2024-10-21"):
    openai_endpoints = {
            'ts': {
                'eastus':'https://air-ts-eastus.openai.azure.com/',
                'eastus2':'https://air-ts-eastus2.openai.azure.com/',
                'northcentralus':'https://air-poc-northcentralus.openai.azure.com/',
            },
            'ps': {
                'eastus':'https://air-ps-eastus.openai.azure.com/',
                'eastus2':'https://air-ps-eastus2.openai.azure.com/',
                'northcentralus':'https://air-poc-northcentralus.openai.azure.com/'
            },
        }
    openai_endpoint = openai_endpoints[division][region]
    token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")
    ctx = ssl.create_default_context(cafile=os.environ.get('REQUESTS_CA_BUNDLE', certifi.where()))
    httpx_client = httpx.Client(verify=ctx)
    openai_client = openai.AzureOpenAI(
        api_version=api_version,
        azure_endpoint=openai_endpoint,
        azure_ad_token_provider=token_provider,
        http_client=httpx_client
    )
    return openai_client


SYSTEM_PROMPT = """
You are a highly specialized Excel-to-Python Formula Translator.
Your task is to convert a given Excel formula into functionally equivalent Python code.
The resulting Python code MUST use variables that directly correspond to the provided 'CELL_LOOKUP' references.

Conversion Rules:
1.  **Strictly use NumPy for array/range operations** (e.g., SUM, AVERAGE, MIN, MAX).
2.  **Translate range references to array indexing.** E.g., a reference to 'S!C10:C20' corresponds to the Python variable 'S_C10_C20_array'.
3.  **Translate single cell references to direct variables.** E.g., 'S!A1' corresponds to the Python variable 'S_A1_value'.
4.  **Use 'np.where' for IF/IFS functions.** E.g., IF(A1>0, B1, 0) becomes np.where(A1_value > 0, B1_value, 0).
5.  **Use standard Python for basic arithmetic.**
6.  **Optimize and remove redundancies where possible.** E.g., SUM(A1) can be simplified to A1_value.
7.  **Ensure robust data type handling.**
8.  **Return ONLY the raw, executable Python expression.** Do not include explanations, comments, or surrounding function definitions (unless the formula itself requires multiple lines, in which case use a single code block). Do not include import statements.
"""

llm_client = initialize_azure_client()
llm_model = "gpt-4o"

In [None]:
def translate_formula_to_python(
    llm_client, 
    llm_model: str, 
    excel_formula: str, 
    dependencies: List[Dict[str, str]]
) -> str:

    # 1. Prepare the CELL_LOOKUP for the prompt
    cell_lookup = {}
    
    for dep in dependencies:
        node_id = dep["node_id"] # e.g., 'S!A10'
        
        # Determine the variable name and type based on the reference structure
        sheet, ref = node_id.split('!')
        
        # Check if the dependency came from an expanded range (indicating an array)
        is_range_ref = dep.get("from_range") is not None
        
        # Simple heuristic for naming
        if is_range_ref or ":" in ref:
            # Use the original range ref if available, e.g., 'S_C10_C20_array'
            var_name = f"{sheet}_{ref.replace(':', '_')}_array" 
        else:
            # Single cell reference
            var_name = f"{sheet}_{ref}_value"
            
        cell_lookup[node_id] = var_name

    lookup_str = "\n".join([f"{k} -> {v}" for k, v in cell_lookup.items()])

    # 2. Construct the User Prompt
    user_prompt = f"""
    EXCEL FORMULA:
    {excel_formula}

    CELL_LOOKUP (Map Excel references to Python variables):
    {lookup_str}

    TARGET OUTPUT:
    Generate the Python expression that calculates the result of the EXCEL FORMULA using the variables from the CELL_LOOKUP.
    """

    # 3. Call the LLM
    try:
        response = llm_client.chat.completions.create(
            model=llm_model,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0  # Set temp to 0 for deterministic code generation
        )
        
        # Extract and clean the generated code
        python_code = response.choices[0].message.content.strip()
        
        # Simple cleanup for potential markdown code blocks
        if python_code.startswith("```"):
            # Try to strip common code block markers (e.g., ```python)
            lines = python_code.split('\n')
            if len(lines) > 2:
                # Remove the first and last line (```)
                python_code = '\n'.join(lines[1:-1]).strip()

        return var_name, python_code

    except Exception as e:
        print(f"LLM translation failed: {e}")
        return f"# ERROR: Translation failed for formula: {excel_formula}"

In [None]:
def assemble_python_code(
    llm_client, 
    llm_model: str, 
    agent_tools: Any, 
    dependency_tree: Dict[str, Any], 
    sorted_calc_order: List[str]
) -> str:
    
    python_script_lines = ["import numpy as np\n"]
    graph = dependency_tree['graph']
    
    # Iterate through the topologically sorted list
    for node_id in sorted_calc_order:
        node_data = graph[node_id]
        excel_formula = node_data['formula']
        
        # Determine the variable name for the result of this calculation
        sheet, coord = node_id.split('!')
        coord = coord.upper().replace("$", "")
        
        # Get the direct precedents for the LLM translator
        precedents_result = agent_tools.list_precedents(sheet, coord)
        dependencies = precedents_result['precedents']
        
        # Call the LLM Translator
        try:
            # The LLM outputs the raw expression (e.g., 'np.where(M_I1037_value > 0, 1, 0)')
            result_var_name, python_expression = translate_formula_to_python(
                llm_client=llm_client, 
                llm_model=llm_model, 
                excel_formula=excel_formula, 
                dependencies=dependencies
            )
            
            # Assemble the assignment statement
            # The result of the expression is assigned to the expected variable name.
            assignment_line = f"{result_var_name} = {python_expression}"
            
            python_script_lines.append(f"# --- {node_id}: {excel_formula} ---")
            python_script_lines.append(assignment_line)
            python_script_lines.append("\n")
            
        except Exception as e:
            python_script_lines.append(f"# ERROR: Failed to translate {node_id}: {e}")
            
    return "\n".join(python_script_lines)

In [None]:
agent_tools = ExcelAgentTools('../data/LIAS_Senegal.xlsx')
traversal_agent = FormulaTraversalAgent(agent_tools)
dependency_tree = traversal_agent.traverse_formula_tree('B', 'E277')

sorted_calc_order = topological_sort_dependencies(dependency_tree)

dependency_tree

{'graph': {'B!E277': {'node_id': 'B!E277',
   'sheet': 'B',
   'coord': 'E277',
   'is_formula': True,
   'formula': 'IF(M!$I1037="","",M!$I1037*M!AA1037)',
   'display_formula': '=IF(M!$I1037="","",M!$I1037*M!AA1037)',
   'value': 2.54368932,
   'children': ['M!I1037', 'M!AA1037']},
  'M!I1037': {'node_id': 'M!I1037',
   'sheet': 'M',
   'coord': 'I1037',
   'is_formula': False,
   'formula': None,
   'display_formula': None,
   'value': 2.0,
   'children': []},
  'M!AA1037': {'node_id': 'M!AA1037',
   'sheet': 'M',
   'coord': 'AA1037',
   'is_formula': True,
   'formula': 'IF(OR(SUM($E1037)=0,SUM(N1037)=0),IF($H1037="",IF(AND($E1037>0,$AM1037>0),$F1037,AA$1847),$H1037),IF($H1037="",$E1037/N1037,IF($F1037=0,$H1037,$H1037/$F1037*$E1037/N1037)))*AO1037',
   'display_formula': '=IF(OR(SUM($E1037)=0,SUM(N1037)=0),IF($H1037="",IF(AND($E1037>0,$AM1037>0),$F1037,AA$1847),$H1037),IF($H1037="",$E1037/N1037,IF($F1037=0,$H1037,$H1037/$F1037*$E1037/N1037)))*AO1037',
   'value': 1.27184466,
   'c

In [25]:
py_script = assemble_python_code(
    llm_client=llm_client,
    llm_model=llm_model,
    agent_tools=agent_tools,
    dependency_tree=dependency_tree,
    sorted_calc_order=sorted_calc_order
)

print(py_script)

import numpy as np

# --- M!AM1037: IF(SUM(N1037:Y1037)=0,0,AVERAGE(N1037:Y1037)) ---
M_AM1037_value = np.where(np.sum(M_N1037_Y1037_array) == 0, 0, np.mean(M_N1037_Y1037_array))


# --- M!AM1847: IF(SUM(N1847:Y1847)=0,0,AVERAGE(N1847:Y1847)) ---
M_AM1847_value = np.where(np.sum(M_N1847_Y1847_array) == 0, 0, np.mean(M_N1847_Y1847_array))


# --- M!F1037: IF(OR(E1037="",AM1037=0),1,E1037/AM1037) ---
M_F1037_value = np.where((M_E1037_value == "") | (M_AM1037_value == 0), 1, M_E1037_value / M_AM1037_value)


# --- M!F1847: IF(OR(E1847="",AM1847=0),1,E1847/AM1847) ---
M_F1847_value = np.where((M_E1847_value == "") | (M_AM1847_value == 0), 1, M_E1847_value / M_AM1847_value)


# --- M!AA1847: IF(OR(SUM($E1847)=0,SUM(N1847)=0),IF($H1847="",$F1847,$H1847),IF($H1847="",$E1847/N1847,IF($F1847=0,$H1847,$H1847/$F1847*$E1847/N1847)))*$AO1847 ---
M_AA1847_value = np.where(
    (M_E1847_value == 0) | (M_N1847_value == 0),
    np.where(
        M_H1847_value == "",
        M_F1847_value,
        M_H18