In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, explode, to_json

BROKER_IP = "localhost"
BROKER_PORT = "9092"
TOPIC = "darwin"

# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Read Kafka Topic into a Spark Streaming DataFrame
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", f"{BROKER_IP}:{BROKER_PORT},broker:29092") \
  .option("subscribe", TOPIC) \
  .load()

# Create Schema for JSON value from Kafka Topic
# attr_schema = StructType([StructField("tag", StringType()),
#                           StructField("attributes", StringType()),
#                           StructField("text", StringType())]
#                         )
attr_schema = MapType(StringType(), StringType())


jsonschema = StructType([StructField("timestamp", TimestampType()),
                         StructField("elements", ArrayType(attr_schema))
                        ]
                       )

## Read into JSON from Kafka
base_df = df.selectExpr("CAST(value as STRING)")\
            .select(from_json(col("value"), jsonschema).alias("value"))

In [2]:
! rm -rf data
! rm -rf checkpoints
! mkdir data
! mkdir checkpoints

In [3]:
base_df.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- timestamp: timestamp (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)



In [4]:
base_df = base_df.select("value.*") \
                 .select(explode("elements"))

In [5]:
base_df.columns

['col']

In [6]:
base_df.writeStream \
       .format("parquet") \
       .option("path", "./data") \
       .option("checkpointLocation", "./checkpoints/") \
       .outputMode("append") \
       .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5f0047e1d0>

In [7]:
base_df.writeStream \
       .format("console") \
       .outputMode("append") \
       .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5ebadcce10>

In [8]:
base_df.isStreaming

True

In [9]:
base_df.

SyntaxError: invalid syntax (3033829300.py, line 1)

In [None]:
# .select(to_json(col("col")).alias("each_element")) \
# .select(from_json(col("each_element"), attr_schema)) \