In [1]:
from pyspark.sql import SparkSession

# Configure the session builder: local master and the consumer's application name
session_builder = SparkSession.builder.master('local').appName('wiki-changes-event-consumer')

# Add kafka package through the spark.jars.packages setting
session_builder = session_builder.config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5",
)

# Spark session & context
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [2]:
# Kafka source settings, shared by the streaming read and the optional static read
kafka_options = {
    "kafka.bootstrap.servers": "kafka-server:9092",  # kafka server
    "subscribe": "wiki-changes",  # topic
    "startingOffsets": "earliest",  # start from beginning
}

# stream read
df = spark.readStream.format("kafka").options(**kafka_options).load()

# regular read to keep it simple before we stream
# df = spark.read.format("kafka").options(**kafka_options).load()


In [3]:
from pyspark.sql.types import StringType

# Cast the binary value column to a string (only "value" is converted here)
value_as_string = df["value"].cast(StringType())
df = df.withColumn("value", value_as_string)

# only run these for static df
# df.show(2)
# df.printSchema()

In [4]:
# define the schema of the JSON string that the binary value was turned into
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType

# Fields of the nested "performer" object
performer_schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("user_text", StringType(), True)
    .add("user_is_bot", StringType(), True)
    .add("user_registration_dt", StringType(), True)
    .add("user_edit_count", IntegerType(), True)
    .add("request_id", StringType(), True)
)

# Fields of the nested "meta" object
meta_schema = StructType().add("dt", StringType(), True)

# Top-level event schema; every field is declared nullable
df_schema = (
    StructType()
    .add("performer", performer_schema, True)
    .add("meta", meta_schema, True)
    .add("database", StringType(), True)
)

# Replace the JSON string in "value" with a struct parsed according to df_schema
df_with_schema = df.withColumn("value", from_json("value", df_schema))

# only run these for static df
# df_with_schema.select('value').show(2, truncate=False)

In [5]:
from pyspark.sql.functions import col, from_unixtime, to_date, to_timestamp

# Output columns in order: (output name, expression over the parsed "value" struct).
# The two *_dt strings are converted to timestamps; the rest are passed through.
output_columns = [
    ("page_create_event_time", to_timestamp(col("value.meta.dt"))),
    ("user_id", col("value.performer.user_id")),
    ("user_isa_bot", col("value.performer.user_is_bot")),
    ("date_user_registered", to_timestamp(col("value.performer.user_registration_dt"))),
    ("num_user_edits", col("value.performer.user_edit_count")),
    ("wiki_db", col("value.database")),
]

# Flatten the nested event into one column per entry above
df3 = df_with_schema.select(
    [expression.alias(name) for name, expression in output_columns]
)

# only run these for static df
# df3.show(2, truncate=False)
# df3.printSchema()

In [6]:
# Locations of the parquet output and of the streaming checkpoint
raw_path = "/home/jovyan/work/data-lake/wiki-changes"
checkpoint_path = "/home/jovyan/work/data-lake/wiki-changes-checkpoint"

# Parquet sink in append mode, partitioned by the wiki_db column
ingestion_writer = (
    df3.writeStream
    .queryName("wiki_changes_ingestion")
    .outputMode("append")
    .format("parquet")
    .partitionBy("wiki_db")
    .option("checkpointLocation", checkpoint_path)
)

# Start query stream over stream dataframe, writing to raw_path
queryStream = ingestion_writer.start(raw_path)

In [7]:
# Read parquet files as stream, reusing the schema of the dataframe that is written to raw_path
parquet_reader = spark.readStream.schema(df3.schema)
df_wiki_changes = parquet_reader.parquet(raw_path)


In [8]:
# Output to memory to count rows; the query name is the table name queried later with spark.sql
count_writer = df_wiki_changes.writeStream.format("memory")
count_writer = count_writer.queryName("wiki_changes_count").outputMode("update")
queryStreamMem = count_writer.start()


In [None]:
from time import sleep
from IPython.display import clear_output

# Count rows every 5 seconds while stream is active
try:
    i = 1
    # Keep polling for as long as at least one streaming query is running
    while spark.streams.active:

        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))

        # Collect the names of the queries that are active right now
        active_names = []
        for s in spark.streams.active:
            active_names.append(s.name)

        # Only query the in-memory table while its streaming query is still active
        if "wiki_changes_count" not in active_names:
            print("'wiki_changes_count' query not found.")
        else:
            # Count number of events
            spark.sql("select count(1) as qty from wiki_changes_count").show()

        sleep(5)
        i += 1

except KeyboardInterrupt:
    # Stop Query Stream
    queryStreamMem.stop()

    print("stream process interrupted")

Run:10
+---+
|qty|
+---+
| 93|
+---+



In [10]:
# Check active streams: print the id and name of every streaming query still running.
# Iterate over the current list instead of relying on a loop variable left over from
# another cell, so this cell works on its own and reports all active queries.
active_queries = spark.streams.active
if active_queries:
    for query in active_queries:
        print("ID:{} | NAME:{}".format(query.id, query.name))
else:
    print('all done here')

ID:07051997-c2bb-4334-a738-2a6613989ed1 | NAME:wiki_changes_ingestion


In [10]:
# End: stop the streaming query that writes the Kafka events to parquet
queryStream.stop()

In [11]:
# Stop the SparkContext obtained from the session in the first cell
sc.stop()