In [84]:

from langgraph.graph import StateGraph, END, START
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import json
import os
import re
from typing import TypedDict, List, Dict, Optional
from IPython.display import Image, display
import openpyxl
import re
from openpyxl.utils import get_column_letter, coordinate_to_tuple, column_index_from_string

from openpyxl.formula.tokenizer import Tokenizer
import ast
load_dotenv()

True

In [85]:


def get_dependencies_from_tokens(formula, current_sheet, all_headers_map):
    """
    Resolve the columns an Excel formula reads into "Sheet.Header" names.

    The formula is split with openpyxl's Tokenizer. Only operands whose
    subtype is "RANGE" are inspected, so text literals (e.g. "N"), booleans,
    numbers and error values are never mistaken for column letters. Operands
    that do not have the shape of a cell or range reference (named ranges,
    structured references, row-only ranges) are skipped.

    Args:
        formula: Formula text as stored in the cell, e.g. "=Sales!D2*B2".
            Non-string values yield an empty result.
        current_sheet: Sheet name assumed for references without a
            "Sheet!" prefix.
        all_headers_map: {sheet_name: {column_index: header}} mapping, as
            produced by get_all_sheet_headers.

    Returns:
        De-duplicated list of "<sheet>.<header>" strings in order of first
        appearance. For a multi-column range (A2:C2) every header column
        between the two endpoints is reported, not just the endpoints.
    """
    # Tokenizer works on text only; anything else has no references we can parse.
    if not isinstance(formula, str):
        return []

    # One end of a reference: optional "$", 1-3 column letters, then an optional
    # "$"-prefixed row number (the row is absent in whole-column ranges like "D:D").
    endpoint_re = re.compile(r"\$?([A-Za-z]{1,3})(?:\$?(\d+))?")

    # dict keys give de-duplication while keeping a deterministic order
    found = {}

    for token in Tokenizer(formula).items:
        if token.type != "OPERAND" or token.subtype != "RANGE":
            continue

        reference = token.value
        sheet_name = current_sheet

        if "!" in reference:
            # Split on the LAST "!" so unusual sheet names cannot break unpacking
            sheet_part, reference = reference.rsplit("!", 1)
            # Quoted sheet names escape an apostrophe by doubling it
            sheet_name = sheet_part.strip("'").replace("''", "'")

        headers = all_headers_map.get(sheet_name)
        if not headers:
            continue

        endpoints = [endpoint_re.fullmatch(part) for part in reference.split(":")]
        if len(endpoints) > 2 or not all(endpoints):
            continue  # not a plain cell/range reference
        if len(endpoints) == 1 and endpoints[0].group(2) is None:
            continue  # bare letters without a row are a name (e.g. "TAX"), not a cell

        try:
            bounds = [column_index_from_string(m.group(1).upper()) for m in endpoints]
        except ValueError:
            continue  # letters beyond the last valid column
        first_col, last_col = min(bounds), max(bounds)

        for col_idx, header in headers.items():
            if first_col <= col_idx <= last_col:
                found[f"{sheet_name}.{header}"] = None

    return list(found)

def get_all_sheet_headers(wb):
    """
    Collect the row-1 header of every column on every sheet.

    Returns a dict of the form {sheet_name: {column_index: header_value}},
    where column_index is 1-based. Columns whose row-1 value is falsy
    (empty cell, empty string, 0) are left out.
    """
    headers_by_sheet = {}
    for title in wb.sheetnames:
        sheet = wb[title]
        row_one = {}
        for col in range(1, sheet.max_column + 1):
            label = sheet.cell(row=1, column=col).value
            if label:
                row_one[col] = label
        headers_by_sheet[title] = row_one
    return headers_by_sheet

def extract_metadata_final(file_path):
    """
    Build per-sheet metadata describing formula columns and raw-input columns.

    The workbook is loaded twice: with data_only=False to read formula text,
    and with data_only=True to read the values used for dtype names. Row 1 is
    treated as the header row and row 2 as the representative data row; no
    other rows are inspected.

    Args:
        file_path: Path of the Excel workbook to analyse.

    Returns:
        {sheet_name: {"formulas": [...], "raw_inputs": [...]}} where each
        formula entry has the keys "column", "formula", "dtype",
        "depends_on" and "method_name", and each raw-input entry has
        "column" and "dtype". dtype is the Python type name of the row-2
        value, or "Unknown" when that value is None.
    """
    wb = openpyxl.load_workbook(file_path, data_only=False)
    # Second load exposes values instead of formula text; used only for dtype names
    wb_types = openpyxl.load_workbook(file_path, data_only=True)

    all_headers = get_all_sheet_headers(wb)
    metadata = {}

    for sheet_name in wb.sheetnames:
        ws = wb[sheet_name]
        ws_types = wb_types[sheet_name]
        sheet_formulas = []
        raw_inputs_with_types = []

        # Classify each column by its own row-2 cell (not by header text), so a raw
        # column that shares a header name with a formula column is still reported.
        for col_idx, header in all_headers[sheet_name].items():
            cell = ws.cell(row=2, column=col_idx)
            type_val = ws_types.cell(row=2, column=col_idx).value
            dtype = type(type_val).__name__ if type_val is not None else "Unknown"

            if cell.data_type != 'f':
                raw_inputs_with_types.append({"column": header, "dtype": dtype})
                continue

            formula = cell.value
            if not isinstance(formula, str):
                # NOTE(review): openpyxl can return a formula object (e.g. an array
                # formula) whose text lives on `.text` — confirm against the
                # installed openpyxl version.
                formula = getattr(formula, "text", None) or str(formula)

            deps = get_dependencies_from_tokens(formula, sheet_name, all_headers)
            own_name = f"{sheet_name}.{header}"
            deps = [d for d in deps if d != own_name]  # drop self-references

            # Every non-word character (space, %, -, /, ...) becomes "_" so the
            # generated method name is always a valid Python identifier.
            safe_name = re.sub(r"\W", "_", str(header).lower())

            sheet_formulas.append({
                "column": header,
                "formula": formula,
                "dtype": dtype,
                "depends_on": deps,
                "method_name": f"calculate_{safe_name}"
            })

        metadata[sheet_name] = {
            "formulas": sheet_formulas,
            "raw_inputs": raw_inputs_with_types
        }

    return metadata

In [86]:


class AgentState(TypedDict):
    """State dictionary shared by the LangGraph nodes defined in this notebook.

    The node functions read the keys they need from it and return dicts
    containing only the keys they want to update.
    """
    metadata: Dict           # Metadata from your Excel extractor
    file_path: str           # Path to the source Excel file
    functions: List[str]      # Individual method blocks generated by the Developer
    full_code: str           # The final assembled Python script
    error_log: Optional[str] # Feedback for the LLM if something fails
    iterations: int          # Counter to prevent infinite loops
    success: bool            # Final flag for the workflow
    

# Single chat-model client shared by function_developer_node and orchestrator_node.
# temperature=0 asks for the least-random sampling; credentials are presumably
# picked up from the environment populated by load_dotenv() — TODO confirm.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

def function_developer_node(state: AgentState):
    """
    Ask the LLM for one pandas method per formula column found in the metadata.

    For every entry under state["metadata"][sheet]["formulas"] a prompt is
    built from the entry's fields and sent to the module-level `llm`. The
    first fenced code block of the reply is extracted; if the reply has no
    fence, the whole reply is used.

    Args:
        state: Workflow state; only the "metadata" key is read.

    Returns:
        {"functions": [...]} with one code string per formula, in metadata
        iteration order.
    """
    all_functions = []
    metadata = state["metadata"]

    # System prompt to enforce naming and docstring rules
    system_msg = (
        "You are a Python expert. Create a vectorized Pandas function for an Excel formula. "
        "Each function must include a docstring clearly listing its Excel dependencies. "
        "Naming convention: use the 'method_name' provided in metadata."
    )

    # Accept any fence tag (```python, ```py, bare ```), and tolerate a reply that
    # was cut off before the closing fence.
    fence_re = re.compile(r"```[\w+-]*\s+(.*?)(?:```|\Z)", re.DOTALL)

    for sheet_name, content in metadata.items():
        for item in content['formulas']:
            prompt = f"""
            Sheet: {sheet_name}
            Column: {item['column']}
            Formula: {item['formula']}
            Data Type: {item['dtype']}
            Dependencies: {item['depends_on']}
            Method Name: {item['method_name']}
            
            Generate a Python method for a class. Use 'df' as the input. Import required modules within the function. Return ONLY the code inside a markdown code block. No need to be chatty.
            Example format:
            def calculate_tax(self, df):
                '''
                Excel Formula: {item['formula']}
                Dependencies: {', '.join(item['depends_on'])}
                '''
                df['{item['column']}'] = ... 
                return df

            """
            response = llm.invoke([("system", system_msg), ("human", prompt)])
            reply = response.content

            fenced = fence_re.search(reply)
            # Fallback if no backticks: keep the raw reply
            code = (fenced.group(1) if fenced else reply).strip()

            all_functions.append(code)

    return {"functions": all_functions}

def orchestrator_node(state: AgentState):
    """
    Takes all generated functions and the metadata to write the final,
    production-ready Python class.

    When state["error_log"] is set (a previous attempt failed validation), the
    previous script and the validation feedback are appended to the prompt so
    that a retry is not an identical request.

    Args:
        state: Workflow state; reads "metadata" and "functions", plus
            "error_log" and "full_code" when present.

    Returns:
        {"full_code": <script text>, "success": True}. The script is the first
        fenced code block of the reply, or the whole reply if it has no fence.
    """
    metadata = state["metadata"]
    functions_list = "\n\n".join(state["functions"])

    system_msg = (
        "You are a Senior Software Architect. Your job is to assemble a final, "
        "production-ready Python script. You must output the ENTIRE code block "
        "including imports, the class definition, all methods, and the orchestrator."
    )

    prompt = f"""
    METADATA:
    {metadata}

    INDIVIDUAL GENERATED FUNCTIONS:
    {functions_list}

    TASK:
    Write the FINAL and COMPLETE Python script. 
    
    REQUIREMENTS:
    1. Include imports: `import pandas as pd` and `import numpy as np`.
    2. Define `class ExcelModel:`.
    3. Include an `__init__` method if necessary to handle multi-sheet dataframes.
    4. Paste all the INDIVIDUAL GENERATED FUNCTIONS provided above inside the class.
    5. Write a `transform(self, all_sheets_dict)` method.
    6. CRITICAL: Inside `transform`, call the functions in the correct order based on the 'depends_on' metadata.
    7. Ensure every function has the docstrings indicating dependencies as previously generated.
    8. Output the FULL CODE explicitly. Do not use placeholders like '# ... rest of code'.
    """

    # On a retry, show the model what it produced and why it was rejected.
    error_log = state.get("error_log")
    if error_log:
        previous_code = state.get("full_code") or ""
        prompt += f"""
    PREVIOUS ATTEMPT (failed validation):
    {previous_code}

    VALIDATION FEEDBACK:
    {error_log}

    Fix the problem described above and output the corrected FULL script.
    """

    response = llm.invoke([("system", system_msg), ("human", prompt)])
    reply = response.content

    # Clean the code block: accept any fence tag (```python, ```py, bare ```), and
    # tolerate a reply that was cut off before the closing fence.
    fenced = re.search(r"```[\w+-]*\s+(.*?)(?:```|\Z)", reply, re.DOTALL)
    # Fallback if no backticks: keep the raw reply
    code = (fenced.group(1) if fenced else reply).strip()

    return {"full_code": code, "success": True}

def syntax_check_node(state: AgentState):
    """
    Validates that the generated full_code is syntactically correct Python.

    Empty or whitespace-only code is rejected explicitly: ast.parse("")
    succeeds, so without this guard an empty generation would be reported
    as valid.

    Args:
        state: Workflow state; reads "full_code" and, on failure, "iterations".

    Returns:
        On success: {"success": True, "error_log": None}.
        On failure: {"success": False, "error_log": <feedback text>,
        "iterations": <previous value + 1>}.
    """
    code = state["full_code"]

    if not code or not code.strip():
        error_details = "No code was generated (the output was empty)."
        print(f"❌ Syntax validation failed: {error_details}")
        return {
            "success": False,
            "error_log": f"The code you generated has a syntax error. Please fix it:\n{error_details}",
            "iterations": state["iterations"] + 1
        }

    try:
        # Default 'exec' mode checks the whole block (imports, classes, methods)
        ast.parse(code)
    except (SyntaxError, ValueError) as e:
        if isinstance(e, SyntaxError):
            # Capture the specific line and reason for the LLM to fix
            error_details = f"SyntaxError on line {e.lineno}: {e.msg}\nContext: {e.text}"
        else:
            # Some Python versions raise ValueError for source containing null bytes
            error_details = f"{type(e).__name__}: {e}"
        print(f"❌ Syntax validation failed: {error_details}")
        return {
            "success": False,
            "error_log": f"The code you generated has a syntax error. Please fix it:\n{error_details}",
            "iterations": state["iterations"] + 1
        }

    print("✅ Syntax validation passed.")
    return {"success": True, "error_log": None}
    
def should_retry(state: AgentState):
    """
    Pick the step that follows syntax validation.

    Returns END once the code validated or three iterations have been
    recorded; otherwise returns "orchestrator" to request another attempt.
    """
    finished = state["success"] or state["iterations"] >= 3
    return END if finished else "orchestrator"


In [87]:
# Assemble the workflow graph over the shared AgentState.
builder = StateGraph(AgentState)

# Register each node function under the name the edges below refer to.
builder.add_node("developer", function_developer_node)
builder.add_node("orchestrator", orchestrator_node)
builder.add_node("syntax_check", syntax_check_node)

# Fixed path: START -> developer -> orchestrator -> syntax_check
builder.add_edge(START, "developer")
builder.add_edge("developer", "orchestrator")
builder.add_edge("orchestrator", "syntax_check")
# After validation, should_retry returns either END or "orchestrator" (retry loop).
builder.add_conditional_edges("syntax_check", should_retry)

# Compiled graph object invoked by the driver cell.
app = builder.compile()

In [88]:
if __name__ == "__main__":
    # Workbook to translate, and the script file that receives the generated code
    INPUT_EXCEL = "complex_financial_model_4.xlsx"
    OUTPUT_FILE = "excel_model_replica.py"

    # Stage 1: read headers, formulas and dtypes out of the workbook
    print(f"--- Phase 1: Extracting Metadata from {INPUT_EXCEL} ---")
    metadata_payload = extract_metadata_final(INPUT_EXCEL)

    # Stage 2: run the compiled graph from a fresh, fully-populated state
    print("--- Phase 2: Running LangGraph Agentic Workflow ---")
    final_state = app.invoke({
        "metadata": metadata_payload,
        "file_path": INPUT_EXCEL,
        "iterations": 0,
        "functions": [],
        "full_code": "",
        "success": False,
        "error_log": None
    })

    # Stage 3: report the outcome; the script is written to disk only on success
    if not final_state["success"]:
        print("-" * 30)
        print("❌ FAILURE: The agent could not generate valid code.")
        print(f"Last Error: {final_state['error_log']}")
    else:
        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
            f.write(final_state["full_code"])

        print("-" * 30)
        print(f"✅ SUCCESS: Python model saved to: {os.path.abspath(OUTPUT_FILE)}")
        print(f"Total Iterations: {final_state['iterations']}")

--- Phase 1: Extracting Metadata from complex_financial_model_4.xlsx ---
--- Phase 2: Running LangGraph Agentic Workflow ---
✅ Syntax validation passed.
------------------------------
✅ SUCCESS: Python model saved to: c:\Users\arind\Documents\GitHub\LangGraph\LangGraph\excel_model_replica.py
Total Iterations: 0
