In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Connection settings for the Kinesis source. The AWS key pair is looked up
# in the "aws" secret scope instead of being written into the notebook.
kinesis_options = {
    "streamName": "demo-raw-events",
    "initialPosition": "trim_horizon",
    "region": "ap-southeast-1",
    "awsAccessKey": dbutils.secrets.get("aws", "demo_access_key"),
    "awsSecretKey": dbutils.secrets.get("aws", "demo_secret_key"),
    "encoding": "UTF-8",
}

# Open the Kinesis stream as a streaming DataFrame.
df = spark.readStream.format("kinesis").options(**kinesis_options).load()

In [0]:
# Keep only the record payload, with the stream's `data` column cast to a string.
df = df.selectExpr("CAST(data as STRING) json_data")

# Assumes the JSON document is enclosed within {}: capture everything from the
# first "{" to the last "}". The (?s) flag lets "." match line breaks as well,
# so a payload that spans several lines is captured as one document instead of
# being reduced to whatever fits on a single line (or to an empty string).
df = df.withColumn('json_col', regexp_extract('json_data', r'(?s)\{.*\}', 0))

In [0]:
# Layout of one entry inside the payload's "data" array. Every field is a
# nullable string except se_va, which is read as DECIMAL(10,2).
event_struct = (
    StructType()
    .add("e", StringType(), True)
    .add("se_ac", StringType(), True)
    .add("se_pr", StringType(), True)
    .add("se_va", DecimalType(10, 2), True)
    .add("eid", StringType(), True)
    .add("dtm", StringType(), True)
    .add("p", StringType(), True)
    .add("tv", StringType(), True)
    .add("uid", StringType(), True)
    .add("tz", StringType(), True)
    .add("res", StringType(), True)
    .add("stm", StringType(), True)
)

# Top-level payload: a "schema" string plus the array of entries.
schema = (
    StructType()
    .add("schema", StringType(), True)
    .add("data", ArrayType(event_struct), True)
)

# Parse json_col with the schema above and keep only the parsed top-level
# fields (schema, data).
parsed_df = df.select(from_json("json_col", schema).alias("data")).select("data.*")

# Produce one row per array entry and promote the entry's fields to columns.
df = parsed_df.select(explode("data").alias("extract_data")).select("extract_data.*")

In [0]:
# Preview at most 20 rows of the flattened events.
preview_df = df.limit(20)
preview_df.display()

In [0]:
# Append the flattened events to the Delta table as a streaming query.
# toTable() is the documented DataStreamWriter call that starts the query
# against a named table; its positional argument is the table name.
# NOTE(review): "dbfs_path" looks like a placeholder - confirm it points at a
# durable checkpoint directory that is unique to this query.
(
    df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs_path")
    .toTable("demo_catalog.demo_schema.streaming_table")
)

In [0]:
#