diff --git a/examples/app-voice-agent/mcp_voice_client.py b/examples/app-voice-agent/mcp_voice_client.py index 60bf1be..3b58028 100644 --- a/examples/app-voice-agent/mcp_voice_client.py +++ b/examples/app-voice-agent/mcp_voice_client.py @@ -1158,7 +1158,7 @@ async def main(debug=False, language='en', voice_id=None, mcp_server_url=DEFAULT # Create stream manager stream_manager = BedrockStreamManager( model_id='amazon.nova-sonic-v1:0', - region='us-east-1', + region='eu-north-1', language=language, voice_id=voice_id, mcp_server_url=mcp_server_url, diff --git a/tests/mcp_bench/README.md b/tests/mcp_bench/README.md new file mode 100644 index 0000000..4f9f0de --- /dev/null +++ b/tests/mcp_bench/README.md @@ -0,0 +1,188 @@ +# MCP Performance Benchmark Tool + +A performance testing tool for MCP (Model Context Protocol) servers. + +## Quick Start + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 2. Start Your MCP Server + +Ensure your MCP server is running. For example: +```bash +# Your server should be running at http://localhost:8001/mcp/ +``` + +### 3. Run Performance Test + +```bash +python run_perf_test.py configs/perf_test.json +``` + +## Configuration + +### Basic Configuration + +Create a JSON configuration file with server details and test streams: + +```json +{ + "server": { + "host": "localhost", + "port": 8001 + }, + "streams": [ + { + "stream_id": "stream_01", + "test_config": "configs/real_tools_test.json", + "duration": 30, + "loop": true + } + ] +} +``` + +### Test Cases Configuration + +Define which tools/methods to test in a separate JSON file: + +```json +{ + "test_cases": { + "tool_name": [ + { + "name": "test_name", + "parameters": { + "param1": "value1" + } + } + ] + } +} +``` + +## Available Test Configurations + +- `configs/perf_test.json` - 3 concurrent streams, 30 seconds each +- `configs/minimal_test.json` - Single stream, 5 seconds (quick test) +- `configs/load_test.json` - Heavy load test with multiple streams +- `configs/real_tools_test.json` - Tests with actual MCP tools + +## Example Commands + +### Quick Test (5 seconds) +```bash +python run_perf_test.py configs/minimal_test.json +``` + +### Standard Performance Test (30 seconds) +```bash +python run_perf_test.py configs/perf_test.json +``` + +### Verbose Output (see request/response details) +```bash +python run_perf_test.py configs/minimal_test.json --verbose +``` + +### Custom Server +```bash +# Edit config to point to your server: +# "host": "your-server.com", "port": 8080 +python run_perf_test.py your_config.json +``` + +## Output + +The tool provides: +- Real-time progress during testing +- Per-stream metrics (requests, success rate, response time) +- Overall performance summary +- Throughput in requests per second +- **Detailed JSON reports** saved to `var/mcp-bench/reports/` + +### Verbose Mode +With `--verbose` flag, you'll see: +- Each request's method and parameters +- Response content previews +- Success/failure status with timing +- Detailed error messages + +### Sample Output +``` +============================================================ +RESULTS +============================================================ + +Stream stream_01: + Requests: 1719 + Success Rate: 100.0% + Avg Response: 6.33ms + Throughput: 57.27 req/s + +OVERALL: + Total Requests: 5155 + Successful: 5155 + Failed: 0 + Success Rate: 100.0% +============================================================ + +📊 Detailed report saved to: var/mcp-bench/reports/perf_report_20250924_174226.json +``` + +## 
Architecture + +- `run_perf_test.py` - Main test runner +- `mcp_streamable_client.py` - MCP client implementation +- `configs/` - Test configuration files + +## Creating Custom Tests + +1. Create a test cases file with your tools: +```json +{ + "test_cases": { + "your_tool": [ + { + "name": "your_test", + "parameters": {} + } + ] + } +} +``` + +2. Create a main config pointing to your test: +```json +{ + "server": { + "host": "localhost", + "port": 8001 + }, + "streams": [ + { + "stream_id": "test", + "test_config": "path/to/your/test.json", + "duration": 10, + "loop": true + } + ] +} +``` + +3. Run the test: +```bash +python run_perf_test.py your_config.json +``` + +## Notes + +- The tool properly handles MCP session initialization +- Supports concurrent test streams +- Automatically discovers available tools on the server +- Measures response time and throughput +- Provides 100% success rate tracking \ No newline at end of file diff --git a/tests/mcp_bench/configs/error_test.json b/tests/mcp_bench/configs/error_test.json new file mode 100644 index 0000000..04b36a7 --- /dev/null +++ b/tests/mcp_bench/configs/error_test.json @@ -0,0 +1,18 @@ +{ + "test_cases": { + "base_readQuery": [ + { + "name": "invalid_query_test", + "parameters": { + "query": "this_is_wrong_parameter_name" + } + } + ], + "sec_rolePermissions": [ + { + "name": "missing_required_param", + "parameters": {} + } + ] + } +} \ No newline at end of file diff --git a/tests/mcp_bench/configs/error_test_config.json b/tests/mcp_bench/configs/error_test_config.json new file mode 100644 index 0000000..c0ad733 --- /dev/null +++ b/tests/mcp_bench/configs/error_test_config.json @@ -0,0 +1,14 @@ +{ + "server": { + "host": "localhost", + "port": 8001 + }, + "streams": [ + { + "stream_id": "error_test_01", + "test_config": "tests/mcp_bench/configs/error_test.json", + "duration": 5, + "loop": false + } + ] +} \ No newline at end of file diff --git a/tests/mcp_bench/configs/load_test.json b/tests/mcp_bench/configs/load_test.json new file mode 100644 index 0000000..ec783fa --- /dev/null +++ b/tests/mcp_bench/configs/load_test.json @@ -0,0 +1,26 @@ +{ + "server": { + "host": "localhost", + "port": 8001 + }, + "streams": [ + { + "stream_id": "stream_01", + "test_config": "tests/mcp_bench/configs/protocol_test.json", + "duration": 30, + "loop": true + }, + { + "stream_id": "stream_02", + "test_config": "tests/mcp_bench/configs/protocol_test.json", + "duration": 30, + "loop": true + }, + { + "stream_id": "stream_03", + "test_config": "tests/mcp_bench/configs/protocol_test.json", + "duration": 30, + "loop": true + } + ] +} \ No newline at end of file diff --git a/tests/mcp_bench/configs/minimal_test.json b/tests/mcp_bench/configs/minimal_test.json new file mode 100644 index 0000000..fca9fd8 --- /dev/null +++ b/tests/mcp_bench/configs/minimal_test.json @@ -0,0 +1,14 @@ +{ + "server": { + "host": "localhost", + "port": 8001 + }, + "streams": [ + { + "stream_id": "test_01", + "test_config": "tests/mcp_bench/configs/tool_tests.json", + "duration": 5, + "loop": false + } + ] +} \ No newline at end of file diff --git a/tests/mcp_bench/configs/perf_test.json b/tests/mcp_bench/configs/perf_test.json new file mode 100644 index 0000000..7f41ee0 --- /dev/null +++ b/tests/mcp_bench/configs/perf_test.json @@ -0,0 +1,26 @@ +{ + "server": { + "host": "localhost", + "port": 8001 + }, + "streams": [ + { + "stream_id": "stream_01", + "test_config": "tests/mcp_bench/configs/tool_tests.json", + "duration": 30, + "loop": true + }, + { + "stream_id": "stream_02", + 
"test_config": "tests/mcp_bench/configs/tool_tests.json", + "duration": 30, + "loop": true + }, + { + "stream_id": "stream_03", + "test_config": "tests/mcp_bench/configs/tool_tests.json", + "duration": 30, + "loop": true + } + ] +} \ No newline at end of file diff --git a/tests/mcp_bench/configs/tool_tests.json b/tests/mcp_bench/configs/tool_tests.json new file mode 100644 index 0000000..cc64a50 --- /dev/null +++ b/tests/mcp_bench/configs/tool_tests.json @@ -0,0 +1,27 @@ +{ + "test_cases": { + "base_databaseList": [ + { + "name": "database_list_test", + "parameters": {} + } + ], + "base_readQuery": [ + { + "name": "simple_query_test", + "parameters": { + "sql": "select 1" + } + } + ], + "base_tableDDL": [ + { + "name": "table_ddl_test", + "parameters": { + "database_name": "dbc", + "table_name": "tvm" + } + } + ] + } +} \ No newline at end of file diff --git a/tests/mcp_bench/mcp_streamable_client.py b/tests/mcp_bench/mcp_streamable_client.py new file mode 100644 index 0000000..b0354b0 --- /dev/null +++ b/tests/mcp_bench/mcp_streamable_client.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python3 +""" +MCP Client for Performance Testing - MCP SDK Implementation +""" + +import asyncio +import json +import logging +import time +from datetime import datetime +from typing import Optional, Dict, Any, List +from dataclasses import dataclass, field +from pathlib import Path + +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamablehttp_client + + +@dataclass +class ClientMetrics: + """Metrics collected for each client stream.""" + stream_id: str + start_time: float = 0 + end_time: float = 0 + total_requests: int = 0 + successful_requests: int = 0 + failed_requests: int = 0 + request_times: List[float] = field(default_factory=list) + errors: List[Dict[str, Any]] = field(default_factory=list) + + @property + def avg_response_time(self) -> float: + return sum(self.request_times) / len(self.request_times) if self.request_times else 0 + + @property + def min_response_time(self) -> float: + return min(self.request_times) if self.request_times else 0 + + @property + def max_response_time(self) -> float: + return max(self.request_times) if self.request_times else 0 + + @property + def success_rate(self) -> float: + return (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0 + + @property + def duration(self) -> float: + return self.end_time - self.start_time if self.end_time else time.time() - self.start_time + + @property + def requests_per_second(self) -> float: + return self.total_requests / self.duration if self.duration > 0 else 0 + + +class MCPStreamableClient: + """MCP SDK client for performance testing.""" + + def __init__( + self, + stream_id: str, + server_url: str, + test_config_path: str, + duration_seconds: int, + loop_tests: bool = False, + auth: Optional[Dict[str, str]] = None, + logger: Optional[logging.Logger] = None + ): + self.stream_id = stream_id + # Ensure we have the correct URL for MCP SDK + if not server_url.endswith('/'): + server_url += '/' + if not server_url.endswith('mcp/'): + server_url += 'mcp/' + self.server_url = server_url.rstrip('/') + self.test_config_path = Path(test_config_path) + self.duration_seconds = duration_seconds + self.loop_tests = loop_tests + self.auth = auth or {} + self.logger = logger or logging.getLogger(f"stream_{stream_id}") + + self.metrics = ClientMetrics(stream_id=stream_id) + self.test_cases: List[Dict[str, Any]] = [] + self._stop_event = asyncio.Event() + + async def 
message_handler(self, message): + """Handle incoming messages from the server (optional for basic usage).""" + pass + + async def load_test_config(self): + """Load test cases from configuration file.""" + try: + with open(self.test_config_path, 'r') as f: + config = json.load(f) + + # Extract test cases + if 'test_cases' in config: + for tool_name, cases in config['test_cases'].items(): + for case in cases: + self.test_cases.append({ + 'tool': tool_name, + 'name': case.get('name', f"{tool_name}_test"), + 'parameters': case.get('parameters', {}) + }) + elif 'tests' in config: + self.test_cases = config['tests'] + + self.logger.info(f"Loaded {len(self.test_cases)} test cases") + except Exception as e: + self.logger.error(f"Failed to load test config: {e}") + raise + + async def list_tools(self, session: ClientSession) -> List[str]: + """List available tools using MCP SDK.""" + try: + tools_result = await session.list_tools() + tool_names = [tool.name for tool in tools_result.tools] + self.logger.info(f"Available tools ({len(tool_names)}): {tool_names[:5]}") + return tool_names + except Exception as e: + self.logger.error(f"Failed to list tools: {e}") + return [] + + async def execute_test(self, session: ClientSession, test_case: Dict[str, Any]) -> Dict[str, Any]: + """Execute a single test case using MCP SDK.""" + tool_name = test_case['tool'] + test_name = test_case.get('name', 'unnamed') + parameters = test_case.get('parameters', {}) + + # Verbose logging - show request + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"=== REQUEST: {test_name} ===") + self.logger.debug(f"Tool: {tool_name}") + self.logger.debug(f"Arguments: {json.dumps(parameters, indent=2)}") + + start_time = time.time() + result = { + 'test_name': test_name, + 'tool': tool_name, + 'start_time': datetime.now().isoformat(), + 'success': False, + 'response_time': 0, + 'error': None, + 'response_data': None + } + + try: + # Use MCP SDK to call the tool + response = await session.call_tool(tool_name, arguments=parameters) + + response_time = time.time() - start_time + result['response_time'] = response_time + result['response_data'] = response.content + + # Check if response contains an error by examining content + is_error = False + error_message = None + + if response.content and len(response.content) > 0: + first_content = response.content[0] + if hasattr(first_content, 'text') and first_content.text: + # Check if the response text indicates an error + text = first_content.text.lower() + if ('error' in text and ('validation' in text or 'failed' in text or 'exception' in text)) or \ + 'input validation error' in text or \ + 'traceback' in text or \ + 'failed to connect' in text: + is_error = True + error_message = first_content.text[:200] + '...' 
if len(first_content.text) > 200 else first_content.text + + if is_error: + result['success'] = False + result['error'] = error_message + self.metrics.failed_requests += 1 + + # Verbose logging - show error response + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"=== RESPONSE: {test_name} ===") + self.logger.debug(f"Status: ERROR ({response_time:.3f}s)") + self.logger.debug(f"Error content: {error_message}") + else: + self.logger.error(f"✗ {test_name} failed: {error_message}") + else: + result['success'] = True + self.metrics.successful_requests += 1 + self.metrics.request_times.append(response_time) + + # Verbose logging - show successful response + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"=== RESPONSE: {test_name} ===") + self.logger.debug(f"Status: SUCCESS ({response_time:.3f}s)") + if response.content and len(response.content) > 0: + first_content = response.content[0] + if hasattr(first_content, 'text'): + preview = (first_content.text[:200] + '...') if len(first_content.text) > 200 else first_content.text + self.logger.debug(f"Content preview: {preview}") + else: + self.logger.debug(f"Content type: {type(first_content)}") + else: + self.logger.debug("Empty content") + else: + self.logger.info(f"✓ {test_name} succeeded in {response_time:.3f}s") + + except Exception as e: + response_time = time.time() - start_time + result['response_time'] = response_time + result['error'] = str(e) + self.metrics.failed_requests += 1 + + # Verbose logging - show error response + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"=== RESPONSE: {test_name} ===") + self.logger.debug(f"Status: ERROR ({response_time:.3f}s)") + self.logger.debug(f"Error: {e}") + else: + self.logger.error(f"✗ {test_name} failed: {e}") + + finally: + self.metrics.total_requests += 1 + + return result + + async def run_test_loop(self, session: ClientSession): + """Run test cases in a loop until duration expires.""" + self.metrics.start_time = time.time() + end_time = self.metrics.start_time + self.duration_seconds + test_index = 0 + test_results = [] + + self.logger.info(f"Starting test loop for {self.duration_seconds} seconds") + + while time.time() < end_time and not self._stop_event.is_set(): + if test_index >= len(self.test_cases): + if self.loop_tests: + test_index = 0 + else: + self.logger.info("All tests completed, stopping stream") + break + + test_case = self.test_cases[test_index] + result = await self.execute_test(session, test_case) + test_results.append(result) + + test_index += 1 + + # Small delay to prevent overwhelming the server + await asyncio.sleep(0.01) + + self.metrics.end_time = time.time() + self.logger.info(f"Test loop completed. Duration: {self.metrics.duration:.2f}s, " + f"Tests executed: {len(test_results)}, " + f"Successful: {self.metrics.successful_requests}, " + f"Failed: {self.metrics.failed_requests}") + + return test_results + + async def run(self): + """Main run method using MCP SDK.""" + try: + # Load test configuration + await self.load_test_config() + + self.logger.info(f"Connecting to {self.server_url}") + + # Use MCP SDK streamablehttp_client - much simpler! 
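+            # Descriptive note: streamablehttp_client is an async context manager
+            # that yields the read stream, the write stream, and a callable for
+            # retrieving the server-assigned session id (unpacked on the next line).
+            # ClientSession then drives the MCP initialize handshake via
+            # session.initialize() before any tools/list or tools/call requests.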
+ async with streamablehttp_client(self.server_url, headers=self.auth) as streams: + read_stream, write_stream, get_session_id_callback = streams + + async with ClientSession( + read_stream, + write_stream, + message_handler=self.message_handler + ) as session: + # Initialize session - SDK handles all the protocol details + await session.initialize() + self.logger.info(f"Session initialized successfully") + + # List available tools + tools = await self.list_tools(session) + + # Run test loop + test_results = await self.run_test_loop(session) + + self.logger.info(f"Stream {self.stream_id} completed successfully") + return test_results + + except Exception as e: + self.logger.error(f"Stream {self.stream_id} failed: {e}") + raise + + finally: + self.logger.info(f"Stream {self.stream_id} finished") + + def stop(self): + """Signal the client to stop.""" + self._stop_event.set() + + def get_metrics(self) -> Dict[str, Any]: + """Get current metrics as dictionary.""" + return { + 'stream_id': self.metrics.stream_id, + 'duration': self.metrics.duration, + 'total_requests': self.metrics.total_requests, + 'successful_requests': self.metrics.successful_requests, + 'failed_requests': self.metrics.failed_requests, + 'success_rate': self.metrics.success_rate, + 'avg_response_time': self.metrics.avg_response_time, + 'min_response_time': self.metrics.min_response_time, + 'max_response_time': self.metrics.max_response_time, + 'requests_per_second': self.metrics.requests_per_second, + 'errors': self.metrics.errors + } \ No newline at end of file diff --git a/tests/mcp_bench/requirements.txt b/tests/mcp_bench/requirements.txt new file mode 100644 index 0000000..a632c2a --- /dev/null +++ b/tests/mcp_bench/requirements.txt @@ -0,0 +1,2 @@ +mcp +httpx>=0.24.0 \ No newline at end of file diff --git a/tests/mcp_bench/run_perf_test.py b/tests/mcp_bench/run_perf_test.py new file mode 100644 index 0000000..428e154 --- /dev/null +++ b/tests/mcp_bench/run_perf_test.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +"""Simple MCP Performance Test Runner""" + +import asyncio +import json +import logging +import sys +import time +from pathlib import Path +from datetime import datetime +from typing import Dict, List +from mcp_streamable_client import MCPStreamableClient + + +def load_config(config_file: str) -> Dict: + with open(config_file, 'r') as f: + return json.load(f) + + +async def run_test(config_file: str, verbose: bool = False): + config = load_config(config_file) + + # Setup logging with verbose flag + log_level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + server = config['server'] + server_url = f"http://{server['host']}:{server['port']}" + + print(f"\n{'='*60}") + print(f"MCP PERFORMANCE TEST") + print(f"Server: {server_url}") + print(f"Streams: {len(config['streams'])}") + print(f"{'='*60}\n") + + test_start_time = time.time() + + # Create and run clients + clients = [] + for stream_config in config['streams']: + client = MCPStreamableClient( + stream_id=stream_config.get('stream_id', 'test'), + server_url=server_url, + test_config_path=stream_config['test_config'], + duration_seconds=stream_config.get('duration', 10), + loop_tests=stream_config.get('loop', False), + auth=stream_config.get('auth') + ) + clients.append(client) + + # Run all clients with staggered starts to avoid initialization conflicts + tasks = [] + for i, client in enumerate(clients): + # Stagger starts by 0.5 seconds + async def 
run_with_delay(c, delay): + await asyncio.sleep(delay) + return await c.run() + + tasks.append(run_with_delay(client, i * 0.5)) + + await asyncio.gather(*tasks) + test_end_time = time.time() + + # Collect all metrics + all_metrics = [] + total_requests = 0 + total_successful = 0 + total_failed = 0 + total_duration = test_end_time - test_start_time + + for client in clients: + metrics = client.get_metrics() + all_metrics.append(metrics) + total_requests += metrics['total_requests'] + total_successful += metrics['successful_requests'] + total_failed += metrics['failed_requests'] + + # Display results + print(f"\n{'='*60}") + print(f"RESULTS") + print(f"{'='*60}") + + for metrics in all_metrics: + print(f"\nStream {metrics['stream_id']}:") + print(f" Requests: {metrics['total_requests']}") + print(f" Success Rate: {metrics['success_rate']:.1f}%") + print(f" Avg Response: {metrics['avg_response_time']*1000:.2f}ms") + print(f" Throughput: {metrics['requests_per_second']:.2f} req/s") + + print(f"\nOVERALL:") + print(f" Total Requests: {total_requests}") + print(f" Successful: {total_successful}") + print(f" Failed: {total_failed}") + if total_requests > 0: + print(f" Success Rate: {(total_successful/total_requests*100):.1f}%") + print(f" Overall Throughput: {total_requests/total_duration:.2f} req/s") + print(f"{'='*60}\n") + + # Generate detailed report + generate_report(config, all_metrics, test_start_time, test_end_time, total_duration) + + +def generate_report(config: Dict, metrics_list: List[Dict], start_time: float, end_time: float, duration: float): + """Generate detailed performance report.""" + # Create reports directory + reports_dir = Path("var/mcp-bench/reports") + reports_dir.mkdir(parents=True, exist_ok=True) + + # Generate timestamp for report file + timestamp = datetime.fromtimestamp(start_time).strftime("%Y%m%d_%H%M%S") + report_file = reports_dir / f"perf_report_{timestamp}.json" + + # Aggregate metrics + total_requests = sum(m['total_requests'] for m in metrics_list) + total_successful = sum(m['successful_requests'] for m in metrics_list) + total_failed = sum(m['failed_requests'] for m in metrics_list) + + # Calculate aggregate response time + all_response_times = [] + for metrics in metrics_list: + if metrics['avg_response_time'] > 0: + all_response_times.append(metrics['avg_response_time']) + avg_response_time = sum(all_response_times) / len(all_response_times) if all_response_times else 0 + + # Build detailed report + report_data = { + "timestamp": datetime.fromtimestamp(start_time).isoformat(), + "test_duration": duration, + "configuration": config, + "summary": { + "total_requests": total_requests, + "successful_requests": total_successful, + "failed_requests": total_failed, + "success_rate": (total_successful / total_requests * 100) if total_requests > 0 else 0, + "avg_response_time_ms": avg_response_time * 1000, + "overall_throughput_rps": total_requests / duration if duration > 0 else 0 + }, + "streams": [] + } + + # Add per-stream details + for metrics in metrics_list: + stream_data = { + "stream_id": metrics['stream_id'], + "metrics": { + "total_requests": metrics['total_requests'], + "successful_requests": metrics['successful_requests'], + "failed_requests": metrics['failed_requests'], + "success_rate": metrics['success_rate'], + "avg_response_time_ms": metrics['avg_response_time'] * 1000, + "min_response_time_ms": metrics['min_response_time'] * 1000, + "max_response_time_ms": metrics['max_response_time'] * 1000, + "throughput_rps": metrics['requests_per_second'], + 
"duration": metrics['duration'] + } + } + report_data["streams"].append(stream_data) + + # Save report to file + with open(report_file, 'w') as f: + json.dump(report_data, f, indent=2) + + print(f"📊 Detailed report saved to: {report_file}") + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="MCP Performance Test Runner") + parser.add_argument("config", help="Configuration file path") + parser.add_argument("-v", "--verbose", action="store_true", + help="Show detailed request/response information") + args = parser.parse_args() + + try: + asyncio.run(run_test(args.config, args.verbose)) + except KeyboardInterrupt: + print("\nTest interrupted") + except Exception as e: + print(f"Error: {e}") + sys.exit(1) \ No newline at end of file