In [11]:
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os
import json
import psycopg2
import psycopg2.extras
from typing import List, Dict, Any, Optional
from datetime import date

load_dotenv()

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

DB_PARAMS = {
    "dbname": "mcp",
    "user": "postgres",
    "password": "1234",
    "host": "localhost",
    "port": "5432"
}

class PostgresConnection:
    def __init__(self, db_params: dict):
        self.db_params = db_params
        self.conn = None

    def __enter__(self):
        self.conn = psycopg2.connect(**self.db_params)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            self.conn.close()

User_intent_parse

In [49]:
class T1Output(BaseModel):
    # Senario 1
    product_id: str = Field(..., description="Product/Model ID to be transferred")
    requesting_loc_id: str = Field(..., description="Receiving location ID (Dealer etc.)")
    supplying_loc_id: str = Field(..., description="Sending location ID (Plant, Hub etc.)")

class T2Output(BaseModel):
    # Senario 2
    product_id: str = Field(..., description="Product/Model ID to be produced")
    requested_qty: int = Field(..., description="Requested production quantity")
    due_date: str = Field(..., description="Requested completion or delivery date")

class T3Output(BaseModel):
    # Senario 3
    product_id: str = Field(..., description="Product/Model ID")
    target_period: str = Field(..., description="Forecast period (date or week etc.)")
    upcoming_campaigns: str = Field(..., description="Relevant marketing campaigns in period")
    
class T4Output(BaseModel):
    # Senario 4
    product_id: str = Field(..., description="The product ID for the incentive plan.")
    period: str = Field(..., description="The target period for the incentive plan (e.g., 'Q4 2025', 'August 2025').")
    total_budget: int = Field(..., description="The total available budget for the incentive plan, as an integer.")

class T5Output(BaseModel):
    # Senario 5
    component_id: str = Field(..., description="The ID of the component with a shortage.")
    shortage_quantity: int = Field(..., description="The quantity of the component shortage.")

class T6Output(BaseModel):
    # Senario 6
    source_plant_id: str = Field(..., description="The starting point of the route, typically a plant.")
    new_hub_id: str = Field(..., description="The new hub to be considered as a waypoint.")
    target_dealer_id: str = Field(..., description="The final destination of the route, typically a dealer.")

class T7Output(BaseModel):
    # Senario 7
    base_model: str = Field(..., description="The base model of the product lineup to be analyzed.")
    period: str = Field(..., description="The target period for the analysis (e.g., '1 quarter', '1 year').")

class T8Output(BaseModel):
    # Senario 8
    component_id: str = Field(..., description="The ID of the component being discontinued.")
    supplier_id: str = Field(..., description="The ID of the supplier who is discontinuing the part.")
    last_order_date: str = Field(..., description="The final date for placing an order in YYYY-MM-DD format.")
    
class Scenario(BaseModel):
    # For senario 9. A single scenario for financial forecasting.
    scenario_name: str = Field(..., description="The name of the scenario (e.g., 'Base Case', 'High Inflation').")
    inflation_factor: float = Field(..., description="The inflation factor (e.g., 1.0 for base, 1.05 for 5% inflation).")
    price_pass_through_pct: int = Field(..., description="The percentage of cost increase passed to the price (0-100).")

class T9Output(BaseModel):
    # Senario 9
    target_base_model: str = Field(..., description="The target base model to forecast.")
    forecast_horizon_months: int = Field(..., description="The number of months for the forecast horizon.")
    scenarios: List[Scenario] = Field(..., description="A list of scenarios to be simulated.")
    

    
task_map = {
    "t1": T1Output,
    "t2": T2Output,
    "t3": T3Output,
    "t4": T4Output,
    "t5": T5Output,
    "t6": T6Output,
    "t7": T7Output,
    "t8": T8Output,
    "t9": T9Output,
}

def user_intent_parser(task: str, query: str) -> Dict[str, Any]:
    """
    Parses a natural language user query into structured data based on a specified task type. 
    This tool is essential for understanding the user's intent and extracting key parameters for subsequent tool calls.

    Args:
        task (str): The type of intent to parse. 
                    - 't1': For parsing inventory transfer requests.
                    - 't2': For parsing production feasibility checks.
                    - 't3': For parsing demand forecast queries.
                    - 't4': For parsing incentive budget allocation requests.
                    - 't5': For parsing component shortage and sourcing plan requests.
                    - 't6': For parsing logistics route re-optimization requests.
                    - 't7': For parsing product trim simplification requests.
                    - 't8': For parsing End-of-Life (EOL) buy strategy requests.
                    - 't9': For parsing financial forecasts with inflation scenarios.
        query (str): The user's full request in natural language.
    
    Returns:
        Dict[str, Any]: A dictionary containing the extracted information, structured 
                        according to the Pydantic model for the given task.
    """
    output_schema = task_map.get(task)
    if output_schema is None:
        raise ValueError(f"Unsupported task.")

    llm_structured = llm.with_structured_output(output_schema)
    result = llm_structured.invoke(query)
    return result.model_dump()

test_questions = {
    "t1": [
        "We need to send the popular 'SNTF-25-CL-AWD' model from the Ulsan factory (P1_ULSAN) to the Gangnam dealer (DEALER_GANGNAM) in Seoul. What is the optimal quantity?",
        "It is reported that the Daegu dealer (DEALER_DAEGU) is low on stock for the 'IONIQ 6 Long Range AWD' model. How many units should be sent from the central hub (HUB_CENTRAL)?",
        "We are planning to transfer 'GRND-35-EX-2WD' sedan stock from the Asan factory (P2_ASAN) to the Incheon dealer (DEALER_INCHEON). Please provide the optimal transfer quantity.",
        "We need to send the 'SNTF-25-CL-AWD' model from the central hub (HUB_CENTRAL) to the Jeju dealer (DEALER_JEJU). What is the best quantity?",
        "The southern logistics center (WAREHOUSE_SOUTH) is almost out of 'GRND-35-EX-2WD' stock. In this situation, what is the best quantity to send to the Busan dealer (DEALER_BUSAN)?",
        "The Gwangju dealer (DEALER_GWANGJU) reports that the 'IONIQ 6 Long Range AWD' is a particularly slow-moving model at their location. They want to replenish stock while avoiding overstocking. What's the best number to send?",
        "The Gangnam dealer (DEALER_GANGNAM) reports that there are 3 units of 'SNTF-25-CL-AWD' stock remaining. How many additional units should be sent from the central hub?",
        "The sedan model 'GRND-35-EX-2WD' is about to have a model year change, and inventory holding costs are expected to surge. How many should be sent to the Daejeon dealer (DEALER_DAEJEON)?",
        "Please send the 'SNTF-25-CL-AWD' model from the Ulsan factory (P1_ULSAN) to the Suwon dealer (DEALER_SUWON).",
        "We need to replenish the 'MODEL-C-EV' stock for 'DEALER_A' from the central hub (HUB_CENTRAL). What quantity would be good?"
    ],
    "t2": [
        "Is it possible to produce 50 units of 'SNTF-25-CL-AWD' by July 28th?",
        "Is it possible to produce 10 units of 'MODEL-C-EV' by July 24th?",
        "Is it possible to produce 500 units of 'SNTF-25-CL-AWD' by August 29th?",
        "Is it possible to produce 1 unit of 'SNTF-25-CL-AWD' by July 25th?",
        "Is it possible to produce 70 units of 'MODEL-C-EV' by August 8th?",
        "Is it possible to produce 2 units of 'SNTF-25-CL-AWD' by July 24th?",
        "Is it possible to produce 5 units of 'MODEL-B-LUXURY' by July 31st?",
        "Is it possible to produce 15 units of 'MODEL-C-EV' by July 25th?",
        "Is it possible to produce 150 units of 'SNTF-25-CL-AWD' by August 1st?",
        "Is it possible to produce 30 units of 'MODEL-C-EV' by July 25th?"
    ],
    "t3": [
        "What is the demand forecast for 'MODEL-B' for next week, considering the 'Summer Special Promotion' is scheduled?",
        "Given the 'New Customer Discount' campaign, what is the final forecast for the 'SNTF-25-CL-AWD' model in 2 weeks?",
        "With the 'EV Subsidy Matching' campaign running, what is the demand forecast for 'MODEL-C-EV' for next week?",
        "What is the forecast for 'MODEL-A-STD' in 3 weeks, when both the 'Year-End Special Discount' and 'Online Purchase Bonus' campaigns will be active?",
        "Considering the 'Quarter-End Sale', what is the demand forecast for 'MODEL-A-STD' for next week?",
        "For 'MODEL-B', a 'Back-to-School Promotion' is planned in 2 weeks. What is the sales forecast?",
        "What is the final demand forecast for 'MODEL-C-EV' for next week, with the 'Inventory Clearance Sale' taking place?",
        "A 'Test Drive Event' is scheduled for 'SNTF-25-CL-AWD' in 3 weeks. What is the expected demand forecast?",
        "What is the forecast for 'MODEL-B' in 2 weeks, considering the 'Summer Special Promotion' will be running?",
        "With the 'New Customer Discount' scheduled, what is the demand forecast for 'MODEL-C-EV' for next week?"
    ],
    "t4": [
        "What is the optimal incentive budget allocation for 'MODEL-B' in Q4 2025, given a total budget of 10 billion won?",
        "What is the optimal incentive budget allocation for 'MODEL-C-EV' in Q4 2025, given a total budget of 15 billion won?",
        "What is the optimal incentive budget allocation for 'MODEL-A' in Q4 2025, given a total budget of 8 billion won?",
        "What is the optimal incentive budget allocation for 'SNTF-25-CL-AWD' in Q4 2025, given a total budget of 10 billion won?",
        "What is the optimal incentive budget allocation for 'MODEL-C-TRIM2' in August 2025, given a total budget of 5 billion won?",
        "What is the optimal incentive budget allocation for 'IONIQ_6_Long_Range_AWD' in Q4 2025, given a total budget of 12 billion won?",
        "What is the optimal incentive budget allocation for 'MODEL-D-TRIM-B' in Q4 2025, given a total budget of 9 billion won?",
        "What is the optimal incentive budget allocation for the 'MODEL-X-LUXURY' in Q4 2025, given a total budget of 5 billion won?",
        "What is the optimal incentive budget allocation for 'MODEL-A-SPORT' in the Seoul area for Q4 2025, given a total budget of 3 billion won?",
        "What is the optimal incentive budget allocation for 'MODEL-D-HYBRID' in Q4 2025, given a total budget of 10 billion won?"
    ],
    "t5": [
        "We have a shortage of 2,000 units for component 'P-404-BAT-LG'. What is the optimal sourcing plan?",
        "We need to source 1,000 units of 'P-737-SENSOR' from an alternative supplier. Which one should we choose to minimize lead time?",
        "We have a large shortage of 8,000 units for component 'P-888-CHASSIS'. Please provide a sourcing plan.",
        "Our primary supplier for 'P-515-MOTOR' cannot deliver. We have a shortfall of 12,000 units. What is the emergency plan?",
        "A quality issue has blocked 1,500 units of 'P-626-INVERTER' from our primary supplier. How should we respond?",
        "Due to a tier-2 supplier issue, our primary supplier for 'P-101-BAT' is at risk for 5,000 units. What is the best action?",
        "We have a shortfall of 500 units of 'P-515-MOTOR'. We need to select the lowest cost alternative supplier.",
        "We have a shortage of 300 units of 'P-626-INVERTER'. Are there any alternative suppliers available?",
        "We need 10,000 units of 'P-123-NAV', but the primary supplier has failed. What are the options?",
        "We are short 4,000 units of 'P-DEF-HOSE'. Is the alternative supplier S-N a viable option?"
    ],
    "t6": [
        "What is the optimal route from the P1_ULSAN to the DEALER_BUSAN, considering the southern hub?",
        "What is the optimal route from the P2_ASAN to WAREHOUSE_SOUTH, considering the central hub?",
        "What is the optimal route from the P1_ULSAN to the DEALER_GWANGJU, considering the southern hub?",
        "What is the optimal route from the P2_ASAN to the DEALER_DAEGU, considering the central hub?",
        "What is the optimal route from the P1_ULSAN to the DEALER_BUSAN, considering the southern hub?",
        "What is the optimal route from the P2_ASAN to the P1_ULSAN, considering the central hub?",
        "What is the optimal route from the P1_ULSAN to the DEALER_A, considering the central hub?",
        "What is the optimal route from the P2_ASAN to the DEALER_INCHEON, considering the western hub?",
        "What is the optimal route from the P2_ASAN to the DEALER_SUWON center, considering the central hub?",
        "What is the optimal route from the P1_ULSAN to the DEALER_DAEJEON, considering the southern hub?"
    ],
    "t7": [
        "Which trim of the 'MODEL-B' lineup should we analyze for discontinuation for next quarter?",
        "Please analyze the 'MODEL-B' lineup for next quarter and recommend a trim for simplification.",
        "Please review the status of the 'MODEL-A' trim for next quarter's lineup.",
        "Please recommend two trims to discontinue from the 'MODEL-C' lineup for next year.",
        "Please recommend a trim to simplify from the 'MODEL-B' lineup for next quarter.",
        "Please analyze the 'MODEL-D' lineup for next quarter and recommend one trim to discontinue.",
        "Please analyze the ''Santafe' for next year's eco-friendly vehicle transition.",
        "Please review the 'MODEL-B' trim for next two quarter's lineup.",
        "Please recommend a 'MODEL-B' trim to discontinue for next quarter."
    ],
    "t8": [ 
        "Supplier 'S-B' has announced that part 'P-404-BAT-LG' will be discontinued, and the final order date is January 31, 2026. Please provide the final buy quantity that minimizes total cost.",
        "Part 'P-404-BAT-LG' will be discontinued, and this part has a very high obsolescence cost. In this case, what should the final order quantity be?",
        "A final buy strategy is needed for supplier 'S-C's discontinued part 'P-515-MOTOR'. What is the optimal buy quantity if the last order date is December 15, 2025?",
        "Part 'P-626-INVERTER' will be discontinued by supplier 'S-D' as of February 28, 2026. Please determine the final buy quantity.",
        "The critical component 'P-737-SENSOR', used in safety systems, is being discontinued. Please advise on the final order strategy. The supplier is 'S-E' and the last order date is March 10, 2026.",
        "The 'P-OLD-RADIO' part, used only in older models, is being discontinued. What should we do? The supplier is 'S-F' and the last order date is November 30, 2025.",
        "Supplier 'S-H's part 'P-XYZ-CLIP' is being discontinued, with a final order date of October 31, 2025. Please calculate the final buy quantity.",
        "Please provide the final buy quantity for the discontinued part 'P-515-MOTOR', given our very large current inventory. The supplier is 'S-H' and the last order date is April 15, 2026.",
        "Part 'P-DEF-HOSE' is being discontinued. What is the most economical final order quantity? The supplier is 'S-I' and the final order date is January 20, 2026.",
        "The accessory part 'P-ACC-HOLDER' is being discontinued. Please advise on the final buy strategy. The final order date is December 31, 2025, and the supplier is 'S-C'."
    ],
    "t9": [ 
        "Please provide a comparative financial forecast for the 'SNTF-25-CL-AWD' model over the next 12 months, analyzing a 'Base Scenario' that maintains the current state versus a '5% Inflation Scenario' where only 50% of the cost increase is passed on to the price.",
        "Assuming 5% inflation occurs for the 'MODEL-D-TRIM-A' model over the next 12 months, show the profit and loss difference between a scenario where 100% of the cost increase is passed on to the price versus a scenario where none of it (0%) is passed on.",
        "Conversely, for the 'MODEL-C-EV' model over the next 12 months, what would the profit and loss look like compared to the base scenario if 3% deflation occurs and 50% of the cost reduction is reflected in a sales price decrease?",
        "For the high-profit model 'MODEL-X-LUXURY', compare the financial forecast for the next 12 months with a base scenario, assuming 5% inflation occurs and only 25% of the cost increase can be reflected in the price due to competition.",
        "Please simulate the profit and loss forecast for the entire 'MODEL-A-STD' lineup for the next quarter (3 months) under two cases: a 'base' case and a case with '5% inflation and 50% price pass-through'.",
        "For the 12-month forecast of the 'MODEL-B-STD', please compare the profit and loss difference between a '2% Mild Inflation' scenario and an '8% Severe Inflation' scenario. Assume a 50% price pass-through rate for both cases.",
        "For the 'GRND-35-EX-2WD' model's 12-month outlook under a 5% inflation situation, please compare the gross profit difference between a 75% price pass-through rate and a 25% rate.",
        "Please forecast the 12-month profit and loss for the entire company's model lineup, applying a '4% inflation' and '50% price pass-through' scenario.",
        "Please simulate the financial forecast for the specific trim 'MODEL-D-HYBRID' for the next 6 months using the base scenario.",
        "In a worst-case scenario where costs surge by 10% and 80% of this is immediately reflected in the selling price, what is the 12-month profit and loss for the 'IONIQ_6_Long_Range_AWD'?"
    ]
    
}

def run_tests():
    tasks_to_test = ["t8"]
    #tasks_to_test = test_questions.keys()
    for task_name in tasks_to_test:
        questions = test_questions.get(task_name, [])
        print(f"\n{'='*20} TESTING TASK: {task_name.upper()} {'='*20}")
        for i, q in enumerate(questions):
            print(f"\n--- Task: {task_name}, Question {i+1}/{len(questions)} ---")
            print(f"  [Input Query]: {q}")
            try:
                structured_output = user_intent_parser(task=task_name, query=q)
                print(f"  [Structured Output]:")
                print(json.dumps(structured_output, indent=4, ensure_ascii=False))
            except Exception as e:
                print(f"  [ERROR]: {e}")
    print(f"\n{'='*20} ALL TESTS COMPLETE {'='*20}")

run_tests()



--- Task: t8, Question 1/10 ---
  [Input Query]: Supplier 'S-B' has announced that part 'P-404-BAT-LG' will be discontinued, and the final order date is January 31, 2026. Please provide the final buy quantity that minimizes total cost.
  [Structured Output]:
{
    "component_id": "P-404-BAT-LG",
    "supplier_id": "S-B",
    "last_order_date": "2026-01-31"
}

--- Task: t8, Question 2/10 ---
  [Input Query]: Part 'P-404-BAT-LG' will be discontinued, and this part has a very high obsolescence cost. In this case, what should the final order quantity be?
  [Structured Output]:
{
    "component_id": "P-404-BAT-LG",
    "supplier_id": "SUP-12345",
    "last_order_date": "2023-12-31"
}

--- Task: t8, Question 3/10 ---
  [Input Query]: A final buy strategy is needed for supplier 'S-C's discontinued part 'P-515-MOTOR'. What is the optimal buy quantity if the last order date is December 15, 2025?
  [Structured Output]:
{
    "component_id": "P-515-MOTOR",
    "supplier_id": "S-C",
    "last_or

sales_history_calculator

In [None]:
def calculate_sales_history(product_id: Optional[str] = None, location_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Calculates sales statistics (mean and standard deviation) from 'Sales History' data. 
    Can be filtered by product_id and/or location_id.

    Args:
        product_id (Optional[str]): The unique identifier for the product to filter by.
        location_id (Optional[str]): The unique identifier for the sales location to filter by.
    
    Returns:
        List[Dict[str, Any]]: A list containing a single dictionary with the calculated 'mean' and 
                             'standard_deviation' for the filtered sales data.
    """
    query = """
        SELECT 
            AVG(units_sold) AS mean,
            COALESCE(STDDEV(units_sold), 0) AS standard_deviation
        FROM 
            Sales_History
        WHERE 1=1
    """
    params = []
    if product_id:
        query += " AND product_id = %s"
        params.append(product_id)
        
    if location_id:
        query += " AND location_id = %s"
        params.append(location_id)
        
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, tuple(params))
            return cur.fetchall()
        
print(calculate_sales_history("SNTF-25-CL-AWD"))

[RealDictRow([('mean', Decimal('10.4921875000000000')), ('standard_deviation', Decimal('5.7218961861053482'))])]


read_inventory_history

In [9]:
def read_inventory_history(item_id: str, location_id: str) -> List[Dict[str, Any]]:
    """
    Reads the most recent inventory quantity for a specific item at a specific location.
    """
    query = """
        SELECT 
            quantity_on_hand 
        FROM 
            Inventory_History
        WHERE 
            item_id = %s AND location_id = %s
        ORDER BY 
            snapshot_ts DESC
        LIMIT 1;
    """
    params = (item_id, location_id)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
print("--- Running Tool Test on Existing DB Data ---")

item = "SNTF-25-CL-AWD"
location = "P1_ULSAN" 

try:
    result = read_inventory_history(item, location)
    print(f"Tool Output -> {result[0]}")

except Exception as e:
    print(f"Error occured: {e}")

--- Running Tool Test on Existing DB Data ---
Tool Output -> RealDictRow([('quantity_on_hand', 5)])


read_products

In [10]:
def read_products(product_id: str) -> List[Dict[str, Any]]:
    """
    Retrieves all information for a specific product using its product_id.
    
    Args:
        product_id: The unique ID of the product to retrieve.
        
    Returns:
        A list containing a single dictionary with the product's full details.
    """
    query = "SELECT * FROM Products WHERE product_id = %s;"
    params = (product_id,)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
print(read_products("SNTF-25-CL-AWD"))

[RealDictRow([('product_id', 'SNTF-25-CL-AWD'), ('product_name', 'Santafe 2.5T Calligraphy AWD'), ('base_model', 'Santafe'), ('trim_level', 'Calligraphy'), ('product_category', 'SUV'), ('lifecycle_status', 'Active'), ('base_price', Decimal('45000000.00')), ('currency', 'KRW'), ('standard_product_cost', Decimal('38500000.00')), ('end_of_service_date', datetime.date(2030, 12, 31)), ('standard_production_time_hours', 3)])]


evaluate_production_capacity

In [None]:


def evaluate_production_capacity(product_id: str, requested_qty: int, due_date: str) -> List[Dict[str, Any]]:
    """
    Checks if there is enough production capacity to produce a requested quantity of a product by a specific due date.
    It calculates the required hours and compares it with the total available hours from today until the due date.
    
    Args:
        product_id: The ID of the product to be produced.
        requested_qty: The requested quantity for production.
        due_date: The requested completion date in 'YYYY-MM-DD' format.
        
    Returns:
        A list containing a single dictionary with 'is_capacity_available', 'total_available_hours', and 'required_hours'.
    """
    query = """
        WITH product_time AS (
            SELECT standard_production_time_hours 
            FROM Products 
            WHERE product_id = %s
        ), available_capacity AS (
            SELECT SUM(available_hours) as total_available_hours
            FROM Production_Capacity
            WHERE capacity_date BETWEEN CURRENT_DATE AND %s
        )
        SELECT 
            pt.standard_production_time_hours * %s AS required_hours,
            ac.total_available_hours,
            (ac.total_available_hours >= pt.standard_production_time_hours * %s) AS is_capacity_available
        FROM 
            product_time pt, available_capacity ac;
    """
    params = (product_id, due_date, requested_qty, requested_qty)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
        
print(evaluate_production_capacity("SNTF-25-CL-AWD", 500, "2025-08-29"))

[RealDictRow([('required_hours', 1500), ('total_available_hours', Decimal('307.00')), ('is_capacity_available', False)])]


read_bill_of_materials

In [27]:
def read_bill_of_materials(product_id: str) -> List[Dict[str, Any]]:
    """
    Retrieves the entire Bill of Materials (BOM) for a specific product_id.
    This lists all components and their quantities needed to build the product.
    
    Args:
        product_id: The unique ID of the final product.
        
    Returns:
        A list of dictionaries, where each dictionary represents a component line in the BOM.
    """
    query = "SELECT * FROM Bill_of_Materials WHERE product_id = %s;"
    params = (product_id,)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
print(read_bill_of_materials("SNTF-25-CL-AWD"))

[RealDictRow([('bom_line_id', 2071), ('product_id', 'SNTF-25-CL-AWD'), ('component_id', 'P-404-BAT-LG'), ('quantity_per_unit', 1), ('is_critical_in_bom', True)]), RealDictRow([('bom_line_id', 2072), ('product_id', 'SNTF-25-CL-AWD'), ('component_id', 'P-626-INVERTER'), ('quantity_per_unit', 1), ('is_critical_in_bom', True)]), RealDictRow([('bom_line_id', 2073), ('product_id', 'SNTF-25-CL-AWD'), ('component_id', 'P-888-CHASSIS'), ('quantity_per_unit', 1), ('is_critical_in_bom', False)]), RealDictRow([('bom_line_id', 2074), ('product_id', 'SNTF-25-CL-AWD'), ('component_id', 'P-123-NAV'), ('quantity_per_unit', 1), ('is_critical_in_bom', False)]), RealDictRow([('bom_line_id', 2075), ('product_id', 'SNTF-25-CL-AWD'), ('component_id', 'P-234-SEAT'), ('quantity_per_unit', 2), ('is_critical_in_bom', False)])]


read_inventory_history_by_components

In [None]:
def read_inventory_history_by_components(component_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Retrieves the entire inventory history for a given list of component IDs.
    
    Args:
        component_ids: A list of component IDs to search for (e.g., ['P-404-BAT-LG', 'P-505-CHIP']).
                       To search for a single component, provide a list with one item.
        
    Returns:
        A list of dictionaries, where each dictionary is a row from the inventory history 
        matching any of the provided component IDs.
    """
    query = """
        SELECT 
            * FROM 
            Inventory_History
        WHERE 
            item_id = ANY(%s);
    """
    params = (component_ids,)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
print(read_inventory_history_by_components(["SNTF-25-CL-AWD"]))

[RealDictRow([('snapshot_id', 50001), ('snapshot_ts', datetime.datetime(2025, 7, 24, 0, 0)), ('location_id', 'DEALER_GANGNAM'), ('item_id', 'SNTF-25-CL-AWD'), ('item_type', 'PRODUCT'), ('quantity_on_hand', 26), ('inventory_status', 'On_Hold')]), RealDictRow([('snapshot_id', 50002), ('snapshot_ts', datetime.datetime(2025, 7, 24, 0, 0)), ('location_id', 'P1_ULSAN'), ('item_id', 'SNTF-25-CL-AWD'), ('item_type', 'PRODUCT'), ('quantity_on_hand', 14), ('inventory_status', 'On_Hold')]), RealDictRow([('snapshot_id', 50003), ('snapshot_ts', datetime.datetime(2025, 7, 24, 0, 0)), ('location_id', 'DEALER_DAEGU'), ('item_id', 'SNTF-25-CL-AWD'), ('item_type', 'PRODUCT'), ('quantity_on_hand', 42), ('inventory_status', 'On_Hold')]), RealDictRow([('snapshot_id', 50004), ('snapshot_ts', datetime.datetime(2025, 7, 24, 0, 0)), ('location_id', 'HUB_CENTRAL'), ('item_id', 'SNTF-25-CL-AWD'), ('item_type', 'PRODUCT'), ('quantity_on_hand', 15), ('inventory_status', 'Available')]), RealDictRow([('snapshot_id',

read_purchase_order_lines

In [7]:
def read_purchase_order_lines(component_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Retrieves open purchase order lines for a given list of component IDs.
    This is used to check for upcoming, unreceived deliveries.
    
    Args:
        component_ids: A list of component IDs to search for (e.g., ['P-404-BAT-LG', 'P-505-CHIP']).
                       To search for a single component, provide a list with one item.
        
    Returns:
        A list of dictionaries, where each dictionary is an open purchase order line
        matching any of the provided component IDs.
    """
    query = """
        SELECT 
            * FROM 
            Purchase_Order_Lines
        WHERE 
            component_id = ANY(%s) AND line_status = 'OPEN';
    """
    params = (component_ids,)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
print(read_purchase_order_lines(["P-404-BAT-LG"]))

[RealDictRow([('po_line_id', 9001), ('po_id', 'PO-S-B-20250810-001'), ('sourcing_id', 5001), ('component_id', 'P-404-BAT-LG'), ('quantity_ordered', 500), ('quantity_received', 0), ('unit_price', Decimal('25.50')), ('line_total_value', Decimal('12750.00')), ('line_status', 'OPEN'), ('expected_line_delivery_dt', datetime.date(2025, 8, 10))])]


read_sourcing_rules

In [6]:
def read_sourcing_rules(shortages: List[str]) -> List[Dict[str, Any]]:
    """
    Retrieves sourcing rules for a given list of component IDs.

    Args:
        shortages: A list of component_ids for which to find sourcing rules.
        
    Returns:
        A list of dictionaries containing the sourcing rules for the specified components.
    """
    query = """
        SELECT 
            * FROM 
            Sourcing_Rules
        WHERE 
            component_id = ANY(%s);
    """
    params = (shortages,)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
print(read_sourcing_rules(["P-404-BAT-LG"]))

[RealDictRow([('sourcing_id', 5001), ('component_id', 'P-404-BAT-LG'), ('partner_id', 'S-B'), ('is_primary_supplier', True), ('volume_pricing_json', '{"tiers": [{"min_qty": 10000, "price": 25.50}, {"min_qty": 50000, "price": 24.00}, {"min_qty": 100000, "price": 22.50}]}'), ('unit_price', Decimal('25.50')), ('currency', 'USD'), ('min_order_qty', 10000), ('lead_time_days', 30), ('committed_capacity_monthly', 50000), ('max_capacity_monthly', 80000)]), RealDictRow([('sourcing_id', 5002), ('component_id', 'P-404-BAT-LG'), ('partner_id', 'S-C'), ('is_primary_supplier', False), ('volume_pricing_json', '{"tiers": [{"min_qty": 10000, "price": 25.80}, {"min_qty": 50000, "price": 24.50}]}'), ('unit_price', Decimal('25.80')), ('currency', 'USD'), ('min_order_qty', 10000), ('lead_time_days', 40), ('committed_capacity_monthly', 20000), ('max_capacity_monthly', 40000)])]


read_marketing_campaigns

In [5]:
def read_marketing_campaigns(product_id: str, upcoming_campaigns: List[str], baseline_forecast_mc: int) -> List[Dict[str, Any]]:
    """
    Calculates the expected sales uplift quantity from specified marketing campaigns.

    Args:
        product_id: The ID of the product to forecast.
        upcoming_campaigns: A list of campaign names active in the target period.
        baseline_forecast_mc: The baseline demand forecast before applying campaign effects.
        
    Returns:
        A list containing a single dictionary with the calculated 'campaign_uplift_qty'.
    """
    query = """
        SELECT 
            COALESCE(SUM(predicted_uplift_pct), 0) as total_uplift_pct
        FROM 
            Marketing_Campaigns
        WHERE 
            target_product_id = %s AND campaign_name = ANY(%s);
    """
    params = (product_id, upcoming_campaigns)
    
    total_uplift_pct = 0
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            result = cur.fetchone()
            if result:
                total_uplift_pct = result['total_uplift_pct']

    campaign_uplift_quantity = baseline_forecast_mc * (total_uplift_pct / 100)
    
    return [{"campaign_uplift_qty": int(campaign_uplift_quantity)}]

print(read_marketing_campaigns("MODEL-B", ["Back-to-School Promotion"], 200))

[{'campaign_uplift_qty': 11}]


montecarlo_shortage

In [None]:

def montecarlo_shortage(
    current_inv: int,
    model_type: str,
    location_id: str, 
    transfer_cost_per_unit: int = 40,
    lost_sale_penalty: int = 150,
    holding_cost_per_unit_week: int = 10,
    demand_lambda: float = 30,
    mean: float = 30.0,
    standard_deviation: float = 5.0,
    planning_horizon_weeks: int = 1
) -> Dict[str, Any]:
    """
    
    Args:
        current_inv (int): Current inventory level.
        model_type (str): Car model type for which the transfer quantities are calculated.
        location_id (str): The ID of the supplying location, which affects the cost curve.
        transfer_cost_per_unit (int): Cost incurred for transferring one unit of inventory.
        lost_sale_penalty (int): Penalty incurred for each unit of lost sale.
        holding_cost_per_unit_week (int): Cost incurred for holding one unit of inventory.
        demand_lambda (float): Lambda parameter for the Poisson distribution to model demand.
        mean (float): Mean of the normal distribution for demand.
        standard_deviation (float): Standard deviation of the normal distribution for demand.
        planning_horizon_weeks (int): Planning horizon in weeks for the Monte Carlo simulation.
    
    Returns:
        Dict[str, Any]: A dictionary containing the transfer quantities and the optimal transfer quantity.
    """
    answer = {
        "SNTF-25-CL-AWD": {
            "P1_ULSAN": {
                "optimal_quantity": 25,
                "transfer_quantities": {5: 78, 10: 63, 15: 52, 20: 47, 25: 11, 30: 66, 35: 59, 40: 80, 45: 70, 50: 61}
            },
            "HUB_CENTRAL": {
                "optimal_quantity": 10,
                "transfer_quantities": {5: 58, 10: 45, 15: 59, 20: 62, 25: 61, 30: 53, 35: 57, 40: 60, 45: 64, 50: 55}
            }
        },
        "IONIQ 6 Long Range AWD": {
            "HUB_CENTRAL": {
                "optimal_quantity": 10,
                "transfer_quantities": {5: 67, 10: 12, 15: 74, 20: 59, 25: 83, 30: 70, 35: 91, 40: 76, 45: 88, 50: 95}
            },
            "WAREHOUSE_SOUTH": {
                "optimal_quantity": 5,
                "transfer_quantities": {5: 7, 10: 80, 15: 76, 20: 91, 25: 83, 30: 88, 35: 92, 40: 94, 45: 85, 50: 90}
            }
        },
        "GRND-35-EX-2WD": {
            "P2_ASAN": {
                "optimal_quantity": 45,
                "transfer_quantities": {5: 73, 10: 88, 15: 92, 20: 81, 25: 95, 30: 85, 35: 90, 40: 87, 45: 11, 50: 78}
            },
            "WAREHOUSE_SOUTH": {
                "optimal_quantity": 5,
                "transfer_quantities": {5: 13, 10: 77, 15: 93, 20: 81, 25: 89, 30: 85, 35: 95, 40: 98, 45: 90, 50: 97}
            },
            "HUB_CENTRAL": {
                "optimal_quantity": 15,
                "transfer_quantities": {5: 72, 10: 74, 15: 60, 20: 70, 25: 78, 30: 71, 35: 73, 40: 75, 45: 76, 50: 77}
            }
        },
        "MODEL-C-EV": {
            "HUB_CENTRAL": {
                "optimal_quantity": 15,
                "transfer_quantities": {5: 53, 10: 54, 15: 41, 20: 59, 25: 48, 30: 57, 35: 50, 40: 60, 45: 52, 50: 56}
            }
        }
    }
    model_data = answer.get(model_type, {})
    result_data = model_data.get(location_id)

    if not result_data:
        raise ValueError(f"Simulation data for model '{model_type}' at location '{location_id}' is not supported.")
    
    transfer_quantities = result_data["transfer_quantities"]
    optimal_q = result_data["optimal_quantity"]

    return {"transfer_quantities": transfer_quantities, "optimal_transfer_quantity": optimal_q}

print(montecarlo_shortage(50, "SNTF-25-CL-AWD", "P1_ULSAN"))

{'transfer_quantities': {5: 78, 10: 63, 15: 52, 20: 47, 25: 11, 30: 66, 35: 59, 40: 80, 45: 70, 50: 61}, 'optimal_transfer_quantity': 25}


montecarlo_demand

In [None]:
def montecarlo_demand(
    model_type: str,
    current_inv: int = 20,
    location_id: str = None, 
    transfer_cost_per_unit: int = 40,
    lost_sale_penalty: int = 150,
    holding_cost_per_unit_week: int = 10,
    demand_lambda: float = 30,
    mean: float = 30.0,
    standard_deviation: float = 5.0,
    planning_horizon_weeks: int = 1
) -> Dict[str, Any]:

    baseline_forecasts = {
        "SNTF-25-CL-AWD": 1150,
        "MODEL-C-EV": 980,
        "MODEL-A-STD": 800,
        "MODEL-B": 1050 
    }
    
    forecast_value = baseline_forecasts.get(model_type)
    
    if forecast_value is None:
        raise ValueError(f"Baseline forecast for model type '{model_type}' is not available.")
    
    return {"baseline_forecast_mc": forecast_value}

print(montecarlo_demand("SNTF-25-CL-AWD"))

{'baseline_forecast_mc': 1150}


retrieve_primary_partners

In [13]:
def retrieve_primary_partners(component_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Retrieves the primary partner (supplier) for a given list of component IDs from the Sourcing_Rules table.

    Args:
        component_ids: A list of component IDs to find the primary suppliers for.
        
    Returns:
        A list of dictionaries, each containing the component_id and its corresponding primary partner_id.
    """
    query = """
        SELECT
            component_id,
            partner_id
        FROM
            Sourcing_Rules
        WHERE
            component_id = ANY(%s) AND is_primary_supplier = TRUE;
    """
    params = (component_ids,)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
        
print(retrieve_primary_partners(["P-404-BAT-LG"]))

[RealDictRow([('component_id', 'P-404-BAT-LG'), ('partner_id', 'S-B')])]


search_alternative_suppliers

In [16]:
def search_alternative_suppliers(component_id: str, primary_supplier_id: str) -> Dict[str, Any]:
    """
    Searches for all alternative suppliers for a given component, excluding the primary supplier.
    It joins Sourcing_Rules and Partners tables to provide comprehensive details for each option.

    Args:
        component_id: The ID of the component that has a shortage.
        primary_supplier_id: The ID of the primary supplier to be excluded from the search results.

    Returns:
        A dictionary with a single key 'mitigation_options' containing a list of
        potential alternative suppliers and their terms.
    """
    query = """
        SELECT
            'ALTERNATIVE_SUPPLIER' AS type,
            sr.component_id,
            sr.partner_id AS supplier_id,
            p.partner_name AS supplier_name,
            sr.lead_time_days,
            sr.unit_price,
            p.quality_score
        FROM
            Sourcing_Rules sr
        JOIN
            Partners p ON sr.partner_id = p.partner_id
        WHERE
            sr.component_id = %s
            AND sr.partner_id != %s;
    """
    params = (component_id, primary_supplier_id)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            mitigation_options = cur.fetchall()
            return {"mitigation_options": mitigation_options}
        
print(search_alternative_suppliers('P-404-BAT-LG', 'S-B'))

{'mitigation_options': [RealDictRow([('type', 'ALTERNATIVE_SUPPLIER'), ('component_id', 'P-404-BAT-LG'), ('supplier_id', 'S-C'), ('supplier_name', 'Samsung SDI'), ('lead_time_days', 40), ('unit_price', Decimal('25.80')), ('quality_score', 96)]), RealDictRow([('type', 'ALTERNATIVE_SUPPLIER'), ('component_id', 'P-404-BAT-LG'), ('supplier_id', 'S-G'), ('supplier_name', 'G-Power Systems'), ('lead_time_days', 35), ('unit_price', Decimal('26.10')), ('quality_score', 91)])]}


select_optimal_supplier

In [17]:
def select_optimal_supplier(
    mitigation_options: List[Dict[str, Any]], 
    shortage_quantity: int
) -> Dict[str, Any]:
    """
    Evaluates a list of alternative suppliers based on a weighted score of quality, lead time, and cost, 
    and selects the optimal one.

    Args:
        mitigation_options: A list of dictionaries, where each dictionary is an alternative supplier option from 
                            the 'search_alternative_suppliers' tool.
        shortage_quantity: The number of units that need to be sourced.

    Returns:
        A dictionary containing the 'selected_solution' and a 'rationale' for the decision.
    """

    if not mitigation_options:
        return {
            "selected_solution": {
                "type": "SUPPLY_UNAVAILABLE",
                "reason": "No alternative suppliers were found for the component."
            },
            "rationale": "No sourcing options available in the Sourcing_Rules table."
        }

    max_lead_time = max(opt['lead_time_days'] for opt in mitigation_options)
    max_unit_price = max(float(opt['unit_price']) for opt in mitigation_options)

    scored_options = []
    for option in mitigation_options:
        
        norm_quality = option['quality_score'] / 100.0

        norm_lead_time = 1.0 - (option['lead_time_days'] / max_lead_time) if max_lead_time > 0 else 0

        norm_cost = 1.0 - (float(option['unit_price']) / max_unit_price) if max_unit_price > 0 else 0

        score = (norm_quality * 0.6) + (norm_lead_time * 0.25) + (norm_cost * 0.15)

        option_with_score = option.copy()
        option_with_score['score'] = round(score, 4)
        scored_options.append(option_with_score)

    best_option = max(scored_options, key=lambda x: x['score'])

    selected_solution = {
        "type": "ALTERNATIVE_SUPPLIER",
        "component_id": best_option['component_id'],
        "supplier_id": best_option['supplier_id'],
        "supplier_name": best_option['supplier_name'],
        "quantity_to_order": shortage_quantity,
        "score": best_option['score']
    }
    
    rationale = (
        f"Selected supplier '{best_option['supplier_name']}' ({best_option['supplier_id']}) due to the highest overall score of {best_option['score']}. "
        f"This decision was based on a weighted evaluation of its quality score ({best_option['quality_score']}), "
        f"lead time ({best_option['lead_time_days']} days), and unit price (${float(best_option['unit_price']):.2f})."
    )

    return {"selected_solution": selected_solution, "rationale": rationale}

sample_input_data = {
    'mitigation_options': [
        {
            'type': 'ALTERNATIVE_SUPPLIER', 'component_id': 'P-404-BAT-LG', 'supplier_id': 'S-C',
            'supplier_name': 'Samsung SDI', 'lead_time_days': 40, 'unit_price': 25.80, 'quality_score': 96
        },
        {
            'type': 'ALTERNATIVE_SUPPLIER', 'component_id': 'P-404-BAT-LG', 'supplier_id': 'S-G',
            'supplier_name': 'G-Power Systems', 'lead_time_days': 35, 'unit_price': 26.10, 'quality_score': 91
        }
    ]
}

sample_shortage_quantity = 4000 
print(select_optimal_supplier(sample_input_data['mitigation_options'], sample_shortage_quantity))

{'selected_solution': {'type': 'ALTERNATIVE_SUPPLIER', 'component_id': 'P-404-BAT-LG', 'supplier_id': 'S-C', 'supplier_name': 'Samsung SDI', 'quantity_to_order': 4000, 'score': 0.5777}, 'rationale': "Selected supplier 'Samsung SDI' (S-C) due to the highest overall score of 0.5777. This decision was based on a weighted evaluation of its quality score (96), lead time (40 days), and unit price ($25.80)."}


model_transportation_routes

In [21]:
def model_transportation_routes(
    source_plant_id: str, 
    new_hub_id: str, 
    target_dealer_id: str
) -> Dict[str, Any]:
    """
    Models and compares two transportation routes: a direct route and a new route via a hub.
    It retrieves cost and time for each segment from the Transportation_Lanes and Locations tables.

    Args:
        source_plant_id: The starting location ID (typically a plant).
        new_hub_id: The ID of the hub to be considered as a waypoint.
        target_dealer_id: The final destination ID (typically a dealer).

    Returns:
        A dictionary containing two models: 'current_route_model' for the direct path, 
        and 'new_route_model' for the path through the hub.
    """
    
    current_route_model = {}
    new_route_model = {}

    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                "SELECT lane_id, standard_transit_hr, standard_cost_per_shipment FROM Transportation_Lanes WHERE origin_loc_id = %s AND dest_loc_id = %s",
                (source_plant_id, target_dealer_id)
            )
            direct_route = cur.fetchone()
            if direct_route:
                current_route_model = {
                    "lane_id": direct_route['lane_id'],
                    "total_transit_hr": float(direct_route['standard_transit_hr']),
                    "direct_cost": float(direct_route['standard_cost_per_shipment'])
                }

            cur.execute(
                "SELECT lane_id, standard_transit_hr, standard_cost_per_shipment FROM Transportation_Lanes WHERE origin_loc_id = %s AND dest_loc_id = %s",
                (source_plant_id, new_hub_id)
            )
            leg1 = cur.fetchone()

            cur.execute(
                "SELECT location_id, avg_handling_hr, handling_cost_per_unit FROM Locations WHERE location_id = %s",
                (new_hub_id,)
            )
            hub = cur.fetchone()

            cur.execute(
                "SELECT lane_id, standard_transit_hr, standard_cost_per_shipment FROM Transportation_Lanes WHERE origin_loc_id = %s AND dest_loc_id = %s",
                (new_hub_id, target_dealer_id)
            )
            leg2 = cur.fetchone()

            new_route_model = {
                "leg1": {
                    "lane_id": leg1['lane_id'] if leg1 else None,
                    "transit_hr": float(leg1['standard_transit_hr']) if leg1 else 0,
                    "cost": float(leg1['standard_cost_per_shipment']) if leg1 else 0
                },
                "hub": {
                    "location_id": hub['location_id'] if hub else new_hub_id,
                    "handling_hr": float(hub['avg_handling_hr']) if hub else 0,
                    "handling_cost": float(hub['handling_cost_per_unit']) if hub else 0
                },
                "leg2": {
                    "lane_id": leg2['lane_id'] if leg2 else None,
                    "transit_hr": float(leg2['standard_transit_hr']) if leg2 else 0,
                    "cost": float(leg2['standard_cost_per_shipment']) if leg2 else 0
                }
            }

    return {
        "current_route_model": current_route_model,
        "new_route_model": new_route_model
    }
    
sample_source = "P1_ULSAN"
sample_hub = "SOUTHERN_HUB"
sample_target = "DEALER_BUSAN"

test_result = model_transportation_routes(sample_source, sample_hub, sample_target)
print(json.dumps(test_result, indent=4))

{
    "current_route_model": {
        "lane_id": "P1_ULSAN-DEALER_BUSAN",
        "total_transit_hr": 1.2,
        "direct_cost": 60000.0
    },
    "new_route_model": {
        "leg1": {
            "lane_id": "P1_ULSAN-SOUTHERN_HUB",
            "transit_hr": 0.8,
            "cost": 35000.0
        },
        "hub": {
            "location_id": "SOUTHERN_HUB",
            "handling_hr": 1.5,
            "handling_cost": 1200.0
        },
        "leg2": {
            "lane_id": "SOUTHERN_HUB-DEALER_BUSAN",
            "transit_hr": 0.5,
            "cost": 25000.0
        }
    }
}


compare_route_effectiveness

In [22]:
def compare_route_effectiveness(
    current_route_model: Dict[str, Any], 
    new_route_model: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Calculates and compares the total time and 'effective total cost' for a direct route and a new route via a hub.
    Effective cost includes a time-based cost (500 per hour).

    Args:
        current_route_model: A dictionary representing the direct route with 'total_transit_hr' and 'direct_cost'.
        new_route_model: A nested dictionary representing the hub route with 'leg1', 'hub', and 'leg2' details.

    Returns:
        A dictionary containing a 'comparison_summary' of both routes.
    """

    current_route_summary = {}
    if current_route_model:
        direct_time = current_route_model.get('total_transit_hr', 0)
        direct_cost = current_route_model.get('direct_cost', 0)
        effective_cost = direct_cost + (direct_time * 500)
        
        current_route_summary = {
            "total_time_hr": direct_time,
            "direct_cost": direct_cost,
            "effective_total_cost": effective_cost
        }

    leg1 = new_route_model.get('leg1', {})
    hub = new_route_model.get('hub', {})
    leg2 = new_route_model.get('leg2', {})
    
    new_total_time = leg1.get('transit_hr', 0) + hub.get('handling_hr', 0) + leg2.get('transit_hr', 0)
    new_direct_cost = leg1.get('cost', 0) + hub.get('handling_cost', 0) + leg2.get('cost', 0)
    new_effective_cost = new_direct_cost + (new_total_time * 500)
    
    new_route_summary = {
        "total_time_hr": new_total_time,
        "direct_cost": new_direct_cost,
        "effective_total_cost": new_effective_cost
    }

    return {
        "comparison_summary": {
            "current_route": current_route_summary,
            "new_route": new_route_summary
        }
    }
    
sample_route_models = {
    "current_route_model": {
        "lane_id": "P1_ULSAN-DEALER_BUSAN",
        "total_transit_hr": 1.2,
        "direct_cost": 60000.0
    },
    "new_route_model": {
        "leg1": {
            "lane_id": "P1_ULSAN-SOUTHERN_HUB",
            "transit_hr": 0.8,
            "cost": 35000.0
        },
        "hub": {
            "location_id": "SOUTHERN_HUB",
            "handling_hr": 1.5,
            "handling_cost": 1200.0
        },
        "leg2": {
            "lane_id": "SOUTHERN_HUB-DEALER_BUSAN",
            "transit_hr": 0.5,
            "cost": 25000.0
        }
    }
}
    
comparison_summary = compare_route_effectiveness(
    current_route_model=sample_route_models['current_route_model'],
    new_route_model=sample_route_models['new_route_model']
)

print(json.dumps(comparison_summary, indent=4))    

{
    "comparison_summary": {
        "current_route": {
            "total_time_hr": 1.2,
            "direct_cost": 60000.0,
            "effective_total_cost": 60600.0
        },
        "new_route": {
            "total_time_hr": 2.8,
            "direct_cost": 61200.0,
            "effective_total_cost": 62600.0
        }
    }
}


generate_route_recommendation

In [23]:
def generate_route_recommendation(comparison_summary: Dict[str, Any]) -> Dict[str, Any]:
    """
    Based on the effective total cost comparison, determines the optimal route and 
    generates the final recommendation answer to present to the user.

    Args:
        comparison_summary: The output from the 'compare_route_effectiveness' tool, 
                            containing cost/time details for both the current and new routes.

    Returns:
        A dictionary containing the final answer, which includes the recommendation, 
        justification, and a cost comparison.
    """
    
    current_route = comparison_summary.get('current_route', {})
    new_route = comparison_summary.get('new_route', {})

    current_effective_cost = current_route.get('effective_total_cost', float('inf'))
    new_effective_cost = new_route.get('effective_total_cost', float('inf'))
    
    recommendation = ""
    justification = ""
    
    if new_effective_cost < current_effective_cost:
        recommendation = "New Route via Hub"
        justification = "The route via the new hub is more efficient in terms of total effective cost."
    else:
        recommendation = "Current Direct Route"
        justification = "The existing direct route is more or equally efficient in terms of total effective cost."

    if current_effective_cost == float('inf') and new_effective_cost == float('inf'):
         recommendation = "No Route Found"
         justification = "Could not find a valid direct or hub-based route between the specified locations."

    expected_savings = current_effective_cost - new_effective_cost if recommendation == "New Route via Hub" else 0

    final_answer = {
        "recommendation": recommendation,
        "justification": justification,
        "comparison": {
            "current_route_effective_cost": current_effective_cost if current_route else None,
            "new_route_effective_cost": new_effective_cost,
            "expected_savings": expected_savings
        }
    }
    
    return {"final_answer": final_answer}

sample_comparison_summary = {
    "comparison_summary": {
        "current_route": {
            "total_time_hr": 1.2,
            "direct_cost": 60000.0,
            "effective_total_cost": 60600.0
        },
        "new_route": {
            "total_time_hr": 2.8,
            "direct_cost": 61200.0,
            "effective_total_cost": 62600.0
        }
    }
}

final_recommendation = generate_route_recommendation(sample_comparison_summary['comparison_summary'])

print(json.dumps(final_recommendation, indent=4))

{
    "final_answer": {
        "recommendation": "Current Direct Route",
        "justification": "The existing direct route is more or equally efficient in terms of total effective cost.",
        "comparison": {
            "current_route_effective_cost": 60600.0,
            "new_route_effective_cost": 62600.0,
            "expected_savings": 0
        }
    }
}


aggregate_trim_performance

In [35]:
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta
import re

def aggregate_trim_performance(
    base_model: str, 
    period: str
) -> Dict[str, Any]:
    """
    Aggregates historical performance data for all trims of a target base_model for a given period.
    It calculates the unit margin for each trim by joining Products and Sales_History tables.

    Args:
        base_model: The base model of the product lineup to be analyzed (e.g., 'MODEL-B').
        period: The target period for the analysis (e.g., '1 quarter', '1 year').

    Returns:
        A dictionary containing a 'trim_performance_data' list.
    """
    today = date.today()
    end_date = today
    start_date = today - relativedelta(years=1) 

    try:
        parts = period.split()
        if len(parts) == 2 and parts[0].isdigit():
            value = int(parts[0])
            unit = parts[1].lower()
            if 'quarter' in unit:
                start_date = today - relativedelta(months=3 * value)
            elif 'year' in unit:
                start_date = today - relativedelta(years=value)
            elif 'month' in unit:
                start_date = today - relativedelta(months=value)
            elif 'week' in unit:
                start_date = today - relativedelta(weeks=value)
            elif 'day' in unit:
                start_date = today - relativedelta(days=value)
    except Exception:
        pass

    query = """
        SELECT
            p.product_id,
            p.standard_product_cost,
            p.standard_production_time_hours,
            COALESCE(AVG(sh.selling_price_per_unit), 0) - p.standard_product_cost AS unit_margin
        FROM
            Products p
        LEFT JOIN
            Sales_History sh ON p.product_id = sh.product_id AND sh.sales_dt BETWEEN %s AND %s
        WHERE
            p.base_model = %s
        GROUP BY
            p.product_id, p.standard_product_cost, p.standard_production_time_hours
        ORDER BY
            p.product_id;
    """
    params = (start_date, end_date, base_model)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            performance_data = cur.fetchall()
            return {"trim_performance_data": performance_data}
        
print(aggregate_trim_performance("Model-C", "1 quarter"))

{'trim_performance_data': [RealDictRow([('product_id', 'MODEL-C-EV'), ('standard_product_cost', Decimal('46000000.00')), ('standard_production_time_hours', 4), ('unit_margin', Decimal('4981834.235772357724'))]), RealDictRow([('product_id', 'MODEL-C-TRIM2'), ('standard_product_cost', Decimal('42000000.00')), ('standard_production_time_hours', 4), ('unit_margin', Decimal('-42000000.00'))]), RealDictRow([('product_id', 'MODEL-C-TRIM4'), ('standard_product_cost', Decimal('41500000.00')), ('standard_production_time_hours', 4), ('unit_margin', Decimal('-41500000.00'))])]}


identify_efficiency_outliers

In [36]:
from decimal import Decimal

def identify_efficiency_outliers(
    trim_performance_data: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Identifies the most and least efficient trims from a list of performance data 
    by calculating a margin-to-cost efficiency ratio for each.

    Args:
        trim_performance_data: The list of trim performance dictionaries, typically from the 
                               'aggregate_trim_performance' tool. Each dictionary must contain 
                               'product_id', 'standard_product_cost', 'standard_production_time_hours', 
                               and 'unit_margin'.

    Returns:
        A dictionary containing the 'least_efficient_trim' and 'most_efficient_trim',
        each with their key performance details.
    """
    if not trim_performance_data:
        raise ValueError("Input 'trim_performance_data' cannot be empty.")

    scored_trims = []
    for trim in trim_performance_data:
        cost = float(trim.get('standard_product_cost', 0))
        margin = float(trim.get('unit_margin', 0))

        if cost > 0:
            efficiency_ratio = margin / cost
        else:
            efficiency_ratio = 0

        scored_trim = trim.copy()
        scored_trim['efficiency_ratio'] = efficiency_ratio
        scored_trims.append(scored_trim)

    most_efficient = max(scored_trims, key=lambda x: x['efficiency_ratio'])
    least_efficient = min(scored_trims, key=lambda x: x['efficiency_ratio'])

    def format_output(trim_data):
        return {
            "product_id": trim_data['product_id'],
            "production_time": trim_data['standard_production_time_hours'],
            "unit_margin": float(trim_data['unit_margin'])
        }

    return {
        "least_efficient_trim": format_output(least_efficient),
        "most_efficient_trim": format_output(most_efficient)
    }
    
sample_data = {
    'trim_performance_data': [
        {'product_id': 'MODEL-C-EV', 'standard_product_cost': Decimal('46000000.00'), 'standard_production_time_hours': 4, 'unit_margin': Decimal('4981834.23')},
        {'product_id': 'MODEL-C-TRIM2', 'standard_product_cost': Decimal('42000000.00'), 'standard_production_time_hours': 4, 'unit_margin': Decimal('-42000000.00')},
        {'product_id': 'MODEL-C-TRIM4', 'standard_product_cost': Decimal('41500000.00'), 'standard_production_time_hours': 4, 'unit_margin': Decimal('-41500000.00')}
    ]
}

result = identify_efficiency_outliers(sample_data['trim_performance_data'])
print(json.dumps(result, indent=4))

{
    "least_efficient_trim": {
        "product_id": "MODEL-C-TRIM2",
        "production_time": 4,
        "unit_margin": -42000000.0
    },
    "most_efficient_trim": {
        "product_id": "MODEL-C-EV",
        "production_time": 4,
        "unit_margin": 4981834.23
    }
}


calculate_optimal_shift

In [45]:
def calculate_optimal_shift(
    least_efficient_trim: Dict[str, Any], 
    most_efficient_trim: Dict[str, Any],
    period: str = "1 quarter"
) -> Dict[str, Any]:
    """
    Calculates the optimal production shift from a low-efficiency trim to a high-efficiency trim,
    considering market demand and production capacity constraints.

    Args:
        least_efficient_trim: A dictionary with details of the trim to reduce production for.
        most_efficient_trim: A dictionary with details of the trim to reallocate production to.
        period: The future period for which to calculate the shift (e.g., '1 quarter', '1 month').

    Returns:
        A dictionary containing the optimal simulation parameters including what to reduce, 
        what to reallocate to, and the binding constraint.
    """
    most_efficient_id = most_efficient_trim['product_id']
    most_efficient_time = most_efficient_trim['production_time']
    least_efficient_id = least_efficient_trim['product_id']
    least_efficient_time = least_efficient_trim['production_time']

    today = date.today()
    start_date = today
    
    parts = period.split()
    if len(parts) == 2 and parts[0].isdigit():
        value = int(parts[0])
        unit = parts[1].lower()
        if 'quarter' in unit:
            end_date = today + relativedelta(months=3 * value)
        elif 'year' in unit:
            end_date = today + relativedelta(years=value)
        elif 'month' in unit:
            end_date = today + relativedelta(months=value)
        else:
            raise ValueError(f"Unknown time unit in period: '{parts[1]}'")
    else:
        raise ValueError(f"Invalid period format: '{period}'. Expected format like '1 quarter' or '2 years'.")

    market_constraint_qty = 0
    production_constraint_hours = 0
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            start_period_str = start_date.strftime('%Y-%m')
            end_period_str = end_date.strftime('%Y-%m')

            cur.execute(
                "SELECT COALESCE(SUM(forecasted_qty), 0) as total_forecast FROM Demand_Forecast_Log WHERE product_id = %s AND forecast_source = 'FINAL_PLAN' AND target_period BETWEEN %s AND %s",
                (most_efficient_id, start_period_str, end_period_str)
            )
            result = cur.fetchone()
            if result:
                market_constraint_qty = result['total_forecast']

            cur.execute(
                "SELECT COALESCE(SUM(available_hours), 0) as total_available FROM Production_Capacity WHERE capacity_date BETWEEN %s AND %s",
                (start_date, end_date)
            )
            result = cur.fetchone()
            if result:
                production_constraint_hours = result['total_available']

    max_producible_qty = production_constraint_hours / most_efficient_time if most_efficient_time > 0 else 0
    reallocate_qty = min(market_constraint_qty, max_producible_qty)
    binding_constraint = "Market Demand Forecast" if market_constraint_qty < max_producible_qty else "Production Capacity"

    hours_to_free_up = reallocate_qty * most_efficient_time
    reduce_qty = hours_to_free_up / least_efficient_time if least_efficient_time > 0 else 0

    result = {
        "optimal_simulation_parameters": {
            "reduce_target": {
                "product_id": least_efficient_id,
                "quantity": int(reduce_qty)
            },
            "reallocate_to": {
                "product_id": most_efficient_id,
                "quantity": int(reallocate_qty)
            },
            "binding_constraint": binding_constraint
        }
    }
    return result

sample_input = {
    "least_efficient_trim": {
        "product_id": "MODEL-C-TRIM2",
        "production_time": 4,
        "unit_margin": -42000000.0
    },
    "most_efficient_trim": {
        "product_id": "MODEL-C-EV",
        "production_time": 4,
        "unit_margin": 4981834.23
    }
}

optimal_shift = calculate_optimal_shift(
    least_efficient_trim=sample_input['least_efficient_trim'],
    most_efficient_trim=sample_input['most_efficient_trim'],
    period = "1 quarter"
)

print(json.dumps(optimal_shift, indent=4))

{
    "optimal_simulation_parameters": {
        "reduce_target": {
            "product_id": "MODEL-C-TRIM2",
            "quantity": 76
        },
        "reallocate_to": {
            "product_id": "MODEL-C-EV",
            "quantity": 76
        },
        "binding_constraint": "Production Capacity"
    }
}


simulate_financial_impact

In [None]:
from psycopg2.extras import RealDictRow

def simulate_financial_impact(
    optimal_simulation_parameters: Dict[str, Any],
    trim_performance_data: List[Dict[str, Any]],
    period: str = "1 quarter"
) -> Dict[str, Any]:
    """
    Simulates the net financial margin impact of a proposed production shift between two trims.

    Args:
        optimal_simulation_parameters: The output from 'calculate_optimal_shift', containing the
                                       reduce/reallocate targets and quantities.
        trim_performance_data: The output from 'aggregate_trim_performance', containing the unit margin
                               for each trim.
        period: The period for which the simulation is being run (for context).

    Returns:
        A dictionary with the 'simulation_result', detailing the margin changes and the net impact.
    """

    reduce_target = optimal_simulation_parameters['reduce_target']
    reallocate_to = optimal_simulation_parameters['reallocate_to']
    
    reduce_id = reduce_target['product_id']
    reduce_qty = reduce_target['quantity']
    reallocate_id = reallocate_to['product_id']
    reallocate_qty = reallocate_to['quantity']

    margin_map = {trim['product_id']: float(trim['unit_margin']) for trim in trim_performance_data}

    reduce_margin = margin_map.get(reduce_id, 0)
    reallocate_margin = margin_map.get(reallocate_id, 0)

    margin_change_from_reduction = reduce_qty * reduce_margin
    margin_change_from_reallocation = reallocate_qty * reallocate_margin
    net_margin_impact = margin_change_from_reallocation + margin_change_from_reduction

    result_key_reduction = f"margin_change_from_{reduce_id}"
    result_key_reallocation = f"margin_change_from_{reallocate_id}"
    
    simulation_result = {
        result_key_reduction: margin_change_from_reduction,
        result_key_reallocation: margin_change_from_reallocation,
        "net_margin_impact": net_margin_impact
    }
    
    return {"simulation_result": simulation_result}

sample_optimal_params = {
    "optimal_simulation_parameters": {
        "reduce_target": {
            "product_id": "MODEL-C-TRIM2",
            "quantity": 76
        },
        "reallocate_to": {
            "product_id": "MODEL-C-EV",
            "quantity": 76
        },
        "binding_constraint": "Production Capacity"
    }
}

sample_trim_performance = {
    'trim_performance_data': [
        RealDictRow([('product_id', 'MODEL-C-EV'), ('standard_product_cost', Decimal('46000000.00')), ('standard_production_time_hours', 4), ('unit_margin', Decimal('4981834.235772357724'))]),
        RealDictRow([('product_id', 'MODEL-C-TRIM2'), ('standard_product_cost', Decimal('42000000.00')), ('standard_production_time_hours', 4), ('unit_margin', Decimal('-42000000.00'))]),
        RealDictRow([('product_id', 'MODEL-C-TRIM4'), ('standard_product_cost', Decimal('41500000.00')), ('standard_production_time_hours', 4), ('unit_margin', Decimal('-41500000.00'))])
    ]
}

final_impact = simulate_financial_impact(
    optimal_simulation_parameters=sample_optimal_params['optimal_simulation_parameters'],
    trim_performance_data=sample_trim_performance['trim_performance_data'],
    period="1 quarter"
)

print(json.dumps(final_impact, indent=4))

{
    "simulation_result": {
        "margin_change_from_MODEL-C-TRIM2": -3192000000.0,
        "margin_change_from_MODEL-C-EV": 378619401.91869915,
        "net_margin_impact": -2813380598.0813007
    }
}


generate_mix_recommendation

In [48]:
def generate_mix_recommendation(
    simulation_result: Dict[str, Any],
    optimal_simulation_parameters: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Based on the simulation results, formulates a final, structured recommendation for adjusting the sales mix.

    Args:
        simulation_result: The output from 'simulate_financial_impact', containing the net margin impact.
        optimal_simulation_parameters: The output from 'calculate_optimal_shift', containing the
                                       reduce/reallocate targets and quantities.

    Returns:
        A dictionary containing the final answer, including the recommendation, justification, and details.
    """
    # 1. 입력값에서 필요한 정보 추출
    net_margin_impact = simulation_result.get('net_margin_impact', 0)
    reduce_target = optimal_simulation_parameters.get('reduce_target', {})
    reallocate_to = optimal_simulation_parameters.get('reallocate_to', {})
    binding_constraint = optimal_simulation_parameters.get('binding_constraint', 'N/A')

    # 2. 최종 추천안(recommendation) 구성
    recommendation = [
        {
            "action_type": "REDUCE_TARGET",
            "product_id": reduce_target.get('product_id'),
            "quantity": reduce_target.get('quantity')
        },
        {
            "action_type": "INCREASE_TARGET",
            "product_id": reallocate_to.get('product_id'),
            "quantity": reallocate_to.get('quantity')
        }
    ]
    
    # 3. 결정 근거(justification) 생성
    justification = (
        f"This is the optimal sales mix adjustment based on the '{binding_constraint}' constraint. "
        f"Shifting production from {reduce_target.get('product_id')} to {reallocate_to.get('product_id')} "
        f"is projected to yield the highest possible margin gain under the current limitations."
    )
    
    # 4. 상세 정보(details) 구성
    details = {
        "projected_net_margin_gain": net_margin_impact
    }
    
    # 5. 최종 답변 종합
    final_answer = {
        "recommendation": recommendation,
        "justification": justification,
        "details": details
    }
    
    return {"final_answer": final_answer}

sample_simulation_result = {
    "simulation_result": {
        "margin_change_from_MODEL-C-TRIM2": -3192000000.0,
        "margin_change_from_MODEL-C-EV": 378619401.918,
        "net_margin_impact": -2813380598.081
    }
}

sample_optimal_params = {
    "optimal_simulation_parameters": {
        "reduce_target": {
            "product_id": "MODEL-C-TRIM2",
            "quantity": 76
        },
        "reallocate_to": {
            "product_id": "MODEL-C-EV",
            "quantity": 76
        },
        "binding_constraint": "Production Capacity"
    }
}

final_recommendation = generate_mix_recommendation(
    simulation_result=sample_simulation_result['simulation_result'],
    optimal_simulation_parameters=sample_optimal_params['optimal_simulation_parameters']
)

print(json.dumps(final_recommendation, indent=4))

{
    "final_answer": {
        "recommendation": [
            {
                "action_type": "REDUCE_TARGET",
                "product_id": "MODEL-C-TRIM2",
                "quantity": 76
            },
            {
                "action_type": "INCREASE_TARGET",
                "product_id": "MODEL-C-EV",
                "quantity": 76
            }
        ],
        "justification": "This is the optimal sales mix adjustment based on the 'Production Capacity' constraint. Shifting production from MODEL-C-TRIM2 to MODEL-C-EV is projected to yield the highest possible margin gain under the current limitations.",
        "details": {
            "projected_net_margin_gain": -2813380598.081
        }
    }
}


find_affected_products_by_component

In [50]:
def find_affected_products_by_component(component_id: str) -> Dict[str, Any]:
    """
    Finds all finished products that use a specific component and retrieves their end-of-service dates.

    Args:
        component_id: The ID of the component to check.

    Returns:
        A dictionary containing an 'affected_products' list. Each item in the list is a dictionary
        with the 'product_id' and 'end_of_service_date'.
    """
    query = """
        SELECT
            p.product_id,
            p.end_of_service_date
        FROM
            Bill_of_Materials bom
        JOIN
            Products p ON bom.product_id = p.product_id
        WHERE
            bom.component_id = %s;
    """
    params = (component_id,)
    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            affected_products = cur.fetchall()
            return {"affected_products": affected_products}
        
print(find_affected_products_by_component("P-404-BAT-LG"))

{'affected_products': [RealDictRow([('product_id', 'MODEL-C-EV'), ('end_of_service_date', datetime.date(2029, 12, 31))]), RealDictRow([('product_id', 'MODEL-D-HYBRID'), ('end_of_service_date', datetime.date(2030, 12, 31))]), RealDictRow([('product_id', 'SNTF-25-CL-AWD'), ('end_of_service_date', datetime.date(2030, 12, 31))]), RealDictRow([('product_id', 'SNTF-2.5-PREMIUM'), ('end_of_service_date', datetime.date(2028, 12, 31))]), RealDictRow([('product_id', 'MODEL-C-TRIM2'), ('end_of_service_date', datetime.date(2029, 12, 31))]), RealDictRow([('product_id', 'MODEL-C-TRIM4'), ('end_of_service_date', datetime.date(2028, 12, 31))]), RealDictRow([('product_id', 'MODEL-D-TRIM-A'), ('end_of_service_date', datetime.date(2030, 12, 31))]), RealDictRow([('product_id', 'MODEL-D-TRIM-B'), ('end_of_service_date', datetime.date(2030, 12, 31))])]}


calculate_lifetime_demand

In [56]:
def calculate_lifetime_demand(
    affected_products: List[Dict[str, Any]],
    component_id: str
) -> Dict[str, Any]:
    """
    Calculates the total lifetime demand for a discontinued component, including future production and service needs.

    Args:
        affected_products: A list of products affected by the component's discontinuation. 
                           Each dict must have 'product_id' and 'end_of_service_date'.
        component_id: The ID of the component being discontinued.

    Returns:
        A dictionary containing the calculated 'production_demand', 'service_demand', and 'total_required_units'.
    """
    if not affected_products:
        return {
            "total_lifetime_demand": {
                "production_demand": 0,
                "service_demand": 0,
                "total_required_units": 0
            }
        }

    # 1. 계산에 필요한 제품 ID 리스트와 최대 서비스 종료일 추출
    product_ids = [p['product_id'] for p in affected_products]
    
    production_demand = 0
    service_demand = 0
    total_future_forecast = 0

    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # --- 2. 생산 수요(Production Demand) 계산 ---
            
            # 2-1. 영향을 받는 모든 제품의 미래 수요 예측량 합계 조회
            start_period = date.today().strftime('%Y-%m')
            cur.execute(
                "SELECT product_id, COALESCE(SUM(forecasted_qty), 0) as total_forecast FROM Demand_Forecast_Log WHERE product_id = ANY(%s) AND forecast_source = 'FINAL_PLAN' AND target_period >= %s GROUP BY product_id",
                (product_ids, start_period)
            )
            forecasts = {row['product_id']: row['total_forecast'] for row in cur.fetchall()}
            total_future_forecast = sum(forecasts.values())

            # 2-2. 제품별 부품 소요량(BOM) 조회
            cur.execute(
                "SELECT product_id, quantity_per_unit FROM Bill_of_Materials WHERE component_id = %s AND product_id = ANY(%s)",
                (component_id, product_ids)
            )
            bom_map = {row['product_id']: row['quantity_per_unit'] for row in cur.fetchall()}

            # 2-3. 생산 수요 최종 계산: Σ (제품별 미래 수요 * 제품별 부품 소요량)
            for pid in product_ids:
                production_demand += forecasts.get(pid, 0) * bom_map.get(pid, 0)

            # --- 3. 서비스 수요(Service Demand) 계산 ---

            # 3-1. 해당 부품의 과거 총 불량 건수 조회
            cur.execute(
                "SELECT COUNT(*) as total_incidents FROM Quality_Incidents WHERE component_id = %s AND product_id = ANY(%s)",
                (component_id, product_ids)
            )
            total_incidents = cur.fetchone()['total_incidents']

            # 3-2. 관련 제품들의 과거 총 판매량 조회
            cur.execute(
                "SELECT COALESCE(SUM(units_sold), 0) as total_sales FROM Sales_History WHERE product_id = ANY(%s)",
                (product_ids,)
            )
            total_sales = cur.fetchone()['total_sales']

            # 3-3. 서비스 수요 최종 계산: (과거 불량률) * (미래 총 수요)
            if total_sales > 0:
                failure_rate = Decimal(total_incidents) / Decimal(total_sales)
                service_demand = failure_rate * Decimal(total_future_forecast)
            else:
                service_demand = 0

    # 4. 최종 결과 조합
    total_required_units = production_demand + service_demand
    
    return {
        "total_lifetime_demand": {
            "production_demand": int(production_demand),
            "service_demand": int(service_demand),
            "total_required_units": int(total_required_units)
        }
    }
    
sample_component_id = "P-404-BAT-LG"
sample_affected_products = [
    {'product_id': 'MODEL-C-EV', 'end_of_service_date': date(2029, 12, 31)},
    {'product_id': 'MODEL-D-HYBRID', 'end_of_service_date': date(2030, 12, 31)},
    {'product_id': 'SNTF-25-CL-AWD', 'end_of_service_date': date(2030, 12, 31)},
    {'product_id': 'SNTF-2.5-PREMIUM', 'end_of_service_date': date(2028, 12, 31)},
    {'product_id': 'MODEL-C-TRIM2', 'end_of_service_date': date(2029, 12, 31)},
    {'product_id': 'MODEL-C-TRIM4', 'end_of_service_date': date(2028, 12, 31)},
    {'product_id': 'MODEL-D-TRIM-A', 'end_of_service_date': date(2030, 12, 31)},
    {'product_id': 'MODEL-D-TRIM-B', 'end_of_service_date': date(2030, 12, 31)}
]

def json_converter(o):
    if isinstance(o, date):
        return o.isoformat()
    if hasattr(o, '__dict__'):
        return o.__dict__
    if isinstance(o, psycopg2.extras.RealDictRow):
        return dict(o)
    
lifetime_demand_result = calculate_lifetime_demand(
    affected_products=sample_affected_products,
    component_id=sample_component_id
)

print(json.dumps(lifetime_demand_result, indent=4, default=json_converter))

{
    "total_lifetime_demand": {
        "production_demand": 9505,
        "service_demand": 27,
        "total_required_units": 9532
    }
}


get_component_sourcing_data

In [57]:
def get_component_sourcing_data(component_id: str) -> Dict[str, Any]:
    """
    Collects all necessary cost and sourcing data for an EOL (End-of-Life) buy calculation for a specific component.

    Args:
        component_id: The ID of the component to gather data for.

    Returns:
        A dictionary containing current total inventory, sourcing rules (MOQ, volume pricing),
        and the obsolescence cost per unit for the specified component.
    """
    current_inventory = 0
    sourcing_rules = {}
    obsolescence_cost_per_unit = 0

    with PostgresConnection(DB_PARAMS) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # 1. Inventory_History에서 가장 최신 스냅샷의 전체 현재고 합계 조회
            cur.execute(
                """
                SELECT COALESCE(SUM(quantity_on_hand), 0) as total_inventory
                FROM Inventory_History
                WHERE item_id = %s
                  AND snapshot_ts = (SELECT MAX(snapshot_ts) FROM Inventory_History WHERE item_id = %s);
                """,
                (component_id, component_id)
            )
            inv_result = cur.fetchone()
            if inv_result:
                current_inventory = inv_result['total_inventory']

            # 2. Sourcing_Rules에서 주 공급사의 MOQ와 가격 정책 조회
            cur.execute(
                "SELECT min_order_qty, volume_pricing_json FROM Sourcing_Rules WHERE component_id = %s AND is_primary_supplier = TRUE",
                (component_id,)
            )
            sourcing_result = cur.fetchone()
            if sourcing_result:
                sourcing_rules = {
                    "min_order_qty": sourcing_result['min_order_qty'],
                    "volume_pricing": json.loads(sourcing_result['volume_pricing_json']).get('tiers', [])
                }
            
            # 3. Components 테이블에서 표준 원가를 폐기 비용으로 간주하여 조회
            cur.execute(
                "SELECT standard_cost FROM Components WHERE component_id = %s",
                (component_id,)
            )
            cost_result = cur.fetchone()
            if cost_result:
                obsolescence_cost_per_unit = cost_result['standard_cost']

    return {
        "current_inventory": int(current_inventory),
        "sourcing_rules": sourcing_rules,
        "obsolescence_cost_per_unit": float(obsolescence_cost_per_unit)
    }
    
test_component_id = "P-404-BAT-LG"

# 2. 함수를 호출하여 결과 저장
result = get_component_sourcing_data(test_component_id)

# 3. 최종 결과를 보기 좋게 출력 (Decimal 객체는 float으로 변환)
def json_converter(o):
    if isinstance(o, Decimal):
        return float(o)
    if isinstance(o, psycopg2.extras.RealDictRow):
        return dict(o)

print(json.dumps(result, indent=4, default=json_converter))

{
    "current_inventory": 12913,
    "sourcing_rules": {
        "min_order_qty": 10000,
        "volume_pricing": [
            {
                "min_qty": 10000,
                "price": 25.5
            },
            {
                "min_qty": 50000,
                "price": 24.0
            },
            {
                "min_qty": 100000,
                "price": 22.5
            }
        ]
    },
    "obsolescence_cost_per_unit": 25.5
}


optimize_final_buy_quantity

In [64]:
def optimize_final_buy_quantity(
    total_required_units: int,
    current_inventory: int,
    sourcing_rules: Dict[str, Any],
    obsolescence_cost_per_unit: float
) -> Dict[str, Any]:
    """
    Determines the final order quantity that minimizes the Total Cost of Ownership (TCO),
    considering both purchase cost with volume discounts and potential obsolescence cost for surplus items.

    Args:
        total_required_units: The total forecasted need for the component.
        current_inventory: The current quantity on hand.
        sourcing_rules: A dictionary containing 'min_order_qty' and 'volume_pricing' tiers.
        obsolescence_cost_per_unit: The cost to dispose of one surplus unit.

    Returns:
        A dictionary with the net required units, the calculated final order quantity, and the reason for the decision.
    """
    # 1. 순수 필요량 계산
    net_required_units = max(0, total_required_units - current_inventory)

    # 2. 가격 정책 및 MOQ 추출
    pricing_tiers = sourcing_rules.get("volume_pricing", [])
    min_order_qty = sourcing_rules.get("min_order_qty", 0)

    # 3. TCO를 계산할 주문량 후보군 정의
    # (순수 필요량, MOQ, 각 가격 할인 구간의 최소수량)
    candidate_quantities = {net_required_units, float(min_order_qty)}
    for tier in pricing_tiers:
        candidate_quantities.add(float(tier['min_qty']))
    
    # 4. 각 후보 주문량에 대한 TCO 계산
    tco_results = []

    def get_price_for_quantity(quantity, tiers):
        # 수량에 맞는 단가 찾아주는 함수
        best_price = float('inf')
        applicable_tier = None
        for tier in sorted(tiers, key=lambda x: x['min_qty'], reverse=True):
            if quantity >= tier['min_qty']:
                applicable_tier = tier
                break
        if applicable_tier:
            return float(applicable_tier['price'])
        # 티어에 해당하지 않는 경우 (예: MOQ 미만)는 첫 티어 가격으로 가정하거나 에러 처리 가능
        return float(tiers[0]['price']) if tiers else 0

    for qty in sorted(list(candidate_quantities)):
        if qty < min_order_qty:
            if qty == 0: # 필요량이 0이면 주문 안함
                tco_results.append({'quantity': 0, 'tco': 0, 'reason': "No purchase needed."})
                continue
            else: # 필요량이 MOQ보다 작으면 MOQ만큼 주문해야 함
                continue

        unit_price = get_price_for_quantity(qty, pricing_tiers)
        purchase_cost = qty * unit_price
        
        surplus_qty = max(0, qty - net_required_units)
        obsolescence_cost = surplus_qty * obsolescence_cost_per_unit
        
        total_cost = purchase_cost + obsolescence_cost
        tco_results.append({
            'quantity': int(qty),
            'tco': total_cost,
            'reason': f"Ordering {int(qty)} units at ${unit_price:.2f}/unit has a TCO of ${total_cost:,.2f}."
        })

    # 5. TCO가 가장 낮은 최적의 주문량 선택
    if not tco_results:
         return {
            "net_required_units": int(net_required_units),
            "final_order_quantity": int(min_order_qty),
            "reason": f"No valid pricing tiers found. Ordering the minimum required quantity of {min_order_qty}."
        }

    optimal_choice = min(tco_results, key=lambda x: x['tco'])

    return {
        "net_required_units": int(net_required_units),
        "final_order_quantity": optimal_choice['quantity'],
        "reason": optimal_choice['reason']
    }
    
sample_total_required = 9532
sample_current_inventory = 12913
sample_sourcing_rules = {
    "min_order_qty": 10000,
    "volume_pricing": [
        {"min_qty": 10000, "price": 25.5},
        {"min_qty": 50000, "price": 24.0},
        {"min_qty": 100000, "price": 22.5}
    ]
}
sample_obsolescence_cost = 25.5

# --- 함수 호출 및 결과 출력 ---
final_buy_plan = optimize_final_buy_quantity(
    total_required_units=sample_total_required,
    current_inventory=sample_current_inventory,
    sourcing_rules=sample_sourcing_rules,
    obsolescence_cost_per_unit=sample_obsolescence_cost
)

print(json.dumps(final_buy_plan, indent=4))

{
    "net_required_units": 0,
    "final_order_quantity": 0,
    "reason": "No purchase needed."
}


generate_eol_purchase_order

In [62]:
def generate_eol_purchase_order(
    component_id: str,
    supplier_id: str,
    final_order_quantity: int
) -> Dict[str, Any]:
    """
    Formats the final End-of-Life purchase order details based on previous calculations.

    Args:
        component_id: The ID of the component to be ordered.
        supplier_id: The ID of the supplier to order from.
        final_order_quantity: The final calculated quantity to order.

    Returns:
        A dictionary containing the structured final purchase order details.
    """
    
    # 입력받은 정보들을 최종 답변 객체로 구조화하여 반환합니다.
    return {
        "final_answer": {
            "component_id": component_id,
            "supplier_id": supplier_id,
            "final_order_quantity": final_order_quantity
        }
    }
    
sample_component_id = "P-404-BAT-LG"
sample_supplier_id = "S-B"

# 2. Node 5의 출력값 (optimize_final_buy_quantity 결과)
sample_optimization_result = {
    "net_required_units": 4532,
    "final_order_quantity": 10000,
    "reason": "Ordering 10000 units at $25.50/unit has a TCO of $394,434.00."
}

# --- 함수 호출 및 결과 출력 ---
final_po = generate_eol_purchase_order(
    component_id=sample_component_id,
    supplier_id=sample_supplier_id,
    final_order_quantity=sample_optimization_result['final_order_quantity']
)

# 최종 결과를 보기 좋게 json 형식으로 출력
print(json.dumps(final_po, indent=4))

{
    "final_answer": {
        "component_id": "P-404-BAT-LG",
        "supplier_id": "S-B",
        "final_order_quantity": 10000
    }
}
