In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Create (or reuse an already-running) SparkSession for the Kafka user-data stream.
spark = (
    SparkSession.builder
    .appName("kafka-userdata-stream")
    # Sets the stopGracefullyOnShutdown flag so shutdown is requested to be graceful.
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .getOrCreate()
)

In [None]:
# Schema used to parse the JSON carried in each Kafka message value.
# Every field is read as a plain string (created_ts is not converted to a timestamp here).
reguser_schema = StructType([
    StructField(field_name, StringType())
    for field_name in ("username", "userlocation", "created_ts")
])

In [None]:
# Streaming source: subscribe to the "userdata" Kafka topic.
kafka_df = (
    spark.readStream
    .format("kafka")
    # Placeholder: replace <KAFKA_BROKER_VM_IP> with the broker host before running.
    # (The original placeholder was missing its closing ">", which made it
    # ambiguous whether ":9092" was part of the value to substitute.)
    .option("kafka.bootstrap.servers", "<KAFKA_BROKER_VM_IP>:9092")
    .option("subscribe", "userdata")
    # Start position is "latest" rather than the beginning of the topic.
    # NOTE(review): per the Spark Kafka source docs this applies only when the
    # query starts without an existing checkpoint — confirm for this deployment.
    .option("startingOffsets", "latest")
    .load()
)

In [None]:
# Cast the Kafka `value` column to a string, then parse it as JSON with
# reguser_schema; the result is a single struct column named "value".
value_df = kafka_df.select(
    from_json(
        col("value").cast("string"),
        reguser_schema,
    ).alias("value")
)

In [None]:
# Debug aid: uncomment to print the schema of the parsed stream.
#value_df.printSchema()

In [None]:
# Flatten the parsed struct: select its three fields out of the "value" column.
exploded_df = value_df.selectExpr(
    "value.username",
    "value.userlocation",
    "value.created_ts",
)

In [None]:
# Streaming sink: write the flattened records as JSON files in append mode
# and start the query.
reguser_writer_query = (
    exploded_df.writeStream
    .format("json")
    .queryName("user writer")
    .outputMode("append")
    # Placeholder: replace with the output location before running.
    .option("path", "<GCS_BUCKET_OUTPUT>")
    # Placeholder: replace with the checkpoint directory before running.
    # (The closing quote was missing here, which made the cell a SyntaxError.)
    .option("checkpointLocation", "<GCS_BUCKET_CHK_POINT_DIR>")
    # Run a micro-batch on a one-minute processing-time trigger.
    .trigger(processingTime="1 minute")
    .start()
)

In [None]:
# Block this cell until the streaming query terminates (no timeout is given).
reguser_writer_query.awaitTermination()