# Event Examples

This notebook demonstrates the event system used to monitor, debug, and observe pipelines. Events are immutable objects, and a central `EventBus` dispatches them to subscribed handlers.

## Features Demonstrated

- **Event Class**: Immutable event objects with type, ID, constraints, and state
- **Event Predicates**: Filtering functions for event matching and selection
- **EventBus**: Centralized subscription and dispatching system with priority support
- **BaseHandler**: Structured event handling with custom logic and filtering
- **ObservablePipeline**: Pipeline integration with automatic event emission
- **Error Handling**: Event-driven error tracking and pipeline diagnostics

## Setup

In [10]:
import sys
import os

# Add the project root to Python path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

In [None]:
from src.idspy.core.pipeline import PipelineEvent, ObservablePipeline
from src.idspy.core.state import State
from src.idspy.core.step import Step
from src.idspy.events.bus import EventBus, BaseHandler
from src.idspy.events.events import Event, only_id, id_startswith, has_constraint_key, constraint_equals

## Event Class

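An `Event` is an immutable object that carries a type, an ID, a `constraints` mapping, and a `state` payload. `to_dict()` returns these fields together with a creation timestamp.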

In [12]:
# Event with complex nested data
event = Event(
    type="pipeline_step",
    id="DataProcessor.Validation",
    constraints={
        "requires": ["raw_data", "schema"],
        "provides": ["validated_data"],
        "index": 2,
        "batch_size": 1000
    },
    state={
        "raw_data": {"rows": 10000, "cols": 15},
        "memory_usage": "256MB",
        "processing_time": 1.23
    }
)

# Events are immutable - this would raise an error:
# event.type = "different_type"  # AttributeError!

print(f"Event as dict: {event.to_dict()}")

Event as dict: {'type': 'pipeline_step', 'id': 'DataProcessor.Validation', 'constraints': {'requires': ['raw_data', 'schema'], 'provides': ['validated_data'], 'index': 2, 'batch_size': 1000}, 'state': {'raw_data': {'rows': 10000, 'cols': 15}, 'memory_usage': '256MB', 'processing_time': 1.23}, 'timestamp': '2025-09-17T17:29:32.767605+00:00'}
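
To make the immutability claim concrete, here is a minimal sketch of how such an event could be modeled with a frozen dataclass. `SketchEvent` is a hypothetical stand-in written for this example, and it is an assumption that `Event` behaves similarly; the actual idspy implementation may differ.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for Event (not the idspy implementation)."""

    type: str
    id: str
    constraints: Dict[str, Any] = field(default_factory=dict)
    state: Dict[str, Any] = field(default_factory=dict)
    # Timezone-aware creation time, filled in automatically.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)  # recursively copies nested dicts and lists
        data["timestamp"] = self.timestamp.isoformat()
        return data


sketch = SketchEvent("pipeline_step", "DataProcessor.Validation", constraints={"index": 2})

try:
    sketch.type = "different_type"
    mutated = True
except FrozenInstanceError:  # a subclass of AttributeError
    mutated = False

print(mutated)
print(sketch.to_dict()["constraints"])
```

Note that `frozen=True` only blocks attribute reassignment. A nested dict such as `constraints` can still be mutated in place unless it is copied or wrapped in a read-only view.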


### Event Predicates

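`only_id()`, `id_startswith()`, `has_constraint_key()`, and `constraint_equals()` each return a predicate: a callable that takes an event and returns a bool. Predicates work with the built-in `filter()` and can be passed to `EventBus` subscriptions.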

In [None]:
# Create some test events
events = [
    Event("step_start", "Pipeline.Load", constraints={"index": 0, "type": "input"}),
    Event("step_start", "Pipeline.Transform", constraints={"index": 1, "type": "processing"}),
    Event("step_end", "Pipeline.Load", constraints={"index": 0, "duration": 1.5}),
    Event("error", "OtherPipeline.Validate", constraints={"error_code": 404}),
]

print("Event Predicate Examples:")
print("=" * 40)

# Test only_id predicate
load_predicate = only_id("Pipeline.Load")
print("Events with ID 'Pipeline.Load':")
for event in filter(load_predicate, events):
    print(f"  {event.type} :: {event.id}")

# Test id_startswith predicate
pipeline_predicate = id_startswith("Pipeline.")
print("\nEvents with ID starting with 'Pipeline.':")
for event in filter(pipeline_predicate, events):
    print(f"  {event.type} :: {event.id}")

# Test has_constraint_key predicate
has_duration = has_constraint_key("duration")
print("\nEvents with 'duration' constraint:")
for event in filter(has_duration, events):
    print(f"  {event.type} :: {event.id} (duration: {event.constraints['duration']})")

# Test constraint_equals predicate
input_type = constraint_equals("type", "input")
print("\nEvents with type='input':")
for event in filter(input_type, events):
    print(f"  {event.type} :: {event.id}")

Event Predicate Examples:
Events with ID 'Pipeline.Load':
  step_start :: Pipeline.Load
  step_end :: Pipeline.Load

Events with ID starting with 'Pipeline.':
  step_start :: Pipeline.Load
  step_start :: Pipeline.Transform
  step_end :: Pipeline.Load

Events with 'duration' constraint:
  step_end :: Pipeline.Load (duration: 1.5)

Events with type='input':
  step_start :: Pipeline.Load
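
The predicate factories above can be pictured as small closures. The sketch below is an assumption about their shape, not the idspy source; every `sketch_*` name and the `all_of` combinator are hypothetical and exist only for illustration.

```python
from types import SimpleNamespace
from typing import Any, Callable

Predicate = Callable[[Any], bool]


def sketch_only_id(event_id: str) -> Predicate:
    # Matches events whose ID is exactly event_id.
    return lambda e: e.id == event_id


def sketch_id_startswith(prefix: str) -> Predicate:
    # Matches events whose ID begins with prefix.
    return lambda e: e.id.startswith(prefix)


def sketch_has_constraint_key(key: str) -> Predicate:
    # Matches events that define the given constraint key.
    return lambda e: key in e.constraints


def sketch_constraint_equals(key: str, value: Any) -> Predicate:
    # Matches events where the key is present and equal to value.
    return lambda e: key in e.constraints and e.constraints[key] == value


def all_of(*predicates: Predicate) -> Predicate:
    # Hypothetical combinator: true only if every predicate matches.
    return lambda e: all(p(e) for p in predicates)


sample = [
    SimpleNamespace(id="Pipeline.Load", constraints={"index": 0, "type": "input"}),
    SimpleNamespace(id="Pipeline.Transform", constraints={"index": 1}),
    SimpleNamespace(id="OtherPipeline.Validate", constraints={"type": "input"}),
]

pipeline_inputs = all_of(
    sketch_id_startswith("Pipeline."),
    sketch_constraint_equals("type", "input"),
)
matched_ids = [e.id for e in filter(pipeline_inputs, sample)]
print(matched_ids)
```

Because each predicate is a plain callable, composing them needs no special support from the event classes.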


## EventBus

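`EventBus` dispatches each published event to the handlers subscribed to it. Handlers can be registered with the `@bus.on(...)` decorator or with `bus.subscribe(...)`, which returns a token that `bus.unsubscribe(token)` accepts later. A subscription can be narrowed by event type and predicate, and ordered with `priority`.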

In [14]:
# Demonstrate EventBus subscription patterns
demo_bus = EventBus()

# Pattern 1: Function-based handlers with decorators
@demo_bus.on()  # Subscribe to ALL events
def global_logger(event: Event) -> None:
    print(f"[GLOBAL] {event.type} from {event.id}")

@demo_bus.on("user_action")  # Subscribe to specific event type
def user_action_handler(event: Event) -> None:
    print(f"[USER] Action: {event.constraints.get('action', 'unknown')}")

# With predicate and explicit priority (the default priority is 1)
@demo_bus.on("system_event", predicate=has_constraint_key("user"), priority=0)
def system_handler_priority_0(event: Event) -> None:
    user = event.constraints.get("user")
    print(f"[SYSTEM] priority 0 System event for user: {user}")

@demo_bus.on("system_event", predicate=has_constraint_key("user"), priority=2)
def system_handler_priority_2(event: Event) -> None:
    user = event.constraints.get("user")
    print(f"[SYSTEM] priority 2 System event for user: {user}")

# Pattern 2: Manual subscription with tokens
token1 = demo_bus.subscribe(
    lambda e: print(f"[MANUAL] {e.type}"),
    event_type="pipeline_step"
)

# Test the subscriptions
print("Testing EventBus Subscriptions:")
print("=" * 40)

test_events = [
    Event("user_action", "login", constraints={"action": "authenticate", "user": "alice"}),
    # No "user" constraint, so the predicate-filtered system handlers skip this event
    Event("system_event", "maintenance", constraints={"priority": "high", "duration": 30}),
    Event("pipeline_step", "Pipeline.Process", constraints={"index": 1}),
    Event("other_event", "misc", constraints={"data": "test"}),
]

for event in test_events:
    print(f"\nPublishing: {event.type} :: {event.id}")
    demo_bus.publish(event)

# Demonstrate unsubscription
print(f"\nUnsubscribing token {token1}: {demo_bus.unsubscribe(token1)}")
print("After unsubscription:")
demo_bus.publish(Event("pipeline_step", "Pipeline.Another", constraints={"index": 2}))

Testing EventBus Subscriptions:

Publishing: user_action :: login
[USER] Action: authenticate
[GLOBAL] user_action from login

Publishing: system_event :: maintenance
[GLOBAL] system_event from maintenance

Publishing: pipeline_step :: Pipeline.Process
[MANUAL] pipeline_step
[GLOBAL] pipeline_step from Pipeline.Process

Publishing: other_event :: misc
[GLOBAL] other_event from misc

Unsubscribing token 5: True
After unsubscription:
[GLOBAL] pipeline_step from Pipeline.Another
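
The subscription mechanics can be sketched in a few lines. `SketchBus` below is a hypothetical, simplified bus written for this example. It assumes that lower priority numbers run first and that ties keep subscription order; the real `EventBus` may order handlers differently, as the output above suggests for handlers with equal priority.

```python
import itertools
from types import SimpleNamespace
from typing import Any, Callable, Dict, Optional, Tuple

Handler = Callable[[Any], None]
Predicate = Callable[[Any], bool]


class SketchBus:
    """Hypothetical minimal bus (not the idspy EventBus)."""

    def __init__(self) -> None:
        self._subs: Dict[int, Tuple[int, Optional[str], Optional[Predicate], Handler]] = {}
        self._tokens = itertools.count(1)

    def subscribe(self, handler: Handler, event_type: Optional[str] = None,
                  predicate: Optional[Predicate] = None, priority: int = 1) -> int:
        token = next(self._tokens)
        self._subs[token] = (priority, event_type, predicate, handler)
        return token

    def unsubscribe(self, token: int) -> bool:
        # True if the token was registered, False otherwise.
        return self._subs.pop(token, None) is not None

    def publish(self, event: Any) -> None:
        # Assumption: lower priority runs first. sorted() is stable, so
        # handlers with equal priority keep their subscription order.
        for _, event_type, predicate, handler in sorted(
            self._subs.values(), key=lambda sub: sub[0]
        ):
            if event_type is not None and event_type != event.type:
                continue
            if predicate is not None and not predicate(event):
                continue
            handler(event)


calls = []
sketch_bus = SketchBus()
sketch_bus.subscribe(lambda e: calls.append("priority 2"), event_type="system_event", priority=2)
sketch_bus.subscribe(lambda e: calls.append("priority 0"), event_type="system_event", priority=0)
sketch_bus.subscribe(lambda e: calls.append("needs user"), predicate=lambda e: "user" in e.constraints)
token = sketch_bus.subscribe(lambda e: calls.append("catch-all"))

sketch_bus.publish(SimpleNamespace(type="system_event", constraints={}))
print(calls)

removed = sketch_bus.unsubscribe(token)
removed_again = sketch_bus.unsubscribe(token)
print(removed, removed_again)
```

Returning an opaque token from `subscribe` lets callers remove lambdas and other anonymous handlers without keeping a reference to the callable itself.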


## BaseHandler

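For handlers that keep state, subclass `BaseHandler`: override `can_handle()` to decide which events the handler accepts and `handle()` to process them. An instance is registered with `bus.subscribe(...)` like any other handler.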

In [18]:
# Example: Conditional pipeline monitor
class PipelineMonitor(BaseHandler):
    def __init__(self, pipeline_name: str):
        super().__init__()
        self.pipeline_name = pipeline_name
        self.current_step = None
        self.step_count = 0

    def can_handle(self, event: Event) -> bool:
        # Only handle events from our specific pipeline
        return event.id.startswith(f"{self.pipeline_name}.")

    def handle(self, event: Event) -> None:
        if event.type == "before_step":
            self.current_step = event.id
            self.step_count += 1
            print(f"[MONITOR] {self.pipeline_name} starting step {self.step_count}: {event.id}")
        elif event.type == "after_step":
            print(f"[MONITOR] {self.pipeline_name} completed step: {event.id}")
            self.current_step = None

handler_bus = EventBus()
pipeline_monitor = PipelineMonitor("TestPipeline")
handler_bus.subscribe(pipeline_monitor)

# Test with various events
test_events = [
    Event("before_step", "TestPipeline.Load", constraints={"index": 0}),
    Event("after_step", "TestPipeline.Load", constraints={"index": 0, "duration": 1.2}),
    Event("before_step", "TestPipeline.Process", constraints={"index": 1}),
    Event("error", "TestPipeline.Process", constraints={"error": "Invalid data format"}),
    Event("after_step", "OtherPipeline.Export", constraints={"index": 2, "duration": 0.8}),
]

print("Testing BaseHandler Implementations:")
print("=" * 50)

for event in test_events:
    print(f"\nPublishing: {event.type} :: {event.id}")
    handler_bus.publish(event)

Testing BaseHandler Implementations:

Publishing: before_step :: TestPipeline.Load
[MONITOR] TestPipeline starting step 1: TestPipeline.Load

Publishing: after_step :: TestPipeline.Load
[MONITOR] TestPipeline completed step: TestPipeline.Load

Publishing: before_step :: TestPipeline.Process
[MONITOR] TestPipeline starting step 2: TestPipeline.Process

Publishing: error :: TestPipeline.Process

Publishing: after_step :: OtherPipeline.Export
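
The `can_handle()` / `handle()` split can be sketched as a small template-method base class. This is an assumption about how `BaseHandler` might be structured; `SketchHandler`, `StepCounter`, and the use of `__call__` as the dispatch entry point are hypothetical.

```python
from types import SimpleNamespace
from typing import Any


class SketchHandler:
    """Hypothetical base class (not the idspy BaseHandler)."""

    def can_handle(self, event: Any) -> bool:
        # Accept everything unless a subclass narrows it.
        return True

    def handle(self, event: Any) -> None:
        raise NotImplementedError

    def __call__(self, event: Any) -> None:
        # Assumed entry point: filter first, then delegate.
        if self.can_handle(event):
            self.handle(event)


class StepCounter(SketchHandler):
    """Counts started steps for one pipeline and ignores all others."""

    def __init__(self, pipeline_name: str) -> None:
        self.prefix = f"{pipeline_name}."
        self.started = 0

    def can_handle(self, event: Any) -> bool:
        return event.id.startswith(self.prefix)

    def handle(self, event: Any) -> None:
        if event.type == "before_step":
            self.started += 1


counter = StepCounter("TestPipeline")
for event_type, event_id in [
    ("before_step", "TestPipeline.Load"),
    ("after_step", "TestPipeline.Load"),
    ("before_step", "TestPipeline.Process"),
    ("before_step", "OtherPipeline.Export"),  # rejected by can_handle
]:
    counter(SimpleNamespace(type=event_type, id=event_id))

print(counter.started)
```

Keeping the filter in `can_handle()` separates the question of which events are relevant from the question of what to do with them, which mirrors `PipelineMonitor` above: the `error` event passes the filter but `handle()` has no branch for it, so nothing is printed.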


## ObservablePipeline

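`ObservablePipeline` publishes events to an `EventBus` as it runs: `PIPELINE_START` and `PIPELINE_END` for the pipeline as a whole, and `BEFORE_STEP` and `AFTER_STEP` around each step. Pipeline-level events use the pipeline name as their ID, and step events use `<pipeline>.<step>`.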

### Example Steps

In [16]:
@Step.provides(data=list)
class Load(Step):
    def run(self, **inputs) -> dict:
        return {"data": [1, 2, 3]}

@Step.requires(data=list)
@Step.provides(sum=int)
class Sum(Step):
    def run(self, data, **inputs) -> dict:
        return {"sum": sum(data)}

@Step.requires(missing=object)
class Boom(Step):
    def run(self, **inputs) -> dict:
        # never reached because requires isn't satisfied
        return {}

### Observable Pipeline Execution

In [None]:
# Create and run an ObservablePipeline
bus = EventBus()

@bus.on(priority=0)
def global_logger(event: Event) -> None:
    print(f"[GLOBAL] {event.type} from {event.id}")

# Each handler gets its own name so that later definitions do not shadow earlier ones
@bus.on(PipelineEvent.PIPELINE_START)
def log_pipeline_start(event: Event) -> None:
    print(f"[{event.type}] {event.id}")

@bus.on(PipelineEvent.PIPELINE_END)
def log_pipeline_end(event: Event) -> None:
    print(f"[{event.type}] {event.id}")

@bus.on(PipelineEvent.BEFORE_STEP)
def log_before_step(event: Event) -> None:
    print(f"[{event.type}] {event.id}")

@bus.on(PipelineEvent.AFTER_STEP)
def log_after_step(event: Event) -> None:
    print(f"[{event.type}] {event.id}")

print("=== ObservablePipeline Demo ===")
p = ObservablePipeline([Load(), Sum()], name="Demo", bus=bus)

s = State()
p(s)
print(f"\nFinal STATE: {s.to_dict()}")

=== ObservablePipeline Demo ===
[GLOBAL] PipelineEvent.PIPELINE_START from Demo
[PipelineEvent.PIPELINE_START] Demo
[GLOBAL] PipelineEvent.BEFORE_STEP from Demo.Load
[PipelineEvent.BEFORE_STEP] Demo.Load
[GLOBAL] PipelineEvent.AFTER_STEP from Demo.Load
[PipelineEvent.AFTER_STEP] Demo.Load
[GLOBAL] PipelineEvent.BEFORE_STEP from Demo.Sum
[PipelineEvent.BEFORE_STEP] Demo.Sum
[GLOBAL] PipelineEvent.AFTER_STEP from Demo.Sum
[PipelineEvent.AFTER_STEP] Demo.Sum
[GLOBAL] PipelineEvent.PIPELINE_END from Demo
[PipelineEvent.PIPELINE_END] Demo

Final STATE: {'data': [1, 2, 3], 'sum': 6}
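
The emission order shown above can be reproduced with a short sketch. `run_observed` is a hypothetical function written for this example: it uses plain functions as steps, a dict as state, and an `emit(event_type, event_id)` callback in place of a bus. It is an assumption that `ObservablePipeline` follows the same sequence internally.

```python
from typing import Any, Callable, Dict, List, Tuple

StepFn = Callable[[Dict[str, Any]], Dict[str, Any]]
Emit = Callable[[str, str], None]


def run_observed(steps: List[StepFn], name: str, state: Dict[str, Any], emit: Emit) -> Dict[str, Any]:
    """Run steps in order, emitting an event before and after each one."""
    emit("pipeline_start", name)
    for step in steps:
        step_id = f"{name}.{step.__name__}"
        emit("before_step", step_id)
        state.update(step(state))  # merge the step's outputs into the state
        emit("after_step", step_id)
    emit("pipeline_end", name)
    return state


def load(state: Dict[str, Any]) -> Dict[str, Any]:
    return {"data": [1, 2, 3]}


def total(state: Dict[str, Any]) -> Dict[str, Any]:
    return {"sum": sum(state["data"])}


log: List[Tuple[str, str]] = []
final_state = run_observed([load, total], "Demo", {}, lambda t, i: log.append((t, i)))

for event_type, event_id in log:
    print(f"[{event_type}] {event_id}")
print(final_state)
```

Passing `emit` as a callback keeps the runner independent of any particular bus; in the notebook, the bus's `publish` method plays that role.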


### Error Handling

In [23]:
@bus.on(PipelineEvent.ON_ERROR)
def log_error(event: Event) -> None:
    print(f"[ERROR] {event.id}")

p_err = ObservablePipeline([Boom()], name="ErrDemo", bus=bus)

try:
    p_err(State())
except KeyError:
    # Expected: Boom requires `missing`, which the empty State does not provide
    pass

[GLOBAL] PipelineEvent.PIPELINE_START from ErrDemo
[PipelineEvent.PIPELINE_START] ErrDemo
[GLOBAL] PipelineEvent.BEFORE_STEP from ErrDemo.Boom
[PipelineEvent.BEFORE_STEP] ErrDemo.Boom
[GLOBAL] PipelineEvent.ON_ERROR from ErrDemo.Boom
[PipelineEvent.ON_ERROR] ErrDemo.Boom
[ERROR] ErrDemo.Boom
[GLOBAL] PipelineEvent.PIPELINE_END from ErrDemo
[PipelineEvent.PIPELINE_END] ErrDemo
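
The output above shows `ON_ERROR` followed by `PIPELINE_END`, with the exception still reaching the caller. One way to get that ordering is a nested `try`/`except` inside a `try`/`finally`. The sketch below is an assumption about the control flow, not the idspy source; `run_with_error_events` and `boom` are hypothetical names.

```python
from typing import Any, Callable, Dict, List, Tuple

StepFn = Callable[[Dict[str, Any]], Dict[str, Any]]
Emit = Callable[[str, str], None]


def run_with_error_events(steps: List[StepFn], name: str, state: Dict[str, Any], emit: Emit) -> None:
    emit("pipeline_start", name)
    try:
        for step in steps:
            step_id = f"{name}.{step.__name__}"
            emit("before_step", step_id)
            try:
                state.update(step(state))
            except Exception:
                emit("on_error", step_id)  # report the failure, then re-raise
                raise
            emit("after_step", step_id)
    finally:
        # Runs on success and on failure, so pipeline_end is always emitted.
        emit("pipeline_end", name)


def boom(state: Dict[str, Any]) -> Dict[str, Any]:
    return {"value": state["missing"]}  # raises KeyError on an empty state


error_log: List[Tuple[str, str]] = []
caught = None
try:
    run_with_error_events([boom], "ErrDemo", {}, lambda t, i: error_log.append((t, i)))
except KeyError as exc:
    caught = exc

for event_type, event_id in error_log:
    print(f"[{event_type}] {event_id}")
print(type(caught).__name__)
```

No `after_step` event is emitted for the failing step, which matches the notebook output: subscribers see the error and the pipeline end, and the caller still decides how to handle the exception.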
