# Gather Insights from Industrial Data Stores with Amazon Bedrock

## Prerequisites

Before getting started, make sure you have the following:

- Python 3.x installed
- Access to an AWS account with permissions to use Amazon Bedrock
- Model access enabled for:
  - Anthropic Claude models (Claude 3 Haiku, Claude 3 Sonnet)
  - Amazon Nova models (Nova-Lite, Nova-Pro, Nova-Micro)
- A clone of the repository `https://github.com/aws-samples/industrial-data-store-simulation-chatbot`, with this notebook in the same project directory
- (Recommended) If you run this notebook in Amazon SageMaker AI JupyterLab, use an `ml.t3.medium` instance. This is the default instance type for CPU-based SageMaker images and is available as part of the [AWS Free Tier](https://aws.amazon.com/free)

## Introduction

In this notebook, you will explore how to build a conversational interface for manufacturing data using Amazon Bedrock's Converse API with tool use capabilities. This approach allows models to interact with external tools like databases to provide accurate, up-to-date information in response to user queries.

By the end, you'll know how to:
1. Connect foundation models (FMs) to industrial data sources
2. Use the Bedrock Converse API with tools
3. Build a system that translates natural language to SQL
4. Create a complete workflow from user question to informative answer

### Context

Large language models (LLMs) have excellent reasoning capabilities but need external tools to access enterprise data. Instead of directly feeding database information to the model, we can leverage tool-calling capabilities that let the model:

1. Understand a user's question
2. Decide what data is needed
3. Call appropriate tools to retrieve that data (like database queries)
4. Interpret the results and provide a helpful response

This approach makes data accessible to users without SQL expertise or knowledge of database schemas. Rather than requiring specialized database tools, users can simply ask questions in natural language.

### Interact with a Manufacturing Execution System (MES)

In this workshop, you'll work with a simulated Manufacturing Execution System (MES) implemented as a SQLite database. Manufacturing Execution Systems track production processes, machine states, work orders, inventory, and quality control in manufacturing environments.

In real manufacturing settings, an MES typically connects with other industrial systems, such as:
- ERP systems (for work orders and planning)
- SCADA and control systems (for operational data)
- Data historians (for time-series data and analytics)

Our simulated MES is self-contained for simplicity, but the techniques you'll learn can be extended to more complex environments with multiple connected systems.

## Environment Setup

First, let's install the required packages:

In [1]:
#install requirements
%pip install -q -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


Load environment variables:
> Note: This is not required when running this notebook in Amazon SageMaker AI JupyterLab

In [2]:
# Environment setup
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

True

## Setting up the Amazon Bedrock Client

To use the Amazon Bedrock Converse API, we'll create a boto3 client for bedrock-runtime:

In [3]:
import boto3
import json

def get_bedrock_client():
    """Create a bedrock-runtime client"""
    return boto3.client(
        service_name='bedrock-runtime',
        region_name=os.getenv("AWS_REGION", "us-east-1"),
        endpoint_url=f'https://bedrock-runtime.{os.getenv("AWS_REGION", "us-east-1")}.amazonaws.com',
    )

# Create the client
bedrock_client = get_bedrock_client()

Let's test the client with a simple prompt to make sure our access to Bedrock is working properly:

In [4]:
def test_bedrock_model(prompt, model_id="anthropic.claude-3-haiku-20240307-v1:0"):
    """Test the Bedrock model with a simple prompt"""
    if "anthropic" in model_id:
        body = json.dumps({
            "max_tokens": 1024,
            "temperature": 0.5,
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "anthropic_version": "bedrock-2023-05-31"
        })
    elif "nova" in model_id:
        body = json.dumps({
            "schemaVersion": "messages-v1",
            "messages": [
                {"role": "user", "content": [{"text": prompt}]}
            ],
            "inferenceConfig": {
                "maxTokens": 1024,
                "temperature": 0.5,
                "topP": 0.9
            }
        })
    elif "amazon" in model_id:
        body = json.dumps({
            "inputText": prompt,
            "textGenerationConfig": {
                "maxTokenCount": 1024,
                "temperature": 0.5,
                "topP": 0.9
            }
        })
    
    response = bedrock_client.invoke_model(
        body=body, 
        modelId=model_id
    )
    
    response_body = json.loads(response.get("body").read())
    
    if "anthropic" in model_id:
        return response_body['content'][0]['text']
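    elif "nova" in model_id:
        # Nova's InvokeModel response nests text under output -> message -> content
        content = response_body.get('output', {}).get('message', {}).get('content', [])
        if content and 'text' in content[0]:
            return content[0]['text']
        # Otherwise try a 'contentBlocks' layout before falling back to the raw body
        if 'contentBlocks' in response_body and len(response_body['contentBlocks']) > 0:
            content_block = response_body['contentBlocks'][0]
            if 'text' in content_block:
                return content_block['text']
            elif 'content' in content_block and len(content_block['content']) > 0:
                return content_block['content'][0].get('text', '')
        return str(response_body)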
    elif "amazon" in model_id:
        return response_body.get('results')[0].get('outputText')

# Test with Claude 3 Haiku
prompt = "Hello, can you tell me about manufacturing execution systems (MES) in 2-3 sentences?"
response = test_bedrock_model(prompt)
print("Claude 3 Haiku Response:")
print(response)

Claude 3 Haiku Response:
Certainly! Here's a brief overview of manufacturing execution systems (MES) in 2-3 sentences:

1. A manufacturing execution system (MES) is a real-time information system that is used to track and document the transformation of raw materials into finished goods in a manufacturing environment.
2. MES systems provide visibility and control over the production process, helping manufacturers optimize production, improve quality, and ensure regulatory compliance.
3. MES systems integrate with other enterprise systems, such as enterprise resource planning (ERP) and product lifecycle management (PLM) systems, to provide a comprehensive view of the manufacturing process.


Let's compare responses between different available models:

In [5]:
import time

# Define models to test
models = [
    "anthropic.claude-3-haiku-20240307-v1:0",
    "anthropic.claude-3-sonnet-20240229-v1:0",
    "us.amazon.nova-micro-v1:0"
]

prompt = "Tell me about the top 3 trends in Industrial Manufacturing in 3 bullet points"

# Test each model
for model_id in models:
    print(f"\nTesting {model_id}...")
    start_time = time.time()
    
    try:
        response = test_bedrock_model(prompt, model_id)
        end_time = time.time()
        execution_time = end_time - start_time
        
        print(f"Response:\n{response}")
        print(f"\nExecution time: {execution_time:.2f} seconds")
    except Exception as e:
        print(f"Error with model {model_id}: {e}")
        print("Please make sure you have enabled access to this model in the Amazon Bedrock console.")


Testing anthropic.claude-3-haiku-20240307-v1:0...
Response:
Here are the top 3 trends in Industrial Manufacturing in 3 bullet points:

• Automation and Robotics: The increasing adoption of advanced automation technologies, such as industrial robots, cobots (collaborative robots), and AI-powered systems, is transforming manufacturing processes, improving efficiency, productivity, and quality.

• Digitalization and Industry 4.0: The integration of digital technologies, including the Internet of Things (IoT), big data analytics, cloud computing, and cyber-physical systems, is enabling the creation of smart factories and the optimization of manufacturing operations.

• Sustainability and Circular Economy: Manufacturers are focusing on sustainable practices, such as reducing energy consumption, minimizing waste, and implementing circular economy principles, to address environmental concerns and meet evolving customer and regulatory demands.

Execution time: 1.61 seconds

Testing anthropic.

As the tests above show, different models return different answers to the same question. Each model has its own "personality", and larger models generally reason better but are slower and more expensive to run. Different model families also structure their requests and responses differently, which makes it difficult to switch between models easily. In the cell above, we handled this manually for Amazon Nova and Anthropic Claude, but that approach does not scale.

Follow along with the notebook to see how we can address this to make model swapping a breeze!
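As a preview, here is a minimal sketch of why the Converse API helps: the request and response have the same shape for every model family, so one pair of helpers can replace the per-model branches. The helper names `build_converse_request` and `extract_text` are illustrative and not part of the notebook, and `sample_response` is a hand-written dictionary shaped like a Converse response, not real model output.

```python
def build_converse_request(model_id, prompt, max_tokens=1024, temperature=0.5):
    """Build keyword arguments for the bedrock-runtime `converse` call.

    The same structure is used for every model family.
    """
    return {
        "modelId": model_id,
        "messages": [{"role": "user", "content": [{"text": prompt}]}],
        "inferenceConfig": {"maxTokens": max_tokens, "temperature": temperature},
    }


def extract_text(converse_response):
    """Concatenate all text blocks in a Converse response message."""
    blocks = converse_response["output"]["message"]["content"]
    return "".join(block["text"] for block in blocks if "text" in block)


# Hand-written sample shaped like a Converse response (not real model output)
sample_response = {
    "output": {
        "message": {
            "role": "assistant",
            "content": [{"text": "Hello from any model."}],
        }
    },
    "stopReason": "end_turn",
}

request = build_converse_request("anthropic.claude-3-haiku-20240307-v1:0", "Hi")
print(extract_text(sample_response))
```

With a real client, the request would be sent as `bedrock_client.converse(**request)` and the result passed to `extract_text`, regardless of which model ID is used.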

## Database Exploration

Let's explore the simulated MES database to understand its structure. This is important for understanding what data we have access to through our tools.

The MES database includes tables for:

- Products and Bill of Materials
- Inventory and Suppliers
- Work Centers and Machines
- Employees and Shifts
- Work Orders and Production Schedules
- Quality Control and Defects
- Material Consumption
- Downtime Events
- OEE (Overall Equipment Effectiveness) Metrics



In [6]:
# First, let's create a helper function to execute SQL queries against our SQLite database
import sqlite3
import pandas as pd

def query_sqlite(query, db_path="mes.db"):
    """
    Executes a SQL query against a SQLite database and returns the results as a pandas DataFrame
    
    Parameters
    ----------
    query : str
        An SQL query to execute
    db_path : str
        Path to the SQLite database file
        
    Returns
    ----------
    pandas.DataFrame
        The results of the SQL query
    """
    try:
        conn = sqlite3.connect(db_path)
        df = pd.read_sql_query(query, conn)
        conn.close()
        return df
    except Exception as e:
        return f"Error executing query: {str(e)}"
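As a variation on the helper above, the following sketch binds values with `?` placeholders instead of formatting them into the SQL string, closes the connection even when the query fails, and lets errors propagate as `sqlite3.Error` rather than returning a string. The name `query_rows` and the tiny demo table are hypothetical. To stay dependency-free, it returns a list of dictionaries instead of a DataFrame.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def query_rows(query, params=(), db_path="mes.db"):
    """Run a query with bound parameters and return rows as a list of dicts."""
    # closing() guarantees conn.close() runs even if the query raises
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row  # rows become name-addressable
        cursor = conn.execute(query, params)
        return [dict(row) for row in cursor.fetchall()]


# Build a throwaway database for the demo (not the real MES schema)
demo_path = os.path.join(tempfile.mkdtemp(), "demo.db")
with closing(sqlite3.connect(demo_path)) as conn:
    conn.execute("CREATE TABLE Machines (MachineID INTEGER, Status TEXT)")
    conn.executemany(
        "INSERT INTO Machines VALUES (?, ?)", [(1, "running"), (2, "idle")]
    )
    conn.commit()

rows = query_rows(
    "SELECT MachineID FROM Machines WHERE Status = ?", ("idle",), demo_path
)
print(rows)
```

Raising on failure means a caller can no longer mistake an error message for a result set.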

Now, let's check if the database exists and what tables it contains:

In [7]:
# Check if the database file exists
import os
if not os.path.exists('mes.db'):
    print("MES database not found. We need to create it first.")
    print("Running the synthetic data generator...")
    
    # You can import and run the generator script 
    # from MES-synthetic-data.sqlite-synthetic-mes-data import main
    # main()
    
    # For this notebook, we'll run a simple command to create the database
    # if it doesn't exist
    !python MES-synthetic-data/sqlite-synthetic-mes-data.py

# Show tables in the database
print("Tables in the database:")
tables = query_sqlite("SELECT name FROM sqlite_master WHERE type='table';")
print(tables)

Tables in the database:
                   name
0              Products
1             Suppliers
2           WorkCenters
3                Shifts
4             Inventory
5              Machines
6             Employees
7       BillOfMaterials
8            WorkOrders
9            OEEMetrics
10            Downtimes
11       QualityControl
12  MaterialConsumption
13              Defects


Let's examine the structure of each table to understand what data we have:

In [8]:
# Function to explore table structure and sample data
def explore_table(table_name, limit=5):
    """
    Explores a table's structure and sample data
    
    Parameters
    ----------
    table_name : str
        The name of the table to explore
    limit : int
        Number of sample rows to display
        
    Returns
    ----------
    None
        Prints table info to the console
    """
    # Get table schema
    schema = query_sqlite(f"PRAGMA table_info({table_name});")
    
    # Get sample data
    sample_data = query_sqlite(f"SELECT * FROM {table_name} LIMIT {limit};")
    
    print(f"\n--- {table_name} Table ---")
    print("\nSchema:")
    print(schema[['name', 'type']])
    
    print("\nSample Data:")
    print(sample_data)
    
    # Get row count
    count = query_sqlite(f"SELECT COUNT(*) as count FROM {table_name};")
    print(f"\nTotal Rows: {count['count'].values[0]}")
    
    print("-" * 50)

In [9]:
# Explore each table in the database
for table_name in tables['name']:
    explore_table(table_name)


--- Products Table ---

Schema:
                  name     type
0            ProductID  INTEGER
1                 Name  VARCHAR
2          Description  VARCHAR
3             Category  VARCHAR
4                 Cost    FLOAT
5  StandardProcessTime    FLOAT
6             IsActive  BOOLEAN

Sample Data:
   ProductID        Name                                        Description  \
0          1  eBike T101  eBike T101: High-performance electric bicycle ...   
1          2  eBike T200  eBike T200: Premium mountain e-bike with rugge...   
2          3  eBike C150  eBike C150: Comfortable city commuter e-bike w...   
3          4  eBike M300  eBike M300: Mid-drive electric mountain bike w...   
4          5       Forks  Forks: Durable suspension forks for smooth rid...   

         Category    Cost  StandardProcessTime  IsActive  
0  Electric Bikes  380.97                 2.38         1  
1  Electric Bikes  741.34                 2.09         1  
2  Electric Bikes  887.42                 2.8

## Understanding the Bedrock Converse API with Tools

The Bedrock Converse API allows models to interact with external tools. This is particularly useful for helping models access databases or other systems when answering questions.

Here's how the tool use process works:

1. A user asks a question
2. We pass the question to the model through the Converse API
3. The model decides if it needs to use a tool to answer
4. If a tool is needed, the model requests to call the tool with specific parameters
5. We execute the tool and return the results to the model
6. The model uses the tool results to generate a final answer

This process can involve multiple tool calls within a single conversation turn.
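The steps above can be sketched as a loop. This is a minimal illustration under stated assumptions, not the notebook's implementation: `run_tool_loop` and `StubClient` are hypothetical names, and the stub replays canned responses so the sketch runs without AWS credentials. When one assistant turn contains several `toolUse` blocks, the sketch answers all of them in a single follow-up user message, which is the shape the Converse API is understood to expect.

```python
def run_tool_loop(client, model_id, messages, tool_config, handlers, max_turns=5):
    """Call `converse` until the model stops requesting tools."""
    for _ in range(max_turns):
        response = client.converse(
            modelId=model_id, messages=messages, toolConfig=tool_config
        )
        assistant_message = response["output"]["message"]
        messages.append(assistant_message)
        if response["stopReason"] != "tool_use":
            return response

        # Collect one toolResult per toolUse block of this assistant turn
        results = []
        for block in assistant_message["content"]:
            if "toolUse" not in block:
                continue
            tool_use = block["toolUse"]
            handler = handlers.get(tool_use["name"])
            if handler is None:
                result = {
                    "toolUseId": tool_use["toolUseId"],
                    "content": [{"text": f"Unknown tool: {tool_use['name']}"}],
                    "status": "error",
                }
            else:
                result = {
                    "toolUseId": tool_use["toolUseId"],
                    "content": [{"json": handler(tool_use["input"])}],
                }
            results.append({"toolResult": result})
        messages.append({"role": "user", "content": results})
    raise RuntimeError("Model kept requesting tools after max_turns")


class StubClient:
    """Stand-in for the bedrock-runtime client; replays canned responses."""

    def __init__(self, responses):
        self._responses = list(responses)

    def converse(self, **kwargs):
        return self._responses.pop(0)


canned = [
    {"stopReason": "tool_use", "output": {"message": {"role": "assistant", "content": [
        {"toolUse": {"toolUseId": "t1", "name": "get_schema", "input": {}}},
        {"toolUse": {"toolUseId": "t2", "name": "execute_sql",
                     "input": {"sql_query": "SELECT 1"}}},
    ]}}},
    {"stopReason": "end_turn", "output": {"message": {"role": "assistant", "content": [
        {"text": "Done."}]}}},
]
handlers = {
    "get_schema": lambda tool_input: {"tables": ["Machines"]},
    "execute_sql": lambda tool_input: {"row_count": 0, "rows": []},
}
messages = [{"role": "user", "content": [{"text": "List the machines."}]}]
final = run_tool_loop(StubClient(canned), "stub-model", messages, {"tools": []}, handlers)
print(final["output"]["message"]["content"][0]["text"])
```

The `max_turns` guard is a design choice for the sketch: it keeps a misbehaving model from looping on tool calls indefinitely.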

Now, let's define our tool configurations for interacting with the MES database.

In [10]:
# Define tool configurations for interacting with the MES database
def get_tool_config():
    """
    Get the tool configuration for the Bedrock Converse API
    
    Returns
    -------
    dict
        Tool configuration for the Converse API
    """
    return {
        "tools": [
            {
                "toolSpec": {
                    "name": "get_schema",
                    "description": "ALWAYS use this tool FIRST to get the schema of the MES database before attempting any SQL queries. This provides details about all tables, columns, relationships, and sample data.",
                    "inputSchema": {
                        "json": {
                            "type": "object",
                            "properties": {}
                        }
                    }
                }
            },
            {
                "toolSpec": {
                    "name": "execute_sql",
                    "description": "Execute SQL queries against the MES database ONLY after you have retrieved and examined the schema. Write efficient SQL that joins relevant tables and focuses on answering the user's specific question.",
                    "inputSchema": {
                        "json": {
                            "type": "object",
                            "properties": {
                                "sql_query": {
                                    "type": "string",
                                    "description": "The SQL query to execute against the MES database. Write clean, efficient SQL that joins necessary tables to answer the user's question in one query when possible."
                                }
                            },
                            "required": [
                                "sql_query"
                            ]
                        }
                    }
                }
            }
        ]
    }
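Because the model generates the tool input, it can be worth checking a request against the `required` list in the tool's `inputSchema` before executing it. The following is a small sketch of that idea. The function name `missing_required_inputs` is hypothetical, and `sample_config` is a trimmed copy of the configuration above so that the block runs on its own.

```python
def missing_required_inputs(tool_config, tool_name, tool_input):
    """Return the required input keys that a tool request left out."""
    for tool in tool_config["tools"]:
        spec = tool["toolSpec"]
        if spec["name"] == tool_name:
            required = spec["inputSchema"]["json"].get("required", [])
            return [key for key in required if key not in tool_input]
    raise KeyError(f"Unknown tool: {tool_name}")


# Trimmed copy of the tool configuration (descriptions omitted)
sample_config = {
    "tools": [
        {"toolSpec": {
            "name": "get_schema",
            "inputSchema": {"json": {"type": "object", "properties": {}}},
        }},
        {"toolSpec": {
            "name": "execute_sql",
            "inputSchema": {"json": {
                "type": "object",
                "properties": {"sql_query": {"type": "string"}},
                "required": ["sql_query"],
            }},
        }},
    ]
}

print(missing_required_inputs(sample_config, "execute_sql", {}))
print(missing_required_inputs(sample_config, "get_schema", {}))
```

A non-empty result could be returned to the model as an error `toolResult` so that it can retry with complete input.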

Now let's create a class that implements the database operations behind these tools:

In [11]:
import time
from datetime import datetime

# DatabaseQueryTool class to handle SQL queries and schema retrieval
class DatabaseQueryTool:
    """A tool for executing SQL queries against the MES database"""
    
    def __init__(self, db_path="mes.db"):
        """Initialize with the database path"""
        self.db_path = db_path
        self._schema_cache = None
        self._schema_cache_time = None
        self._cache_expiry = 60 * 5  # Cache expires after 5 minutes
    
    def execute_query(self, sql_query):
        """Execute a SQL query and return the results"""
        print(f"Executing SQL query: {sql_query}")
        start_time = time.time()
        
        try:
            # Connect to the database
            conn = sqlite3.connect(self.db_path)
            
            # Execute the query
            df = pd.read_sql_query(sql_query, conn)
            conn.close()
            
            # Process datetime columns for better display
            for col in df.columns:
                if df[col].dtype == 'object':
                    # Try to convert string columns that might be dates
                    try:
                        if df[col].str.contains('-').any() and df[col].str.contains(':').any():
                            df[col] = pd.to_datetime(df[col])
                            # Format datetime for display
                            df[col] = df[col].dt.strftime('%Y-%m-%d %H:%M')
                    except Exception:
                        # Leave the column unchanged if it cannot be parsed as datetimes
                        pass
            
            # Round float columns to 2 decimal places for display
            for col in df.select_dtypes(include=['float']).columns:
                df[col] = df[col].round(2)
            
            # Convert to JSON-serializable format
            result = {
                "success": True,
                "rows": df.to_dict(orient="records"),
                "column_names": df.columns.tolist(),
                "row_count": len(df),
                "execution_time_ms": round((time.time() - start_time) * 1000, 2)
            }
            
            print(f"Query executed successfully: {len(df)} rows returned in {result['execution_time_ms']}ms")
            return result
            
        except Exception as e:
            error_msg = str(e)
            print(f"Error executing SQL query: {error_msg}")
            
            # Provide more helpful error messages for common issues
            if "no such table" in error_msg.lower():
                table_name = error_msg.split("no such table:", 1)[1].strip() if "no such table:" in error_msg else "unknown"
                error_msg = f"Table '{table_name}' doesn't exist. Please check the schema and table names."
            elif "no such column" in error_msg.lower():
                col_name = error_msg.split("no such column:", 1)[1].strip() if "no such column:" in error_msg else "unknown"
                error_msg = f"Column '{col_name}' doesn't exist. Please check the schema and column names."
            elif "syntax error" in error_msg.lower():
                error_msg = f"SQL syntax error: {error_msg}. Please check your query syntax."
            
            return {
                "success": False,
                "error": error_msg,
                "execution_time_ms": round((time.time() - start_time) * 1000, 2)
            }
    
    def get_schema(self):
        """Get the database schema with caching for performance"""
        current_time = time.time()
        
        # Return cached schema if available and fresh
        if (self._schema_cache is not None and 
            self._schema_cache_time is not None and 
            current_time - self._schema_cache_time < self._cache_expiry):
            print("Returning cached schema")
            return self._schema_cache
        
        print("Retrieving fresh database schema")
        start_time = time.time()
        
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # Get all tables
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = cursor.fetchall()
            
            schema = {}
            for table in tables:
                table_name = table[0]
                
                # Get column information
                cursor.execute(f"PRAGMA table_info({table_name});")
                columns = cursor.fetchall()
                
                # Format column information
                column_info = []
                for col in columns:
                    column_info.append({
                        "name": col[1],
                        "type": col[2],
                        "notnull": bool(col[3]),
                        "pk": bool(col[5])
                    })
                
                # Get foreign key relationships
                cursor.execute(f"PRAGMA foreign_key_list({table_name});")
                foreign_keys = cursor.fetchall()
                
                fk_info = []
                for fk in foreign_keys:
                    fk_info.append({
                        "id": fk[0],
                        "seq": fk[1],
                        "table": fk[2],
                        "from": fk[3],
                        "to": fk[4]
                    })
                
                # Get table row count
                cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
                row_count = cursor.fetchone()[0]
                
                # Get sample data (limited to 3 rows for performance)
                cursor.execute(f"SELECT * FROM {table_name} LIMIT 3;")
                sample_data = cursor.fetchall()
                
                # Get column names for the sample data
                column_names = [col[1] for col in columns]
                
                # Format sample data as records
                sample_data_records = []
                for row in sample_data:
                    record = {}
                    for i, value in enumerate(row):
                        record[column_names[i]] = value
                    sample_data_records.append(record)
                
                # Add table information to schema
                schema[table_name] = {
                    "columns": column_info,
                    "foreign_keys": fk_info,
                    "row_count": row_count,
                    "sample_data": sample_data_records
                }
            
            # Add schema metadata
            schema["__metadata__"] = {
                "database_name": self.db_path.split("/")[-1],
                "total_tables": len(tables),
                "generated_at": datetime.now().isoformat(),
                "schema_version": "1.1"
            }
            
            conn.close()
            
            # Update cache
            self._schema_cache = schema
            self._schema_cache_time = current_time
            
            print(f"Schema retrieved in {round((time.time() - start_time) * 1000, 2)}ms")
            return schema
            
        except Exception as e:
            print(f"Error retrieving schema: {e}")
            return {
                "error": f"Failed to retrieve schema: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }
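The class above executes whatever SQL the model writes, using a normal read-write connection. One possible hardening step, shown here as a sketch and not as part of the notebook, is to open the database read-only through SQLite's URI filename syntax (`mode=ro`). The helper name `connect_read_only` and the demo table are hypothetical.

```python
import os
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path


def connect_read_only(db_path):
    """Open a SQLite database in read-only mode using a URI filename."""
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    return sqlite3.connect(uri, uri=True)


# Build a throwaway database for the demo (not the real MES schema)
demo_path = os.path.join(tempfile.mkdtemp(), "demo.db")
with closing(sqlite3.connect(demo_path)) as conn:
    conn.execute("CREATE TABLE Products (ProductID INTEGER, Name TEXT)")
    conn.execute("INSERT INTO Products VALUES (1, 'eBike T101')")
    conn.commit()

with closing(connect_read_only(demo_path)) as ro_conn:
    # Reads work as usual
    count = ro_conn.execute("SELECT COUNT(*) FROM Products").fetchone()[0]
    # Writes are rejected by SQLite itself
    try:
        ro_conn.execute("DELETE FROM Products")
        write_blocked = False
    except sqlite3.OperationalError:
        write_blocked = True

print(count, write_blocked)
```

With this approach, a generated `DELETE` or `DROP` statement fails inside SQLite instead of relying on the prompt to keep the model's queries read-only.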

Now let's create a function to handle tool requests from the model:

In [12]:
from datetime import datetime

# Initialize the database tool
db_tool = DatabaseQueryTool()

def handle_tool_request(tool, model_id, conversation, query_timeout=60):
    """
    Handle tool requests from the model
    
    Parameters
    ----------
    tool : dict
        The tool request from the model
    model_id : str
        The model ID to use for the conversation
    conversation : list
        The conversation history
    query_timeout : int
        Intended timeout for SQL queries in seconds (not currently enforced)
        
    Returns
    -------
    tuple
        (response, conversation, tool_response)
    """
    tool_use = tool["toolUse"]
    tool_use_id = tool_use["toolUseId"]
    tool_name = tool_use["name"]
    
    print(f"Tool request received: {tool_name}, ID: {tool_use_id}")
    
    # Store tool responses for later display in the assistant message
    tool_response = {
        "type": tool_name,
        "data": None
    }
    
    # Execute the appropriate tool
    if tool_name == "execute_sql":
        sql_query = tool_use["input"]["sql_query"]
        
        # Save SQL for display
        tool_response["sql_query"] = sql_query
        
        # Execute the SQL query
        start_time = time.time()
        result = db_tool.execute_query(sql_query)
        elapsed_time = time.time() - start_time
        
        if result["success"]:
            tool_response["success"] = True
            tool_response["execution_time"] = elapsed_time
            tool_response["row_count"] = result["row_count"]
            
            # Convert to dataframe for display
            if result["row_count"] > 0:
                df = pd.DataFrame(result["rows"])
                tool_response["dataframe"] = df
                
                # Display the query results
                print(f"\nQuery results ({result['row_count']} rows):")
                print(df.head().to_string())
                if result["row_count"] > 5:
                    print(f"...and {result['row_count'] - 5} more rows")
            
            # Prepare the tool result response
            tool_result = {
                "toolUseId": tool_use_id,
                "content": [{"json": result}]
            }
        else:
            tool_response["success"] = False
            tool_response["error"] = result["error"]
            tool_response["execution_time"] = elapsed_time
            
            # Prepare the error response
            tool_result = {
                "toolUseId": tool_use_id,
                "content": [{"text": f"Error executing SQL: {result['error']}"}],
                "status": "error"
            }
    
    elif tool_name == "get_schema":
        # Get the database schema
        schema = db_tool.get_schema()
        
        # Save schema info for display
        # Filter out metadata entry when counting columns
        total_tables = len([k for k in schema.keys() if k != "__metadata__"])
        total_columns = sum(len(table_info.get("columns", [])) 
                          for table_name, table_info in schema.items() 
                          if table_name != "__metadata__")
        
        tool_response["data"] = {
            "total_tables": total_tables,
            "total_columns": total_columns,
            "schema": schema
        }
        
        print(f"\nSchema retrieved: {total_tables} tables, {total_columns} columns")
        
        # Prepare the tool result response
        tool_result = {
            "toolUseId": tool_use_id,
            "content": [{"json": schema}]
        }
    
    else:
        # Unknown tool
        print(f"Unknown tool requested: {tool_name}")
        
        tool_response["success"] = False
        tool_response["error"] = f"Unknown tool: {tool_name}"
        
        tool_result = {
            "toolUseId": tool_use_id,
            "content": [{"text": f"Unknown tool: {tool_name}"}],
            "status": "error"
        }
    
    # Add the tool result to the conversation
    tool_result_message = {
        "role": "user",
        "content": [
            {
                "toolResult": tool_result
            }
        ]
    }
    conversation.append(tool_result_message)
    
    # Send the tool result to the model
    response = bedrock_client.converse(
        modelId=model_id,
        messages=conversation,
        toolConfig=get_tool_config(),
        inferenceConfig={
            "maxTokens": 4096,
            "temperature": 0.1
        }
    )
    
    return response, conversation, tool_response
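The handler above sends every returned row back to the model, which can consume many tokens for broad queries. As an optional sketch, a result could be capped before it is placed in the `toolResult`. The function name `cap_result_rows`, the `truncated` key, and the default of 50 rows are assumptions chosen for illustration.

```python
def cap_result_rows(result, max_rows=50):
    """Return a copy of an execute_query-style result with at most max_rows rows."""
    capped = dict(result)  # shallow copy so the caller's result is untouched
    rows = result.get("rows", [])
    capped["rows"] = rows[:max_rows]
    # Tell the model whether it is seeing only part of the data
    capped["truncated"] = len(rows) > max_rows
    return capped


# Hand-written sample shaped like the dictionary returned by execute_query
sample_result = {
    "success": True,
    "rows": [{"MachineID": i} for i in range(120)],
    "column_names": ["MachineID"],
    "row_count": 120,
}

capped = cap_result_rows(sample_result, max_rows=50)
print(len(capped["rows"]), capped["truncated"], capped["row_count"])
```

Keeping `row_count` at its original value lets the model report the true total while only a sample of rows is sent.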

## Using the Converse API to Answer Questions about the MES

Now let's combine everything to ask questions about our MES database:

In [13]:
def ask_mes_question(question, model_id="anthropic.claude-3-haiku-20240307-v1:0", temperature=0.1):
    """
    Ask a question about the MES using the Bedrock Converse API with tools
    
    Parameters
    ----------
    question : str
        The question to ask about the MES
    model_id : str
        The model ID to use for the conversation
    temperature : float
        The temperature to use for the model
        
    Returns
    -------
    dict
        A dictionary containing the question, response, and any tool outputs
    """
    print(f"Question: {question}")
    print("-" * 50)
    
    # Create system prompt
    system_prompt = """You are an expert manufacturing analyst for a Manufacturing Execution System (MES) for an e-bike manufacturing facility.

Your role is to help users extract insights by querying the MES database that tracks:
- Products (e-bikes, components, and parts)
- Work Orders (production jobs with schedules and status)
- Inventory (raw materials, components, and stock levels)
- Work Centers (manufacturing areas like Frame Fabrication, Wheel Production)
- Machines (equipment with efficiency metrics and maintenance records)
- Quality Control (inspection results, defects, and yield rates)
- Material Consumption (component usage tracking)
- Downtime Events (machine issues and reasons)
- OEE Metrics (Overall Equipment Effectiveness measurements)
- Employees (operators, technicians, and managers)

IMPORTANT GUIDELINES:
1. ALWAYS use the get_schema tool FIRST to understand the database structure.
2. Write efficient SQL queries - prefer JOINs to retrieve related data in a single query and ALWAYS make sure that the query is SQLite compatible
3. For questions about trends or patterns, include visualizable metrics.
4. For inventory questions, consider reorder levels and stock status.
5. For quality questions, look at defect types and rates.
6. For machine questions, consider OEE metrics and maintenance schedules.
7. For production questions, consider work order status and schedule adherence.

FORMAT YOUR RESPONSES:
1. First, briefly restate what you understood from the question
2. Present a concise summary of the key findings
3. Add relevant details or observations beneath your summary
4. If applicable, suggest follow-up questions the user might want to ask

Keep your explanations clear and relevant to manufacturing operations. Avoid excessive technical jargon when explaining results.
"""
    
    # Start timer
    start_time = time.time()
    
    # Create user message
    user_message = {
        "role": "user",
        "content": [{"text": question}]
    }
    
    # Initialize conversation
    conversation = [user_message]
    
    # First model call
    response = bedrock_client.converse(
        modelId=model_id,
        messages=conversation,
        system=[{"text": system_prompt}],
        toolConfig=get_tool_config(),
        inferenceConfig={
            "maxTokens": 4096,
            "temperature": temperature
        }
    )
    
    # Store tool responses
    tool_responses = []
    
    # Handle tool use requests as needed
    stop_reason = response["stopReason"]
    
    while stop_reason == "tool_use":
        # Get the tool request
        tool_requests = response["output"]["message"]["content"]
        
        # Add the assistant message to the conversation
        conversation.append(response["output"]["message"])
        
        # Process each tool request
        for tool_request in tool_requests:
            if "toolUse" in tool_request:
                # Handle the tool request
                response, conversation, tool_response = handle_tool_request(
                    tool_request, model_id, conversation
                )
                
                # Store the tool response
                tool_responses.append(tool_response)
                
                # Check if we need to process another tool request
                stop_reason = response["stopReason"]
    
    # Append the final assistant message to the conversation
    final_message = response["output"]["message"]
    conversation.append(final_message)
    
    # Concatenate the text blocks into the final response text
    final_text = ""
    for content_block in final_message["content"]:
        if "text" in content_block:
            final_text += content_block["text"]
    
    # Add elapsed time
    elapsed_time = round(time.time() - start_time, 2)
    
    print("-" * 50)
    print(f"Response (completed in {elapsed_time}s):")
    print(final_text)
    
    # Return everything for further analysis if needed
    return {
        "question": question,
        "response": final_text,
        "tool_responses": tool_responses,
        "conversation": conversation,
        "elapsed_time": elapsed_time
    }
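
The tool-use loop above relies on `handle_tool_request`, which is defined earlier in the notebook and is not reproduced here. As a rough illustration of the message shape involved, the sketch below builds a single user message that answers every `toolUse` block in an assistant turn, which is how the Converse API expects results to be returned. `collect_tool_results` and `run_tool` are hypothetical names introduced for this example, not part of the notebook or of boto3.

```python
def collect_tool_results(assistant_message, run_tool):
    """Build one user message answering every toolUse block in an assistant turn.

    run_tool is a hypothetical callable: run_tool(name, tool_input) -> dict.
    """
    results = []
    for block in assistant_message["content"]:
        if "toolUse" not in block:
            continue  # skip plain text blocks
        tool_use = block["toolUse"]
        try:
            payload = run_tool(tool_use["name"], tool_use["input"])
            status = "success"
        except Exception as exc:
            # Report the failure to the model instead of aborting the loop
            payload = {"error": str(exc)}
            status = "error"
        results.append({
            "toolResult": {
                "toolUseId": tool_use["toolUseId"],
                "content": [{"json": payload}],  # payload must be a JSON object
                "status": status,
            }
        })
    return {"role": "user", "content": results}
```

The returned message would be appended to `conversation` before the next `converse` call. Reporting failures with `status` set to `"error"` gives the model a chance to correct its SQL and try again.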

Now let's try asking some questions about our MES:

In [14]:
# Question about machines
result = ask_mes_question("What is the status of each machine in the facility?")

Question: What is the status of each machine in the facility?
--------------------------------------------------
Tool request received: get_schema, ID: tooluse_xg3Mtjc8Q52wuNUeaCbX3Q
Retrieving fresh database schema
Schema retrieved in 4.71ms

Schema retrieved: 14 tables, 127 columns
Tool request received: execute_sql, ID: tooluse_IrFmZj9fR0aoT2sfLt1JQQ
Executing SQL query: SELECT 
  m.MachineID,
  m.Name,
  m.Type,
  m.Status,
  m.NominalCapacity,
  m.CapacityUOM,
  m.LastMaintenanceDate,
  m.NextMaintenanceDate
FROM Machines m
Query executed successfully: 17 rows returned in 6.09ms

Query results (17 rows):
   MachineID            Name            Type   Status  NominalCapacity  CapacityUOM LastMaintenanceDate NextMaintenanceDate
0          1  Machine Fra-10   Frame Welding     idle             5.18  frames/hour    2025-03-18 10:38    2025-03-26 03:38
1          2  Machine Fra-11   Frame Welding  running             6.56  frames/hour    2025-03-30 10:38    2025-04-06 08:38
2          

Let's try a more complex question that requires joining multiple tables:

In [15]:
# Question about work orders and products
result = ask_mes_question("Show me all completed work orders for eBike products")

Question: Show me all completed work orders for eBike products
--------------------------------------------------
Tool request received: get_schema, ID: tooluse_UFgOiFtkQwGGdoJeE4gWyA
Returning cached schema

Schema retrieved: 14 tables, 127 columns
Tool request received: execute_sql, ID: tooluse_P1UhdL4uQtuh_EiuRfBdgg
Executing SQL query: SELECT 
  wo.OrderID,
  p.Name AS ProductName,
  wo.Quantity,
  wo.ActualProduction,
  wo.ActualStartTime,
  wo.ActualEndTime
FROM WorkOrders wo
JOIN Products p ON wo.ProductID = p.ProductID
WHERE wo.Status = 'completed' AND p.Category = 'Electric Bikes'
Query executed successfully: 1146 rows returned in 18.68ms

Query results (1146 rows):
   OrderID ProductName  Quantity  ActualProduction   ActualStartTime     ActualEndTime
0        1  eBike C150        45                45  2025-04-03 10:45  2025-04-03 12:10
1        2  eBike C150        45                44  2025-04-03 13:39  2025-04-03 14:32
2        3  eBike C150        45                44  202

Let's try a question that requires time-based analysis:

In [16]:
# Question with time component
result = ask_mes_question("What work orders were completed in the past month?")

Question: What work orders were completed in the past month?
--------------------------------------------------


ThrottlingException: An error occurred (ThrottlingException) when calling the Converse operation (reached max retries: 4): Too many tokens, please wait before trying again.
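
The run above ended with a `ThrottlingException` after the SDK exhausted its own retries. One possible mitigation is to wrap the call in an additional retry loop with exponential backoff and jitter. The sketch below is a minimal version of that idea. `call_with_backoff` and `is_throttling_error` are hypothetical helpers, not part of the notebook, and the error check assumes the exception carries a botocore `ClientError`-style `response` dictionary.

```python
import random
import time


def is_throttling_error(exc):
    """Return True if exc looks like a botocore ClientError for throttling."""
    response = getattr(exc, "response", None) or {}
    return response.get("Error", {}).get("Code") == "ThrottlingException"


def call_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                      sleep=time.sleep):
    """Call fn(), retrying throttling errors with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Re-raise anything that is not throttling, or the final failure
            if not is_throttling_error(exc) or attempt == max_attempts:
                raise
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, delay))  # full jitter
```

In the notebook, this could be used as `call_with_backoff(lambda: ask_mes_question("What work orders were completed in the past month?"))`. Note that every retry restarts the whole question, so waiting between cells or requesting a quota increase may be more appropriate for sustained throttling.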

## Comparing Different Models

Let's compare how different models handle the same question:

In [None]:
def compare_models(question, models=None):
    """
    Compare different models on the same question
    
    Parameters
    ----------
    question : str
        The question to ask
    models : list
        List of model IDs to compare
        
    Returns
    -------
    dict
        Dictionary of results by model
    """
    if models is None:
        models = [
            "anthropic.claude-3-haiku-20240307-v1:0",
            "anthropic.claude-3-sonnet-20240229-v1:0",
            "us.amazon.nova-lite-v1:0"
        ]
    
    results = {}
    
    for model_id in models:
        print(f"\n\n{'='*80}")
        print(f"Testing model: {model_id}")
        print(f"{'='*80}\n")
        
        try:
            result = ask_mes_question(question, model_id=model_id)
            results[model_id] = result
        except Exception as e:
            print(f"Error with model {model_id}: {e}")
            print("Make sure you have enabled access to this model in the Amazon Bedrock console.")
            results[model_id] = {"error": str(e)}
    
    return results

# Let's compare models on an inventory-related question
comparison = compare_models("Which inventory items are below their reorder level?")

Compared with how we first tested each model at the beginning of this notebook, the `Converse` API lets us swap between model families without modifying our code.
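
To make the comparison easier to read, the dictionary returned by `compare_models` can be condensed into one row per model. The sketch below is one way to do that. `summarize_comparison` is a hypothetical helper, and it assumes each result has the keys returned by `ask_mes_question` (`response`, `tool_responses`, `elapsed_time`) or an `error` key.

```python
def summarize_comparison(results):
    """Condense compare_models() output into one summary row per model."""
    rows = []
    for model_id, result in results.items():
        if "error" in result:
            rows.append({"model": model_id, "status": "error",
                         "elapsed_s": None, "sql_queries": 0, "answer_chars": 0})
            continue
        # Count how many SQL tool calls the model needed
        sql_count = sum(1 for tool_response in result["tool_responses"]
                        if tool_response.get("type") == "execute_sql")
        rows.append({"model": model_id, "status": "ok",
                     "elapsed_s": result["elapsed_time"],
                     "sql_queries": sql_count,
                     "answer_chars": len(result["response"])})
    return rows


def print_comparison(rows):
    """Print the summary rows as a simple aligned table."""
    for row in rows:
        elapsed = "-" if row["elapsed_s"] is None else f"{row['elapsed_s']:.2f}s"
        print(f"{row['model']:<45} {row['status']:<6} {elapsed:>8} "
              f"{row['sql_queries']:>3} queries {row['answer_chars']:>6} chars")
```

Calling `print_comparison(summarize_comparison(comparison))` would then show latency and query counts side by side. These numbers say nothing about answer quality, which still needs to be reviewed by reading the responses.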

## Building an End-to-End Workflow

Let's put everything together in a single function that simulates how the MES chatbot would work in a real-world application:

In [None]:
def mes_chatbot(question, model_id="anthropic.claude-3-haiku-20240307-v1:0", temperature=0.1):
    """
    Simulates the complete MES chatbot workflow
    
    Parameters
    ----------
    question : str
        The user's question about the MES
    model_id : str
        The model ID to use
    temperature : float
        The temperature parameter for the model
        
    Returns
    -------
    dict
        The complete result including all intermediary steps
    """
    print(f"🤖 MES Chatbot")
    print(f"==================")
    print(f"Model: {model_id}")
    print(f"Temperature: {temperature}")
    print(f"==================\n")
    
    print(f"User: {question}\n")
    
    # Call the question answering function (it measures its own elapsed time)
    result = ask_mes_question(question, model_id=model_id, temperature=temperature)
    
    # Extract tool outputs for display
    sql_queries = []
    for tool_response in result["tool_responses"]:
        if tool_response["type"] == "execute_sql" and tool_response.get("success", False):
            sql_queries.append({
                "query": tool_response["sql_query"],
                "rows": tool_response.get("row_count", 0),
                "execution_time": tool_response.get("execution_time", 0)
            })
    
    # Print summary
    print("\n==================")
    print(f"✓ Total execution time: {result['elapsed_time']:.2f}s")
    print(f"✓ SQL queries executed: {len(sql_queries)}")
    
    for i, query in enumerate(sql_queries):
        print(f"  - Query {i+1}: {query['rows']} rows in {query['execution_time']:.2f}s")
    
    print("==================\n")
    
    return result

# Try with a business question
mes_chatbot("What's our overall defect rate across all products?")

🤖 MES Chatbot
Model: anthropic.claude-3-haiku-20240307-v1:0
Temperature: 0.1

User: What's our overall defect rate across all products?

Question: What's our overall defect rate across all products?
--------------------------------------------------
Tool request received: get_schema, ID: tooluse_eHryNZd9T0q9oDbRcOuAIw
Returning cached schema

Schema retrieved: 14 tables, 127 columns
Tool request received: execute_sql, ID: tooluse_-ELQygt1Smiv5lV1TEplww
Executing SQL query: SELECT 
  ROUND(AVG(DefectRate), 4) AS overall_defect_rate
FROM QualityControl;
Query executed successfully: 1 rows returned in 9.95ms

Query results (1 rows):
   overall_defect_rate
0                 0.03
--------------------------------------------------
Response (completed in 4.78s):
The query calculates the overall defect rate by taking the average of the DefectRate column from the QualityControl table. This provides the overall defect rate across all products and inspections.

The key steps are:

1. Select the D

{'question': "What's our overall defect rate across all products?",
 'response': 'The query calculates the overall defect rate by taking the average of the DefectRate column from the QualityControl table. This provides the overall defect rate across all products and inspections.\n\nThe key steps are:\n\n1. Select the DefectRate column from the QualityControl table.\n2. Use the AVG() aggregate function to calculate the average of the DefectRate values.\n3. Round the result to 4 decimal places to get the overall defect rate.\n\nThis gives us the overall defect rate of 0.0300 or 3.00% across all products.',
 'tool_responses': [{'type': 'get_schema',
   'data': {'total_tables': 14,
    'total_columns': 127,
    'schema': {'Products': {'columns': [{'name': 'ProductID',
        'type': 'INTEGER',
        'notnull': True,
        'pk': True},
       {'name': 'Name', 'type': 'VARCHAR', 'notnull': True, 'pk': False},
       {'name': 'Description',
        'type': 'VARCHAR',
        'notnull': F

Let's try one more complex question to see how the system handles it:

In [None]:
# Test with a complex question that requires analysis
mes_chatbot("Which work center has the highest production rate and what products are predominantly made there?")

🤖 MES Chatbot
Model: anthropic.claude-3-haiku-20240307-v1:0
Temperature: 0.1

User: Which work center has the highest production rate and what products are predominantly made there?

Question: Which work center has the highest production rate and what products are predominantly made there?
--------------------------------------------------
Tool request received: get_schema, ID: tooluse_xIoJYNmcRgKqfkVXvxdpuw
Returning cached schema

Schema retrieved: 14 tables, 127 columns
Tool request received: execute_sql, ID: tooluse_Nd-lN2tsTfu4ak_j5tpQCA
Executing SQL query: SELECT 
  wc.Name AS WorkCenterName,
  wc.Capacity AS WorkCenterCapacity,
  wc.CapacityUOM,
  p.Name AS ProductName,
  p.StandardProcessTime
FROM WorkCenters wc
JOIN WorkOrders wo ON wc.WorkCenterID = wo.WorkCenterID
JOIN Products p ON wo.ProductID = p.ProductID
GROUP BY wc.WorkCenterID
ORDER BY wc.Capacity DESC
LIMIT 1;
Query executed successfully: 1 rows returned in 7.49ms

Query results (1 rows):
     WorkCenterName  WorkCe

{'question': 'Which work center has the highest production rate and what products are predominantly made there?',
 'response': 'The work center with the highest production rate is the "Wheel Production" work center, which has a capacity of 30 wheels per hour. The predominant product made at this work center is Wheels, which have a standard process time of 1.46 hours.',
 'tool_responses': [{'type': 'get_schema',
   'data': {'total_tables': 14,
    'total_columns': 127,
    'schema': {'Products': {'columns': [{'name': 'ProductID',
        'type': 'INTEGER',
        'notnull': True,
        'pk': True},
       {'name': 'Name', 'type': 'VARCHAR', 'notnull': True, 'pk': False},
       {'name': 'Description',
        'type': 'VARCHAR',
        'notnull': False,
        'pk': False},
       {'name': 'Category', 'type': 'VARCHAR', 'notnull': False, 'pk': False},
       {'name': 'Cost', 'type': 'FLOAT', 'notnull': True, 'pk': False},
       {'name': 'StandardProcessTime',
        'type': 'FLOAT
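
Note that the generated query above orders by `wc.Capacity`, the nominal capacity, rather than by measured output, so the answer is worth checking. One possible way to compute a production rate from actual work order data is sketched below against an in-memory SQLite database. The table and column names are taken from the queries shown in this notebook, but the rows are made up purely for illustration, and defining the rate as units per hour of completed work orders is an assumption.

```python
import sqlite3

# Toy data only: these rows are illustrative, not from the simulated MES
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE WorkCenters (WorkCenterID INTEGER PRIMARY KEY, Name TEXT);
CREATE TABLE Products (ProductID INTEGER PRIMARY KEY, Name TEXT);
CREATE TABLE WorkOrders (
    OrderID INTEGER PRIMARY KEY, ProductID INTEGER, WorkCenterID INTEGER,
    ActualProduction INTEGER, ActualStartTime TEXT, ActualEndTime TEXT,
    Status TEXT);
INSERT INTO WorkCenters VALUES (1, 'Center A'), (2, 'Center B');
INSERT INTO Products VALUES (1, 'Product X'), (2, 'Product Y');
INSERT INTO WorkOrders VALUES
    (1, 1, 1, 40, '2025-04-03 10:00', '2025-04-03 12:00', 'completed'),
    (2, 2, 1, 20, '2025-04-03 13:00', '2025-04-03 14:00', 'completed'),
    (3, 2, 2, 90, '2025-04-03 10:00', '2025-04-03 13:00', 'completed'),
    (4, 1, 2, 30, '2025-04-03 14:00', '2025-04-03 15:00', 'completed');
""")

# Units produced per hour of run time, per work center
rate_query = """
SELECT wc.WorkCenterID, wc.Name,
       ROUND(SUM(wo.ActualProduction) /
             SUM((julianday(wo.ActualEndTime)
                  - julianday(wo.ActualStartTime)) * 24.0), 2) AS UnitsPerHour
FROM WorkOrders wo
JOIN WorkCenters wc ON wc.WorkCenterID = wo.WorkCenterID
WHERE wo.Status = 'completed'
GROUP BY wc.WorkCenterID
ORDER BY UnitsPerHour DESC
"""
rates = conn.execute(rate_query).fetchall()
top_center_id = rates[0][0]

# Product mix at the work center with the highest measured rate
mix_query = """
SELECT p.Name, SUM(wo.ActualProduction) AS Units
FROM WorkOrders wo
JOIN Products p ON p.ProductID = wo.ProductID
WHERE wo.Status = 'completed' AND wo.WorkCenterID = ?
GROUP BY p.ProductID
ORDER BY Units DESC
"""
product_mix = conn.execute(mix_query, (top_center_id,)).fetchall()

print(rates)
print(product_mix)
```

Splitting the question into two queries keeps each one simple to verify. A follow-up prompt such as "rank work centers by actual units produced per hour" may steer the model toward a similar query.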

## Conclusion

In this notebook, we've built a complete system for querying a Manufacturing Execution System (MES) using natural language. This approach leverages Amazon Bedrock's Converse API with tool-calling capabilities to:

1. Understand the user's question
2. Retrieve the database schema to understand available data
3. Generate appropriate SQL queries
4. Execute those queries against the database
5. Interpret the results and provide a helpful, natural language response

By using this approach:

- Users don't need to know SQL or understand the database schema
- The system can adapt to different questions without hardcoded queries
- Responses are contextual and focused on the user's actual question
- The approach can be extended to other industrial data stores

You can apply these techniques to build similar interfaces for other industrial data systems, combining them into a comprehensive solution that gives users natural language access to operational data across your organization.

### Next Steps

- Try modifying the system to connect to other data sources
- Experiment with different models and parameters
- Add visualization capabilities for query results
- Implement conversation history to allow for follow-up questions
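
As a starting point for the conversation history item, the sketch below keeps a bounded list of past question and answer pairs and replays them as alternating user and assistant messages in the Converse message format. `ChatSession` is a hypothetical class, not part of the notebook. It stores only the final text of each turn, not the intermediate tool calls, which keeps the history short at the cost of the model not seeing earlier query results.

```python
from collections import deque


class ChatSession:
    """Keep the last few question/answer pairs for follow-up questions."""

    def __init__(self, max_turns=5):
        # deque discards the oldest turn automatically once max_turns is reached
        self.turns = deque(maxlen=max_turns)

    def messages_for(self, question):
        """Return prior turns plus the new question as Converse-style messages."""
        messages = []
        for past_question, past_answer in self.turns:
            messages.append({"role": "user", "content": [{"text": past_question}]})
            messages.append({"role": "assistant", "content": [{"text": past_answer}]})
        messages.append({"role": "user", "content": [{"text": question}]})
        return messages

    def record(self, question, answer):
        """Store a completed turn once the model has produced its final text."""
        self.turns.append((question, answer))
```

To use this, `ask_mes_question` would need to accept the list from `messages_for` as its initial `conversation` instead of building it from a single user message, and the caller would pass the returned `response` text to `record` afterwards. That change is an assumption about how the function could be extended.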