# Azure Backend API Testing

This notebook allows direct testing of the Azure backend API endpoints.

## MMLU Testing Cell

In [None]:
# MMLU Complete Dataset Benchmarking with Adaptive Backend
import requests
import json
import time
import pandas as pd
from datetime import datetime
import random
from datasets import load_dataset

# Configuration
# Root URL of the backend under test; the benchmark below POSTs each question
# to f"{BASE_URL}/v1/chat/completions".
BASE_URL = "https://backend-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io"


def _format_mmlu_prompt(question):
    """Render one MMLU record as the multiple-choice prompt sent to the backend.

    Args:
        question: Mapping with a ``"question"`` string and a ``"choices"``
            sequence; the first four choices are labelled A-D.

    Returns:
        The prompt text, ending with the instruction to answer with one letter.
    """
    choices = list(question["choices"])
    # The whitespace-only second line reproduces the original prompt
    # byte-for-byte so scores stay comparable between runs.
    return (
        f"Question: {question['question']}\n"
        "            \n"
        f"A) {choices[0]}\n"
        f"B) {choices[1]}\n"
        f"C) {choices[2]}\n"
        f"D) {choices[3]}\n"
        "\n"
        "Please answer with only the letter (A, B, C, or D)."
    )


def _extract_answer_letter(ai_answer):
    """Pick the answer letter (A-D) out of a model reply.

    Only a letter standing on its own is accepted ("B", "b)", "(C)",
    "The answer is D").  Scanning for the first A-D character anywhere in the
    text would score "The answer is B" as "A" (from "Answer") and a refusal
    such as "I cannot..." as "C".

    Known limitation: a reply that uses "A" as an English article before the
    actual letter is still read as "A".

    Args:
        ai_answer: Raw reply text from the model.

    Returns:
        "A", "B", "C" or "D", or None when no standalone letter is present.
    """
    import re  # local import keeps this block self-contained in the notebook

    standalone = re.search(r"\b([ABCD])\b", ai_answer.upper())
    return standalone.group(1) if standalone else None


def _report_mmlu_analysis(df):
    """Print the post-run breakdowns and write the final result CSVs.

    Adds a ``cost_bias_bin`` column to ``df`` in place (it is part of the saved
    detailed results), then writes ``mmlu_complete_benchmark_results_<ts>.csv``
    and ``mmlu_complete_summary_<ts>.csv`` to the working directory.

    Args:
        df: Non-empty DataFrame of per-question result rows.
    """
    def most_common(values):
        # Mode of the group; 'unknown' for an empty group (e.g. an unused cost-bias bin).
        return values.mode().iloc[0] if not values.empty else 'unknown'

    print(f"\n📈 COMPREHENSIVE PERFORMANCE ANALYSIS:")

    # Subject-wise performance
    subject_stats = df.groupby('subject').agg({
        'is_correct': ['count', 'sum', 'mean'],
        'response_time': 'mean',
        'selected_model': most_common,
        'selected_provider': most_common,
        'total_tokens': 'sum'
    }).round(3)

    # Flatten the (column, aggregation) MultiIndex into readable names.
    subject_stats.columns = ['Questions', 'Correct', 'Accuracy', 'Avg_Time', 'Most_Used_Model', 'Most_Used_Provider', 'Total_Tokens']

    # Show top and bottom performing subjects
    subject_stats_sorted = subject_stats.sort_values('Accuracy', ascending=False)
    print(f"\n🏆 TOP 10 PERFORMING SUBJECTS:")
    print(subject_stats_sorted.head(10)[['Questions', 'Correct', 'Accuracy', 'Most_Used_Model']])

    print(f"\n📉 BOTTOM 10 PERFORMING SUBJECTS:")
    print(subject_stats_sorted.tail(10)[['Questions', 'Correct', 'Accuracy', 'Most_Used_Model']])

    print(f"\n🤖 COMPLETE ADAPTIVE BACKEND MODEL SELECTION ANALYSIS:")
    model_provider_usage = df['model_provider_combo'].value_counts()
    for combo, count in model_provider_usage.items():
        combo_rows = df[df['model_provider_combo'] == combo]
        accuracy = combo_rows['is_correct'].mean() * 100
        avg_time = combo_rows['response_time'].mean()
        print(f"  {combo}: {count:,} questions ({count/len(df)*100:.1f}%) - Accuracy: {accuracy:.1f}% - Avg Time: {avg_time:.2f}s")

    print(f"\n💰 COMPLETE TOKEN USAGE ANALYSIS:")
    total_tokens_used = df['total_tokens'].sum()
    avg_tokens_per_question = df['total_tokens'].mean()
    print(f"  Total tokens consumed: {total_tokens_used:,}")
    print(f"  Average tokens per question: {avg_tokens_per_question:.1f}")
    # Flat per-token rate used only as an order-of-magnitude figure.
    print(f"  Estimated API cost (rough): ${total_tokens_used * 0.000002:.2f}")  # Rough estimate

    provider_token_usage = df.groupby('selected_provider')['total_tokens'].agg(['sum', 'mean', 'count']).round(1)
    print(f"\n📊 TOKEN USAGE BY PROVIDER:")
    for provider in provider_token_usage.index:
        total = provider_token_usage.loc[provider, 'sum']
        avg = provider_token_usage.loc[provider, 'mean']
        count = provider_token_usage.loc[provider, 'count']
        print(f"  {provider}: {total:,} total tokens ({avg:.1f} avg) across {count:,} questions")

    # Cost bias analysis
    print(f"\n💸 COST BIAS EFFECTIVENESS ANALYSIS:")
    df['cost_bias_bin'] = pd.cut(df['cost_bias_used'], bins=[0, 0.3, 0.7, 1.0], labels=['Low (0-0.3)', 'Med (0.3-0.7)', 'High (0.7-1.0)'])
    # observed=False is spelled out so every bin is listed (even unused ones)
    # without depending on the version-specific default for categoricals.
    cost_bias_analysis = df.groupby('cost_bias_bin', observed=False).agg({
        'selected_model': most_common,
        'selected_provider': most_common,
        'is_correct': 'mean',
        'total_tokens': 'mean'
    }).round(3)
    print(cost_bias_analysis)

    # Save final comprehensive results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"mmlu_complete_benchmark_results_{timestamp}.csv"
    df.to_csv(filename, index=False)

    # Save summary statistics
    summary_filename = f"mmlu_complete_summary_{timestamp}.csv"
    subject_stats.to_csv(summary_filename)

    print(f"\n💾 COMPLETE RESULTS SAVED:")
    print(f"  📋 Detailed results: {filename}")
    print(f"  📊 Subject summary: {summary_filename}")

    print(f"\n🔄 ADAPTIVE ROUTING EFFECTIVENESS (COMPLETE ANALYSIS):")
    print(f"  Total questions tested: {len(df):,}")
    print(f"  Subjects covered: {len(df['subject'].unique())}")
    print(f"  Unique model/provider combinations used: {len(model_provider_usage)}")
    print(f"  Most frequently selected: {model_provider_usage.index[0] if len(model_provider_usage) > 0 else 'N/A'}")
    print(f"  Routing diversity: {len(model_provider_usage)} different combinations")

    # Performance by subject category analysis
    print(f"\n📚 SUBJECT CATEGORY ANALYSIS:")
    # Group subjects by domain (rough categorization based on substrings of the subject name)
    tested_subjects = df['subject'].unique()
    science_subjects = [s for s in tested_subjects if any(term in s.lower() for term in ['physics', 'chemistry', 'biology', 'astronomy', 'anatomy'])]
    math_subjects = [s for s in tested_subjects if any(term in s.lower() for term in ['math', 'statistics', 'calculus', 'algebra', 'geometry'])]
    humanities_subjects = [s for s in tested_subjects if any(term in s.lower() for term in ['history', 'philosophy', 'literature', 'religion'])]

    for category, subjects in [('Science', science_subjects), ('Mathematics', math_subjects), ('Humanities', humanities_subjects)]:
        if subjects:
            category_df = df[df['subject'].isin(subjects)]
            if len(category_df) > 0:
                category_accuracy = category_df['is_correct'].mean() * 100
                print(f"  {category}: {category_accuracy:.1f}% accuracy across {len(subjects)} subjects")


def run_complete_mmlu_benchmark():
    """
    Run the MMLU benchmark against the adaptive backend on the COMPLETE test split.

    Loads ``cais/mmlu`` ("all" config), sends every test question of every
    subject to ``{BASE_URL}/v1/chat/completions`` as a four-option prompt, and
    scores the letter found in each reply against the dataset's answer index.
    Cumulative progress CSVs are written every 100 questions and after each
    subject; the detailed results and a per-subject summary are written at the end.

    Authentication: if an ``API_KEY`` notebook global or ``API_KEY`` environment
    variable is set (and is not the placeholder ``"your-api-key"``), it is sent
    as the ``X-Stainless-API-Key`` header; otherwise no key header is sent.

    Requests that raise or return a non-200 status are included in the overall
    and per-subject totals (i.e. scored as incorrect) but produce no row in the
    returned DataFrame.

    WARNING: This will take several hours to complete!

    Returns:
        pandas.DataFrame with one row per successfully answered question
        (empty if every request failed), or None if the dataset failed to load.
    """
    import os  # local imports keep this block self-contained in the notebook
    from collections import defaultdict

    # NOTE(review): the subject/question counts in this banner are hard-coded
    # text; the actual counts are printed once the dataset has been loaded.
    print("=== COMPLETE MMLU DATASET BENCHMARKING ===")
    print("🚨 WARNING: This will test ~14,000 questions across 59 subjects!")
    print("⏰ Estimated time: 3-6 hours depending on response times")
    print("💰 Cost: Significant API usage costs\n")

    # Confirmation: a grace period in which the cell can still be interrupted.
    print("⚡ Starting in 10 seconds... (stop the cell if you want to cancel)")
    time.sleep(10)

    # Load MMLU dataset
    print("📚 Loading complete MMLU dataset...")
    try:
        dataset = load_dataset("cais/mmlu", "all")
        test_data = dataset["test"]
        print(f"✅ Loaded {len(test_data)} total test questions")
    except Exception as e:
        print(f"❌ Failed to load dataset: {e}")
        return

    # Bucket the questions by subject in a single pass instead of re-scanning
    # the whole split once per subject.
    questions_by_subject = defaultdict(list)
    for record in test_data:
        questions_by_subject[record["subject"]].append(record)

    all_subjects = sorted(questions_by_subject)  # Sorted for a consistent order
    print(f"📖 Testing ALL {len(all_subjects)} subjects")
    print(f"📋 Subjects: {', '.join(all_subjects[:10])}{'...' if len(all_subjects) > 10 else ''}")

    results = []
    total_questions = 0
    correct_answers = 0
    failed_requests = 0
    start_time = datetime.now()

    headers = {"Content-Type": "application/json"}
    # API_KEY is optional: take it from the notebook globals when a config cell
    # defines it, else from the environment, instead of raising NameError.
    api_key = globals().get("API_KEY") or os.environ.get("API_KEY")
    if api_key and api_key != "your-api-key":
        headers["X-Stainless-API-Key"] = api_key

    # Process each subject
    for subject_idx, subject in enumerate(all_subjects, 1):
        print(f"\n🔬 [{subject_idx}/{len(all_subjects)}] Testing subject: {subject}")

        subject_questions = questions_by_subject[subject]
        subject_total = len(subject_questions)
        subject_correct = 0

        print(f"   📊 {subject_total} questions in this subject")

        for i, question in enumerate(subject_questions):
            if (i + 1) % 10 == 0 or i == 0:  # Progress indicator
                print(f"   Question {i+1}/{subject_total}...", end=" ")

            # Prepare API request with varied parameters for better testing
            chat_data = {
                "messages": [{"role": "user", "content": _format_mmlu_prompt(question)}],
                "max_tokens": 10,
                "temperature": 0.1,  # Low temperature for consistent answers
                "provider_constraint": ["openai", "deepseek", "anthropic"],  # Multiple providers
                "cost_bias": random.uniform(0.2, 0.8)  # Vary cost bias to test routing
            }

            try:
                request_start_time = time.time()
                response = requests.post(
                    f"{BASE_URL}/v1/chat/completions",
                    json=chat_data,
                    headers=headers,
                    timeout=60  # Longer timeout for complete test
                )
                response_time = time.time() - request_start_time

                if response.status_code == 200:
                    result = response.json()
                    ai_answer = result["choices"][0]["message"]["content"].strip()

                    # Extract adaptive backend selection info
                    selected_model = result.get("model", "unknown")
                    selected_provider = result.get("provider", "unknown")

                    ai_letter = _extract_answer_letter(ai_answer)

                    # Convert correct answer index to letter
                    correct_letter = "ABCD"[question["answer"]]

                    is_correct = ai_letter == correct_letter
                    if is_correct:
                        subject_correct += 1
                        correct_answers += 1

                    usage = result.get("usage", {})

                    # Store result with detailed backend selection info
                    results.append({
                        "subject": subject,
                        "question_id": f"{subject}_{i}",
                        "question": question["question"][:200] + "...",  # Longer preview
                        "correct_answer": correct_letter,
                        "ai_answer": ai_letter,
                        "ai_response_full": ai_answer,
                        "is_correct": is_correct,
                        "response_time": response_time,
                        "selected_model": selected_model,
                        "selected_provider": selected_provider,
                        "model_provider_combo": f"{selected_provider}/{selected_model}",
                        "completion_tokens": usage.get("completion_tokens", 0),
                        "prompt_tokens": usage.get("prompt_tokens", 0),
                        "total_tokens": usage.get("total_tokens", 0),
                        "cost_bias_used": chat_data["cost_bias"],
                        "timestamp": datetime.now().isoformat()
                    })

                    if (i + 1) % 10 == 0:
                        accuracy_so_far = (subject_correct / (i + 1)) * 100
                        print(f"✅ Accuracy: {accuracy_so_far:.1f}%")

                else:
                    failed_requests += 1
                    print(f"❌ API Error: {response.status_code}")

            except Exception as e:
                # Best effort: one bad request (network error, malformed
                # payload) must not abort a multi-hour run.
                failed_requests += 1
                print(f"❌ Request failed: {str(e)}")

            total_questions += 1

            # Save progress every 100 questions
            if total_questions % 100 == 0:
                temp_df = pd.DataFrame(results)
                temp_filename = f"mmlu_progress_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
                temp_df.to_csv(temp_filename, index=False)

                elapsed = datetime.now() - start_time
                rate = total_questions / elapsed.total_seconds() * 60  # questions per minute
                remaining = (len(test_data) - total_questions) / rate if rate > 0 else 0

                print(f"\n💾 Progress saved: {total_questions}/{len(test_data)} questions")
                print(f"⏱️  Rate: {rate:.1f} questions/min | ETA: {remaining:.0f} minutes")

            # Rate limiting
            time.sleep(0.05)  # Small delay to avoid overwhelming the API

        subject_accuracy = (subject_correct / subject_total) * 100
        print(f"   📊 {subject}: {subject_correct}/{subject_total} ({subject_accuracy:.1f}%)")

        # Save intermediate results after each subject
        if results:
            intermediate_df = pd.DataFrame(results)
            intermediate_filename = f"mmlu_intermediate_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            intermediate_df.to_csv(intermediate_filename, index=False)

    # Calculate final results
    overall_accuracy = (correct_answers / total_questions) * 100 if total_questions > 0 else 0
    total_time = datetime.now() - start_time

    print(f"\n🎯 FINAL COMPLETE DATASET RESULTS:")
    print(f"📊 Overall Accuracy: {correct_answers}/{total_questions} ({overall_accuracy:.1f}%)")
    print(f"⏱️  Total Time: {total_time}")
    if results:
        average_response_time = sum(r['response_time'] for r in results) / len(results)
        print(f"⚡ Average Response Time: {average_response_time:.2f}s")
    else:
        # Every request failed: there is nothing to average over.
        print("⚡ Average Response Time: n/a (no successful responses)")
    if failed_requests:
        print(f"⚠️  Failed requests (counted as incorrect): {failed_requests}")

    # Create comprehensive results DataFrame
    df = pd.DataFrame(results)

    if len(df) > 0:
        _report_mmlu_analysis(df)

    print(f"\n✨ COMPLETE MMLU BENCHMARK FINISHED!")
    print(f"🏁 Total time: {total_time}")
    print(f"📊 Overall performance: {overall_accuracy:.1f}% accuracy")
    return df

# Announce what is about to happen (one write, same four lines), then launch.
print(
    "\n".join(
        (
            "🚀 Starting COMPLETE MMLU dataset benchmark...",
            "📋 This will test your adaptive backend against ALL ~14,000 academic questions",
            "🔍 Comprehensive analysis of model selection across all 59 subjects",
            "⚠️  WARNING: This will take several hours and consume significant API credits!\n",
        )
    )
)

# Keep the returned results in the notebook namespace for follow-up cells.
complete_benchmark_results = run_complete_mmlu_benchmark()

print("\n🎉 COMPLETE BENCHMARK ANALYSIS FINISHED!")

In [43]:
# Enhanced Backend URL Tester - Compatible with Adaptive Backend
import requests
import json
import time
from datetime import datetime

# Configuration - UPDATE THESE VALUES
# BASE_URL: the Go backend (health check and /v1/chat/completions tests below).
BASE_URL = "https://backend-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io"
# PYTHON_SERVICE_URL: the Python AI service queried via POST /predict below.
PYTHON_SERVICE_URL = "https://prompt-classifer-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io"

def quick_test(url, endpoint="/health", method="GET", data=None, timeout=10):
    """Send one request to a backend endpoint and print a short report.

    Args:
        url: Base URL of the service (a trailing slash is tolerated).
        endpoint: Path to call (a leading slash is tolerated).
        method: HTTP method name passed to ``requests.request``.
        data: Optional JSON-serialisable request body.
        timeout: Seconds to wait for the server before giving up (default 10).

    Returns:
        ``{"success": True, "status": <int>, "data": <parsed JSON or body text>}``
        whenever an HTTP response was received -- regardless of its status
        code, so check ``"status"`` for HTTP-level errors -- or
        ``{"success": False, "error": <message>}`` when the request itself failed.
    """
    full_url = f"{url.rstrip('/')}/{endpoint.lstrip('/')}"

    headers = {"Content-Type": "application/json"}

    print(f"🔍 Testing: {method} {full_url}")
    start_time = time.time()

    try:
        response = requests.request(method, full_url, headers=headers, json=data, timeout=timeout)
        response_time = time.time() - start_time

        # A 4xx/5xx answer is still a completed request, but must not be
        # displayed with a success tick.
        status_icon = "✅" if response.ok else "❌"
        print(f"{status_icon} Status: {response.status_code} | Time: {response_time:.2f}s")

        try:
            payload = response.json()
            print(f"📄 Response: {json.dumps(payload, indent=2)}")
        except ValueError:
            # Body is not JSON (requests' JSON decode errors subclass
            # ValueError): fall back to the raw text.
            payload = response.text
            print(f"📄 Response: {payload}")

        return {"success": True, "status": response.status_code, "data": payload}

    except Exception as e:
        # Connection errors, timeouts, invalid URLs, ... are reported, not raised,
        # so the remaining checks in the cell still run.
        print(f"❌ Error: {str(e)}")
        return {"success": False, "error": str(e)}

# Test Examples - Compatible with Adaptive Backend Architecture
# Steps 1-5 call the two services in order; `python_result` and
# `adaptive_result` are kept because the integration analysis (step 7) reads them.
print("=== ADAPTIVE BACKEND COMPATIBILITY TESTER ===\n")

# 1. Go Backend Health Check
print("1. Go Backend Health Check:")
quick_test(BASE_URL, "/health")

print("\n" + "="*60 + "\n")

# NOTE(review): there is no step 2 -- the numbering jumps from 1 to 3 and the
# separator is printed twice; presumably a test was removed here.


print("\n" + "="*60 + "\n")

# 3. Python AI Service Protocol Selection Test
print("3. Python AI Service Protocol Selection:")
predict_data = {
    "messages": [{"role": "user", "content": "Test protocol selection with a complex mathematical question about quantum mechanics."}],
    "user_id": "test-user",
    "provider_constraint": ["openai", "deepseek", "anthropic"],
    "cost_bias": 0.7
}
# Result is inspected for a "protocol" field in step 7.
python_result = quick_test(PYTHON_SERVICE_URL, "/predict", "POST", predict_data)

print("\n" + "="*60 + "\n")

# 4. Go Backend Chat Completions - Basic Test
print("4. Go Backend Chat Completions (Basic):")
# Only `messages` is sent: no provider_constraint / cost_bias hints.
basic_chat_data = {
    "messages": [{"role": "user", "content": "Hello! Test basic message."}],

}
quick_test(BASE_URL, "/v1/chat/completions", "POST", basic_chat_data)

print("\n" + "="*60 + "\n")

# 5. Go Backend Chat Completions - Adaptive Features Test
print("5. Go Backend Chat Completions (Adaptive Features):")
adaptive_chat_data = {
    "messages": [{"role": "user", "content": "Explain quantum computing in simple terms."}],
    "provider_constraint": ["openai", "deepseek"],  # Multiple providers
    "cost_bias": 0.8  # Favor cheaper options
}
# Result is inspected for "model" and "provider" fields in step 7.
adaptive_result = quick_test(BASE_URL, "/v1/chat/completions", "POST", adaptive_chat_data)

print("\n" + "="*60 + "\n")

# 6. Go Backend Chat Completions - Cost Bias Variation Test
# Sends the same kind of request with a low and a high cost_bias and reports
# which provider/model the backend says it selected for each.
print("6. Cost Bias Variation Test:")
test_prompts = [
    {"content": "Simple math: What is 2+2?", "cost_bias": 0.2, "description": "Low cost bias (favor premium models)"},
    {"content": "Quick question: What's the capital of France?", "cost_bias": 0.9, "description": "High cost bias (favor cheaper models)"}
]

for i, test in enumerate(test_prompts, 1):
    print(f"\n   6.{i} {test['description']}:")
    cost_test_data = {
        "messages": [{"role": "user", "content": test["content"]}],
        "model": "gpt-4",
        "max_tokens": 80,
        "provider_constraint": ["openai", "deepseek"],
        "cost_bias": test["cost_bias"]
    }

    result = quick_test(BASE_URL, "/v1/chat/completions", "POST", cost_test_data)

    # Extract model selection info if available
    if result.get("success") and "data" in result:
        try:
            data = result["data"] if isinstance(result["data"], dict) else json.loads(result["data"])
            selected_model = data.get("model", "unknown")
            selected_provider = data.get("provider", "unknown")
            print(f"   → Selected: {selected_provider}/{selected_model}")
        except (ValueError, TypeError, AttributeError):
            # Body was not JSON, or was JSON but not an object. Catching only
            # these keeps Ctrl-C / kernel interrupts working, unlike a bare except.
            print("   → Model selection info not available")

print("\n" + "="*60 + "\n")

# 7. Backend Integration Analysis
# Classifies the two results captured earlier (python_result from step 3,
# adaptive_result from step 5). Every branch prints exactly one status line.
print("7. Backend Integration Analysis:")

# Check if Python service is working
python_working = False
# Test the result dict's own keys: a substring search over str(result) also
# matches error messages that merely contain the word "data".
if python_result.get("success") and "data" in python_result:
    protocol = python_result["data"].get("protocol") if isinstance(python_result["data"], dict) else None
    if protocol:
        python_working = True
        print(f"   ✅ Python AI Service: Working (Protocol: {protocol})")
    else:
        # Got a response, but not a JSON object carrying a "protocol" field.
        print("   ⚠️  Python AI Service: Responding but format unclear")
else:
    print("   ❌ Python AI Service: Not responding properly")

# Check if adaptive features are working in Go backend
adaptive_working = False
if adaptive_result.get("success") and "data" in adaptive_result:
    if isinstance(adaptive_result["data"], dict):
        model = adaptive_result["data"].get("model")
        provider = adaptive_result["data"].get("provider")
    else:
        model = provider = None
    if model and provider:
        adaptive_working = True
        print(f"   ✅ Go Backend Adaptive Features: Working ({provider}/{model})")
    else:
        # The backend answered, but did not report both a model and a provider.
        print("   ⚠️  Go Backend: Basic functionality working")
else:
    print("   ❌ Go Backend: Not responding properly")

# Integration status
print(f"\n   🔗 Integration Status:")
if python_working and adaptive_working:
    print("   ✅ FULL INTEGRATION: Both services working and communicating")
elif adaptive_working:
    print("   ⚠️  PARTIAL INTEGRATION: Go backend working, check Python service connection")
elif python_working:
    print("   ⚠️  SERVICES ISOLATED: Python service working but Go backend not using it")
else:
    print("   ❌ INTEGRATION ISSUES: Check both service configurations")

print("\n" + "="*60 + "\n")

# 8. Configuration Recommendations
# Static reference text: required environment variables, the request shape the
# chat endpoint expects, and the endpoints exercised by this cell.
print(
    "\n".join(
        (
            "8. Configuration Recommendations:",
            "   📋 Required Environment Variables for Go Backend:",
            "   • OPENAI_API_KEY: For semantic cache initialization",
            "   • ADAPTIVE_AI_BASE_URL: Points to Python service",
            "   • ADDR: Server port (default :8080)",
            "   • ALLOWED_ORIGINS: CORS configuration",
        )
    )
)

print("\n   🔧 Expected Request Format for Chat Completions:")
# Keyword order fixes the key order of the pretty-printed JSON below.
expected_format = dict(
    messages=[{"role": "user", "content": "Your message"}],
    provider_constraint=["openai", "deepseek"],  # Adaptive feature
    cost_bias=0.5,  # Adaptive feature (0.0-1.0)
    stream=False,  # Optional
)
print(f"   {json.dumps(expected_format, indent=2)}")

print(
    "\n".join(
        (
            "\n   🌐 Service Endpoints:",
            f"   • Go Backend: {BASE_URL}",
            f"   • Python AI Service: {PYTHON_SERVICE_URL}",
            "   • Go Health: GET /health",
            "   • Go Chat: POST /v1/chat/completions",
            "   • Python Health: GET /health",
            "   • Python Predict: POST /predict",
        )
    )
)

print(
    "\n".join(
        (
            "\n✨ Compatibility test complete!",
            "💡 Use this information to verify your adaptive backend setup.",
        )
    )
)

=== ADAPTIVE BACKEND COMPATIBILITY TESTER ===

1. Go Backend Health Check:
🔍 Testing: GET https://backend-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io/health
✅ Status: 200 | Time: 0.33s
📄 Response: {
  "go_version": "go1.24.5",
  "goroutines": 14,
  "status": "healthy",
  "timestamp": "2025-07-12T12:16:07.700473908Z",
  "uptime": "40ns"
}




3. Python AI Service Protocol Selection:
🔍 Testing: POST https://prompt-classifer-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io/predict
❌ Error: HTTPSConnectionPool(host='prompt-classifer-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io', port=443): Read timed out. (read timeout=10)


4. Go Backend Chat Completions (Basic):
🔍 Testing: POST https://backend-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io/v1/chat/completions
✅ Status: 200 | Time: 1.46s
📄 Response: {
  "id": "chatcmpl-BsTRKsIdepbqer4xsjQlQEPPW5hAm",
  "choices": [
    {
      "finish_reason": "stop",
      "index": 0,
      "logprobs

In [27]:
# SIMPLE VERSION:
import requests
import json

# Replace with your backend URL and endpoint
# test_backend() joins these two into the request URL.
BASE_URL = "https://backend-dev.mangoplant-a7a21605.swedencentral.azurecontainerapps.io"
ENDPOINT = "/v1/chat/completions"

# Example request data
# Read by test_backend() as a module-level global and sent as the JSON body.
data = {
    "messages": [{"role": "user", "content": "Hello! Simple test message."}],
}

def test_backend():
    """POST the module-level ``data`` payload to BASE_URL + ENDPOINT and print the reply.

    Prints the HTTP status followed by the response body: pretty-printed when
    it is JSON, raw text otherwise. Request-level failures (connection error,
    timeout, ...) are printed as ``Error: ...`` rather than raised.
    """
    url = f"{BASE_URL.rstrip('/')}/{ENDPOINT.lstrip('/')}"
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(url, headers=headers, json=data, timeout=10)
        print(f"Status: {response.status_code}")
        print("Response:")
        try:
            print(json.dumps(response.json(), indent=2))
        except ValueError:
            # Non-JSON body (e.g. an HTML error page): show it as-is instead of
            # reporting an opaque JSON decode error.
            print(response.text)
    except Exception as e:
        print(f"Error: {e}")

test_backend()


Status: 500
Response:
{
  "code": 500,
  "error": "runtime error: invalid memory address or nil pointer dereference"
}
