In [3]:
from pyspark.sql import SparkSession

# Build the Spark session (getOrCreate reuses a session if one already exists)
# running on a local master. The Kafka source connector is requested through
# spark.jars.packages.
spark = (
    SparkSession.builder
    .master('local')
    .appName('wiki-changes-event-consumer')
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5")
    .getOrCreate()
)

# Handle to the session's underlying SparkContext
sc = spark.sparkContext

In [4]:
# Create the streaming dataframe: Kafka server, topic and starting offset
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-server:9092") # kafka server
  .option("subscribe", "wiki-changes") # topic
  # Begin with the newest records. "latest" is what the Kafka source uses for a
  # streaming query when this option is omitted; it is spelled out here so the
  # starting position is visible and easy to change (e.g. to "earliest").
  .option("startingOffsets", "latest") # offset
  .load())

In [6]:
# Launch the streaming query: rows read from `df` are appended to an in-memory
# sink that can be queried with spark.sql under the name "wiki_changes".
queryStreamMem = (
    df.writeStream
      .queryName("wiki_changes")
      .outputMode("append")
      .format("memory")
      .start()
)

In [9]:
from pyspark.sql.functions import from_json, col, from_unixtime, to_date, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType, IntegerType
from time import sleep
from IPython.display import clear_output

# Schema describing the event payload; every field is declared nullable.
# NOTE(review): IntegerType is a 32-bit type - confirm that the length and
# revision values in the events always fit, otherwise switch them to LongType.
schema_wiki = (
    StructType()
    .add("$schema", StringType(), True)
    .add("bot", BooleanType(), True)
    .add("comment", StringType(), True)
    .add("id", StringType(), True)
    # Nested struct: new / old length
    .add("length",
         StructType()
         .add("new", IntegerType(), True)
         .add("old", IntegerType(), True),
         True)
    # Nested struct: event metadata
    .add("meta",
         StructType()
         .add("domain", StringType(), True)
         .add("dt", StringType(), True)
         .add("id", StringType(), True)
         .add("offset", LongType(), True)
         .add("partition", LongType(), True)
         .add("request_id", StringType(), True)
         .add("stream", StringType(), True)
         .add("topic", StringType(), True)
         .add("uri", StringType(), True),
         True)
    .add("minor", BooleanType(), True)
    .add("namespace", IntegerType(), True)
    .add("parsedcomment", StringType(), True)
    .add("patrolled", BooleanType(), True)
    # Nested struct: new / old revision
    .add("revision",
         StructType()
         .add("new", IntegerType(), True)
         .add("old", IntegerType(), True),
         True)
    .add("server_name", StringType(), True)
    .add("server_script_path", StringType(), True)
    .add("server_url", StringType(), True)
    .add("timestamp", StringType(), True)
    .add("title", StringType(), True)
    .add("type", StringType(), True)
    .add("user", StringType(), True)
    .add("wiki", StringType(), True)
)

# Poll the in-memory sink table "wiki_changes" and persist the events as
# partitioned parquet files.
#
# The memory sink keeps every row received since the query started, so each
# pass must write only the rows that were not written by an earlier pass;
# appending the whole table every time would store the same events again on
# every run. written_offsets remembers, per Kafka (topic, partition), the
# highest offset this loop has already written.
written_offsets = {}

try:
    i=1
    # While a stream is active, append newly received events to the parquet dataset
    while len(spark.streams.active) > 0:
        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))

        # Count number of events
        spark.sql("select count(1) as qty from wiki_changes").show()

        # Snapshot the current high-water mark of every topic/partition first.
        # The table keeps growing while this pass runs, so the write below is
        # bounded by this snapshot and later rows are left for the next pass.
        high_water_rows = spark.sql(
            "select topic, `partition`, max(`offset`) as max_offset "
            "from wiki_changes group by topic, `partition`").collect()

        # Build a filter selecting, per topic/partition, the offsets in
        # (last written, snapshot high-water mark]
        new_rows_filter = None
        pending_offsets = {}
        for row in high_water_rows:
            topic_partition = (row["topic"], row["partition"])
            last_written = written_offsets.get(topic_partition, -1)
            if row["max_offset"] > last_written:
                in_range = ((col("topic") == row["topic"])
                            & (col("partition") == row["partition"])
                            & (col("offset") > last_written)
                            & (col("offset") <= row["max_offset"]))
                new_rows_filter = in_range if new_rows_filter is None else (new_rows_filter | in_range)
                pending_offsets[topic_partition] = row["max_offset"]

        # Only write when something new arrived since the previous pass
        if new_rows_filter is not None:
            # Convert binary to string, keeping only the not-yet-written rows
            # (partition/offset are selected for filtering only; they are not
            # part of the columns written below)
            df_kafka = (spark
                        .sql("select CAST(key as string) key, CAST(value as string) value, topic, `partition`, `offset`, timestamp from wiki_changes")
                        .filter(new_rows_filter))

            # Create dataframe setting schema for event data
            df_wiki = (df_kafka
                       # Sets schema for event data
                       .withColumn("value", from_json("value", schema_wiki))
                      )

            # Event time of the change, converted from unix time
            change_time = from_unixtime(col("value.timestamp"))

            # Transform into tabular
            # Convert unix timestamp to timestamp
            # Create partition column (change_timestamp_date)
            df_wiki_formatted = (df_wiki.select(
                col("key").alias("event_key")
                ,col("topic").alias("event_topic")
                ,col("timestamp").alias("event_timestamp")
                ,col("value.$schema").alias("schema")
                ,"value.bot"
                ,"value.comment"
                ,"value.id"
                ,col("value.length.new").alias("length_new")
                ,col("value.length.old").alias("length_old")
                ,"value.minor"
                ,"value.namespace"
                ,"value.parsedcomment"
                ,"value.patrolled"
                ,col("value.revision.new").alias("revision_new")
                ,col("value.revision.old").alias("revision_old")
                ,"value.server_name"
                ,"value.server_script_path"
                ,"value.server_url"
                ,to_timestamp(change_time).alias("change_timestamp")
                ,to_date(change_time).alias("change_timestamp_date")
                ,"value.title"
                ,"value.type"
                ,"value.user"
                ,"value.wiki"
                ,col("value.meta.domain").alias("meta_domain")
                ,col("value.meta.dt").alias("meta_dt")
                ,col("value.meta.id").alias("meta_id")
                ,col("value.meta.offset").alias("meta_offset")
                ,col("value.meta.partition").alias("meta_partition")
                ,col("value.meta.request_id").alias("meta_request_id")
                ,col("value.meta.stream").alias("meta_stream")
                ,col("value.meta.topic").alias("meta_topic")
                ,col("value.meta.uri").alias("meta_uri")
            ))

            # Write to parquet file partitioned
            df_wiki_formatted.write.mode('append').partitionBy("change_timestamp_date", "server_name").parquet("/home/jovyan/work/data-lake/wiki-changes")

            # Advance the marks only after the write succeeded, so a failed
            # write is retried instead of silently skipped
            written_offsets.update(pending_offsets)

        sleep(5)
        i=i+1

except KeyboardInterrupt:
    print("stream process interrupted")

finally:
    # Stop Query Stream, whether the loop was interrupted, failed with an
    # error or ended on its own, so no background query is left running
    queryStreamMem.stop()

Run:150
Input Rows:0
Input Rows per second:0.0
+-----+
|  qty|
+-----+
|17555|
+-----+

stream process interrupted
