In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
EH_CONNECTION_STRING = "My-connection-string"

eh_kafka_conf = {
  'kafka.bootstrap.servers': 'mta-real-time.servicebus.windows.net:9093',
  'kafka.sasl.mechanism': 'PLAIN',
  'kafka.security.protocol': 'SASL_SSL',
  'kafka.sasl.jaas.config': f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{EH_CONNECTION_STRING}";'
}

raw_stream_df = (spark
  .readStream
  .format("kafka")
  .options(**eh_kafka_conf)
  .option("subscribe", "bus-positions-topic")
  .option("startingOffsets", "earliest")
  .load()
)

json_stream_df = raw_stream_df.selectExpr("CAST(value AS STRING)", "timestamp as kafka_timestamp")

In [0]:
trip_update_schema_bronze = StructType([
    StructField("trip_id", StringType(), True),
    StructField("start_date", StringType(), True),
    StructField("route_id", StringType(), True),
    StructField("direction_id", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("update_timestamp", StringType(), True),
    StructField("trip_delay_seconds", StringType(), True),
    StructField("stop_sequence", StringType(), True),
    StructField("stop_id", StringType(), True),
    StructField("arrival_time", StringType(), True),
    StructField("departure_time", StringType(), True),
    StructField("message_type", StringType(), True)
])

bus_position_schema_bronze = StructType([
    StructField("vehicle_id", StringType(), True),
    StructField("trip_id", StringType(), True),
    StructField("start_date", StringType(), True),
    StructField("route_id", StringType(), True),
    StructField("direction_id", StringType(), True),
    StructField("stop_id", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("bearing", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("message_type", StringType(), True)
])

alert_schema_bronze = StructType([
    StructField("alert_id", StringType(), True),
    StructField("start_time", StringType(), True), 
    StructField("end_time", StringType(), True),   
    StructField("header_text", StringType(), True),
    StructField("description_text", StringType(), True),
    StructField("agency_id", StringType(), True),
    StructField("route_id", StringType(), True),
    StructField("stop_id", StringType(), True),
    StructField("trip_route_id", StringType(), True),
    StructField("trip_direction_id", StringType(), True),
    StructField("message_type", StringType(), True)
])

In [0]:
from pyspark.sql.functions import from_json, col

common_schema = StructType([StructField("message_type", StringType())])
typed_stream_df = json_stream_df.withColumn("parsed_value", from_json(col("value"), common_schema))

trip_updates_bronze_df = (typed_stream_df
  .filter(col("parsed_value.message_type") == "trip_update")
  .withColumn("data", from_json(col("value"), trip_update_schema_bronze))
  .select("kafka_timestamp", "data.*")
)

bus_positions_bronze_df = (typed_stream_df
  .filter(col("parsed_value.message_type") == "bus_position")
  .withColumn("data", from_json(col("value"), bus_position_schema_bronze))
  .select("kafka_timestamp", "data.*")
)

alerts_bronze_df = (typed_stream_df
  .filter(col("parsed_value.message_type") == "alert")
  .withColumn("data", from_json(col("value"), alert_schema_bronze))
  .select("kafka_timestamp", "data.*")
)

In [0]:
TRIP_UPDATES_BRONZE_PATH = "/mta_project/delta/bronze/trip_updates"
BUS_POSITIONS_BRONZE_PATH = "/mta_project/delta/bronze/bus_positions"
ALERTS_BRONZE_PATH = "/mta_project/delta/bronze/alerts"

(trip_updates_bronze_df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{TRIP_UPDATES_BRONZE_PATH}/_checkpoints")
  .trigger(availableNow=True)
  .start(TRIP_UPDATES_BRONZE_PATH)
)

(bus_positions_bronze_df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{BUS_POSITIONS_BRONZE_PATH}/_checkpoints")
  .trigger(availableNow=True)
  .start(BUS_POSITIONS_BRONZE_PATH)
)

(alerts_bronze_df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{ALERTS_BRONZE_PATH}/_checkpoints")
  .trigger(availableNow=True)
  .start(ALERTS_BRONZE_PATH)
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f1880753980>