# 🔧 Databricks Utilities for transformWithState Learning

This notebook contains the utility functions for learning `transformWithState` on Databricks.
Load it into other notebooks with `%run` to use the functions there.

## Functions Available:
- `create_spark()` - Spark session configured for transformWithState
- `create_flight_data(spark)` - Flight data stream generator
- `FlightProcessor` - StatefulProcessor implementation
- `run_learning_demo(spark)` - Complete demo runner
- `explain_basics()` - Concept explanation


## 🔧 Spark Session Creation


In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType

def create_spark() -> SparkSession:
    """
    Create a Spark session optimized for Databricks transformWithState learning.
    
    Databricks provides full support for transformWithState including:
    - Advanced state store providers (RocksDB)
    - Reliable checkpointing with DBFS
    - Multi-column family state management
    - Production-grade streaming infrastructure
    
    Args:
        None
        
    Returns:
        SparkSession: A Databricks-optimized Spark session for transformWithState
    """
    print("🔧 Creating Databricks Spark session for transformWithState...")
    
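    # On Databricks a session already exists, so getOrCreate() returns it and applies
    # the SQL settings below to it. Core settings such as spark.serializer are fixed
    # when the cluster starts and only take effect via the cluster's Spark config.
    # minBatchesToRetain=2 keeps very little checkpoint history (the default is 100):
    # fine for a throwaway demo, too little for production recovery.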
    spark = SparkSession.builder \
        .appName("LearnTransformWithState_Databricks") \
        .config("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
        .config("spark.sql.streaming.stateStore.rocksdb.formatVersion", "5") \
        .config("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "true") \
        .config("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", "1000") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.streaming.minBatchesToRetain", "2") \
        .config("spark.sql.streaming.stateStore.maintenanceInterval", "300s") \
        .getOrCreate()
    
    # Set log level for cleaner output
    spark.sparkContext.setLogLevel("WARN")
    
    print("✅ Databricks Spark ready with full transformWithState support!")
    print("   🗄️  RocksDB state store enabled")
    print("   📁 DBFS checkpointing available") 
    print("   🚀 Production-grade streaming infrastructure")
    return spark

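The state-store settings in `create_spark()` are SQL configurations, so on the already-running Databricks session they can also be applied with `spark.conf.set(key, value)` just before a query starts. The sketch below gathers them in one dict and applies them to any object with a `set(key, value)` method. `STATE_STORE_CONFS`, `apply_state_store_confs`, and the `RecordingConf` stand-in are hypothetical names introduced here for illustration, not part of the original notebook.

```python
from typing import Dict

# Hypothetical helper: the state-store settings from create_spark() in one place.
# spark.serializer is intentionally absent: it is a core setting that cannot be
# changed on a running session.
STATE_STORE_CONFS: Dict[str, str] = {
    "spark.sql.streaming.stateStore.providerClass":
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.formatVersion": "5",
    "spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows": "true",
    "spark.sql.streaming.stateStore.rocksdb.maxOpenFiles": "1000",
    "spark.sql.streaming.minBatchesToRetain": "2",
    "spark.sql.streaming.stateStore.maintenanceInterval": "300s",
}


def apply_state_store_confs(conf, confs: Dict[str, str] = STATE_STORE_CONFS) -> None:
    """Apply each setting through conf.set(key, value), e.g. with conf=spark.conf."""
    for key, value in confs.items():
        conf.set(key, value)


class RecordingConf:
    """Offline stand-in for spark.conf that only records what was set."""

    def __init__(self) -> None:
        self.values: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self.values[key] = value


# Offline check: every setting is forwarded unchanged
recorder = RecordingConf()
apply_state_store_confs(recorder)
print(len(recorder.values))  # 6
```

In the notebook itself the call would be `apply_state_store_confs(spark.conf)`, placed before `writeStream...start()`.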

## 📊 Flight Data Stream Generator


In [None]:
def create_flight_data(spark: SparkSession) -> DataFrame:
    """
    Create simple flight data stream.
    
    Args:
        spark: The Spark session to use for creating the stream
        
    Returns:
        DataFrame: A streaming DataFrame with flight state updates
    """
    print("📊 Creating flight data stream...")
    
    # Use rate source to generate data
    rate_stream = spark \
        .readStream \
        .format("rate") \
        .option("rowsPerSecond", 1) \
        .load()
    
    # Transform to simple flight data
    flight_stream = rate_stream.selectExpr(
        # Three flights cycling - realistic flight numbers
        """CASE (value % 3)
            WHEN 0 THEN 'Delta1247'
            WHEN 1 THEN 'United892'
            ELSE 'Southwest5031'
        END as flight""",
        
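        # State progression boarding -> flying -> landed: values 0-2 are boarding,
        # 3-5 flying, 6-8 landed, then it repeats, so each flight (chosen by value % 3
        # above) sees all three states in order.
        """CASE ((value DIV 3) % 3)
            WHEN 0 THEN 'boarding'
            WHEN 1 THEN 'flying'
            ELSE 'landed'
        END as state""",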
        
        "timestamp"
    )
    
    print("✅ Flight data ready!")
    return flight_stream

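Whether the demo shows any progression depends on how the `rate` source's `value` column is split into a flight and a state. The plain-Python sketch below (hypothetical helper names, no Spark needed) compares deriving both columns from `value % 3`, which pins each flight to a single state, with deriving the state from `value DIV 3`, which lets every flight cycle boarding → flying → landed. Python's `//` stands in for Spark SQL's `DIV`; the two agree for the non-negative values the rate source produces.

```python
from typing import Callable, List, Tuple

FLIGHTS = ["Delta1247", "United892", "Southwest5031"]
STATES = ["boarding", "flying", "landed"]


def same_modulus(value: int) -> Tuple[str, str]:
    """Flight and state both from value % 3: each flight always gets the same state."""
    return FLIGHTS[value % 3], STATES[value % 3]


def cycling_state(value: int) -> Tuple[str, str]:
    """Flight from value % 3, state from (value DIV 3) % 3: every flight cycles."""
    return FLIGHTS[value % 3], STATES[(value // 3) % 3]


def states_seen(mapping: Callable[[int], Tuple[str, str]], flight: str, n_rows: int) -> List[str]:
    """States one flight receives over the first n_rows rate-source values."""
    return [state for f, state in map(mapping, range(n_rows)) if f == flight]


for flight in FLIGHTS:
    print(flight, states_seen(same_modulus, flight, 9), states_seen(cycling_state, flight, 9))
```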

## ⚙️ FlightProcessor - StatefulProcessor Implementation


In [None]:
from typing import Iterator, Any
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class FlightProcessor(StatefulProcessor):
    """
    The simplest flight state processor for learning transformWithState on Databricks.
    
    Powered by Databricks' production-grade infrastructure:
    - RocksDB state store with multi-column family support
    - DBFS checkpointing for fault tolerance  
    - Auto-scaling clusters for performance
    
    Tracks flights through 3 states: boarding -> flying -> landed
    """
    
    def init(self, handle: StatefulProcessorHandle) -> None:
        """
        Set up the processor when it starts.
        
        Args:
            handle: Spark's handle for managing state and timers
            
        Returns:
            None
        """
        # Define what we store for each flight
        state_schema = StructType([
            StructField("flight", StringType(), True),      # Flight name like "Delta1247"
            StructField("state", StringType(), True),       # Current state
            StructField("count", IntegerType(), True)       # How many updates
        ])
        
        # Register a ValueState named "flight_state"; with the RocksDB provider each
        # state variable gets its own column family in the state store
        self.flight_state = handle.getValueState("flight_state", state_schema)
    
    def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues: Any) -> Iterator[pd.DataFrame]:
        """
        Process flight updates for one flight.
        
        Args:
            key: The grouping key as a tuple, e.g. ("Delta1247",)
            rows: Iterator of DataFrames containing flight state updates
            timerValues: Timer information (not used in this example)
            
        Returns:
            Iterator[pd.DataFrame]: Iterator containing result DataFrame with flight state info
        """
        # The grouping key arrives as a tuple with one element per groupBy column
        flight_name = key[0]
        
        # Get all updates for this flight, skipping empty chunks
        all_rows = [batch for batch in rows if not batch.empty]
        
        if not all_rows:
            return iter([])
        
        # Combine all updates and replay them in time order: row order within a group
        # is not guaranteed to follow arrival order, and the transition rules depend on it
        updates = pd.concat(all_rows, ignore_index=True).sort_values("timestamp")
        
        # Get current state for this flight. ValueState.get() returns a tuple in the
        # field order of state_schema: (flight, state, count).
        # Note: this method runs in a Python worker on the executors, so print()
        # output lands in the executor logs rather than in the notebook cell.
        if self.flight_state.exists():
            _, current_state, current_count = self.flight_state.get()
            # Optional: log every 5th update to see the state being retrieved
            if current_count % 5 == 0:
                print(f"🗄️  RocksDB: {flight_name} state retrieved (count: {current_count})")
        else:
            current_state = "unknown"
            current_count = 0
            print(f"🆕 New flight {flight_name} starting in the state store")
        
        # Process each update
        for _, update in updates.iterrows():
            new_state = update['state']
            
            # Simple validation: only allow valid progressions
            if self._is_valid_transition(current_state, new_state):
                previous_state = current_state
                current_state = new_state
                current_count += 1
                # Optional: show the first few transitions
                if current_count <= 3:
                    print(f"✅ {flight_name}: {previous_state} -> {current_state} (update #{current_count})")
        
        # Save the new state as a tuple in the same field order as state_schema
        self.flight_state.update((flight_name, current_state, current_count))
        
        # Return the result
        result = pd.DataFrame({
            'flight': [flight_name],
            'current_state': [current_state],
            'update_count': [str(current_count)]
        })
        
        return iter([result])
    
    def _is_valid_transition(self, from_state: str, to_state: str) -> bool:
        """
        Check if state transition is allowed.
        
        Args:
            from_state: Current state of the flight
            to_state: Desired new state for the flight
            
        Returns:
            bool: True if transition is valid, False otherwise
        """
        # Simple rules: boarding -> flying -> landed
        valid_moves = {
            "unknown": ["boarding"],           # New flights start boarding
            "boarding": ["flying"],            # From boarding, can fly
            "flying": ["landed"],              # From flying, can land
            "landed": ["boarding"]             # From landed, can start new journey
        }
        
        allowed = valid_moves.get(from_state, [])
        return to_state in allowed
    
    def handleExpiredTimer(self, key: Any, timerValues: Any, expiredTimerInfo: Any) -> Iterator[pd.DataFrame]:
        """
        Handle timers (not used in this simple example).
        
        Args:
            key: The grouping key tuple for which the timer expired
            timerValues: Timer values
            expiredTimerInfo: Information about the expired timer
            
        Returns:
            Iterator[pd.DataFrame]: Empty iterator since timers are not used
        """
        return iter([])
    
    def close(self) -> None:
        """
        Clean up when done.
        
        Args:
            None
            
        Returns:
            None
        """
        pass

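The per-flight logic in `FlightProcessor` is plain state-machine code, so it can be exercised without a stream. In the sketch below, a `(state, count)` tuple plays the role of the value kept in `ValueState`, the rules are copied from `_is_valid_transition`, and the function names are hypothetical. It also shows why the order in which a micro-batch's rows are replayed matters: the same two updates give different results in different orders.

```python
from typing import Iterable, Tuple

# Copy of the rules in FlightProcessor._is_valid_transition
VALID_MOVES = {
    "unknown": ["boarding"],
    "boarding": ["flying"],
    "flying": ["landed"],
    "landed": ["boarding"],
}


def is_valid_transition(from_state: str, to_state: str) -> bool:
    return to_state in VALID_MOVES.get(from_state, [])


def apply_updates(current: Tuple[str, int], new_states: Iterable[str]) -> Tuple[str, int]:
    """Replay updates for one flight: invalid moves are skipped, valid ones bump the count."""
    state, count = current
    for new_state in new_states:
        if is_valid_transition(state, new_state):
            state, count = new_state, count + 1
    return state, count


# Same updates, different order: the rules are order-sensitive
print(apply_updates(("unknown", 0), ["boarding", "flying"]))  # ('flying', 2)
print(apply_updates(("unknown", 0), ["flying", "boarding"]))  # ('boarding', 1)
```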

## 🚀 Demo Runner Function


In [None]:
import uuid

def run_learning_demo(spark: SparkSession) -> None:
    """
    Run the transformWithState demo on Databricks for learning stateful processing.
    
    This uses the actual transformWithState API with full Databricks support:
    - RocksDB state store for reliable state management
    - DBFS checkpointing for fault tolerance
    - Production-grade streaming infrastructure
    
    Args:
        spark: The Databricks Spark session to use for the demo
        
    Returns:
        None
    """
    print("\n" + "🎓" + "="*50)
    print("LEARNING DEMO: transformWithState on Databricks")
    print("="*50)
    
    # Create flight data
    flight_data = create_flight_data(spark)
    
    # Define output schema
    output_schema = StructType([
        StructField("flight", StringType(), True),
        StructField("current_state", StringType(), True),
        StructField("update_count", StringType(), True)
    ])
    
    # Apply transformWithState (exposed in Python as transformWithStateInPandas) - this is the key learning point!
    # It requires the RocksDB state store provider configured in create_spark()
    flight_states = flight_data \
        .groupBy("flight") \
        .transformWithStateInPandas(
            statefulProcessor=FlightProcessor(),
            outputStructType=output_schema,
            outputMode="Update",
            timeMode="ProcessingTime"
        )
    
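    # A path without a scheme resolves against the default filesystem, which is DBFS on
    # most Databricks clusters (dbfs:/tmp/...). A fresh directory per run starts with empty state.
    checkpoint_dir = f"/tmp/learn_checkpoint_{uuid.uuid4().hex[:8]}"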
    
    print(f"📁 Using checkpoint location: {checkpoint_dir}")
    
    # Start the stream with Databricks-optimized settings
    query = flight_states \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .option("numRows", 15) \
        .option("checkpointLocation", checkpoint_dir) \
        .trigger(processingTime='5 seconds') \
        .start()
    
    print("\n🚀 Demo running on Databricks! Watch transformWithState in action...")
    print("📝 Key things to notice:")
    print("   - 🗄️  RocksDB manages state reliably for each flight")  
    print("   - ✅ State transitions are validated in real-time")
    print("   - 📈 Update counts increase over time")
    print("   - 💾 State persists across batches with checkpointing")
    print("   - 🚀 This is production-grade transformWithState!")
    print(f"   - 📁 Checkpoint: {checkpoint_dir}")
    print("\n⏹️  Cancel the cell (or press Ctrl+C outside a notebook) to stop when you've learned enough!")
    
    try:
        query.awaitTermination()
    except KeyboardInterrupt:
        print("\n🛑 Stopping demo...")
        query.stop()
        print("✅ Demo complete! You've mastered transformWithState on Databricks.")
        print(f"📁 Checkpoint preserved at: {checkpoint_dir}")
    finally:
        # Ensure clean shutdown
        if query.isActive:
            query.stop()
            print("🔄 Stream stopped gracefully")

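`query.awaitTermination()` with no argument blocks until the query stops, which in a notebook means cancelling the cell. PySpark's `StreamingQuery.awaitTermination(timeout)` also accepts a timeout in seconds and returns whether the query terminated within it, which allows a bounded run instead. The sketch below is a hypothetical `run_for` helper built on that behaviour, plus a `FakeQuery` stand-in (also hypothetical) so the logic can be checked without a cluster.

```python
class FakeQuery:
    """Offline stand-in for a StreamingQuery that never terminates on its own."""

    def __init__(self) -> None:
        self.isActive = True
        self.timeouts = []      # timeouts passed to awaitTermination
        self.stop_calls = 0

    def awaitTermination(self, timeout=None):
        self.timeouts.append(timeout)
        return False            # still running when the timeout expired

    def stop(self) -> None:
        self.isActive = False
        self.stop_calls += 1


def run_for(query, seconds: float) -> bool:
    """Let a streaming query run for up to `seconds`, then make sure it is stopped.

    Returns True if the query terminated by itself within the timeout.
    """
    try:
        terminated = bool(query.awaitTermination(seconds))
    finally:
        # Runs even if awaitTermination raises (e.g. the query failed)
        if query.isActive:
            query.stop()
    return terminated


demo_query = FakeQuery()
print(run_for(demo_query, 60), demo_query.isActive)  # False False
```

In `run_learning_demo`, `run_for(query, 60)` could stand in for the blocking `awaitTermination()` call to watch about a dozen 5-second micro-batches and then stop.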

## 📚 Concept Explanation Function


In [None]:
def explain_basics() -> None:
    """
    Explain the core concepts simply.
    
    Args:
        None
        
    Returns:
        None
    """
    print("\n" + "📚" + "="*60)
    print("TRANSFORM WITH STATE ON DATABRICKS")
    print("="*60)
    print("""
🎯 THE BIG IDEA:
   Keep information about each thing (like flights) between batches

🔑 KEY CONCEPTS:

1. GROUPING BY KEY
   .groupBy("flight")  ← Each flight gets separate processing

2. STATE STORAGE  
   Each flight remembers its current state (boarding/flying/landed)

3. MICRO-BATCH PROCESSING
   On every trigger (every 5 seconds in the demo), process the new updates for each flight

4. STATE PERSISTENCE
   Flight state survives between batches - that's the magic!

🛫 OUR EXAMPLE:
   - Track flights: Delta1247, United892, Southwest5031
   - States: boarding → flying → landed
   - Each flight remembers where it is

🧠 MENTAL MODEL:
   Think of it like having a notebook for each flight.
   Every batch, you:
   1. Look up the flight's current page in the notebook
   2. Read what state it was in
   3. Update it based on new information  
   4. Write the new state back to the notebook
   5. The notebook persists for the next batch!

🏗️ DATABRICKS ADVANTAGES:
   - 🗄️  RocksDB state store (production-grade)
   - 📁 DBFS checkpointing (fault tolerance)
   - 🚀 Auto-scaling clusters (performance)
   - 💾 Multi-column family support (each state variable in its own column family)
   - 🔧 Managed infrastructure (no setup headaches)

⚙️ THE API:
   - transformWithState (transformWithStateInPandas in Python) gives you full control
   - StatefulProcessor handles the state logic
   - You decide what to store and how to update it
   - Databricks makes it production-ready!
""")
    print("="*60)
    print("🚀 READY TO SEE IT ON DATABRICKS!")
    print("="*60)

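The notebook-per-flight mental model in `explain_basics()` can be sketched in a few lines of plain Python: a dict plays the role of the state store, each call is one micro-batch, and only flights that received rows in that batch are touched (just as `handleInputRows` is only invoked for keys with new input). The names are hypothetical and there is no validation or fault tolerance; the sketch only illustrates that state outlives each batch.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

FlightRow = Tuple[str, str]   # (flight, state) as it arrives in a batch
Page = Tuple[str, int]        # what a "notebook page" keeps: (last state, updates seen)


def run_micro_batch(notebook: Dict[str, Page], batch: List[FlightRow]) -> Dict[str, Page]:
    """One trigger: group rows by flight, then look up, update, and write back each page."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for flight, state in batch:                         # like groupBy("flight")
        grouped[flight].append(state)

    emitted: Dict[str, Page] = {}
    for flight, states in grouped.items():              # only flights with new rows
        _, seen = notebook.get(flight, ("unknown", 0))  # 1-2. look up and read the page
        page = (states[-1], seen + len(states))         # 3. update from the new rows
        notebook[flight] = page                         # 4. write it back
        emitted[flight] = page
    return emitted


pages: Dict[str, Page] = {}   # 5. survives from one batch to the next
run_micro_batch(pages, [("Delta1247", "boarding"), ("United892", "boarding")])
print(run_micro_batch(pages, [("Delta1247", "flying")]))  # {'Delta1247': ('flying', 2)}
```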

## ✅ All Functions Ready!

You can now use these functions in other notebooks:

```python
# Run this notebook to load its functions into the current one
%run ./databricks_utils

# Create Spark session
spark = create_spark()

# Run the demo
run_learning_demo(spark)

# Or explain concepts first
explain_basics()
```

**Available Functions:**
- `create_spark()` - Databricks-optimized Spark session
- `create_flight_data(spark)` - Flight data stream generator
- `FlightProcessor` - StatefulProcessor class for transformWithState
- `run_learning_demo(spark)` - Complete interactive demo
- `explain_basics()` - Concept explanation

**Production Features:**
- 🗄️ RocksDB state store with multi-column family support
- 📁 DBFS checkpointing for fault tolerance
- 🚀 Auto-scaling clusters for performance
- 💾 Managed infrastructure (zero setup)
- 🔧 Production-grade streaming infrastructure
