In [None]:
from pyspark.sql import SparkSession
from confluent_kafka.admin import AdminClient

# Local Spark session for the stream processor. The Kafka source connector is
# fetched from Maven through `spark.jars.packages` when the session starts.
# NOTE(review): the connector coordinates (3.5.1, Scala 2.12) must track the
# installed PySpark version — keep them in sync when upgrading.
_spark_conf = {
    "spark.driver.memory": "4g",
    # NOTE(review): with master local[*] tasks run inside the driver JVM, so these
    # executor.* settings likely have no effect — confirm before relying on them.
    "spark.executor.memory": "4g",
    "spark.executor.cores": "4",
    # Add Kafka package for reading from Kafka
    "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
}

_builder = SparkSession.builder.appName("SparkProcessor").master("local[*]")
for _conf_key, _conf_value in _spark_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

# Broker address (host:port) shared by the admin client and the Spark Kafka readers.
KAFKA_BOOTSTRAP_SERVERS = "cp-kafka:9092"
admin_client = AdminClient({'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})

# Best-effort sanity check: print every topic the broker reports together with
# its partition count. Any exception from list_topics (e.g. the 10 s timeout
# expiring) is printed rather than raised, so later cells can still run.
try:
    metadata = admin_client.list_topics(timeout=10)
    print("Available Kafka Topics:")
    print("-" * 50)
    for topic in metadata.topics.values():
        # One print per topic; the trailing "\n" reproduces the blank separator line.
        print(f"Topic: {topic.topic}\n  Partitions: {len(topic.partitions)}\n")
except Exception as e:
    print(f"Error listing topics: {e}")

:: loading settings :: url = jar:file:/home/dev/app/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c870f1ad-fd3b-4d47-a2c6-c0abcea66ea7;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 793ms :: artifacts dl 26ms
	:: modules in us

Available Kafka Topics:
--------------------------------------------------
Topic: hosp_labevents
  Partitions: 3

Topic: icu_d_items
  Partitions: 1

Topic: icu_chartevents
  Partitions: 1

Topic: __consumer_offsets
  Partitions: 50



In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType

# JSON layout of each `icu_chartevents` Kafka message value. Fields use the
# StructField default nullable=True; charttime/storetime are read as plain strings.
chartevents_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("subject_id", IntegerType()),
        ("hadm_id", IntegerType()),
        ("stay_id", IntegerType()),
        ("caregiver_id", IntegerType()),
        ("charttime", StringType()),
        ("storetime", StringType()),
        ("itemid", IntegerType()),
        ("value", StringType()),
        ("valuenum", DoubleType()),
        ("valueuom", StringType()),
        ("warning", StringType()),
    )
])

# JSON layout of each `icu_d_items` Kafka message value (the item dictionary).
# lownormalvalue/highnormalvalue bound the range used for the `is_critical` flag.
d_items_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("itemid", IntegerType()),
        ("label", StringType()),
        ("abbreviation", StringType()),
        ("linksto", StringType()),
        ("category", StringType()),
        ("unitname", StringType()),
        ("param_type", StringType()),
        ("lownormalvalue", DoubleType()),
        ("highnormalvalue", DoubleType()),
    )
])

In [None]:
from pyspark.sql.functions import from_json, col, expr

# Options shared by both Kafka sources: same broker, read from the earliest offset.
# NOTE(review): for the streaming source, startingOffsets only applies when no
# checkpoint exists yet; a restarted query resumes from its checkpoint.
_kafka_source_options = {
    'kafka.bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
    'startingOffsets': 'earliest',
}

# Streaming source: chart events, throttled to 50 offsets per micro-batch trigger.
df_chartevents_raw = (spark.readStream
    .format('kafka')
    .options(**_kafka_source_options)
    .option('subscribe', 'icu_chartevents')
    .option("maxOffsetsPerTrigger", "50")
    .load()
)

# Batch (static) source: a one-off snapshot of the item dictionary, up to the
# latest offset at query time.
df_d_items_raw = (spark.read
    .format('kafka')
    .options(**_kafka_source_options)
    .option('subscribe', 'icu_d_items')
    .option('endingOffsets', 'latest')
    .load()
)

def _parse_kafka_json(raw_df, schema):
    """Decode each Kafka record's ``value`` bytes as JSON and flatten it into ``schema``'s columns."""
    decoded = from_json(col('value').cast('string'), schema)
    return raw_df.select(decoded.alias('data')).select('data.*')


df_chartevents = _parse_kafka_json(df_chartevents_raw, chartevents_schema)
df_d_items = _parse_kafka_json(df_d_items_raw, d_items_schema)
# The static item dictionary is joined against the stream below; cache it so it
# is not re-read from Kafka for every use.
df_d_items.cache()

# Enrich each chart event with its item metadata. The left join keeps events whose
# itemid has no dictionary entry; their metadata columns come back NULL.
df_joined = df_chartevents.join(df_d_items, on='itemid', how='left')

# Flag readings outside the item's normal range. SQL three-valued logic applies:
# a comparison involving a NULL valuenum or NULL bound yields NULL, so the flag is
# NULL (not false) unless the other comparison is true.
_below_normal = col("valuenum") < col("lownormalvalue")
_above_normal = col("valuenum") > col("highnormalvalue")
df_processed = df_joined.withColumn("is_critical", _below_normal | _above_normal)

# Optionally down-sample the enriched stream on the `is_critical` flag.
# FIXME: `sampler` is not defined in any of the preceding cells, so an unguarded
# call raises NameError on a fresh kernel and aborts this cell before the
# streaming query starts. Import/define `sampler` in the imports cell to enable it.
# NOTE(review): `sampled_df` is not consumed yet — the query below writes
# `df_processed`. Switch it over once sampling is wired in.
if "sampler" in globals():
    sampled_df = sampler.conditional_sample(
        df_processed,
        condition_col="is_critical",
        pass_rate=0.10,
    )
else:
    sampled_df = df_processed

# Print each micro-batch of enriched events to the console (driver stdout).
# The console sink only displays rows. The former `path`/`header` options were
# silently ignored, and nothing was written to ./output; use .format("csv") with
# those options if file output is wanted.
# NOTE: query progress is persisted in ./checkpoints. On a re-run the query resumes
# from there and `startingOffsets` is ignored; delete the directory to replay the
# topic from the beginning.
query = (df_processed.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", './checkpoints')
    .trigger(processingTime="10 seconds")
    .start()
)

# Run the stream for a bounded time. awaitTermination(timeout=...) only reports
# whether the query finished; on timeout it returns False and leaves the query
# running in the background. Stop it explicitly so later cells are not
# interleaved with batch output.
STREAM_RUN_SECONDS = 300
if not query.awaitTermination(timeout=STREAM_RUN_SECONDS):
    query.stop()

df_d_items.show()

25/12/04 15:36:05 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+------+--------------------+-------------------+--------------+-------------------+--------+-------------+--------------+---------------+
|itemid|               label|       abbreviation|       linksto|           category|unitname|   param_type|lownormalvalue|highnormalvalue|
+------+--------------------+-------------------+--------------+-------------------+--------+-------------+--------------+---------------+
|220003|  ICU Admission date| ICU Admission date|datetimeevents|                ADT|    NULL|Date and time|          NULL|           NULL|
|220045|          Heart Rate|                 HR|   chartevents|Routine Vital Signs|     bpm|      Numeric|          NULL|           NULL|
|220046|Heart rate Alarm ...|    HR Alarm - High|   chartevents|             Alarms|     bpm|      Numeric|          NULL|           NULL|
|220047|Heart Rate Alarm ...|     HR Alarm - Low|   chartevents|             Alarms|     bpm|      Numeric|          NULL|           NULL|
|220048|        Heart Rhyth