## Imports

Import the libraries needed to run the endpoint tests.

In [1]:
import json
from typing import Any, Dict, List, Optional

import requests

## Definition of Constants

In [2]:
# Relative path to the JSON file with the endpoint test cases
TEST_CASES_PATH = "../../assets/endpoint_test_cases/tests.json"

# Root address of the server; it ends with a slash so endpoint names can be appended directly
BASE_URL = "http://localhost:8000/"
PROCESS_URL = BASE_URL + "extract_incident"
HEALTH_URL = BASE_URL + "health"

# Default HTTP headers declaring a JSON request body
HEADERS = {"Content-Type": "application/json"}

## Reading the Mock Data

Read the JSON file containing the mock data used for testing.

In [3]:
# Load the fixture file and keep the whole parsed document available as `data`
with open(TEST_CASES_PATH, mode="r", encoding="utf-8") as file:
    data = json.loads(file.read())

# The individual test cases are stored under the top-level "test_cases" key
test_cases_data = data["test_cases"]

## Requesting the Agentic Workflow Endpoint(s)

In [4]:

def process_requests(
    headers: Dict[str, str] = HEADERS,
    url: str = PROCESS_URL,
    health_url: str = HEALTH_URL,
    test_cases: Optional[List[Dict[str, Any]]] = None,
    timeout: float = 120.0,
    health_timeout: float = 10.0,
) -> None:
    """Query the health endpoint, then POST every test case and print the results.

    For each test case the function sends ``{"base_text": case["input"]}`` as a
    JSON body to ``url`` and prints the input text, the decoded reply (or
    ``None`` when the request failed) and the expected reply built from
    ``case["output"]``. Nothing is compared automatically; the printed output
    is meant to be inspected by the reader.

    Args:
        headers: HTTP headers sent with every POST request.
        url: Endpoint that receives the test-case payloads.
        health_url: Endpoint queried once before the test cases are sent.
        test_cases: Test cases to send; each must provide the keys ``"input"``
            and ``"output"``. When ``None``, the notebook-level
            ``test_cases_data`` list is used.
        timeout: Seconds to wait for each POST request before giving up.
        health_timeout: Seconds to wait for the health-check request.

    Raises:
        requests.exceptions.RequestException: If the health-check request
            itself fails (for example connection refused or timeout). Failures
            of the individual POST requests are caught and reported instead.
        KeyError: If a test case lacks the ``"input"`` or ``"output"`` key.
    """
    # Begins by testing the health of the remote server; a timeout keeps the
    # notebook from hanging forever when the server is unreachable.
    health_reply = requests.get(health_url, timeout=health_timeout).json()
    print("Remote server health check:")
    print(health_reply)
    print("-" * 80)
    print("Starting the requests for the agentic workflow...")
    print("-" * 80)

    if test_cases is None:
        # Looked up at call time so the notebook-level list is used by default
        test_cases = test_cases_data

    for case in test_cases:
        input_text = case['input']
        payload = {"base_text": input_text}

        try:
            response = requests.post(
                url, json=payload, headers=headers, timeout=timeout
            )
            response.raise_for_status()  # Raise an exception for HTTP errors
            result = response.json()
        except requests.exceptions.RequestException as e:
            # Report the failure and continue with the remaining test cases
            print(f"Request failed for input: {input_text}")
            print(f"Error: {e}")
            result = None

        print("Input text:", input_text)
        print("Returned answer:", result)
        print("Expected output:", {'status': 'success', 'message': case['output']})
        print("-" * 80)


# Run the health check and all test cases using the default endpoints and headers
process_requests()

Remove server health check:
{'status': 'healthy', 'workflow_initialized': True}
--------------------------------------------------------------------------------
Starting the requests for the agentic workflow...
--------------------------------------------------------------------------------
Input text: Na última segunda-feira (10/03) às 09:30, na filial do Rio de Janeiro, ocorreu um vazamento de dados sensíveis que comprometeu informações de clientes por aproximadamente 5 horas.
Returned answer: {'status': 'success', 'message': {'data_ocorrencia': '2025-03-10 09:30', 'local': 'Rio de Janeiro', 'tipo_incidente': 'Vazamento de dados sensíveis', 'impacto': 'Informações de clientes comprometidas por aproximadamente 5 horas'}}
Expected output: {'status': 'success', 'message': {'data_ocorrencia': '2025-03-10 09:30', 'local': 'Rio de Janeiro', 'tipo_incidente': 'Vazamento de dados', 'impacto': 'Informações de clientes comprometidas por 5 horas'}}
----------------------------------------------