# Kubernetes Microservices Observability Data Generator

This notebook generates synthetic observability data for a simulated **Kubernetes-deployed e-commerce platform** and indexes it into Elasticsearch:

**Indices Created:**
- `application-logs` - Application logs from all microservices
- `kubernetes-events` - K8s cluster events (pod restarts, scaling, failures)
- `system-metrics` - CPU, memory, network, disk metrics per pod/node
- `distributed-traces` - Request traces spanning multiple services

**Architecture:**
```
                    ┌─────────────────┐
                    │   Ingress/LB    │
                    └────────┬────────┘
                             │
                    ┌────────▼────────┐
                    │   API Gateway   │ (3 replicas)
                    └────────┬────────┘
           ┌─────────────────┼─────────────────┐
           │                 │                 │
    ┌──────▼──────┐   ┌──────▼──────┐   ┌──────▼──────┐
    │ User Service│   │Order Service│   │Search Svc   │
    │ (2 replicas)│   │(3 replicas) │   │(2 replicas) │
    └──────┬──────┘   └──────┬──────┘   └─────────────┘
           │                 │
    ┌──────▼──────┐   ┌──────▼──────┐   ┌─────────────┐
    │ Auth Service│   │Payment Svc  │   │Recommendation│
    │ (2 replicas)│   │(2 replicas) │   │(2 replicas) │
    └─────────────┘   └──────┬──────┘   └─────────────┘
                             │
                      ┌──────▼──────┐   ┌─────────────┐
                      │Inventory Svc│   │Notification │
                      │(2 replicas) │   │(1 replica)  │
                      └─────────────┘   └─────────────┘
```

**Simulated Incidents:**
1. Memory leak in recommendation-engine causing OOM kills
2. Database connection pool exhaustion in order-service
3. Cascading timeout failures from payment-service
4. Network partition affecting inventory-service
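
A minimal sketch of how incidents like these could be encoded and looked up while generating a log entry. The `INCIDENT_WINDOWS` table, the `active_incident` and `error_probability` helpers, the choice to measure windows in hours before the current time, and the 60% incident error rate are all illustrative assumptions, not necessarily what the generator cells in this notebook do.

```python
# Hypothetical incident table: each window is expressed in hours before "now".
INCIDENT_WINDOWS = [
    {"start": 2, "end": 3, "service": "recommendation-engine", "type": "oom"},
    {"start": 6, "end": 7, "service": "order-service", "type": "db_pool"},
    {"start": 14, "end": 15, "service": "payment-service", "type": "timeout"},
    {"start": 20, "end": 21, "service": "inventory-service", "type": "network"},
]


def active_incident(hours_ago, service_name, windows=INCIDENT_WINDOWS):
    """Return the incident affecting service_name at hours_ago, or None."""
    for window in windows:
        same_service = window["service"] == service_name
        if same_service and window["start"] <= hours_ago < window["end"]:
            return window
    return None


def error_probability(hours_ago, service_name, baseline=0.10, during_incident=0.60):
    """Chance that a log line is ERROR/FATAL (both rates are assumed values)."""
    if active_incident(hours_ago, service_name):
        return during_incident
    return baseline


print(active_incident(2.5, "recommendation-engine"))
print(error_probability(10, "order-service"))
```

Keeping the windows in a plain list makes it easy to add or move an incident without touching the generation loop.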


In [None]:
import random
import uuid
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch, helpers


In [None]:
# Connect to Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200, 'scheme': 'http'}])

# Verify connection
if es.ping():
    print("Connected to Elasticsearch!")
else:
    raise ConnectionError("Could not connect to Elasticsearch at localhost:9200")


In [None]:
# Kubernetes cluster configuration
K8S_CLUSTER = "prod-us-east-1"
K8S_NAMESPACE = "ecommerce"

# Node pool (simulating a 5-node cluster)
K8S_NODES = [
    {"name": "node-pool-1-abc12", "zone": "us-east-1a", "instance_type": "m5.xlarge", "cpu_cores": 4, "memory_gb": 16},
    {"name": "node-pool-1-def34", "zone": "us-east-1b", "instance_type": "m5.xlarge", "cpu_cores": 4, "memory_gb": 16},
    {"name": "node-pool-1-ghi56", "zone": "us-east-1c", "instance_type": "m5.xlarge", "cpu_cores": 4, "memory_gb": 16},
    {"name": "node-pool-2-jkl78", "zone": "us-east-1a", "instance_type": "m5.2xlarge", "cpu_cores": 8, "memory_gb": 32},
    {"name": "node-pool-2-mno90", "zone": "us-east-1b", "instance_type": "m5.2xlarge", "cpu_cores": 8, "memory_gb": 32},
]

# Services with full Kubernetes metadata
services = []
service_definitions = [
    {"name": "api-gateway", "replicas": 3, "cpu_request": "500m", "memory_request": "512Mi", "image": "ecommerce/api-gateway:2.4.1"},
    {"name": "user-service", "replicas": 2, "cpu_request": "250m", "memory_request": "256Mi", "image": "ecommerce/user-service:1.8.3"},
    {"name": "auth-service", "replicas": 2, "cpu_request": "200m", "memory_request": "256Mi", "image": "ecommerce/auth-service:3.1.0"},
    {"name": "order-service", "replicas": 3, "cpu_request": "500m", "memory_request": "512Mi", "image": "ecommerce/order-service:2.2.7"},
    {"name": "payment-service", "replicas": 2, "cpu_request": "300m", "memory_request": "384Mi", "image": "ecommerce/payment-service:1.5.2"},
    {"name": "inventory-service", "replicas": 2, "cpu_request": "250m", "memory_request": "256Mi", "image": "ecommerce/inventory-service:1.9.0"},
    {"name": "notification-service", "replicas": 1, "cpu_request": "100m", "memory_request": "128Mi", "image": "ecommerce/notification-service:1.3.4"},
    {"name": "search-service", "replicas": 2, "cpu_request": "400m", "memory_request": "1Gi", "image": "ecommerce/search-service:2.0.1"},
    {"name": "recommendation-engine", "replicas": 2, "cpu_request": "1000m", "memory_request": "2Gi", "image": "ecommerce/recommendation:3.5.0"},
    {"name": "cache-service", "replicas": 3, "cpu_request": "200m", "memory_request": "1Gi", "image": "redis:7.2-alpine"},
]

# Generate pod instances for each service
for svc_def in service_definitions:
    # All replicas of a Deployment share one ReplicaSet, so name it once per service
    replicaset_name = f"{svc_def['name']}-{uuid.uuid4().hex[:10]}"
    for replica_idx in range(svc_def["replicas"]):
        pod_suffix = uuid.uuid4().hex[:5]
        node = random.choice(K8S_NODES)
        services.append({
            "name": svc_def["name"],
            "k8s.pod.name": f"{svc_def['name']}-{replica_idx}-{pod_suffix}",
            "k8s.pod.uid": str(uuid.uuid4()),
            "k8s.namespace": K8S_NAMESPACE,
            "k8s.node.name": node["name"],
            "k8s.deployment.name": svc_def["name"],
            "k8s.replicaset.name": replicaset_name,
            "container.image.name": svc_def["image"],
            "container.id": f"docker://{uuid.uuid4().hex}",
            "host.name": node["name"],
            "cloud.availability_zone": node["zone"],
            "env": "production",
            "cpu_request": svc_def["cpu_request"],
            "memory_request": svc_def["memory_request"],
        })

print(f"Generated {len(services)} pod instances across {len(K8S_NODES)} nodes")

log_levels = ["DEBUG", "INFO", "WARN", "ERROR", "FATAL"]
log_level_weights = [10, 60, 20, 8, 2]

http_status_codes = [200, 201, 204, 301, 302, 400, 401, 403, 404, 408, 429, 500, 502, 503, 504]
http_status_weights = [50, 10, 5, 2, 2, 5, 3, 2, 8, 2, 3, 4, 1, 2, 1]

user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15",
    "Mozilla/5.0 (Linux; Android 14) AppleWebKit/537.36 Chrome/120.0.0.0 Mobile",
    "PostmanRuntime/7.35.0",
    "python-requests/2.31.0",
    "curl/8.4.0",
]
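
The weight lists above are relative weights for `random.choices`; they do not have to sum to 100, although both happen to. A small sketch, using a hypothetical `weights_to_probabilities` helper, of how to inspect the distribution the generator will draw from:

```python
import random

log_levels = ["DEBUG", "INFO", "WARN", "ERROR", "FATAL"]
log_level_weights = [10, 60, 20, 8, 2]


def weights_to_probabilities(values, weights):
    """Normalize relative weights into a value -> probability mapping."""
    total = sum(weights)
    return {value: weight / total for value, weight in zip(values, weights)}


level_probs = weights_to_probabilities(log_levels, log_level_weights)
error_share = level_probs["ERROR"] + level_probs["FATAL"]
print(f"Expected ERROR/FATAL share: {error_share:.0%}")

# Draw a seeded sample and compare the observed share with the expected one.
rng = random.Random(42)
sample = rng.choices(log_levels, weights=log_level_weights, k=10_000)
observed = sum(level in ("ERROR", "FATAL") for level in sample) / len(sample)
print(f"Observed ERROR/FATAL share: {observed:.1%}")
```

The same helper can be applied to `http_status_codes` and `http_status_weights` to check the expected share of 5xx responses.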


In [None]:
# Error message templates by service - realistic production errors
error_templates = {
    "api-gateway": [
        "Connection timeout to upstream service {service} after {timeout}ms",
        "Rate limit exceeded for client IP {ip} - {requests} requests in {window}s",
        "Circuit breaker OPEN for service {service}, failing fast",
        "SSL handshake failed with upstream: {reason}",
        "Request body too large: {size}MB exceeds limit of 10MB",
        "Invalid JWT token: {reason}",
        "Upstream service {service} returned {status}: {message}",
    ],
    "user-service": [
        "Failed to authenticate user {user_id}: {reason}",
        "Database connection pool exhausted, {waiting} requests waiting",
        "User {user_id} not found in database",
        "Password hash verification failed for user {user_id}",
        "Email validation failed for {email}: {reason}",
        "Session expired for user {user_id}, last activity {minutes} minutes ago",
        "Failed to send verification email to {email}: SMTP error {code}",
    ],
    "order-service": [
        "Order {order_id} processing failed: insufficient inventory for SKU {sku}",
        "Payment authorization failed for order {order_id}: {reason}",
        "Order {order_id} stuck in {status} state for {hours} hours",
        "Failed to calculate shipping for order {order_id}: invalid address",
        "Duplicate order detected: {order_id} matches existing order",
        "Order cancellation failed: order {order_id} already shipped",
        "Tax calculation service timeout for order {order_id}",
    ],
    "payment-service": [
        "Payment gateway {gateway} returned error: {error_code} - {message}",
        "Card validation failed for transaction {txn_id}: {reason}",
        "Refund request failed: original transaction not found",
        "3DS authentication required but not provided for transaction {txn_id}",
        "Payment timeout: transaction {txn_id} did not complete in {timeout}s",
        "Fraud detection triggered for transaction {txn_id}: risk score {score}",
        "Currency conversion failed: {from_curr} to {to_curr} rate unavailable",
    ],
    "inventory-service": [
        "Stock level mismatch for SKU {sku}: expected {expected}, actual {actual}",
        "Warehouse {warehouse_id} sync failed: connection refused",
        "Reserved inventory expired for order {order_id}, releasing {quantity} units",
        "Low stock alert: SKU {sku} has only {quantity} units remaining",
        "Failed to update inventory: optimistic locking conflict for SKU {sku}",
    ],
    "notification-service": [
        "Push notification failed for device {device_id}: token expired",
        "Email delivery failed to {email}: bounced with code {code}",
        "SMS gateway error: {provider} returned {error_code}",
        "Notification queue backlog: {count} messages pending, oldest {age}s",
        "Template rendering failed for {template}: missing variable {var}",
    ],
    "search-service": [
        "Elasticsearch cluster health RED: {shards} unassigned shards",
        "Search query timeout after {timeout}ms for query: {query}",
        "Index {index} not found, falling back to default search",
        "Aggregation query too expensive: {buckets} buckets requested",
        "Reindex operation failed at document {doc_id}: {reason}",
    ],
    "recommendation-engine": [
        "Model inference failed: GPU memory exhausted",
        "Feature store connection timeout after {timeout}ms",
        "Cold start problem for new user {user_id}: no interaction history",
        "Model version mismatch: expected v{expected}, got v{actual}",
        "Recommendation cache miss rate {percent}% exceeds threshold",
    ],
    "cache-service": [
        "Redis cluster node {node} unreachable, failover initiated",
        "Cache eviction storm: {evicted} keys evicted in {seconds}s",
        "Memory usage critical: {percent}% of max, consider scaling",
        "Key {key} serialization failed: object too large",
        "Distributed lock acquisition timeout for key {key}",
    ],
    "auth-service": [
        "Brute force attack detected from IP {ip}: {attempts} failed attempts",
        "OAuth provider {provider} returned error: {error}",
        "Token refresh failed for user {user_id}: refresh token revoked",
        "MFA verification failed for user {user_id}: code expired",
        "API key revoked due to suspicious activity",
        "LDAP sync failed: connection to {server} refused",
    ],
}


In [None]:
# Info/success message templates
info_templates = {
    "api-gateway": [
        "Request {method} {path} completed in {duration}ms with status {status}",
        "New client connection from {ip}, total active connections: {count}",
        "Rate limit reset for client {ip}",
        "Circuit breaker CLOSED for service {service}, resuming normal operation",
        "Health check passed for all upstream services",
    ],
    "user-service": [
        "User {user_id} successfully authenticated from {ip}",
        "New user registration completed: {user_id} ({email})",
        "Password changed successfully for user {user_id}",
        "User profile updated: {user_id} changed {fields}",
        "Session created for user {user_id}, expires in {hours} hours",
    ],
    "order-service": [
        "Order {order_id} created successfully, total: ${amount}",
        "Order {order_id} status changed: {from_status} -> {to_status}",
        "Order {order_id} shipped via {carrier}, tracking: {tracking}",
        "Order {order_id} delivered to {city}, {country}",
        "Bulk order import completed: {count} orders processed",
    ],
    "payment-service": [
        "Payment {txn_id} authorized successfully: ${amount}",
        "Payment {txn_id} captured: ${amount} via {method}",
        "Refund processed: ${amount} to {method}",
        "Daily settlement completed: {count} transactions, ${total}",
        "Payment method added for user {user_id}: {type} ending in {last4}",
    ],
    "inventory-service": [
        "Stock updated for SKU {sku}: {old_qty} -> {new_qty}",
        "Inventory reserved for order {order_id}: {quantity} x {sku}",
        "Warehouse {warehouse_id} sync completed: {count} SKUs updated",
        "Restock shipment received: {count} units for {sku}",
        "Inventory audit completed: {matched} matched, {discrepancies} discrepancies",
    ],
    "notification-service": [
        "Email sent to {email}: {subject}",
        "Push notification delivered to {device_count} devices",
        "SMS sent to {phone}: delivery confirmed",
        "Notification batch processed: {count} messages in {duration}ms",
        "User {user_id} notification preferences updated",
    ],
    "search-service": [
        "Search query completed in {duration}ms: {results} results for '{query}'",
        "Index {index} refreshed, {docs} documents searchable",
        "Search suggestions generated for '{prefix}': {count} suggestions",
        "Elasticsearch cluster health GREEN: {nodes} nodes, {shards} shards",
        "Search analytics aggregated: {queries} queries, {clicks} clicks",
    ],
    "recommendation-engine": [
        "Recommendations generated for user {user_id}: {count} items in {duration}ms",
        "Model v{version} deployed successfully to {replicas} replicas",
        "Feature store updated: {features} features for {users} users",
        "A/B test {test_id} started: {variants} variants, {traffic}% traffic",
        "Batch inference completed: {count} users processed",
    ],
    "cache-service": [
        "Cache hit for key {key}: served in {duration}us",
        "Cache warmed: {count} keys preloaded",
        "Redis cluster rebalanced: {slots} slots migrated",
        "Memory usage nominal: {percent}% of max capacity",
        "Cache statistics: {hits} hits, {misses} misses, {ratio}% hit rate",
    ],
    "auth-service": [
        "OAuth login successful: user {user_id} via {provider}",
        "Access token issued for user {user_id}, expires in {minutes} minutes",
        "API key created for user {user_id}",
        "MFA enabled for user {user_id}: {method}",
        "Security audit: {active} active sessions for user {user_id}",
    ],
}


In [None]:
# Stack trace templates for errors
stack_traces = [
    """java.lang.NullPointerException: Cannot invoke method on null object
    at com.example.service.UserService.getUser(UserService.java:142)
    at com.example.controller.UserController.handleRequest(UserController.java:87)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:897)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:750)""",
    
    """java.sql.SQLException: Connection pool exhausted, no available connections
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:155)
    at com.example.repository.OrderRepository.findById(OrderRepository.java:45)
    at com.example.service.OrderService.processOrder(OrderService.java:201)
    at com.example.controller.OrderController.createOrder(OrderController.java:63)""",
    
    """redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:84)
    at com.example.cache.CacheService.get(CacheService.java:56)
    at com.example.service.ProductService.getProduct(ProductService.java:78)""",
    
    """org.elasticsearch.ElasticsearchTimeoutException: Timeout waiting for search response
    at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1012)
    at com.example.search.SearchService.executeSearch(SearchService.java:145)
    at com.example.controller.SearchController.search(SearchController.java:52)""",
    
    """com.stripe.exception.CardException: Your card was declined
    at com.stripe.net.LiveStripeResponseGetter.handleError(LiveStripeResponseGetter.java:214)
    at com.example.payment.StripePaymentGateway.charge(StripePaymentGateway.java:89)
    at com.example.service.PaymentService.processPayment(PaymentService.java:156)""",
    
    """io.grpc.StatusRuntimeException: UNAVAILABLE: upstream connect error or disconnect/reset before headers
    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
    at com.example.client.InventoryClient.checkStock(InventoryClient.java:67)
    at com.example.service.OrderService.validateOrder(OrderService.java:112)""",
    
    """org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:999)
    at com.example.messaging.EventPublisher.publish(EventPublisher.java:45)
    at com.example.service.OrderService.publishOrderEvent(OrderService.java:234)""",
]


In [None]:
# Helper functions
def generate_ip():
    return f"{random.randint(1,255)}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}"

def generate_user_id():
    return f"user_{random.randint(10000, 99999)}"

def generate_order_id():
    return f"ORD-{random.randint(100000, 999999)}"

def generate_transaction_id():
    return f"txn_{uuid.uuid4().hex[:16]}"

def generate_trace_id():
    return uuid.uuid4().hex

def generate_span_id():
    return uuid.uuid4().hex[:16]

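def fill_template(template, service_name):
    """Fill a template's {placeholders} with random, realistic-looking values.

    Note: service_name is currently unused; values do not depend on the service.
    """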
    replacements = {
        "{service}": random.choice([s["name"] for s in services]),
        "{timeout}": str(random.choice([1000, 2000, 3000, 5000, 10000, 30000])),
        "{ip}": generate_ip(),
        "{requests}": str(random.randint(100, 1000)),
        "{window}": str(random.choice([60, 300, 600])),
        "{reason}": random.choice(["connection_refused", "timeout", "invalid_response", "certificate_expired", "rate_limited"]),
        "{size}": str(random.randint(10, 100)),
        "{status}": str(random.choice([200, 201, 400, 401, 403, 404, 500, 502, 503, 504])),
        "{message}": random.choice(["Internal Server Error", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "OK"]),
        "{user_id}": generate_user_id(),
        "{waiting}": str(random.randint(10, 100)),
        "{email}": f"user{random.randint(1000,9999)}@example.com",
        "{minutes}": str(random.randint(30, 120)),
        "{code}": str(random.choice([421, 450, 451, 452, 550, 551, 552, 553, 554])),
        "{order_id}": generate_order_id(),
        "{sku}": f"SKU-{random.randint(10000, 99999)}",
        "{hours}": str(random.randint(1, 48)),
        "{txn_id}": generate_transaction_id(),
        "{score}": str(random.randint(70, 99)),
        "{from_curr}": random.choice(["USD", "EUR", "GBP"]),
        "{to_curr}": random.choice(["JPY", "CNY", "INR"]),
        "{gateway}": random.choice(["stripe", "paypal", "braintree", "adyen"]),
        "{error_code}": random.choice(["card_declined", "insufficient_funds", "expired_card", "invalid_cvc"]),
        "{expected}": str(random.randint(100, 500)),
        "{actual}": str(random.randint(0, 99)),
        "{warehouse_id}": f"WH-{random.choice(['US-EAST', 'US-WEST', 'EU-CENTRAL', 'APAC'])}01",
        "{quantity}": str(random.randint(1, 100)),
        "{device_id}": f"device_{uuid.uuid4().hex[:12]}",
        "{provider}": random.choice(["twilio", "sendgrid", "aws_sns", "firebase"]),
        "{count}": str(random.randint(10, 1000)),
        "{age}": str(random.randint(60, 3600)),
        "{template}": random.choice(["order_confirmation", "password_reset", "welcome", "shipping_update"]),
        "{var}": random.choice(["customer_name", "order_total", "tracking_number", "delivery_date"]),
        "{shards}": str(random.randint(1, 10)),
        "{query}": random.choice(["laptop gaming", "iphone case", "running shoes", "wireless headphones"]),
        "{index}": random.choice(["products", "orders", "users", "logs"]),
        "{buckets}": str(random.randint(10000, 100000)),
        "{doc_id}": f"doc_{uuid.uuid4().hex[:8]}",
        "{percent}": str(random.randint(10, 50)),
        "{test_id}": f"test_{random.randint(100, 999)}",
        "{node}": f"redis-{random.randint(1, 5)}",
        "{evicted}": str(random.randint(1000, 10000)),
        "{seconds}": str(random.randint(1, 60)),
        "{key}": f"cache:{random.choice(['user', 'product', 'session', 'config'])}:{random.randint(1000, 9999)}",
        "{attempts}": str(random.randint(5, 50)),
        "{error}": random.choice(["access_denied", "invalid_grant", "invalid_client", "expired_token"]),
        "{server}": f"ldap.{random.choice(['us', 'eu', 'ap'])}.corp.internal",
        "{method}": random.choice(["GET", "POST", "PUT", "DELETE", "PATCH"]),
        "{path}": random.choice(["/api/v1/users", "/api/v1/orders", "/api/v1/products", "/api/v1/search", "/health"]),
        "{duration}": str(random.randint(5, 2000)),
        "{fields}": random.choice(["email,phone", "name", "address", "preferences"]),
        "{amount}": f"{random.randint(10, 500)}.{random.randint(0, 99):02d}",
        "{from_status}": random.choice(["pending", "processing", "shipped"]),
        "{to_status}": random.choice(["processing", "shipped", "delivered"]),
        "{carrier}": random.choice(["fedex", "ups", "usps", "dhl"]),
        "{tracking}": f"{random.choice(['1Z', 'FX', 'DHL'])}{random.randint(1000000000, 9999999999)}",
        "{city}": random.choice(["New York", "Los Angeles", "Chicago", "Houston", "London", "Paris", "Tokyo"]),
        "{country}": random.choice(["USA", "UK", "France", "Germany", "Japan", "Canada"]),
        "{old_qty}": str(random.randint(10, 100)),
        "{new_qty}": str(random.randint(0, 200)),
        "{matched}": str(random.randint(900, 1000)),
        "{discrepancies}": str(random.randint(0, 10)),
        "{subject}": random.choice(["Order Confirmation", "Shipping Update", "Password Reset", "Welcome!"]),
        "{device_count}": str(random.randint(1, 5)),
        "{phone}": f"+1{random.randint(2000000000, 9999999999)}",
        "{results}": str(random.randint(10, 10000)),
        "{docs}": str(random.randint(10000, 1000000)),
        "{prefix}": random.choice(["lapt", "phon", "shoe", "head"]),
        "{nodes}": str(random.randint(3, 10)),
        "{queries}": str(random.randint(1000, 100000)),
        "{clicks}": str(random.randint(100, 10000)),
        "{version}": f"{random.randint(1, 5)}.{random.randint(0, 9)}.{random.randint(0, 99)}",
        "{replicas}": str(random.randint(2, 10)),
        "{features}": str(random.randint(50, 200)),
        "{users}": str(random.randint(10000, 1000000)),
        "{variants}": str(random.randint(2, 4)),
        "{traffic}": str(random.randint(5, 50)),
        "{slots}": str(random.randint(100, 1000)),
        "{hits}": str(random.randint(10000, 100000)),
        "{misses}": str(random.randint(100, 1000)),
        "{ratio}": str(random.randint(90, 99)),
        "{active}": str(random.randint(1, 10)),
        "{type}": random.choice(["visa", "mastercard", "amex"]),
        "{last4}": f"{random.randint(1000, 9999)}",
        "{total}": f"{random.randint(10000, 100000)}.{random.randint(0, 99):02d}",
    }
    
    result = template
    for placeholder, value in replacements.items():
        result = result.replace(placeholder, value)
    return result
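
`fill_template` builds every replacement value on each call, even though a given template uses only a few placeholders. One possible alternative, sketched here with a hypothetical `fill_template_lazy` function and a deliberately tiny generator table, is to find the placeholders with a regular expression and call only the generators that are needed. Unknown placeholders are left untouched, as in the `str.replace` loop. Unlike that loop, a placeholder repeated within one template would get a fresh value per occurrence.

```python
import random
import re

# Illustrative subset of generators; a full table would mirror `replacements`.
PLACEHOLDER_GENERATORS = {
    "user_id": lambda: f"user_{random.randint(10000, 99999)}",
    "order_id": lambda: f"ORD-{random.randint(100000, 999999)}",
    "timeout": lambda: str(random.choice([1000, 2000, 3000, 5000, 10000, 30000])),
}

PLACEHOLDER_PATTERN = re.compile(r"\{(\w+)\}")


def fill_template_lazy(template, generators=PLACEHOLDER_GENERATORS):
    """Replace {name} placeholders, generating values only for names that appear."""
    def substitute(match):
        generator = generators.get(match.group(1))
        # Leave unknown placeholders as-is instead of raising.
        return generator() if generator else match.group(0)

    return PLACEHOLDER_PATTERN.sub(substitute, template)


print(fill_template_lazy("Tax calculation service timeout for order {order_id}"))
print(fill_template_lazy("Index {index} not found"))  # unknown placeholder is kept
```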


In [None]:
def generate_log_entry(base_time, service_info):
    """Generate a single log entry with full Kubernetes metadata"""
    
    log_level = random.choices(log_levels, weights=log_level_weights)[0]
    service_name = service_info["name"]
    
    # Generate message based on log level
    if log_level in ["ERROR", "FATAL"]:
        templates = error_templates.get(service_name, error_templates["api-gateway"])
        message = fill_template(random.choice(templates), service_name)
        stack_trace = random.choice(stack_traces) if random.random() < 0.7 else None
    elif log_level == "WARN":
        if random.random() < 0.5:
            templates = error_templates.get(service_name, error_templates["api-gateway"])
        else:
            templates = info_templates.get(service_name, info_templates["api-gateway"])
        message = fill_template(random.choice(templates), service_name)
        stack_trace = None
    else:
        templates = info_templates.get(service_name, info_templates["api-gateway"])
        message = fill_template(random.choice(templates), service_name)
        stack_trace = None
    
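    # Jitter forward by up to one hour; if base_time is less than an hour old, the result can lie in the future
    timestamp = base_time + timedelta(seconds=random.randint(0, 3600), milliseconds=random.randint(0, 999))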
    trace_id = generate_trace_id()
    span_id = generate_span_id()
    
    log_entry = {
        "@timestamp": timestamp.isoformat(),
        "log.level": log_level,
        "message": message,
        "service.name": service_name,
        # Kubernetes metadata
        "kubernetes.pod.name": service_info.get("k8s.pod.name", f"{service_name}-0-xxxxx"),
        "kubernetes.pod.uid": service_info.get("k8s.pod.uid", str(uuid.uuid4())),
        "kubernetes.namespace": service_info.get("k8s.namespace", K8S_NAMESPACE),
        "kubernetes.node.name": service_info.get("k8s.node.name", "node-pool-1-abc12"),
        "kubernetes.deployment.name": service_info.get("k8s.deployment.name", service_name),
        "kubernetes.replicaset.name": service_info.get("k8s.replicaset.name", f"{service_name}-xxxxx"),
        "container.image.name": service_info.get("container.image.name", f"ecommerce/{service_name}:latest"),
        "container.id": service_info.get("container.id", f"docker://{uuid.uuid4().hex}"),
        # Host/cloud info
        "host.name": service_info.get("host.name", service_info.get("host", "unknown")),
        "cloud.availability_zone": service_info.get("cloud.availability_zone", "us-east-1a"),
        "cloud.provider": "aws",
        "cloud.region": "us-east-1",
        # Tracing
        "trace.id": trace_id,
        "span.id": span_id,
        "process.pid": random.randint(1, 100),  # Container PIDs are low
        "process.thread.name": f"worker-{random.randint(1, 16)}",
    }
    
    # Add HTTP fields
    if service_name in ["api-gateway", "user-service", "order-service", "search-service"]:
        http_status = random.choices(http_status_codes, weights=http_status_weights)[0]
        log_entry["http.request.method"] = random.choice(["GET", "POST", "PUT", "DELETE"])
        log_entry["http.response.status_code"] = http_status
        log_entry["http.request.duration_ms"] = random.randint(1, 5000)
        log_entry["client.ip"] = generate_ip()
        log_entry["user_agent.original"] = random.choice(user_agents)
    
    if stack_trace:
        log_entry["error.stack_trace"] = stack_trace
        log_entry["error.type"] = stack_trace.split(":")[0]
    
    return log_entry
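
Entries returned by `generate_log_entry` are plain dictionaries. To send them through `elasticsearch.helpers.bulk`, each document is typically wrapped in an action dictionary with `_index` and `_source` keys. A minimal sketch, where `to_bulk_actions` is a hypothetical helper name and the commented-out call assumes the `es` client created earlier in the notebook:

```python
def to_bulk_actions(docs, index_name):
    """Yield one bulk index action per document."""
    for doc in docs:
        yield {"_index": index_name, "_source": doc}


# Hand-written sample document standing in for generate_log_entry() output.
sample_docs = [
    {
        "@timestamp": "2024-01-01T00:00:00",
        "log.level": "INFO",
        "message": "Health check passed for all upstream services",
    },
]
actions = list(to_bulk_actions(sample_docs, "application-logs"))
print(actions[0]["_index"])

# With a live cluster, the actions could then be indexed in batches:
# from elasticsearch import helpers
# helpers.bulk(es, actions)
```

Using a generator keeps memory use flat if the number of documents grows, since `helpers.bulk` accepts any iterable of actions.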


In [None]:
# Create all indices with appropriate mappings
LOGS_INDEX = "application-logs"
METRICS_INDEX = "system-metrics"
K8S_EVENTS_INDEX = "kubernetes-events"
TRACES_INDEX = "distributed-traces"

logs_mapping = {
    "mappings": {
        "properties": {
            "@timestamp": {"type": "date"},
            "log.level": {"type": "keyword"},
            "message": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 512}}},
            "service.name": {"type": "keyword"},
            # Kubernetes fields
            "kubernetes.pod.name": {"type": "keyword"},
            "kubernetes.pod.uid": {"type": "keyword"},
            "kubernetes.namespace": {"type": "keyword"},
            "kubernetes.node.name": {"type": "keyword"},
            "kubernetes.deployment.name": {"type": "keyword"},
            "kubernetes.replicaset.name": {"type": "keyword"},
            "container.image.name": {"type": "keyword"},
            "container.id": {"type": "keyword"},
            # Host/cloud
            "host.name": {"type": "keyword"},
            "cloud.availability_zone": {"type": "keyword"},
            "cloud.provider": {"type": "keyword"},
            "cloud.region": {"type": "keyword"},
            # Tracing
            "trace.id": {"type": "keyword"},
            "span.id": {"type": "keyword"},
            "process.pid": {"type": "integer"},
            "process.thread.name": {"type": "keyword"},
            # HTTP
            "http.request.method": {"type": "keyword"},
            "http.response.status_code": {"type": "integer"},
            "http.request.duration_ms": {"type": "integer"},
            "client.ip": {"type": "ip"},
            "user_agent.original": {"type": "text"},
            # Errors
            "error.stack_trace": {"type": "text"},
            "error.type": {"type": "keyword"},
        }
    }
}

metrics_mapping = {
    "mappings": {
        "properties": {
            "@timestamp": {"type": "date"},
            "kubernetes.pod.name": {"type": "keyword"},
            "kubernetes.namespace": {"type": "keyword"},
            "kubernetes.node.name": {"type": "keyword"},
            "kubernetes.deployment.name": {"type": "keyword"},
            "service.name": {"type": "keyword"},
            "container.id": {"type": "keyword"},
            # CPU metrics
            "system.cpu.total.pct": {"type": "float"},
            "system.cpu.user.pct": {"type": "float"},
            "system.cpu.system.pct": {"type": "float"},
            "kubernetes.pod.cpu.usage.limit.pct": {"type": "float"},
            # Memory metrics
            "system.memory.used.bytes": {"type": "long"},
            "system.memory.total.bytes": {"type": "long"},
            "system.memory.used.pct": {"type": "float"},
            "kubernetes.pod.memory.usage.bytes": {"type": "long"},
            "kubernetes.pod.memory.limit.bytes": {"type": "long"},
            # Network
            "system.network.in.bytes": {"type": "long"},
            "system.network.out.bytes": {"type": "long"},
            "system.network.in.packets": {"type": "long"},
            "system.network.out.packets": {"type": "long"},
            # Disk
            "system.diskio.read.bytes": {"type": "long"},
            "system.diskio.write.bytes": {"type": "long"},
        }
    }
}

k8s_events_mapping = {
    "mappings": {
        "properties": {
            "@timestamp": {"type": "date"},
            "kubernetes.event.type": {"type": "keyword"},
            "kubernetes.event.reason": {"type": "keyword"},
            "kubernetes.event.message": {"type": "text"},
            "kubernetes.event.count": {"type": "integer"},
            "kubernetes.event.first_timestamp": {"type": "date"},
            "kubernetes.event.last_timestamp": {"type": "date"},
            "kubernetes.pod.name": {"type": "keyword"},
            "kubernetes.namespace": {"type": "keyword"},
            "kubernetes.node.name": {"type": "keyword"},
            "kubernetes.deployment.name": {"type": "keyword"},
            "kubernetes.event.involved_object.kind": {"type": "keyword"},
            "kubernetes.event.involved_object.name": {"type": "keyword"},
        }
    }
}

traces_mapping = {
    "mappings": {
        "properties": {
            "@timestamp": {"type": "date"},
            "trace.id": {"type": "keyword"},
            "span.id": {"type": "keyword"},
            "parent.span.id": {"type": "keyword"},
            "span.name": {"type": "keyword"},
            "span.kind": {"type": "keyword"},
            "service.name": {"type": "keyword"},
            "kubernetes.pod.name": {"type": "keyword"},
            "span.duration.us": {"type": "long"},
            "span.status.code": {"type": "keyword"},
            "http.method": {"type": "keyword"},
            "http.url": {"type": "keyword"},
            "http.status_code": {"type": "integer"},
            "db.type": {"type": "keyword"},
            "db.statement": {"type": "text"},
        }
    }
}

# Create all indices, dropping any existing index of the same name first
index_mappings = [
    (LOGS_INDEX, logs_mapping),
    (METRICS_INDEX, metrics_mapping),
    (K8S_EVENTS_INDEX, k8s_events_mapping),
    (TRACES_INDEX, traces_mapping),
]
for idx_name, mapping in index_mappings:
    if es.indices.exists(index=idx_name):
        es.indices.delete(index=idx_name)
        print(f"Deleted existing index: {idx_name}")
    es.indices.create(index=idx_name, body=mapping)
    print(f"Created index: {idx_name}")
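
Elasticsearch expands dotted field names such as `span.status.code` into nested objects, so a name cannot be both a leaf field and the parent of another field. The following is a minimal sketch of a pre-flight check for that; `find_leaf_object_conflicts` and the sample entries are hypothetical and not part of the notebook.

```python
def find_leaf_object_conflicts(properties):
    """Return dotted field names that are both a leaf and a prefix of another field."""
    names = set(properties)
    conflicts = set()
    for name in names:
        parts = name.split(".")
        # Every proper prefix of a dotted name becomes an object,
        # so it must not also be declared as a leaf field.
        for depth in range(1, len(parts)):
            prefix = ".".join(parts[:depth])
            if prefix in names:
                conflicts.add(prefix)
    return sorted(conflicts)


# Hypothetical sample: "span.status" is declared as a leaf and implied as an object.
sample_properties = {
    "span.id": {"type": "keyword"},
    "span.status.code": {"type": "keyword"},
    "span.status": {"type": "keyword"},
}
clean_properties = {
    "trace.id": {"type": "keyword"},
    "span.id": {"type": "keyword"},
}

print(find_leaf_object_conflicts(sample_properties))
print(find_leaf_object_conflicts(clean_properties))
```

Running it over `k8s_events_mapping["mappings"]["properties"]` or `traces_mapping["mappings"]["properties"]` before `es.indices.create` would be expected to return an empty list.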


In [None]:
# Generate logs for the past 24 hours with realistic incident patterns
def generate_and_index_logs(num_logs=5000):
    print(f"Generating {num_logs} log entries...")
    
    actions = []
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)
    
    # Define incident windows (simulating real outages).
    # "start" and "end" are hours elapsed since the start of the 24-hour window.
    incidents = [
        {"start": 2, "end": 3, "service": "recommendation-engine", "type": "oom"},
        {"start": 6, "end": 7, "service": "order-service", "type": "db_pool"},
        {"start": 14, "end": 15, "service": "payment-service", "type": "timeout"},
        {"start": 20, "end": 21, "service": "inventory-service", "type": "network"},
    ]
    
    for i in range(num_logs):
        hours_ago = random.uniform(0, 24)
        base_time = end_time - timedelta(hours=hours_ago)
        
        # Check if we're in an incident window - increase error rate
        in_incident = False
        incident_service = None
        for inc in incidents:
            if inc["start"] <= (24 - hours_ago) <= inc["end"]:
                in_incident = True
                incident_service = inc["service"]
                break
        
        # Bias service selection during incidents
        if in_incident and random.random() < 0.4:
            service_info = random.choice([s for s in services if s["name"] == incident_service])
        else:
            service_info = random.choice(services)
        
        log_entry = generate_log_entry(base_time, service_info)
        
        # Increase error rate during incidents
        if in_incident and service_info["name"] == incident_service:
            if random.random() < 0.6:
                log_entry["log.level"] = random.choice(["ERROR", "FATAL", "WARN"])
        
        actions.append({"_index": LOGS_INDEX, "_source": log_entry})
        
        if len(actions) >= 1000:
            helpers.bulk(es, actions)
            print(f"Indexed {i + 1} logs...")
            actions = []
    
    if actions:
        helpers.bulk(es, actions)
    
    print(f"Successfully indexed {num_logs} log entries!")

generate_and_index_logs(5000)
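
The same incident-window check (`start <= 24 - hours_ago <= end`) appears in the log, metric, and trace generators. One way to keep them consistent is a shared lookup. This is only a sketch: `INCIDENTS` copies the windows from the log generator, and `active_incident` is a hypothetical helper name.

```python
from typing import Optional

# Incident windows copied from the log generator; hours are measured
# from the start of the 24-hour window.
INCIDENTS = [
    {"start": 2, "end": 3, "service": "recommendation-engine", "type": "oom"},
    {"start": 6, "end": 7, "service": "order-service", "type": "db_pool"},
    {"start": 14, "end": 15, "service": "payment-service", "type": "timeout"},
    {"start": 20, "end": 21, "service": "inventory-service", "type": "network"},
]


def active_incident(hours_ago: float, window_hours: float = 24.0) -> Optional[dict]:
    """Return the incident covering a point `hours_ago` before now, or None."""
    elapsed = window_hours - hours_ago  # hours since the window started
    for inc in INCIDENTS:
        if inc["start"] <= elapsed <= inc["end"]:
            return inc
    return None


print(active_incident(21.5))  # 2.5 hours into the window
print(active_incident(12.0))  # 12 hours into the window, no incident
```

Each generator could then call `active_incident(hours_ago)` and compare `inc["service"]` with the service it is emitting data for.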


In [None]:
# Generate system metrics for pods
def generate_metrics(num_metrics=3000):
    print(f"Generating {num_metrics} metric entries...")
    
    actions = []
    end_time = datetime.now()
    
    for i in range(num_metrics):
        hours_ago = random.uniform(0, 24)
        timestamp = end_time - timedelta(hours=hours_ago)
        
        service_info = random.choice(services)
        
        # Base metrics
        cpu_base = random.uniform(0.1, 0.4)
        memory_base = random.uniform(0.3, 0.6)
        
        # Simulate memory leak in recommendation-engine (hours 2-3)
        if service_info["name"] == "recommendation-engine" and 2 <= (24 - hours_ago) <= 3:
            progress = ((24 - hours_ago) - 2) / 1.0  # 0 to 1 over the hour
            memory_base = 0.6 + (progress * 0.35)  # Ramp up to 95%
            cpu_base = 0.3 + (progress * 0.5)  # CPU spikes too
        
        # High CPU during order-service DB issue (hours 6-7)
        if service_info["name"] == "order-service" and 6 <= (24 - hours_ago) <= 7:
            cpu_base = random.uniform(0.7, 0.95)
        
        memory_limit_bytes = 2 * 1024 * 1024 * 1024  # 2 GiB
        memory_used = int(memory_limit_bytes * memory_base)
        
        metric = {
            "@timestamp": timestamp.isoformat(),
            "kubernetes.pod.name": service_info.get("k8s.pod.name"),
            "kubernetes.namespace": K8S_NAMESPACE,
            "kubernetes.node.name": service_info.get("k8s.node.name"),
            "kubernetes.deployment.name": service_info["name"],
            "service.name": service_info["name"],
            "container.id": service_info.get("container.id"),
            # CPU
            "system.cpu.total.pct": round(cpu_base, 3),
            "system.cpu.user.pct": round(cpu_base * 0.7, 3),
            "system.cpu.system.pct": round(cpu_base * 0.3, 3),
            "kubernetes.pod.cpu.usage.limit.pct": round(cpu_base * 100, 1),
            # Memory
            "system.memory.used.bytes": memory_used,
            "system.memory.total.bytes": memory_limit_bytes,
            "system.memory.used.pct": round(memory_base, 3),
            "kubernetes.pod.memory.usage.bytes": memory_used,
            "kubernetes.pod.memory.limit.bytes": memory_limit_bytes,
            # Network
            "system.network.in.bytes": random.randint(100000, 10000000),
            "system.network.out.bytes": random.randint(100000, 10000000),
            "system.network.in.packets": random.randint(100, 10000),
            "system.network.out.packets": random.randint(100, 10000),
            # Disk
            "system.diskio.read.bytes": random.randint(10000, 1000000),
            "system.diskio.write.bytes": random.randint(10000, 1000000),
        }
        
        actions.append({"_index": METRICS_INDEX, "_source": metric})
        
        if len(actions) >= 500:
            helpers.bulk(es, actions)
            actions = []
    
    if actions:
        helpers.bulk(es, actions)
    
    print(f"Successfully indexed {num_metrics} metrics!")

generate_metrics(3000)
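
The memory-leak simulation is a linear ramp over the incident hour: memory climbs from 60% to 95% of the limit and CPU from 30% to 80%. The sketch below isolates that arithmetic so the curve can be inspected without indexing anything; `leak_profile` is a hypothetical name, and the constants are the ones used in `generate_metrics`.

```python
def leak_profile(elapsed_hours, start=2.0, end=3.0):
    """Return (memory_fraction, cpu_fraction) inside the leak window, else None."""
    if not start <= elapsed_hours <= end:
        return None
    progress = (elapsed_hours - start) / (end - start)  # 0 at start, 1 at end
    memory = 0.6 + progress * 0.35  # ramps toward 95% of the memory limit
    cpu = 0.3 + progress * 0.5      # CPU rises alongside memory
    return round(memory, 3), round(cpu, 3)


for elapsed in (1.0, 2.0, 2.5, 3.0):
    print(elapsed, leak_profile(elapsed))
```

Here `elapsed_hours` corresponds to `24 - hours_ago` in the notebook.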


In [None]:
# Generate Kubernetes events
def generate_k8s_events():
    print("Generating Kubernetes events...")
    
    actions = []
    end_time = datetime.now()
    
    # Normal events
    normal_events = [
        {"reason": "Scheduled", "message": "Successfully assigned {ns}/{pod} to {node}", "type": "Normal"},
        {"reason": "Pulled", "message": "Container image \"{image}\" already present on machine", "type": "Normal"},
        {"reason": "Created", "message": "Created container {container}", "type": "Normal"},
        {"reason": "Started", "message": "Started container {container}", "type": "Normal"},
        {"reason": "SuccessfulCreate", "message": "Created pod: {pod}", "type": "Normal"},
        {"reason": "ScalingReplicaSet", "message": "Scaled up replica set {rs} to {count}", "type": "Normal"},
    ]
    
    # Warning events (correlated with incidents)
    warning_events = [
        {"reason": "OOMKilled", "message": "Container {container} was OOM killed", "type": "Warning", "service": "recommendation-engine", "hour": 2.5},
        {"reason": "OOMKilled", "message": "Container {container} exceeded memory limit", "type": "Warning", "service": "recommendation-engine", "hour": 2.8},
        {"reason": "BackOff", "message": "Back-off restarting failed container", "type": "Warning", "service": "recommendation-engine", "hour": 2.9},
        {"reason": "Unhealthy", "message": "Readiness probe failed: connection refused", "type": "Warning", "service": "order-service", "hour": 6.2},
        {"reason": "Unhealthy", "message": "Liveness probe failed: HTTP probe failed with statuscode: 503", "type": "Warning", "service": "order-service", "hour": 6.5},
        {"reason": "FailedScheduling", "message": "0/5 nodes are available: insufficient memory", "type": "Warning", "service": "recommendation-engine", "hour": 3.0},
        {"reason": "NetworkNotReady", "message": "network is not ready: container runtime network not ready", "type": "Warning", "service": "inventory-service", "hour": 20.2},
    ]
    
    # Generate normal events throughout the day
    for _ in range(100):
        hours_ago = random.uniform(0, 24)
        timestamp = end_time - timedelta(hours=hours_ago)
        service_info = random.choice(services)
        event_template = random.choice(normal_events)
        
        event = {
            "@timestamp": timestamp.isoformat(),
            "kubernetes.event.type": event_template["type"],
            "kubernetes.event.reason": event_template["reason"],
            "kubernetes.event.message": event_template["message"].format(
                ns=K8S_NAMESPACE,
                pod=service_info.get("k8s.pod.name"),
                node=service_info.get("k8s.node.name"),
                image=service_info.get("container.image.name"),
                container=service_info["name"],
                rs=service_info.get("k8s.replicaset.name"),
                count=random.randint(1, 3)
            ),
            "kubernetes.event.count": 1,
            "kubernetes.event.first_timestamp": timestamp.isoformat(),
            "kubernetes.event.last_timestamp": timestamp.isoformat(),
            "kubernetes.pod.name": service_info.get("k8s.pod.name"),
            "kubernetes.namespace": K8S_NAMESPACE,
            "kubernetes.node.name": service_info.get("k8s.node.name"),
            "kubernetes.deployment.name": service_info["name"],
            "kubernetes.event.involved_object.kind": "Pod",
            "kubernetes.event.involved_object.name": service_info.get("k8s.pod.name"),
        }
        actions.append({"_index": K8S_EVENTS_INDEX, "_source": event})
    
    # Generate warning events at specific incident times
    for event_template in warning_events:
        target_service = event_template.get("service")
        matching_pods = [s for s in services if s["name"] == target_service]
        
        for pod in matching_pods:
            timestamp = end_time - timedelta(hours=(24 - event_template["hour"]))
            event = {
                "@timestamp": timestamp.isoformat(),
                "kubernetes.event.type": event_template["type"],
                "kubernetes.event.reason": event_template["reason"],
                "kubernetes.event.message": event_template["message"].format(container=target_service),
                "kubernetes.event.count": random.randint(1, 5),
                "kubernetes.event.first_timestamp": timestamp.isoformat(),
                "kubernetes.event.last_timestamp": (timestamp + timedelta(minutes=random.randint(1, 30))).isoformat(),
                "kubernetes.pod.name": pod.get("k8s.pod.name"),
                "kubernetes.namespace": K8S_NAMESPACE,
                "kubernetes.node.name": pod.get("k8s.node.name"),
                "kubernetes.deployment.name": target_service,
                "kubernetes.event.involved_object.kind": "Pod",
                "kubernetes.event.involved_object.name": pod.get("k8s.pod.name"),
            }
            actions.append({"_index": K8S_EVENTS_INDEX, "_source": event})
    
    helpers.bulk(es, actions)
    print(f"Successfully indexed {len(actions)} Kubernetes events!")

generate_k8s_events()
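
The normal-event loop passes every placeholder value to each template, which works because `str.format` ignores keyword arguments a template does not use. The warning-event loop passes only `container`, so a warning template with any other placeholder would raise `KeyError`. A small sketch of both behaviours, using made-up sample values:

```python
templates = [
    "Successfully assigned {ns}/{pod} to {node}",
    "Created container {container}",
]

# Hypothetical sample values; the notebook takes these from `services`.
values = {
    "ns": "ecommerce",
    "pod": "order-service-abc12",
    "node": "node-1",
    "container": "order-service",
}

# Extra keyword arguments are ignored, so one call shape fits every template.
messages = [template.format(**values) for template in templates]
print(messages)

# A template that needs a placeholder that was not supplied fails loudly.
try:
    "Created pod: {pod}".format(container="order-service")
    missing = None
except KeyError as exc:
    missing = exc.args[0]
print("missing placeholder:", missing)
```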


In [None]:
# Generate distributed traces (request flows through services)
def generate_traces(num_traces=500):
    print(f"Generating {num_traces} distributed traces...")
    
    actions = []
    end_time = datetime.now()
    
    # Define typical request flows
    flows = [
        ["api-gateway", "user-service", "auth-service"],  # Login
        ["api-gateway", "order-service", "payment-service", "inventory-service", "notification-service"],  # Checkout
        ["api-gateway", "search-service", "recommendation-engine"],  # Browse
        ["api-gateway", "user-service", "order-service"],  # Order history
        ["api-gateway", "inventory-service", "cache-service"],  # Stock check
    ]
    
    endpoints = {
        "api-gateway": ["/api/v1/login", "/api/v1/checkout", "/api/v1/search", "/api/v1/orders", "/api/v1/products"],
        "user-service": ["/users/profile", "/users/auth", "/users/preferences"],
        "auth-service": ["/auth/token", "/auth/verify", "/auth/refresh"],
        "order-service": ["/orders/create", "/orders/history", "/orders/status"],
        "payment-service": ["/payments/authorize", "/payments/capture", "/payments/refund"],
        "inventory-service": ["/inventory/check", "/inventory/reserve", "/inventory/release"],
        "search-service": ["/search/products", "/search/suggest"],
        "recommendation-engine": ["/recommend/similar", "/recommend/personalized"],
        "notification-service": ["/notify/email", "/notify/push"],
        "cache-service": ["/cache/get", "/cache/set"],
    }
    
    for _ in range(num_traces):
        hours_ago = random.uniform(0, 24)
        base_time = end_time - timedelta(hours=hours_ago)
        
        flow = random.choice(flows)
        trace_id = generate_trace_id()
        parent_span_id = None
        
        # Check if this trace is during an incident
        is_error_trace = False
        for inc_start, inc_end, inc_service in [(2, 3, "recommendation-engine"), (6, 7, "order-service"), 
                                                  (14, 15, "payment-service"), (20, 21, "inventory-service")]:
            if inc_start <= (24 - hours_ago) <= inc_end and inc_service in flow:
                is_error_trace = random.random() < 0.5
                break
        
        span_time = base_time
        for i, service_name in enumerate(flow):
            service_pods = [s for s in services if s["name"] == service_name]
            if not service_pods:
                continue
            pod = random.choice(service_pods)
            
            span_id = generate_span_id()
            duration_us = random.randint(1000, 50000)  # 1-50ms
            
            # Increase latency for error traces
            if is_error_trace and i > 0:
                duration_us = random.randint(100000, 500000)  # 100-500ms (slow)
            
            status_code = 200
            if is_error_trace and i == len(flow) - 1:
                status_code = random.choice([500, 502, 503, 504])
            
            # Pick the endpoint once so span.name and http.url describe the same request
            endpoint = random.choice(endpoints.get(service_name, ["/unknown"]))
            span = {
                "@timestamp": span_time.isoformat(),
                "trace.id": trace_id,
                "span.id": span_id,
                "parent.span.id": parent_span_id,
                "span.name": endpoint,
                "span.kind": "SERVER" if i > 0 else "CLIENT",
                "service.name": service_name,
                "kubernetes.pod.name": pod.get("k8s.pod.name"),
                "span.duration.us": duration_us,
                "span.status.code": "ERROR" if status_code >= 400 else "OK",
                "http.method": random.choice(["GET", "POST"]),
                "http.url": f"http://{service_name}:8080{endpoint}",
                "http.status_code": status_code,
            }
            
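            # Add DB attributes to spans from the database-backed services
            if service_name in ["user-service", "order-service", "inventory-service"]:
                # Map each service to its table; a plain string replace would yield "inventorys"
                table = {"user-service": "users", "order-service": "orders",
                         "inventory-service": "inventory"}[service_name]
                span["db.type"] = "postgresql"
                span["db.statement"] = f"SELECT * FROM {table} WHERE id = ?"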
            
            actions.append({"_index": TRACES_INDEX, "_source": span})
            
            parent_span_id = span_id
            span_time = span_time + timedelta(microseconds=duration_us)
    
    helpers.bulk(es, actions)
    print(f"Successfully indexed {len(actions)} trace spans!")

generate_traces(500)
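
Each trace is built as a chain: all spans share one `trace.id`, every span's `parent.span.id` points at the previous hop, and each hop starts when the previous one finishes. The sketch below reproduces that linking with fixed durations so the result is easy to inspect. `build_span_chain` is a hypothetical helper, and `secrets.token_hex` stands in for the notebook's `generate_trace_id` and `generate_span_id`, whose implementations are not shown here.

```python
import secrets
from datetime import datetime, timedelta


def build_span_chain(flow, start, durations_us):
    """Build one span per service in `flow`, linked parent to child in order."""
    trace_id = secrets.token_hex(16)  # stand-in for generate_trace_id()
    spans, parent_id, span_time = [], None, start
    for service_name, duration_us in zip(flow, durations_us):
        span_id = secrets.token_hex(8)  # stand-in for generate_span_id()
        spans.append({
            "@timestamp": span_time.isoformat(),
            "trace.id": trace_id,
            "span.id": span_id,
            "parent.span.id": parent_id,
            "service.name": service_name,
            "span.duration.us": duration_us,
        })
        parent_id = span_id  # the next hop is a child of this span
        span_time += timedelta(microseconds=duration_us)
    return spans


spans = build_span_chain(
    ["api-gateway", "user-service", "auth-service"],
    datetime(2024, 1, 1, 12, 0, 0),
    [1000, 2000, 3000],
)
for span in spans:
    print(span["service.name"], span["@timestamp"], span["parent.span.id"])
```

Because each child starts only after its parent's duration has elapsed, parent and child spans never overlap in time. That is a simplification compared with real traces, where a parent span usually encloses its children.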


In [None]:
# Verify all data
print("=" * 80)
print("DATA GENERATION COMPLETE - SUMMARY")
print("=" * 80)

for idx in [LOGS_INDEX, METRICS_INDEX, K8S_EVENTS_INDEX, TRACES_INDEX]:
    es.indices.refresh(index=idx)
    count = es.count(index=idx)["count"]
    print(f"\n{idx}: {count} documents")

# Log distribution
print("\n" + "-" * 40)
print("APPLICATION LOGS DISTRIBUTION:")
agg_result = es.search(
    index=LOGS_INDEX, size=0,
    body={"aggs": {"by_level": {"terms": {"field": "log.level"}}, 
                   "by_service": {"terms": {"field": "service.name", "size": 15}}}}
)
print("\nBy log level:")
for bucket in agg_result["aggregations"]["by_level"]["buckets"]:
    print(f"  {bucket['key']}: {bucket['doc_count']}")

print("\nBy service:")
for bucket in agg_result["aggregations"]["by_service"]["buckets"]:
    print(f"  {bucket['key']}: {bucket['doc_count']}")

# K8s events
print("\n" + "-" * 40)
print("KUBERNETES EVENTS:")
k8s_agg = es.search(
    index=K8S_EVENTS_INDEX, size=0,
    body={"aggs": {"by_reason": {"terms": {"field": "kubernetes.event.reason"}}}}
)
for bucket in k8s_agg["aggregations"]["by_reason"]["buckets"]:
    print(f"  {bucket['key']}: {bucket['doc_count']}")
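
The terms-aggregation buckets printed above can also be folded into a dictionary, which makes follow-up checks such as the share of error-level logs straightforward. This is a sketch against a hand-written response: `buckets_to_counts` is a hypothetical helper and the counts in `sample_response` are invented for illustration.

```python
def buckets_to_counts(response, agg_name):
    """Turn a terms aggregation's buckets into a {key: doc_count} dictionary."""
    buckets = response["aggregations"][agg_name]["buckets"]
    return {bucket["key"]: bucket["doc_count"] for bucket in buckets}


# Invented counts, shaped like the "by_level" aggregation result.
sample_response = {
    "aggregations": {
        "by_level": {
            "buckets": [
                {"key": "INFO", "doc_count": 70},
                {"key": "WARN", "doc_count": 15},
                {"key": "ERROR", "doc_count": 12},
                {"key": "FATAL", "doc_count": 3},
            ]
        }
    }
}

counts = buckets_to_counts(sample_response, "by_level")
error_share = (counts.get("ERROR", 0) + counts.get("FATAL", 0)) / sum(counts.values())
print(counts)
print(f"ERROR/FATAL share: {error_share:.1%}")
```

With the real data, `buckets_to_counts(agg_result, "by_level")` would take the place of the sample response.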


In [None]:
# Show sample data from each index
print("\n" + "=" * 80)
print("SAMPLE DATA FOR ELASTIC-SCRIPT ANALYSIS")
print("=" * 80)

# Recent errors with K8s context
print("\nüìã RECENT ERROR LOGS (with Kubernetes context):")
error_logs = es.search(
    index=LOGS_INDEX, size=3,
    body={"query": {"terms": {"log.level": ["ERROR", "FATAL"]}}, "sort": [{"@timestamp": "desc"}]}
)
for hit in error_logs["hits"]["hits"]:
    s = hit["_source"]
    print(f"\n  [{s['log.level']}] {s['service.name']}")
    print(f"    Pod: {s.get('kubernetes.pod.name', 'N/A')}")
    print(f"    Node: {s.get('kubernetes.node.name', 'N/A')}")
    print(f"    Message: {s['message'][:80]}...")

# K8s warning events
print("\n\n⚠️ KUBERNETES WARNING EVENTS:")
k8s_warnings = es.search(
    index=K8S_EVENTS_INDEX, size=3,
    body={"query": {"term": {"kubernetes.event.type": "Warning"}}, "sort": [{"@timestamp": "desc"}]}
)
for hit in k8s_warnings["hits"]["hits"]:
    s = hit["_source"]
    print(f"\n  [{s['kubernetes.event.reason']}] {s.get('kubernetes.deployment.name', 'N/A')}")
    print(f"    {s['kubernetes.event.message']}")

# High memory pods
print("\n\nüìä PODS WITH HIGH MEMORY USAGE:")
high_mem = es.search(
    index=METRICS_INDEX, size=3,
    body={"query": {"range": {"system.memory.used.pct": {"gte": 0.7}}}, "sort": [{"system.memory.used.pct": "desc"}]}
)
for hit in high_mem["hits"]["hits"]:
    s = hit["_source"]
    print(f"  {s.get('kubernetes.pod.name', 'N/A')}: {s['system.memory.used.pct']*100:.1f}% memory")

print("\n\n" + "=" * 80)
print("AVAILABLE INDICES FOR ELASTIC-SCRIPT:")
print("=" * 80)
print("""
• application-logs   - Logs with K8s metadata (pod, node, namespace)
• system-metrics     - CPU, memory, network metrics per pod
• kubernetes-events  - K8s events (OOMKilled, Unhealthy, BackOff, etc.)
• distributed-traces - Request traces spanning multiple services
""")
