# 🚀 NASA GCN Data Pipeline

Pipeline **Delta Live Tables (DLT)** para ingestão de eventos astronômicos da NASA GCN.

In [None]:
import dlt
import sys
import struct
from datetime import datetime, timedelta

sys.path.append(spark.conf.get("bundle.sourcePath", "."))

from pyspark.sql.functions import (
    col, decode, split, current_timestamp, current_date, udf,
    from_json, regexp_extract, concat_ws, length, size, when, to_timestamp, lit,
    expr, trim, get_json_object, coalesce, regexp_replace
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

from nasa_gcn.config import get_kafka_options

In [None]:
# Lookup table from the integer packet-type code (first 32-bit word of a
# classic binary packet) to a symbolic notice name. parse_gcn_binary_packet
# uses it to fill `pkt_type_name`; codes missing from this table are reported
# there as "UNKNOWN_<code>".
PACKET_TYPE_NAMES = {
    1: "BATSE_ORIGINAL", 2: "TEST", 3: "IMALIVE", 4: "KILL",
    11: "BATSE_MAXBC", 21: "BRADFORD_TEST", 22: "BATSE_FINAL", 24: "BATSE_LOCBURST", 25: "ALEXIS",
    26: "RXTE_PCA_ALERT", 27: "RXTE_PCA", 28: "RXTE_ASM_ALERT", 29: "RXTE_ASM", 30: "COMPTEL",
    31: "IPN_RAW", 32: "IPN_SEGMENT", 33: "SAX_WFC_ALERT", 34: "SAX_WFC",
    35: "SAX_NFI_ALERT", 36: "SAX_NFI", 37: "RXTE_ASM_XTRANS", 38: "SPARE_TESTING", 39: "IPN_POSITION",
    40: "HETE_S/C_ALERT", 41: "HETE_S/C_UPDATE", 42: "HETE_S/C_LAST",
    43: "HETE_GNDANA", 44: "HETE_TEST", 45: "GRB_COUNTERPART",
    46: "SWIFT_TOO_FOM_OBSERVE", 47: "SWIFT_TOO_SC_SLEW", 48: "DOW_TOD_TEST",
    51: "INTEGRAL_POINTDIR", 52: "INTEGRAL_SPIACS", 53: "INTEGRAL_WAKEUP",
    54: "INTEGRAL_REFINED", 55: "INTEGRAL_OFFLINE", 56: "INTEGRAL_WEAK",
    57: "AAVSO", 58: "MILAGRO", 59: "KONUS_LIGHTCURVE",
    60: "SWIFT_BAT_GRB_ALERT", 61: "SWIFT_BAT_GRB_POSITION",
    62: "SWIFT_BAT_GRB_NACK", 63: "SWIFT_BAT_GRB_LC",
    64: "SWIFT_BAT_SCALED_MAP", 65: "SWIFT_FOM_OBSERVE", 66: "SWIFT_SC_SLEW",
    67: "SWIFT_XRT_POSITION", 68: "SWIFT_XRT_SPECTRUM", 69: "SWIFT_XRT_IMAGE",
    70: "SWIFT_XRT_LIGHTCURVE", 71: "SWIFT_XRT_NACK_POSITION",
    72: "SWIFT_UVOT_IMAGE", 73: "SWIFT_UVOT_SRC_LIST",
    76: "SWIFT_BAT_GRB_PROC_LC", 77: "SWIFT_XRT_PROC_SPECTRUM",
    78: "SWIFT_XRT_PROC_IMAGE", 79: "SWIFT_UVOT_PROC_IMAGE", 80: "SWIFT_UVOT_PROC_SRC_LIST",
    81: "SWIFT_UVOT_POSITION", 82: "SWIFT_BAT_GRB_POS_TEST", 83: "SWIFT_POINTDIR",
    84: "SWIFT_BAT_TRANS", 85: "SWIFT_XRT_THRESHPIX", 86: "SWIFT_XRT_THRESHPIX_PROC",
    87: "SWIFT_XRT_SPER", 88: "SWIFT_XRT_SPER_PROC", 89: "SWIFT_UVOT_NACK_POSITION",
    97: "SWIFT_BAT_QUICKLOOK_POSITION", 98: "SWIFT_BAT_SUBTHRESHOLD_POSITION",
    99: "SWIFT_BAT_SLEW_GRB_POSITION", 103: "SWIFT_ACTUAL_POINTDIR",
    133: "SWIFT_BAT_MONITOR", 140: "SWIFT_BAT_SUB_SUB_THRESH_POS", 141: "SWIFT_BAT_KNOWN_SRC_POS",
    100: "SUPERAGILE_GRB_WAKEUP", 101: "SUPERAGILE_GRB_GROUND", 102: "SUPERAGILE_GRB_REFINED",
    105: "AGILE_MCAL_ALERT", 107: "AGILE_POINTDIR", 109: "SUPERAGILE_GRB_POS_TEST",
    110: "FERMI_GBM_ALERT", 111: "FERMI_GBM_FLT_POS", 112: "FERMI_GBM_GND_POS",
    114: "FERMI_GBM_GND_INTERNAL", 115: "FERMI_GBM_FINAL_POS",
    116: "FERMI_GBM_ALERT_INTERNAL", 117: "FERMI_GBM_FLT_INTERNAL", 119: "FERMI_GBM_POS_TEST",
    131: "FERMI_GBM_SUBTHRESHOLD",
    120: "FERMI_LAT_GRB_POS_INI", 121: "FERMI_LAT_GRB_POS_UPD", 122: "FERMI_LAT_GRB_POS_DIAG",
    123: "FERMI_LAT_TRANS", 124: "FERMI_LAT_GRB_POS_TEST", 125: "FERMI_LAT_MONITOR",
    126: "FERMI_SC_SLEW", 127: "FERMI_LAT_GND", 128: "FERMI_LAT_OFFLINE", 129: "FERMI_POINTDIR",
    144: "FERMI_SC_SLEW_INTERNAL", 146: "FERMI_GBM_FIN_POS_INTERNAL",
    130: "SIMBAD_NED_SEARCH_RESULTS", 134: "MAXI_UNKNOWN_SOURCE", 135: "MAXI_KNOWN_SOURCE",
    136: "MAXI_TEST", 137: "OGLE", 139: "MOA", 145: "COINCIDENCE", 148: "SUZAKU_LIGHTCURVE", 149: "SNEWS",
    150: "LVC_PRELIMINARY", 151: "LVC_INITIAL", 152: "LVC_UPDATE",
    153: "LVC_TEST", 154: "LVC_COUNTERPART", 163: "LVC_EARLY_WARNING", 164: "LVC_RETRACTION",
    157: "AMON_ICECUBE_COINC", 158: "AMON_ICECUBE_HESE", 159: "AMON_ICECUBE_TEST",
    160: "CALET_GBM_FLT_LC", 161: "CALET_GBM_GND_LC", 166: "AMON_ICECUBE_CLUSTER",
    168: "GWHEN_COINC", 169: "AMON_ICECUBE_EHE", 170: "AMON_ANTARES_FERMILAT_COINC",
    171: "HAWC_BURST_MONITOR", 172: "AMON_NU_EM_COINC",
    173: "ICECUBE_ASTROTRACK_GOLD", 174: "ICECUBE_ASTROTRACK_BRONZE",
    175: "SK_SUPERNOVA", 176: "AMON_ICECUBE_CASCADE",
    188: "GECAM_FLT", 189: "GECAM_GND",
}

# DDL-style struct schema handed to `udf(...)` as the return type of
# parse_gcn_binary_packet. The field names mirror, one for one, the keys of
# the dict that function returns, so any field added there must be added here.
PARSED_BINARY_SCHEMA = """
    pkt_type INT, pkt_type_name STRING, pkt_sernum INT, trig_num INT,
    burst_tjd INT, burst_sod_centi INT, burst_datetime STRING,
    burst_ra_deg DOUBLE, burst_dec_deg DOUBLE, burst_error_deg DOUBLE,
    trigger_id INT, misc INT, parse_error STRING
"""

def parse_gcn_binary_packet(binary_data):
    """Decode one 160-byte GCN classic binary packet into a flat dict.

    The payload is read as 40 big-endian signed 32-bit integers. Only a
    handful of words are interpreted: 0 (packet type), 1 (serial number),
    4 (trigger number), 5/6 (burst day count and centiseconds of day),
    7/8 (RA/Dec), 11 (position error), 18 (trigger id) and 19 (misc).

    Args:
        binary_data: Raw packet bytes (``bytes``/``bytearray``) or ``None``.

    Returns:
        A dict with the keys ``pkt_type``, ``pkt_type_name``, ``pkt_sernum``,
        ``trig_num``, ``burst_tjd``, ``burst_sod_centi``, ``burst_datetime``,
        ``burst_ra_deg``, ``burst_dec_deg``, ``burst_error_deg``,
        ``trigger_id``, ``misc`` and ``parse_error``. Every key is always
        present; values that could not be derived stay ``None``. The function
        never raises: a missing payload, a wrong payload size or any decoding
        failure is described in ``parse_error`` instead.
    """
    result = {"pkt_type": None, "pkt_type_name": None, "pkt_sernum": None,
              "trig_num": None, "burst_tjd": None, "burst_sod_centi": None,
              "burst_datetime": None, "burst_ra_deg": None, "burst_dec_deg": None,
              "burst_error_deg": None, "trigger_id": None, "misc": None, "parse_error": None}
    if binary_data is None:
        result["parse_error"] = "binary_data is None"
        return result
    if len(binary_data) != 160:
        result["parse_error"] = f"Invalid packet size: {len(binary_data)} bytes"
        return result
    try:
        longs = struct.unpack('>40i', binary_data)
        pkt_type = longs[0]
        result["pkt_type"] = pkt_type
        result["pkt_sernum"] = longs[1]
        # Non-positive trigger numbers are treated as "no trigger".
        result["trig_num"] = longs[4] if longs[4] > 0 else None

        burst_tjd, burst_sod = longs[5], longs[6]
        result["burst_tjd"], result["burst_sod_centi"] = burst_tjd, burst_sod
        if burst_tjd > 0 and burst_sod >= 0:
            # Day count is measured from 1968-05-24 (the TJD epoch); the
            # seconds-of-day word is in centiseconds.
            try:
                burst_dt = datetime(1968, 5, 24) + timedelta(days=burst_tjd, seconds=burst_sod / 100.0)
            except (OverflowError, ValueError):
                # Best effort: an out-of-range day count only costs us the
                # timestamp, the rest of the packet is still decoded. Only the
                # date-arithmetic errors are swallowed, not every exception.
                pass
            else:
                result["burst_datetime"] = burst_dt.isoformat()

        burst_ra, burst_dec, burst_error = longs[7], longs[8], longs[11]
        # Heuristic: values that cannot be centi-degrees are read as 1e-4 degrees.
        scale = 10000 if (burst_ra > 36000 or burst_ra < 0 or abs(burst_dec) > 9000) else 100
        ra_deg, dec_deg = burst_ra / scale, burst_dec / scale
        result["burst_ra_deg"] = ra_deg if 0 <= ra_deg < 360 else None
        result["burst_dec_deg"] = dec_deg if -90 <= dec_deg <= 90 else None
        result["burst_error_deg"] = abs(burst_error) / scale
        result["trigger_id"], result["misc"] = longs[18], longs[19]

        result["pkt_type_name"] = PACKET_TYPE_NAMES.get(pkt_type, f"UNKNOWN_{pkt_type}")
    except Exception as e:
        # UDF boundary: report the failure in the row instead of failing the stream.
        result["parse_error"] = str(e)
    return result

# Spark UDF wrapper around parse_gcn_binary_packet; the returned dict is
# exposed as a struct column whose fields are declared in PARSED_BINARY_SCHEMA.
parse_binary_udf = udf(parse_gcn_binary_packet, PARSED_BINARY_SCHEMA)

## 🥉 Bronze Layer

In [None]:
@dlt.table(
    name="gcn_raw",
    comment="Raw NASA GCN Kafka messages - Bronze layer",
    table_properties={"quality": "bronze"},
)
def gcn_raw():
    """Bronze table: the Kafka stream as received, plus an ingestion timestamp.

    Connection settings come from ``get_kafka_options()``. The message key is
    cast to a string; ``value`` is passed through untouched so that each
    silver table can decode it in its own way.
    """
    kafka_stream = (
        spark.readStream
        .format("kafka")
        .options(**get_kafka_options())
        .load()
    )
    bronze_columns = [
        col("key").cast("string").alias("message_key"),
        col("value"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp").alias("kafka_timestamp"),
        current_timestamp().alias("ingestion_timestamp"),
    ]
    return kafka_stream.select(*bronze_columns)

## 🥈 Silver Layer

In [None]:
@dlt.table(
    name="gcn_classic_text",
    comment="Classic text format GCN alerts optimized for RAG - Silver layer",
    table_properties={"quality": "silver"},
)
def gcn_classic_text():
    """Silver table for classic text-format alerts (``gcn.classic.text.*`` topics).

    The payload is decoded as UTF-8 and exposed twice, as ``message_text`` and
    as ``document_text`` (the text intended for RAG). ``event_type`` is the
    fourth dot-separated component of the topic, and the TITLE / NOTICE_DATE /
    NOTICE_TYPE header lines are pulled out with regular expressions.
    """
    text = col("text_decoded")

    # Output column name -> pattern capturing the rest of that header line.
    header_patterns = {
        "title": r"TITLE:\s+(.*?)(?=\n)",
        "notice_date": r"NOTICE_DATE:\s+(.*?)(?=\n)",
        "notice_type": r"NOTICE_TYPE:\s+(.*?)(?=\n)",
    }
    header_columns = [
        regexp_extract(text, pattern, 1).alias(column_name)
        for column_name, pattern in header_patterns.items()
    ]

    text_alerts = (
        dlt.read_stream("gcn_raw")
        .filter(col("topic").startswith("gcn.classic.text."))
        .withColumn("text_decoded", decode(col("value"), "UTF-8"))
    )
    return text_alerts.select(
        col("message_key"),
        text.alias("message_text"),
        col("topic"),
        split(col("topic"), r"\.").getItem(3).alias("event_type"),
        *header_columns,
        text.alias("document_text"),
        col("kafka_timestamp"),
        col("ingestion_timestamp"),
        col("partition"),
        col("offset"),
        current_timestamp().alias("silver_processed_timestamp"),
        current_date().alias("silver_processed_date"),
    )

In [None]:
@dlt.table(name="gcn_classic_voevent", comment="Classic VoEvent XML format GCN alerts optimized for RAG - Silver layer",
           table_properties={"quality": "silver"})
def gcn_classic_voevent():
    """Silver table for classic VOEvent XML alerts (``gcn.classic.voevent.*`` topics).

    The payload is decoded as UTF-8 and a few values are extracted with
    namespace-agnostic XPath expressions (``local-name()`` matching): the
    ``ivorn`` and ``role`` attributes of the root element, Who/Date,
    Why/Inference/Concept and How/Description.

    ``document_text`` joins the extracted values as ``LABEL: value`` pairs
    separated by `` | ``. A pair is emitted only when its value is non-empty,
    so a notice lacking e.g. a Description does not contribute a dangling
    ``DESCRIPTION`` label (same convention as the other silver tables that
    guard their labels).
    """
    voevent_root = '/*[local-name()="VOEvent"]'

    def xpath_text(relative_path):
        # Each XPath is built once here and reused for both the plain column
        # and the document text, so the two can never drift apart.
        return expr(f"xpath_string(xml_str, '{voevent_root}{relative_path}')")

    def labeled(label, value):
        # length(...) > 0 is false for "" and NULL for NULL; in both cases
        # when() yields NULL and concat_ws skips the pair entirely.
        return when(length(value) > 0, concat_ws(": ", lit(label), value))

    ivorn = xpath_text("/@ivorn")
    role = xpath_text("/@role")
    who_date = xpath_text('/*[local-name()="Who"]/*[local-name()="Date"]')
    concept = xpath_text('/*[local-name()="Why"]/*[local-name()="Inference"]/*[local-name()="Concept"]')
    description = xpath_text('/*[local-name()="How"]/*[local-name()="Description"]')

    return (dlt.read_stream("gcn_raw")
        .filter(col("topic").startswith("gcn.classic.voevent."))
        .withColumn("xml_str", decode(col("value"), "UTF-8"))
        .select(
            col("message_key"),
            col("xml_str").alias("voevent_xml"),
            col("topic"),
            split(col("topic"), r"\.").getItem(3).alias("event_type"),
            # XPath extractions for RAG
            ivorn.alias("ivorn"),
            role.alias("role"),
            who_date.alias("date"),
            concept.alias("concept"),
            # Document text for RAG
            concat_ws(" | ",
                labeled("ID", ivorn),
                labeled("ROLE", role),
                labeled("DATE", who_date),
                labeled("CONCEPT", concept),
                labeled("DESCRIPTION", description)
            ).alias("document_text"),
            col("kafka_timestamp"), col("ingestion_timestamp"), col("partition"), col("offset"),
            current_timestamp().alias("silver_processed_timestamp"),
            current_date().alias("silver_processed_date")))

In [None]:
@dlt.table(name="gcn_classic_binary", comment="Classic binary format GCN alerts optimized for RAG - Silver layer",
           table_properties={"quality": "silver"})
def gcn_classic_binary():
    """Silver table for classic binary alerts (``gcn.classic.binary.*`` topics).

    Every payload is run through ``parse_binary_udf`` and the interesting
    struct fields are flattened into top-level columns; the untouched payload
    is kept as ``raw_binary``. Rows that failed to parse are retained, with
    the reason in ``parse_error``.

    ``document_text`` joins ``LABEL: value`` pairs with `` | ``. Each pair is
    emitted only when its value is present, so a failed parse no longer yields
    a bare ``TYPE`` label, and RA / DEC are each shown only when that
    coordinate was accepted by the parser.
    """
    def parsed(field_name):
        return col(f"parsed.{field_name}")

    def labeled(label, value):
        # when() without otherwise() is NULL for absent values, and concat_ws
        # skips NULL arguments, so the label disappears with its value.
        return when(value.isNotNull(), concat_ws(": ", lit(label), value))

    ra, dec = parsed("burst_ra_deg"), parsed("burst_dec_deg")
    # Guarded as a whole: concat_ws of two NULLs is "" (not NULL) and would
    # otherwise leave an empty segment in the document text.
    position = when(ra.isNotNull() | dec.isNotNull(),
                    concat_ws(", ", labeled("RA", ra), labeled("DEC", dec)))

    return (dlt.read_stream("gcn_raw")
        .filter(col("topic").startswith("gcn.classic.binary."))
        .withColumn("parsed", parse_binary_udf(col("value")))
        .select(
            col("message_key"),
            parsed("pkt_type"), parsed("pkt_type_name"), parsed("pkt_sernum"),
            parsed("trig_num"), parsed("burst_datetime"),
            ra, dec, parsed("burst_error_deg"),
            parsed("trigger_id"), parsed("parse_error"),
            col("topic"), split(col("topic"), r"\.").getItem(3).alias("event_type"),
            # Document text for RAG
            concat_ws(" | ",
                labeled("TYPE", parsed("pkt_type_name")),
                labeled("TRIG_NUM", parsed("trig_num")),
                labeled("DATE", parsed("burst_datetime")),
                position
            ).alias("document_text"),
            col("kafka_timestamp"), col("ingestion_timestamp"), col("partition"), col("offset"),
            current_timestamp().alias("silver_processed_timestamp"),
            current_date().alias("silver_processed_date"),
            col("value").alias("raw_binary")))

### GCN Notices - Otimizado para RAG

In [None]:
# Helper that reduces a raw JSON id value (scalar or array) to one plain string.
def clean_json_id(id_col):
    """Return the identifier as plain text, taking the first element of a JSON array.

    ``id_col`` holds the text produced by ``get_json_object`` for an id-like
    field: either a bare scalar or the JSON text of an array.

    * Array text (starts with ``[``): the first element is returned, so
      ``["a","b"]`` gives ``a``. Previously only the outer ``["`` / ``"]`` were
      stripped, which turned multi-element arrays into ``a","b``.
    * Anything else, or array text whose first element cannot be read (for
      example ``[]``): falls back to the original bracket/quote stripping, so
      those results are unchanged.
    * NULL stays NULL.
    """
    stripped = regexp_replace(regexp_replace(id_col, r'^\["?', ''), r'"?\]$', '')
    first_element = when(id_col.startswith("["), get_json_object(id_col, "$[0]"))
    return coalesce(first_element, stripped)

@dlt.table(
    name="gcn_notices",
    comment="New JSON format GCN notices optimized for RAG - Silver layer",
    table_properties={"quality": "silver"},
)
def gcn_notices():
    """Silver table of JSON-format GCN notices with common fields flattened for RAG.

    Keeps the ``gcn_raw`` rows whose topic starts with ``gcn.notices.``,
    decodes the payload as UTF-8 and reads individual fields with
    ``get_json_object``, so a field absent from a given payload is simply
    NULL. The notice id is taken from the first of ``$.id``, ``$.event_name``
    and ``$.trigger_id`` that is present and normalised by ``clean_json_id``.

    Intended to cope with the differing schemas of several missions (the
    original notes list IceCube, Super-Kamiokande, Einstein Probe, Fermi and
    Swift).
    """
    payload = col("json_str")

    def json_field(path):
        return get_json_object(payload, path)

    def labeled(label, value):
        # NULL when the value is missing, so concat_ws drops the whole pair.
        return when(value.isNotNull(), concat_ws(": ", lit(label), value))

    # Values reused by both the plain columns and the document text.
    mission_json = json_field("$.mission")
    messenger_json = json_field("$.messenger")
    alert_type_json = json_field("$.alert_type")
    raw_id = coalesce(
        json_field("$.id"),
        json_field("$.event_name"),
        json_field("$.trigger_id"),
    )
    topic_mission = split(col("topic"), r"\.").getItem(2)

    document_text = concat_ws(
        " | ",
        labeled("MISSION", mission_json),
        labeled("MESSENGER", messenger_json),
        labeled("TYPE", alert_type_json),
        when(raw_id.isNotNull(), concat_ws(": ", lit("ID"), clean_json_id(raw_id))),
        labeled("RA", json_field("$.ra")),
        labeled("DEC", json_field("$.dec")),
    )

    notices = (
        dlt.read_stream("gcn_raw")
        .filter(col("topic").startswith("gcn.notices."))
        .withColumn("json_str", decode(col("value"), "UTF-8"))
    )
    return notices.select(
        col("message_key"),
        payload.alias("notice_json"),
        col("topic"),
        # Mission as named in the topic
        topic_mission.alias("mission"),
        # Payload values, falling back to the topic or a fixed default
        coalesce(mission_json, topic_mission).alias("mission_name"),
        json_field("$.instrument").alias("instrument"),
        coalesce(messenger_json, lit("Unknown")).alias("messenger"),
        # Notice id without JSON array brackets
        clean_json_id(raw_id).alias("notice_id"),
        json_field("$.pipeline").alias("pipeline"),
        coalesce(alert_type_json, lit("notice")).alias("alert_type"),
        json_field("$.alert_tense").alias("alert_tense"),
        # Timestamps (kept as strings)
        json_field("$.trigger_time").alias("trigger_time"),
        json_field("$.alert_datetime").alias("alert_datetime"),
        # Coordinates
        json_field("$.ra").cast("double").alias("ra"),
        json_field("$.dec").cast("double").alias("dec"),
        json_field("$.ra_dec_error").alias("ra_dec_error"),
        json_field("$.containment_probability").alias("containment_probability"),
        # Neutrino-specific fields
        json_field("$.n_events").alias("n_events"),
        json_field("$.nu_energy").alias("nu_energy"),
        json_field("$.p_astro").alias("p_astro"),
        json_field("$.luminosity_distance").alias("luminosity_distance"),
        # Metadata
        length(payload).alias("json_length"),
        # Document text for RAG (only non-null fields are included)
        document_text.alias("document_text"),
        col("kafka_timestamp"),
        col("ingestion_timestamp"),
        col("partition"),
        col("offset"),
        current_timestamp().alias("silver_processed_timestamp"),
        current_date().alias("silver_processed_date"),
    )

### GCN Circulars - Otimizado para RAG

In [None]:
# Fields read from each circular's JSON payload by from_json below.
CIRCULAR_SCHEMA = "circularId INT, eventId STRING, subject STRING, body STRING, submitter STRING, submittedHow STRING, createdOn LONG, format STRING"

@dlt.table(name="gcn_circulars", comment="GCN Circulars - astronomer reports optimized for RAG - Silver layer",
           table_properties={"quality": "silver"})
def gcn_circulars():
    """Silver table of GCN Circulars (topic ``gcn.circulars``) prepared for RAG.

    The payload is decoded as UTF-8 and parsed with ``CIRCULAR_SCHEMA``.
    Derived columns:

    * ``submitter_name`` / ``submitter_email``: the text before ``<`` and the
      text between ``<`` and ``>`` of ``submitter``.
    * ``created_on``: ``createdOn`` divided by 1000 and cast to a timestamp
      (i.e. the value is treated as epoch milliseconds).
    * ``event_type``: the leading run of capital letters of ``eventId``.
    * ``word_count``: number of whitespace-separated tokens in the trimmed
      body; 0 for an empty body. Splitting on any whitespace run (instead of
      a single space) counts words separated by newlines correctly and does
      not count empty tokens between repeated spaces.
    * ``document_text``: SUBJECT / EVENT / AUTHOR header lines, a ``---``
      separator and the body. A header line is emitted only when its value is
      non-empty, so circulars without an event id do not get a bare ``EVENT``
      line.
    """
    body = col("parsed.body")
    trimmed_body = trim(body)
    submitter_name = trim(regexp_extract(col("parsed.submitter"), r"^([^<]+)", 1))

    def labeled(label, value):
        # False for "" and NULL for NULL: either way concat_ws skips the line.
        return when(length(value) > 0, concat_ws(": ", lit(label), value))

    # A NULL body falls through to size(split(NULL)), as before.
    word_count = when(trimmed_body == "", lit(0)).otherwise(size(split(trimmed_body, r"\s+")))

    return (dlt.read_stream("gcn_raw")
        .filter(col("topic") == "gcn.circulars")
        .withColumn("json_str", decode(col("value"), "UTF-8"))
        .withColumn("parsed", from_json(col("json_str"), CIRCULAR_SCHEMA))
        .select(
            col("message_key"),
            col("json_str").alias("circular_json"),
            col("parsed.circularId").alias("circular_id"),
            col("parsed.eventId").alias("event_id"),
            col("parsed.subject").alias("subject"),
            body.alias("body"),
            col("parsed.submitter").alias("submitter"),
            submitter_name.alias("submitter_name"),
            regexp_extract(col("parsed.submitter"), r"<([^>]+)>", 1).alias("submitter_email"),
            col("parsed.submittedHow").alias("submitted_how"),
            (col("parsed.createdOn") / 1000).cast("timestamp").alias("created_on"),
            regexp_extract(col("parsed.eventId"), r"^([A-Z]+)", 1).alias("event_type"),
            word_count.alias("word_count"),
            length(body).alias("char_count"),
            concat_ws("\n",
                labeled("SUBJECT", col("parsed.subject")),
                labeled("EVENT", col("parsed.eventId")),
                labeled("AUTHOR", submitter_name),
                lit("---"),
                body
            ).alias("document_text"),
            col("topic"),
            col("kafka_timestamp"),
            col("ingestion_timestamp"),
            col("partition"),
            col("offset"),
            current_timestamp().alias("silver_processed_timestamp"),
            current_date().alias("silver_processed_date")))

In [None]:
@dlt.table(
    name="igwn_gwalert",
    comment="IGWN Gravitational Wave Alerts optimized for RAG - Silver layer",
    table_properties={"quality": "silver"},
)
def igwn_gwalert():
    """Silver table of gravitational-wave alerts (topic ``igwn.gwalert``) for RAG.

    The payload is decoded as UTF-8 and individual values are read with
    ``get_json_object``, so every extracted column is a string and is NULL
    when the path is absent. ``instruments`` is the JSON array text with
    quotes and square brackets removed. ``document_text`` joins
    ``LABEL: value`` pairs with `` | `` and leaves out pairs without a value.
    """
    # Output column -> JSON path, for the values that need no post-processing.
    json_paths = {
        "superevent_id": "$.superevent_id",
        "alert_type": "$.alert_type",
        "group": "$.event.group",
        "pipeline": "$.event.pipeline",
        "significant": "$.event.significant",
        "gracedb_url": "$.urls.gracedb",
        "prob_bns": "$.event.classification.BNS",
        "prob_bbh": "$.event.classification.BBH",
        "prob_nsbh": "$.event.classification.NSBH",
        "prob_terrestrial": "$.event.classification.Terrestrial",
        "prob_has_ns": "$.event.properties.HasNS",
        "prob_has_remnant": "$.event.properties.HasRemnant",
        "far": "$.event.far",
    }

    alerts = (
        dlt.read_stream("gcn_raw")
        .filter(col("topic") == "igwn.gwalert")
        .withColumn("json_str", decode(col("value"), "UTF-8"))
    )
    for column_name, json_path in json_paths.items():
        alerts = alerts.withColumn(column_name, get_json_object(col("json_str"), json_path))
    alerts = alerts.withColumn(
        "instruments",
        regexp_replace(get_json_object(col("json_str"), "$.event.instruments"), r"[\"\[\]]", ""),
    )

    def labeled(label, column_name):
        # NULL when the value is missing, so concat_ws omits the pair.
        value = col(column_name)
        return when(value.isNotNull(), concat_ws(": ", lit(label), value))

    document_text = concat_ws(
        " | ",
        labeled("ID", "superevent_id"),
        labeled("TYPE", "alert_type"),
        labeled("GROUP", "group"),
        labeled("PIPELINE", "pipeline"),
        # instruments can be an empty string after bracket stripping, hence length().
        when(length(col("instruments")) > 0, concat_ws(": ", lit("INSTRUMENTS"), col("instruments"))),
        labeled("SIGNIFICANT", "significant"),
        labeled("URL", "gracedb_url"),
    )

    extracted_columns = [
        col(name)
        for name in (
            "gracedb_url", "significant", "group", "pipeline", "far", "instruments",
            "prob_bns", "prob_nsbh", "prob_bbh", "prob_terrestrial",
            "prob_has_ns", "prob_has_remnant",
        )
    ]
    return alerts.select(
        col("message_key"),
        col("json_str").alias("gwalert_json"),
        col("topic"),
        col("superevent_id"),
        col("alert_type"),
        get_json_object(col("json_str"), "$.time_created").alias("time_created"),
        *extracted_columns,
        document_text.alias("document_text"),
        col("kafka_timestamp"),
        col("ingestion_timestamp"),
        col("partition"),
        col("offset"),
        current_timestamp().alias("silver_processed_timestamp"),
        current_date().alias("silver_processed_date"),
    )

In [None]:
@dlt.table(
    name="gcn_heartbeat",
    comment="GCN Heartbeat messages for monitoring - Silver layer",
    table_properties={"quality": "silver"},
)
def gcn_heartbeat():
    """Silver table of heartbeat messages (topic ``gcn.heartbeat``) for monitoring.

    The payload is decoded as UTF-8 into ``heartbeat_json`` without further
    parsing; the Kafka metadata columns are passed through and the silver
    processing timestamp and date are appended.
    """
    heartbeats = dlt.read_stream("gcn_raw").filter(col("topic") == "gcn.heartbeat")
    passthrough = [
        col(name)
        for name in ("topic", "kafka_timestamp", "ingestion_timestamp", "partition", "offset")
    ]
    return heartbeats.select(
        col("message_key"),
        decode(col("value"), "UTF-8").alias("heartbeat_json"),
        *passthrough,
        current_timestamp().alias("silver_processed_timestamp"),
        current_date().alias("silver_processed_date"),
    )