In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, DoubleType, StructType, BooleanType, ArrayType, IntegerType
import os
import logging

In [22]:
# Kafka connection settings are supplied through environment variables.
# Fail fast with a single, explicit message when any of them is unset or
# empty; otherwise None would be passed on to the Kafka reader/writer options
# and only surface later as a much less obvious error.
_REQUIRED_ENV_VARS = ('INPUT_TOPIC', 'OUTPUT_TOPIC', 'KAFKA_BOOTSTRAP_SERVERS')
_missing_env_vars = [var for var in _REQUIRED_ENV_VARS if not os.environ.get(var)]
if _missing_env_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env_vars)
    )

KAFKA_TOPIC_NAME_CONS = os.environ.get('INPUT_TOPIC')  # topic the stream subscribes to
KAFKA_OUTPUT_TOPIC_NAME_CONS = os.environ.get('OUTPUT_TOPIC')  # topic the result is written to
KAFKA_BOOTSTRAP_SERVERS_CONS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS')  # Kafka broker address list

logging.basicConfig(level=logging.INFO)

In [23]:
# Create (or reuse) the Spark session for this notebook. It runs against a
# local master and requests the Kafka source/sink connector through
# spark.jars.packages.
# NOTE(review): the connector coordinates pin Scala 2.12 / Spark 3.4.0 —
# confirm they match the installed Spark distribution.
spark = (
    SparkSession.builder
    .appName("PySpark Structured Streaming with Kafka Demo")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
    .getOrCreate()
)

# Log verbosity of the Spark context for this session.
spark.sparkContext.setLogLevel("INFO")

# Schema used to parse the JSON text carried in each Kafka record's value.
schema = (
    StructType()
    .add("id", IntegerType())
    .add("license_no", StringType())
    .add("status", StringType())
    .add("is_detected", BooleanType())
    .add("detected_timestamp", ArrayType(IntegerType()))
    .add("detected_frame", ArrayType(StringType()))
)
# Streaming DataFrame over the input Kafka topic, starting from the earliest
# available offsets.
stock_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
    .option("subscribe", KAFKA_TOPIC_NAME_CONS)
    .option("startingOffsets", "earliest")
    .load()
)

In [24]:
# Keep only the record value (cast to a string) and the record timestamp.
stock_df1 = stock_df.select(
    col("value").cast("string").alias("value"),
    col("timestamp"),
)

In [None]:
# Parse the JSON text in "value" into a struct following `schema`, keeping
# the timestamp column next to it.
parsed_payload = from_json(col("value"), schema).alias("value_columns")
stock_df2 = stock_df1.select(parsed_payload, "timestamp")

# Promote every field of the parsed struct to a top-level column.
stock_df3 = stock_df2.select("value_columns.*", "timestamp")

In [None]:
# Inspect the column names and types of the flattened frame
# (the parsed payload fields plus the timestamp column).
stock_df3.dtypes

## Writing the final DataFrame to the output Kafka topic

In [25]:
# The streaming checkpoint is kept in a "checkpoint" folder directly under
# the directory the notebook process is running from.
current_directory = os.getcwd()
checkpoint_location = os.path.join(current_directory, "checkpoint")

In [None]:
# Serialize every column of each row into one JSON string named "value" and
# publish it to the output Kafka topic, triggering a micro-batch every second.
# NOTE(review): the section heading above talks about writing the final
# dataframe, but this publishes stock_df1 (raw string value + timestamp), not
# the parsed stock_df3 — confirm which payload downstream consumers expect.
stock_write_stream = (
    stock_df1.selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
    .option("topic", KAFKA_OUTPUT_TOPIC_NAME_CONS)
    .trigger(processingTime='1 seconds')
    .outputMode("update")
    .option("checkpointLocation", checkpoint_location)
    .start()
)

try:
    # Block this cell until the query terminates or the kernel is interrupted.
    stock_write_stream.awaitTermination()
finally:
    # Always stop the query so an interrupted or failed cell does not leave
    # the stream running in the background of the Spark session.
    stock_write_stream.stop()

24/03/14 09:33:05 INFO ResolveWriteToStream: Checkpoint root /app/checkpoint resolved to file:/app/checkpoint.
24/03/14 09:33:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/14 09:33:05 INFO MicroBatchExecution: Starting [id = fecb0560-68f2-42df-9c50-32c870e251ce, runId = 34e3138d-a1e4-4da7-bddd-a12597b03fd5]. Use file:/app/checkpoint to store the query checkpoint.
24/03/14 09:33:05 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@6500554e] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@6bc7a646]
24/03/14 09:33:05 INFO OffsetSeqLog: BatchIds found from listing: 
24/03/14 09:33:05 INFO OffsetSeqLog: BatchIds found from listing: 
24/03/14 09:33:05 INFO MicroBatchExecution: Starting new streaming query.
24/03/14 09:33:05 INFO MicroBatchExecution: Stream started from {}
24/03/14 09:33:05 INFO AdminClientConfig: Admin