# Node Execution Deep Dive

## Introduction
This lesson explores ODIBI's Node execution engine, the component that turns a declarative node config into concrete read, transform, validate, and write operations.

### What You'll Learn
1. Node class architecture and initialization
2. The 4-phase execution lifecycle
3. Engine and connection integration
4. Transform execution patterns
5. Validation and error handling
6. Metadata collection and caching

## Part 1: Node Architecture

### Node Initialization

In [None]:
from odibi.node import Node, NodeResult
from odibi.config import NodeConfig
from odibi.context import Context
from typing import Any, Dict

# Node requires 5 key components
class NodeComponents:
    """Understanding Node initialization."""
    
    @staticmethod
    def show_structure():
        print("""
Node Initialization Requirements:

1. config: NodeConfig
   - Declarative configuration (read/transform/validate/write)
   - Parsed and validated by Pydantic
   
2. context: Context
   - In-memory data registry for passing DataFrames
   - Enables node-to-node communication
   
3. engine: Any (Spark/Pandas adapter)
   - Abstraction over execution engine
   - Implements read/write/transform operations
   
4. connections: Dict[str, Any]
   - Registry of available data connections
   - Resolved from ConnectionConfig
   
5. config_file: Optional[str]
   - Path to YAML config (for error reporting)
   - Enriches error messages with file context
        """)

NodeComponents.show_structure()

In [None]:
# Example: Creating a Node instance
import pandas as pd
from odibi.engines.pandas_engine import PandasEngine

# Setup components
config = NodeConfig(
    name="example_node",
    read={"connection": "local", "format": "csv", "path": "data.csv"}
)

context = Context()
engine = PandasEngine()
connections = {"local": {"type": "local"}}

# Initialize Node
node = Node(
    config=config,
    context=context,
    engine=engine,
    connections=connections,
    config_file="pipeline.yaml"
)

print(f"Node created: {node.config.name}")
print(f"Execution steps tracked: {node._execution_steps}")
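
The Context component listed above is, conceptually, a named in-memory registry that one node writes to and later nodes read from. The sketch below illustrates that idea only: `ToyContext` and its `names` method are hypothetical, plain lists stand in for DataFrames, and the real `odibi.context.Context` may differ in both interface and behavior.

```python
from typing import Any, Dict, List


class ToyContext:
    """Hypothetical in-memory registry; not the real odibi Context."""

    def __init__(self) -> None:
        self._frames: Dict[str, Any] = {}

    def register(self, name: str, frame: Any) -> None:
        # A node publishes its output under its own name
        self._frames[name] = frame

    def get(self, name: str) -> Any:
        # A downstream node looks up an upstream output by name
        if name not in self._frames:
            available = ", ".join(sorted(self._frames)) or "<none>"
            raise KeyError(f"'{name}' is not registered. Available: {available}")
        return self._frames[name]

    def names(self) -> List[str]:
        return sorted(self._frames)


toy_context = ToyContext()
toy_context.register("customers", [{"id": 1}, {"id": 2}])  # upstream node
downstream_input = toy_context.get("customers")            # downstream node
print(len(downstream_input), toy_context.names())
```

Listing the registered names in the lookup error follows the same pattern the lesson uses for missing connections.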

## Part 2: Execution Lifecycle

### The Main Execute Method

In [None]:
# Execution flow pseudocode
execution_flow = """
def execute() -> NodeResult:
    start_time = time.time()
    
    try:
        result_df = None
        
        # Phase 1: READ
        if config.read:
            result_df = _execute_read()
            track_step("Read from connection")
        
        # Phase 2: TRANSFORM
        if config.transform:
            result_df = _execute_transform(result_df)
            track_step("Applied N transform steps")
        
        # Phase 3: VALIDATE
        if config.validation and result_df is not None:
            _execute_validation(result_df)
            track_step("Validation passed")
        
        # Phase 4: WRITE
        if config.write:
            if result_df is None and config.depends_on:
                # Write-only node: get data from dependencies
                result_df = context.get(config.depends_on[0])
            
            _execute_write(result_df)
            track_step("Written to connection")
        
        # Register in context for downstream nodes
        # (compare with None: the truth value of a pandas DataFrame is ambiguous)
        if result_df is not None:
            context.register(config.name, result_df)
            
            # Optional caching
            if config.cache:
                _cached_result = result_df
        
        # Collect metadata
        duration = time.time() - start_time
        metadata = _collect_metadata(result_df)
        
        return NodeResult(success=True, duration=duration, ...)
    
    except Exception as e:
        # Wrap error with rich context
        return NodeResult(success=False, error=enriched_error)
"""

print(execution_flow)
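
The flow above can be exercised end to end with a toy stand-in. This is a minimal sketch rather than ODIBI's implementation: `ToyNode`, `ToyResult`, and `require_rows` are hypothetical names, the phases are plain callables, and lists stand in for DataFrames. It shows the phase ordering, the `is not None` checks, and the contract that `execute()` returns a result even when a phase fails.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ToyResult:
    """Hypothetical stand-in for NodeResult."""
    success: bool
    duration: float
    steps: List[str] = field(default_factory=list)
    error: Optional[Exception] = None


class ToyNode:
    """Hypothetical four-phase node; plain lists stand in for DataFrames."""

    def __init__(
        self,
        name: str,
        context: Dict[str, Any],
        read: Optional[Callable[[], Any]] = None,
        transform: Optional[Callable[[Any], Any]] = None,
        validate: Optional[Callable[[Any], None]] = None,
        write: Optional[Callable[[Any], None]] = None,
    ) -> None:
        self.name, self.context = name, context
        self.read, self.transform = read, transform
        self.validate, self.write = validate, write

    def execute(self) -> ToyResult:
        start = time.time()
        steps: List[str] = []
        try:
            data = None
            if self.read is not None:
                data = self.read()
                steps.append("read")
            if self.transform is not None:
                data = self.transform(data)
                steps.append("transform")
            if self.validate is not None and data is not None:
                self.validate(data)  # expected to raise on failure
                steps.append("validate")
            if self.write is not None:
                self.write(data)
                steps.append("write")
            if data is not None:
                self.context[self.name] = data  # publish for downstream nodes
            return ToyResult(True, time.time() - start, steps)
        except Exception as exc:
            # Failures are returned, not raised, mirroring the flow above
            return ToyResult(False, time.time() - start, steps, exc)


def require_rows(rows: List[int]) -> None:
    if not rows:
        raise ValueError("no rows to validate")


toy_context: Dict[str, Any] = {}
sink: List[int] = []
toy_node = ToyNode(
    "evens",
    toy_context,
    read=lambda: [1, 2, 3, 4],
    transform=lambda rows: [r for r in rows if r % 2 == 0],
    validate=require_rows,
    write=sink.extend,
)
toy_result = toy_node.execute()
print(toy_result.success, toy_result.steps, toy_context["evens"])
```

Because `steps` is appended to only after a phase completes, a failed result still records which phases succeeded before the error.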

## Part 3: Read Phase

### Connection Resolution & Engine Integration

In [None]:
# Read phase implementation
def analyze_read_phase():
    """
    Read Phase Steps:
    1. Get connection from registry (with validation)
    2. Delegate to engine.read() with parameters
    3. Return DataFrame
    """
    
    code = '''
def _execute_read(self) -> Any:
    read_config = self.config.read
    
    # Step 1: Resolve connection
    connection = self.connections.get(read_config.connection)
    
    if connection is None:
        raise ValueError(
            f"Connection '{read_config.connection}' not found. "
            f"Available: {', '.join(self.connections.keys())}"
        )
    
    # Step 2: Delegate to engine (abstraction layer)
    df = self.engine.read(
        connection=connection,
        format=read_config.format,      # csv, parquet, delta, sql
        table=read_config.table,        # for database sources
        path=read_config.path,          # for file sources
        options=read_config.options     # engine-specific options
    )
    
    return df
    '''
    
    print(code)
    print("""
Key Points:
- Connection validation with helpful error messages
- Engine abstraction allows Spark/Pandas/Polars flexibility
- Format-agnostic (engine handles specifics)
- Options passed through for advanced configurations
    """)

analyze_read_phase()
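
The connection lookup in Step 1 is easy to try in isolation. The helper below is a hypothetical sketch (`resolve_connection` is not an ODIBI function, and the `warehouse` entry is made up for the demonstration); it reproduces only the "fail with the list of known names" behavior described above.

```python
from typing import Any, Dict


def resolve_connection(connections: Dict[str, Any], name: str) -> Any:
    """Hypothetical helper: return the connection or fail listing known names."""
    connection = connections.get(name)
    if connection is None:
        available = ", ".join(sorted(connections)) or "<none>"
        raise ValueError(f"Connection '{name}' not found. Available: {available}")
    return connection


toy_connections = {"local": {"type": "local"}, "warehouse": {"type": "sql"}}

try:
    resolve_connection(toy_connections, "lcoal")  # deliberate typo
except ValueError as exc:
    lookup_message = str(exc)
    print(lookup_message)
```

Listing the available names turns a typo in the YAML config into an error that can be fixed without opening the connection registry.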

## Part 4: Transform Phase

### Three Transform Types

In [None]:
# Transform phase architecture
print("""
Transform Phase: 3 Step Types

1. SQL Steps (String)
   - Raw SQL queries
   - Access context DataFrames as tables
   - Example: "SELECT * FROM customers WHERE age > 25"

2. Function Steps (TransformStep)
   - Python functions from FunctionRegistry
   - Parameters validated via Pydantic schemas
   - Can accept 'current' DataFrame or work with context

3. Operation Steps (TransformStep)
   - Built-in operations (pivot, unpivot, etc.)
   - Engine-specific implementations
   - Parameter-driven transformations
""")

# Transform execution loop
transform_code = '''
def _execute_transform(self, input_df: Optional[Any]) -> Any:
    current_df = input_df
    
    for step_idx, step in enumerate(self.config.transform.steps):
        try:
            # Track execution context for errors
            exec_context = ExecutionContext(
                node_name=self.config.name,
                step_index=step_idx,
                total_steps=len(self.config.transform.steps)
            )
            
            # Route to appropriate executor
            if isinstance(step, str):
                # SQL step
                current_df = self._execute_sql_step(step)
            
            elif step.function:
                # Function step
                current_df = self._execute_function_step(
                    step.function, step.params, current_df
                )
            
            elif step.operation:
                # Operation step
                current_df = self._execute_operation_step(
                    step.operation, step.params, current_df
                )
        
        except Exception as e:
            # Enrich error with schema/shape context
            exec_context.input_schema = self._get_schema(current_df)
            exec_context.input_shape = self._get_shape(current_df)
            raise NodeExecutionError(context=exec_context, ...) from e
    
    return current_df
'''

print(transform_code)
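
The routing logic in the loop above can be reduced to a small runnable sketch. Everything here is an assumption made for illustration: `Step`, `StepError`, `FUNCTIONS`, `OPERATIONS`, and `run_steps` are hypothetical names, lists stand in for DataFrames, and SQL steps are left unimplemented because they need an engine.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union


@dataclass
class Step:
    """Hypothetical stand-in for TransformStep."""
    function: Optional[str] = None
    operation: Optional[str] = None
    params: Dict[str, Any] = field(default_factory=dict)


class StepError(RuntimeError):
    """Hypothetical error that records which step failed."""

    def __init__(self, step_index: int, total_steps: int, original: Exception) -> None:
        super().__init__(f"step {step_index + 1}/{total_steps} failed: {original}")
        self.step_index = step_index


# Toy registries; a real registry would be populated elsewhere
FUNCTIONS: Dict[str, Callable[..., list]] = {
    "add": lambda rows, amount: [r + amount for r in rows],
}
OPERATIONS: Dict[str, Callable[..., list]] = {
    "reverse": lambda rows: list(reversed(rows)),
}


def run_steps(rows: list, steps: List[Union[str, Step]]) -> list:
    current = rows
    for index, step in enumerate(steps):
        try:
            if isinstance(step, str):
                raise NotImplementedError("SQL steps need an engine; omitted here")
            elif step.function:
                current = FUNCTIONS[step.function](current, **step.params)
            elif step.operation:
                current = OPERATIONS[step.operation](current, **step.params)
            else:
                raise ValueError("step has neither a function nor an operation")
        except Exception as exc:
            # Attach the step position and keep the original error as the cause
            raise StepError(index, len(steps), exc) from exc
    return current


output = run_steps(
    [1, 2, 3],
    [Step(function="add", params={"amount": 10}), Step(operation="reverse")],
)
print(output)
```

Each step receives the output of the previous one, so the order of entries in the config determines the result.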

In [None]:
# SQL Transform Example
print("""
SQL Step Execution:

def _execute_sql_step(self, sql: str) -> Any:
    return self.engine.execute_sql(sql, self.context)

- Engine translates SQL to native operations (Spark SQL / Pandas SQL)
- Context provides named DataFrames as virtual tables
- Example: "SELECT * FROM {{customers}} JOIN {{orders}} ON ..."
""")

# Function Transform Example
print("""
Function Step Execution:

def _execute_function_step(self, function_name, params, current_df):
    # 1. Validate parameters against schema
    FunctionRegistry.validate_params(function_name, params)
    
    # 2. Get registered function
    func = FunctionRegistry.get(function_name)
    
    # 3. Inspect signature for 'current' parameter
    sig = inspect.signature(func)
    if "current" in sig.parameters:
        # Pass current DataFrame
        result = func(self.context, current=current_df, **params)
    else:
        # Context-only (reads from context directly)
        result = func(self.context, **params)
    
    return result

Key idea: a function that declares a 'current' parameter automatically receives the running DataFrame, so steps chain without explicit wiring
""")

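The signature check in step 3 relies only on the standard library, so it can be demonstrated without ODIBI. In this sketch `call_transform`, `keep_above`, and `load_named` are hypothetical names, a dict stands in for the context, and lists stand in for DataFrames.

```python
import inspect
from typing import Any, Callable, Dict


def call_transform(
    func: Callable[..., Any],
    context: Dict[str, Any],
    current: Any,
    params: Dict[str, Any],
) -> Any:
    """Pass `current` only to functions whose signature declares it."""
    if "current" in inspect.signature(func).parameters:
        return func(context, current=current, **params)
    return func(context, **params)


def keep_above(context, current, threshold):
    # Operates on the running data handed over by the pipeline
    return [value for value in current if value > threshold]


def load_named(context, name):
    # Ignores the running data and reads from the context instead
    return context[name]


toy_registry = {"orders": [5, 15, 25]}
filtered = call_transform(keep_above, toy_registry, [1, 10, 100], {"threshold": 5})
loaded = call_transform(load_named, toy_registry, [1, 10, 100], {"name": "orders"})
print(filtered, loaded)
```

The dispatcher never has to be told which kind of function it is calling; the function's own signature carries that information.
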
## Part 5: Validation Phase

### Data Quality Enforcement

In [None]:
# Validation implementation
validation_code = '''
def _execute_validation(self, df: Any) -> None:
    validation_config = self.config.validation
    failures = []
    
    # Check 1: Not empty
    if validation_config.not_empty:
        if self._is_empty(df):
            failures.append("DataFrame is empty")
    
    # Check 2: No nulls in specified columns
    if validation_config.no_nulls:
        null_counts = self._count_nulls(df, validation_config.no_nulls)
        for col, count in null_counts.items():
            if count > 0:
                failures.append(f"Column '{col}' has {count} null values")
    
    # Check 3: Schema validation
    if validation_config.schema_validation:
        schema_failures = self._validate_schema(
            df, validation_config.schema_validation
        )
        failures.extend(schema_failures)
    
    # Raise once, after every check has run, so all failures are reported together
    if failures:
        raise ValidationError(self.config.name, failures)
'''

print(validation_code)
print("""
Validation Types:
1. not_empty: Ensures DataFrame has rows
2. no_nulls: Checks specified columns for null values
3. schema_validation: Type/existence checks (engine-delegated)

All failures collected before raising (comprehensive error reporting)
""")

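The collect-then-raise pattern can be tried on plain Python data. The sketch below is an illustration under stated assumptions: `ToyValidationError` and `validate_rows` are hypothetical names, a list of dicts stands in for a DataFrame, and only the `not_empty` and `no_nulls` checks are reproduced.

```python
from typing import Any, Dict, List, Sequence


class ToyValidationError(Exception):
    """Hypothetical stand-in for ValidationError."""

    def __init__(self, node_name: str, failures: List[str]) -> None:
        super().__init__(f"{node_name}: " + "; ".join(failures))
        self.failures = failures


def validate_rows(
    node_name: str,
    rows: List[Dict[str, Any]],
    not_empty: bool = False,
    no_nulls: Sequence[str] = (),
) -> None:
    failures: List[str] = []

    if not_empty and not rows:
        failures.append("DataFrame is empty")

    for column in no_nulls:
        nulls = sum(1 for row in rows if row.get(column) is None)
        if nulls > 0:
            failures.append(f"Column '{column}' has {nulls} null values")

    # Raise only after every check has run
    if failures:
        raise ToyValidationError(node_name, failures)


sample_rows = [{"id": 1, "email": None}, {"id": None, "email": None}]
try:
    validate_rows("customers", sample_rows, not_empty=True, no_nulls=["id", "email"])
except ToyValidationError as exc:
    collected = exc.failures
    print(collected)
```

Both null-column problems are reported in a single error, so one run is enough to see everything that needs fixing.
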
## Part 6: Write Phase

### Format Handling & Modes

In [None]:
# Write phase implementation
write_code = '''
def _execute_write(self, df: Any) -> None:
    write_config = self.config.write
    
    # Resolve connection (with validation)
    connection = self.connections.get(write_config.connection)
    
    if connection is None:
        raise ValueError(
            f"Connection '{write_config.connection}' not found. "
            f"Available: {', '.join(self.connections.keys())}"
        )
    
    # Delegate to engine
    self.engine.write(
        df=df,
        connection=connection,
        format=write_config.format,    # csv, parquet, delta, sql
        table=write_config.table,      # for database targets
        path=write_config.path,        # for file targets
        mode=write_config.mode,        # append, overwrite, error, ignore
        options=write_config.options   # engine-specific options
    )
'''

print(write_code)
print("""
Write Modes:
- append: Add to existing data
- overwrite: Replace existing data
- error: Fail if target exists (default)
- ignore: Skip if target exists

Special Case: Write-only nodes
- No read/transform phases
- Get data from context via depends_on
- Example: Export transformed data to multiple formats
""")

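The four write modes can be illustrated against a local text file. This sketch is not how any ODIBI engine writes data: `write_lines` is a hypothetical helper that only mirrors the mode semantics listed above, using the standard library and a temporary directory.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def write_lines(path: Path, lines: Iterable[str], mode: str = "error") -> bool:
    """Return True if data was written, False if the write was skipped."""
    if mode not in {"append", "overwrite", "error", "ignore"}:
        raise ValueError(f"unknown mode: {mode}")
    exists = path.exists()
    if mode == "error" and exists:
        raise FileExistsError(f"{path} already exists")
    if mode == "ignore" and exists:
        return False
    file_mode = "a" if mode == "append" else "w"
    with path.open(file_mode, encoding="utf-8") as handle:
        handle.writelines(f"{line}\n" for line in lines)
    return True


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "out.txt"
    write_lines(target, ["a"])                     # "error" mode, target absent
    write_lines(target, ["b"], mode="append")
    after_append = target.read_text(encoding="utf-8").splitlines()
    skipped = not write_lines(target, ["c"], mode="ignore")
    write_lines(target, ["d"], mode="overwrite")
    final = target.read_text(encoding="utf-8").splitlines()

print(after_append, skipped, final)
```

With `error` as the default, a pipeline that is rerun by accident fails loudly instead of duplicating or replacing data.
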
## Part 7: NodeResult & Metadata

### Execution Tracking

In [None]:
# NodeResult model
from pydantic import BaseModel, Field
from typing import Optional, Dict, List, Any

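class NodeResultExample(BaseModel):
    # Exception is not a type Pydantic can validate natively, so arbitrary
    # types must be allowed explicitly (Pydantic v2 syntax shown here;
    # v1 uses an inner `class Config` with the same setting)
    model_config = {"arbitrary_types_allowed": True}

    node_name: str
    success: bool
    duration: float
    rows_processed: Optional[int] = None
    result_schema: Optional[List[str]] = None
    error: Optional[Exception] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)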

# Metadata collection
metadata_code = '''
def _collect_metadata(self, df: Optional[Any]) -> Dict[str, Any]:
    metadata = {
        "timestamp": datetime.now().isoformat(),
        "steps": self._execution_steps.copy(),  # Audit trail
    }
    
    if df is not None:
        metadata["rows"] = self._count_rows(df)
        metadata["schema"] = self._get_schema(df)  # Column names
    
    return metadata
'''

print(metadata_code)
print("""
NodeResult Benefits:
1. Performance tracking (duration)
2. Data lineage (rows_processed, schema)
3. Execution audit trail (metadata.steps)
4. Error context (error with rich details)
5. Downstream decision-making (success boolean)
""")

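The metadata collection shown above can be mimicked with plain data structures. In this sketch `collect_metadata` is a hypothetical free function, a list of dicts stands in for a DataFrame, and the schema is taken to be the keys of the first row, which is an assumption made for the illustration.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def collect_metadata(
    rows: Optional[List[Dict[str, Any]]], steps: List[str]
) -> Dict[str, Any]:
    metadata: Dict[str, Any] = {
        "timestamp": datetime.now().isoformat(),
        "steps": list(steps),  # copy, so later steps do not alter this record
    }
    if rows is not None:
        metadata["rows"] = len(rows)
        metadata["schema"] = list(rows[0].keys()) if rows else []
    return metadata


tracked = ["Read from connection"]
meta = collect_metadata([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}], tracked)
tracked.append("Written to connection")  # does not affect the snapshot
print(meta["rows"], meta["schema"], meta["steps"])
```

Copying the step list matters because the node keeps appending to it; without the copy, an earlier result would appear to contain later steps.
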
## Part 8: Error Handling & Recovery

### Rich Error Context

In [None]:
# Error enrichment
error_code = '''
except Exception as e:
    duration = time.time() - start_time
    
    # Wrap in NodeExecutionError with context
    if not isinstance(e, NodeExecutionError):
        exec_context = ExecutionContext(
            node_name=self.config.name,
            config_file=self.config_file,          # YAML file path
            previous_steps=self._execution_steps,  # What succeeded
        )
        
        error = NodeExecutionError(
            message=str(e),
            context=exec_context,
            original_error=e,
            suggestions=self._generate_suggestions(e)  # Keyword-based hints
        )
    else:
        error = e
    
    return NodeResult(
        node_name=self.config.name,
        success=False,
        duration=duration,
        error=error,
    )
'''

print(error_code)

# Suggestion generation
suggestions_code = '''
def _generate_suggestions(self, error: Exception) -> List[str]:
    suggestions = []
    error_str = str(error).lower()
    
    if "column" in error_str and "not found" in error_str:
        suggestions.append(
            "Check that previous nodes output the expected columns"
        )
        suggestions.append(
            f"Use 'odibi run-node {self.config.name} --show-schema' to debug"
        )
    
    if "keyerror" in error.__class__.__name__.lower():
        suggestions.append(
            "Verify that all referenced DataFrames are registered in context"
        )
        suggestions.append(
            "Check node dependencies in 'depends_on' list"
        )
    
    if "function" in error_str and "not" in error_str:
        suggestions.append(
            "Ensure the transform function is decorated with @transform"
        )
        suggestions.append(
            "Import the module containing the transform function"
        )
    
    return suggestions
'''

print(suggestions_code)
print("""
Error Handling Philosophy:
- Never swallow exceptions silently: every failure is captured in NodeResult.error
- Always return NodeResult (even on failure)
- Enrich errors with context (file, node, step)
- Provide actionable suggestions
- Track partial success (previous_steps)
""")

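The wrap-and-return behavior described in this part can be sketched without ODIBI. All names here are hypothetical (`EnrichedError`, `suggest`, `run_safely`, `missing_dependency`), and the hints reuse two of the messages from the suggestion code above; the real `NodeExecutionError` carries more context than this toy version.

```python
from typing import Any, Callable, Dict, List, Tuple


class EnrichedError(Exception):
    """Hypothetical stand-in for NodeExecutionError."""

    def __init__(self, node_name: str, previous_steps: List[str], original: Exception) -> None:
        super().__init__(f"[{node_name}] {type(original).__name__}: {original}")
        self.node_name = node_name
        self.previous_steps = list(previous_steps)  # what succeeded before failing
        self.original = original
        self.suggestions = suggest(original)


def suggest(error: Exception) -> List[str]:
    """Keyword and type matching, in the spirit of _generate_suggestions."""
    hints: List[str] = []
    text = str(error).lower()
    if "column" in text and "not found" in text:
        hints.append("Check that previous nodes output the expected columns")
    if isinstance(error, KeyError):
        hints.append("Check node dependencies in 'depends_on' list")
    return hints


def run_safely(
    node_name: str, actions: List[Tuple[str, Callable[[], Any]]]
) -> Dict[str, Any]:
    completed: List[str] = []
    try:
        for label, action in actions:
            action()
            completed.append(label)
        return {"success": True, "error": None}
    except Exception as exc:
        # Wrap once; an already enriched error is passed through unchanged
        error = exc if isinstance(exc, EnrichedError) else EnrichedError(node_name, completed, exc)
        return {"success": False, "error": error}


def missing_dependency() -> None:
    registry: Dict[str, Any] = {}
    registry["orders"]  # raises KeyError: nothing was registered


outcome = run_safely(
    "join_orders", [("read", lambda: None), ("transform", missing_dependency)]
)
print(outcome["success"], outcome["error"].previous_steps, outcome["error"].suggestions)
```

The caller always gets a result object back and can decide whether to stop the pipeline, retry, or continue with other nodes.
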
## Summary

### Node Execution Mental Model

```
NodeConfig (declarative)
    ↓
Node.__init__(config, context, engine, connections, config_file)
    ↓
Node.execute()
    ↓
┌─────────────────┐
│  READ PHASE     │ → Load from source
│  result_df      │
└────────┬────────┘
         ↓
┌─────────────────┐
│ TRANSFORM PHASE │ → SQL/Function/Operation
│  result_df      │
└────────┬────────┘
         ↓
┌─────────────────┐
│ VALIDATE PHASE  │ → Quality checks
│  (assertions)   │
└────────┬────────┘
         ↓
┌─────────────────┐
│  WRITE PHASE    │ → Persist to target
│  (side effect)  │
└────────┬────────┘
         ↓
context.register(node_name, result_df)
    ↓
NodeResult(success, duration, metadata, error)
```

### Key Takeaways
1. **Node = Config Executor**: Bridges declarative YAML and imperative operations
2. **4 Phases**: Read → Transform → Validate → Write (modular, composable)
3. **Engine Abstraction**: Same config works with Spark/Pandas/Polars
4. **Context Integration**: Nodes communicate via shared registry
5. **Error Enrichment**: Failures include context, suggestions, audit trail
6. **Metadata Tracking**: Performance, lineage, schema evolution

### Next Steps
- Complete exercises.ipynb for hands-on practice
- Review node_lifecycle.md for visual flow diagrams
- Explore Module 07: Orchestrator (how Nodes are coordinated)