# Event-Driven Architecture with idspy

This notebook demonstrates idspy's event-driven system for pipeline monitoring, debugging, and observability. Events give you a structured way to track what happens while a pipeline orchestrates its steps.

## What you'll learn

This tutorial covers:

1. **Create and Use Events** - Immutable event objects with structured data
2. **Filter Events** - Use predicates to match and select specific events
3. **Event Bus System** - Centralized event subscription and dispatching 
4. **Event Handlers** - Structured event processing with custom logic
5. **Observable Pipelines** - Automatic event emission during pipeline execution
6. **Error Tracking** - Event-driven error handling and diagnostics

## Key Benefits

- **Debugging Made Easy**: Track exactly what happens in your pipelines
- **Flexible Monitoring**: Subscribe to specific events you care about
- **Decoupled Architecture**: Separate business logic from monitoring concerns
- **Real-time Insights**: Process events as they happen

---

Let's start by setting up our environment and exploring the core event system.

In [86]:
import sys
import os

# Add the project root to Python path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

In [87]:
from src.idspy.core.pipeline.base import PipelineEvent
from src.idspy.core.pipeline.observable import ObservablePipeline
from src.idspy.core.step.base import Step
from src.idspy.core.events.event import Event, only_source, source_startswith, has_payload_key, payload_key_equals
from src.idspy.core.events.bus import EventBus
from src.idspy.core.events.handler import BaseHandler
from src.idspy.core.storage.dict import DictStorage

## Understanding Events

Events are **immutable** objects that represent something that happened in your system. Each event contains:

- **`type`**: What kind of event occurred (e.g., "step_start", "error", "validation")
- **`source`**: Where the event came from (e.g., "DataProcessor.Validation") 
- **`payload`**: Additional data about the event (flexible dictionary)

Let's create our first event:

In [88]:
# Create an event representing a pipeline step completion
event = Event(
    type="pipeline_step",
    source="DataProcessor.Validation",
    payload={
        "raw_data": {"rows": 10000, "cols": 15},
        "memory_usage": "256MB",
        "processing_time": 1.23,
        "status": "success"
    },
)

print("Event created successfully!")
print(f"Type: {event.type}")
print(f"Source: {event.source}")
print(f"Payload keys: {list(event.payload.keys())}")

# Events are immutable - this would raise an error:
# event.type = "different_type"  # AttributeError!

print("\nFull event as dict:")
print(event.as_dict())

Event created successfully!
Type: pipeline_step
Source: DataProcessor.Validation
Payload keys: ['raw_data', 'memory_usage', 'processing_time', 'status']

Full event as dict:
{'type': 'pipeline_step', 'source': 'DataProcessor.Validation', 'payload': {'raw_data': {'rows': 10000, 'cols': 15}, 'memory_usage': '256MB', 'processing_time': 1.23, 'status': 'success'}, 'timestamp': '2025-10-07T17:01:01.820838+00:00'}
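
For intuition about what "immutable" means here, the following standalone sketch models an event of the same shape with a frozen dataclass. It is an assumption made for illustration, not idspy's actual implementation: `SketchEvent` and `try_mutate` are hypothetical names, and the real `Event` class may be built differently.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an immutable event (not idspy's Event)."""

    type: str
    source: str
    payload: Dict[str, Any] = field(default_factory=dict)
    # Timestamp is filled in at creation time, in UTC, as an ISO 8601 string.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def try_mutate(event: SketchEvent) -> bool:
    """Return True if reassigning a field is rejected."""
    try:
        event.type = "different_type"  # type: ignore[misc]
    except FrozenInstanceError:  # a subclass of AttributeError
        return True
    return False


sketch = SketchEvent("pipeline_step", "DataProcessor.Validation", {"status": "success"})
print(try_mutate(sketch))
print(sketch.type)
```

Note that `frozen=True` only blocks reassigning attributes. The payload dictionary in this sketch can still be modified in place, so handlers should treat it as read-only by convention.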


## Event Predicates - Smart Filtering

Predicates are functions that help you filter events based on specific criteria. idspy provides several built-in predicates for common filtering tasks:

- **`only_source(source)`**: Match events from a specific source
- **`source_startswith(prefix)`**: Match events whose source starts with a prefix  
- **`has_payload_key(key)`**: Match events that have a specific key in their payload
- **`payload_key_equals(key, value)`**: Match events where payload[key] equals a specific value

Let's see these in action:

In [89]:
# Create a diverse set of test events
test_events = [
    Event("step_start", "Pipeline.DataLoad", payload={"step_index": 0, "type": "input", "size_mb": 150}),
    Event("step_start", "Pipeline.DataTransform", payload={"step_index": 1, "type": "processing"}),
    Event("step_end", "Pipeline.DataLoad", payload={"step_index": 0, "duration": 1.5, "status": "success"}),
    Event("step_end", "Pipeline.DataTransform", payload={"step_index": 1, "duration": 3.2, "status": "success"}),
    Event("error", "OtherPipeline.Validation", payload={"error_code": 404, "message": "Data not found"}),
    Event("warning", "Pipeline.DataLoad", payload={"message": "Large file size", "size_mb": 800}),
]

print("Event Predicate Examples")
print("=" * 50)

# 1. Filter by exact source
load_predicate = only_source("Pipeline.DataLoad")
load_events = list(filter(load_predicate, test_events))
print(f"Events from 'Pipeline.DataLoad' ({len(load_events)} found):")
for event in load_events:
    print(f"   • {event.type} - {event.payload.get('message', 'N/A')}")

# 2. Filter by source prefix
pipeline_predicate = source_startswith("Pipeline.")
pipeline_events = list(filter(pipeline_predicate, test_events))
print(f"\nEvents from 'Pipeline.*' sources ({len(pipeline_events)} found):")
for event in pipeline_events:
    print(f"   • {event.type} from {event.source}")

# 3. Filter by payload key existence
has_duration = has_payload_key("duration")
duration_events = list(filter(has_duration, test_events))
print(f"\nEvents with 'duration' payload ({len(duration_events)} found):")
for event in duration_events:
    print(f"   • {event.source}: {event.payload['duration']}s")

# 4. Filter by payload value
input_type = payload_key_equals("type", "input")
input_events = list(filter(input_type, test_events))
print(f"\nEvents with type='input' ({len(input_events)} found):")
for event in input_events:
    print(f"   • {event.source}: {event.payload.get('size_mb', 'N/A')}MB")

Event Predicate Examples
==================================================
Events from 'Pipeline.DataLoad' (3 found):
   • step_start - N/A
   • step_end - N/A
   • warning - Large file size

Events from 'Pipeline.*' sources (5 found):
   • step_start from Pipeline.DataLoad
   • step_start from Pipeline.DataTransform
   • step_end from Pipeline.DataLoad
   • step_end from Pipeline.DataTransform
   • warning from Pipeline.DataLoad

Events with 'duration' payload (2 found):
   • Pipeline.DataLoad: 1.5s
   • Pipeline.DataTransform: 3.2s

Events with type='input' (1 found):
   • Pipeline.DataLoad: 150MB
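
Because a predicate is just a function from an event to a boolean, predicates can be combined. The sketch below shows one way this might look, using closures and a hypothetical `all_of` combinator. Everything in it (`SketchEv`, `source_prefix`, `has_key`, `all_of`) is a stand-in written for this example and is not part of idspy's API.

```python
from typing import Any, Callable, Mapping, NamedTuple


class SketchEv(NamedTuple):
    """Hypothetical minimal event used only for this sketch."""

    type: str
    source: str
    payload: Mapping[str, Any]


Predicate = Callable[[SketchEv], bool]


def source_prefix(prefix: str) -> Predicate:
    """Match events whose source starts with the given prefix."""
    return lambda e: e.source.startswith(prefix)


def has_key(key: str) -> Predicate:
    """Match events whose payload contains the given key."""
    return lambda e: key in e.payload


def all_of(*preds: Predicate) -> Predicate:
    """Match events accepted by every predicate passed in."""
    return lambda e: all(p(e) for p in preds)


sample_events = [
    SketchEv("step_start", "Pipeline.DataLoad", {"step_index": 0}),
    SketchEv("step_end", "Pipeline.DataLoad", {"duration": 1.5}),
    SketchEv("step_end", "OtherPipeline.Validation", {"duration": 0.2}),
]

# Keep only events from "Pipeline.*" sources that also carry a duration.
timed_pipeline_events = all_of(source_prefix("Pipeline."), has_key("duration"))
matches = [e.source for e in sample_events if timed_pipeline_events(e)]
print(matches)
```

The same idea should carry over to idspy's built-in predicates as long as they are plain callables, which the `filter(...)` calls above suggest.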


## EventBus - Central Event Hub

The **EventBus** is the heart of the event system. It manages subscriptions and dispatches events to interested handlers. Think of it as a sophisticated messaging system where:

- **Publishers** emit events to the bus
- **Subscribers** register to receive specific events
- **Priority system** controls handler execution order
- **Filtering** ensures handlers only receive relevant events

Let's explore the EventBus capabilities:

In [90]:
# Create an EventBus for demonstration
demo_bus = EventBus()

print("EventBus Subscription Patterns")
print("=" * 50)

# Pattern 1: Function-based handlers with decorators
@demo_bus.on()  # Subscribe to ALL events
def global_logger(event: Event) -> None:
    print(f"  [GLOBAL] {event.type} from {event.source}")

@demo_bus.on("user_action")  # Subscribe to specific event type
def user_action_handler(event: Event) -> None:
    print(f"  [USER_HANDLER] Action: {event.payload.get('action', 'unknown')}")

@demo_bus.on("error")  # Subscribe to error events
def error_handler(event: Event) -> None:
    print(f"  [ERROR_HANDLER] {event.payload.get('message', 'No error message')}")

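# Pattern 2: Predicate-based subscription (more flexible)
# NOTE: in the recorded output below, this handler never fires, not even for
# the "DataPipeline.LoadData" event. The first positional argument of `on()`
# appears to be treated as an event type, so the predicate probably has to be
# passed through a dedicated parameter; check `EventBus.on` for its name.
@demo_bus.on(source_startswith("DataPipeline."))
def pipeline_monitor(event: Event) -> None:
    print(f"  [PIPELINE_MONITOR] {event.source} -> {event.type}")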

# Let's test by emitting some events
print("\nTesting Event Emission:")
print("-" * 30)

events = [
    Event("user_action", "WebInterface", payload={"action": "login", "user_id": 123}),
    Event("error", "DatabaseConnection", payload={"message": "Connection timeout"}),
    Event("step_start", "DataPipeline.LoadData", payload={"step": "data_loading"}),
    Event("notification", "EmailService", payload={"recipient": "admin@example.com"}),
]

for event in events:
    print(f"\nEmitting: {event.type} from {event.source}")
    demo_bus.publish(event_type=event.type, source=event.source, payload=event.payload)

EventBus Subscription Patterns
==================================================

Testing Event Emission:
------------------------------

Emitting: user_action from WebInterface
  [USER_HANDLER] Action: login
  [GLOBAL] user_action from WebInterface

Emitting: error from DatabaseConnection
  [ERROR_HANDLER] Connection timeout
  [GLOBAL] error from DatabaseConnection

Emitting: step_start from DataPipeline.LoadData
  [GLOBAL] step_start from DataPipeline.LoadData

Emitting: notification from EmailService
  [GLOBAL] notification from EmailService


In [91]:
# Demonstrate subscription management
print("\nSubscription Management:")
print("-" * 30)

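# Unsubscribe the global logger (it was getting noisy!)
# The literal 1 is assumed to be the token of the first subscription made on
# demo_bus (global_logger). In practice, keep the token you get when
# subscribing and pass that instead of a hard-coded value.
print("Unsubscribing global logger...")
demo_bus.unsubscribe(1)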

# Test with remaining handlers
test_event = Event("user_action", "MobileApp", payload={"action": "logout", "user_id": 456})
print("\nEmitting test event after unsubscription:")
demo_bus.publish(event_type=test_event.type, source=test_event.source, payload=test_event.payload)


Subscription Management:
------------------------------
Unsubscribing global logger...

Emitting test event after unsubscription:
  [USER_HANDLER] Action: logout
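
The token-based unsubscription above can be illustrated with a toy bus. This is a minimal sketch under stated assumptions, not idspy's `EventBus`: `ToyBus` and its method signatures are hypothetical and leave out priorities and predicates entirely. It only shows why storing the token at subscription time is safer than hard-coding a number.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Handler = Callable[[str, Dict[str, Any]], None]


class ToyBus:
    """Hypothetical, minimal bus used only to illustrate token handling."""

    def __init__(self) -> None:
        self._subs: Dict[int, Tuple[Optional[str], Handler]] = {}
        self._next_token = 1

    def subscribe(self, handler: Handler, event_type: Optional[str] = None) -> int:
        """Register a handler; event_type=None means 'all events'."""
        token = self._next_token
        self._next_token += 1
        self._subs[token] = (event_type, handler)
        return token

    def unsubscribe(self, token: int) -> bool:
        """Remove a subscription; return False if the token is unknown."""
        return self._subs.pop(token, None) is not None

    def publish(self, event_type: str, payload: Dict[str, Any]) -> int:
        """Deliver to matching handlers and return how many were called."""
        delivered = 0
        for wanted, handler in list(self._subs.values()):
            if wanted is None or wanted == event_type:
                handler(event_type, payload)
                delivered += 1
        return delivered


log: List[str] = []
toy_bus = ToyBus()
global_token = toy_bus.subscribe(lambda t, p: log.append(f"global:{t}"))
toy_bus.subscribe(lambda t, p: log.append(f"user:{p['action']}"), "user_action")

toy_bus.publish("user_action", {"action": "login"})
toy_bus.unsubscribe(global_token)  # use the stored token, not a literal
toy_bus.publish("user_action", {"action": "logout"})
print(log)
```

Keeping `global_token` in a variable means the unsubscription still targets the right handler if subscriptions are added or reordered earlier in the notebook.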


## BaseHandler - Structured Event Processing

While function-based handlers are simple, **BaseHandler** provides more structure for complex event processing. It offers:

- **`can_handle(event)`**: Filtering logic to determine if handler should process an event
- **`handle(event)`**: The actual processing logic
- **State management**: Handlers can maintain internal state
- **Object-oriented design**: Better organization for complex handlers

Let's create a sophisticated pipeline monitoring handler:

In [92]:
# Create a sophisticated pipeline monitoring handler
class PipelineMonitor(BaseHandler):
    """Monitors pipeline execution with state tracking and detailed logging."""

    def __init__(self, pipeline_name: str):
        super().__init__()
        self.pipeline_name = pipeline_name
        self.current_step = None
        self.step_count = 0
        self.start_time = None
        self.errors = []

    def can_handle(self, event: Event) -> bool:
        """Only handle events from our specific pipeline."""
        return event.source.startswith(f"{self.pipeline_name}.")

    def handle(self, event: Event) -> None:
        """Process pipeline events with state tracking."""
        if event.type == "pipeline_start":
            print(f"[{self.pipeline_name}] Pipeline started")
            self.step_count = 0
            self.start_time = event.payload.get("timestamp")

        elif event.type == "step_start":
            self.current_step = event.source.split(".")[-1]  # Get step name
            self.step_count += 1
            print(f"Step {self.step_count}: {self.current_step} starting...")

        elif event.type == "step_end":
            duration = event.payload.get("duration", "unknown")
            status = event.payload.get("status", "unknown")
            print(f"Step {self.step_count}: {self.current_step} completed ({duration}s, {status})")

        elif event.type == "error":
            error_msg = event.payload.get("message", "Unknown error")
            self.errors.append(error_msg)
            print(f"Error in {self.current_step}: {error_msg}")

        elif event.type == "pipeline_end":
            total_duration = event.payload.get("total_duration", "unknown")
            print(f"[{self.pipeline_name}] Pipeline completed in {total_duration}s")
            print(f"Total steps: {self.step_count}, Errors: {len(self.errors)}")

# Create and register the handler
handler_bus = EventBus()
pipeline_monitor = PipelineMonitor("DataProcessing")
handler_bus.subscribe(pipeline_monitor)

print("Testing PipelineMonitor Handler")
print("=" * 50)

# Simulate a complete pipeline execution
pipeline_events = [
    Event("pipeline_start", "DataProcessing.Start", payload={"timestamp": "2024-01-15T10:00:00"}),
    Event("step_start", "DataProcessing.LoadData", payload={"step_index": 1}),
    Event("step_end", "DataProcessing.LoadData", payload={"step_index": 1, "duration": 2.1, "status": "success"}),
    Event("step_start", "DataProcessing.ValidateData", payload={"step_index": 2}),
    Event("error", "DataProcessing.ValidateData", payload={"message": "Missing required column 'target'"}),
    Event("step_end", "DataProcessing.ValidateData", payload={"step_index": 2, "duration": 0.5, "status": "failed"}),
    Event("step_start", "DataProcessing.CleanData", payload={"step_index": 3}),
    Event("step_end", "DataProcessing.CleanData", payload={"step_index": 3, "duration": 1.8, "status": "success"}),
    Event("pipeline_end", "DataProcessing.End", payload={"total_duration": 4.4}),
]

# Process all events
for event in pipeline_events:
    handler_bus.publish(event_type=event.type, source=event.source, payload=event.payload)

Testing PipelineMonitor Handler
==================================================
[DataProcessing] Pipeline started
Step 1: LoadData starting...
Step 1: LoadData completed (2.1s, success)
Step 2: ValidateData starting...
Error in ValidateData: Missing required column 'target'
Step 2: ValidateData completed (0.5s, failed)
Step 3: CleanData starting...
Step 3: CleanData completed (1.8s, success)
[DataProcessing] Pipeline completed in 4.4s
Total steps: 3, Errors: 1
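
The `can_handle` / `handle` split can also be shown without idspy. The sketch below is a standalone illustration with hypothetical names (`SketchEv`, `DurationCollector`, `dispatch`). It assumes a dispatcher that calls `can_handle` first and `handle` only on a match, which is the contract described above but is not taken from idspy's source.

```python
from typing import Any, Dict, Iterable, List, NamedTuple


class SketchEv(NamedTuple):
    """Hypothetical minimal event used only for this sketch."""

    type: str
    source: str
    payload: Dict[str, Any]


class DurationCollector:
    """Hypothetical handler: sums step durations for one pipeline."""

    def __init__(self, pipeline_name: str) -> None:
        self.prefix = f"{pipeline_name}."
        self.total = 0.0
        self.failed: List[str] = []

    def can_handle(self, event: SketchEv) -> bool:
        # Filtering lives here: only step_end events from our pipeline.
        return event.type == "step_end" and event.source.startswith(self.prefix)

    def handle(self, event: SketchEv) -> None:
        # Processing lives here: update the handler's internal state.
        self.total += event.payload.get("duration", 0.0)
        if event.payload.get("status") != "success":
            self.failed.append(event.source.split(".")[-1])


def dispatch(handler: DurationCollector, events: Iterable[SketchEv]) -> None:
    """Assumed dispatch contract: check can_handle, then call handle."""
    for event in events:
        if handler.can_handle(event):
            handler.handle(event)


collector = DurationCollector("DataProcessing")
dispatch(collector, [
    SketchEv("step_end", "DataProcessing.LoadData", {"duration": 2.1, "status": "success"}),
    SketchEv("step_end", "DataProcessing.ValidateData", {"duration": 0.5, "status": "failed"}),
    SketchEv("step_end", "Other.Step", {"duration": 9.0, "status": "success"}),
    SketchEv("step_start", "DataProcessing.CleanData", {}),
])
print(round(collector.total, 2), collector.failed)
```

Because the handler keeps its own state, the results can be inspected after the run instead of being parsed out of printed log lines.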


## ObservablePipeline

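`ObservablePipeline` connects a pipeline to an `EventBus` and emits events automatically as it runs: one when the pipeline starts and ends, one when each step starts and ends, and one when a step raises an error. Handlers subscribed to the bus can then monitor and debug a run without any changes to the steps themselves.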

### Example Steps

In [93]:
class Load(Step):
    def bindings(self):
        return {"data": "data"}

    def compute(self):
        return {"data": [1, 2, 3]}

@Step.needs("data")
class Sum(Step):
    def bindings(self):
        return {"data": "data", "sum": "sum"}

    def compute(self, data):
        return {"sum": sum(data)}

class Boom(Step):
    def bindings(self):
        return {}

    def compute(self):
        raise RuntimeError("Boom!")

### Observable Pipeline Execution

In [94]:
# Create and run an ObservablePipeline
bus = EventBus()

# Each handler gets its own name so later definitions do not shadow earlier ones.
@bus.on(priority=0)
def global_logger(event: Event) -> None:
    print(f"[GLOBAL] -> {event.source}")

@bus.on(PipelineEvent.PIPELINE_START)
def on_pipeline_start(event: Event) -> None:
    print(f"[{event.type}] -> {event.source}")

@bus.on(PipelineEvent.PIPELINE_END)
def on_pipeline_end(event: Event) -> None:
    print(f"[{event.type}] -> {event.source}")

@bus.on(PipelineEvent.STEP_START)
def on_step_start(event: Event) -> None:
    print(f"[{event.type}] -> {event.source}")

@bus.on(PipelineEvent.STEP_END)
def on_step_end(event: Event) -> None:
    print(f"[{event.type}] -> {event.source}")



print("=== ObservablePipeline Demo ===")
storage = DictStorage()
pipeline = ObservablePipeline(steps=[Load(), Sum()], bus=bus, name="Demo", storage=storage)
pipeline.run()

print(f"\nFinal STATE: {storage.as_dict()}")

=== ObservablePipeline Demo ===
[GLOBAL] -> Demo
[PipelineEvent.PIPELINE_START] -> Demo
[GLOBAL] -> Demo.Load
[PipelineEvent.STEP_START] -> Demo.Load
[GLOBAL] -> Demo.Load
[PipelineEvent.STEP_END] -> Demo.Load
[GLOBAL] -> Demo.Sum
[PipelineEvent.STEP_START] -> Demo.Sum
[GLOBAL] -> Demo.Sum
[PipelineEvent.STEP_END] -> Demo.Sum
[GLOBAL] -> Demo
[PipelineEvent.PIPELINE_END] -> Demo

Final STATE: {'data': [1, 2, 3], 'sum': 6}


### Error Handling

In [95]:
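@bus.on(PipelineEvent.STEP_ERROR)
def on_step_error(event: Event) -> None:
    print(f"[ERROR] -> {event.payload.get('error')}")

pipeline = ObservablePipeline([Boom()], name="ErrDemo", bus=bus, storage=DictStorage())

try:
    pipeline.run()
except Exception:
    # The failure is reported through the STEP_ERROR handler above; catch
    # whatever run() raises only so the notebook keeps running.
    pass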

[GLOBAL] -> ErrDemo
[PipelineEvent.PIPELINE_START] -> ErrDemo
[GLOBAL] -> ErrDemo.Boom
[PipelineEvent.STEP_START] -> ErrDemo.Boom
[GLOBAL] -> ErrDemo.Boom
[ERROR] -> Boom!
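
The output above stops after the error event: no step-end or pipeline-end event follows the failure. One way a runner could produce that sequence is sketched below. This is a hypothetical, standalone illustration (`run_observed`, `record`, and the string event names are made up for the example) and makes no claim about how `ObservablePipeline` is implemented.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Emit = Callable[[str, str, Dict[str, Any]], None]


def run_observed(
    name: str,
    steps: List[Tuple[str, Callable[[], None]]],
    emit: Emit,
) -> None:
    """Run steps in order, emitting events around each one."""
    emit("pipeline_start", name, {})
    for step_name, step_fn in steps:
        source = f"{name}.{step_name}"
        emit("step_start", source, {})
        try:
            step_fn()
        except Exception as exc:
            # Report the failure, then re-raise so the caller still sees it.
            emit("step_error", source, {"error": str(exc)})
            raise
        emit("step_end", source, {})
    emit("pipeline_end", name, {})


def boom() -> None:
    raise RuntimeError("Boom!")


seen: List[Tuple[str, str]] = []


def record(event_type: str, source: str, payload: Dict[str, Any]) -> None:
    """Collect (type, source) pairs instead of printing them."""
    seen.append((event_type, source))


caught: Optional[str] = None
try:
    run_observed("ErrDemo", [("Boom", boom)], record)
except RuntimeError as exc:
    caught = str(exc)

print(seen)
print(caught)
```

Re-raising after emitting keeps monitoring and control flow separate: handlers learn about the failure, while the caller decides whether to recover or abort.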


## Key Takeaways

1. **Event Structure**: Events are immutable objects with a type, source, payload, and timestamp
2. **Event Predicates**: Powerful filtering functions for selecting specific events
3. **EventBus**: Central hub for event subscription, dispatching, and management
4. **Handler Patterns**: Both function-based and class-based handlers for different use cases
5. **Pipeline Integration**: Events provide excellent observability for data processing workflows