# Tutorial 3: Hooks and Observability

Learn how to register lifecycle hooks for monitoring, logging, and custom behaviors in your data pipelines.

**Learning Objectives:**
- Understand pipeline lifecycle events
- Register hooks for different stages (pre_read, post_transform, etc.)
- Use filters to target specific projects/layers/engines
- Build custom observability (logging, metrics, alerts)
- Debug pipelines with hook introspection

**Prerequisites:**
- odibi_de_v2 installed
- Completion of Tutorial 1 recommended

## Part 1: Understanding Lifecycle Events

**Standard Events:**
- `pre_read`: Before data ingestion
- `post_read`: After data is loaded
- `pre_transform`: Before transformation logic
- `post_transform`: After transformation completes
- `pre_save`: Before writing output
- `post_save`: After data is saved
- `on_error`: When an exception occurs
- `pipeline_start`: Beginning of orchestration
- `pipeline_end`: End of orchestration
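
Event order matters when one hook depends on another. The sketch below shows one plausible placement of each event around a read, a transform, and a save. It does not use odibi_de_v2. `run_pipeline` and the module-level `emit` function are hypothetical stand-ins, and the real orchestrator may emit these events with different payloads or at a different granularity (for example, `pipeline_start` once per project).

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-in for a hook manager: records each emitted event name in order.
emitted: List[str] = []

def emit(event: str, payload: Dict[str, Any]) -> None:
    emitted.append(event)

def run_pipeline(read: Callable[[], Any],
                 transform: Callable[[Any], Any],
                 save: Callable[[Any], None]) -> None:
    """Run read -> transform -> save, emitting lifecycle events around each stage."""
    emit("pipeline_start", {})
    try:
        emit("pre_read", {})
        data = read()
        emit("post_read", {"df": data})

        emit("pre_transform", {"df": data})
        result = transform(data)
        emit("post_transform", {"df_in": data, "df_out": result})

        emit("pre_save", {"df": result})
        save(result)
        emit("post_save", {"df": result})
    except Exception as exc:
        # on_error carries the exception so hooks can log or alert on it
        emit("on_error", {"error": exc})
        raise
    finally:
        # Fires on success and on failure alike
        emit("pipeline_end", {})

# Usage: a trivial pipeline over a list of numbers
run_pipeline(read=lambda: [1, 2, 3],
             transform=lambda xs: [x * 10 for x in xs],
             save=lambda xs: None)
print(emitted)
```

Emitting `pipeline_end` from a `finally` block is a design choice in this sketch. It guarantees the event also fires after `on_error`, so cleanup or audit hooks always run.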

In [None]:
from odibi_de_v2.hooks import HookManager
from datetime import datetime

# Create a hook manager
hooks = HookManager()

# Simple logging hook
def log_event(payload):
    event_time = datetime.now().strftime("%H:%M:%S")
    event_type = payload.get('event', 'unknown')
    print(f"[{event_time}] Event: {event_type}")

# Register for multiple events
hooks.register("pre_read", log_event)
hooks.register("post_read", log_event)
hooks.register("pre_transform", log_event)
hooks.register("post_transform", log_event)

print("✓ Registered logging hooks for pipeline events")

In [None]:
# Simulate pipeline execution
print("\nSimulating Pipeline Execution:")
print("=" * 50)

hooks.emit("pre_read", {"event": "pre_read", "table": "bronze.raw_data"})
print("  → Loading data...")

hooks.emit("post_read", {"event": "post_read", "rows": 1000})
print("  → Data loaded successfully")

hooks.emit("pre_transform", {"event": "pre_transform", "function": "clean_data"})
print("  → Running transformation...")

hooks.emit("post_transform", {"event": "post_transform", "rows_out": 950})
print("  → Transformation complete")

## Part 2: Filtered Hooks

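Use filters to target specific projects, layers, or engines. In the examples below, a hook registered with `filters={"layer": "bronze"}` runs only when the emitted payload carries `"layer": "bronze"` and is skipped for any other payload.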
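
The matching rule the tests below rely on can be illustrated with a toy dispatcher. `MiniHookManager` is hypothetical and is not odibi_de_v2's implementation. It assumes a hook runs only when every key in its `filters` dict is present in the payload with an equal value, and that unfiltered hooks always run.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Tuple

Payload = Dict[str, Any]
Hook = Callable[[Payload], None]

class MiniHookManager:
    """Toy hook registry: an illustration only, not odibi_de_v2's HookManager."""

    def __init__(self) -> None:
        self._hooks: Dict[str, List[Tuple[Hook, Dict[str, Any]]]] = defaultdict(list)

    def register(self, event: str, callback: Hook,
                 filters: Optional[Dict[str, Any]] = None) -> None:
        self._hooks[event].append((callback, filters or {}))

    @staticmethod
    def _matches(filters: Dict[str, Any], payload: Payload) -> bool:
        # Every filter key must be present in the payload with an equal value.
        return all(k in payload and payload[k] == v for k, v in filters.items())

    def emit(self, event: str, payload: Payload) -> int:
        """Call matching hooks in registration order; return how many ran."""
        ran = 0
        for callback, filters in self._hooks[event]:
            if self._matches(filters, payload):
                callback(payload)
                ran += 1
        return ran

calls: List[str] = []
mgr = MiniHookManager()
mgr.register("post_read", lambda p: calls.append("any"))
mgr.register("post_read", lambda p: calls.append("bronze"), filters={"layer": "bronze"})

print(mgr.emit("post_read", {"layer": "bronze"}))  # 2
print(mgr.emit("post_read", {"layer": "gold"}))    # 1
print(calls)                                       # ['any', 'bronze', 'any']
```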

In [None]:
# Create a new hook manager for this example
filtered_hooks = HookManager()

# Hook that only runs for bronze layer
def bronze_validation(payload):
    print("  🔍 Running bronze layer validation")
    df = payload.get('df')
    if df is not None:
        print(f"     Rows: {len(df)}")

filtered_hooks.register(
    "post_read",
    bronze_validation,
    filters={"layer": "bronze"}
)

# Hook that only runs for silver layer
def silver_quality_check(payload):
    print("  ✓ Running silver layer quality checks")

filtered_hooks.register(
    "post_transform",
    silver_quality_check,
    filters={"layer": "silver"}
)

# Hook that only runs for Spark engine
def spark_optimization(payload):
    print("  ⚡ Applying Spark-specific optimizations")

filtered_hooks.register(
    "pre_transform",
    spark_optimization,
    filters={"engine": "spark"}
)

print("✓ Registered filtered hooks")

In [None]:
import pandas as pd

# Test with different layer payloads
print("\nTest 1: Bronze Layer")
print("-" * 50)
sample_df = pd.DataFrame({'id': [1, 2, 3], 'value': [10, 20, 30]})
filtered_hooks.emit("post_read", {"layer": "bronze", "df": sample_df})

print("\nTest 2: Silver Layer")
print("-" * 50)
filtered_hooks.emit("post_transform", {"layer": "silver"})

print("\nTest 3: Gold Layer (no matching hooks)")
print("-" * 50)
filtered_hooks.emit("post_transform", {"layer": "gold"})
print("  (No hooks executed - filter mismatch)")

print("\nTest 4: Spark Engine")
print("-" * 50)
filtered_hooks.emit("pre_transform", {"engine": "spark", "layer": "silver"})

print("\nTest 5: Pandas Engine (no Spark hook)")
print("-" * 50)
filtered_hooks.emit("pre_transform", {"engine": "pandas", "layer": "silver"})
print("  (Spark optimization hook skipped)")

## Part 3: Data Validation Hooks

In [None]:
validation_hooks = HookManager()

# Schema validation hook
def validate_schema(payload):
    """Ensure required columns exist."""
    df = payload.get('df')
    required_columns = payload.get('required_columns', [])
    
    if df is None:
        return
    
    missing = [col for col in required_columns if col not in df.columns]
    
    if missing:
        raise ValueError(f"❌ Schema validation failed. Missing columns: {missing}")
    else:
        print(f"  ✓ Schema validation passed: {required_columns}")

validation_hooks.register("post_read", validate_schema)

# Null check hook
def check_nulls(payload):
    """Alert if null percentage exceeds threshold."""
    df = payload.get('df')
    threshold = payload.get('null_threshold', 0.1)
    
    if df is None:
        return
    
    total_cells = len(df) * len(df.columns)
    null_count = df.isnull().sum().sum()
    null_pct = null_count / total_cells if total_cells > 0 else 0
    
    if null_pct > threshold:
        print(f"  ⚠️  High null percentage: {null_pct:.2%} (threshold: {threshold:.2%})")
    else:
        print(f"  ✓ Null check passed: {null_pct:.2%}")

validation_hooks.register("post_read", check_nulls)

print("✓ Registered validation hooks")

In [None]:
# Test validation with good data
print("\nValidation Test 1: Good Data")
print("=" * 50)
good_df = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
    'score': [95, 87, 92, 88]
})

validation_hooks.emit("post_read", {
    "df": good_df,
    "required_columns": ["id", "name", "score"],
    "null_threshold": 0.1
})

# Test validation with missing column
print("\nValidation Test 2: Missing Column")
print("=" * 50)
try:
    validation_hooks.emit("post_read", {
        "df": good_df,
        "required_columns": ["id", "name", "score", "department"],
        "null_threshold": 0.1
    })
except ValueError as e:
    print(str(e))

# Test validation with high nulls
print("\nValidation Test 3: High Null Percentage")
print("=" * 50)
bad_df = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'name': ['Alice', None, None, 'Diana'],
    'score': [95, None, None, 88]
})

validation_hooks.emit("post_read", {
    "df": bad_df,
    "required_columns": ["id", "name", "score"],
    "null_threshold": 0.1
})
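
The hooks above read `required_columns` and `null_threshold` from the payload, so every `emit` call has to supply them. An alternative design binds the rules when the hook is created, as sketched below. `make_schema_hook` is a hypothetical helper; it only assumes the payload's `df` has a `.columns` attribute, which a pandas DataFrame does.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterable

def make_schema_hook(required_columns: Iterable[str]) -> Callable[[Dict[str, Any]], None]:
    """Return a hook that raises ValueError if any required column is missing."""
    required = list(required_columns)

    def schema_hook(payload: Dict[str, Any]) -> None:
        df = payload.get("df")
        if df is None:
            return  # nothing to validate
        missing = [col for col in required if col not in df.columns]
        if missing:
            raise ValueError(f"Schema validation failed. Missing columns: {missing}")

    return schema_hook

# Usage with a stand-in object; a pandas DataFrame exposes .columns the same way.
check_orders = make_schema_hook(["id", "amount"])
check_orders({"df": SimpleNamespace(columns=["id", "amount", "ts"])})  # passes silently
try:
    check_orders({"df": SimpleNamespace(columns=["id"])})
except ValueError as exc:
    print(exc)  # Schema validation failed. Missing columns: ['amount']
```

Each registration can then carry its own rules, for example `validation_hooks.register("post_read", make_schema_hook(["id", "name", "score"]))`, and the payload only needs to carry the `df`.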

## Part 4: Metrics and Monitoring

In [None]:
from collections import defaultdict
import time

# Metrics collector
class PipelineMetrics:
    def __init__(self):
        self.metrics = defaultdict(dict)
        self.timers = {}
    
    def start_timer(self, stage):
        # perf_counter is monotonic, so it suits elapsed-time measurement better than time.time
        self.timers[stage] = time.perf_counter()
    
    def stop_timer(self, stage):
        if stage in self.timers:
            duration = time.perf_counter() - self.timers[stage]
            self.metrics[stage]['duration_seconds'] = duration
            del self.timers[stage]
    
    def record(self, stage, key, value):
        self.metrics[stage][key] = value
    
    def report(self):
        print("\n📊 Pipeline Metrics Report")
        print("=" * 60)
        for stage, data in self.metrics.items():
            print(f"\n{stage}:")
            for key, value in data.items():
                if isinstance(value, float):
                    print(f"  {key}: {value:.4f}")
                else:
                    print(f"  {key}: {value}")

# Create metrics instance
metrics = PipelineMetrics()
metrics_hooks = HookManager()

# Hook to track read timing
def track_read_start(payload):
    metrics.start_timer("read")
    print("  ⏱️  Starting read timer")

def track_read_end(payload):
    metrics.stop_timer("read")
    df = payload.get('df')
    if df is not None:
        metrics.record("read", "rows_read", len(df))
        metrics.record("read", "columns", len(df.columns))
    print("  ⏱️  Read complete")

# Hook to track transform timing
def track_transform_start(payload):
    metrics.start_timer("transform")
    print("  ⏱️  Starting transform timer")

def track_transform_end(payload):
    metrics.stop_timer("transform")
    df_in = payload.get('df_in')
    df_out = payload.get('df_out')
    if df_in is not None:
        metrics.record("transform", "rows_in", len(df_in))
    if df_out is not None:
        metrics.record("transform", "rows_out", len(df_out))
        # Guard against division by zero when the input frame is empty
        if df_in is not None and len(df_in) > 0:
            pct_change = ((len(df_out) - len(df_in)) / len(df_in)) * 100
            metrics.record("transform", "row_change_pct", pct_change)
    print("  ⏱️  Transform complete")

# Register hooks
metrics_hooks.register("pre_read", track_read_start)
metrics_hooks.register("post_read", track_read_end)
metrics_hooks.register("pre_transform", track_transform_start)
metrics_hooks.register("post_transform", track_transform_end)

print("✓ Registered metrics tracking hooks")

In [None]:
# Simulate a pipeline with metrics
print("\nSimulating Pipeline with Metrics:")
print("=" * 60)

# Read stage
metrics_hooks.emit("pre_read", {})
time.sleep(0.1)  # Simulate read time
raw_df = pd.DataFrame({
    'id': range(1, 101),
    'value': range(100, 200)
})
metrics_hooks.emit("post_read", {"df": raw_df})

# Transform stage
metrics_hooks.emit("pre_transform", {})
time.sleep(0.05)  # Simulate transform time
# Remove some rows
transformed_df = raw_df[raw_df['value'] > 150]
metrics_hooks.emit("post_transform", {"df_in": raw_df, "df_out": transformed_df})

# Display metrics
metrics.report()
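
The start/stop timer pair in `PipelineMetrics` relies on two separate hooks firing. When you control the code that runs a stage, a context manager is a compact alternative. This sketch uses `time.perf_counter()`; `stage_timer`, `row_change_pct`, and the `stage_metrics` dict are illustrative names, not part of odibi_de_v2. It also reproduces the `row_change_pct` formula from `track_transform_end`: with 100 rows in and 49 rows out (the `value > 150` filter above keeps 151 through 199), the change is (49 - 100) / 100 * 100 = -51%.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Dict, Iterator

stage_metrics: Dict[str, Dict[str, float]] = defaultdict(dict)

@contextmanager
def stage_timer(stage: str) -> Iterator[None]:
    """Record the elapsed time of the with-block under stage_metrics[stage]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the stage raises, so failed stages still get a duration.
        stage_metrics[stage]["duration_seconds"] = time.perf_counter() - start

def row_change_pct(rows_in: int, rows_out: int) -> float:
    """Percentage change in row count; 0.0 when there were no input rows."""
    if rows_in == 0:
        return 0.0
    return (rows_out - rows_in) / rows_in * 100

with stage_timer("transform"):
    kept = [v for v in range(100, 200) if v > 150]

stage_metrics["transform"]["row_change_pct"] = row_change_pct(100, len(kept))
print(len(kept), f"{stage_metrics['transform']['row_change_pct']:.1f}")  # 49 -51.0
```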

## Part 5: Error Handling Hooks

In [None]:
error_hooks = HookManager()

# Error logging hook
def log_error(payload):
    error = payload.get('error')
    stage = payload.get('stage', 'unknown')
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    print(f"\n❌ ERROR LOG [{timestamp}]")
    print(f"   Stage: {stage}")
    print(f"   Error Type: {type(error).__name__}")
    print(f"   Message: {str(error)}")
    
    # In production, this could send the record to a logging service, Slack, etc.

# Error notification hook
def notify_on_error(payload):
    project = payload.get('project', 'unknown')
    layer = payload.get('layer', 'unknown')
    
    print(f"\n📧 ALERT: Pipeline failure in {project}/{layer}")
    # In production: send email, Slack message, PagerDuty alert, etc.

error_hooks.register("on_error", log_error)
error_hooks.register("on_error", notify_on_error)

print("✓ Registered error handling hooks")

In [None]:
# Simulate an error
print("\nSimulating Pipeline Error:")
print("=" * 60)

try:
    # Simulate a transformation error
    raise ValueError("Invalid data format: expected numeric column 'price'")
except Exception as e:
    error_hooks.emit("on_error", {
        "error": e,
        "stage": "transformation",
        "project": "retail_analytics",
        "layer": "silver"
    })
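
In the simulation above, the `try/except` and the `emit` call are written by hand. A small wrapper can remove that repetition: it emits `on_error` with the stage and any extra context, then re-raises so the failure is reported without being swallowed. `run_stage` and its `emit` parameter are hypothetical names for this sketch; in the notebook you would pass `error_hooks.emit`.

```python
from typing import Any, Callable, Dict, List, Tuple

def run_stage(stage: str,
              func: Callable[[], Any],
              emit: Callable[[str, Dict[str, Any]], None],
              **context: Any) -> Any:
    """Run func(); on failure, emit on_error with stage and context, then re-raise."""
    try:
        return func()
    except Exception as exc:
        emit("on_error", {"error": exc, "stage": stage, **context})
        raise  # keep the original traceback; hooks report, they don't swallow

# Usage with a recording emit function in place of error_hooks.emit
events: List[Tuple[str, Dict[str, Any]]] = []

def record(event: str, payload: Dict[str, Any]) -> None:
    events.append((event, payload))

print(run_stage("read", lambda: 42, record, project="retail_analytics"))  # 42

def bad_transform() -> None:
    raise ValueError("Invalid data format: expected numeric column 'price'")

try:
    run_stage("transformation", bad_transform, record,
              project="retail_analytics", layer="silver")
except ValueError:
    pass

event, payload = events[0]
print(event, payload["stage"], payload["layer"])  # on_error transformation silver
```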

## Part 6: Debugging - List All Hooks

In [None]:
# Create a comprehensive hook manager
debug_hooks = HookManager()

# Register various hooks
debug_hooks.register("pre_read", lambda p: None)
debug_hooks.register("pre_read", lambda p: None, filters={"layer": "bronze"})
debug_hooks.register("post_read", lambda p: None)
debug_hooks.register("pre_transform", lambda p: None, filters={"engine": "spark"})
debug_hooks.register("post_transform", lambda p: None)
debug_hooks.register("post_transform", lambda p: None, filters={"layer": "gold"})
debug_hooks.register("on_error", lambda p: None)

# List all hooks
all_hooks = debug_hooks.list_hooks()

print("Registered Hooks Summary:")
print("=" * 60)

for event, hooks_list in all_hooks.items():
    print(f"\n{event}: {len(hooks_list)} hook(s)")
    for idx, hook in enumerate(hooks_list, 1):
        filters = hook['filters']
        filter_str = f" (filters: {filters})" if filters else " (no filters)"
        print(f"  {idx}.{filter_str}")
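
The listing above can only show an index and the filters, partly because every callback is an anonymous lambda. If you build your own summaries, named functions make them far more useful: a function's `__name__` is its name, while every lambda reports `<lambda>`. The sketch below keeps its own list of `(event, callback, filters)` tuples; it does not assume that `HookManager.list_hooks()` returns the callbacks. `describe` and `example_named_hook` are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Registration = Tuple[str, Callable[..., Any], Optional[Dict[str, Any]]]

def describe(registrations: List[Registration]) -> List[str]:
    """One line per hook: event, callback name, and filters (if any)."""
    lines = []
    for event, callback, filters in registrations:
        name = getattr(callback, "__name__", repr(callback))
        suffix = f" filters={filters}" if filters else ""
        lines.append(f"{event}: {name}{suffix}")
    return lines

def example_named_hook(payload: Dict[str, Any]) -> None:
    pass

for line in describe([
    ("post_read", example_named_hook, {"layer": "bronze"}),
    ("post_read", lambda p: None, None),
]):
    print(line)
# post_read: example_named_hook filters={'layer': 'bronze'}
# post_read: <lambda>
```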

## Part 7: Complete Example - Production-Ready Hooks

In [None]:
# Hook manager combining validation, quality metrics, timing, and audit hooks
production_hooks = HookManager()
production_metrics = PipelineMetrics()

# 1. Schema validation
def prod_schema_check(payload):
    df = payload.get('df')
    required = ['id', 'timestamp', 'value']
    if df is not None:
        missing = [c for c in required if c not in df.columns]
        if missing:
            raise ValueError(f"Missing columns: {missing}")

# 2. Data quality metrics
def prod_quality_metrics(payload):
    df = payload.get('df')
    # Skip missing or empty frames to avoid dividing by zero
    if df is not None and df.size > 0:
        null_pct = df.isnull().sum().sum() / df.size
        dup_pct = df.duplicated().sum() / len(df)
        production_metrics.record("quality", "null_pct", null_pct)
        production_metrics.record("quality", "duplicate_pct", dup_pct)

# 3. Performance tracking
def prod_perf_start(payload):
    stage = payload.get('stage', 'unknown')
    production_metrics.start_timer(stage)

def prod_perf_end(payload):
    stage = payload.get('stage', 'unknown')
    production_metrics.stop_timer(stage)

# 4. Audit logging
def prod_audit_log(payload):
    event = payload.get('event')
    user = payload.get('user', 'system')
    timestamp = datetime.now().isoformat()
    print(f"[AUDIT] {timestamp} | {user} | {event}")

# Register all production hooks
production_hooks.register("post_read", prod_schema_check)
production_hooks.register("post_read", prod_quality_metrics)
production_hooks.register("pre_read", prod_perf_start)
production_hooks.register("post_read", prod_perf_end)
production_hooks.register("pipeline_start", prod_audit_log)
production_hooks.register("pipeline_end", prod_audit_log)

print("✓ Registered production-ready hooks")
print(f"\nTotal hooks: {sum(len(v) for v in production_hooks.list_hooks().values())}")

In [None]:
# Run a production pipeline simulation
print("\nProduction Pipeline Execution:")
print("=" * 60)

production_hooks.emit("pipeline_start", {
    "event": "pipeline_start",
    "user": "data_engineer"
})

production_hooks.emit("pre_read", {"stage": "read"})
time.sleep(0.05)

prod_data = pd.DataFrame({
    'id': range(1, 51),
    'timestamp': pd.date_range('2024-01-01', periods=50),
    'value': range(100, 150)
})

production_hooks.emit("post_read", {
    "stage": "read",
    "df": prod_data
})

production_hooks.emit("pipeline_end", {
    "event": "pipeline_end",
    "user": "data_engineer"
})

production_metrics.report()
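
`prod_audit_log` prints a pipe-separated line, which is easy to read but awkward to query later. For the compliance use case, writing one JSON object per event (JSON Lines style) is a common alternative. In this sketch `audit_record` is a hypothetical helper; it uses a UTC timestamp so that records from machines in different time zones sort consistently.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

def audit_record(payload: Dict[str, Any]) -> str:
    """Serialize an audit event as a single JSON line."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": payload.get("event"),
        "user": payload.get("user", "system"),
    }
    return json.dumps(record, sort_keys=True)

line = audit_record({"event": "pipeline_start", "user": "data_engineer"})
print(line)
```

To use it as a hook, wrap it in a function that writes the line to your log sink, for example `production_hooks.register("pipeline_end", lambda p: print(audit_record(p)))`.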

## Summary

**What You Learned:**

1. ✓ **Lifecycle Events**: pre_read, post_read, pre_transform, post_transform, pre_save, post_save, on_error, pipeline_start, pipeline_end
2. ✓ **Hook Registration**: `hooks.register(event, callback)`, optionally with `filters={...}`
3. ✓ **Filtered Execution**: Target specific layers, projects, or engines
4. ✓ **Data Validation**: Schema checks, null checks, quality gates
5. ✓ **Metrics & Monitoring**: Timing, row counts, performance tracking
6. ✓ **Error Handling**: Logging and alerting on failures
7. ✓ **Debugging**: List and inspect registered hooks

**Key Patterns:**

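```python
import pandas as pd
from odibi_de_v2.hooks import HookManager

# Create manager
hooks = HookManager()

# Register hook
def my_hook(payload):
    print(payload)

hooks.register("post_read", my_hook, filters={"layer": "bronze"})

# Emit event (the payload's "layer" matches the hook's filter)
data = pd.DataFrame({"id": [1, 2, 3]})
hooks.emit("post_read", {"layer": "bronze", "df": data})
```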

**Production Use Cases:**
- **Monitoring**: Track pipeline performance and data quality
- **Validation**: Enforce schema and business rules
- **Alerting**: Send notifications on errors or anomalies
- **Auditing**: Log all pipeline activities for compliance
- **Debugging**: Add temporary hooks to investigate issues

**Best Practices:**
- Keep hooks lightweight (avoid heavy computation)
- Use filters to minimize unnecessary executions
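- Handle errors inside hooks deliberately: log them rather than letting them fail silently, and raise only when a hook is meant to stop the pipeline (as the validation hooks do)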
- Use consistent payload structure across your pipelines
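
For hooks that should never abort a run (metrics, notifications), one way to avoid silent failures is a decorator that logs the exception with its traceback and lets the pipeline continue. This is a sketch: `safe_hook` and `flaky_metrics_hook` are hypothetical, and hooks that are meant to stop the pipeline, such as `validate_schema`, should not be wrapped.

```python
import functools
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("pipeline.hooks")

def safe_hook(func: Callable[[Dict[str, Any]], None]) -> Callable[[Dict[str, Any]], None]:
    """Wrap a non-critical hook so its exceptions are logged, not raised."""
    @functools.wraps(func)
    def wrapper(payload: Dict[str, Any]) -> None:
        try:
            func(payload)
        except Exception:
            # logger.exception logs at ERROR level and includes the traceback
            logger.exception("Hook %s failed", func.__name__)
    return wrapper

@safe_hook
def flaky_metrics_hook(payload: Dict[str, Any]) -> None:
    rows = payload["rows"]  # KeyError if the payload lacks "rows"
    print(f"rows={rows}")

logging.basicConfig(level=logging.INFO)
flaky_metrics_hook({"rows": 10})  # prints rows=10
flaky_metrics_hook({})            # logs the KeyError, does not raise
```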

**Next Steps:**
- Tutorial 4: Complete Project Template