Define the schema used to parse the JSON payload of each retail event.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType

# Schema of one retail event. Every field is left nullable (the StructField
# default). InvoiceDate is declared as a plain string, not a timestamp.
eventSchema = StructType([
    StructField('InvoiceNo', StringType()),
    StructField('StockCode', StringType()),
    StructField('Description', StringType()),
    StructField('Quantity', IntegerType()),
    StructField('InvoiceDate', StringType()),
    StructField('UnitPrice', DoubleType()),
    StructField('CustomerID', IntegerType()),
    StructField('Country', StringType()),
])

In [0]:
# Streaming source: subscribe to the `retail_events` topic on the Kafka broker
# at localhost:9092, with the starting offsets set to "earliest".
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "retail_events",
        "startingOffsets": "earliest",
    })
    .load()
)

In [0]:
from pyspark.sql.functions import col, from_json, to_date

# Turn raw Kafka records into typed retail events:
#   1. cast the `value` column to a string and parse it as JSON with eventSchema;
#   2. keep the Kafka `timestamp` column under the name EventTime;
#   3. flatten the parsed struct so each schema field becomes a top-level column.
retail_df = (
    kafka_df
    .select(
        from_json(col("value").cast(StringType()), eventSchema).alias("message"),
        col("timestamp").alias("EventTime"),
    )
    .select("message.*", "EventTime")
)

In [0]:
#%fs rm -r /tmp/data-lake/retail_events.parquet

In [0]:
base_path = "/tmp/data-lake/retail_events.parquet"

# Append the parsed events to a Parquet sink at base_path, adding an EventDate
# column derived from EventTime. trigger(once=True) runs a single batch over the
# data available when the query starts and then stops the query.
retail_query = (retail_df
  .withColumn("EventDate", to_date(retail_df.EventTime))
    .writeStream
      .format('parquet')
      .outputMode("append")
      .trigger(once=True)
      # The checkpoint is stored inside the sink directory, so removing
      # base_path also discards the streaming progress.
      .option('checkpointLocation', base_path + '/_checkpoint')
  .start(base_path)
)

# start() returns as soon as the query is launched. Block until the one-shot run
# has finished so that later cells see the written files, and so that a failure
# in the stream is raised in this cell instead of being silently dropped.
retail_query.awaitTermination()

In [0]:
%fs ls dbfs:/tmp/data-lake/retail_events.parquet

path,name,size
dbfs:/tmp/data-lake/retail_events.parquet/_checkpoint/,_checkpoint/,0
dbfs:/tmp/data-lake/retail_events.parquet/_spark_metadata/,_spark_metadata/,0
dbfs:/tmp/data-lake/retail_events.parquet/part-00000-7dc1a77e-db4e-494d-ad79-8c3996546e7e-c000.snappy.parquet,part-00000-7dc1a77e-db4e-494d-ad79-8c3996546e7e-c000.snappy.parquet,4984197
