In [0]:
# %pip install confluent_kafka
# %pip install httpx
# %pip install attrs
# %pip install authlib
# %restart_python

In [0]:
import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import from_avro
from confluent_kafka.schema_registry import SchemaRegistryClient
import ssl
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
from pyspark.sql.functions import col


In [0]:
# Base locations inside the fhir_data_bronze Unity Catalog volume
# (output table path, streaming checkpoint directory).
deltaTablePath, checkpointPath = (
    "dbfs:/Volumes/fhir_workspace/fhir_data_bronze/",
    "/Volumes/fhir_workspace/fhir_data_bronze/checkpoints/",
)

In [0]:
# Confluent Cloud connection settings; credentials come from the "confluent" secret scope.
confluentClusterName = "fhir_cluster"
confluentBootstrapServers = dbutils.secrets.get("confluent", "bootstrap_server")
confluentApiKey = dbutils.secrets.get("confluent", "kafka_api_key")
confluentSecret = dbutils.secrets.get("confluent", "kafka_api_secret")
schemaRegistryUrl = dbutils.secrets.get("confluent", "schema_registry_endpoint")
confluentRegistryApiKey = dbutils.secrets.get("confluent", "schema_registry_api_key")
confluentRegistrySecret = dbutils.secrets.get("confluent", "schema_registry_api_secret")
confluentTopicName = "procedures"

# Make the base paths topic-specific. The append is guarded so that re-running this
# cell does not add the topic a second time (".../proceduresprocedures"), which would
# point the streaming query at a different checkpoint directory.
if not deltaTablePath.endswith(confluentTopicName):
    deltaTablePath = deltaTablePath + confluentTopicName
if not checkpointPath.endswith(confluentTopicName):
    checkpointPath = checkpointPath + confluentTopicName

In [0]:
# Schema Registry client, authenticated with the registry API key/secret pair
# ("<key>:<secret>" basic-auth string).
schema_registry_conf = {'url': schemaRegistryUrl}
schema_registry_conf['basic.auth.user.info'] = f"{confluentRegistryApiKey}:{confluentRegistrySecret}"

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

In [0]:
# Kafka values are assumed to use the Confluent Schema Registry wire format:
# byte 1 is a magic byte, bytes 2-5 hold the big-endian schema id, and the Avro
# payload starts at byte 6 (Spark's substring() is 1-based).
# A null Kafka value (e.g. a tombstone record) reaches the UDF as None; return a
# null id instead of failing the whole micro-batch with a TypeError.
binary_to_string = fn.udf(
    lambda x: None if x is None else str(int.from_bytes(x, byteorder='big')),
    StringType(),
)

# Raw Kafka stream. The key is decoded as a string and each value is split into its
# schema id (valueSchemaId) and its header-less Avro payload (fixedValue).
# Avro decoding happens per micro-batch in parse_and_save.
streamTestDf = (
  spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", confluentBootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", confluentTopicName)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()
  .withColumn('key', fn.col("key").cast(StringType()))
  # Drop the 5-byte header: keep bytes 6..end.
  .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
  # Bytes 2-5: schema id, rendered as a decimal string.
  .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
  .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'valueSchemaId','fixedValue')
)

In [0]:

def parse_and_save(df, epoch_id, table_name="fhir_workspace.fhir_data_bronze.procedures"):
    """Decode one micro-batch of Schema-Registry-framed Avro records and append it to a Delta table.

    Used as a ``foreachBatch`` callback. A batch may contain records written with
    several schema versions, so it is split by ``valueSchemaId``. Each slice is
    decoded with the schema fetched from the module-level ``schema_registry_client``.

    Args:
        df: micro-batch DataFrame with columns ``topic``, ``partition``, ``offset``,
            ``timestamp``, ``timestampType``, ``key``, ``valueSchemaId`` (numeric
            string) and ``fixedValue`` (Avro payload without its 5-byte header).
        epoch_id: micro-batch id supplied by Spark; not used here.
        table_name: destination table for ``saveAsTable`` (appended to, with
            ``mergeSchema`` enabled).

    Raises:
        Decoding errors from ``from_avro`` (``FAILFAST`` mode) and Schema Registry
        lookup errors propagate and fail the batch.
    """
    from_avro_options = {"mode": "FAILFAST"}
    metadata_cols = [col(name) for name in
                     ("topic", "partition", "offset", "timestamp", "timestampType", "key")]
    # (Avro field, output column) pairs for the flattened bronze row.
    field_aliases = [
        ("START", "start_timestamp"),
        ("STOP", "stop_timestamp"),
        ("PATIENT", "patient_id"),
        ("ENCOUNTER", "encounter_id"),
        ("SYSTEM", "system"),
        ("CODE", "code"),
        ("DESCRIPTION", "description"),
        ("BASE_COST", "base_cost"),
        ("REASONCODE", "reason_code"),
        ("REASONDESCRIPTION", "reason_description"),
    ]

    # The batch is scanned once to find schema ids and once more per id. Persist it
    # so each of those actions does not recompute the batch from the Kafka source.
    df.persist()
    try:
        schema_ids = [
            row.valueSchemaId
            for row in df.select(col("valueSchemaId").cast("integer")).distinct().collect()
            # Rows without a schema id (e.g. null Kafka values) carry nothing to decode
            # and cannot be looked up in the registry; skip them.
            if row.valueSchemaId is not None
        ]

        for schema_id in schema_ids:
            schema_str = str(schema_registry_client.get_schema(schema_id).schema_str)

            parsed_df = (
                df.filter(col("valueSchemaId") == schema_id)
                .select(*metadata_cols,
                        from_avro("fixedValue", schema_str, from_avro_options).alias("parsedValue"))
            )
            flat_df = parsed_df.select(
                *metadata_cols,
                *[col(f"parsedValue.{field}").alias(alias) for field, alias in field_aliases],
            )

            # NOTE(review): foreachBatch can re-run a batch after a failure, and these
            # plain appends are not idempotent. Consider Delta's txnAppId/txnVersion
            # writer options if duplicates matter.
            (flat_df.write.format("delta")
                .mode("append")
                .option("mergeSchema", "true")
                .saveAsTable(table_name))
    finally:
        df.unpersist()

In [0]:
# Start the streaming query. Every micro-batch is handed to parse_and_save, and
# progress is tracked in the topic-specific checkpoint directory.
(
    streamTestDf
    .writeStream
    .queryName("procedures_data")
    .option("checkpointLocation", checkpointPath)
    .foreachBatch(parse_and_save)
    .start()
)