In [45]:
import json
import http.client
from pathlib import Path
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
from enum import Enum
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.tools import BaseTool
from dotenv import load_dotenv
import requests
import os

In [46]:
# Load variables from a .env file into the process environment before reading them.
load_dotenv()

# API key handed to the ChatOpenAI client in build_erp_workflow; None when unset.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY")
# Base URL handed to ERPDataTool.run_from_api in fetch_erp_data; None when unset.
SERVER_ENDPOINT = os.environ.get("SERVER_ENDPOINT")

In [35]:
########################################
# 1. Define our Pydantic State and Enums
########################################

class AlertType(str, Enum):
    """Categories an ERP alert can be classified into.

    The ``str`` mixin makes every member a plain string as well, and the member
    values are the same names the classification prompt lists as valid categories.
    """
    WEATHER = "WEATHER"
    FINANCIAL = "FINANCIAL"
    OPERATIONAL = "OPERATIONAL"
    # Same string the web-search step falls back to when an entry has no "Classification".
    OTHER = "OTHER"

class ERPAgentState(BaseModel):
    """State object handed from one workflow step to the next.

    Every field starts empty; each step fills in its own field and returns the state.
    """
    # Factor records returned by the ERP data tool (set in fetch_erp_data).
    external_factors: List[Dict[str, Any]] = Field(default_factory=list)
    # Parsed LLM classification output (set in classify_alert_messages).
    classification_results: List[Dict[str, Any]] = Field(default_factory=list)
    # Whatever the web search tool returned (set in web_search_for_improved_action).
    web_search_results: Dict[str, Any] = Field(default_factory=dict)
    # Text of the LLM-generated plan (set in finalize_action_plan).
    final_plan: str = ""


In [44]:
########################################
# 2. Define the Tools
########################################

class WebSearchTool(BaseTool):
    """
    Tool that posts a query to the Serper search API and returns the decoded JSON.

    The API key is read from the ``SERPER_API_KEY`` environment variable each
    time the tool runs, so no credential lives in the notebook.
    """
    name: str = "web_search_tool"
    description: str = "Search the web for additional ERP-related information"

    def _run(self, query: str) -> Dict[str, Any]:
        """
        POST ``query`` to ``https://google.serper.dev/search``.

        Args:
            query: Free-text search query.

        Returns:
            The decoded JSON response body. On any failure (missing API key,
            connection problem, non-2xx status, undecodable body) a dict of the
            form ``{"error": "Error in web search: ..."}`` is returned instead
            of raising.
        """
        api_key = os.getenv("SERPER_API_KEY")
        if not api_key:
            return {"error": "Error in web search: SERPER_API_KEY is not set"}

        conn = None
        try:
            # A timeout keeps a stalled search from hanging the whole workflow.
            conn = http.client.HTTPSConnection("google.serper.dev", timeout=10)
            payload = json.dumps({"q": query})
            headers = {
                'X-API-KEY': api_key,
                'Content-Type': 'application/json'
            }
            conn.request("POST", "/search", payload, headers)
            res = conn.getresponse()
            body = res.read().decode("utf-8")
            if not 200 <= res.status < 300:
                # Surface HTTP failures explicitly instead of passing an error
                # payload downstream as if it were search results.
                return {"error": f"Error in web search: HTTP {res.status} {res.reason}"}
            return json.loads(body)
        except Exception as e:
            return {"error": f"Error in web search: {str(e)}"}
        finally:
            # Always release the socket, on success and on failure alike.
            if conn is not None:
                conn.close()

class ERPDataTool(BaseTool):
    """
    Reads ERP external-factor data, either from a local JSON file (``_run``,
    simulating an ERP endpoint) or from a remote HTTP endpoint (``run_from_api``).

    Both methods report failures as a one-element list ``[{"error": "..."}]``
    instead of raising.
    """
    name: str = "erp_data_tool"
    description: str = "Fetch ERP data related to external factors."
    # Path of the JSON file that stands in for the ERP endpoint.
    json_file: Path = Field(default=Path("erp_data.json"))

    def _run(self, query: str) -> List[Dict[str, Any]]:
        """
        Load and return the content of ``json_file``.

        Args:
            query: Currently ignored; kept so the data could later be filtered
                (e.g. by FactorID or Type).

        Returns:
            The parsed JSON content. A single top-level object is wrapped in a
            list, matching ``run_from_api`` and the declared return type.
            On failure, ``[{"error": "..."}]``.
        """
        try:
            # JSON text is UTF-8; do not depend on the platform default encoding.
            with open(self.json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return [data] if isinstance(data, dict) else data
        except FileNotFoundError:
            return [{"error": f"File {self.json_file} not found"}]
        except json.JSONDecodeError:
            return [{"error": f"Invalid JSON format in {self.json_file}"}]
        except Exception as e:
            return [{"error": f"Unexpected error: {str(e)}"}]

    def run_from_api(self, alert_id: int, base_url: str) -> List[Dict[str, Any]]:
        """
        Fetch ERP data from a remote endpoint via ``GET {base_url}/alerts/{alert_id}``.
        Adjust the path to match your actual API's structure.

        Args:
            alert_id: Identifier placed in the URL path.
            base_url: Scheme and host of the API, e.g. "https://erp.example.com".

        Returns:
            The decoded JSON response; a single object is wrapped in a list for
            uniformity. On failure, ``[{"error": "..."}]``.
        """
        # Refuse to build URLs such as "None/alerts/None" from missing inputs.
        if alert_id is None:
            return [{"error": "Unexpected error: alert_id is required"}]
        if not base_url:
            return [{"error": "Network or HTTP error: no base URL configured"}]

        # Tolerate a trailing slash in the configured base URL.
        url = f"{base_url.rstrip('/')}/alerts/{alert_id}"  # e.g. "https://erp.example.com/alerts/101"

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # Raise an exception for 4xx/5xx errors
        except requests.RequestException as e:
            return [{"error": f"Network or HTTP error: {str(e)}"}]
        except Exception as e:
            return [{"error": f"Unexpected error: {str(e)}"}]

        # Decode separately from the request: the decode error raised by
        # response.json() can itself be a RequestException, so a shared try
        # block would report invalid JSON as a network error.
        try:
            data = response.json()
        except ValueError as e:
            return [{"error": f"JSON parse error: {str(e)}"}]
        except Exception as e:
            return [{"error": f"Unexpected error: {str(e)}"}]

        # Convert single dict to a list for uniformity
        return [data] if isinstance(data, dict) else data

In [None]:
########################################
# 3. Define the Steps (Functions) in the Workflow
########################################

def fetch_erp_data(state: ERPAgentState, tool: ERPDataTool, alert_id: Optional[int] = None) -> ERPAgentState:
    """
    Step 1: Retrieve external factor data from the ERP.

    Args:
        state: Workflow state to update.
        tool: ERP data tool used for the lookup.
        alert_id: When given, that alert is requested from the remote endpoint
            at ``SERVER_ENDPOINT``. When omitted, everything in the tool's
            local (simulated) JSON file is loaded instead.

    Returns:
        The same ``state`` with ``external_factors`` set to whatever the tool
        returned (which may be a list holding an ``{"error": ...}`` entry).
    """
    if alert_id is None:
        # No specific alert requested: fetch everything from the simulated ERP
        # file rather than requesting a meaningless ".../alerts/None" URL.
        data = tool._run("")
    else:
        data = tool.run_from_api(alert_id, SERVER_ENDPOINT)

    state.external_factors = data
    return state


def _extract_json_list(text: Any) -> List[Any]:
    """Return the first non-empty JSON array of objects found in ``text``, else ``[]``.

    Model replies often surround the JSON with code fences or explanatory text,
    so every '[' is tried as a possible start of the array.
    """
    if not isinstance(text, str):
        return []
    decoder = json.JSONDecoder()
    start = text.find("[")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if (
            isinstance(candidate, list)
            and candidate
            and all(isinstance(item, dict) for item in candidate)
        ):
            return candidate
        start = text.find("[", start + 1)
    return []


def classify_alert_messages(state: ERPAgentState, llm: ChatOpenAI) -> ERPAgentState:
    """
    Step 2: Use an LLM to classify alerts (based on type, description, severity, etc.).

    Args:
        state: Workflow state; ``external_factors`` is read.
        llm: Chat model invoked with the classification prompt.

    Returns:
        The same ``state`` with ``classification_results`` set to the list of
        ``{"AlertID": ..., "Classification": ...}`` objects parsed from the
        reply, or ``[]`` when there is nothing to classify or no such list can
        be found in the reply.
    """
    # Build a prompt that includes each alert and asks for classification:
    prompt_parts = []
    for factor in state.external_factors:
        alert = factor.get("Alert") if isinstance(factor, dict) else None
        if not isinstance(alert, dict):
            # Entries without alert details (e.g. {"error": ...} records from
            # the ERP tool) give the model nothing to classify.
            continue
        prompt_parts.append(f"""
            AlertID: {alert.get("AlertID")}
            Type: {alert.get("Type")}
            Severity: {alert.get("Severity")}
            Description: {alert.get("Description")}
        """)

    if not prompt_parts:
        # Nothing to classify: skip the LLM call entirely.
        state.classification_results = []
        return state

    prompt_str = "\n".join(prompt_parts)

    # The LLM prompt: we ask it to output a simple mapping of each alert to a recognized enum.
    prompt = f"""
You are an expert in classifying ERP alerts. 
We have the following alerts:

{prompt_str}

Valid AlertType categories are: WEATHER, FINANCIAL, OPERATIONAL, OTHER.

For each alert, output the most appropriate AlertType based on its info.
Provide your answer as a JSON list of objects in the format:
[{{"AlertID": <id>, "Classification": <AlertType>}}, ...]
"""

    response = llm.invoke(prompt)
    # Later steps index this as a list of dicts, so only accept that shape.
    state.classification_results = _extract_json_list(getattr(response, "content", None))
    return state


def web_search_for_improved_action(state: ERPAgentState, tool: WebSearchTool) -> ERPAgentState:
    """
    Step 3: Run a web search for background that can refine or validate the
    action to take.

    Only the first entry of ``state.classification_results`` is used to build
    the query (demonstration behaviour); with no classifications the state is
    returned untouched. The tool's return value is stored in
    ``state.web_search_results``.
    """
    classifications = state.classification_results
    if classifications:
        # An entry lacking "Classification" is searched as "OTHER".
        alert_category = classifications[0].get("Classification", "OTHER")
        state.web_search_results = tool._run(
            f"Best practices to handle {alert_category} ERP alert"
        )
    return state


def finalize_action_plan(state: ERPAgentState, llm: ChatOpenAI) -> ERPAgentState:
    """
    Step 4: Ask the LLM for a final recommended action plan.

    The prompt embeds the original factor data, the alert classifications and
    the web search results from ``state``; the reply's ``content`` is stored in
    ``state.final_plan``.
    """
    # Assembled line by line; joining with "\n" yields the prompt text, including
    # its leading and trailing newline.
    prompt_lines = [
        "",
        "Given the following context:",
        f"- External Factors Data: {state.external_factors}",
        f"- Alert Classifications: {state.classification_results}",
        f"- Web Search Results: {state.web_search_results}",
        "",
        "Propose a final recommended action plan for each alert. ",
        "Consider severity, classification, and best practices from the search results.",
        "Present the final plan as structured text.",
        "",
    ]
    prompt = "\n".join(prompt_lines)

    state.final_plan = llm.invoke(prompt).content
    return state

In [None]:
########################################
# 4. Build the LangGraph Workflow
########################################

def build_erp_workflow(alert_id: Optional[int] = None):
    """
    Assemble and compile the LangGraph workflow:
    fetch ERP data -> classify alerts -> web search -> final plan.

    Args:
        alert_id: Forwarded to ``fetch_erp_data`` by the first node.

    Returns:
        The compiled graph.
    """
    graph = StateGraph(ERPAgentState)

    # Collaborators shared by the node functions below.
    erp_source = ERPDataTool()
    searcher = WebSearchTool()
    # Replace with your actual LLM details or your custom config.
    chat_model = ChatOpenAI(
        model="deepseek/deepseek-r1-distill-llama-70b:free",
        openai_api_key=DEEPSEEK_API_KEY,
        openai_api_base="https://openrouter.ai/api/v1",
    )

    # Each node takes only the state; tools and model are bound through closures.
    def _fetch(state):
        return fetch_erp_data(state, erp_source, alert_id=alert_id)

    def _classify(state):
        return classify_alert_messages(state, chat_model)

    def _search(state):
        return web_search_for_improved_action(state, searcher)

    def _finalize(state):
        return finalize_action_plan(state, chat_model)

    # Steps in execution order.
    steps = [
        ("fetch_erp_data", _fetch),
        ("classify_alerts", _classify),
        ("web_search", _search),
        ("finalize_plan", _finalize),
    ]
    for step_name, step_fn in steps:
        graph.add_node(step_name, step_fn)

    graph.set_entry_point("fetch_erp_data")

    # Chain every step to its successor; the last step leads to END.
    sequence = [step_name for step_name, _ in steps] + [END]
    for source, target in zip(sequence, sequence[1:]):
        graph.add_edge(source, target)

    return graph.compile()



In [40]:
########################################
# 5. Run the Workflow
########################################

def main():
    """Build the workflow, run it from an empty state, print the plan, and return the final state."""
    final_state = build_erp_workflow().invoke(ERPAgentState())

    print("=== Final Plan ===")
    print(final_state['final_plan'])
    return final_state


# Run the workflow when this code executes as the main module, keeping the
# final state in a module-level name so it can be inspected afterwards.
if __name__ == "__main__":
    result_state = main()

=== Final Plan ===
### Final Recommended Action Plan for Each Alert

#### Alert 1: Weather Alert - Storm Approaching
**Severity:** High  
**Impact Score:** 8.5  
**Description:** A storm is likely to disrupt shipping routes.

**Recommended Action Plan:**
1. **Immediate Notification (ActionID: 201)**  
   - **Responsible Team:** LogisticsTeam  
   - **Action:** Trigger the "NotifyLogistics" action to inform all relevant stakeholders, including shipping partners, customers, and internal teams.  
   - **Deadline:** Within 2 hours of alert detection (2025-02-07T09:30:00Z).  

2. **Assess Shipping Route impact**  
   - **Action:** Conduct a detailed assessment of shipping routes that may be affected by the storm.  
   - **Deadline:** 2025-02-07T12:00:00Z.  

3. **Develop Contingency Plan**  
   - **Action:** Create an alternative shipping plan, including rerouting shipments, securing ships, and ensuring crew safety.  
   - **Deadline:** 2025-02-07T15:00:00Z.  

4. **Accelerate Critical Ship

In [41]:
result_state

{'external_factors': [{'FactorID': 1,
   'Type': 'Weather Alert',
   'Value': 'Storm Approaching',
   'Source': 'WeatherAPI',
   'ImpactScore': 8.5,
   'DateDetected': '2025-02-07',
   'Alert': {'AlertID': 101,
    'Type': 'WEATHER',
    'Severity': 'High',
    'Description': 'A storm is likely to disrupt shipping routes.',
    'Status': 'Open',
    'Timestamp': '2025-02-07T09:30:00Z'},
   'Action': {'ActionID': 201,
    'AlertID': 101,
    'ActionType': 'NotifyLogistics',
    'ResponsibleTeam': 'LogisticsTeam',
    'Status': 'Pending',
    'CompletionDate': None}},
  {'FactorID': 2,
   'Type': 'Financial Alert',
   'Value': 'Market Downturn',
   'Source': 'FinanceAPI',
   'ImpactScore': 7.2,
   'DateDetected': '2025-02-08',
   'Alert': {'AlertID': 102,
    'Type': 'FINANCIAL',
    'Severity': 'Medium',
    'Description': 'Stock prices falling in key sectors.',
    'Status': 'Open',
    'Timestamp': '2025-02-08T12:00:00Z'},
   'Action': {'ActionID': 202,
    'AlertID': 102,
    'Action