# 🔌 API Connectivity Testing

## Purpose
This notebook tests the MGPT API connectivity step-by-step to debug connection issues, validate API responses, and ensure proper communication with your model server.

## What This Tests
- Configuration loading and validation
- Basic API connectivity (health checks)
- Individual endpoint testing (`/embeddings`, `/embeddings_batch`, `/generate`, `/generate_batch`)
- Error handling and retry mechanisms
- Response format validation
- Performance and timeout testing

In [None]:
# Import required libraries
import sys
import os
import yaml
import json
import time
import requests
from pathlib import Path
import pandas as pd

# Make the pipeline packages importable: when the notebook is started from the
# `examples` folder the project root is its parent, otherwise the current
# working directory is taken as the root.
working_dir = Path.cwd()
if working_dir.name == 'examples':
    mgpt_eval_path = working_dir.parent
else:
    mgpt_eval_path = working_dir
sys.path.insert(0, str(mgpt_eval_path))

# Pipeline modules (resolvable only after the sys.path change above)
from models.config_models import PipelineConfig
from utils.logging_utils import setup_logging

print(f"✅ Working directory: {working_dir}")
print(f"✅ MGPT-eval path: {mgpt_eval_path}")
print("✅ Imports successful")

## Step 1: Load Configuration
First, we'll load a test configuration and validate it using the actual config validation code.

In [None]:
# Read the full-pipeline template into a plain dictionary
config_path = mgpt_eval_path / "configs" / "templates" / "04_full_pipeline.yaml"

print(f"📄 Loading configuration from: {config_path}")

with open(config_path, 'r') as config_file:
    config_dict = yaml.safe_load(config_file)

# Preview only the fields that matter for the connectivity tests
job_section = config_dict.get('job', {})
api_section = config_dict.get('model_api', {})
config_preview = {
    'job_name': job_section.get('name'),
    'model_api_url': api_section.get('base_url'),
    'batch_size': api_section.get('batch_size'),
    'timeout': api_section.get('timeout'),
}

print("\n🔧 Raw configuration loaded:")
print(json.dumps(config_preview, indent=2))

In [None]:
# Validate the raw dictionary with the pipeline's own PipelineConfig model and
# expose its `model_api` section as `api_config` for the cells below.
try:
    config = PipelineConfig(**config_dict)
    print("✅ Configuration validation successful")
    
    # Extract API configuration
    api_config = config.model_api
    print(f"\n🌐 API Configuration:")
    print(f"  Base URL: {api_config.base_url}")
    print(f"  Batch size: {api_config.batch_size}")
    print(f"  Timeout: {api_config.timeout}s")
    print(f"  Max retries: {api_config.max_retries}")
    
except Exception as e:
    print(f"❌ Configuration validation failed: {e}")
    # You can modify the config_dict here to fix issues
    print("\n💡 Tip: Check the configuration format and required fields")
    # Stop here instead of swallowing the error: every later cell reads
    # `api_config`, so continuing would either fail with an unrelated NameError
    # or silently reuse a stale value left over from an earlier run.
    raise

## Step 2: Basic Connectivity Test
Test basic HTTP connectivity to the API server.

In [None]:
# Override API URL for testing (modify as needed)
# Uncomment and modify the line below to test with your actual API
# api_config.base_url = "http://localhost:8000"  # Your actual API URL

# Shared by every endpoint test further down in the notebook
base_url = api_config.base_url
timeout = api_config.timeout

print(f"🔍 Testing connectivity to: {base_url}")

# Test basic connectivity with a GET on /health
try:
    # perf_counter is the clock intended for measuring short durations
    start_time = time.perf_counter()
    response = requests.get(f"{base_url}/health", timeout=10)
    response_time = time.perf_counter() - start_time
    
    print(f"✅ Connection successful!")
    print(f"   Status code: {response.status_code}")
    print(f"   Response time: {response_time:.2f}s")
    print(f"   Response headers: {dict(response.headers)}")
    
    # Try to parse response as JSON; fall back to a text preview otherwise.
    # Only decoding errors are caught (they derive from ValueError) so that
    # KeyboardInterrupt and friends are no longer swallowed.
    try:
        response_data = response.json()
        print(f"   Response data: {json.dumps(response_data, indent=2)}")
    except ValueError:
        print(f"   Response text: {response.text[:200]}...")
        
# Timeout must be handled before ConnectionError: a connect timeout derives
# from both classes and would otherwise be reported as a refused connection.
except requests.exceptions.Timeout as e:
    print(f"❌ Connection timeout: {e}")
    print("\n💡 Troubleshooting tips:")
    print("   1. Increase timeout value")
    print("   2. Check server performance")
    
except requests.exceptions.ConnectionError as e:
    print(f"❌ Connection failed: {e}")
    print("\n💡 Troubleshooting tips:")
    print("   1. Check if the server is running")
    print("   2. Verify the URL and port")
    print("   3. Check firewall/network settings")
    
except Exception as e:
    print(f"❌ Unexpected error: {e}")

## Step 3: Test Individual Endpoints
Test each API endpoint individually with sample data.

In [None]:
# Sample medical claims reused as request payloads by the endpoint tests
sample_claims = [
    "N6320 G0378 |eoc| Z91048 M1710",
    "E119 76642 |eoc| K9289 O0903",
    "I10 E785 |eoc| Z1239 M549",
]

print("🧪 Sample claims for testing:")
for position, claim_text in enumerate(sample_claims, start=1):
    print(f"  {position}. {claim_text}")

In [None]:
# Test /embeddings endpoint (single embedding)
def test_embeddings_endpoint():
    """POST the first sample claim to ``/embeddings`` and report the outcome.

    Relies on the notebook-level ``base_url``, ``timeout`` and
    ``sample_claims``.

    Returns:
        tuple: ``(True, data)`` when the server answers 200 and the JSON body
        has an ``'embedding'`` field; ``(False, data)`` when the 200 body lacks
        that field; ``(False, None)`` for any other status code or when the
        request (or decoding its body) raises.
    """
    url = f"{base_url}/embeddings"
    body = {"text": sample_claims[0]}

    print(f"\n🔬 Testing /embeddings endpoint")
    print(f"   URL: {url}")
    print(f"   Payload: {body}")

    try:
        started = time.time()
        response = requests.post(
            url,
            json=body,
            timeout=timeout,
            headers={"Content-Type": "application/json"},
        )
        elapsed = time.time() - started

        print(f"   ✅ Status: {response.status_code}")
        print(f"   ⏱️  Response time: {elapsed:.2f}s")

        # Anything other than 200 is echoed back and counted as a failure
        if response.status_code != 200:
            print(f"   ❌ Error response: {response.text}")
            return False, None

        data = response.json()
        if 'embedding' not in data:
            print(f"   ❌ No 'embedding' field in response: {data}")
            return False, data

        vector = data['embedding']
        print(f"   📊 Embedding received: {len(vector)} dimensions")
        print(f"   📊 Sample values: {vector[:5]}...")
        return True, data

    except Exception as e:
        # Best-effort probe: report the problem instead of aborting the notebook
        print(f"   ❌ Request failed: {e}")
        return False, None

# Run the test
embeddings_success, embeddings_data = test_embeddings_endpoint()

In [None]:
# Test /embeddings_batch endpoint (multiple embeddings)
def test_embeddings_batch_endpoint():
    """POST all sample claims to ``/embeddings_batch`` and report the outcome.

    Relies on the notebook-level ``base_url``, ``timeout`` and
    ``sample_claims``.

    Returns:
        tuple: ``(True, data)`` when the server answers 200 with a non-empty
        ``'embeddings'`` list; ``(False, data)`` when the 200 body has no
        ``'embeddings'`` field or the list is empty; ``(False, None)`` for any
        other status code or when the request (or decoding its body) raises.
    """
    endpoint = f"{base_url}/embeddings_batch"
    payload = {"texts": sample_claims}
    
    print(f"\n🔬 Testing /embeddings_batch endpoint")
    print(f"   URL: {endpoint}")
    print(f"   Payload: {len(sample_claims)} texts")
    
    try:
        # perf_counter is the clock intended for measuring short durations
        start_time = time.perf_counter()
        response = requests.post(
            endpoint,
            json=payload,
            timeout=timeout,
            headers={"Content-Type": "application/json"}
        )
        response_time = time.perf_counter() - start_time
        
        print(f"   ✅ Status: {response.status_code}")
        print(f"   ⏱️  Response time: {response_time:.2f}s")
        
        if response.status_code == 200:
            # Throughput is only meaningful for a successful call; skip it when
            # the clock reports zero elapsed time rather than dividing by zero
            # and having the broad handler below misreport a failed request.
            if response_time > 0:
                print(f"   📊 Throughput: {len(sample_claims)/response_time:.1f} texts/sec")
            
            data = response.json()
            if 'embeddings' in data:
                embeddings = data['embeddings']
                print(f"   📊 Embeddings received: {len(embeddings)} embeddings")
                if not embeddings:
                    # Indexing embeddings[0] would raise IndexError, which the
                    # broad handler would report as "Request failed".
                    print(f"   ❌ 'embeddings' list is empty")
                    return False, data
                print(f"   📊 Embedding dimensions: {len(embeddings[0])}")
                print(f"   📊 Sample values: {embeddings[0][:3]}...")
                return True, data
            else:
                print(f"   ❌ No 'embeddings' field in response: {data}")
                return False, data
        else:
            print(f"   ❌ Error response: {response.text}")
            return False, None
            
    except Exception as e:
        # Best-effort probe: report the problem instead of aborting the notebook
        print(f"   ❌ Request failed: {e}")
        return False, None

# Run the test
batch_embeddings_success, batch_embeddings_data = test_embeddings_batch_endpoint()

In [None]:
# Test /generate endpoint (single text generation)
def test_generate_endpoint():
    """POST the first sample claim as a prompt to ``/generate`` and report.

    Relies on the notebook-level ``base_url``, ``timeout`` and
    ``sample_claims``.

    Returns:
        tuple: ``(True, data)`` when the server answers 200 and the JSON body
        has a ``'generated_text'`` field; ``(False, data)`` when the 200 body
        lacks that field; ``(False, None)`` for any other status code or when
        the request (or decoding its body) raises.
    """
    url = f"{base_url}/generate"
    payload = {
        "prompt": sample_claims[0],
        "max_new_tokens": 50,
        "temperature": 0.8,
        "top_k": 50,
    }

    print(f"\n🔬 Testing /generate endpoint")
    print(f"   URL: {url}")
    print(f"   Prompt: {payload['prompt']}")
    print(f"   Parameters: max_tokens={payload['max_new_tokens']}, temp={payload['temperature']}")

    try:
        started = time.time()
        response = requests.post(
            url,
            json=payload,
            timeout=timeout,
            headers={"Content-Type": "application/json"},
        )
        elapsed = time.time() - started

        print(f"   ✅ Status: {response.status_code}")
        print(f"   ⏱️  Response time: {elapsed:.2f}s")

        # Anything other than 200 is echoed back and counted as a failure
        if response.status_code != 200:
            print(f"   ❌ Error response: {response.text}")
            return False, None

        data = response.json()
        if 'generated_text' not in data:
            print(f"   ❌ No 'generated_text' field in response: {data}")
            return False, data

        completion = data['generated_text']
        print(f"   📝 Generated text: {completion}")
        # The reported length counts whitespace-separated pieces of the text
        print(f"   📊 Length: {len(completion.split())} tokens")
        return True, data

    except Exception as e:
        # Best-effort probe: report the problem instead of aborting the notebook
        print(f"   ❌ Request failed: {e}")
        return False, None

# Run the test
generate_success, generate_data = test_generate_endpoint()

In [None]:
# Test /generate_batch endpoint (multiple text generations)
def test_generate_batch_endpoint():
    """POST two sample claims as prompts to ``/generate_batch`` and report.

    Relies on the notebook-level ``base_url``, ``timeout`` and
    ``sample_claims``. The request is given twice the configured timeout.

    Returns:
        tuple: ``(True, data)`` when the server answers 200 and the JSON body
        has a ``'generated_texts'`` field; ``(False, data)`` when the 200 body
        lacks that field; ``(False, None)`` for any other status code or when
        the request (or processing its body) raises.
    """
    url = f"{base_url}/generate_batch"
    payload = {
        "prompts": sample_claims[:2],  # Test with 2 prompts
        "max_new_tokens": 50,
        "temperature": 0.8,
        "top_k": 50,
        "num_return_sequences": 3,  # Generate 3 sequences per prompt
    }

    print(f"\n🔬 Testing /generate_batch endpoint")
    print(f"   URL: {url}")
    print(f"   Prompts: {len(payload['prompts'])} prompts")
    print(f"   Sequences per prompt: {payload['num_return_sequences']}")

    try:
        started = time.time()
        response = requests.post(
            url,
            json=payload,
            timeout=timeout * 2,  # Longer timeout for batch generation
            headers={"Content-Type": "application/json"},
        )
        elapsed = time.time() - started

        print(f"   ✅ Status: {response.status_code}")
        print(f"   ⏱️  Response time: {elapsed:.2f}s")

        # Anything other than 200 is echoed back and counted as a failure
        if response.status_code != 200:
            print(f"   ❌ Error response: {response.text}")
            return False, None

        data = response.json()
        if 'generated_texts' not in data:
            print(f"   ❌ No 'generated_texts' field in response: {data}")
            return False, data

        groups = data['generated_texts']
        print(f"   📝 Generated texts: {len(groups)} prompt groups")

        for prompt_no, generations in enumerate(groups, start=1):
            print(f"   📝 Prompt {prompt_no}: {len(generations)} generations")
            # Preview at most the first two generations, 100 characters each
            for gen_no, text in enumerate(generations[:2], start=1):
                print(f"      {gen_no}. {text[:100]}...")

        return True, data

    except Exception as e:
        # Best-effort probe: report the problem instead of aborting the notebook
        print(f"   ❌ Request failed: {e}")
        return False, None

# Run the test
batch_generate_success, batch_generate_data = test_generate_batch_endpoint()

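## Step 4: Test Error Handling
Test how the API responds to problematic requests: an unknown endpoint, a payload without the expected field, empty text, and a very large batch. Retry behaviour is covered separately in Step 6.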

In [None]:
# Test error handling scenarios
def test_error_scenarios():
    """Fire four deliberately problematic requests and print how the API reacts.

    Relies on the notebook-level ``base_url``. Nothing is returned; each
    scenario prints the status code and a truncated body, or the exception
    raised while sending the request.
    """

    def _probe(send_request, preview_chars):
        # Run one request and print its status plus the first
        # `preview_chars` characters of the body (or the raised error).
        try:
            response = send_request()
            print(f"   Status: {response.status_code}")
            print(f"   Response: {response.text[:preview_chars]}")
        except Exception as e:
            print(f"   ❌ Error: {e}")

    print("\n🧪 Testing error handling scenarios:")

    # Test 1: Invalid endpoint
    print("\n1. Testing invalid endpoint...")
    _probe(
        lambda: requests.get(f"{base_url}/invalid_endpoint", timeout=10),
        100,
    )

    # Test 2: Invalid JSON payload
    print("\n2. Testing invalid JSON payload...")
    _probe(
        lambda: requests.post(
            f"{base_url}/embeddings",
            json={"invalid_field": "test"},
            timeout=10,
        ),
        200,
    )

    # Test 3: Empty text
    print("\n3. Testing empty text...")
    _probe(
        lambda: requests.post(
            f"{base_url}/embeddings",
            json={"text": ""},
            timeout=10,
        ),
        200,
    )

    # Test 4: Very large batch
    print("\n4. Testing very large batch...")
    large_batch = ["E119 I10 N6320"] * 1000  # 1000 identical claims
    try:
        started = time.time()
        response = requests.post(
            f"{base_url}/embeddings_batch",
            json={"texts": large_batch},
            timeout=30,  # Longer timeout for large batch
        )
        elapsed = time.time() - started
        print(f"   Status: {response.status_code}")
        print(f"   Response time: {elapsed:.2f}s")
        if response.status_code != 200:
            print(f"   Response: {response.text[:200]}")
        else:
            data = response.json()
            print(f"   Embeddings received: {len(data.get('embeddings', []))}")
    except Exception as e:
        print(f"   ❌ Error: {e}")

test_error_scenarios()

## Step 5: Performance Testing
Test API performance with different batch sizes and measure throughput.

In [None]:
# Performance testing with different batch sizes
def test_performance():
    """Time ``/embeddings_batch`` for several batch sizes.

    Relies on the notebook-level ``base_url``, ``timeout`` and
    ``sample_claims``. One second is slept after each batch size.

    Returns:
        list[dict]: one entry per batch size, in the order tested. Every entry
        has ``'batch_size'`` and ``'status'``. Entries with status
        ``'success'`` add ``'response_time'``, ``'throughput'`` and
        ``'avg_time_per_item'``; entries with status ``'failed'`` (non-200
        answer) or ``'error'`` (exception) add ``'error'``.
    """
    print("\n⚡ Performance testing:")

    performance_results = []

    for batch_size in (1, 5, 10, 20, 32):
        print(f"\n📊 Testing batch size: {batch_size}")

        # Cycle through the sample claims until the batch is full
        test_batch = [
            sample_claims[k % len(sample_claims)] for k in range(batch_size)
        ]

        result = {'batch_size': batch_size}
        try:
            started = time.time()
            response = requests.post(
                f"{base_url}/embeddings_batch",
                json={"texts": test_batch},
                timeout=timeout,
            )
            elapsed = time.time() - started

            if response.status_code != 200:
                result.update(status='failed', error=response.text[:100])
                print(f"   ❌ Failed: {response.status_code}")
            else:
                items_per_sec = batch_size / elapsed
                result.update(
                    response_time=elapsed,
                    throughput=items_per_sec,
                    avg_time_per_item=elapsed / batch_size,
                    status='success',
                )
                print(f"   ✅ Success: {elapsed:.2f}s, {items_per_sec:.1f} items/sec")

        except Exception as e:
            # Start from a fresh record so no partial measurements leak in
            result = {
                'batch_size': batch_size,
                'status': 'error',
                'error': str(e),
            }
            print(f"   ❌ Error: {e}")

        performance_results.append(result)

        # Brief pause between tests
        time.sleep(1)

    return performance_results

# Run the performance tests only when the basic batch endpoint test passed;
# `successful_results` and `best_throughput` are read again by the summary cell.
if not batch_embeddings_success:
    print("\n⚠️  Skipping performance tests (batch endpoint not working)")
else:
    perf_results = test_performance()

    # Display results summary
    print("\n📈 Performance Summary:")
    successful_results = list(
        filter(lambda entry: entry['status'] == 'success', perf_results)
    )

    if successful_results:
        best_throughput = max(successful_results, key=lambda entry: entry['throughput'])
        print(f"   🏆 Best throughput: {best_throughput['throughput']:.1f} items/sec (batch size {best_throughput['batch_size']})")

        fastest_response = min(successful_results, key=lambda entry: entry['avg_time_per_item'])
        print(f"   ⚡ Fastest per item: {fastest_response['avg_time_per_item']:.3f}s (batch size {fastest_response['batch_size']})")

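## Step 6: Test Retry Logic
Test the retry mechanism by simulating network issues. Each request uses a deliberately tiny timeout (1 ms), so every attempt is expected to time out; the point is to watch the exponential backoff between attempts, not to obtain a successful response.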

In [None]:
# Test retry logic with timeout simulation
def test_retry_logic():
    """Walk through a retry loop whose requests are forced to time out.

    Relies on the notebook-level ``api_config``, ``base_url`` and
    ``sample_claims``. Up to ``api_config.max_retries + 1`` attempts are made
    with a 0.001 s timeout; after a timeout the loop sleeps ``2 ** k`` seconds
    (k = 0, 1, 2, ...) before the next attempt. A 200 answer or any
    non-timeout exception ends the loop early. Nothing is returned.
    """
    print("\n🔄 Testing retry logic:")

    max_retries = api_config.max_retries
    total_attempts = max_retries + 1

    for attempt_no in range(1, total_attempts + 1):
        print(f"\n   Attempt {attempt_no}/{total_attempts}:")

        try:
            # Use very short timeout to simulate network issues
            response = requests.post(
                f"{base_url}/embeddings",
                json={"text": sample_claims[0]},
                timeout=0.001,  # Very short timeout to force failure
            )

            if response.status_code != 200:
                print(f"      ⚠️  HTTP {response.status_code} on attempt {attempt_no}")
            else:
                print(f"      ✅ Success on attempt {attempt_no}")
                break

        except requests.exceptions.Timeout:
            print(f"      ⏱️  Timeout on attempt {attempt_no}")
            if attempt_no == total_attempts:
                print(f"      ❌ Max retries ({max_retries}) exceeded")
            else:
                wait_time = 2 ** (attempt_no - 1)  # Exponential backoff
                print(f"      🔄 Waiting {wait_time}s before retry...")
                time.sleep(wait_time)

        except Exception as e:
            # Anything that is not a timeout is not retried
            print(f"      ❌ Error on attempt {attempt_no}: {e}")
            break

    print("\n💡 In production, the pipeline implements exponential backoff with jitter")
    print("   to handle temporary network issues gracefully.")

# Run retry test
test_retry_logic()

## Step 7: Summary and Recommendations
Analyze all test results and provide recommendations.

In [None]:
# Summarize the four endpoint tests and print follow-up guidance
print("\n📋 API Connectivity Test Summary:")
print("=" * 50)

# (label, outcome) pairs collected from the endpoint test cells
tests = [
    ("Single Embedding", embeddings_success),
    ("Batch Embeddings", batch_embeddings_success),
    ("Single Generation", generate_success),
    ("Batch Generation", batch_generate_success),
]

total = len(tests)
passed = sum(bool(outcome) for _, outcome in tests)

print(f"\n🎯 Overall Success Rate: {passed}/{total} ({passed/total*100:.0f}%)")

for label, outcome in tests:
    verdict = "✅ PASS" if outcome else "❌ FAIL"
    print(f"   {verdict} {label}")

# Recommendations: all passed / at least half passed / fewer than half passed
print("\n💡 Recommendations:")

if passed == total:
    advice = (
        "   🎉 All tests passed! Your API is ready for pipeline integration.",
        "   ✅ Proceed to test the embedding pipeline with this configuration.",
    )
elif passed >= total * 0.5:
    advice = (
        "   ⚠️  Some tests failed. Check the specific endpoint issues above.",
        "   🔧 Focus on fixing the failed endpoints before running the full pipeline.",
    )
else:
    advice = (
        "   🚨 Major connectivity issues detected.",
        "   🔧 Check server status, URL configuration, and network connectivity.",
    )
for advice_line in advice:
    print(advice_line)

# Echo the settings that were exercised, laid out like the YAML section
print("\n🔧 Configuration for pipeline:")
print("   model_api:")
print(f"     base_url: \"{base_url}\"")
print(f"     batch_size: {api_config.batch_size}")
print(f"     timeout: {api_config.timeout}")
print(f"     max_retries: {api_config.max_retries}")

# Only available when the performance cell ran and had a successful batch
if 'perf_results' in locals() and successful_results:
    optimal_batch = best_throughput['batch_size']
    print("\n⚡ Performance optimization:")
    print(f"   💡 Consider using batch_size: {optimal_batch} for optimal throughput")
    print(f"   📊 Expected throughput: {best_throughput['throughput']:.1f} items/sec")

print("\n📚 Next steps:")
for step_line in (
    "   1. Fix any failed endpoint tests",
    "   2. Run test_02_Data_Loading_Validation.ipynb",
    "   3. Then proceed to test_03_Embedding_Pipeline_Debug.ipynb",
):
    print(step_line)

## 🔧 Debug Cell (Run if needed)
Use this cell to test specific scenarios or debug issues found above.

In [None]:
# Debug cell - modify as needed for specific testing.
# Everything below is a commented-out template: uncomment one example, adjust
# it, and run the cell. The examples use `requests` and `base_url` from the
# cells above, so run those first.

# Example: Test with custom URL
# custom_url = "http://your-server:8000"
# response = requests.get(f"{custom_url}/health", timeout=10)
# print(f"Custom URL test: {response.status_code}")

# Example: Test with custom payload
# custom_payload = {"text": "Your custom medical claim here"}
# response = requests.post(f"{base_url}/embeddings", json=custom_payload, timeout=30)
# print(f"Custom payload test: {response.status_code}")

# Example: Check response headers for debugging
# (inspects whichever `response` object is currently defined at notebook level)
# if 'response' in locals():
#     print("Response headers:")
#     for key, value in response.headers.items():
#         print(f"  {key}: {value}")

print("💡 Use this cell to run custom tests and debug specific issues.")