# Trends.Earth API - System Monitoring & Rate Limiting

This notebook focuses on testing system monitoring, rate limiting, and administrative features of the Trends.Earth API.

## Table of Contents
1. [Setup and Configuration](#setup)
2. [System Status Monitoring](#system-status)
3. [Docker Swarm Monitoring](#swarm-monitoring)
4. [Rate Limiting Tests](#rate-limiting)
5. [Performance Testing](#performance)

## Setup and Configuration {#setup}

In [None]:
# Import the shared utilities
import time

from IPython.display import HTML, display
import matplotlib.pyplot as plt
import pandas as pd
from trends_earth_api_utils import (
    TEST_USERS,
    TrendsEarthAPIClient,
    display_system_overview,
    get_rate_limit_status,
    get_swarm_status,
    get_system_status,
    test_rate_limiting,
)

# Configuration: base URL of the API instance this notebook talks to
API_URL = "http://localhost:5000"  # Update this for your environment

print(f"🌍 Trends.Earth API URL: {API_URL}")

# Build the client and authenticate with the admin test account
# (the monitoring cells in this notebook are written to run as admin)
client = TrendsEarthAPIClient(API_URL)
admin_user = TEST_USERS["admin"]
admin_email = admin_user["email"]
login_result = client.login(admin_email, admin_user["password"])

# Report the outcome; a falsy login result is treated as a failed login
if not login_result:
    print("❌ Admin login failed - some tests will be skipped")
    print("   System monitoring typically requires admin privileges")
else:
    print(f"✅ Logged in as admin: {admin_email}")

## System Status Monitoring {#system-status}

In [None]:
# Show the system overview for the logged-in client, followed by a divider line
print("🖥️  Getting system overview...")
display_system_overview(client)

divider = "=" * 50
print(f"\n{divider}")

In [None]:
# Compare the two most recent status logs to show short-term trends.
# status_logs is reused by the visualization cell that follows.
status_logs = get_system_status(client, per_page=10, sort="-timestamp")

if not status_logs or len(status_logs) < 2:
    print("\n📈 Not enough data for trend analysis")
else:
    # Index 0 is treated as the latest entry, index 1 as the one before it
    latest, previous = status_logs[0], status_logs[1]

    # (display label, status-log key) for every metric that is compared
    tracked_metrics = (
        ("Active Executions", "executions_active"),
        ("Running Executions", "executions_running"),
        ("Total Users", "users_count"),
    )

    # Work out every delta up front so nothing is printed if one of them fails
    deltas = {
        key: latest.get(key, 0) - previous.get(key, 0)
        for _, key in tracked_metrics
    }

    print("\n📈 Trends (compared to previous log):")
    for label, key in tracked_metrics:
        delta = deltas[key]
        # Negative numbers already carry their own sign
        sign = '+' if delta >= 0 else ''
        print(f"   {label}: {latest.get(key, 0)} ({sign}{delta})")

In [None]:
# Visualize system metrics over time.
# Relies on `status_logs` from the trend-analysis cell and on matplotlib (plt).
MIN_LOGS_FOR_PLOT = 5  # keep the guard and the user-facing message in agreement

if status_logs and len(status_logs) >= MIN_LOGS_FOR_PLOT:
    print("📊 Creating system metrics visualization...")

    try:
        # Collect one point per log entry that carries a timestamp
        timestamps = []
        active_executions = []
        running_executions = []
        ready_executions = []

        # Logs were requested newest-first, so reverse to plot oldest -> newest
        for log in reversed(status_logs[:10]):
            if log.get("timestamp"):
                # Keep the first 16 characters (original note: YYYY-MM-DD HH:MM)
                timestamps.append(log["timestamp"][:16])
                active_executions.append(log.get("executions_active", 0))
                running_executions.append(log.get("executions_running", 0))
                ready_executions.append(log.get("executions_ready", 0))

        if timestamps:
            fig, ax = plt.subplots(figsize=(12, 6))

            # All three series share the same evenly spaced x positions
            x_positions = range(len(timestamps))
            series = (
                (active_executions, "b-o", "Active Executions"),
                (running_executions, "g-s", "Running Executions"),
                (ready_executions, "r-^", "Ready Executions"),
            )
            for values, line_format, label in series:
                ax.plot(x_positions, values, line_format, label=label, markersize=4)

            ax.set_xlabel("Time")
            ax.set_ylabel("Number of Executions")
            ax.set_title("System Execution Metrics Over Time")
            ax.legend()
            ax.grid(True, alpha=0.3)

            # Thin the x-axis labels to roughly five ticks to avoid crowding
            step = max(1, len(timestamps) // 5)
            tick_positions = range(0, len(timestamps), step)
            ax.set_xticks(tick_positions)
            ax.set_xticklabels(
                [timestamps[i] for i in tick_positions], rotation=45
            )

            plt.tight_layout()
            plt.show()

            print("✅ System metrics visualization created")
        else:
            print("⚠️  No valid timestamps for visualization")

    except Exception as e:
        # Best-effort cell: a plotting failure should not abort the notebook run
        print(f"⚠️  Could not create visualization: {e}")
else:
    print(
        "⚠️  Insufficient data for visualization "
        f"(need at least {MIN_LOGS_FOR_PLOT} status logs)"
    )

## Docker Swarm Monitoring {#swarm-monitoring}

In [None]:
# Get Docker Swarm status and print a simple health assessment
print("🐳 Getting Docker Swarm status...")

swarm_info = get_swarm_status(client)

if swarm_info:
    print("✅ Docker Swarm information retrieved:")

    # Display swarm overview: one table row per key returned by the helper
    swarm_df = pd.DataFrame(
        [
            {
                "Metric": k.replace("_", " ").title(),
                "Value": str(v) if v is not None else "N/A",
            }
            for k, v in swarm_info.items()
        ]
    )

    display(HTML(swarm_df.to_html(index=False)))

    # Analyze swarm health
    print("\n🏥 Swarm Health Analysis:")

    is_active = swarm_info.get("swarm_active", False)
    # The table above already anticipates None values, so treat a missing or
    # null count as 0 here instead of letting the comparisons raise TypeError.
    total_nodes = swarm_info.get("total_nodes") or 0
    managers = swarm_info.get("total_managers") or 0
    workers = swarm_info.get("total_workers") or 0

    if is_active:
        print("   ✅ Swarm is active")

        if managers >= 1:
            print(f"   ✅ Sufficient managers ({managers})")
        else:
            print(f"   ⚠️  Low manager count ({managers})")

        if total_nodes > 0:
            print(f"   ✅ Nodes available ({total_nodes} total, {workers} workers)")
        else:
            print("   ⚠️  No nodes detected")

        # Optional metrics: only reported when present and of a recognised type
        if "services" in swarm_info:
            services = swarm_info["services"]
            if isinstance(services, list):
                print(f"   📋 Services running: {len(services)}")
            elif isinstance(services, int):
                print(f"   📋 Services running: {services}")

        if "tasks" in swarm_info:
            tasks = swarm_info["tasks"]
            if isinstance(tasks, (list, dict)):
                # A list is counted; a dict is expected to expose a "total" entry
                task_count = (
                    len(tasks) if isinstance(tasks, list) else tasks.get("total", 0)
                )
                print(f"   🎯 Active tasks: {task_count}")
    else:
        print("   ❌ Swarm is not active")
else:
    print("❌ Could not retrieve Docker Swarm status")
    print("   This may be normal if not running in swarm mode")

## Rate Limiting Tests {#rate-limiting}

In [None]:
# Fetch the current rate limiting status and render it
print("🚦 Getting rate limiting status...")

rate_status = get_rate_limit_status(client)

if not rate_status:
    print("❌ Could not retrieve rate limiting status")
    print("   This feature may require superadmin privileges")
else:
    print("✅ Rate limiting status retrieved:")

    if not isinstance(rate_status, dict):
        # Anything that is not a mapping is printed as-is
        print(f"   Status: {rate_status}")
    else:
        # One table row per key, with a readable label and "N/A" for None
        rows = []
        for metric, value in rate_status.items():
            rows.append(
                {
                    "Metric": metric.replace("_", " ").title(),
                    "Value": "N/A" if value is None else str(value),
                }
            )
        rate_df = pd.DataFrame(rows)

        display(HTML(rate_df.to_html(index=False)))

In [None]:
# Test rate limiting with different request patterns
print("🧪 Testing rate limiting patterns...")

# Test 1: Burst requests
print("\n1️⃣  Burst Request Test (10 requests, no delay):")
# Normalise a None/empty return to [] so the statistics below cannot divide by
# zero and the combined-analysis cell can still extend from this list.
burst_results = (
    test_rate_limiting(client, endpoint="/user/me", requests_count=10, delay=0)
    or []
)

if burst_results:
    # Analyze burst results
    burst_rate_limited = sum(
        1 for r in burst_results if r.get("rate_limited", False)
    )
    burst_avg_time = sum(r.get("response_time", 0) for r in burst_results) / len(
        burst_results
    )

    print(f"   Rate limited requests: {burst_rate_limited}/{len(burst_results)}")
    print(f"   Average response time: {burst_avg_time:.2f}ms")
else:
    print("   ❌ No results from burst request test")

In [None]:
# Test 2: Sustained requests
print("\n2️⃣  Sustained Request Test (15 requests, 0.5s delay):")
# Normalise a None/empty return to [] so the statistics below cannot divide by
# zero and the combined-analysis cell can still extend from this list.
sustained_results = (
    test_rate_limiting(client, endpoint="/user/me", requests_count=15, delay=0.5)
    or []
)

if sustained_results:
    # Analyze sustained results
    sustained_rate_limited = sum(
        1 for r in sustained_results if r.get("rate_limited", False)
    )
    sustained_avg_time = sum(
        r.get("response_time", 0) for r in sustained_results
    ) / len(sustained_results)

    print(
        f"   Rate limited requests: {sustained_rate_limited}/{len(sustained_results)}"
    )
    print(f"   Average response time: {sustained_avg_time:.2f}ms")
else:
    print("   ❌ No results from sustained request test")

In [None]:
# Merge whichever per-pattern result lists exist into one list for analysis.
# The locals() checks let this cell run even if an earlier test cell was skipped.
results = []
if 'burst_results' in locals():
    results.extend(burst_results)
if 'sustained_results' in locals():
    results.extend(sustained_results)

if not results:
    print("❌ No results from rate limiting test")
else:
    total = len(results)

    # Aggregate counts; the average only considers successful requests
    succeeded = [r for r in results if r['success']]
    successful = len(succeeded)
    rate_limited = len([r for r in results if r.get('rate_limited', False)])
    avg_time = sum(r['response_time'] for r in succeeded) / max(successful, 1)

    # total is never 0 in this branch, so the percentage is always defined
    rate_limited_pct = rate_limited / total * 100

    print(f"📊 Rate Limiting Test Results ({total} requests):")
    print(f"     Successful: {successful}/{total}")
    print(f"     Rate limited: {rate_limited}/{total} ({rate_limited_pct:.1f}%)")
    print(f"     Avg response: {avg_time:.2f}ms")

    if rate_limited <= 0:
        print("✅ No rate limiting detected")
    else:
        print("⚠️  Rate limiting is active!")

In [None]:
# Sustained load test (100 requests paced 0.3s apart, i.e. ~30 seconds of pacing)
print("\n🔄 Running sustained load test...")
load_requests = 100
load_delay = 0.3
# Use the same `requests_count` keyword as the burst and sustained cells above;
# the previous `num_requests` spelling did not match those calls.
sustained_results = test_rate_limiting(
    client, requests_count=load_requests, delay=load_delay
)

if sustained_results:
    sustained_rate_limited = sum(
        1 for r in sustained_results if r.get('rate_limited', False)
    )
    successful_sustained = sum(1 for r in sustained_results if r['success'])
    # Average over successful requests only; max(..., 1) avoids dividing by zero
    sustained_avg_time = (
        sum(r['response_time'] for r in sustained_results if r['success']) /
        max(successful_sustained, 1)
    )

    # sustained_results is non-empty in this branch, so the division is safe
    sustained_rate_limited_pct = (
        sustained_rate_limited / len(sustained_results) * 100
    )

    print("📊 Sustained Load Test Summary:")
    sustained_summary = {
        "Test Type": "Sustained Load",
        # Nominal pacing time, derived from the parameters so it cannot drift
        "Duration": f"{load_requests * load_delay:.0f} seconds",
        "Requests": len(sustained_results),
        "Rate Limited": sustained_rate_limited,
        "Rate Limited %": f"{sustained_rate_limited_pct:.1f}%",
        "Avg Response (ms)": f"{sustained_avg_time:.2f}",
    }

    for key, value in sustained_summary.items():
        print(f"   {key}: {value}")

    # Compare with status endpoint rate limits
    try:
        status_data = get_rate_limit_status(client)
        if status_data and 'rate_limit' in status_data:
            rate_limit_info = status_data['rate_limit']
            print("\n📋 Rate Limit Configuration:")
            for endpoint, data in rate_limit_info.items():
                if data['total'] > 0:
                    # Separate name so the combined-analysis percentage
                    # (rate_limited_pct) is not overwritten by this loop
                    endpoint_limited_pct = (
                        data['rate_limited'] / data['total'] * 100
                    )
                    print(f"   {endpoint}:")
                    print(f"     Total Requests: {data['total']}")
                    print(f"     Rate Limited: {data['rate_limited']}")
                    print(f"     Rate Limited %: {endpoint_limited_pct:.1f}%")
                    avg_response = data['avg_response_time']
                    print(f"     Avg Response (ms): {avg_response:.2f}")
    except Exception as e:
        # Best-effort comparison: an unexpected payload shape is reported, not fatal
        print(f"⚠️  Could not retrieve rate limit status: {e}")

else:
    print("❌ Sustained load test failed")

## Performance Testing {#performance}

In [None]:
import concurrent.futures

import requests

# Configuration for concurrent testing
BASE_URL = API_URL

# Only send an Authorization header when a usable token exists. hasattr() alone
# is also true for a None/empty token, which would send "Bearer None".
_auth_token = getattr(client, "token", None)
headers = {"Authorization": f"Bearer {_auth_token}"} if _auth_token else {}

# Concurrent user simulation test
print("🔄 Starting concurrent test with 3 clients (5 requests each)...")

# Filled in by the worker threads' results in the cell body below
concurrent_results = []

def client_test(client_id, num_requests=5):
    """Issue ``num_requests`` sequential GETs and return one record per request.

    Each record is a dict with ``client_id``, ``request_num`` (1-based),
    ``success``, ``response_time`` and ``rate_limited``. Completed requests
    also carry ``status_code``; requests that raised carry ``error`` instead.
    Exceptions are recorded rather than propagated.
    """
    outcomes = []
    for request_num in range(1, num_requests + 1):
        record = {'client_id': client_id, 'request_num': request_num}
        try:
            response = requests.get(
                f"{BASE_URL}/api/v1/executions",
                headers=headers,
                timeout=30
            )
            status = response.status_code
            record.update(
                success=status == 200,
                status_code=status,
                # elapsed is converted from seconds to milliseconds
                response_time=response.elapsed.total_seconds() * 1000,
                rate_limited=status == 429,
            )
        except Exception as e:
            # Any failure is captured as a failed, non-rate-limited record
            record.update(
                success=False,
                error=str(e),
                response_time=0,
                rate_limited=False,
            )
        outcomes.append(record)
        time.sleep(0.1)  # Small delay between requests
    return outcomes

# Fan the three simulated clients (ids 1-3) out over a small thread pool and
# gather their records as each worker finishes
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    pending = [executor.submit(client_test, worker_id) for worker_id in range(1, 4)]
    for finished in concurrent.futures.as_completed(pending):
        concurrent_results.extend(finished.result())

# Summarise per client first, then across all clients
if not concurrent_results:
    print("❌ No concurrent test results")
else:
    print("\n📊 Concurrent Test Results:")

    for client_id in (1, 2, 3):
        client_results = [
            r for r in concurrent_results
            if r.get('client_id') == client_id
        ]
        client_ok = [r for r in client_results if r['success']]
        successful = len(client_ok)
        # Average over successful requests only; max(..., 1) avoids a zero divisor
        avg_time = sum(r['response_time'] for r in client_ok) / max(successful, 1)
        print(
            f"   Client {client_id}: {successful}/{len(client_results)} "
            f"successful, avg {avg_time:.2f}ms"
        )

    # Overall statistics
    total_requests = len(concurrent_results)
    all_ok = [r for r in concurrent_results if r['success']]
    total_successful = len(all_ok)
    avg_response_time = (
        sum(r['response_time'] for r in all_ok) / max(total_successful, 1)
    )
    # total_requests is never 0 in this branch
    success_pct = total_successful / total_requests * 100

    print("\n🎯 Overall Concurrent Results:")
    print(f"   Total Requests: {total_requests}")
    print(f"   Successful: {total_successful} ({success_pct:.1f}%)")
    print(f"   Average Response Time: {avg_response_time:.2f}ms")

    # Verdict thresholds: below 70% is a problem, 70-90% is acceptable
    if success_pct < 70:
        print("🚨 Concurrent performance issues detected!")
    elif success_pct < 90:
        print("⚠️  Good concurrent performance, minor issues detected")
    else:
        print("✅ Excellent concurrent performance!")

In [None]:
# Test system under different load patterns
print("\n⚡ Testing system responsiveness under load...")

# Baseline test - single request
print("\n📊 Baseline Performance:")
# perf_counter() is monotonic, so intervals are unaffected by clock adjustments
baseline_start = time.perf_counter()
try:
    response = client.make_request("GET", "/user/me")
    baseline_time = (time.perf_counter() - baseline_start) * 1000
    print(f"   Single request: {baseline_time:.2f}ms (status: {response.status_code})")
except Exception as e:
    print(f"   Baseline test failed: {e}")
    baseline_time = 0  # 0 marks "no baseline" for the comparison below

# Load test - rapid sequential requests
print("\n🔄 Load Test (20 sequential requests):")
load_times = []
load_start = time.perf_counter()

for i in range(20):
    try:
        req_start = time.perf_counter()
        response = client.make_request("GET", "/user/me")
        req_time = (time.perf_counter() - req_start) * 1000
        load_times.append(req_time)

        if response.status_code == 429:
            print(f"   Request {i + 1}: Rate limited after {req_time:.2f}ms")
        elif i % 5 == 0:  # Print every 5th request
            print(f"   Request {i + 1}: {req_time:.2f}ms")

    except Exception as e:
        # A failed request is reported and skipped; the loop keeps going
        print(f"   Request {i + 1}: Error - {e}")

    time.sleep(0.05)  # 50ms delay

total_load_time = (time.perf_counter() - load_start) * 1000

if load_times:
    avg_load_time = sum(load_times) / len(load_times)

    # Build the ratio text first so the label is printed even without a
    # baseline (previously a failed baseline printed a bare "N/A" line).
    if baseline_time > 0:
        vs_baseline = f"{avg_load_time / baseline_time:.2f}x"
    else:
        vs_baseline = "N/A"

    print("\n📊 Load Test Results:")
    print(f"   Requests completed: {len(load_times)}/20")
    print(f"   Total time: {total_load_time:.2f}ms")
    print(f"   Average per request: {avg_load_time:.2f}ms")
    print(f"   Performance vs baseline: {vs_baseline}")
    print(
        f"   Throughput: {len(load_times) / (total_load_time / 1000):.2f} requests/sec"
    )
else:
    print("\n❌ Load test completed no requests")

In [None]:
# Closing cell: re-read the newest status log, print a summary, then log out
print("\n🔍 Final system status check after testing...")

# Request a single status entry, sorted so the newest comes first
final_status = get_system_status(client, per_page=1, sort="-timestamp")
if final_status:
    latest_status = final_status[0]
    print("📊 Current System State:")
    # (label, status-log key, value shown when the key is missing)
    status_fields = (
        ("Active Executions", "executions_active", 0),
        ("Running Executions", "executions_running", 0),
        ("Ready Executions", "executions_ready", 0),
        ("Timestamp", "timestamp", "N/A"),
    )
    for label, key, fallback in status_fields:
        print(f"   {label}: {latest_status.get(key, fallback)}")

# Summary of the areas this notebook exercised (printed unconditionally)
print("\n📊 SYSTEM MONITORING & RATE LIMITING TEST SUMMARY")
print("=" * 65)
covered_areas = (
    "System status monitoring",
    "Docker Swarm monitoring",
    "Rate limiting functionality",
    "Performance under various loads",
    "Concurrent request handling",
)
for area in covered_areas:
    print(f"✅ {area} tested")

# End the authenticated session
client.logout()
print("\n🎉 System Monitoring and Rate Limiting tests completed!")