In [0]:
%pip install confluent-kafka

In [0]:
from confluent_kafka import Producer
import json
from  itertools import islice
import numpy as np
from pyspark.sql.functions import col, decode, split, element_at, udf, lit, reduce, from_json
import logging
from pyspark.sql.types import StructType, StructField, StringType
import datetime
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkContext
import os
from functools import reduce
import time

In [0]:
## Assign configs
# Load the environment configuration. "multiline" lets Spark parse a JSON
# document that spans several lines instead of one object per line.
config = spark.read.option("multiline", "true").json("dbfs:/configs/config.json")

# Fetch the first row once: every DataFrame.first() call launches its own Spark
# job, so reading it separately per field would scan the file twice.
config_row = config.first()
if config_row is None:
    raise ValueError("dbfs:/configs/config.json contains no rows; cannot resolve 'env' and 'lz_key'")

# Normalise both values so the resource names built from them are lower-case
# and free of stray whitespace.
env = config_row["env"].strip().lower()
lz_key = config_row["lz_key"].strip().lower()

# Used as the secret scope name in the dbutils.secrets.get calls of this notebook.
keyvault_name = f"ingest{lz_key}-meta002-{env}"

In [0]:
# Fetch the service principal credentials from the secret scope.
# The key names below are spelled "PRINCIPLE"; they must match the stored
# secret names exactly, so do not "correct" them here.
client_secret, tenant_id, client_id = (
    dbutils.secrets.get(scope=keyvault_name, key=secret_key)
    for secret_key in (
        'SERVICE-PRINCIPLE-CLIENT-SECRET',
        'SERVICE-PRINCIPLE-TENANT-ID',
        'SERVICE-PRINCIPLE-CLIENT-ID',
    )
)

In [0]:
## Parameterise the curated storage account and its containers
curated_container, silver_curated_container = "gold", "silver"
curated_storage_account = f"ingest{lz_key}curated{env}"

In [0]:
## Assign OAuth (service principal client credentials) to the storage accounts
curated_storage_account = f"ingest{lz_key}curated{env}"
checkpoint_storage_account = f"ingest{lz_key}xcutting{env}"

# Every account listed here receives the same set of ABFS OAuth settings.
storage_accounts = [curated_storage_account, checkpoint_storage_account]

for storage_account in storage_accounts:
    # All Hadoop keys for one account share this host suffix.
    account_host = f"{storage_account}.dfs.core.windows.net"
    configs = {
        f"fs.azure.account.auth.type.{account_host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{account_host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{account_host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{account_host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{account_host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }
    for key, val in configs.items():
        spark.conf.set(key, val)

In [0]:
# Sanity check: show the auth type registered for each storage account.
# 'MISSING' is printed when the setting was never applied to the session.
for storage_account in storage_accounts:
    key = "fs.azure.account.auth.type." + storage_account + ".dfs.core.windows.net"
    auth_type = spark.conf.get(key, 'MISSING')
    print(f"{key}: {auth_type}")

In [0]:
# Event Hub configurations
# Kafka endpoint of the Event Hubs namespace (port 9093).
eventhubs_hostname = f"ingest{lz_key}-integration-eventHubNamespace001-{env}.servicebus.windows.net:9093"

# Secret passed as the SASL password below.
# NOTE(review): presumably the namespace connection string, given the
# '$ConnectionString' username - confirm.
eh_kv_secret = dbutils.secrets.get(scope=keyvault_name, key="RootManageSharedAccessKey")

# Producer settings, assembled in two steps: connection/auth first, then delivery behaviour.
conf = {
    'bootstrap.servers': eventhubs_hostname,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': eh_kv_secret,
}
conf.update({
    'retries': 5,                 # Increased retries
    'enable.idempotence': True,   # Enable idempotent producer
})

# Broadcast the settings so the function run through mapPartitions can build its own producer.
broadcast_conf = sc.broadcast(conf)

In [0]:
# Databricks Workflow - Sequential State Processing

# The workflow passes the state to process to each task through this widget;
# "paymentPending" is the default when no value is supplied.
dbutils.widgets.text("current_state", "paymentPending", "State to Process")
current_state = dbutils.widgets.get("current_state")

# States this notebook accepts.
states = [
    "paymentPending",
    "appealSubmitted",
    "awaitingRespondentEvidence(a)",
    "awaitingRespondentEvidence(b)",
    "caseUnderReview",
    "reasonForAppealSubmitted",
    "listing",
    "PrepareForHearing",
    "Decision",
    "FTPA Submitted (a)",
    "FTPA Submitted (b)",
    "Decided (b)",
    "Decided (a)",
    "FTPA Decided",
    "Ended",
    "Remitted",
]

print(f"🔄 Processing state: {current_state}")

# Reject anything that is not a known state before touching storage.
if not (current_state in states):
    raise ValueError(f"Invalid state: {current_state}. Must be one of: {states}")

# Storage locations for this state (re-declared so the cell can run on its own).
curated_container, silver_curated_container = "gold", "silver"
curated_storage_account = f"ingest{lz_key}curated{env}"

gold_files_base_path = (
    f"abfss://{curated_container}@{curated_storage_account}.dfs.core.windows.net"
    f"/ARIADM/ACTIVE/CCD/APPEALS/{current_state}/"
)

def process_partition(partition):
    """Publish every row of one Spark partition to the Event Hubs Kafka endpoint.

    Executed through ``mapPartitions``, so each partition creates its own
    producer from the broadcast configuration.

    Args:
        partition: iterator of rows exposing ``file_path``, ``content_str`` and
            ``state``. Rows with a missing ``file_path`` or ``content_str`` are
            logged and skipped without producing a result.

    Returns:
        list of ``(file_name, state, status, error_message, timestamp)`` tuples,
        successes first and then failures. ``status`` is ``"success"`` or
        ``"failure"`` and ``timestamp`` is UTC formatted ``YYYY-MM-DD HH:MM:SS``.
    """
    import logging
    from confluent_kafka import Producer
    from datetime import datetime, timezone

    # Initialize logger
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('KafkaProducer')

    # The topic is identical for every row, so build it once.
    topic = f'evh-active-pub-{env}-{lz_key}-uks-dlrm-01'

    failure_list = []
    success_list = []

    def utc_timestamp():
        """Return the current UTC time formatted as YYYY-MM-DD HH:MM:SS."""
        return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    def make_delivery_report(row_state):
        """Build a delivery callback bound to the state of one specific message.

        Callbacks only run later, from poll()/flush(). A closure over the loop
        variable would therefore see the state of the last row processed, so
        the value is captured here as a function argument instead.
        """
        def delivery_report(err, msg):
            key_str = msg.key().decode('utf-8') if msg.key() is not None else "Unknown"
            timestamp = utc_timestamp()

            if err is not None:
                logger.error(f"Message delivery failed for key {key_str}: {err}")
                failure_list.append((key_str, row_state, "failure", str(err), timestamp))
            else:
                success_list.append((key_str, row_state, "success", "", timestamp))

        return delivery_report

    # Initialize producer
    producer = Producer(**broadcast_conf.value)

    for row in partition:
        if row.file_path is None or row.content_str is None:
            logger.warning(f"Skipping row with missing file_path/content_str: {row}")
            continue

        current_state_row = row.state
        current_file_path = row.file_path
        delivery_report = make_delivery_report(current_state_row)

        try:
            # Handle different content_str types
            if isinstance(row.content_str, str):
                value = row.content_str.encode('utf-8')
            elif isinstance(row.content_str, bytearray):
                value = bytes(row.content_str)
            elif isinstance(row.content_str, bytes):
                value = row.content_str
            else:
                logger.error(f"Unsupported type for content_str: {type(row.content_str)}")
                failure_list.append((current_file_path, current_state_row, "failure", "Unsupported content type", utc_timestamp()))
                continue

            # Produce message to Kafka
            producer.produce(
                topic=topic,
                key=current_file_path.encode('utf-8'),
                value=value,
                callback=delivery_report
            )

        except BufferError:
            # The local queue is full: serve callbacks to free space, then retry once.
            logger.error("Producer buffer full. Polling for events.")
            producer.poll(1)
            try:
                producer.produce(
                    topic=topic,
                    key=current_file_path.encode('utf-8'),
                    value=value,
                    callback=delivery_report
                )
            except Exception as retry_e:
                logger.error(f"Failed to produce message after buffer retry: {retry_e}")
                failure_list.append((current_file_path, current_state_row, "failure", f"Buffer error retry failed: {str(retry_e)}", utc_timestamp()))

        except Exception as e:
            logger.error(f"Unexpected error during production: {e}")
            failure_list.append((current_file_path, current_state_row, "failure", str(e), utc_timestamp()))

        # Serve pending delivery callbacks without blocking so reports are
        # handled as we go instead of piling up until the final flush.
        producer.poll(0)

    # Flush producer and handle any remaining messages
    try:
        producer.flush()
        logger.info("Producer flushed successfully.")
    except Exception as e:
        logger.error(f"Unexpected error during flush: {e}")

    return success_list + failure_list


# Schema of the per-message publish results
schema = StructType([
    StructField("file_name", StringType(), True),
    StructField("state", StringType(), True),
    StructField("status", StringType(), True),
    StructField("error_message", StringType(), True),
    StructField("timestamp", StringType(), True)
])

# Default to an empty result set so the cells that read result_df further down
# still work when there is nothing to publish for this state.
result_df = spark.createDataFrame([], schema)

try:
    listing = dbutils.fs.ls(gold_files_base_path)
    if not listing:
        raise FileNotFoundError(f"No folders found under {gold_files_base_path}")

    # NOTE(review): takes the last listing entry as the newest run; this relies
    # on the order returned by dbutils.fs.ls - confirm that ordering is guaranteed.
    files = listing[-1]

    valid_json = files.path + "JSON/"
    print(f"📂 Valid JSON path: {valid_json}")

    ## Process INVALID_JSON
    invalid_json = files.path + "INVALID_JSON/"
    try:
        dbutils.fs.ls(invalid_json)
    except Exception:
        # Any listing error is treated as "directory absent" (best effort).
        print(f"ℹ️ No INVALID_JSON directory found for state: {current_state}")
        print("✅ Processing complete for this state!")
    else:
        # Load binary data
        binary_df = (
            spark.read.format('binaryFile')
            .option('pathGlobFilter', '*.{html,json}')
            .option('recursiveFileLookup', 'true')
            .load(invalid_json)
        )

        # Decode the payload and keep only the file name (last path segment)
        html_df = (
            binary_df
            .withColumn("content_str", decode(col('content'), 'utf-8'))
            .withColumn("file_path", element_at(split(col('path'), '/'), -1))
            .withColumn("state", lit(current_state))
            .select('content_str', 'file_path', 'state')
        )

        # Check if we have data to process
        record_count = html_df.count()
        if record_count == 0:
            print(f"ℹ️ No data to process for state: {current_state}")
        else:
            print(f"📊 Found {record_count} records for state: {current_state}")

            # Apply repartitioning
            num_spark_partitions = 1
            optimized_html_df = html_df.repartition(num_spark_partitions)

            display(optimized_html_df)

            print(f"📤 Sending {record_count} records to Kafka for state: {current_state}")
            print(f"🔄 Starting Kafka processing for state: {current_state}")

            # Publish exactly once. A DataFrame built lazily on top of
            # mapPartitions would re-run the producer - and re-send every
            # message - on each later action (count, display, write), so the
            # results are collected here and result_df is built from the
            # materialised rows.
            result_rdd = optimized_html_df.rdd.mapPartitions(process_partition)
            result_rows = result_rdd.collect()
            result_df = spark.createDataFrame(result_rows, schema)

            kafka_result_count = len(result_rows)
            # Index 2 of each result tuple is the status field.
            failure_count = len([r for r in result_rows if r[2] == "failure"])
            print(f"📊 Kafka processing completed: {kafka_result_count} records for state: {current_state}")

            # Display results for this state
            display(result_df)

            # Optional: Save results for this state
            # result_df.write.mode("append").saveAsTable(f"kafka_results_{current_state}")

            # Report what was actually delivered instead of assuming every record succeeded.
            print(f"✅ Sent {kafka_result_count - failure_count} of {record_count} records to Kafka for state: {current_state} ({failure_count} failed)")

            print(f"✅ Successfully processed state: {current_state}")

except Exception as e:
    print(f"❌ Error processing state {current_state}: {e}")
    raise  # Re-raise so the workflow task shows as failed

print(f"🎉 Completed processing for state: {current_state}")

In [0]:
## Display failed files
# Keep only the publish results whose status is "failure", show them, and
# leave the number of failures as the cell output.
failed_files = result_df.where(col("status") == "failure")

display(failed_files)
failed_files.count()

In [0]:
## Append the publish results of the current state to its audit Delta table

# Per-state audit location in the silver container.
silver_base_path = (
    f"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net"
    f"/ARIADM/ACTIVE/CCD/APPEALS/{current_state}/publish_audit_db_eh"
)

# Reduce to at most four partitions before writing.
final_results_df = result_df.coalesce(4)

# Only rows belonging to the current state are appended.
(
    final_results_df
    .where(col("state") == current_state)
    .write
    .format("delta")
    .mode("append")
    .save(silver_base_path)
)

In [0]:
# Pause for 60 seconds before the acknowledgement cells run.
# NOTE(review): presumably gives the downstream consumer time to process the
# published messages and emit acknowledgements - confirm the intent and whether
# a fixed delay is sufficient.
time.sleep(60)

# Acknowledge data has been sent to EventHubs

In [0]:
# Acknowledgement payload: every field is a nullable string.
ack_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("file_name", "state", "status", "error_message", "timestamp")
])

EH_NAMESPACE = f"ingest{lz_key}-integration-eventHubNamespace001-{env}"
EH_NAME = f"evh-active-pub-{env}-{lz_key}-uks-dlrm-01" #To create this Eventhub in the UI

connection_string = dbutils.secrets.get(keyvault_name, "RootManageSharedAccessKey")

# Options for the Structured Streaming Kafka source reading from Event Hubs.
KAFKA_OPTIONS = {
    # --- connection / authentication ---
    "kafka.bootstrap.servers": f"{EH_NAMESPACE}.servicebus.windows.net:9093",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection_string}";',
    # --- subscription ---
    "subscribe": EH_NAME,
    # NOTE(review): Spark's Kafka source documents "kafka.group.id" for setting
    # the consumer group; confirm that this key is actually honoured.
    "consumer.group.id": current_state,
    # --- offsets / data-loss handling ---
    # "startingOffsets": "earliest",
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
}

In [0]:
# Paths specific to this state
data_path = f"abfss://silver@ingest{lz_key}curated{env}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/{current_state}/publish_audit_db_eh/"
checkpoint_path = f"abfss://db-ack-checkpoint@ingest{lz_key}xcutting{env}.dfs.core.windows.net/{current_state}/ACK/"

print(f"📂 Data path: {data_path}")
print(f"📂 Checkpoint path: {checkpoint_path}")

# Read stream from EventHub (Kafka)
eventhubdf = (
    spark.readStream.format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
)

# Parse payload: the Kafka value is cast to a string and decoded as JSON with
# ack_schema, then the resulting struct is flattened into top-level columns.
parsed_df = (
    eventhubdf
    .select(col("value").cast("string").alias("json_str"))
    .select(from_json(col("json_str"), ack_schema).alias("json_obj"))
    .select("json_obj.*")
)

# Start streaming write to Delta
query = (
    parsed_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .start(data_path)
)

# Let the stream ingest for up to 30 seconds, then stop it.
# awaitTermination(timeout) is used instead of a plain sleep because it
# re-raises the error if the query fails during the wait, so a broken stream
# fails this cell instead of being silently stopped. The finally clause
# guarantees the query is stopped either way.
try:
    query.awaitTermination(30)
finally:
    query.stop()

# Read results back (optional validation step)
# NOTE(review): data_path is the same publish_audit_db_eh location the publish
# results are appended to, so this count presumably includes those rows as well
# as streamed acknowledgements - confirm this is intended.
df = (
    spark.read.format("delta")
    .load(data_path)
    .filter(col("status").isNotNull())
)

print(f"Ack records for {current_state}: {df.count()}")
display(df)

print(f"✅ Acknowledgement processing completed for state: {current_state}")


In [0]:
# End the notebook run here and hand this completion message back to the caller.
dbutils.notebook.exit(f"{current_state} notebook completed successfully")