In [1]:
!!pip install litellm

['Collecting litellm',
 '  Downloading litellm-1.74.8-py3-none-any.whl.metadata (40 kB)',
 '\x1b[?25l     \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m0.0/40.4 kB\x1b[0m \x1b[31m?\x1b[0m eta \x1b[36m-:--:--\x1b[0m',
 '\x1b[2K     \x1b[91m━━━━━━━━━━━━━━━━━━━━\x1b[0m\x1b[90m╺\x1b[0m\x1b[90m━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m20.5/40.4 kB\x1b[0m \x1b[31m2.2 MB/s\x1b[0m eta \x1b[36m0:00:01\x1b[0m',
 '\x1b[2K     \x1b[91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m\x1b[91m╸\x1b[0m\x1b[90m━━━━━━━━━\x1b[0m \x1b[32m30.7/40.4 kB\x1b[0m \x1b[31m396.8 kB/s\x1b[0m eta \x1b[36m0:00:01\x1b[0m',
 '\x1b[2K     \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m40.4/40.4 kB\x1b[0m \x1b[31m422.7 kB/s\x1b[0m eta \x1b[36m0:00:00\x1b[0m',
 'Collecting python-dotenv>=0.2.0 (from litellm)',
 '  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)',
 'Downloading litellm-1.74.8-py3-none-any.whl (8.7 MB)',
 '\x1b[?25l   \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m 

In [3]:
import os
import json
import time
import traceback
import inspect
from typing import List, Dict, Any
from dataclasses import dataclass, field

try:
    from litellm import completion
except ImportError:
    print("Please install LiteLLM: pip install litellm")
    raise

# ---------------------- CONFIG ---------------------- #
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"

# SECURITY: never embed the key itself in the notebook. The argument to
# os.environ.get() is the *name* of the variable to read, not its value.
# A literal key was previously committed on this line; it must be treated as
# compromised and revoked/rotated in the Groq console.
GROQ_KEY = os.environ.get("GROQ_API_KEY")

if not GROQ_KEY:
    # Fall back to an interactive, non-echoing prompt so the key never lands in
    # the notebook source or its saved output.
    import getpass
    GROQ_KEY = getpass.getpass("Enter your Groq API key:")

# --------- TOOL REGISTRATION DECORATOR --------- #
tools = {}
tools_by_tag = {}

# Maps Python annotation types to JSON Schema type names. Anything not listed
# here (including parameters with no annotation) is described as a string.
_JSON_TYPE_NAMES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    dict: "object",
    list: "array",
}

# Parameters that are injected by the caller and therefore kept out of the
# schema that describes the tool's arguments.
_INJECTED_PARAMS = ("action_context", "action_agent")


def _json_type_for(annotation):
    """Return the JSON Schema type name for a parameter annotation."""
    try:
        return _JSON_TYPE_NAMES.get(annotation, "string")
    except TypeError:
        # Unhashable annotation objects cannot be looked up; treat as string.
        return "string"


def register_tool(tool_name=None, description=None, parameters_override=None, terminal=False, tags=None):
    """
    Decorator factory that records a function in the module-level tool registry.

    Args:
        tool_name: Registry key for the tool. Defaults to the function's
            ``__name__`` when omitted.
        description: Human-readable description. Defaults to the function's
            docstring, or an empty string if it has none.
        parameters_override: A ready-made JSON-schema dict to use instead of
            the schema derived from the function signature.
        terminal: Flag stored alongside the tool under the "terminal" key.
        tags: Optional list of tags; the tool name is indexed under each tag
            in ``tools_by_tag``.

    Returns:
        A decorator that registers the function and returns it unchanged.
    """
    def decorator(func):
        name = tool_name or func.__name__

        if parameters_override:
            args_schema = parameters_override
        else:
            type_hints = getattr(func, '__annotations__', {})
            properties = {}
            required = []
            for param_name, param in inspect.signature(func).parameters.items():
                if param_name in _INJECTED_PARAMS:
                    continue
                # *args / **kwargs are not named arguments a caller can fill
                # in, so they do not belong in the schema.
                if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
                    continue
                properties[param_name] = {"type": _json_type_for(type_hints.get(param_name, str))}
                # Identity check: `empty` is a sentinel, and `==` can misbehave
                # for defaults that overload equality (e.g. arrays).
                if param.default is inspect.Parameter.empty:
                    required.append(param_name)
            args_schema = {
                "type": "object",
                "properties": properties,
                "required": required
            }

        tag_list = tags or []
        tools[name] = {
            "description": description or func.__doc__ or "",
            "parameters": args_schema,
            "function": func,
            "terminal": terminal,
            "tags": tag_list
        }
        for tag in tag_list:
            names_for_tag = tools_by_tag.setdefault(tag, [])
            # Re-running the defining cell must not pile up duplicate entries.
            if name not in names_for_tag:
                names_for_tag.append(name)
        return func
    return decorator

# --------------- LITELLM+GROQ EXPERT CALLER ---------------- #
def _extract_response_text(resp):
    """Pull the generated text out of the response shapes handled below."""
    # Object-style response: resp.choices[0].message.content or .text
    if hasattr(resp, "choices"):
        first = resp.choices[0]
        if hasattr(first, "message") and hasattr(first.message, "content"):
            return first.message.content
        if hasattr(first, "text"):
            return first.text
    # Plain-dict response with the same layout
    if isinstance(resp, dict) and "choices" in resp:
        first = resp["choices"][0]
        if "message" in first:
            return first["message"]["content"]
        if "text" in first:
            return first["text"]
        return None
    if isinstance(resp, str):
        return resp
    return str(resp)


def _strip_code_fence(text):
    """Return the payload of a ```-fenced reply, or None if it is not fenced."""
    stripped = text.strip()
    if len(stripped) < 6 or not (stripped.startswith("```") and stripped.endswith("```")):
        return None
    inner = stripped[3:-3]
    first_line, newline, rest = inner.partition("\n")
    # Drop an optional language tag such as "json" on the opening fence line.
    if newline and (first_line.strip() == "" or first_line.strip().isalnum()):
        inner = rest
    return inner.strip()


def prompt_expert(action_context, description_of_expert, prompt) -> Any:
    """
    Ask the Groq-hosted model a question while role-playing a given expert.

    Args:
        action_context: Accepted for interface compatibility; not used here.
        description_of_expert: Sent as the system message.
        prompt: Sent as the user message.

    Returns:
        The reply parsed as JSON when it is valid JSON (bare or wrapped in a
        Markdown code fence); otherwise the reply text with surrounding
        whitespace stripped. If the call itself fails, the string
        "LLM error: <exception>" is returned instead of raising.
    """
    print(f"\n[LLM Expert: {description_of_expert}]\nPrompt (truncated): {prompt[:290]}...\n")
    try:
        resp = completion(
            model=f"groq/{GROQ_MODEL}",
            messages=[
                {"role": "system", "content": description_of_expert},
                {"role": "user", "content": prompt}
            ],
            api_key=GROQ_KEY,
            max_tokens=400
        )
        text = _extract_response_text(resp)

        # Try to parse as JSON if possible (for compliance check output)
        try:
            return json.loads(text)
        except Exception:
            pass
        if not isinstance(text, str):
            return text

        # Replies are sometimes wrapped in ```json ... ``` fences even when
        # asked for raw JSON; retry on the fenced payload before giving up.
        unfenced = _strip_code_fence(text)
        if unfenced is not None:
            try:
                return json.loads(unfenced)
            except ValueError:
                pass
        return text.strip()
    except Exception as e:
        print(f"Error in prompt_expert LLM call: {e}")
        return f"LLM error: {e}"



# --------- TOOLS (Groq-backed) --------- #
@register_tool(tags=["invoice_processing", "categorization"])
def categorize_expenditure(action_context, description: str) -> str:
    """
    Classify an invoice expenditure from its short description.
    The answer is requested from a fixed list of 20 expense categories.
    """
    categories = [
        "Office Supplies",
        "IT Equipment",
        "Software Licenses",
        "Consulting Services",
        "Travel Expenses",
        "Marketing",
        "Training & Development",
        "Facilities Maintenance",
        "Utilities",
        "Legal Services",
        "Insurance",
        "Medical Services",
        "Payroll",
        "Research & Development",
        "Manufacturing Supplies",
        "Construction",
        "Logistics",
        "Customer Support",
        "Security Services",
        "Miscellaneous",
    ]
    expert_persona = "A senior financial analyst with deep expertise in corporate spending categorization."
    # The category list is interpolated as its Python list repr.
    classification_prompt = (
        f"Given the following description: '{description}', classify the expense into one of these categories:\n"
        f"{categories}\n"
        "Respond only with the best category."
    )
    return prompt_expert(
        action_context=action_context,
        description_of_expert=expert_persona,
        prompt=classification_prompt,
    )

@register_tool(tags=["invoice_processing", "validation"])
def check_purchasing_rules(action_context, invoice_data: dict, rules_path="config/purchasing_rules.txt") -> dict:
    """
    Validate an invoice against company purchasing policies (read from rules file).
    Returns compliance status and brief report.

    Args:
        action_context: Passed through to the expert call.
        invoice_data: JSON-serializable invoice fields to be checked.
        rules_path: Path of the text file holding the purchasing rules.

    Returns:
        A dict with "compliant" and "issues" keys. If the rules file does not
        exist the invoice is reported as compliant with an explanatory note.
        If the expert reply is not a JSON object, the invoice is reported as
        non-compliant and the raw reply is placed in "issues".
    """
    try:
        # Explicit encoding so the rules read the same on every platform.
        with open(rules_path, "r", encoding="utf-8") as f:
            purchasing_rules = f.read()
    except FileNotFoundError:
        return {"compliant": True, "issues": f"No purchasing rules file found at {rules_path}. All invoices assumed compliant."}
    prompt = f"""
Given this invoice data:
{json.dumps(invoice_data, indent=2)}

and these company purchasing rules:
{purchasing_rules}

Analyze the invoice and respond in JSON:
{{"compliant": true|false, "issues": "<brief string>"}}
"""
    verdict = prompt_expert(
        action_context=action_context,
        description_of_expert="A corporate procurement compliance officer with extensive knowledge of purchasing policies.",
        prompt=prompt
    )
    if isinstance(verdict, dict):
        return verdict
    # prompt_expert hands back plain text when the reply is not JSON or the
    # call failed. Keep the declared dict contract and fail closed rather than
    # letting an unreadable answer pass as a compliance result.
    return {"compliant": False, "issues": f"Could not interpret compliance response: {verdict}"}

# --------- BASIC AGENT CLASSES --------- #
@dataclass
class Memory:
    """Append-only list of entries recorded during a run."""

    # Entries in insertion order; every instance gets its own list.
    items: list = field(default_factory=list)

    def add(self, entry: dict):
        """Append one entry to the end of the log."""
        self.items.append(entry)

    def get(self):
        """Return the underlying list itself (not a copy)."""
        return self.items

class Action:
    """A named callable bundled with its description and parameter schema."""

    def __init__(self, name, function, description, parameters, terminal=False):
        # Identity and metadata
        self.name = name
        self.description = description
        self.parameters = parameters
        self.terminal = terminal
        # The callable that does the work
        self.function = function

    def execute(self, **kwargs):
        """Invoke the wrapped callable with the given keyword arguments."""
        callee = self.function
        return callee(**kwargs)

class PythonActionRegistry:
    """Name-to-Action lookup built from the module-level `tools` registry."""

    def __init__(self, tags=None):
        """
        Wrap registered tools as Action objects.

        When `tags` is non-empty, only tools sharing at least one of those
        tags are included; otherwise every registered tool is included.
        """
        self.actions = {}
        for tool_name, tool_desc in tools.items():
            if tags:
                tool_tags = tool_desc.get("tags", [])
                # Skip tools that carry none of the requested tags.
                if all(tag not in tool_tags for tag in tags):
                    continue
            self.actions[tool_name] = Action(
                name=tool_name,
                function=tool_desc["function"],
                description=tool_desc["description"],
                parameters=tool_desc["parameters"],
                terminal=tool_desc.get("terminal", False),
            )

    def get_action(self, name):
        """Return the Action registered under `name`, or None if absent."""
        return self.actions.get(name, None)

    def get_actions(self):
        """Return all Actions as a new list."""
        return [action for action in self.actions.values()]

class Environment:
    """Runs actions and reports the outcome as a plain dict."""

    def execute_action(self, action, args: dict) -> dict:
        """
        Call `action.execute(**args)` and describe what happened.

        On success the dict holds "tool_executed" (True), "result" and a
        "timestamp"; on any exception it holds "tool_executed" (False),
        "error" (the exception text) and the formatted "traceback".
        """
        try:
            outcome = action.execute(**args)
        except Exception as exc:
            return dict(
                tool_executed=False,
                error=str(exc),
                traceback=traceback.format_exc(),
            )
        return dict(
            tool_executed=True,
            result=outcome,
            timestamp=time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        )

# --------- INVOICE DATA EXTRACTION --------- #
def extract_invoice_data(invoice_text):
    """
    Return a structured invoice record for `invoice_text`.

    Demo stub: starts from a fixed sample invoice and only adjusts it based
    on two substring checks. Replace with real field extraction as needed.
    """
    record = dict(
        invoice_id="INV-2025-0042",
        vendor="AcmeIT",
        description="Laptop computers for engineering team",
        amount=7700,
        date="2025-07-17",
        line_items=[dict(item="Laptop", qty=5, unit_price=1500)],
    )
    mentions_acme = "Acme" in invoice_text
    if mentions_acme:
        record["vendor"] = "AcmeIT"
    mentions_training = "Training" in invoice_text
    if mentions_training:
        # Training invoices override the sample description and amount.
        record.update(description="Online training course for staff", amount=450)
    return record

# --------- MAIN AGENT --------- #
class InvoiceProcessingAgent:
    """
    Runs one invoice through extraction, categorization and a policy check.

    Tools are looked up by name in a registry filtered to the
    "invoice_processing" tag and executed through an Environment.
    """

    def __init__(self, rules_path="config/purchasing_rules.txt"):
        """
        Args:
            rules_path: Path handed to the purchasing-rules tool on each run.
        """
        self.memory = Memory()
        self.env = Environment()
        self.rules_path = rules_path
        self.tools = PythonActionRegistry(tags=["invoice_processing"])

    def _run_tool(self, tool_name, args):
        """Run a registered tool; return its result, or {"error": ...} on failure."""
        action = self.tools.get_action(tool_name)
        if action is None:
            error = f"Tool '{tool_name}' is not registered"
        else:
            outcome = self.env.execute_action(action, args)
            # The environment only includes "result" when the tool succeeded;
            # indexing it blindly would hide the real error behind a KeyError.
            if outcome.get("tool_executed"):
                return outcome["result"]
            error = outcome.get("error", "unknown error")
        print(f"Tool '{tool_name}' failed: {error}")
        return {"error": error}

    def process_invoice(self, invoice_text):
        """
        Extract, categorize and policy-check a single invoice.

        Returns:
            The extracted invoice fields plus "category" and
            "policy_compliance". If a tool fails, its field holds
            {"error": "<message>"} instead of the tool's result.
        """
        print("Extracting invoice data...")
        invoice_data = extract_invoice_data(invoice_text)
        self.memory.add({"type": "system", "content": f"Extracted invoice: {json.dumps(invoice_data)}"})
        # 1. Categorize expenditure
        cat_result = self._run_tool(
            "categorize_expenditure",
            {"action_context": None, "description": invoice_data["description"]}
        )
        # 2. Check compliance (load rules from rules_path every time!)
        compliance_result = self._run_tool(
            "check_purchasing_rules",
            {"action_context": None, "invoice_data": invoice_data, "rules_path": self.rules_path}
        )
        # Store and return structured output
        structured = dict(invoice_data)
        structured["category"] = cat_result
        structured["policy_compliance"] = compliance_result
        self.memory.add({"type": "assistant", "content": json.dumps(structured)})
        return structured

# --------- USAGE EXAMPLE --------- #
if __name__ == "__main__":
    # Sample invoice; the leading newline and 4-space indentation on each
    # line (including the trailing one) are part of the text.
    invoice_text = (
        "\n"
        "    Vendor: AcmeIT\n"
        "    For: Laptop computers for engineering team\n"
        "    Amount: $7700\n"
        "    Date: 2025-07-17\n"
        "    "
    )
    # Rules are read from a file next to the notebook rather than config/.
    agent = InvoiceProcessingAgent(rules_path="purchasing_rules.txt")
    result = agent.process_invoice(invoice_text)
    print("\nFINAL STRUCTURED OUTPUT:")
    print(json.dumps(result, indent=2))


Enter your Groq API key:··········
Extracting invoice data...

[LLM Expert: A senior financial analyst with deep expertise in corporate spending categorization.]
Prompt (truncated): Given the following description: 'Laptop computers for engineering team', classify the expense into one of these categories:
['Office Supplies', 'IT Equipment', 'Software Licenses', 'Consulting Services', 'Travel Expenses', 'Marketing', 'Training & Development', 'Facilities Maintenance', '...


[LLM Expert: A corporate procurement compliance officer with extensive knowledge of purchasing policies.]
Prompt (truncated): 
Given this invoice data:
{
  "invoice_id": "INV-2025-0042",
  "vendor": "AcmeIT",
  "description": "Laptop computers for engineering team",
  "amount": 7700,
  "date": "2025-07-17",
  "line_items": [
    {
      "item": "Laptop",
      "qty": 5,
      "unit_price": 1500
    }
  ]
}

and t...


FINAL STRUCTURED OUTPUT:
{
  "invoice_id": "INV-2025-0042",
  "vendor": "AcmeIT",
  "description": "