# Fabric Spark Job & Notebook with LakeTrace Logging

This notebook demonstrates how to use **LakeTrace** logging in Microsoft Fabric:
- Structured JSON logging with automatic rotation
- Safe for both Notebooks and Spark Job Definitions
- Automatic runtime detection (Fabric vs local)
- End-of-run lakehouse upload capability
- Thread-safe, Spark-driver-safe logging
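LakeTrace's own detection logic is not shown in this notebook. As a rough illustration of how runtime detection can work, the sketch below checks for environment markers. It assumes that Databricks sets the `DATABRICKS_RUNTIME_VERSION` environment variable and that Fabric exposes an importable `notebookutils` module. The function name `detect_runtime_sketch` is hypothetical and is not LakeTrace's `detect_runtime`.

```python
import importlib.util
import os
from typing import Mapping, Optional


def detect_runtime_sketch(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return "databricks", "fabric", or "local" using a hypothetical heuristic.

    Assumptions (not taken from LakeTrace): Databricks sets
    DATABRICKS_RUNTIME_VERSION, and Fabric ships an importable `notebookutils`.
    """
    env = os.environ if environ is None else environ
    if "DATABRICKS_RUNTIME_VERSION" in env:
        return "databricks"
    # find_spec reports whether the module is importable without importing it
    if importlib.util.find_spec("notebookutils") is not None:
        return "fabric"
    return "local"


print(detect_runtime_sketch())
```

Databricks is checked first because its marker is the more specific one. Other Spark platforms that also ship `notebookutils`, such as Synapse, would be misreported as Fabric by this simple check.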

## Section 1: Import Required Libraries

In [None]:
import uuid

# Import PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

# Import LakeTrace (required: fail fast with an install hint if it is missing)
try:
    from laketrace import get_logger
    from laketrace.runtime import detect_runtime
except ImportError as exc:
    # Raising (instead of sys.exit) stops both notebooks and Spark Job Definitions cleanly
    raise ImportError(
        "laketrace not installed. Install with: pip install laketrace-logger"
    ) from exc

print("✓ All libraries imported successfully")

## Section 2: Initialize LakeTrace Logger

In [None]:
# Short run ID for this execution (first 8 hex characters of a random UUID)
run_id = str(uuid.uuid4())[:8]

# Detect runtime environment (Fabric, Databricks, or local)
runtime_info = detect_runtime()

# Configure logger for Fabric/Spark environment
logger_config = {
    "log_dir": "/tmp/laketrace_logs",  # Local temp dir (safe for Spark)
    "rotation_mb": 10,                  # Rotate every 10 MB
    "retention_files": 5,               # Keep 5 rotated files
    "level": "INFO",
    "json": True,                       # Structured JSON output
    "stdout": True,                     # Also emit to stdout
    "compression": "gz",                # Compress rotated logs
    "add_runtime_context": True,        # Include runtime metadata
}

# Initialize logger with config
logger = get_logger(
    name="fabric_pipeline",
    config=logger_config
)

# Bind context fields that will be included in all logs
logger = logger.bind(
    run_id=run_id,
    stage="initialization",
    notebook="fabric_spark_example",
    runtime=runtime_info.get("type", "unknown")
)

logger.info("Logger initialized", 
    runtime_type=runtime_info.get("type"),
    platform=runtime_info.get("platform")
)
print(f"✓ LakeTrace logger ready (run_id={run_id})")
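To make keys like `rotation_mb`, `retention_files`, `json` and `compression` concrete, here is a rough stdlib-only analogue built on `logging.handlers.RotatingFileHandler`. This is not LakeTrace's implementation. The mapping from config keys to handler arguments, the `JsonFormatter` class and the `extra={"context": ...}` convention are assumptions for illustration. The gzip rotator follows the `namer`/`rotator` pattern from the Python Logging Cookbook.

```python
import gzip
import json
import logging
import logging.handlers
import os
import shutil
import tempfile


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON line, merging in bound context fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Context fields arrive via extra={"context": {...}} (a convention of this sketch)
        payload.update(getattr(record, "context", {}))
        return json.dumps(payload, default=str)


def gzip_rotator(source: str, dest: str) -> None:
    """Compress the file being rotated out, then delete the uncompressed copy."""
    with open(source, "rb") as src, gzip.open(dest, "wb") as dst:
        shutil.copyfileobj(src, dst)
    os.remove(source)


def build_rotating_json_logger(config: dict) -> logging.Logger:
    """Map LakeTrace-style config keys onto stdlib logging (assumed mapping)."""
    os.makedirs(config["log_dir"], exist_ok=True)
    handler = logging.handlers.RotatingFileHandler(
        os.path.join(config["log_dir"], "pipeline.log"),
        maxBytes=int(config["rotation_mb"] * 1024 * 1024),  # rotation_mb -> maxBytes
        backupCount=config["retention_files"],               # retention_files -> backupCount
    )
    if config.get("compression") == "gz":
        handler.namer = lambda name: name + ".gz"  # pipeline.log.1 -> pipeline.log.1.gz
        handler.rotator = gzip_rotator
    handler.setFormatter(JsonFormatter())

    log = logging.getLogger("rotating_json_sketch")
    for old in list(log.handlers):  # avoid duplicate handlers when the cell is re-run
        log.removeHandler(old)
        old.close()
    log.setLevel(config["level"])
    log.propagate = False  # keep records out of the root logger
    log.addHandler(handler)
    return log


demo_dir = tempfile.mkdtemp()
demo_logger = build_rotating_json_logger(
    {"log_dir": demo_dir, "rotation_mb": 10, "retention_files": 5,
     "level": "INFO", "compression": "gz"}
)
demo_logger.info("Logger initialized", extra={"context": {"run_id": "demo1234"}})
```

With `backupCount` set, the handler keeps at most that many rotated files and deletes the oldest on each rollover. That is the behaviour `retention_files` appears to describe.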

## Section 3: Create Spark Session for Fabric

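In Fabric notebooks, a SparkSession is already available as `spark`. A Spark Job Definition runs a plain Python script, so the cell below reuses the active session if one exists and otherwise builds a new one.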

In [None]:
# Get or create Spark session
try:
    spark = SparkSession.getActiveSession()
    if spark is None:
        # Both settings are already Spark 3.x defaults; they are set here only for explicitness
        spark = (
            SparkSession.builder
            .appName("fabric-pipeline")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.parquet.enableVectorizedReader", "true")
            .getOrCreate()
        )
        logger.info("Created new Spark session")
    else:
        logger.info("Using active Spark session")
except Exception as e:
    logger.error(f"Failed to initialize Spark: {str(e)}", exc_info=True)
    raise

# Log Spark configuration
logger.info("Spark session ready",
    app_name=spark.sparkContext.appName,
    master=spark.sparkContext.master,
    version=spark.__version__
)
print("✓ Spark session initialized")

## Section 4: Load Sample Data into Spark DataFrame

Create sample data to simulate a typical data pipeline scenario.

In [None]:
# Update logger stage
logger = logger.bind(stage="data_loading")

try:
    # Create sample data
    data = [
        {"id": 1, "name": "Alice", "department": "Sales", "salary": 50000},
        {"id": 2, "name": "Bob", "department": "Engineering", "salary": 75000},
        {"id": 3, "name": "Charlie", "department": "Sales", "salary": 55000},
        {"id": 4, "name": "Diana", "department": "Engineering", "salary": 80000},
        {"id": 5, "name": "Eve", "department": "HR", "salary": 60000},
    ]
    
    df = spark.createDataFrame(data)
    
    logger.info("Sample data loaded",
        record_count=len(data),
        columns=df.columns,
        schema=str(df.schema)
    )
    
    print(f"✓ Loaded {len(data)} records")
    df.show(5)
    
except Exception as e:
    logger.error(f"Failed to load data: {str(e)}", exc_info=True)
    raise
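Each `bind` call in these cells returns a new logger. Its context is the old context plus the new fields, so rebinding `stage` replaces the previous value while `run_id` carries over. This reading assumes LakeTrace follows Loguru's `bind` semantics. The toy `BoundLogger` class below is a hypothetical stand-in that makes the merge rule concrete; it is not LakeTrace's class.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass(frozen=True)
class BoundLogger:
    """Toy logger with an immutable context that writes one JSON line per call (hypothetical)."""

    context: Dict[str, Any] = field(default_factory=dict)
    sink: List[str] = field(default_factory=list)

    def bind(self, **fields: Any) -> "BoundLogger":
        # New fields override existing keys; the original logger is left untouched
        return BoundLogger({**self.context, **fields}, self.sink)

    def info(self, message: str, **fields: Any) -> None:
        # Per-call fields apply to this record only and are not remembered
        record = {"level": "INFO", "message": message, **self.context, **fields}
        self.sink.append(json.dumps(record))


base = BoundLogger().bind(run_id="a1b2c3d4", stage="initialization")
loading = base.bind(stage="data_loading")
loading.info("Sample data loaded", record_count=5)
base.info("Still initializing")
for line in loading.sink:
    print(line)
```

Because `bind` never mutates the logger it is called on, reassigning `logger = logger.bind(...)` at the top of each cell is what moves the notebook from one stage to the next.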

## Section 5: Perform Spark Transformations with Logging

Apply transformations and log key metrics at each step.

In [None]:
# Update logger stage
logger = logger.bind(stage="transformation")

# Spark SQL functions used for the salary tier and the aggregation
from pyspark.sql import functions as F

try:
    # Transformation 1: Filter high earners
    logger.debug("Starting transformation: filter salary > 60000")
    high_earners = df.filter(col("salary") > 60000)
    high_earners_count = high_earners.count()
    
    logger.info("Filtered high earners",
        filter_condition="salary > 60000",
        result_count=high_earners_count
    )
    
    # Transformation 2: Add a salary tier label (the 80000 threshold is illustrative)
    logger.debug("Starting transformation: add salary tier")
    df_with_tier = high_earners.withColumn(
        "salary_tier",
        F.when(col("salary") >= 80000, "senior").otherwise("standard")
    ).withColumn(
        "processed_at",
        current_timestamp()
    )
    
    logger.info("Added computed columns",
        columns_added=["salary_tier", "processed_at"]
    )
    
    # Transformation 3: Group and aggregate, naming the result columns explicitly
    logger.debug("Starting transformation: department aggregation")
    dept_stats = df_with_tier.groupBy("department").agg(
        F.avg("salary").alias("avg_salary"),
        F.count("id").alias("emp_count")
    )
    
    dept_count = dept_stats.count()
    logger.info("Department aggregation complete",
        departments=dept_count,
        agg_type="avg_salary,emp_count"
    )
    
    print("✓ Transformations complete")
    dept_stats.show()
    
except Exception as e:
    logger.error(f"Transformation failed: {str(e)}", 
        stage="transformation",
        exc_info=True
    )
    raise
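The sample data is small enough to check the transformations by hand. Only Bob (75,000) and Diana (80,000) pass the strict `salary > 60000` filter; Eve's 60,000 does not. So `dept_stats` should hold a single Engineering row with `avg_salary` 77,500 and `emp_count` 2. The plain-Python sketch below recomputes the same filter and per-department aggregation without Spark. It is a cross-check for these five records, not part of the pipeline.

```python
from collections import defaultdict

# Same five records as the Section 4 sample data
sample_rows = [
    {"id": 1, "name": "Alice", "department": "Sales", "salary": 50000},
    {"id": 2, "name": "Bob", "department": "Engineering", "salary": 75000},
    {"id": 3, "name": "Charlie", "department": "Sales", "salary": 55000},
    {"id": 4, "name": "Diana", "department": "Engineering", "salary": 80000},
    {"id": 5, "name": "Eve", "department": "HR", "salary": 60000},
]

# Transformation 1: strict filter, mirroring col("salary") > 60000
high = [r for r in sample_rows if r["salary"] > 60000]

# Transformation 3: group by department, then average salary and count employees
groups = defaultdict(list)
for r in high:
    groups[r["department"]].append(r["salary"])
expected_stats = {
    dept: {"avg_salary": sum(s) / len(s), "emp_count": len(s)}
    for dept, s in groups.items()
}
print(expected_stats)
```

In the notebook, the same expectation could be compared against `dept_stats.collect()` to catch a regression in the Spark logic.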

## Section 6: Write Results to Fabric Lakehouse

Save transformed data in Delta format (native to Fabric Lakehouse).

In [None]:
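import time

# Update logger stage
logger = logger.bind(stage="data_write")

try:
    # Output path: a local path for demonstration. In Fabric, write to OneLake instead, e.g.
    # "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Tables/...",
    # or the relative path "Tables/dept_stats" when a default lakehouse is attached.
    # Outside Fabric, format("delta") requires the delta-spark package to be configured.
    output_path = "/tmp/laketrace_output/dept_stats"
    
    logger.info("Starting data write",
        output_path=output_path,
        format="delta",
        mode="overwrite"
    )
    
    # Overwrite mode makes reruns idempotent; coalesce(1) writes one data file (fine for small data)
    start = time.perf_counter()
    dept_stats.coalesce(1).write \
        .format("delta") \
        .mode("overwrite") \
        .save(output_path)
    duration_s = round(time.perf_counter() - start, 3)
    
    logger.info("Data write complete",
        path=output_path,
        rows=dept_stats.count(),
        format="delta",
        duration_s=duration_s
    )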
    
    print(f"✓ Results written to {output_path}")
    
except Exception as e:
    logger.error(f"Write failed: {str(e)}",
        stage="data_write",
        output_path=output_path,
        exc_info=True
    )
    raise
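To record how long a step such as the Delta write takes, one option is a small context manager. It measures the block with `time.perf_counter()` and passes the elapsed seconds to a logging callable. The `timed_stage` helper below is hypothetical. It accepts any `log(message, **fields)` callable, which matches how `logger.info` is called in this notebook, but that calling convention is an assumption about LakeTrace's API.

```python
import time
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def timed_stage(log: Callable[..., Any], name: str, **fields: Any) -> Iterator[None]:
    """Log how long the wrapped block took, whether it succeeded or raised."""
    start = time.perf_counter()
    status = "error"
    try:
        yield
        status = "success"
    finally:
        # Runs on normal exit and on exceptions, so failed stages are timed too
        log(
            f"Stage {name} finished",
            stage=name,
            status=status,
            duration_s=round(time.perf_counter() - start, 3),
            **fields,
        )


# Usage with a stand-in logger that just records its calls
calls = []
with timed_stage(lambda msg, **kw: calls.append((msg, kw)), "data_write", rows=1):
    time.sleep(0.01)
print(calls)
```

In the notebook this would look like `with timed_stage(logger.info, "data_write", output_path=output_path):` around the `.write...save(output_path)` call. The exception still propagates after the timing record is emitted.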

## Section 7: Submit as Spark Job (Job Definition Code)

In [None]:
# For a Spark Job Definition in Fabric:
# 1. Move the code above into a .py file (e.g. pipeline.py)
# 2. Set the equivalent options in the job definition (Fabric UI or REST API).
#    This dict is illustrative only; it is not a Fabric API payload.

job_config = {
    "name": "fabric-data-pipeline",
    "description": "Data pipeline with LakeTrace logging",
    "compute": "fabric_spark_compute",  # Placeholder for your Spark pool / environment
    "entry_point": "pipeline.py",
    "parameters": {
        "log_level": "INFO",
        "environment": "production",
        "run_id": run_id
    },
    "timeout_minutes": 60,
    "retries": 1  # Limit retries for stability
}

logger.info("Job configuration for Spark Job Definition",
    job_name=job_config["name"],
    entry_point=job_config["entry_point"],
    parameters=job_config["parameters"]
)

print("✓ Job can be submitted via the Fabric UI or REST API")
print(f"  Reference run_id: {run_id}")
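In a Spark Job Definition, values like `log_level` and `run_id` would typically reach `pipeline.py` as command-line arguments set in the job definition, not as a Python dict. Here is a minimal sketch with `argparse`. The flag names `--log-level`, `--environment` and `--run-id`, and the `parse_job_args` function, are hypothetical; they simply mirror the keys in `job_config["parameters"]`.

```python
import argparse
import uuid
from typing import List, Optional


def parse_job_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the job's command-line arguments (flag names are hypothetical)."""
    parser = argparse.ArgumentParser(description="Data pipeline with LakeTrace logging")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    parser.add_argument("--environment", default="production")
    # Fall back to a fresh short id when the caller does not supply one
    parser.add_argument("--run-id", default=None)
    args = parser.parse_args(argv)
    if args.run_id is None:
        args.run_id = str(uuid.uuid4())[:8]
    return args


args = parse_job_args(["--log-level", "DEBUG", "--run-id", "a1b2c3d4"])
print(args.log_level, args.environment, args.run_id)
```

Inside `pipeline.py`, calling `parse_job_args()` with no argument makes `argparse` read `sys.argv`. The parsed `args.log_level` and `args.run_id` could then feed `logger_config["level"]` and the `bind(run_id=...)` call. In a notebook, pass an explicit list as shown, because the kernel's own `sys.argv` would not match these flags.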

## Section 8: Monitor Job Execution & Upload Logs

Final step: capture logs and optionally upload to Fabric Lakehouse.

In [None]:
# Update logger stage
logger = logger.bind(stage="completion")

try:
    # Display final log summary
    logger.info("Pipeline execution complete",
        run_id=run_id,
        total_stages=5,
        status="success"
    )
    
    # View recent logs
    print("\n=== Recent Log Entries ===")
    logger.tail(n=20)  # Show last 20 lines of log file
    
except Exception as e:
    logger.error(f"Completion step failed: {str(e)}", exc_info=True)

# Optional: Upload logs to Fabric Lakehouse (end-of-run only)
try:
    # Upload works only in Fabric/Databricks with valid credentials.
    # Example Fabric target (the Files area of a lakehouse):
    # "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Files/logs/"
    # For local testing, leave lakehouse_path as None to skip the upload.
    
    lakehouse_path = None  # Set to a real path to enable the upload
    if lakehouse_path:
        logger.info("Uploading logs to Fabric Lakehouse",
            target_path=lakehouse_path
        )
        logger.upload_log_to_lakehouse(lakehouse_path)
        logger.info("Log upload complete")
    else:
        logger.debug("Skipping log upload (no lakehouse path configured)")
        
except Exception as e:
    logger.warning(f"Could not upload logs: {str(e)}")
    print("Note: Log upload requires Fabric/Databricks environment with auth")

print("\n✓ Pipeline execution finished")
print(f"  Run ID: {run_id}")
print("  Check /tmp/laketrace_logs/ for detailed logs")
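The upload cell above runs only if every earlier cell succeeded. In a Spark Job Definition, one way to ship logs even when a stage fails is to wrap the pipeline in `try`/`finally`, so the upload is attempted exactly once at the end of every run. In the sketch below, `run_with_log_upload`, `pipeline` and `upload` are hypothetical names. `upload` stands in for whatever end-of-run upload call is available, for example the `logger.upload_log_to_lakehouse(...)` method named in this notebook, whose exact signature is not documented here.

```python
from typing import Callable, Optional


def run_with_log_upload(
    pipeline: Callable[[], None],
    upload: Callable[[str], None],
    lakehouse_path: Optional[str],
) -> None:
    """Run the pipeline, then attempt one log upload whether it passed or failed."""
    try:
        pipeline()
    finally:
        if lakehouse_path:
            try:
                upload(lakehouse_path)
            except Exception as exc:  # an upload failure must not mask the pipeline's own error
                print(f"Log upload failed (non-fatal): {exc}")


# Usage with stand-ins: the pipeline fails, but the upload still happens
uploaded = []


def failing_pipeline() -> None:
    raise RuntimeError("transformation failed")


try:
    run_with_log_upload(failing_pipeline, uploaded.append, "<lakehouse-logs-path>")
except RuntimeError as err:
    print(f"Pipeline error re-raised: {err}")
print(uploaded)
```

The inner `try`/`except` matters. Without it, an exception raised inside `finally` would replace the pipeline's original exception, and the real cause of the failure would be lost.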

## Key Features Demonstrated

### 1. **Fabric-Safe Logging**
- No per-line `notebookutils.fs.append` calls
- No `dbutils.fs.put` in the log write path
- Logs are written and rotated on the driver's local disk; remote storage is touched only by the optional end-of-run upload

### 2. **Structured Output**
- JSON format with timestamp, level, logger name, hostname, PID
- Runtime context (Fabric vs Databricks detection)
- Bound fields (run_id, stage, notebook name)

### 3. **Multiple Sinks**
- Local rotating file (auto-compresses on rotation)
- Stdout (visible in Fabric job output)
- Optional lakehouse upload at end of run

### 4. **Spark-Safe Design**
- Driver-only logging: code that runs on executors (e.g. inside UDFs) uses `print`, which lands in the executor's stdout log
- No distributed I/O
- Thread-safe with Loguru engine
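Thread safety matters when driver-side code logs from several threads, for example a thread pool that submits Spark jobs in parallel. Python's standard `logging` handlers serialize writes with a per-handler lock, so lines from different threads do not interleave. The stdlib-only sketch below checks this by logging from four threads into one file and confirming that every line is intact JSON. It illustrates the property with the standard library; it does not test LakeTrace or Loguru themselves.

```python
import json
import logging
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

log_path = os.path.join(tempfile.mkdtemp(), "threads.log")
handler = logging.FileHandler(log_path)
# Quick JSON-shaped format; valid only because these messages contain no quotes
handler.setFormatter(
    logging.Formatter('{"thread": "%(threadName)s", "message": "%(message)s"}')
)
thread_logger = logging.getLogger("thread_safety_sketch")
thread_logger.setLevel(logging.INFO)
thread_logger.propagate = False
thread_logger.addHandler(handler)


def worker(worker_id: int) -> None:
    for i in range(250):
        thread_logger.info("worker %d line %d", worker_id, i)


# Four threads write concurrently; each emit holds the handler's lock
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(worker, range(4)))
handler.close()

with open(log_path) as fh:
    records = [json.loads(line) for line in fh.read().splitlines()]  # fails on a torn line
print(len(records), "intact JSON lines")
```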

### 5. **Production Ready**
- Configurable rotation size & retention
- Bounded retries (max 2 for upload)
- Graceful error handling (non-fatal failures)
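One reading of "Bounded retries (max 2 for upload)" is: try the upload, and on failure retry at most twice with a growing delay before giving up. The list above does not say whether the first attempt counts toward that limit, so treat the numbers here as an assumption. Here is a minimal sketch with a hypothetical `call_with_bounded_retries` helper.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_bounded_retries(
    fn: Callable[[], T],
    max_retries: int = 2,
    base_delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on failure, retry up to max_retries more times with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error to the caller
            sleep(base_delay_s * 2 ** attempt)  # 1s, then 2s, ... between attempts
    raise AssertionError("unreachable")


# Usage: an upload stand-in that fails twice, then succeeds
attempts = []
delays = []


def flaky_upload() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise OSError("transient network error")
    return "uploaded"


print(call_with_bounded_retries(flaky_upload, sleep=delays.append))
print(delays)
```

The `sleep` parameter is injectable so the backoff can be tested without real waiting. Capping the retry count keeps a persistently failing upload from delaying the end of the job indefinitely.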

## Next Steps

### For Fabric Notebooks:
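1. Save this notebook to your Fabric workspace
2. Attach a lakehouse if you switch the output paths to OneLake; the notebook runs on the workspace's Spark pool
3. Run the cells in order
4. Follow the logs in the cell output (stdout) and in the local log file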

### For Fabric Spark Job Definitions:
1. Extract the code logic into `pipeline.py`
2. Create a Spark Job Definition in Fabric UI
3. Configure job parameters (compute, timeout)
4. Schedule or submit manually
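5. Logs appear in the job's execution logs (stdout) and in the driver's local log directory. That directory is temporary, so use the end-of-run upload to keep the files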

### For Databricks:
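The same code runs on Databricks. Runtime detection records the platform in the log context, and logs go to `/tmp/laketrace_logs/` on the driver node. The Fabric-specific output and upload paths must be changed to Databricks storage locations.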

### Configuration Options:
- Adjust `rotation_mb`, `retention_files`, `level` in `logger_config`
- Set `compression` to "gz", "zip", or "none"
- Bind additional context fields with `logger.bind(**fields)`
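When `compression` is `"gz"`, rotated files are gzip archives while the active file stays plain text. The hypothetical `read_json_logs` helper below reads them all back oldest first for inspection. It assumes rotated files follow a `<name>.log.N.gz` pattern and hold one JSON object per line; both are assumptions, since LakeTrace's file naming isn't documented here. The demo fabricates its own files in a temporary directory.

```python
import glob
import gzip
import json
import os
import tempfile
from typing import Any, Dict, List


def read_json_logs(log_dir: str, base_name: str = "pipeline.log") -> List[Dict[str, Any]]:
    """Read rotated (.N.gz) and active JSON-lines logs, oldest first (hypothetical naming)."""
    base = os.path.join(log_dir, base_name)
    rotated = glob.glob(base + ".*.gz")
    # Under RotatingFileHandler-style numbering, a higher N means an older file
    rotated.sort(key=lambda p: int(p[len(base) + 1:-3]), reverse=True)
    records: List[Dict[str, Any]] = []
    for path in rotated:
        with gzip.open(path, "rt") as fh:
            records.extend(json.loads(line) for line in fh if line.strip())
    if os.path.exists(base):
        with open(base) as fh:
            records.extend(json.loads(line) for line in fh if line.strip())
    return records


# Demo: fabricate three tiny log files in a temporary directory, then read them back
demo_dir = tempfile.mkdtemp()
demo_base = os.path.join(demo_dir, "pipeline.log")
for n, step in ((2, "oldest"), (1, "older")):
    with gzip.open(f"{demo_base}.{n}.gz", "wt") as fh:
        fh.write(json.dumps({"step": step}) + "\n")
with open(demo_base, "w") as fh:
    fh.write(json.dumps({"step": "current"}) + "\n")
print([r["step"] for r in read_json_logs(demo_dir)])
```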