# NATS Stream Diagnostic Notebook

This notebook helps diagnose issues with NATS JetStream streams in the observability-agent project, focusing on streams that aren't receiving data.

## Setup

Install the required libraries and configure the connection to NATS.

In [None]:
# Install necessary packages if not already installed (asyncio is part of the standard library)
!pip install nats-py pandas

In [None]:
import asyncio
import json
import nats
import pandas as pd
from datetime import datetime
import time
import random

# NATS server URL - update if needed
NATS_URL = "nats://localhost:4222"

## Check Stream Status

Connect to NATS and list every JetStream stream with its subjects, message count, and sequence range.

In [None]:
async def get_stream_info():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    
    # streams_info() returns the config and state of every stream in one request
    streams = await js.streams_info()
    
    stream_data = []
    for info in streams:
        stream_data.append({
            'Name': info.config.name,
            # Mirror or sourced streams may have no subjects of their own
            'Subjects': ', '.join(info.config.subjects or []),
            'Messages': info.state.messages,
            'Bytes': info.state.bytes,
            'First Seq': info.state.first_seq,
            'Last Seq': info.state.last_seq
        })
    
    await nc.close()
    return stream_data

stream_info = await get_stream_info()
df = pd.DataFrame(stream_info)
df
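To single out the streams that aren't receiving data, the summary can be filtered on `Messages`, and `Last Seq` separates streams that never stored anything from streams that stored messages which were later removed. A minimal sketch with hypothetical sample rows in the shape `get_stream_info()` returns; in the notebook you would pass `stream_info` instead:

```python
import pandas as pd

def classify_empty_streams(rows):
    """Split streams with no messages into 'never received' and 'emptied' groups."""
    df = pd.DataFrame(rows)
    if df.empty:
        return {"never_received": [], "emptied": []}
    empty = df[df["Messages"] == 0]
    return {
        # Last Seq stays 0 until the stream assigns its first sequence number
        "never_received": empty.loc[empty["Last Seq"] == 0, "Name"].tolist(),
        # A non-zero Last Seq means sequences were assigned but the messages are gone
        # (e.g. expired, acknowledged under work-queue/interest retention, or purged)
        "emptied": empty.loc[empty["Last Seq"] > 0, "Name"].tolist(),
    }

# Hypothetical sample rows, shaped like the output of get_stream_info()
sample = [
    {"Name": "METRICS", "Subjects": "metrics.>", "Messages": 0, "Bytes": 0, "First Seq": 0, "Last Seq": 0},
    {"Name": "LOGS", "Subjects": "logs.>", "Messages": 12, "Bytes": 2048, "First Seq": 1, "Last Seq": 12},
    {"Name": "ALERTS", "Subjects": "alerts.>", "Messages": 0, "Bytes": 0, "First Seq": 6, "Last Seq": 5},
]
print(classify_empty_streams(sample))
# {'never_received': ['METRICS'], 'emptied': ['ALERTS']}
```

A stream in the "never received" group points at the publishers or subject patterns; one in the "emptied" group points at its retention configuration.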

## Test Message Publication

Publish a test message to each empty stream to check whether it stores incoming data.

In [None]:
async def publish_test_message(stream, subject):
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    
    # Create test message based on stream type
    test_data = None
    if stream == "METRICS":
        test_data = {
            "timestamp": datetime.now().isoformat(),
            "service": "test-service",
            "name": "cpu_usage",
            "value": random.randint(1, 100)
        }
    elif stream == "LOGS":
        test_data = {
            "timestamp": datetime.now().isoformat(),
            "service": "test-service",
            "level": "INFO",
            "message": "Test log message"
        }
    elif stream == "ALERT_DATA" or stream == "ALERTS":
        test_data = {
            "id": f"alert-{int(time.time())}",
            "labels": {
                "alertname": "TestAlert",
                "service": "test-service",
                "severity": "info"
            },
            "annotations": {
                "summary": "Test alert",
                "description": "This is a test alert"
            },
            "startsAt": datetime.now().isoformat()
        }
    
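    if test_data:
        payload = json.dumps(test_data).encode()
        # Try JetStream first: it waits for the stream's acknowledgement
        try:
            ack = await js.publish(subject, payload)
            print(f"Published to {subject} with JetStream: stream {ack.stream}, sequence {ack.seq}")
        except Exception as e:
            print(f"JetStream publish failed: {e}")
            # Fallback to core NATS publish: fire-and-forget with no ack, so the message
            # is stored only if some stream's subjects match
            await nc.publish(subject, payload)
            await nc.flush()
            print(f"Published to {subject} using core NATS (unacknowledged)")
    else:
        print(f"No test payload defined for stream {stream!r}; nothing published")
    
    await nc.close()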
    return test_data
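The payloads above stamp times with `datetime.now().isoformat()`, which produces a naive local time without a UTC offset. The alert payload follows Alertmanager's `labels`/`annotations`/`startsAt` shape, and Alertmanager exchanges RFC 3339 timestamps that include an offset. If the consumers in this project parse timestamps that strictly (an assumption; the notebook doesn't say), a timezone-aware UTC timestamp avoids ambiguity. A sketch with a hypothetical `utc_timestamp()` helper:

```python
from datetime import datetime, timezone

def utc_timestamp():
    """Hypothetical helper: current UTC time as an ISO 8601 string with an explicit offset."""
    return datetime.now(timezone.utc).isoformat()

naive = datetime.now().isoformat()
aware = utc_timestamp()

# A naive timestamp parses back without time zone information...
print(datetime.fromisoformat(naive).tzinfo)       # None
# ...while the UTC timestamp carries its offset
print(datetime.fromisoformat(aware).utcoffset())  # 0:00:00
```

To use it, replace `datetime.now().isoformat()` with `utc_timestamp()` in the payload dictionaries.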

In [None]:
# Test publishing to METRICS stream
await publish_test_message("METRICS", "metrics.test_service.cpu_usage")

In [None]:
# Test publishing to LOGS stream
await publish_test_message("LOGS", "logs.test_service.info")

In [None]:
# Test publishing to ALERTS stream
await publish_test_message("ALERTS", "alerts.test_alert")

## Check Stream Configuration

Examine the detailed configuration of the empty streams.

In [None]:
async def check_stream_config(stream_name):
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    
    try:
        info = await js.stream_info(stream_name)
        config = info.config
        print(f"Stream: {config.name}")
        print(f"Subjects: {config.subjects}")
        print(f"Retention: {config.retention}")
        print(f"Storage: {config.storage}")
        print(f"Max Messages: {config.max_msgs} (-1 = unlimited)")
        print(f"Max Age: {config.max_age} s (0 = unlimited)")
        print(f"Description: {config.description}")
    except Exception as e:
        print(f"Error getting stream config: {e}")
    
    await nc.close()

In [None]:
# Check configuration for the empty streams
await check_stream_config("METRICS")

In [None]:
await check_stream_config("LOGS")

In [None]:
await check_stream_config("ALERT_DATA")
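The configuration printed above matters as much as the subjects: a stream can look empty even when publishing works, because its retention settings remove messages. With interest retention and no consumers, for instance, the server does not keep new messages, yet the JetStream publish can still be acknowledged. The sketch below is a hypothetical helper that turns the printed values into possible explanations; it assumes nats-py's `retention` enum compares equal to the strings `"limits"`, `"interest"`, and `"workqueue"`, which is worth verifying against your nats-py version.

```python
def explain_empty_stream(retention, max_age_s=None, consumer_count=0, short_age_s=60):
    """Hypothetical helper: list configuration reasons why a stream may hold no messages.

    retention: "limits", "interest" or "workqueue"
    max_age_s: maximum message age in seconds (None or 0 means no age limit)
    short_age_s: arbitrary threshold below which max_age is flagged as short
    """
    reasons = []
    if retention == "interest":
        if consumer_count == 0:
            # With no interested consumer there is nothing to keep the message for
            reasons.append("interest retention with no consumers: new messages are discarded")
        else:
            reasons.append("interest retention: messages are removed once all consumers acknowledge them")
    elif retention == "workqueue":
        # A work queue deletes each message once a consumer acknowledges it
        reasons.append("work-queue retention: messages are removed once acknowledged")
    if max_age_s and max_age_s < short_age_s:
        reasons.append(f"short max_age ({max_age_s} s): messages expire quickly")
    return reasons

print(explain_empty_stream("limits"))  # []
print(explain_empty_stream("interest", consumer_count=0))
print(explain_empty_stream("limits", max_age_s=30))
```

In the notebook the inputs would come from `js.stream_info(name)`, e.g. `explain_empty_stream(info.config.retention, info.config.max_age, info.state.consumer_count)`.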

## Verify Subject Mapping

Check whether the subjects used by the data generation scripts match the subject patterns configured on each stream.

In [None]:
# NATS subject matching: '.' separates tokens, '*' matches exactly one token,
# and '>' (valid only as the last token) matches one or more remaining tokens
def subject_matches(pattern, subject):
    p_tokens = pattern.split('.')
    s_tokens = subject.split('.')
    for i, p in enumerate(p_tokens):
        if p == '>':
            # '>' needs at least one subject token left to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens) or (p != '*' and p != s_tokens[i]):
            return False
    return len(p_tokens) == len(s_tokens)

# This function helps diagnose issues with subject mapping
async def check_subject_matches(stream_name, test_subject):
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    
    try:
        info = await js.stream_info(stream_name)
        subjects = info.config.subjects or []
        
        print(f"Stream: {stream_name}")
        print(f"Configured subjects: {subjects}")
        print(f"Test subject: {test_subject}")
        
        # Check the test subject against every configured pattern
        matches = False
        for subject_pattern in subjects:
            if subject_matches(subject_pattern, test_subject):
                matches = True
                kind = "Direct" if subject_pattern == test_subject else "Wildcard"
                print(f"✅ {kind} match with: {subject_pattern}")
        
        if not matches:
            print("❌ No match found! The test subject may not be captured by this stream.")
            
    except Exception as e:
        print(f"Error checking subject matches: {e}")
    
    await nc.close()

In [None]:
# Check if the subjects used in the scripts match the stream configuration
await check_subject_matches("METRICS", "metrics.test_service.cpu_usage")
await check_subject_matches("LOGS", "logs.test_service.info")
await check_subject_matches("ALERTS", "alerts.test_alert")
await check_subject_matches("ALERT_DATA", "alerts.test_alert")
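The cell above tests `alerts.test_alert` against both `ALERTS` and `ALERT_DATA`. A NATS server refuses to create a stream whose subjects overlap those of an existing stream in the same account, so at most one of those two checks can succeed. When planning which stream should own which subjects, a small offline check can flag overlaps before a stream is created. The helper names and the stream layout in the example are hypothetical; the matching follows NATS token rules (`*` matches one token, a trailing `>` matches one or more).

```python
from itertools import combinations

def patterns_overlap(a, b):
    """Return True if some concrete subject would match both NATS subject patterns."""
    a_tokens, b_tokens = a.split('.'), b.split('.')
    for i in range(max(len(a_tokens), len(b_tokens))):
        ta = a_tokens[i] if i < len(a_tokens) else None
        tb = b_tokens[i] if i < len(b_tokens) else None
        if ta == '>' or tb == '>':
            # '>' absorbs one or more tokens, so the other side just needs a token here
            return ta is not None and tb is not None
        if ta is None or tb is None:
            return False  # different lengths without a trailing '>'
        if ta != tb and ta != '*' and tb != '*':
            return False
    return True

def find_overlaps(streams):
    """streams: dict of stream name -> list of subject patterns."""
    overlaps = []
    for (name_a, subs_a), (name_b, subs_b) in combinations(streams.items(), 2):
        for pa in subs_a:
            for pb in subs_b:
                if patterns_overlap(pa, pb):
                    overlaps.append((name_a, pa, name_b, pb))
    return overlaps

# Hypothetical planned layout
streams = {
    "ALERTS": ["alerts.>"],
    "ALERT_DATA": ["alerts.*"],
    "METRICS": ["metrics.>"],
}
print(find_overlaps(streams))
# [('ALERTS', 'alerts.>', 'ALERT_DATA', 'alerts.*')]
```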

## Generate and Publish Data with Fixed Subjects

Define functions that generate representative test data for each stream type and publish it on subjects that follow each stream's naming scheme.

In [None]:
# Function to generate and publish metrics data
async def publish_metrics_data(count=5, service="test-service"):
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    
    metrics = ["cpu_usage", "memory_usage", "request_rate", "error_rate"]
    for i in range(count):
        metric = random.choice(metrics)
        value = random.randint(1, 100)
        
        data = {
            "timestamp": datetime.now().isoformat(),
            "service": service,
            "name": metric,
            "value": value
        }
        
        subject = f"metrics.{service.replace('-', '_')}.{metric}"
        
        try:
            ack = await js.publish(subject, json.dumps(data).encode())
            print(f"Published metric {metric} = {value} to {subject} (seq: {ack.seq})")
        except Exception as e:
            print(f"JetStream publish failed: {e}")
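            # Fallback to core NATS publish: fire-and-forget with no ack, so the message
            # is stored only if some stream's subjects match
            await nc.publish(subject, json.dumps(data).encode())
            await nc.flush()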
            print(f"Published metric {metric} = {value} to {subject} (regular NATS)")
        
        await asyncio.sleep(0.2)  # Small delay between publications
    
    await nc.close()
    return count

In [None]:
# Function to generate and publish logs data
async def publish_logs_data(count=5, service="test-service"):
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    
    levels = ["INFO", "WARN", "ERROR", "DEBUG"]
    messages = [
        "Service started",
        "Request processed",
        "Database query completed",
        "Connection timeout",
        "Authentication failed",
        "Resource not found"
    ]
    
    for i in range(count):
        level = random.choice(levels)
        message = random.choice(messages)
        
        data = {
            "timestamp": datetime.now().isoformat(),
            "service": service,
            "level": level,
            "message": f"{message} (test {i+1})"
        }
        
        subject = f"logs.{service.replace('-', '_')}.{level.lower()}"
        
        try:
            ack = await js.publish(subject, json.dumps(data).encode())
            print(f"Published log [{level}] {message} to {subject} (seq: {ack.seq})")
        except Exception as e:
            print(f"JetStream publish failed: {e}")
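            # Fallback to core NATS publish: fire-and-forget with no ack, so the message
            # is stored only if some stream's subjects match
            await nc.publish(subject, json.dumps(data).encode())
            await nc.flush()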
            print(f"Published log [{level}] {message} to {subject} (regular NATS)")
        
        await asyncio.sleep(0.2)  # Small delay between publications
    
    await nc.close()
    return count

In [None]:
# Function to generate and publish alerts data
async def publish_alerts_data(count=5, service="test-service"):
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    
    alert_names = ["HighCpuUsage", "HighMemoryUsage", "HighLatency", "HighErrorRate"]
    severities = ["info", "warning", "critical"]
    
    for i in range(count):
        alert_name = random.choice(alert_names)
        severity = random.choice(severities)
        
        data = {
            "id": f"alert-{int(time.time())}-{i}",
            "labels": {
                "alertname": alert_name,
                "service": service,
                "severity": severity
            },
            "annotations": {
                "summary": f"{alert_name} detected",
                "description": f"Test alert for {service}",
                "value": f"{random.randint(80, 100)}%",
                "threshold": "80%"
            },
            "startsAt": datetime.now().isoformat(),
            "status": random.choice(["open", "acknowledged", "resolved"])
        }
        
        # Try both subject formats to see which one works
        subject1 = f"alerts.{service.replace('-', '_')}.{alert_name.lower()}"
        subject2 = f"alert.{alert_name.lower()}"
        
        try:
            # Try first subject format
            ack = await js.publish(subject1, json.dumps(data).encode())
            print(f"Published alert {alert_name} to {subject1} (seq: {ack.seq})")
        except Exception as e:
            print(f"JetStream publish to {subject1} failed: {e}")
            try:
                # Try second subject format
                ack = await js.publish(subject2, json.dumps(data).encode())
                print(f"Published alert {alert_name} to {subject2} (seq: {ack.seq})")
            except Exception as e2:
                print(f"JetStream publish to {subject2} failed: {e2}")
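                # Fallback to core NATS publish: fire-and-forget with no ack, so the message
                # is stored only if some stream's subjects match
                await nc.publish(subject1, json.dumps(data).encode())
                await nc.flush()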
                print(f"Published alert {alert_name} to {subject1} (regular NATS)")
        
        await asyncio.sleep(0.2)  # Small delay between publications
    
    await nc.close()
    return count
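The three generators build subjects by interpolating names and only replace `-` with `_`. In NATS, `.` separates subject tokens, `*` and `>` are wildcards, and whitespace is not allowed, so a service called, say, `api.gateway` would add an extra token and publish on a subject the stream may not capture. A hypothetical `subject_token()` helper that generalizes the `replace('-', '_')` convention:

```python
import re

def subject_token(value):
    """Hypothetical helper: turn an arbitrary name into a single, safe NATS subject token."""
    # Replace separators, wildcards, whitespace and '-' (this notebook's convention) with '_'
    token = re.sub(r"[.*>\s\-]+", "_", value.strip())
    return token or "_"

def metric_subject(service, metric):
    """Hypothetical helper: build a metrics subject from sanitized tokens."""
    return f"metrics.{subject_token(service)}.{subject_token(metric)}"

print(metric_subject("test-service", "cpu_usage"))   # metrics.test_service.cpu_usage
print(metric_subject("api.gateway", "p99 latency"))  # metrics.api_gateway.p99_latency
```

Keeping every interpolated value to exactly one token means a pattern such as `metrics.*.*` keeps matching regardless of the service name.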

In [None]:
# Generate test data for metrics
await publish_metrics_data(5)

In [None]:
# Generate test data for logs
await publish_logs_data(5)

In [None]:
# Generate test data for alerts
await publish_alerts_data(5)

## Check Stream Status Again

Re-run the stream summary and compare the message counts with the first run. A stream whose count did not increase is still not receiving data.

In [None]:
# Check stream status again to see if our test data was published
stream_info = await get_stream_info()
df = pd.DataFrame(stream_info)
df
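This cell overwrites `stream_info` from the first run. To see exactly which streams gained messages, keep the first snapshot under another name (for example a hypothetical `before_info = stream_info` cell before publishing) and diff the `Messages` column. A minimal sketch with made-up snapshots:

```python
import pandas as pd

def message_deltas(before_rows, after_rows):
    """Messages gained per stream between two get_stream_info()-style snapshots."""
    before = pd.DataFrame(before_rows).set_index("Name")["Messages"]
    after = pd.DataFrame(after_rows).set_index("Name")["Messages"]
    # Streams missing from one snapshot count as having 0 messages there
    return after.sub(before, fill_value=0).astype(int).sort_values()

# Made-up snapshots for illustration
before_rows = [{"Name": "METRICS", "Messages": 0}, {"Name": "LOGS", "Messages": 0}]
after_rows = [{"Name": "METRICS", "Messages": 5}, {"Name": "LOGS", "Messages": 0}]
deltas = message_deltas(before_rows, after_rows)
print(deltas[deltas == 0].index.tolist())  # ['LOGS'] -> still not receiving data
```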

## Recommendations

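Based on the diagnostics above, check the following when a stream is missing data:

1. **Check Subject Patterns**: Make sure every subject the data generation scripts publish to matches a pattern configured on the intended stream. For example, `metrics.test_service.cpu_usage` is captured by `metrics.>` or `metrics.*.*`, but not by `metrics.*`.

2. **Stream Configuration**: Review each stream's subjects, retention policy, and limits (`max_msgs`, `max_age`). Retention settings can remove messages even when publishing succeeds.

3. **Fix Data Generation Scripts**: Update the scripts to publish on subjects the streams actually capture, and confirm each subject with `check_subject_matches()` before a full run.

4. **Use JetStream Publishing**: Prefer `js.publish()` over core `nc.publish()`. A JetStream publish waits for the stream's acknowledgement and raises an error when no stream captures the subject; a core publish is fire-and-forget, so a subject mismatch goes unnoticed.

5. **Stream Creation**: Create the streams before the scripts start publishing. Messages published before any stream captures their subject are not stored.

You can use the functions in this notebook to test your fixes before running the full generation scripts.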