# AGStream Complete Tutorial with Schema Registry

This interactive tutorial demonstrates the complete AGStream workflow with Karapace Schema Registry.

## Prerequisites

Before starting, make sure you have:
1. Docker running with Kafka and Karapace services
2. Python environment with agentics installed
3. Basic understanding of Pydantic models

```bash
# Start services (if not already running)
./manage_services.sh start
```
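
Before moving on, it can help to confirm that the services are actually reachable. The following is a minimal sketch using only the standard library. Port 8081 is the schema registry port used throughout this tutorial; port 9092 is merely the conventional Kafka default and is an assumption, so adjust it to match your Docker setup. The helper name `port_open` is hypothetical.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False


# 8081 comes from this tutorial; 9092 is an assumed Kafka port
SERVICES = {"schema registry": 8081, "kafka (assumed port)": 9092}

for name, port in SERVICES.items():
    status = "reachable" if port_open("localhost", port) else "NOT reachable"
    print(f"{name} on localhost:{port}: {status}")
```

A successful TCP connection only shows that something is listening on the port, not that the service is fully initialized.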

## 1. Setup and Imports

Let's import the necessary modules and set up our environment.

In [None]:
# Import required modules
from pydantic import BaseModel, Field
from agentics.core.streaming import AGStream
import json
import time
import uuid
import threading
import queue

# Set up logging for better visibility
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global queue for communication between threads
message_queue = queue.Queue()

## 2. Define Pydantic Models

Let's create some Pydantic models that we'll use throughout this tutorial.

In [None]:
# Define a Question model
class Question(BaseModel):
    """A question to be answered by our agent"""
    text: str = Field(description="The question text")
    category: str = Field(description="Question category")
    priority: int = Field(default=1, description="Priority level (1-5)")

# Define an Answer model
class Answer(BaseModel):
    """An answer to a question"""
    text: str = Field(description="The answer text")
    confidence: float = Field(description="Confidence score (0-1)")
    sources: list[str] = Field(default_factory=list, description="Source references")

# Display our models
print("Question model fields:", list(Question.model_fields.keys()))
print("Answer model fields:", list(Answer.model_fields.keys()))

## 3. Initialize AGStream with Schema Registry

Let's create AGStream instances and connect them to the schema registry.

In [None]:
# Create AGStream for questions
question_stream = AGStream(
    atype=Question,
    input_topic="tutorial-questions",
    output_topic="tutorial-answers",
    schema_registry_url="http://localhost:8081"
)

# Create AGStream for answers
answer_stream = AGStream(
    atype=Answer,
    input_topic="tutorial-answers",
    output_topic="tutorial-questions",
    schema_registry_url="http://localhost:8081"
)

print(f"Question stream configured for topic: {question_stream.input_topic}")
print(f"Answer stream configured for topic: {answer_stream.input_topic}")
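
The registry URL is repeated in every `AGStream(...)` call in this tutorial. One way to avoid that, sketched below, is to read it once from an environment variable and fall back to the tutorial default. The variable name `SCHEMA_REGISTRY_URL` and the helper `registry_url` are hypothetical choices for this sketch, not something AGStream reads on its own.

```python
import os

# Default used throughout this tutorial
DEFAULT_REGISTRY_URL = "http://localhost:8081"


def registry_url() -> str:
    """Return the registry URL from the environment, or the tutorial default."""
    # SCHEMA_REGISTRY_URL is a hypothetical variable name for this sketch
    url = os.environ.get("SCHEMA_REGISTRY_URL", DEFAULT_REGISTRY_URL)
    return url.rstrip("/")  # avoid double slashes when building paths later


print(registry_url())
```

The result could then be passed as `schema_registry_url=registry_url()` wherever a stream is created.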

## 4. Register Schemas in Schema Registry

Let's register our Pydantic models as JSON Schemas in the schema registry.

In [None]:
# Register Question schema
question_schema_id = question_stream.register_atype_schema()
print(f"✅ Question schema registered with ID: {question_schema_id}")

# Register Answer schema
answer_schema_id = answer_stream.register_atype_schema()
print(f"✅ Answer schema registered with ID: {answer_schema_id}")

# List registered schemas
question_versions = question_stream.list_registered_schemas()
answer_versions = answer_stream.list_registered_schemas()

print(f"\nQuestion schema versions: {question_versions}")
print(f"Answer schema versions: {answer_versions}")
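
To make registration less of a black box, here is a sketch of the request that the Confluent-compatible REST API (which Karapace implements) expects when a JSON Schema is registered. How `register_atype_schema()` works internally is not shown in this tutorial, so treat this as an illustration of the wire format only. The subject name follows the `<topic>-value` pattern used later in the validation section, the schema is a hand-written approximation of `Question`, and `build_registration_request` is a hypothetical helper. No network call is made.

```python
import json

REGISTRY_URL = "http://localhost:8081"


def build_registration_request(subject: str, json_schema: dict) -> tuple[str, dict, str]:
    """Build (url, headers, body) for POST /subjects/{subject}/versions."""
    url = f"{REGISTRY_URL}/subjects/{subject}/versions"
    headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
    # The registry expects the schema itself as a JSON-encoded *string*
    body = json.dumps({"schemaType": "JSON", "schema": json.dumps(json_schema)})
    return url, headers, body


# Hand-written approximation of the Question schema (illustrative only)
question_schema = {
    "title": "Question",
    "type": "object",
    "properties": {
        "text": {"type": "string"},
        "category": {"type": "string"},
        "priority": {"type": "integer", "default": 1},
    },
    "required": ["text", "category"],
}

url, headers, body = build_registration_request("tutorial-questions-value", question_schema)
print(url)
print(body[:80])
```

Note the double encoding: the outer body is JSON, and its `schema` field is itself a JSON string.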

## 5. Create Kafka Topics

Before we can produce and consume messages, we need to create the Kafka topics.

In [None]:
# Create topics if they don't exist
for topic in ["tutorial-questions", "tutorial-answers"]:
    if not AGStream.topic_exists(topic):
        AGStream.create_topic(topic)
        print(f"✅ Created topic: {topic}")
        time.sleep(1)  # Give Kafka time to create the topic
    else:
        print(f"✅ Topic already exists: {topic}")

print("\nAll topics are ready!")
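
The fixed `time.sleep(1)` above is a guess at how long topic creation takes. A more robust pattern, sketched here under the assumption that `AGStream.topic_exists` can be polled cheaply, is to wait until a condition becomes true or a deadline passes. The helper `wait_until` is hypothetical and uses only the standard library; the demo predicate stands in for a real topic check.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float = 10.0, interval: float = 0.5) -> bool:
    """Poll predicate until it returns True or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in predicate that becomes true on the third call
attempts = []


def ready_after_three() -> bool:
    attempts.append(1)
    return len(attempts) >= 3


print(wait_until(ready_after_three, timeout=2.0, interval=0.01))

# With running services, the call would look like:
# wait_until(lambda: AGStream.topic_exists("tutorial-questions"))
```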

## 6. Start Listener in Background

Let's set up a listener that will process questions and generate answers.

In [None]:
# Create a listener stream
listener = AGStream(
    atype=Answer,
    input_topic="tutorial-questions",
    output_topic="tutorial-answers",
    schema_registry_url="http://localhost:8081"
)

# Define a simple processing function
def process_question(question: Question) -> Answer:
    """Simple function to generate answers from questions"""
    answers = {
        "What is machine learning?": "Machine learning is a subset of AI that enables systems to learn from data without being explicitly programmed.",
        "How does blockchain work?": "Blockchain is a decentralized, distributed ledger that records transactions across many computers in a way that ensures security and transparency.",
        "What are the benefits of quantum computing?": "Quantum computing offers exponential speedups for certain problems, better optimization, and enhanced cryptography capabilities.",
        "What is the capital of France?": "The capital of France is Paris."
    }
    
    # Get the best matching answer
    answer_text = answers.get(question.text, "I don't know the answer to that question.")
    
    return Answer(
        text=answer_text,
        confidence=0.95 if question.text in answers else 0.5,
        sources=["Tutorial Database"]
    )

# Function that runs the listener; it is executed in a separate thread
def run_listener():
    print("\n🎧 Starting listener...")
    try:
        # Listen for questions and produce answers
        listener.listen(atype=Question)
    except Exception as e:
        print(f"Listener error: {e}")
        message_queue.put(f"Listener error: {e}")

# Start listener in background (the thread variable no longer shadows the function name)
listener_thread = threading.Thread(target=run_listener, daemon=True)
listener_thread.start()

print("Listener started in background. It will process questions and generate answers.")
print("Waiting 5 seconds for listener to initialize...")
time.sleep(5)  # Give listener time to start
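
The listener thread reports failures by putting a message on `message_queue`, but nothing in the cell above reads that queue. The sketch below shows one way to check it without blocking. The helper `drain` is hypothetical, and `demo_queue` stands in for the tutorial's `message_queue` so that the example runs on its own.

```python
import queue


def drain(q: queue.Queue) -> list:
    """Remove and return everything currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


# Stand-in for message_queue, pre-filled with one simulated error
demo_queue = queue.Queue()
demo_queue.put("Listener error: simulated failure")

for error in drain(demo_queue):
    print(f"Background thread reported: {error}")
```

Calling `drain(message_queue)` after the startup delay would surface a listener that failed right away instead of leaving the failure unnoticed.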

## 7. Produce and Consume Messages

Now let's create a complete workflow: produce questions, let the listener process them, and then consume the answers.

In [None]:
# Create some questions
questions = [
    Question(text="What is machine learning?", category="AI", priority=3),
    Question(text="How does blockchain work?", category="Technology", priority=2),
    Question(text="What is the capital of France?", category="Geography", priority=2)
]

# Function to send questions and collect answers
def send_and_collect():
    # Send questions to Kafka
    question_keys = []
    for question in questions:
        question_stream.states = [question]
        key = question_stream.produce()
        question_keys.append(key)
        print(f"✅ Sent question: {question.text[:30]}... (key: {key})")
        time.sleep(1)  # Small delay between messages
    
    # Wait for processing
    print("\n🕒 Waiting for answers to be processed...")
    time.sleep(5)  # Give listener time to process
    
    # Create answer collector
    answer_collector = AGStream(
        input_topic="tutorial-answers",
        schema_registry_url="http://localhost:8081"
    )
    
    # Retrieve Answer type from registry
    RetrievedAnswer = answer_collector.get_atype_from_registry()
    if RetrievedAnswer:
        answer_collector.atype = RetrievedAnswer
        print(f"✅ Retrieved Answer type from registry: {RetrievedAnswer.__name__}")
    
    # Collect answers
    answers = answer_collector.collect_sources(
        max_messages=5,
        timeout_ms=10000,  # Longer timeout
        mode='latest',
        verbose=True
    )
    
    print(f"\n📋 Collected {len(answers)} answers:")
    for i, answer in enumerate(answers, 1):
        if answer.states:
            print(f"{i}. {answer.states[0].text[:50]}... (Confidence: {answer.states[0].confidence})")
    
    return answers

# Run the workflow
answers = send_and_collect()

## 8. Schema Evolution Example

Let's demonstrate how schema evolution works with the registry.

In [None]:
# Define an updated version of our Question model
class QuestionV2(BaseModel):
    """Updated question model with additional fields"""
    text: str = Field(description="The question text")
    category: str = Field(description="Question category")
    priority: int = Field(default=1, description="Priority level (1-5)")
    language: str = Field(default="en", description="Language of the question")
    timestamp: float = Field(default_factory=time.time, description="When the question was asked")

# Create a new stream with the updated type
updated_stream = AGStream(
    atype=QuestionV2,
    input_topic="tutorial-questions",
    schema_registry_url="http://localhost:8081"
)

# Register the updated schema
updated_schema_id = updated_stream.register_atype_schema()
print(f"✅ Updated Question schema registered with ID: {updated_schema_id}")

# List all versions
all_versions = updated_stream.list_registered_schemas()
print(f"All Question schema versions: {all_versions}")

# Retrieve version 1 (original)
OriginalQuestion = updated_stream.get_atype_from_registry(version="1")
print(f"Original Question type: {OriginalQuestion.__name__ if OriginalQuestion else 'Not found'}")

# Retrieve latest version
LatestQuestion = updated_stream.get_atype_from_registry(version="latest")
print(f"Latest Question type: {LatestQuestion.__name__ if LatestQuestion else 'Not found'}")

# Show the difference
if OriginalQuestion and LatestQuestion:
    print("\n🔄 Schema Evolution:")
    orig_fields = set(OriginalQuestion.model_fields.keys())
    latest_fields = set(LatestQuestion.model_fields.keys())
    print(f"   Original fields: {orig_fields}")
    print(f"   Latest fields: {latest_fields}")
    print(f"   Added fields: {latest_fields - orig_fields}")

## 9. Direct Schema Registry Access

Let's explore the schema registry directly using the REST API.

In [None]:
import requests

# List all subjects
def list_subjects():
    try:
        # A timeout keeps the cell from hanging if the registry is down
        response = requests.get("http://localhost:8081/subjects", timeout=10)
        if response.status_code == 200:
            subjects = response.json()
            print("📋 Registered Subjects:")
            for subject in subjects:
                print(f"   - {subject}")
            return subjects
        else:
            print(f"❌ Error listing subjects: {response.status_code} - {response.text}")
            return []
    except Exception as e:
        print(f"❌ Error: {e}")
        return []

# Get schema details
def get_schema_details(subject: str, version: str = "latest"):
    try:
        url = f"http://localhost:8081/subjects/{subject}/versions/{version}"
        # A timeout keeps the cell from hanging if the registry is down
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            details = response.json()
            print(f"\n📄 Schema Details for {subject} (version {version}):")
            print(f"   ID: {details.get('id')}")
            print(f"   Version: {details.get('version')}")
            print(f"   Schema: {details.get('schema')[:100]}...")
            return details
        else:
            print(f"❌ Error getting schema: {response.status_code} - {response.text}")
            return None
    except Exception as e:
        print(f"❌ Error: {e}")
        return None

# List all subjects
subjects = list_subjects()

# Get details for each subject
if subjects:
    for subject in subjects:
        get_schema_details(subject)
        time.sleep(0.5)  # Small delay between requests
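
The `schema` field in the registry response is a JSON-encoded string, which is why the cell above can only print its first 100 characters. Decoding it with `json.loads` gives a dictionary that is easier to inspect. The sketch below works on a hand-written sample response, so it runs without a registry. The sample values are illustrative, not captured output, and `summarize_version` is a hypothetical helper.

```python
import json


def summarize_version(details: dict) -> dict:
    """Decode the embedded schema string and pull out the parts worth reading."""
    schema = json.loads(details["schema"])  # the schema arrives as a JSON string
    return {
        "id": details.get("id"),
        "version": details.get("version"),
        "title": schema.get("title"),
        "fields": sorted(schema.get("properties", {})),
        "required": schema.get("required", []),
    }


# Illustrative response body, not captured from a real registry
sample_response = {
    "subject": "tutorial-questions-value",
    "version": 1,
    "id": 1,
    "schemaType": "JSON",
    "schema": json.dumps({
        "title": "Question",
        "type": "object",
        "properties": {"text": {}, "category": {}, "priority": {}},
        "required": ["text", "category"],
    }),
}

print(summarize_version(sample_response))
```

The same function could be applied to the dictionary returned by `get_schema_details(subject)`.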

## 10. Advanced: Schema Validation

Let's demonstrate how schema validation works.

In [None]:
# Function to validate a message against a schema
def validate_message(message: dict, schema_subject: str) -> bool:
    """Validate a message against a registered schema"""
    try:
        # Get the schema from registry
        url = f"http://localhost:8081/subjects/{schema_subject}/versions/latest"
        response = requests.get(url, timeout=10)  # avoid hanging if the registry is down
        
        if response.status_code != 200:
            print(f"❌ Failed to get schema: {response.status_code}")
            return False
            
        schema_data = response.json()
        schema = json.loads(schema_data['schema'])
        
        # Simple validation - in production use a proper validator
        # Here we just check required fields exist
        required_fields = schema.get('required', [])
        
        for field in required_fields:
            if field not in message:
                print(f"❌ Missing required field: {field}")
                return False
        
        print(f"✅ Message is valid against schema {schema_subject}")
        return True
    except Exception as e:
        print(f"❌ Validation error: {e}")
        return False

# Test validation with a sample message
test_message = {
    "text": "Test question",
    "category": "Test",
    "priority": 1
}

print("\n🔍 Testing message validation:")
is_valid = validate_message(test_message, "tutorial-questions-value")
print(f"Validation result: {'✅ PASS' if is_valid else '❌ FAIL'}")

# Test with invalid message (missing required field)
invalid_message = {
    "text": "Invalid question",
    "priority": 1
    # Missing 'category' which is required
}

print("\n🔍 Testing invalid message:")
is_valid = validate_message(invalid_message, "tutorial-questions-value")
print(f"Validation result: {'✅ PASS' if is_valid else '❌ FAIL'}")
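
The check above only looks at required fields, so a message with `priority: "high"` would still pass. As a small step toward fuller validation, the sketch below also compares each value against the `type` declared under `properties`. It uses only the standard library and covers a small subset of JSON Schema. A dedicated validator library such as `jsonschema` would be the usual choice for real workloads. The helper `find_violations` and the schema are illustrative assumptions.

```python
# Mapping from JSON Schema type names to Python types
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "array": list,
    "object": dict,
}


def find_violations(message: dict, schema: dict) -> list[str]:
    """Return a list of problems; an empty list means the message passed."""
    problems = []
    for field in schema.get("required", []):
        if field not in message:
            problems.append(f"missing required field: {field}")
    properties = schema.get("properties", {})
    for field, value in message.items():
        expected = properties.get(field, {}).get("type")
        if not isinstance(expected, str) or expected not in JSON_TYPES:
            continue  # unspecified or unsupported type: skip rather than guess
        # bool is a subclass of int in Python, so reject it for numeric types
        is_bool_as_number = isinstance(value, bool) and expected in ("integer", "number")
        if is_bool_as_number or not isinstance(value, JSON_TYPES[expected]):
            problems.append(f"field {field!r} should be of type {expected}")
    return problems


# Hand-written approximation of the Question schema (illustrative only)
question_schema = {
    "properties": {
        "text": {"type": "string"},
        "category": {"type": "string"},
        "priority": {"type": "integer"},
    },
    "required": ["text", "category"],
}

print(find_violations({"text": "Test", "category": "Test", "priority": 1}, question_schema))
print(find_violations({"text": "Test", "priority": "high"}, question_schema))
```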

## 11. Cleanup and Summary

Let's clean up our resources and summarize what we've learned.

In [None]:
print("\n🧹 Cleanup complete!")
print("\n📚 Summary of what we've learned:")
print("\n1. ✅ Schema Registration: Stored Pydantic models as JSON Schemas")
print("2. ✅ Schema Retrieval: Dynamically loaded types from registry")
print("3. ✅ Message Production: Sent type-safe messages to Kafka")
print("4. ✅ Message Consumption: Retrieved messages using registered schemas")
print("5. ✅ Listener Setup: Processed messages with schema validation")
print("6. ✅ Schema Evolution: Managed versioning and compatibility")
print("7. ✅ Direct API Access: Explored schema registry REST API")
print("8. ✅ Schema Validation: Validated messages against schemas")

print("\n🎯 Key Benefits:")
print("- Type Safety: Ensure messages conform to registered schemas")
print("- Schema Evolution: Track changes and maintain compatibility")
print("- Dynamic Discovery: Load types at runtime without hardcoding")
print("- Centralized Management: Store all schemas in one place")

print("\n🚀 Next Steps:")
print("- Explore Schema Registry UI: http://localhost:8000")
print("- Check Kafka UI: http://localhost:8080")
print("- Try creating your own models and integrating them")
print("- Experiment with more complex schema evolution scenarios")
print("- Integrate with your production streaming pipelines")