In [1]:
!python --version

Python 2.7.5


In [2]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('wiki-changes-event-consumer197')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4")
         .getOrCreate())
sc = spark.sparkContext

print("********************  SPARK Context Created!***********************")

********************  SPARK Context Created!***********************


In [3]:
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") 
  .option("subscribe", "wiki-changes") 
  .option("startingOffsets", "earliest")
  .load())
  
print("********************  DF Loaded ***********************")


********************  DF Loaded ***********************


In [4]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
    .withColumn("key", df["key"].cast(StringType()))
    .withColumn("value", df["value"].cast(StringType())))

In [5]:
df1.columns

['key', 'value', 'topic', 'partition', 'offset', 'timestamp', 'timestampType']

In [6]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType

# Event data schema
schema_wiki = StructType(
    [StructField("$schema",StringType(),True),
     StructField("bot",BooleanType(),True),
     StructField("comment",StringType(),True),
     StructField("id",StringType(),True),
     StructField("length",
                 StructType(
                     [StructField("new",IntegerType(),True),
                      StructField("old",IntegerType(),True)]),True),
     StructField("meta",
                 StructType(
                     [StructField("domain",StringType(),True),
                      StructField("dt",StringType(),True),
                      StructField("id",StringType(),True),
                      StructField("offset",LongType(),True),
                      StructField("partition",LongType(),True),
                      StructField("request_id",StringType(),True),
                      StructField("stream",StringType(),True),
                      StructField("topic",StringType(),True),
                      StructField("uri",StringType(),True)]),True),
     StructField("minor",BooleanType(),True),
     StructField("namespace",IntegerType(),True),
     StructField("parsedcomment",StringType(),True),
     StructField("patrolled",BooleanType(),True),
     StructField("revision",
                 StructType(
                     [StructField("new",IntegerType(),True),
                      StructField("old",IntegerType(),True)]),True),
     StructField("server_name",StringType(),True),
     StructField("server_script_path",StringType(),True),
     StructField("server_url",StringType(),True),
     StructField("timestamp",StringType(),True),
     StructField("title",StringType(),True),
     StructField("type",StringType(),True),
     StructField("user",StringType(),True),
     StructField("wiki",StringType(),True)])

# Create dataframe setting schema for event data
df_wiki = (df1
           # Sets schema for event data
           .withColumn("value", from_json("value", schema_wiki))
          )

In [7]:
df_wiki.columns

['key', 'value', 'topic', 'partition', 'offset', 'timestamp', 'timestampType']

In [8]:
from pyspark.sql.functions import col, from_unixtime, to_date, to_timestamp

# Transform into tabular 
# Convert unix timestamp to timestamp
# Create partition column (change_timestamp_date)
df_wiki_formatted = (df_wiki.select(
    col("key").alias("event_key")
    ,col("topic").alias("event_topic")
    ,col("timestamp").alias("event_timestamp")
    ,col("value.$schema").alias("schema")
    ,"value.bot"
    ,"value.comment"
    ,"value.id"
    ,col("value.length.new").alias("length_new")
    ,col("value.length.old").alias("length_old")
    ,"value.minor"
    ,"value.namespace"
    ,"value.parsedcomment"
    ,"value.patrolled"
    ,col("value.revision.new").alias("revision_new")
    ,col("value.revision.old").alias("revision_old")
    ,"value.server_name"
    ,"value.server_script_path"
    ,"value.server_url"
    ,to_timestamp(from_unixtime(col("value.timestamp"))).alias("change_timestamp")
    ,to_date(from_unixtime(col("value.timestamp"))).alias("change_timestamp_date")
    ,"value.title"
    ,"value.type"
    ,"value.user"
    ,"value.wiki"
    ,col("value.meta.domain").alias("meta_domain")
    ,col("value.meta.dt").alias("meta_dt")
    ,col("value.meta.id").alias("meta_id")
    ,col("value.meta.offset").alias("meta_offset")
    ,col("value.meta.partition").alias("meta_partition")
    ,col("value.meta.request_id").alias("meta_request_id")
    ,col("value.meta.stream").alias("meta_stream")
    ,col("value.meta.topic").alias("meta_topic")
    ,col("value.meta.uri").alias("meta_uri")
))

In [9]:
df_wiki_formatted

DataFrame[event_key: string, event_topic: string, event_timestamp: timestamp, schema: string, bot: boolean, comment: string, id: string, length_new: int, length_old: int, minor: boolean, namespace: int, parsedcomment: string, patrolled: boolean, revision_new: int, revision_old: int, server_name: string, server_script_path: string, server_url: string, change_timestamp: timestamp, change_timestamp_date: date, title: string, type: string, user: string, wiki: string, meta_domain: string, meta_dt: string, meta_id: string, meta_offset: bigint, meta_partition: bigint, meta_request_id: string, meta_stream: string, meta_topic: string, meta_uri: string]

In [10]:
from pyspark.sql.functions import *
# Same query as staticInputDF
streamingCountsDF = (                 
  df_wiki_formatted
    .groupBy(
      df_wiki_formatted.event_topic, 
      window(df_wiki_formatted.change_timestamp, "1 second"))
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

True

In [11]:
from pyspark.sql.functions import *
# Same query as staticInputDF
streamingCountsDF = (                 
  df_wiki_formatted
    .groupBy(
      df_wiki_formatted.server_name, 
      window(df_wiki_formatted.event_timestamp, "1 minute"))
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

True

In [12]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table 
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

In [13]:
len(spark.streams.active)

1

In [14]:
from time import sleep
from IPython.display import clear_output

# Count rows every 5 seconds while stream is active
try:
    i=1
    # While stream is active, print count
    while len(spark.streams.active) > 0:
        
        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))
        
        lst_queries = []
        for s in spark.streams.active:
            lst_queries.append(s.name)

        # Verify if wiki_changes_count query is active before count
        if "counts" in lst_queries:
            # Count number of events
            spark.sql("select * from counts").show()
        else:
            print("'wiki_changes_count' query not found.")

        sleep(5)
        i=i+1
        
except KeyboardInterrupt:
    # Stop Query Stream
    queryStreamMem.stop()
    
    print("stream process interrupted")

Run:5
+--------------------+--------------------+-----+
|         server_name|              window|count|
+--------------------+--------------------+-----+
|    id.wikipedia.org|[1970-01-01 05:29...|   30|
|    vi.wikipedia.org|[2021-09-03 16:05...|   25|
|    th.wikipedia.org|[2021-09-03 16:08...|    3|
|   ca.wiktionary.org|[2021-09-03 16:12...|   12|
|    ar.wikipedia.org|[2021-09-03 16:12...|   21|
|    te.wikipedia.org|[2021-09-03 16:09...|    2|
|zh-classical.wiki...|[2021-09-03 16:04...|    2|
|    zh.wikipedia.org|[2021-09-03 16:08...|   26|
|    nl.wikipedia.org|[2021-09-03 16:08...|    8|
|   tr.wikisource.org|[2021-09-03 16:06...|    1|
|   tr.wikisource.org|[2021-09-03 16:11...|    1|
|    ta.wikipedia.org|[2021-09-03 16:12...|    2|
|    ur.wikipedia.org|[2021-09-03 16:11...|    1|
|    sk.wikipedia.org|[2021-09-03 16:12...|    4|
|     ru.wikinews.org|[2021-09-03 16:02...|   40|
|   tr.wikisource.org|[2021-09-03 16:02...|    1|
|   en.wiktionary.org|[2021-09-03 16:07...| 

NameError: name 'queryStreamMem' is not defined