# SparkForge v1.2.0: Enhanced Logging & Parallel Execution

This notebook demonstrates the new features in SparkForge v1.2.0:
- 📊 Unified logging format with rich metrics
- ⚡ Parallel execution with real-time visibility
- 📈 Enhanced pipeline reporting
- 💾 LogWriter for execution tracking


## Setup

First, let's import the required libraries and create a mock Spark session.


In [None]:
from mock_spark import MockSparkSession

from sparkforge import PipelineBuilder

# Create Spark session
spark = MockSparkSession.builder.appName("SparkForge_v1.2.0_Demo").getOrCreate()

print("✅ Spark session created!")
print("📦 SparkForge version: 1.2.0")


## 1. Enhanced Logging Format

### ✨ What's New in v1.2.0

The logging system now provides rich, consistent output:
- **Timestamps** on every message
- **Visual indicators**: 🚀 Starting, ✅ Completed, ❌ Failed
- **Detailed metrics**: rows processed, rows written, validation rates, invalid counts
- **Layer-aware formatting**: Bronze steps report rows "processed"; Silver and Gold steps report rows "written"

The next cell creates the sample data; the log output itself appears when the pipeline runs in Section 3.
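To make the format concrete, here is a minimal sketch of a formatter that emits lines in this style. It is not SparkForge's formatter: `format_step_log`, `INDICATORS`, and the exact field layout are hypothetical, chosen only to illustrate the timestamp, indicator, and layer-dependent wording described above.

```python
from datetime import datetime
from typing import Optional

# Hypothetical mapping, mirroring the indicators listed above.
INDICATORS = {"started": "🚀", "completed": "✅", "failed": "❌"}


def format_step_log(
    layer: str,
    step: str,
    status: str,
    rows: int = 0,
    validation_rate: Optional[float] = None,
    now: Optional[datetime] = None,
) -> str:
    """Build one log line: timestamp, indicator, step name, then metrics."""
    now = now or datetime.now()
    timestamp = now.strftime("%H:%M:%S")
    line = f"{timestamp} {INDICATORS[status]} {status.capitalize()} {layer.upper()} step: {step}"
    if status == "completed":
        # Assumed rule from the list above: Bronze says "processed",
        # Silver and Gold say "written".
        verb = "processed" if layer == "bronze" else "written"
        line += f" | {rows:,} rows {verb}"
        if validation_rate is not None:
            line += f" | validation {validation_rate:.1f}%"
    return line


print(format_step_log("bronze", "bronze_events", "completed", rows=3, validation_rate=100.0))
print(format_step_log("silver", "silver_purchases", "completed", rows=1, validation_rate=100.0))
```

The real log lines may order or label these fields differently; compare against the output in Section 3.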


In [None]:
from mock_spark import IntegerType, MockStructField, MockStructType, StringType

# Sample events and profiles data
events_data = [(1, 100, "click"), (2, 101, "purchase"), (3, 102, "view")]
profiles_data = [(100, "Alice"), (101, "Bob"), (102, "Charlie")]

events_schema = MockStructType([
    MockStructField("event_id", IntegerType(), True),
    MockStructField("user_id", IntegerType(), True),
    MockStructField("action", StringType(), True),
])

profiles_schema = MockStructType([
    MockStructField("user_id", IntegerType(), True),
    MockStructField("name", StringType(), True),
])

events_df = spark.createDataFrame(events_data, events_schema)
profiles_df = spark.createDataFrame(profiles_data, profiles_schema)

print(f"📊 Created events data: {len(events_data)} rows")
print(f"📊 Created profiles data: {len(profiles_data)} rows")


## 2. Build Pipeline with Parallel Execution

The pipeline below defines several steps that do not depend on one another. The execution engine automatically:
- Groups independent steps together
- Executes the steps within each group in parallel
- Runs the groups in dependency order, so each layer waits for the layer it depends on
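The grouping idea can be sketched with the standard library alone. This is an illustration of the general technique (level-by-level topological grouping plus a thread pool), not SparkForge's execution engine; `group_steps`, `run_groups`, and the `dependencies` dictionary are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def group_steps(dependencies):
    """Split steps into groups; each group depends only on earlier groups."""
    remaining = {step: set(deps) for step, deps in dependencies.items()}
    done = set()
    groups = []
    while remaining:
        # A step is ready once all of its dependencies have completed.
        ready = sorted(s for s, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError("circular or unknown dependency between steps")
        groups.append(ready)
        done.update(ready)
        for step in ready:
            del remaining[step]
    return groups


def run_groups(groups, run_step, max_workers=4):
    """Run groups one after another; steps inside a group run concurrently."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for group in groups:
            # Consuming map() waits for every step in the group to finish.
            for step, outcome in zip(group, pool.map(run_step, group)):
                results[step] = outcome
    return results


# Same step names as the pipeline in this notebook.
dependencies = {
    "bronze_events": [],
    "bronze_profiles": [],
    "silver_purchases": ["bronze_events"],
    "silver_customers": ["bronze_profiles"],
}
groups = group_steps(dependencies)
results = run_groups(groups, lambda step: f"{step}: ok")
print(groups)
```

With these dependencies the two bronze steps land in the first group and the two silver steps in the second, which matches the Group 1 / Group 2 split in the cell below.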


In [None]:
builder = PipelineBuilder(spark=spark, schema="analytics")

# Bronze layer - These will run in PARALLEL (Group 1)
builder.with_bronze_rules(
    name="bronze_events",
    rules={"event_id": ["not_null"], "user_id": ["not_null"]}
)

builder.with_bronze_rules(
    name="bronze_profiles",
    rules={"user_id": ["not_null"]}
)

# Silver layer - These will run in PARALLEL (Group 2)
builder.add_silver_transform(
    name="silver_purchases",
    source_bronze="bronze_events",
    transform=lambda spark, df, silvers: df.filter(df["action"] == "purchase"),
    rules={"user_id": ["not_null"]},
    table_name="silver_purchases"
)

builder.add_silver_transform(
    name="silver_customers",
    source_bronze="bronze_profiles",
    transform=lambda spark, df, silvers: df,
    rules={"user_id": ["not_null"]},
    table_name="silver_customers"
)

print("✅ Pipeline steps defined with parallel execution enabled")
print("   📦 Group 1: 2 bronze steps (parallel)")
print("   📦 Group 2: 2 silver steps (parallel)")


## 3. Execute & Watch the Logs!

### ✨ Watch for These New Features:
- 🚀 **Step start indicators** with emojis
- ✅ **Completion messages** with duration, rows processed, and validation rates
- 📦 **Execution group** logging showing parallel execution
- ⚡ **Interleaved logs** from concurrent steps (when running in parallel)
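Interleaved output is easier to read when each line carries the name of the thread that produced it. The sketch below uses only Python's standard `logging` module and is independent of SparkForge; the logger name `parallel_demo`, the `ListHandler` class, and the `run_step` function are hypothetical and exist only for this illustration.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor


class ListHandler(logging.Handler):
    """Collects formatted records in memory so they can be inspected later."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


handler = ListHandler()
# %(threadName)s tags every line with the worker thread that emitted it.
handler.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] %(message)s"))

logger = logging.getLogger("parallel_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False


def run_step(name):
    logger.info("🚀 Starting %s", name)
    time.sleep(0.05)  # stand-in for real work
    logger.info("✅ Completed %s", name)
    return name


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="step") as pool:
    list(pool.map(run_step, ["bronze_events", "bronze_profiles"]))

for line in handler.lines:
    print(line)
```

The order of the four lines can differ from run to run, which is exactly the interleaving to expect from concurrent steps.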


In [None]:
# Execute the pipeline - watch the enhanced logging!
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(
    bronze_sources={
        "bronze_events": events_df,
        "bronze_profiles": profiles_df
    }
)

print("\n" + "="*60)
print("📊 PIPELINE EXECUTION COMPLETE")
print("="*60)


## 4. Analyze Enhanced Results

### NEW in v1.2.0: Run-Level and Parallel Execution Metrics
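This notebook does not show how `parallel_efficiency` is calculated, so the sketch below uses one plausible definition purely as an assumption: total time spent inside steps, divided by wall-clock time multiplied by the number of workers. The function name and its inputs are hypothetical; treat the result as a way to reason about the metric, not as SparkForge's formula.

```python
def parallel_efficiency(group_durations, max_workers):
    """Percentage of available worker time that was spent running steps.

    group_durations: one list of step durations (seconds) per execution group.
    Assumes groups run one after another and each group takes as long as
    its slowest step.
    """
    busy = sum(sum(group) for group in group_durations)
    wall = sum(max(group) for group in group_durations if group)
    if wall == 0 or max_workers <= 0:
        return 0.0
    return 100.0 * busy / (wall * max_workers)


# Made-up durations: two bronze steps, then two silver steps.
efficiency = parallel_efficiency([[0.4, 0.2], [0.3, 0.3]], max_workers=2)
print(f"Parallel efficiency (assumed definition): {efficiency:.1f}%")
```

Under this definition, a group whose steps finish at very different times lowers the figure, because the faster worker sits idle until the group completes.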


In [None]:
print(f"Status: {result.status}")
print(f"Duration: {result.duration_seconds:.2f}s")

print("\n📦 Parallel Execution Metrics:")
print(f"   Execution Groups: {result.execution_groups_count}")
print(f"   Max Parallelism: {result.max_group_size} concurrent steps")
print(f"   Parallel Efficiency: {result.metrics.parallel_efficiency:.1f}%")

print("\n📊 Overall Metrics:")
print(f"   Total Steps: {result.metrics.total_steps}")
print(f"   Successful: {result.metrics.successful_steps}")
print(f"   Rows Processed: {result.metrics.total_rows_processed:,}")
print(f"   Rows Written: {result.metrics.total_rows_written:,}")

print("\n⏱️ Duration by Layer:")
print(f"   Bronze: {result.metrics.bronze_duration:.2f}s")
print(f"   Silver: {result.metrics.silver_duration:.2f}s")


## 5. Explore Step-Level Results

### NEW: bronze_results, silver_results, gold_results dictionaries
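Because each layer's results are read below as a dictionary keyed by step name, they are easy to aggregate. The helper here is a hypothetical sketch that works on plain dictionaries with the same keys the next cell reads (`status`, `duration`, `rows_processed`); the sample values are made up, and the real dictionaries may carry additional keys.

```python
from collections import Counter


def summarize_layer(step_results):
    """Aggregate one layer's per-step results into a small summary dict."""
    statuses = Counter(info["status"] for info in step_results.values())
    return {
        "steps": len(step_results),
        "statuses": dict(statuses),
        "total_duration": sum(info["duration"] for info in step_results.values()),
        "total_rows": sum(info["rows_processed"] for info in step_results.values()),
        # Name of the step with the longest duration, or None for an empty layer.
        "slowest_step": max(
            step_results,
            key=lambda name: step_results[name]["duration"],
            default=None,
        ),
    }


# Made-up sample shaped like the entries printed in the next cell.
sample = {
    "bronze_events": {"status": "completed", "duration": 0.42, "rows_processed": 3},
    "bronze_profiles": {"status": "completed", "duration": 0.17, "rows_processed": 3},
}
summary = summarize_layer(sample)
print(summary)
```

After running the pipeline, the same helper could be tried on `result.bronze_results` or `result.silver_results`, assuming they behave like the dictionaries used here.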


In [None]:
# Bronze results
print("🥉 Bronze Layer Results:")
for step_name, step_info in result.bronze_results.items():
    print(f"\n   {step_name}:")
    print(f"      Status: {step_info['status']}")
    print(f"      Duration: {step_info['duration']:.2f}s")
    print(f"      Rows Processed: {step_info['rows_processed']:,}")

# Silver results
print("\n🥈 Silver Layer Results:")
for step_name, step_info in result.silver_results.items():
    print(f"\n   {step_name}:")
    print(f"      Status: {step_info['status']}")
    print(f"      Duration: {step_info['duration']:.2f}s")
    print(f"      Rows Processed: {step_info['rows_processed']:,}")
    print(f"      Output Table: {step_info['output_table']}")


## Summary: What's New in v1.2.0

### 📊 **Enhanced Logging**
- ✅ Unified format with timestamps
- ✅ Rich metrics (rows processed/written, validation rates)
- ✅ Visual indicators (🚀 ✅ ❌)
- ✅ Smart formatting per layer

### ⚡ **Parallel Execution**
- ✅ Automatic dependency analysis
- ✅ Concurrent step execution (3-5x faster than sequential runs, depending on how many steps are independent)
- ✅ Real-time visibility in logs
- ✅ Performance metrics

### 📈 **Enhanced Reporting**
- ✅ Detailed step results by layer (bronze_results, silver_results, gold_results)
- ✅ Parallel efficiency metrics
- ✅ Duration breakdowns
- ✅ Comprehensive statistics

### 🎯 **Quality**
- ✅ 1,441 tests passing (100% pass rate)
- ✅ 100% type safety (mypy compliant)
- ✅ Zero security vulnerabilities
- ✅ Fully backward compatible

## Next Steps

### Try These:
- **Demo Scripts**: Run `python scripts/demo_logging.py` to see logging examples
- **Parallel Demo**: Run `python scripts/demo_parallel_timing.py` for timing comparison
- **Other Notebooks**: Check out `01_hello_world.ipynb` for basics

### Learn More:
- [Changelog](../docs/markdown/CHANGELOG.md) - Full v1.2.0 release notes
- [User Guide](../docs/user_guide.rst) - Complete feature walkthrough
- [API Reference](../docs/api_reference.rst) - Detailed API documentation
