# Bronze Layer: Slot Machine Telemetry Ingestion

**Notebook:** `01_bronze_slot_telemetry`  
**Layer:** Bronze (Raw)  
**Purpose:** Ingest raw slot machine telemetry data into the Bronze Lakehouse

---

## Overview

This notebook demonstrates the Bronze layer ingestion pattern for slot machine telemetry data. The Bronze layer stores raw data with minimal transformation, preserving the original format for auditability.

### Key Principles
- **Append-only**: Never update or delete records
- **Schema-on-read**: Accept data as-is
- **Full history**: Maintain complete audit trail
- **Metadata enrichment**: Add ingestion timestamp and source tracking

## Configuration

In [None]:
# Notebook configuration.
# Pipeline parameters may override any of the values below.

# --- Target: Bronze lakehouse and table ---
BRONZE_LAKEHOUSE: str = "lh_bronze"
TABLE_NAME: str = "bronze_slot_telemetry"

# --- Source files ---
SOURCE_PATH: str = "Files/raw/slot_telemetry/"
SOURCE_FORMAT: str = "json"  # one of: "json", "csv", "parquet"

# --- Processing ---
# NOTE(review): BATCH_SIZE is not referenced by the cells shown here; confirm it is used or drop it.
BATCH_SIZE: int = 100_000
ENABLE_SCHEMA_EVOLUTION: bool = True  # forwarded to Delta as the mergeSchema write option

## Import Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, current_timestamp, input_file_name,
    year, month, dayofmonth, hour,
    sha2, concat_ws, to_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DoubleType, TimestampType, BooleanType
)
from delta.tables import DeltaTable
from datetime import datetime
import json

# getOrCreate() returns the active session if one exists (as in a Fabric
# notebook) and only builds a new one otherwise.
spark = SparkSession.builder.getOrCreate()

print("Spark version:", spark.version)
print("Processing started:", datetime.now())

## Define Schema

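While Bronze typically uses schema-on-read, defining an expected schema helps with validation and documentation. Note that this schema is also passed to the reader below, so columns not listed here are not loaded and values that cannot be parsed as the declared types may come back as null.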

In [None]:
# Expected layout of a slot telemetry event, written as (column, Spark type, nullable).
# Only event_id, machine_id, event_timestamp and event_type are declared non-nullable.
SLOT_TELEMETRY_SCHEMA = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        # identity and location
        ("event_id", StringType(), False),
        ("machine_id", StringType(), False),
        ("casino_id", StringType(), True),
        ("floor_location", StringType(), True),
        # event header (timestamp kept as a raw string at Bronze)
        ("event_timestamp", StringType(), False),
        ("event_type", StringType(), False),
        # monetary amounts
        ("denomination", DoubleType(), True),
        ("bet_amount", DoubleType(), True),
        ("win_amount", DoubleType(), True),
        ("jackpot_contribution", DoubleType(), True),
        # credit counters
        ("credits_in", IntegerType(), True),
        ("credits_out", IntegerType(), True),
        ("credits_wagered", IntegerType(), True),
        ("credits_won", IntegerType(), True),
        ("games_played", IntegerType(), True),
        # player / session context
        ("player_id", StringType(), True),
        ("session_id", StringType(), True),
        ("is_bonus_round", BooleanType(), True),
        # game and machine details
        ("rng_seed", StringType(), True),
        ("game_outcome", StringType(), True),
        ("paytable_version", StringType(), True),
        ("firmware_version", StringType(), True),
        ("error_code", StringType(), True),
        ("door_status", StringType(), True),
        ("meter_readings", StringType(), True),  # nested meter values carried as a JSON string
    )
])

print(f"Schema defined with {len(SLOT_TELEMETRY_SCHEMA.fields)} fields")

## Read Source Data

In [None]:
def read_source_data(source_path: str, source_format: str, schema=None, multiline: bool = True):
    """
    Read raw files from ``source_path`` into a (lazily evaluated) DataFrame.

    Args:
        source_path: File, directory or glob path accepted by ``spark.read``.
        source_format: File format: "json", "csv" or "parquet". Matching is
            case-insensitive and ignores surrounding whitespace.
        schema: Optional StructType applied at read time. When supplied, CSV
            schema inference is skipped (Spark ignores it anyway).
        multiline: JSON only. True (the default) parses each file as a single
            JSON document or array. Pass False for newline-delimited JSON with
            one object per line.

    Returns:
        DataFrame with the source data.

    Raises:
        ValueError: If ``source_format`` is not supported. This is raised
            before any Spark call, so a bad configuration fails fast.
    """
    fmt = str(source_format).strip().lower()
    if fmt not in ("json", "csv", "parquet"):
        raise ValueError(f"Unsupported format: {source_format}")

    reader = spark.read
    if schema:
        reader = reader.schema(schema)

    if fmt == "json":
        return reader.option("multiLine", multiline).json(source_path)

    if fmt == "csv":
        reader = reader.option("header", True)
        if not schema:
            # Inference costs an extra pass over the data and only matters
            # when no explicit schema was given.
            reader = reader.option("inferSchema", True)
        return reader.csv(source_path)

    return reader.parquet(source_path)

# Demo fallback switch. When True, a failed read continues with synthetic sample
# data (generated in the next cell) instead of failing the run. Set it to False
# for production. A value injected earlier (e.g. as a pipeline parameter) is
# kept rather than overwritten here.
ALLOW_SAMPLE_DATA_FALLBACK = globals().get("ALLOW_SAMPLE_DATA_FALLBACK", True)

# OneLake ABFS URIs have the form
#   abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<item>.Lakehouse/<path>
# i.e. the lakehouse belongs in the item segment, not before the "@". No workspace
# is configured in this notebook, so SOURCE_PATH is read relative to the notebook's
# default lakehouse. In Fabric, that is presumably also where the unqualified
# saveAsTable(TABLE_NAME) below resolves; confirm if a different lakehouse is attached.
try:
    df_raw = read_source_data(
        source_path=SOURCE_PATH,
        source_format=SOURCE_FORMAT,
        schema=SLOT_TELEMETRY_SCHEMA,
    )
    print(f"Read {df_raw.count()} records from source")
    df_raw.printSchema()
except Exception as e:
    if not ALLOW_SAMPLE_DATA_FALLBACK:
        # Surface the real failure instead of writing synthetic rows to Bronze.
        raise
    print(f"Error reading source data: {e}")
    # For demo purposes, generate sample data
    print("Generating sample data for demonstration...")
    df_raw = None

## Generate Sample Data (Demo Only)

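If the source read above failed for any reason (for example a missing path, a wrong path, or missing permissions), sample data is generated instead for demonstration purposes. This fallback is intended for demos only.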

In [None]:
import random
from datetime import datetime, timedelta
import uuid

# Value pools for the synthetic generator, built once rather than on every call.
_SAMPLE_EVENT_TYPES = ("SPIN", "WIN", "JACKPOT", "BONUS_START", "BONUS_END",
                       "CASH_IN", "CASH_OUT", "CARD_IN", "CARD_OUT", "DOOR_OPEN", "DOOR_CLOSE")
_SAMPLE_DENOMINATIONS = (0.01, 0.05, 0.25, 1.00, 5.00)
_SAMPLE_CASINO_IDS = ("CAS001", "CAS002", "CAS003")
_SAMPLE_FLOOR_LOCATIONS = ("A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "VIP1", "VIP2")
# Multipliers applied to the bet of a paying SPIN; the zeros yield no win.
_SAMPLE_SPIN_MULTIPLIERS = (0, 0, 0, 0, 0.5, 1, 2, 5, 10, 50)
_SAMPLE_WINDOW_SECONDS = 7 * 24 * 60 * 60  # events are spread over the 7 days before generation


def _sample_uuid(rng):
    """Return a version-4 UUID string drawn from ``rng`` (reproducible when ``rng`` is seeded)."""
    return str(uuid.UUID(int=rng.getrandbits(128), version=4))


def _sample_slot_record(rng, base_time):
    """Build one synthetic telemetry event as a dict keyed by SLOT_TELEMETRY_SCHEMA field names."""
    event_type = rng.choice(_SAMPLE_EVENT_TYPES)
    denomination = rng.choice(_SAMPLE_DENOMINATIONS)

    # Amount fields stay floats on every path. The schema declares them DoubleType,
    # and createDataFrame's schema verification rejects Python ints (e.g. a bare 0).
    credits_wagered = 0
    bet_amount = 0.0
    win_amount = 0.0
    if event_type in ("SPIN", "JACKPOT"):
        # Keep the wagered credit count as an exact integer. Recovering it as
        # bet_amount / denomination can land on 20.999... and truncate to 20.
        credits_wagered = rng.randint(1, 5) * rng.randint(1, 20)
        bet_amount = denomination * credits_wagered
        if event_type == "JACKPOT":
            win_amount = rng.uniform(1000, 50000)
        elif rng.random() > 0.6:
            win_amount = bet_amount * rng.choice(_SAMPLE_SPIN_MULTIPLIERS)

    # The epsilon stops float error from dropping a whole credit (19.9999999 -> 19).
    credits_won = int(win_amount / denomination + 1e-9)
    event_time = base_time + timedelta(seconds=rng.randint(0, _SAMPLE_WINDOW_SECONDS))

    return {
        "event_id": _sample_uuid(rng),
        "machine_id": f"SLT{rng.randint(1000, 9999)}",
        "casino_id": rng.choice(_SAMPLE_CASINO_IDS),
        "floor_location": rng.choice(_SAMPLE_FLOOR_LOCATIONS),
        "event_timestamp": event_time.isoformat(),
        "event_type": event_type,
        "denomination": denomination,
        "bet_amount": round(bet_amount, 2),
        "win_amount": round(win_amount, 2),
        "jackpot_contribution": round(bet_amount * 0.01, 2) if event_type == "SPIN" else 0.0,
        "credits_in": rng.randint(0, 10000) if event_type == "CASH_IN" else 0,
        "credits_out": rng.randint(0, 10000) if event_type == "CASH_OUT" else 0,
        "credits_wagered": credits_wagered,
        "credits_won": credits_won,
        "games_played": rng.randint(1, 500),
        "player_id": f"PLY{rng.randint(10000000, 99999999)}" if rng.random() > 0.3 else None,
        "session_id": _sample_uuid(rng),
        "is_bonus_round": event_type in ("BONUS_START", "BONUS_END"),
        "rng_seed": _sample_uuid(rng)[:8],
        "game_outcome": "WIN" if win_amount > 0 else ("LOSS" if event_type == "SPIN" else None),
        "paytable_version": f"PT{rng.randint(1, 5)}.{rng.randint(0, 9)}",
        "firmware_version": f"FW{rng.randint(1, 3)}.{rng.randint(0, 9)}.{rng.randint(0, 99)}",
        "error_code": f"E{rng.randint(100, 999)}" if rng.random() < 0.01 else None,
        "door_status": "OPEN" if event_type == "DOOR_OPEN" else "CLOSED",
        "meter_readings": json.dumps({
            "coin_in": rng.randint(100000, 9999999),
            "coin_out": rng.randint(80000, 8999999),
            "jackpot": rng.randint(0, 100000),
            "games": rng.randint(10000, 999999),
        }),
    }


def generate_sample_slot_data(num_records: int = 10000, seed=None):
    """
    Generate synthetic slot telemetry for demonstration.

    Args:
        num_records: Number of events to generate.
        seed: Optional seed for reproducible output. ``None`` (the default)
            produces different data on every call.

    Returns:
        Spark DataFrame created with ``SLOT_TELEMETRY_SCHEMA``.
    """
    rng = random.Random(seed)
    base_time = datetime.now() - timedelta(days=7)
    records = [_sample_slot_record(rng, base_time) for _ in range(num_records)]
    return spark.createDataFrame(records, schema=SLOT_TELEMETRY_SCHEMA)

# Only synthesize data when the source-read cell left df_raw unset (None).
if df_raw is None:
    df_raw = generate_sample_slot_data(num_records=50000)
    print("Generated", df_raw.count(), "sample records")

## Add Bronze Metadata

Enrich raw data with ingestion metadata for lineage and auditing.

In [None]:
def add_bronze_metadata(df, source_path=None):
    """
    Append the standard Bronze-layer metadata columns to ``df``.

    Columns added:
    - _ingestion_timestamp: ``current_timestamp()`` at query execution.
    - _source_file: Path of the file each row was read from
      (``input_file_name()``). Falls back to ``source_path`` / SOURCE_PATH when
      Spark reports no file, e.g. for DataFrames built with createDataFrame.
    - _batch_id: Local wall-clock time of this call, formatted YYYYMMDDHHMMSS
      (second precision, so two calls in the same second share an id).
    - _record_hash: SHA-256 of event_id | machine_id | event_timestamp.
    - _event_ts: event_timestamp parsed with ``to_timestamp``.
    - _year, _month, _day: Partition columns derived from ``_event_ts``.

    Args:
        df: DataFrame containing event_id, machine_id and event_timestamp.
        source_path: Fallback value for _source_file. Defaults to SOURCE_PATH.

    Returns:
        New DataFrame with the metadata columns appended.
    """
    from pyspark.sql.functions import coalesce, when

    fallback_source = SOURCE_PATH if source_path is None else source_path
    batch_id = datetime.now().strftime("%Y%m%d%H%M%S")

    # input_file_name() gives an empty string for rows that did not come from a file.
    file_name = input_file_name()
    source_file = (
        when(file_name.isNull() | (file_name == ""), lit(fallback_source))
        .otherwise(file_name)
    )

    # concat_ws drops null arguments entirely, so ("A", null, t) and (null, "A", t)
    # would both hash "A|t". Mapping nulls to "" keeps every field in its slot.
    # Rows without nulls hash exactly as before.
    hash_input = concat_ws(
        "|",
        *(coalesce(col(name), lit("")) for name in ("event_id", "machine_id", "event_timestamp")),
    )

    return (
        df.withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_source_file", source_file)
        .withColumn("_batch_id", lit(batch_id))
        .withColumn("_record_hash", sha2(hash_input, 256))
        .withColumn("_event_ts", to_timestamp(col("event_timestamp")))
        .withColumn("_year", year(col("_event_ts")))
        .withColumn("_month", month(col("_event_ts")))
        .withColumn("_day", dayofmonth(col("_event_ts")))
    )

# Attach lineage/audit columns, then show the resulting layout
df_bronze = add_bronze_metadata(df=df_raw)
print("Metadata columns added")
df_bronze.printSchema()

## Data Quality Checks (Basic)

Perform minimal quality checks at the Bronze layer, mainly to confirm that the data was read correctly.

In [None]:
def bronze_quality_checks(df):
    """
    Compute basic Bronze-layer quality metrics.

    Required fields are event_id, machine_id and event_timestamp. ``df`` must
    also carry the ``_record_hash`` column added by ``add_bronze_metadata``.

    Returns:
        dict with:
        - total_records: row count
        - null_event_id / null_machine_id / null_timestamp: rows where that
          field is null
        - duplicate_records: total_records minus distinct_records
        - distinct_records: number of distinct _record_hash values
        - quality_score: percentage (0-100, 2 decimals) of rows with all
          required fields present; 100.0 for an empty DataFrame
    """
    from pyspark.sql import functions as F

    def _flag_count(condition):
        # Sum of 0/1 flags. Spark returns null for this sum over zero rows,
        # which is handled with `or 0` below.
        return F.sum(F.when(condition, 1).otherwise(0))

    any_required_null = (
        F.col("event_id").isNull()
        | F.col("machine_id").isNull()
        | F.col("event_timestamp").isNull()
    )

    # One aggregation job instead of a separate full-scan count() per metric.
    stats = df.agg(
        F.count(F.lit(1)).alias("total_records"),
        _flag_count(F.col("event_id").isNull()).alias("null_event_id"),
        _flag_count(F.col("machine_id").isNull()).alias("null_machine_id"),
        _flag_count(F.col("event_timestamp").isNull()).alias("null_timestamp"),
        _flag_count(any_required_null).alias("incomplete_records"),
    ).first()

    total_records = stats["total_records"]
    null_event_id = stats["null_event_id"] or 0
    null_machine_id = stats["null_machine_id"] or 0
    null_timestamp = stats["null_timestamp"] or 0
    incomplete_records = stats["incomplete_records"] or 0

    # distinct().count() (rather than countDistinct) so a null hash is counted as a value too.
    distinct_records = df.select("_record_hash").distinct().count()
    duplicate_count = total_records - distinct_records

    # Count each failing row once. Summing per-column null counts pushes the
    # score below zero when rows miss several required fields.
    if total_records:
        quality_score = round((1 - incomplete_records / total_records) * 100, 2)
    else:
        quality_score = 100.0  # nothing to fail; also avoids dividing by zero

    return {
        "total_records": total_records,
        "null_event_id": null_event_id,
        "null_machine_id": null_machine_id,
        "null_timestamp": null_timestamp,
        "duplicate_records": duplicate_count,
        "distinct_records": distinct_records,
        "quality_score": quality_score,
    }

# Compute the metrics once, then print them as a framed report
quality_metrics = bronze_quality_checks(df_bronze)

print(f"\n{'=' * 50}")
print("BRONZE QUALITY REPORT")
print("=" * 50)
for metric_name, metric_value in quality_metrics.items():
    print(f"{metric_name}: {metric_value}")
print("=" * 50)

## Write to Bronze Lakehouse

In [None]:
def write_to_bronze(df, table_name: str, partition_cols: list = None):
    """
    Append ``df`` to the Delta table ``table_name`` (Bronze is append-only).

    Args:
        df: DataFrame to write.
        table_name: Target table name, passed to ``saveAsTable``.
        partition_cols: Partition columns. ``None`` (the default) means
            ["_year", "_month", "_day"]; an empty list writes unpartitioned.
    """
    if partition_cols is None:
        partition_cols = ["_year", "_month", "_day"]

    writer = (
        df.write
        .format("delta")
        .mode("append")
        # Lets the table pick up new source columns when schema evolution is enabled
        .option("mergeSchema", str(ENABLE_SCHEMA_EVOLUTION).lower())
    )
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)

    writer.saveAsTable(table_name)

    # Prefer the row count recorded in the Delta commit. df.count() re-runs the
    # whole lineage (including the source read) and may not match what was written.
    rows_written = _rows_written_by_last_commit(table_name)
    if rows_written is None:
        rows_written = df.count()

    print(f"Successfully wrote {rows_written} records to {table_name}")


def _rows_written_by_last_commit(table_name):
    """Return numOutputRows from the table's latest Delta commit, or None if unavailable."""
    # NOTE(review): with concurrent writers the latest commit may belong to another job.
    try:
        history = DeltaTable.forName(spark, table_name).history(1).collect()
    except Exception as exc:  # best effort: the caller falls back to df.count()
        print(f"Could not read Delta history for {table_name}: {exc}")
        return None
    if not history:
        return None
    metrics = history[0]["operationMetrics"] or {}
    value = metrics.get("numOutputRows")
    return int(value) if value is not None else None

# Append this batch to the Bronze table, partitioned by event date parts
write_to_bronze(df_bronze, table_name=TABLE_NAME, partition_cols=["_year", "_month", "_day"])

## Verify Write

In [None]:
# Read the table back to confirm the append landed.
# The table is append-only, so this count covers every batch written so far, not just this run.
df_verify = spark.table(TABLE_NAME)

print(f"\nTable: {TABLE_NAME}")
print(f"Total records: {df_verify.count()}")

print("\nPartitions:")
(
    df_verify.select("_year", "_month", "_day")
    .distinct()
    .orderBy("_year", "_month", "_day")  # chronological listing instead of arbitrary order
    .show()
)

print("\nSample records:")
df_verify.select(
    "event_id", "machine_id", "event_type",
    "bet_amount", "win_amount", "_ingestion_timestamp",
).show(5, truncate=False)

## Summary

In [None]:
# Assemble run-level facts for the processing log and print them as JSON.
# processing_time is the wall-clock time at which this cell runs.
summary = dict(
    notebook="01_bronze_slot_telemetry",
    layer="Bronze",
    table=TABLE_NAME,
    records_processed=quality_metrics["total_records"],
    quality_score=quality_metrics["quality_score"],
    processing_time=datetime.now().isoformat(),
    status="SUCCESS",
)

print(f"\n{'=' * 50}")
print("PROCESSING SUMMARY")
print("=" * 50)
print(json.dumps(summary, indent=2))
print("=" * 50)