# Real-Time SMS Pipeline Using Spark Structured Streaming - LOCAL TESTING

This notebook implements a real-time data pipeline that:
1. Ingests SMS data from Kafka topics (FCDR and ECDR)
2. Applies business logic transformations using Spark DataFrames
3. Writes processed data to **LOCAL data lake** in Parquet format (`/home/homar/spark-kafka-stream/data_lake/`)
4. Loads data into Vertica for querying

## Pipeline Overview:
- **Source**: Kafka topics for SMS events (FCDR Jasmin, FCDR Telestax, ECDR)
- **Processing**: Spark Structured Streaming with 1-minute micro-batches
- **Target**: LOCAL data lake (Parquet) → Vertica (standard.fact_sms table)

## Local Testing Configuration:
- **Data Lake Path**: `/home/homar/spark-kafka-stream/data_lake/`
- **Checkpoints Path**: `/home/homar/spark-kafka-stream/checkpoints/`
- **OCI Cloud Storage**: Commented out for local testing

This pipeline replaces the complex Vertica-based ETL with a scalable Spark streaming solution, now configured for local development and testing.

## 1. Set Up the Spark Session and Kafka Configuration

In [None]:
!pip show pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime

# Initialize Spark Session with optimized configurations for Local Data Lake.
# The builder chain is wrapped in parentheses: with backslash continuations, the
# commented-out OCI lines end the statement early and .getOrCreate() becomes an
# IndentationError.
# Reading from Kafka also needs the spark-sql-kafka-0-10 connector on the classpath
# (e.g. via spark.jars.packages); it is not configured here.
spark = (
    SparkSession.builder
    .appName("RealTime_SMS_Pipeline_Local")
    .master("local[*]")
    .config("spark.driver.host", "0.0.0.0")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.port", "7077")
    # Disabled: this value names the CheckpointFileManager interface itself, which Spark
    # cannot instantiate; leaving it unset keeps the default checkpoint file manager.
    # .config("spark.sql.streaming.checkpointFileManagerClass", "org.apache.spark.sql.execution.streaming.CheckpointFileManager")
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.streaming.stateStore.maintenanceInterval", "600s")
    .config("spark.jars", "/usr/local/spark/jars/vertica-jdbc.jar")
    .config("spark.driver.extraClassPath", "/usr/local/spark/jars/vertica-jdbc.jar")
    .config("spark.executor.extraClassPath", "/usr/local/spark/jars/vertica-jdbc.jar")
    # OCI Configuration - Commented out for local testing
    # .config("spark.hadoop.fs.oci.client.hostname", "objectstorage.me-jeddah-1.oraclecloud.com")
    # .config("spark.hadoop.fs.oci.client.auth.tenantId", "ocid1.tenancy.oc1..aaaaaaaaxxxxx")
    # .config("spark.hadoop.fs.oci.client.auth.userId", "ocid1.user.oc1..aaaaaaaaxxxxx")
    # .config("spark.hadoop.fs.oci.client.auth.fingerprint", "xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx")
    # .config("spark.hadoop.fs.oci.client.auth.privateKeyFilePath", "/home/jovyan/code/oci_api_key.pem")
    .getOrCreate()
)

# Set log level to reduce noise
spark.sparkContext.setLogLevel("WARN")

# Production Kafka Configuration - Updated for your production environment
KAFKA_BOOTSTRAP_SERVERS = "strimzi-kafka-cluster-oci-prod-2-kafka-bootstrap.strimzi-kafka-prod-2:9092"

# Production Kafka Topics - Real topics from Unifonic production
KAFKA_TOPICS = {
    "fcdr_jasmin": [
        "src-aws-prod.prod.sms.fcdr.jasmin",
        "src-aws-prod.prod.sms.fcdr.scl-jasmin", 
        "src-new-oci-prod.prod.sms.fcdr.trx-oci-jasmin",
        "src-stc-prod.prod.sms.fcdr.jasmin",
        "src-oci-dr-ruh-prod.prod.sms.fcdr.trx-oci-jasmin"
    ],
    "fcdr_telestax": ["oci.prod.sms.fcdr.telestax"],
    "ecdr": ["prod.sms.ecdr.el", "src-aws-prod.prod.sms.ecdr.el"]
}

# OCI Data Lake Configuration - Commented out for local testing
# OCI_NAMESPACE = "unifonic"  # Replace with your OCI tenancy namespace
# OCI_BUCKET = "spark_data_lake"  # Your existing data lake bucket
# DATA_LAKE_PATH = f"oci://{OCI_BUCKET}@{OCI_NAMESPACE}/sms_pipeline/"
# CHECKPOINT_PATH = f"oci://{OCI_BUCKET}@{OCI_NAMESPACE}/checkpoints/"

# Local paths (for local testing) - NOW ACTIVE
LOCAL_DATA_LAKE_PATH = "/home/homar/spark-kafka-stream/data_lake/"
LOCAL_CHECKPOINT_PATH = "/home/homar/spark-kafka-stream/checkpoints/"

# Use local paths as primary for testing
DATA_LAKE_PATH = LOCAL_DATA_LAKE_PATH
CHECKPOINT_PATH = LOCAL_CHECKPOINT_PATH

print(" Spark Session initialized successfully for Local Data Lake")
print(f"Kafka Bootstrap Servers: {KAFKA_BOOTSTRAP_SERVERS}")
print(f"Local Data Lake Path: {DATA_LAKE_PATH}")
print(f"Local Checkpoint Path: {CHECKPOINT_PATH}")
print(f"Pipeline will run every 1 minute")
print("Running in LOCAL testing environment!")
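Each streaming query started from this notebook needs its own checkpoint directory; Spark tracks offsets and sink state there, so two queries sharing one location will conflict. A minimal sketch, assuming one sub-folder per stream under the two base paths above; the helper `stream_paths` and that folder layout are hypothetical, not part of the original pipeline.

```python
import posixpath


# Hypothetical helper: one output folder and one checkpoint folder per stream.
# Spark requires every streaming query to use its own checkpointLocation.
def stream_paths(base_data_path: str, base_checkpoint_path: str, stream_name: str) -> dict:
    """Return the Parquet output path and checkpoint location for one stream."""
    return {
        "path": posixpath.join(base_data_path, stream_name),
        "checkpointLocation": posixpath.join(base_checkpoint_path, stream_name),
    }


paths = stream_paths(
    "/home/homar/spark-kafka-stream/data_lake/",
    "/home/homar/spark-kafka-stream/checkpoints/",
    "fcdr_jasmin",
)
print(paths["checkpointLocation"])  # /home/homar/spark-kafka-stream/checkpoints/fcdr_jasmin
```

The returned dict could be passed to `DataStreamWriter.options(**paths)` when starting each Parquet sink. `posixpath` is used so the same joins also work for `oci://` URIs.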


## 2. Define Kafka Topic Schemas

In [None]:
# Define schemas for parsing JSON data from Kafka topics

# FCDR Jasmin Schema - nested JSON structure under "data.body"
fcdr_jasmin_schema = StructType([
    StructField("data", StructType([
        StructField("body", StructType([
            StructField("messageid", StringType(), True),
            StructField("messageDeliveryStatus", StringType(), True),
            StructField("submitDate", StringType(), True),
            StructField("deliveryDate", StringType(), True),
            StructField("addrSrcDigits", StringType(), True),
            StructField("addrDstDigits", StringType(), True),
            StructField("origNetworkId", StringType(), True),
            StructField("networkId", StringType(), True),
            StructField("messageType", StringType(), True),
            StructField("first20CharactersOfSms", StringType(), True),
            StructField("reasonForFailure", StringType(), True),
            StructField("elMessageId", StringType(), True),
            StructField("elCorrelationId", StringType(), True),
            StructField("campaignId", StringType(), True),
            StructField("mprocNotes18", StringType(), True), # operator_id
            StructField("mprocNotes19", StringType(), True), # country_id
            StructField("mprocNotes20", StringType(), True), # provider_id
            StructField("mprocNotes4", StringType(), True),  # mnp_used
            StructField("diameterSubscriptionId", StringType(), True),
            StructField("diameterResponseStatus", StringType(), True),
            StructField("normalizedReason", StringType(), True),
            StructField("normalizedStatus", StringType(), True),
            StructField("charNumbers", StringType(), True),
            StructField("udh1MessageIdOfFirstPartMessage", StringType(), True),
            StructField("udh2NumberOfParts", StringType(), True),
            StructField("udh3NumberOfCurrentPart", StringType(), True),
            StructField("completeMessageBodyFlag", StringType(), True),
            StructField("aiClassification", StringType(), True)
        ]), True)
    ]), True),
    StructField("time_ingested", TimestampType(), True)
])

# FCDR Telestax Schema - flat structure
fcdr_telestax_schema = StructType([
    StructField("MessageId", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("SubmitDate", StringType(), True),
    StructField("RespTimestamp", StringType(), True),
    StructField("SourceAddr", StringType(), True),
    StructField("DestAddr", StringType(), True),
    StructField("OrigNetworkId", StringType(), True),
    StructField("NetworkId", StringType(), True),
    StructField("MessageOrDlr", StringType(), True),
    StructField("ShortMessageText20", StringType(), True),
    StructField("reason", StringType(), True),
    StructField("ELMessageId", StringType(), True),
    StructField("ELCorrelationId", StringType(), True),
    StructField("CampaignID", StringType(), True),
    StructField("mprocnotes", StringType(), True),
    StructField("UDHPartsCount", StringType(), True),
    StructField("UDHCurrentPartNumber", StringType(), True),
    StructField("UDHFirstMessageId", StringType(), True),
    StructField("CompleteMessageBodyFlag", StringType(), True),
    StructField("DiamSessionId", StringType(), True),
    StructField("DiamResponseStatus", StringType(), True),
    StructField("NormalizedReason", StringType(), True),
    StructField("NormalizedStatus", StringType(), True),
    StructField("CharNumbers", StringType(), True),
    StructField("time_ingested", TimestampType(), True)
])

# ECDR Schema - nested JSON structure under "data.cdrEvent"
ecdr_schema = StructType([
    StructField("data", StructType([
        StructField("cdrEvent", StructType([
            StructField("messageId", StringType(), True),
            StructField("messageStatus", StringType(), True),
            StructField("timestamp", StringType(), True),
            StructField("timestampA", StringType(), True),
            StructField("senderName", StringType(), True),
            StructField("recipient", StringType(), True),
            StructField("userId", StringType(), True),
            StructField("masterAccountId", StringType(), True),
            StructField("networkId", StringType(), True),
            StructField("messageParts", StringType(), True),
            StructField("providerId", StringType(), True),
            StructField("messageBody", StringType(), True),
            StructField("rejectReason", StringType(), True),
            StructField("correlationId", StringType(), True),
            StructField("campaignId", StringType(), True),
            StructField("messageType", StringType(), True),
            StructField("mnpUsed", StringType(), True),
            StructField("diameterSessionId", StringType(), True),
            StructField("diameterResponseCode", StringType(), True),
            StructField("numOfChars", StringType(), True),
            StructField("traceId", StringType(), True),
            StructField("customerStatus", StringType(), True),
            StructField("customerMessage", StringType(), True),
            StructField("isEncrypted", StringType(), True),
            StructField("smsClassification", StringType(), True)
        ]), True)
    ]), True),
    StructField("tags", StructType([
        StructField("sourceIp", StringType(), True)
    ]), True),
    StructField("time_ingested", TimestampType(), True)
])

print(" Schemas defined for FCDR Jasmin, FCDR Telestax, and ECDR topics")
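The three schemas differ mainly in where the message fields live: Jasmin nests them under `data.body`, ECDR under `data.cdrEvent`, and Telestax keeps them at the top level. The plain-Python sketch below mirrors the `parsed_data.data.body.*` style of flattening so hand-written sample payloads can be checked before they go through Kafka. The helper name and the sample record are made up for illustration, and this mirror simply returns `None` for malformed JSON; it does not reproduce every detail of Spark's `from_json` modes.

```python
import json
from typing import Optional

# Where each stream keeps its message fields (mirrors the Spark schemas above).
FIELD_ROOTS = {
    "fcdr_jasmin": ("data", "body"),
    "ecdr": ("data", "cdrEvent"),
    "fcdr_telestax": (),  # flat structure
}


def flatten_record(raw: str, stream_name: str) -> Optional[dict]:
    """Return the message fields plus time_ingested, or None if unparseable."""
    try:
        doc = json.loads(raw)
    except json.JSONDecodeError:
        return None
    node = doc
    for key in FIELD_ROOTS[stream_name]:
        node = node.get(key) if isinstance(node, dict) else None
    if not isinstance(node, dict):
        return None
    fields = dict(node)
    fields["time_ingested"] = doc.get("time_ingested")
    return fields


sample = ('{"data": {"body": {"messageid": "42", "messageType": "message"}}, '
          '"time_ingested": "2024-05-01T10:15:30"}')
print(flatten_record(sample, "fcdr_jasmin"))
```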

## 3. Read Data from Kafka Topics

In [None]:
# Function to create Kafka streaming DataFrame for specific topics
def create_kafka_stream(topics_list, stream_name):
    """Create a Kafka streaming DataFrame for given topics"""
    topics_str = ",".join(topics_list)
    
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
        .option("subscribe", topics_str) \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .load()
    
    # Add metadata columns and convert to string
    parsed_df = kafka_df.selectExpr(
        "CAST(key AS STRING) as key",
        "CAST(value AS STRING) as value", 
        "topic",
        "partition",
        "offset",
        "timestamp as kafka_timestamp"
    ).withColumn("stream_name", lit(stream_name))
    
    print(f" Created streaming DataFrame for {stream_name}: {topics_str}")
    return parsed_df

# Create streaming DataFrames for each topic group
fcdr_jasmin_stream = create_kafka_stream(KAFKA_TOPICS["fcdr_jasmin"], "fcdr_jasmin")
fcdr_telestax_stream = create_kafka_stream(KAFKA_TOPICS["fcdr_telestax"], "fcdr_telestax")  
ecdr_stream = create_kafka_stream(KAFKA_TOPICS["ecdr"], "ecdr")

print("\n All Kafka streaming DataFrames created successfully!")
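With `startingOffsets` set to `"latest"`, a freshly started local test only sees events produced after the query starts. For replaying, Spark's Kafka source also accepts a JSON string of per-partition offsets, where `-2` means earliest and `-1` means latest; it applies only when a query starts without an existing checkpoint, and the source expects every subscribed topic-partition to be listed. A small sketch for building that string; the helper name and the partition count in the example are assumptions, since this helper does not query Kafka.

```python
import json

EARLIEST, LATEST = -2, -1  # sentinel offsets understood by Spark's Kafka source


def build_starting_offsets(partition_counts: dict, offset: int = EARLIEST) -> str:
    """Build a startingOffsets JSON string covering every partition of every topic.

    partition_counts maps topic name -> number of partitions (supplied by you).
    """
    return json.dumps(
        {topic: {str(p): offset for p in range(count)}
         for topic, count in partition_counts.items()},
        sort_keys=True,
    )


offsets = build_starting_offsets({"oci.prod.sms.fcdr.telestax": 2})
print(offsets)  # {"oci.prod.sms.fcdr.telestax": {"0": -2, "1": -2}}
```

The string would replace `"latest"` in `.option("startingOffsets", ...)`; delete the stream's checkpoint folder first, otherwise the query resumes from the checkpoint and ignores it.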

## 4. Parse and Clean Raw Kafka Data

In [None]:
# Parse JSON data from Kafka value column

def parse_fcdr_jasmin_data(df):
    """Parse FCDR Jasmin JSON data"""
    parsed_df = df.select(
        col("key"),
        from_json(col("value"), fcdr_jasmin_schema).alias("parsed_data"),
        col("topic"),
        col("partition"), 
        col("offset"),
        col("kafka_timestamp"),
        col("stream_name")
    ).select(
        col("key"),
        col("parsed_data.data.body.*"),
        col("parsed_data.time_ingested").alias("time_ingested"),
        col("topic"),
        col("partition"),
        col("offset"), 
        col("kafka_timestamp"),
        col("stream_name")
    ).filter(col("messageType") == "message")  # Only process messages, not DLRs
    
    return parsed_df

def parse_fcdr_telestax_data(df):
    """Parse FCDR Telestax JSON data"""
    parsed_df = df.select(
        col("key"),
        from_json(col("value"), fcdr_telestax_schema).alias("parsed_data"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"), 
        col("stream_name")
    ).select(
        col("key"),
        col("parsed_data.*"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"),
        col("stream_name")
    ).filter(col("MessageOrDlr") == "message")  # Only process messages, not DLRs
    
    return parsed_df

def parse_ecdr_data(df):
    """Parse ECDR JSON data"""
    parsed_df = df.select(
        col("key"),
        from_json(col("value"), ecdr_schema).alias("parsed_data"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"),
        col("stream_name")
    ).select(
        col("key"),
        col("parsed_data.data.cdrEvent.*"),
        col("parsed_data.tags.sourceIp").alias("tags_sourceIp"),
        col("parsed_data.time_ingested").alias("time_ingested"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"),
        col("stream_name")
    ).filter(col("messageStatus").isin("Rejected", "Failed", "Duplicated"))  # Only failed messages
    
    return parsed_df

# Parse the streaming DataFrames
fcdr_jasmin_parsed = parse_fcdr_jasmin_data(fcdr_jasmin_stream)
fcdr_telestax_parsed = parse_fcdr_telestax_data(fcdr_telestax_stream)
ecdr_parsed = parse_ecdr_data(ecdr_stream)

print(" Raw Kafka data parsed successfully for all streams")
print(" FCDR Jasmin: Messages from Jasmin SMSC servers")
print(" FCDR Telestax: Messages from Telestax SMSC servers") 
print(" ECDR: Failed/Rejected external layer events")
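Each parser keeps only a subset of rows: Jasmin and Telestax drop delivery receipts by keeping `messageType` / `MessageOrDlr` equal to `"message"`, and ECDR keeps only `Rejected`, `Failed` and `Duplicated` events. Because comparing a null column yields null and `filter` discards null, rows missing the field are dropped as well, and the match is case-sensitive. A plain-Python mirror of those predicates for checking sample records; the function name is hypothetical.

```python
# Hypothetical mirror of the three parser filters (case-sensitive, like Spark).
ECDR_KEPT_STATUSES = {"Rejected", "Failed", "Duplicated"}


def keep_row(stream_name: str, record: dict) -> bool:
    """Return True if the notebook's filter for this stream would keep the record."""
    if stream_name == "fcdr_jasmin":
        return record.get("messageType") == "message"
    if stream_name == "fcdr_telestax":
        return record.get("MessageOrDlr") == "message"
    if stream_name == "ecdr":
        # A missing status (None) is not in the set, just as a null is filtered out.
        return record.get("messageStatus") in ECDR_KEPT_STATUSES
    raise ValueError(f"unknown stream: {stream_name}")
```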

## 5. Transform FCDR Messages Data

In [None]:
# Transform FCDR Jasmin messages according to business logic
def transform_fcdr_jasmin_messages(df):
    """Apply transformations to FCDR Jasmin message data"""
    
    transformed_df = df.select(
        # Generate natural key for deduplication
        concat_ws("~",
            coalesce(col("messageid"), lit("0")),
            coalesce(col("origNetworkId"), lit("0")), 
            coalesce(col("addrDstDigits"), lit("0")),
            coalesce(date_format(col("submitDate").cast("timestamp"), "yyyyMMddHHmmssSSS"), lit("NONE")),
            coalesce(date_format(col("deliveryDate").cast("timestamp"), "yyyyMMddHHmmssSSS"), lit("NONE")),
            coalesce(col("networkId"), lit("0")),
            coalesce(upper(col("messageDeliveryStatus")), lit("NONE")),
            coalesce(col("udh3NumberOfCurrentPart"), lit("0"))
        ).alias("natural_key"),
        
        # Basic message fields
        col("messageid").cast("int").alias("message_id"),
        
        # Message status logic
        when(upper(col("messageDeliveryStatus")) == "SUCCESS_ESME", "SENT")
        .when((upper(col("messageDeliveryStatus")).isin("FAILED_ESME", "EL_FAILED", "EL_REJECTED", "EL_DUPLICATED", "FAILED")) 
              & (col("diameterResponseStatus") == "2001"), "SENT")
        .otherwise("NOTSENT").alias("message_status"),
        
        # Date handling
        coalesce(col("deliveryDate").cast("timestamp"), col("time_ingested")).alias("submit_date"),
        col("submitDate").cast("timestamp").alias("user_submit_date"),
        # Spark has no TIMESTAMPTZ type; its TIMESTAMP is already a UTC instant rendered in the session time zone
        coalesce(col("deliveryDate").cast("timestamp"), col("time_ingested")).alias("submit_date_tz"),
        
        # Contact information
        col("addrSrcDigits").alias("sender_name"),
        substring(col("addrDstDigits"), 1, 20).alias("receiver"),
        
        # Network and account info
        col("origNetworkId").cast("int").alias("account_id"),
        col("networkId").cast("int").alias("network_id"),
        
        # SMS content
        col("first20CharactersOfSms").alias("short_message"),
        
        # Business logic fields
        upper(col("messageDeliveryStatus")).alias("event_status"),
        
        # DLR status logic (case-insensitive, matching the Telestax transform)
        when(upper(col("messageDeliveryStatus")) == "SUCCESS_ESME", "PENDING")
        .when(upper(col("messageDeliveryStatus")).isin("FAILED_ESME", "EL_FAILED", "EL_REJECTED", "EL_DUPLICATED", "MPROC_REJECTED", "MPROC_DROPPED"), "NOTDELIVERED")
        .otherwise("NONE").alias("dlr_status"),
        
        # Region logic
        when(coalesce(col("mprocNotes19"), lit("")) != "3", "INTERNATIONAL")
        .otherwise("LOCAL").alias("region"),
        
        # Unit count
        coalesce(col("udh2NumberOfParts").cast("int"), lit(1)).alias("number_of_units"),
        
        # Operator, country and provider IDs from mproc notes: cast only values that are
        # purely numeric with an optional leading "+" (e.g. "12345" or "+98765"), else 0
        when(regexp_extract(col("mprocNotes18"), "^\\+?\\d+$", 0) != "", col("mprocNotes18").cast("bigint"))
        .otherwise(lit(0)).alias("operator_id"),
        
        when(regexp_extract(col("mprocNotes19"), "^\\+?\\d+$", 0) != "", col("mprocNotes19").cast("bigint"))
        .otherwise(lit(0)).alias("country_id"),
        
        when(regexp_extract(col("mprocNotes20"), "^\\+?\\d+$", 0) != "", col("mprocNotes20").cast("bigint"))
        .otherwise(lit(0)).alias("provider_id"),
        
        # MNP flag
        (col("mprocNotes4") == "Y").alias("mnp_used"),
        
        # Additional fields
        col("reasonForFailure").alias("failure"),
        col("elMessageId").alias("el_message_id"),
        col("elCorrelationId").alias("correlation_id"),
        col("campaignId").alias("campaign_id"),
        col("charNumbers").cast("int").alias("message_body_length"),
        coalesce(col("diameterSubscriptionId"), lit("null")).alias("diameter_session_id"),
        col("diameterResponseStatus").alias("diameter_response_status"),
        col("normalizedReason").alias("normalized_reason"),
        col("normalizedStatus").alias("normalized_status"),
        
        # Source identification
        lit("Jasmin").alias("source_type"),
        lit("FCDR").alias("cdr_kind"),
        
        # Timestamps
        current_timestamp().alias("created_time"),
        col("time_ingested"),
        
        # SMS Classification
        regexp_replace(col("aiClassification"), '"', '').alias("sms_classification"),
        
        # Kafka metadata
        col("topic"),
        col("partition"),
        col("offset")
    )
    
    return transformed_df

# Transform FCDR Telestax messages according to business logic  
def transform_fcdr_telestax_messages(df):
    """Apply transformations to FCDR Telestax message data"""
    
    transformed_df = df.select(
        # Generate natural key
        concat_ws("~",
            coalesce(col("MessageId"), lit("0")),
            coalesce(col("OrigNetworkId"), lit("0")),
            coalesce(col("DestAddr"), lit("0")),
            coalesce(date_format(col("SubmitDate").cast("timestamp"), "yyyyMMddHHmmssSSS"), lit("NONE")),
            coalesce(date_format(col("RespTimestamp").cast("timestamp"), "yyyyMMddHHmmssSSS"), lit("NONE")),
            coalesce(col("NetworkId"), lit("0")),
            coalesce(upper(col("Status")), lit("NONE")),
            coalesce(col("UDHCurrentPartNumber"), lit("0"))
        ).alias("natural_key"),
        
        # Basic message fields
        col("MessageId").cast("int").alias("message_id"),
        
        # Message status logic
        when(upper(col("Status")) == "SUCCESS_ESME", "SENT")
        .when((upper(col("Status")).isin("FAILED_ESME", "EL_FAILED", "EL_REJECTED", "EL_DUPLICATED", "FAILED"))
              & (col("DiamResponseStatus") == "2001"), "SENT")
        .otherwise("NOTSENT").alias("message_status"),
        
        # Date handling
        coalesce(col("RespTimestamp").cast("timestamp"), col("time_ingested")).alias("submit_date"),
        col("SubmitDate").cast("timestamp").alias("user_submit_date"), 
        # Spark has no TIMESTAMPTZ type; its TIMESTAMP is already a UTC instant rendered in the session time zone
        coalesce(col("RespTimestamp").cast("timestamp"), col("time_ingested")).alias("submit_date_tz"),
        
        # Contact information
        col("SourceAddr").alias("sender_name"),
        substring(col("DestAddr"), 1, 20).alias("receiver"),
        
        # Network and account info
        col("OrigNetworkId").cast("int").alias("account_id"),
        col("NetworkId").cast("int").alias("network_id"),
        
        # SMS content
        col("ShortMessageText20").alias("short_message"),
        
        # Business logic fields
        upper(col("Status")).alias("event_status"),
        
        # DLR status logic
        when(upper(col("Status")) == "SUCCESS_ESME", "PENDING")
        .when(upper(col("Status")).isin("FAILED_ESME", "EL_FAILED", "EL_REJECTED", "EL_DUPLICATED", "MPROC_REJECTED", "MPROC_DROPPED"), "NOTDELIVERED")
        .otherwise("NONE").alias("dlr_status"),
        
        # Region logic based on mprocnotes part 19
        when(coalesce(split(col("mprocnotes"), ":").getItem(18), lit("")) != "3", "INTERNATIONAL")
        .otherwise("LOCAL").alias("region"),
        
        # Unit count
        coalesce(col("UDHPartsCount").cast("int"), lit(1)).alias("number_of_units"),
        
        # Provider information from mprocnotes
        when(regexp_extract(split(col("mprocnotes"), ":").getItem(17), "^\\+?\\d+$", 0) != "", 
             split(col("mprocnotes"), ":").getItem(17).cast("bigint"))
        .otherwise(lit(0)).alias("operator_id"),
        
        when(regexp_extract(split(col("mprocnotes"), ":").getItem(18), "^\\+?\\d+$", 0) != "",
             split(col("mprocnotes"), ":").getItem(18).cast("bigint"))
        .otherwise(lit(0)).alias("country_id"),
        
        when(regexp_extract(split(col("mprocnotes"), ":").getItem(19), "^\\+?\\d+$", 0) != "",
             split(col("mprocnotes"), ":").getItem(19).cast("bigint"))
        .otherwise(lit(0)).alias("provider_id"),
        
        # MNP flag
        (split(col("mprocnotes"), ":").getItem(3) == "Y").alias("mnp_used"),
        
        # Additional fields
        col("reason").alias("failure"),
        col("ELMessageId").alias("el_message_id"),
        col("ELCorrelationId").alias("correlation_id"),
        col("CampaignID").alias("campaign_id"),
        col("CharNumbers").cast("int").alias("message_body_length"),
        coalesce(col("DiamSessionId"), lit("null")).alias("diameter_session_id"),
        col("DiamResponseStatus").alias("diameter_response_status"),
        col("NormalizedReason").alias("normalized_reason"),
        col("NormalizedStatus").alias("normalized_status"),
        
        # Source identification
        lit("Telestax").alias("source_type"),
        lit("FCDR").alias("cdr_kind"),
        
        # Timestamps
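        current_timestamp().alias("created_time"),
        col("time_ingested"),
        
        # SMS Classification: Telestax payloads have no classification field, so emit a
        # typed null to keep the column set aligned with the Jasmin stream
        lit(None).cast("string").alias("sms_classification"),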
        
        # Kafka metadata
        col("topic"),
        col("partition"), 
        col("offset")
    )
    
    return transformed_df

# Apply transformations
fcdr_jasmin_transformed = transform_fcdr_jasmin_messages(fcdr_jasmin_parsed)
fcdr_telestax_transformed = transform_fcdr_telestax_messages(fcdr_telestax_parsed)

print(" FCDR message transformations completed")
print(" Applied business logic for message status, regions, providers, and DLR status")
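Both FCDR transforms build `natural_key` by joining eight coalesced fields with `~`, rendering the two timestamps as `yyyyMMddHHmmssSSS`; Telestax additionally reads operator, country, provider and MNP values from a colon-separated `mprocnotes` string by position (indexes 17, 18, 19 and 3). A plain-Python sketch of those rules for building test expectations. The helper names are hypothetical; timestamps are assumed to be ISO-8601 strings that `datetime.fromisoformat` accepts (Spark's cast accepts more formats), and time-zone offsets are not converted as Spark would.

```python
import re
from datetime import datetime
from typing import Optional

# Same pattern as "^\\+?\\d+$"; re.ASCII keeps \d to 0-9, as in Java regexes.
NUMERIC_ID = re.compile(r"\+?\d+", re.ASCII)


def _or(value: Optional[str], default: str) -> str:
    """Mirror coalesce(value, default): only None is replaced, not ''."""
    return default if value is None else value


def format_ts(value: Optional[str]) -> str:
    """Mirror coalesce(date_format(cast(value AS timestamp), 'yyyyMMddHHmmssSSS'), 'NONE')."""
    if value is None:
        return "NONE"
    try:
        dt = datetime.fromisoformat(value)
    except ValueError:
        return "NONE"  # unparseable: treated like a null cast result
    return f"{dt:%Y%m%d%H%M%S}{dt.microsecond // 1000:03d}"


def natural_key(message_id, orig_network_id, dest_addr, submit_date,
                delivery_date, network_id, status, current_part) -> str:
    """Join the eight natural-key parts with '~' using the notebook's defaults."""
    return "~".join([
        _or(message_id, "0"),
        _or(orig_network_id, "0"),
        _or(dest_addr, "0"),
        format_ts(submit_date),
        format_ts(delivery_date),
        _or(network_id, "0"),
        "NONE" if status is None else status.upper(),
        _or(current_part, "0"),
    ])


def mproc_field(notes: Optional[str], index: int) -> Optional[str]:
    """Return the colon-separated mprocnotes field at `index`, or None if absent."""
    if notes is None:
        return None
    parts = notes.split(":")
    return parts[index] if index < len(parts) else None


def numeric_or_zero(value: Optional[str]) -> int:
    """Cast to int only when the value matches ^\\+?\\d+$, otherwise 0."""
    return int(value) if value is not None and NUMERIC_ID.fullmatch(value) else 0


def region(notes: Optional[str]) -> str:
    """Country field (index 18) equal to '3' means LOCAL, anything else INTERNATIONAL."""
    return "LOCAL" if mproc_field(notes, 18) == "3" else "INTERNATIONAL"
```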

## 6. Transform ECDR Data

In [None]:
# Transform ECDR data according to business logic

# The regular expression "^\\+?\\d+$" used throughout the transforms means:
#   ^      start of the string
#   \\+?   an optional "+" sign (the backslash is doubled because "\" must be escaped
#          in a normal Python string; the raw string r"^\+?\d+$" is equivalent)
#   \\d+   one or more digits (0-9)
#   $      end of the string
def transform_ecdr_data(df):
    """Apply transformations to ECDR data (External CDR events)"""
    
    transformed_df = df.select(
        # Basic message fields
        col("messageId").cast("int").alias("message_id"),
        
        # Message status logic for ECDR
        when(col("messageStatus") == "success_esme", "SENT")
        .when((col("messageStatus").isin("Failed_esme", "Failed", "Rejected", "Duplicated"))
              & (coalesce(col("diameterResponseCode"), lit("X")) == "2001"), "SENT")
        .otherwise("NOTSENT").alias("message_status"),
        
        # Date handling
        coalesce(col("timestampA").cast("timestamp"), col("time_ingested")).alias("submit_date"),
        col("timestamp").cast("timestamp").alias("user_submit_date"),
        # Spark has no TIMESTAMPTZ type; its TIMESTAMP is already a UTC instant rendered in the session time zone
        coalesce(col("timestampA").cast("timestamp"), col("time_ingested")).alias("submit_date_tz"),
        
        # Contact information  
        col("senderName").alias("sender_name"),
        col("recipient").cast("string").alias("receiver"),
        
        # Account and network info (ECDR accounts get +20000 offset)
        when((regexp_extract(col("userId"), "^\\+?\\d+$", 0) != "") & (length(col("userId")) >= 1),
             col("userId").cast("int") + 20000)
        .otherwise(lit(0)).alias("account_id"),
        
        when((regexp_extract(col("masterAccountId"), "^\\+?\\d+$", 0) != "") & (length(col("masterAccountId")) >= 1),
             col("masterAccountId").cast("int"))
        .otherwise(lit(0)).alias("master_account_id"),
        
        when((regexp_extract(col("networkId"), "^\\+?\\d+$", 0) != "") & (length(col("networkId")) >= 1),
             col("networkId").cast("int"))
        .otherwise(lit(0)).alias("network_id"),
        
        # SMS content
        regexp_replace(col("messageBody"), '"', '').alias("short_message"),
        
        # Event status mapping for ECDR
        when(col("messageStatus") == "Rejected", "EL_REJECTED")
        .when(col("messageStatus") == "Duplicated", "EL_DUPLICATED") 
        .when(col("messageStatus") == "Failed", "EL_FAILED")
        .otherwise(col("messageStatus")).alias("event_status"),
        
        # Protocol type for ECDR
        lit("HTTP").alias("source_protocol"),
        
        # Unit count
        when((regexp_extract(col("messageParts"), "^\\+?\\d+$", 0) != "") & (length(col("messageParts")) >= 1),
             col("messageParts").cast("int"))
        .otherwise(lit(0)).alias("number_of_units"),
        
        # Provider ID
        when((regexp_extract(col("providerId"), "^\\+?\\d+$", 0) != "") & (length(col("providerId")) >= 1),
             col("providerId").cast("int"))
        .otherwise(lit(0)).alias("provider_id"),
        
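        # ECDR specific fields
        lit("NONE").alias("dlr_status"),
        lit("LOCAL").alias("region"),  # ECDR is typically local traffic
        
        # Columns required by the unified fact schema (section 7) that ECDR events do not
        # carry; typed nulls are placeholders so the unified select can resolve them
        lit(None).cast("bigint").alias("operator_id"),
        lit(None).cast("bigint").alias("country_id"),
        lit(None).cast("string").alias("normalized_reason"),
        lit(None).cast("string").alias("normalized_status"),
        lit(None).cast("string").alias("source_type"),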
        
        # Additional fields
        col("rejectReason").alias("failure"),
        col("messageId").alias("el_message_id"),  # ECDR messageId is the EL message ID
        col("correlationId").alias("correlation_id"),
        col("campaignId").alias("campaign_id"),
        col("messageType").alias("message_type"),
        col("mnpUsed").cast("boolean").alias("mnp_used"),
        
        when(col("diameterSessionId").cast("string") != "null", col("diameterSessionId"))
        .otherwise(lit(None)).alias("diameter_session_id"),
        
        col("diameterResponseCode").alias("diameter_response_status"),
        
        when((regexp_extract(col("numOfChars"), "^\\+?\\d+$", 0) != "") & (length(col("numOfChars")) >= 1),
             col("numOfChars").cast("int"))
        .otherwise(lit(0)).alias("message_body_length"),
        
        col("traceId").alias("trace_id"),
        col("customerStatus").alias("customer_status"),
        col("customerMessage").alias("customer_reason"),
        
        # Charge message ID for ECDR
        when(regexp_extract(col("diameterSessionId").cast("string"), "^-?\\d+$", 0) != "",
             col("diameterSessionId").cast("int"))
        .otherwise(lit(None)).alias("charge_message_id"),
        
        # Product classification logic
        when(lower(split(col("correlationId"), ":").getItem(1)) == "sms",
             initcap(split(col("correlationId"), ":").getItem(0)))
        .when(col("campaignId").isNotNull() & (col("campaignId") != ""),
             lit("Campaign"))
        .otherwise(lit("SMS_Native")).alias("product"),
        
        # Source identification
        lit("kafka").alias("ingestion_source"),
        lit("ECDR").alias("cdr_kind"),
        
        # Additional ECDR fields
        col("userId").alias("el_user_id"),
        col("tags_sourceIp").alias("source_ip"),
        col("isEncrypted").cast("int").alias("encrypted"),
        
        # Product ID extraction
        when(lower(split(col("correlationId"), ":").getItem(0)) == "flowstudio",
             split(col("correlationId"), ":").getItem(2))
        .when(lower(split(col("correlationId"), ":").getItem(0)) == "campaign",
             split(col("correlationId"), ":").getItem(2))
        .otherwise(lit(None)).alias("product_id"),
        
        # White network ID logic (group index 0 in regexp_extract returns the whole match, not a capture group)
        when(regexp_extract(col("networkId"), "^\\d+$", 0) != "",
             (col("networkId").cast("int").between(500, 599)) |  # White Campaign
             (col("networkId").cast("int").between(10, 99)) |    # White OTP
             (col("networkId").cast("int").between(200, 299)) |  # White OTP  
             (col("networkId").cast("int").between(700, 799)) |  # White Campaign
             (col("networkId").cast("int").between(400, 499)) |  # White OTP
             (col("networkId").cast("int").between(600, 699)) |  # White OTP
             (col("networkId").cast("int").between(800, 899)))   # White International
        .otherwise(lit(False)).alias("white_network_id"),
        
        # Ad sender name detection
        (lower(col("senderName")).like("ad-%") | lower(col("senderName")).like("%-ad")).alias("ad_sender_name"),
        
        # SMS Classification
        regexp_replace(col("smsClassification"), '"', '').alias("sms_classification"),
        
        # Timestamps
        current_timestamp().alias("created_time"),
        col("time_ingested"),
        
        # Kafka metadata
        col("topic"),
        col("partition"),
        col("offset")
    )
    
    return transformed_df

# Apply ECDR transformations
ecdr_transformed = transform_ecdr_data(ecdr_parsed)

print(" ECDR transformations completed")
print(" Applied business logic for external layer events (failed/rejected messages)")
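Several ECDR columns are small decision rules over strings: the `+20000` account offset, the white-network ranges, `product` and `product_id` derived from the colon-separated `correlationId`, and the ad-sender check. A plain-Python mirror for building test expectations; the function names are hypothetical, overflow and ANSI-mode edge cases are ignored, and `_initcap` only approximates Spark's `initcap` by capitalizing each space-separated word.

```python
import re
from typing import Optional

DIGITS_ONLY = re.compile(r"\d+", re.ASCII)       # "^\\d+$" (white_network_id)
SIGNED_DIGITS = re.compile(r"\+?\d+", re.ASCII)  # "^\\+?\\d+$" (other IDs)

# Inclusive ranges copied from the white_network_id expression (between() is inclusive).
WHITE_RANGES = [(10, 99), (200, 299), (400, 499), (500, 599),
                (600, 699), (700, 799), (800, 899)]


def account_id(user_id: Optional[str]) -> int:
    """ECDR account IDs get a +20000 offset; missing or non-numeric IDs map to 0."""
    if user_id is None or not SIGNED_DIGITS.fullmatch(user_id):
        return 0
    return int(user_id) + 20000


def white_network_id(network_id: Optional[str]) -> bool:
    """True when the network ID is all digits and falls in one of the white ranges."""
    if network_id is None or not DIGITS_ONLY.fullmatch(network_id):
        return False
    n = int(network_id)
    return any(lo <= n <= hi for lo, hi in WHITE_RANGES)


def _initcap(text: str) -> str:
    """Approximate Spark's initcap: capitalize each space-separated word."""
    return " ".join(word.capitalize() for word in text.split(" "))


def product(correlation_id: Optional[str], campaign_id: Optional[str]) -> str:
    """Part 1 == 'sms' -> part 0 as product; else 'Campaign' if set; else 'SMS_Native'."""
    parts = correlation_id.split(":") if correlation_id is not None else []
    if len(parts) > 1 and parts[1].lower() == "sms":
        return _initcap(parts[0])
    if campaign_id:  # not None and not empty, like isNotNull() & (!= "")
        return "Campaign"
    return "SMS_Native"


def product_id(correlation_id: Optional[str]) -> Optional[str]:
    """Part 2 of correlationId for flowstudio/campaign correlations, else None."""
    parts = correlation_id.split(":") if correlation_id is not None else []
    if len(parts) > 2 and parts[0].lower() in ("flowstudio", "campaign"):
        return parts[2]
    return None


def ad_sender_name(sender_name: Optional[str]) -> Optional[bool]:
    """Sender names starting with 'ad-' or ending with '-ad' (case-insensitive)."""
    if sender_name is None:
        return None  # LIKE on a null column yields null
    lowered = sender_name.lower()
    return lowered.startswith("ad-") or lowered.endswith("-ad")
```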

## 7. Consolidate Traffic Data
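The unified `select` only resolves if every source DataFrame exposes every name in `common_columns`; a missing column raises an `AnalysisException` as soon as the `select` is defined. A plain-Python sketch of that check over column-name lists, which in the notebook could be fed from each DataFrame's `.columns`; the function name is hypothetical. As an alternative to padding by hand, Spark 3.1+ also offers `DataFrame.unionByName(other, allowMissingColumns=True)`, which fills absent columns with nulls.

```python
from typing import Dict, Iterable, List


def missing_columns(required: Iterable[str],
                    produced_by_source: Dict[str, Iterable[str]]) -> Dict[str, List[str]]:
    """Map each source to the required columns it does not produce (complete sources omitted)."""
    required = list(required)
    report: Dict[str, List[str]] = {}
    for source, produced in produced_by_source.items():
        available = set(produced)
        absent = [name for name in required if name not in available]
        if absent:
            report[source] = absent
    return report


report = missing_columns(
    ["message_id", "operator_id", "sms_classification"],
    {
        "fcdr_jasmin": ["message_id", "operator_id", "sms_classification"],
        "fcdr_telestax": ["message_id", "operator_id"],
    },
)
print(report)  # {'fcdr_telestax': ['sms_classification']}
```

Inside `create_unified_fact_schema`, a call such as `missing_columns(common_columns, {"ecdr": ecdr_df.columns})` would list what a transform still has to provide before the select runs.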

In [None]:
# Consolidate all traffic data into unified schema
def create_unified_fact_schema(fcdr_jasmin_df, fcdr_telestax_df, ecdr_df):
    """Create unified fact table schema from all data sources"""
    
    # Common columns for all traffic types
    common_columns = [
        "message_id", "message_status", "submit_date", "sender_name", "receiver",
        "event_status", "account_id", "network_id", "number_of_units", 
        "operator_id", "country_id", "provider_id", "dlr_status", "region",
        "short_message", "failure", "el_message_id", "correlation_id", 
        "campaign_id", "mnp_used", "diameter_session_id", "diameter_response_status",
        "normalized_reason", "normalized_status", "user_submit_date", 
        "message_body_length", "source_type", "cdr_kind", "created_time", 
        "time_ingested", "submit_date_tz", "sms_classification",
        "topic", "partition", "offset"
    ]
    
    # Standardize FCDR Jasmin DataFrame
    fcdr_jasmin_unified = fcdr_jasmin_df.select(
        *common_columns,
        # Typed SQL NULLs are written lit(None) in Python
        lit(None).cast("string").alias("source_protocol"),
        lit(None).cast("int").alias("master_account_id"),
        lit(None).cast("string").alias("message_type"),
        lit(None).cast("string").alias("trace_id"),
        lit(None).cast("int").alias("charge_message_id"),
        lit(None).cast("string").alias("product"),
        lit(None).cast("string").alias("ingestion_source"),
        lit(None).cast("string").alias("el_user_id"),
        lit(None).cast("string").alias("source_ip"),
        lit(None).cast("int").alias("encrypted"),
        lit(None).cast("string").alias("product_id"),
        lit(None).cast("boolean").alias("white_network_id"),
        lit(None).cast("boolean").alias("ad_sender_name"),
        # Date fields for partitioning
        to_date(col("submit_date")).alias("date_nk"),
        hour(col("submit_date")).alias("hour_nk"),
        minute(col("submit_date")).alias("minute_nk")
    )
    
    # Standardize FCDR Telestax DataFrame
    fcdr_telestax_unified = fcdr_telestax_df.select(
        *common_columns,
        # ECDR-only columns are NULL for FCDR rows (lit(None) is Python's SQL NULL)
        lit(None).cast("string").alias("source_protocol"),
        lit(None).cast("int").alias("master_account_id"),
        lit(None).cast("string").alias("message_type"),
        lit(None).cast("string").alias("trace_id"),
        lit(None).cast("int").alias("charge_message_id"),
        lit(None).cast("string").alias("product"),
        lit(None).cast("string").alias("ingestion_source"),
        lit(None).cast("string").alias("el_user_id"),
        lit(None).cast("string").alias("source_ip"),
        lit(None).cast("int").alias("encrypted"),
        lit(None).cast("string").alias("product_id"),
        lit(None).cast("boolean").alias("white_network_id"),
        lit(None).cast("boolean").alias("ad_sender_name"),
        # Date fields for partitioning
        to_date(col("submit_date")).alias("date_nk"),
        hour(col("submit_date")).alias("hour_nk"),
        minute(col("submit_date")).alias("minute_nk")
    )
    
    # Standardize ECDR DataFrame
    ecdr_unified = ecdr_df.select(
        *common_columns,
        col("source_protocol"),
        col("master_account_id"),
        col("message_type"),
        col("trace_id"),
        col("charge_message_id"),
        col("product"),
        col("ingestion_source"),
        col("el_user_id"),
        col("source_ip"),
        col("encrypted"),
        col("product_id"),
        col("white_network_id"),
        col("ad_sender_name"),
        # Date fields for partitioning
        to_date(col("submit_date")).alias("date_nk"),
        hour(col("submit_date")).alias("hour_nk"),
        minute(col("submit_date")).alias("minute_nk")
    )
    
    # Union all DataFrames
    consolidated_df = fcdr_jasmin_unified.unionByName(fcdr_telestax_unified).unionByName(ecdr_unified)
    
    return consolidated_df

# Create consolidated traffic DataFrame
consolidated_traffic = create_unified_fact_schema(
    fcdr_jasmin_transformed, 
    fcdr_telestax_transformed, 
    ecdr_transformed
)

# Add additional business logic fields
final_traffic = consolidated_traffic.select(
    "*",
    # Composite record ID; note that concat() returns NULL if any component is NULL
    concat(
        col("cdr_kind"), lit("_"),
        col("message_id"), lit("_"),
        col("account_id"), lit("_"),
        col("receiver"), lit("_"),
        unix_timestamp(col("submit_date"))
    ).alias("record_id"),
    
    # Operator flag: True for dropped/failed events, and for successful ESME events
    # unless the failure reason is a DND-list hit or a duplicate
    when(col("event_status").isin("mproc_dropped", "failed_esme", "failed") |
         ((col("event_status") == "success_esme") &
          ~coalesce(col("failure"), lit("X")).isin("exist in DND list", "message is duplicate")),
         lit(True))
    .otherwise(lit(False)).alias("operator_flag"),
    
    # Remarks for data quality
    when(col("submit_date") == col("time_ingested"), 
         lit("submit_date populated with time_ingested"))
    .otherwise(lit(None)).alias("remarks"),
    
    # Final message flag
    when((col("short_message").isNotNull()) &
         (col("event_status") != "partial_esme"),
         lit(True))
    .otherwise(lit(False)).alias("final_message")
)

print("Traffic data consolidated successfully")
print("Combined FCDR Jasmin, FCDR Telestax, and ECDR into unified schema")
print("Added business logic for operator flags, remarks, and final message classification")
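The derived columns in `final_traffic` follow SQL NULL semantics, which are easy to get wrong when reasoning about edge cases. A plain-Python mirror of the rules as written above can serve as a reference in unit tests. The function names are hypothetical test helpers, not part of the pipeline, and `submit_epoch` stands in for `unix_timestamp(submit_date)`, which Spark computes in the session time zone.

```python
from typing import Optional

OPERATOR_EVENTS = {"mproc_dropped", "failed_esme", "failed"}
EXCLUDED_FAILURES = {"exist in DND list", "message is duplicate"}


def operator_flag(event_status: Optional[str], failure: Optional[str]) -> bool:
    """Mirror of the operator_flag expression; a NULL event_status falls to otherwise(False)."""
    if event_status is None:
        return False
    if event_status in OPERATOR_EVENTS:
        return True
    # coalesce(failure, "X"): a missing failure reason never matches the exclusions
    reason = failure if failure is not None else "X"
    return event_status == "success_esme" and reason not in EXCLUDED_FAILURES


def final_message(short_message: Optional[str], event_status: Optional[str]) -> bool:
    """Mirror of final_message; NULL != 'partial_esme' is NULL in SQL, so it yields False."""
    return (short_message is not None
            and event_status is not None
            and event_status != "partial_esme")


def record_id(cdr_kind, message_id, account_id, receiver, submit_epoch) -> Optional[str]:
    """Mirror of the concat() key: any NULL component makes the whole key NULL."""
    parts = [cdr_kind, message_id, account_id, receiver, submit_epoch]
    if any(p is None for p in parts):
        return None
    return "_".join(str(p) for p in parts)


print(operator_flag("success_esme", None))                 # True
print(operator_flag("success_esme", "exist in DND list"))  # False
print(final_message("hi", "partial_esme"))                  # False
print(record_id("FCDR", 42, 7, "9665", 1700000000))         # FCDR_42_7_9665_1700000000
print(record_id("FCDR", 42, None, "9665", 1700000000))      # None
```

The last line shows why a NULL `account_id` or `receiver` produces a NULL `record_id` in Spark; if that matters downstream, the components can be wrapped in `coalesce(...)` before concatenation.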

## 8. Write to Data Lake (Parquet)

In [None]:
# Write processed data to Local Data Lake in Parquet format

# OCI Data Lake Function - Commented out for local testing
# def write_to_oci_data_lake(df, output_path, checkpoint_path):
#     """Write streaming DataFrame to OCI data lake with partitioning"""
#     
#     # Add partitioning columns
#     partitioned_df = df.withColumn("year", year(col("submit_date"))) \
#                        .withColumn("month", month(col("submit_date"))) \
#                        .withColumn("day", dayofmonth(col("submit_date"))) \
#                        .withColumn("hour", hour(col("submit_date"))) \
#                        .withColumn("minute", minute(col("submit_date")))
#     
#     # Write to OCI data lake with partitioning and checkpointing
#     query = partitioned_df.writeStream \
#         .format("parquet") \
#         .option("path", output_path) \
#         .option("checkpointLocation", checkpoint_path) \
#         .partitionBy("year", "month", "day", "hour", "minute") \
#         .outputMode("append") \
#         .trigger(processingTime="1 minute") \
#         .option("maxFilesPerTrigger", "1000") \
#         .option("maxRecordsPerFile", "100000") \
#         .start()
#     
#     return query

def write_to_local_data_lake(df, output_path, checkpoint_path):
    """Write streaming DataFrame to local data lake with partitioning"""
    
    # Add partitioning columns
    partitioned_df = df.withColumn("year", year(col("submit_date"))) \
                       .withColumn("month", month(col("submit_date"))) \
                       .withColumn("day", dayofmonth(col("submit_date"))) \
                       .withColumn("hour", hour(col("submit_date"))) \
                       .withColumn("minute", minute(col("submit_date")))
    
    # Write to local storage
    query = partitioned_df.writeStream \
        .format("parquet") \
        .option("path", output_path) \
        .option("checkpointLocation", checkpoint_path) \
        .partitionBy("year", "month", "day", "hour", "minute") \
        .outputMode("append") \
        .trigger(processingTime="1 minute") \
        .start()
    
    return query

# Set up paths for local data lake output
fact_sms_output_path = f"{DATA_LAKE_PATH}fact_sms/"
fact_sms_checkpoint_path = f"{CHECKPOINT_PATH}fact_sms_checkpoint/"

print("Setting up LOCAL data lake write operations...")
print(f"Output path: {fact_sms_output_path}")
print(f"Checkpoint path: {fact_sms_checkpoint_path}")
print("Trigger: Every 1 minute")
print("Format: Parquet with partitioning by year/month/day/hour/minute")

# Create directories if they don't exist
import os
os.makedirs(fact_sms_output_path, exist_ok=True)
os.makedirs(fact_sms_checkpoint_path, exist_ok=True)

# Write directly to local storage (no OCI fallback needed)
print("Writing to LOCAL Data Lake...")

# Start streaming query to write to local storage
data_lake_query = write_to_local_data_lake(
    final_traffic,
    fact_sms_output_path,
    fact_sms_checkpoint_path
)

print("Local data lake streaming query started successfully")
print(f"Data will be written to {fact_sms_output_path} every minute")
print("You can monitor the parquet files in your local directory")

# OCI Code - Commented out for local testing
# try:
#     print(" Attempting to write to OCI Data Lake...")
#     
#     # Test OCI connectivity by trying to write a small test file
#     test_df = spark.createDataFrame([("test",)], ["value"])
#     test_path = f"{DATA_LAKE_PATH}test/"
#     test_df.write.mode("overwrite").parquet(test_path)
#     
#     print(" OCI Data Lake is accessible - using cloud storage")
#     
#     # Start streaming query to write to OCI data lake
#     data_lake_query = write_to_oci_data_lake(
#         final_traffic,
#         fact_sms_output_path,
#         fact_sms_checkpoint_path
#     )
#     
#     print(" OCI Data lake streaming query started successfully")
#     print("  Data will be written to OCI bucket 'spark_data_leak' every minute")
#     
# except Exception as e:
#     print(f"OCI Data Lake not accessible: {e}")
#     print("Falling back to local storage for testing...")
# 
#     # Start streaming query to write to local storage
#     data_lake_query = write_to_local_fallback(
#         final_traffic,
#         local_fact_sms_output_path,
#         local_fact_sms_checkpoint_path
#     )
#     
#     print("Local data lake streaming query started")
#     print("Data will be written to local storage every minute")
#     print("Remember to configure OCI credentials for production use")
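With `partitionBy("year", "month", "day", "hour", "minute")`, Spark's file sink writes Hive-style `column=value` directories, and integer partition values are written without zero-padding (`month=5`, not `month=05`). The hypothetical helper below computes the directory a given `submit_date` lands in, which is handy for finding files under `fact_sms/` while testing. It assumes `submit_date` is already expressed in the Spark session time zone, since `year()`, `hour()` and friends are evaluated there.

```python
from datetime import datetime
from typing import Optional

PARTITION_COLUMNS = ("year", "month", "day", "hour", "minute")
NULL_PARTITION = "__HIVE_DEFAULT_PARTITION__"  # directory name Spark uses for NULL partition values


def partition_dir(submit_date: Optional[datetime]) -> str:
    """Relative partition directory for one row of fact_sms."""
    if submit_date is None:
        values = [NULL_PARTITION] * len(PARTITION_COLUMNS)
    else:
        values = [submit_date.year, submit_date.month, submit_date.day,
                  submit_date.hour, submit_date.minute]
    # Integers are rendered as-is, so single-digit values are not zero-padded
    return "/".join(f"{name}={value}" for name, value in zip(PARTITION_COLUMNS, values))


print(partition_dir(datetime(2024, 5, 3, 14, 7)))
# year=2024/month=5/day=3/hour=14/minute=7
```

For example, `os.path.join(fact_sms_output_path, partition_dir(ts))` points at the folder to inspect. Note that minute-level partitioning creates up to 1,440 directories per day, each holding small files; that is convenient for local inspection but worth revisiting before production.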

## 9. Load to Vertica Table

In [None]:
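# Set up Vertica Table Structure for Testing
import jaydebeapi

# Vertica connection settings for the Docker container. They are defined here
# because setup_vertica_table() runs before the streaming cell below.
VERTICA_CONFIG = {
    "url": "jdbc:vertica://ci-vertica-db:5433/customer_insights",
    "user": "customer_insights",
    "password": "customer_insights1",
    "driver": "com.vertica.jdbc.Driver"
}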

def setup_vertica_table():
    """Create the fact_sms table in Vertica container if it doesn't exist"""
    
    try:
        # Connect to Vertica using jaydebeapi
        conn = jaydebeapi.connect(
            "com.vertica.jdbc.Driver",
            VERTICA_CONFIG["url"],
            [VERTICA_CONFIG["user"], VERTICA_CONFIG["password"]],
            "/usr/local/spark/jars/vertica-jdbc.jar"
        )
        
        cursor = conn.cursor()
        
        # Create schema if not exists
        cursor.execute("CREATE SCHEMA IF NOT EXISTS standard;")
        
        # Create fact_sms table
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS standard.fact_sms (
            record_id VARCHAR(500),
            cdr_kind VARCHAR(10),
            date_nk DATE,
            hour_nk INTEGER,
            message_id INTEGER,
            message_status VARCHAR(20),
            submit_date TIMESTAMP,
            sender_name VARCHAR(50),
            receiver VARCHAR(50),
            event_status VARCHAR(50),
            source_protocol VARCHAR(20),
            account_id INTEGER,
            master_account_id INTEGER,
            network_id INTEGER,
            number_of_units INTEGER,
            provider_id INTEGER,
            dlr_status VARCHAR(20),
            region VARCHAR(20),
            short_message VARCHAR(200),
            failure VARCHAR(200),
            el_message_id VARCHAR(100),
            correlation_id VARCHAR(200),
            campaign_id VARCHAR(100),
            message_type VARCHAR(50),
            mnp_used BOOLEAN,
            diameter_session_id VARCHAR(100),
            diameter_response_status VARCHAR(20),
            user_submit_date TIMESTAMP,
            message_body_length INTEGER,
            trace_id VARCHAR(100),
            created_time TIMESTAMP,
            time_ingested TIMESTAMP,
            submit_date_tz TIMESTAMPTZ,
            remarks VARCHAR(200),
            operator_flag BOOLEAN,
            final_message BOOLEAN,
            charge_message_id INTEGER,
            product VARCHAR(50),
            ingestion_source VARCHAR(50),
            el_user_id VARCHAR(50),
            source_ip VARCHAR(50),
            encrypted INTEGER,
            product_id VARCHAR(50),
            white_network_id BOOLEAN,
            ad_sender_name BOOLEAN,
            sms_classification VARCHAR(50)
        )
        ORDER BY submit_date
        SEGMENTED BY HASH(record_id) ALL NODES;
        """
        
        cursor.execute(create_table_sql)
        conn.commit()
        
        print("Vertica table standard.fact_sms exists (created if it was missing)")
        print("Table is ready to receive streaming data")
        
        cursor.close()
        conn.close()
        
    except Exception as e:
        print(f"Error setting up Vertica table: {e}")
        print("Please ensure Vertica container is running and accessible")

# Run table setup
setup_vertica_table()
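The connection settings are hard-coded for the local Docker container. A minimal sketch of letting a non-local run supply its own credentials without editing the notebook is to read each setting from an environment variable and fall back to the local defaults. The variable names `VERTICA_URL`, `VERTICA_USER` and `VERTICA_PASSWORD` and the helper `load_vertica_config` are assumptions, not something the pipeline already uses.

```python
import os
from typing import Dict, Mapping, Optional

# Local Docker defaults, copied from VERTICA_CONFIG in this notebook
LOCAL_DEFAULTS = {
    "url": "jdbc:vertica://ci-vertica-db:5433/customer_insights",
    "user": "customer_insights",
    "password": "customer_insights1",
}


def load_vertica_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build a VERTICA_CONFIG-style dict; hypothetical env vars override the local defaults."""
    env = os.environ if env is None else env
    return {
        "url": env.get("VERTICA_URL", LOCAL_DEFAULTS["url"]),
        "user": env.get("VERTICA_USER", LOCAL_DEFAULTS["user"]),
        "password": env.get("VERTICA_PASSWORD", LOCAL_DEFAULTS["password"]),
        # The driver class comes from the Vertica JDBC jar, so it is not configurable here
        "driver": "com.vertica.jdbc.Driver",
    }
```

Used as `VERTICA_CONFIG = load_vertica_config()`, the rest of the notebook stays unchanged because the dict keeps the same keys.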

In [None]:
# Load processed data to Vertica standard.fact_sms table

# Vertica connection configuration for Docker container
VERTICA_CONFIG = {
    "url": "jdbc:vertica://ci-vertica-db:5433/customer_insights",
    "user": "customer_insights",
    "password": "customer_insights1",
    "driver": "com.vertica.jdbc.Driver"
}

def write_to_vertica(batch_df, batch_id):
    """Write each micro-batch to Vertica using JDBC"""
    
    record_count = batch_df.count()  # count once; each count() re-runs the batch
    if record_count > 0:
        print(f"Processing batch {batch_id} with {record_count} records")
        
        # Select only the columns that exist in Vertica fact_sms table
        vertica_columns = [
            "record_id", "cdr_kind", "date_nk", "hour_nk", "message_id", 
            "message_status", "submit_date", "sender_name", "receiver", 
            "event_status", "source_protocol", "account_id", "master_account_id",
            "network_id", "number_of_units", "provider_id", "dlr_status", 
            "region", "short_message", "failure", "el_message_id", 
            "correlation_id", "campaign_id", "message_type", "mnp_used",
            "diameter_session_id", "diameter_response_status", "user_submit_date",
            "message_body_length", "trace_id", "created_time", "time_ingested",
            "submit_date_tz", "remarks", "operator_flag", "final_message",
            "charge_message_id", "product", "ingestion_source", "el_user_id",
            "source_ip", "encrypted", "product_id", "white_network_id",
            "ad_sender_name", "sms_classification"
        ]
        
        # Project the batch onto the Vertica column list
        vertica_df = batch_df.select(*vertica_columns)
        
        try:
            # Append the batch to Vertica over JDBC (plain INSERTs, not an upsert)
            vertica_df.write \
                .format("jdbc") \
                .option("url", VERTICA_CONFIG["url"]) \
                .option("user", VERTICA_CONFIG["user"]) \
                .option("password", VERTICA_CONFIG["password"]) \
                .option("driver", VERTICA_CONFIG["driver"]) \
                .option("dbtable", "standard.fact_sms") \
                .option("batchsize", "10000") \
                .option("truncate", "false") \
                .mode("append") \
                .save()
            
            print(f"Batch {batch_id} successfully written to Vertica container")
        except Exception as e:
            print(f"Error writing batch {batch_id} to Vertica: {e}")
            # Swallowing the error keeps the stream running, but foreachBatch then treats
            # the batch as committed, so these rows are not retried in Vertica
    else:
        print(f"Batch {batch_id} is empty, skipping Vertica write")

# Streaming query that hands each micro-batch to write_to_vertica via foreachBatch
def create_vertica_writer(df, checkpoint_path):
    """Create streaming query that writes to Vertica"""
    
    query = df.writeStream \
        .foreachBatch(write_to_vertica) \
        .option("checkpointLocation", checkpoint_path) \
        .outputMode("append") \
        .trigger(processingTime="1 minute") \
        .start()
    
    return query

# Set up Vertica checkpoint path
vertica_checkpoint_path = f"{CHECKPOINT_PATH}vertica_checkpoint/"

print("Setting up Vertica write operations for Docker container...")
print(f"Vertica URL: {VERTICA_CONFIG['url']}")
print(f"Target table: standard.fact_sms")
print(f"Checkpoint path: {vertica_checkpoint_path}")
print("Connecting to Vertica container: ci-vertica-db")

# Start streaming query to write to Vertica
vertica_query = create_vertica_writer(
    final_traffic,
    vertica_checkpoint_path         
)

print("Vertica streaming query started")
print("Data will be loaded to Vertica container every minute using JDBC")
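As written, the Parquet writer (Section 8) and the Vertica writer are two independent streaming queries over `final_traffic`, so each one reads the Kafka topics and runs the transformations separately. An alternative, sketched here rather than taken from this pipeline, is the "write to multiple locations" pattern from the Structured Streaming guide: a single `foreachBatch` query that caches the micro-batch once and hands it to every sink. `make_fan_out` and `write_parquet_batch` are hypothetical names; each sink is any callable taking `(batch_df, batch_id)`, such as `write_to_vertica`.

```python
from typing import Callable, Iterable


def make_fan_out(sinks: Iterable[Callable]) -> Callable:
    """Return a foreachBatch function that writes one micro-batch to several sinks."""
    sinks = list(sinks)

    def fan_out(batch_df, batch_id):
        # Cache once so every sink reuses the computed batch instead of
        # re-reading Kafka and re-running the transformations
        batch_df.persist()
        try:
            for sink in sinks:
                sink(batch_df, batch_id)
        finally:
            # Release the cache even if a sink raises
            batch_df.unpersist()

    return fan_out


# Hypothetical wiring that would replace the two separate queries:
# query = final_traffic.writeStream \
#     .foreachBatch(make_fan_out([write_parquet_batch, write_to_vertica])) \
#     .option("checkpointLocation", f"{CHECKPOINT_PATH}fact_sms_fanout/") \
#     .trigger(processingTime="1 minute") \
#     .start()
```

The trade-off is delivery semantics: the built-in Parquet file sink is exactly-once, whereas `foreachBatch` is at-least-once, so a replayed batch after a restart can write duplicates to both targets unless the sinks deduplicate (for example on `record_id`).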

## 10. Monitor Pipeline Performance

In [None]:
# Monitor streaming pipeline performance and health

import time
from datetime import datetime

def monitor_streaming_queries():
    """Monitor the health and performance of streaming queries"""
    
    # Get all active streaming queries
    active_queries = spark.streams.active
    
    print("📊 STREAMING PIPELINE MONITORING")
    print("=" * 50)
    print(f"⏰ Timestamp: {datetime.now()}")
    print(f"🔍 Active Queries: {len(active_queries)}")
    
    for i, query in enumerate(active_queries):
        print(f"\n📈 Query {i+1}: {query.name if query.name else 'Unnamed'}")
        print(f"   ID: {query.id}")
        print(f"   Status: {query.status['message']}")
        
        if query.isActive:
            try:
                # lastProgress is a dict describing the most recent micro-batch
                progress = query.lastProgress
                if progress:
                    print(f"   Batch ID: {progress.get('batchId', 'N/A')}")
                    print(f"   Input Rows (last batch): {progress.get('numInputRows', 'N/A')}")
                    print(f"   Input Rate: {progress.get('inputRowsPerSecond', 'N/A')} rows/s")
                    print(f"   Processing Time: {progress.get('durationMs', {}).get('triggerExecution', 'N/A')} ms")
                    
                    # Check for any sources
                    sources = progress.get('sources', [])
                    for source in sources:
                        print(f"   Source: {source.get('description', 'Unknown')}")
                        print(f"   Input Rate: {source.get('inputRowsPerSecond', 'N/A')} rows/s")
                        
            except Exception as e:
                print(f"   ⚠️  Error getting progress: {e}")
        else:
            print(f"   ⚠️  Query is not active!")

def data_quality_checks(df_sample):
    """Perform basic data quality checks on streaming data"""
    
    print("\n🔍 DATA QUALITY CHECKS")
    print("=" * 30)
    
    try:
        # Basic counts
        total_records = df_sample.count() if df_sample is not None else 0
        print(f"📊 Total Records in Sample: {total_records}")
        
        if total_records > 0:
            # Check for null message IDs
            null_message_ids = df_sample.filter(col("message_id").isNull()).count()
            print(f"❌ Null Message IDs: {null_message_ids}")
            
            # Check message status distribution
            status_dist = df_sample.groupBy("message_status").count().collect()
            print("📈 Message Status Distribution:")
            for row in status_dist:
                print(f"   {row['message_status']}: {row['count']}")
            
            # Check CDR kind distribution
            cdr_dist = df_sample.groupBy("cdr_kind").count().collect()
            print("📈 CDR Kind Distribution:")
            for row in cdr_dist:
                print(f"   {row['cdr_kind']}: {row['count']}")
                
            # Check for recent data
            recent_data = df_sample.filter(
                col("time_ingested") > (current_timestamp() - expr("INTERVAL 5 MINUTES"))
            ).count()
            print(f"⏰ Records in Last 5 Minutes: {recent_data}")
            
    except Exception as e:
        print(f"⚠️  Error in data quality checks: {e}")

# Function to get sample data for monitoring
def get_sample_data():
    """Get sample data from the current stream for quality checks"""
    try:
        # Get recent data from data lake for monitoring
        sample_path = f"{DATA_LAKE_PATH}fact_sms/"
        
        # Read recent parquet files back for monitoring
        sample_df = spark.read.parquet(sample_path) \
            .filter(col("time_ingested") > (current_timestamp() - expr("INTERVAL 10 MINUTES"))) \
            .limit(1000)
        
        return sample_df
    except Exception as e:
        print(f"⚠️  Cannot read sample data: {e}")
        return None

# Monitor streaming queries
monitor_streaming_queries()

# Perform data quality checks
print("\n" + "="*60)
sample_data = get_sample_data()
data_quality_checks(sample_data)

print("\n" + "="*60)
print("✅ PIPELINE MONITORING COMPLETE")
print("🔄 Pipeline is processing SMS data in real-time every minute")
print("📊 Monitor this cell regularly to check pipeline health")
print("⚠️  Watch for any error messages or quality issues")
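`lastProgress` is a dict parsed from Spark's `StreamingQueryProgress` JSON. With a 1-minute trigger, two of its fields give a quick lag signal: if `durationMs.triggerExecution` exceeds the 60,000 ms trigger interval, batches are overrunning their slot, and if `processedRowsPerSecond` stays below `inputRowsPerSecond`, the query is falling behind its input. The helper `lag_warnings` and its thresholds are an illustrative assumption, not a Spark API.

```python
from typing import Any, Dict, List, Optional

TRIGGER_INTERVAL_MS = 60_000  # matches trigger(processingTime="1 minute")


def lag_warnings(progress: Optional[Dict[str, Any]],
                 trigger_ms: int = TRIGGER_INTERVAL_MS) -> List[str]:
    """Return human-readable warnings derived from a query.lastProgress dict."""
    if not progress:
        return ["no progress reported yet"]
    warnings = []
    duration = progress.get("durationMs", {}).get("triggerExecution")
    if duration is not None and duration > trigger_ms:
        warnings.append(f"batch {progress.get('batchId')} took {duration} ms "
                        f"(> {trigger_ms} ms trigger)")
    # Rates can be missing from the JSON (e.g. on the first batch), so default to 0
    in_rate = progress.get("inputRowsPerSecond") or 0.0
    out_rate = progress.get("processedRowsPerSecond") or 0.0
    if in_rate > 0 and out_rate < in_rate:
        warnings.append(f"processing {out_rate:.1f} rows/s < input {in_rate:.1f} rows/s")
    return warnings
```

In the notebook this would be called as `for q in spark.streams.active: print(q.id, lag_warnings(q.lastProgress))`; an empty list means neither signal fired for the latest batch.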

## Pipeline Summary & Next Steps

### ✅ What This Pipeline Accomplishes:

1. **Real-time Data Ingestion**: Reads from multiple Kafka topics (FCDR Jasmin, FCDR Telestax, ECDR)
2. **Business Logic Transformation**: Applies the same business rules as the original SQL-based pipeline
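3. **Data Lake Storage**: Writes processed data to the local data lake (`/home/homar/spark-kafka-stream/data_lake/`) in Parquet format, partitioned by year/month/day/hour/minute; the OCI Object Storage writer is commented out for local testing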
4. **Vertica Integration**: Loads data into Vertica for querying and reporting
5. **Monitoring**: Includes health checks and data quality monitoring

### 🔧 Configuration Required:

- **Kafka Configuration**: Update `KAFKA_BOOTSTRAP_SERVERS` with production values
- **OCI Object Storage Configuration**: Configure OCI credentials and bucket access, then re-enable the commented-out OCI writer in Section 8
- **Vertica Configuration**: Update `VERTICA_CONFIG` with actual connection details
- **Spark Configuration**: Tune memory and resource settings for production

### 🚀 Deployment Options:

1. **Local Development**: Run this notebook for testing and development
2. **Spark Cluster**: Deploy as Spark application on cluster for production
3. **Cloud Services**: Use managed Spark services (OCI Data Flow, AWS EMR, Azure Synapse, etc.)
4. **Container Deployment**: Package as Docker container for Kubernetes deployment

### 📊 Benefits vs Original Pipeline:

- **Scalability**: Spark scales out by adding executors instead of loading transformation work onto Vertica
- **Near-Real-Time Processing**: 1-minute micro-batches instead of periodic batch loads
- **Maintainability**: DataFrame transformations instead of complex SQL
- **Monitoring**: Built-in streaming metrics (`lastProgress`, `status`) and health checks
- **Cost Efficiency**: Reduces load on expensive Vertica resources

In [None]:
# Pipeline Cleanup and Shutdown (Run when stopping the pipeline)

def shutdown_pipeline():
    """Gracefully shutdown all streaming queries"""
    
    print("🛑 SHUTTING DOWN PIPELINE")
    print("=" * 30)
    
    active_queries = spark.streams.active
    
    for query in active_queries:
        try:
            print(f"⏹️  Stopping query: {query.id}")
            query.stop()
            print(f"✅ Query {query.id} stopped successfully")
        except Exception as e:
            print(f"❌ Error stopping query {query.id}: {e}")
    
    print("🔄 Waiting for queries to fully stop...")
    
    # Wait for all queries to stop
    for query in active_queries:
        try:
            query.awaitTermination(timeout=30)  # Wait max 30 seconds
        except Exception:
            pass
    
    print("✅ All streaming queries stopped")
    print("🧹 Pipeline shutdown complete")

# Uncomment the following line when you want to stop the pipeline
# shutdown_pipeline()

print("🚀 REAL-TIME SMS PIPELINE IS RUNNING")
print("=" * 40)
print("📡 Ingesting data from Kafka topics")
print("🔄 Processing every 1 minute")
print("💾 Writing to local data lake (Parquet)")
print("🗄️  Loading to Vertica")
print("📊 Monitor the pipeline health above")
print("\n⚠️  To stop the pipeline, uncomment and run: shutdown_pipeline()")