In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Single source of truth for the input files: used both for the one-off batch
# schema read and for the streaming source, so the two cannot drift apart.
PARQUET_INPUT_PATH = "output/kafka_0.parquet/*.parquet"

# The Elasticsearch connector is pulled in through spark.jars.packages.
# NOTE(review): this setting is presumably only honoured when getOrCreate()
# actually creates the session/JVM; if a session already exists in the kernel,
# restart the kernel for the package to be loaded.
spark = (SparkSession.builder
         .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.3.3")
         .getOrCreate()
        )

# Infer the schema once from a batch read of the files already present, then
# hand it to the streaming reader explicitly.
schema = spark.read.parquet(PARQUET_INPUT_PATH).schema

# Streaming source over the same Parquet files, using the schema inferred above.
kafka_df = (spark.readStream
    .format("parquet")
    .schema(schema)
    .option("path", PARQUET_INPUT_PATH)
    .load()
)


In [None]:
# DDL schema handed to F.from_json for the JSON payload in the `value` column.
# The aggregation in this notebook reads only `location.state`; the rest of the
# `location` struct is declared for completeness.
#
# `postcode` is STRING rather than INT: Spark's JSON parser can store any JSON
# scalar (number or string) in a STRING field, whereas an INT field fails on a
# non-integer value. Such a failure can null out the enclosing `location`
# struct, and with it `state`.
# NOTE(review): confirm whether the producer ever emits alphanumeric postcodes.
#
# The two closing '>' are kept on separate lines so the DDL parser never sees a
# '>>' token.
json_schema = """
STRUCT<location: STRUCT<
    street: STRUCT<number: INT, name: STRING>,
    city: STRING,
    state: STRING,
    country: STRING,
    postcode: STRING,
    coordinates: STRUCT<latitude: STRING, longitude: STRING>,
    timezone: STRUCT<offset: STRING, description: STRING>
>
>
"""


In [None]:
# Parse the JSON payload, keep the event timestamp and `location.state`, then
# count events per state in 10-second windows (no slide duration is given, so
# the windows do not overlap). The 15-second watermark on `ts` is what lets
# append mode emit a window's count once event time has moved past it.
#
# NOTE: this cell rebinds `kafka_df` to its own aggregated output, which has no
# `value` column. Re-run the source cell before re-running this one.
kafka_df = (
    kafka_df
    .select(
        F.from_json(F.col("value").cast("string"), json_schema).alias("json"),
        F.col("timestamp").alias("ts"),
    )
    .select("ts", "json.location.state")
    .withWatermark("ts", "15 second")
    .groupBy("state", F.window("ts", "10 second"))
    .count()
    .select("state", "window.start", "window.end", "count")
)

In [None]:
# Show the schema of the windowed aggregation before starting the sink; the
# preceding cell selects the columns state, start, end and count.
kafka_df.printSchema()

In [None]:
# Start the streaming sink that appends each finalised window count to the
# `kafka_window` Elasticsearch resource on host `elasticsearch`. Progress is
# tracked in the checkpoint directory, so a restarted query resumes from it.
#
# Keep the StreamingQuery handle: interrupting awaitTermination() below does
# not stop the query, and the handle lets you call es_query.stop() (or inspect
# es_query.status / es_query.lastProgress) afterwards.
es_query = (kafka_df.writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option("checkpointLocation", "checkpoints/elastic_window")
    .option("es.resource", "kafka_window")
    .option("es.nodes", "elasticsearch")
    .start()
)

# Block the cell until the query stops; a failure inside the query is
# re-raised here.
es_query.awaitTermination()