In [0]:
# Databricks notebook source
import pyspark.sql.functions as F
from  pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructField, StructType, StringType, MapType
from dotenv import load_dotenv
import os
load_dotenv()

# COMMAND ----------

# Kafka connection settings and credentials are read from the environment
# (load_dotenv() above populates it from a local .env file), so no secrets are
# stored in the notebook. os.environ[...] raises KeyError right away when a
# variable is missing, rather than failing later with an opaque connection or
# SASL authentication error.
# NOTE(review): the variable names below are a convention introduced here —
# align them with the keys actually present in your .env / cluster config.
kafka_bootstrap_servers = os.environ["KAFKA_BOOTSTRAP_SERVERS"]
kafka_api_key = os.environ["KAFKA_API_KEY"]
kafka_api_secret = os.environ["KAFKA_API_SECRET"]

# JAAS line for SASL/PLAIN. The `kafkashaded.` package prefix targets the
# shaded Kafka client bundled with the Databricks runtime.
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_api_key}" password="{kafka_api_secret}";'
)

# Streaming source over the `opensky-flights` topic. `startingOffsets=earliest`
# only applies when the query starts without an existing checkpoint; a resumed
# query continues from the offsets recorded in its checkpoint.
raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "opensky-flights")
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .load()
)


In [0]:
# Imports for JSON parsing and the catalog name used by the cells below
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType, BooleanType
# Catalog used in the three-part target table name `<catalog>.default.raw`.
# Defaults to 'opensky'; set OPENSKY_CATALOG (e.g. in .env) to target another.
catalog_name = os.environ.get("OPENSKY_CATALOG", "opensky")

In [0]:
# The Kafka source exposes the message payload as a binary `value` column;
# cast it to a string so it can be parsed as JSON.
json_df = raw_df.selectExpr("CAST(value AS STRING) as json")

# Expected flat schema of each JSON message. Every field is read as a string
# for this raw table; numeric/boolean typing is left to downstream steps.
# NOTE(review): field names are assumed to match what the producer emits —
# confirm against the `opensky-flights` topic payload.
schema = StructType([
    StructField("icao24", StringType()),
    StructField("callsign", StringType()),
    StructField("origin_country", StringType()),
    StructField("time_position", StringType()),
    StructField("last_contact", StringType()),
    StructField("longitude", StringType()),
    StructField("latitude", StringType()),
    StructField("geo_altitude", StringType()),
    StructField("on_ground", StringType()),
    StructField("velocity", StringType()),
    StructField("heading", StringType()),
    StructField("vertical_rate", StringType()),
    StructField("baro_altitude", StringType()),
    StructField("squawk", StringType()),
    StructField("spi", StringType()),
    StructField("position_source", StringType()),
    StructField("ingest_time", StringType())
])

# Parse each message into a struct, then flatten the struct's fields into
# top-level columns. Messages that are not valid JSON yield null columns.
parsed_df = (
    json_df
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
)

# Quick preview of a few parsed rows (Databricks `display`).
display(parsed_df.limit(5))

In [0]:
# Append the parsed records to the raw Delta table.
# - outputMode("append") sets the streaming write mode; each micro-batch only
#   adds new rows.
# - trigger(availableNow=True) processes all data available at start time
#   (possibly over several micro-batches) and then stops, so this cell acts as
#   an incremental batch load instead of a continuously running stream.
# - toTable() starts the query and writes to the named table, returning the
#   StreamingQuery handle.
# NOTE(review): './checkpoint' is a relative path shared by any query that
# uses it; a per-table checkpoint location would be safer. Moving it makes
# the stream restart from `startingOffsets`.
raw_query = (
    parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "./checkpoint")
    .trigger(availableNow=True)
    .toTable(f"{catalog_name}.default.raw")
)