In [23]:
import os
import time

import pandas as pd
import pyspark.sql.functions as F
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType

In [24]:
# Obtain the Spark session for this notebook (getOrCreate returns an existing
# session if one is already available instead of building a second one).
spark = (
    SparkSession.builder
    .appName('StructuredStreamingKafka')
    .getOrCreate()
)

In [25]:
# Kafka connection settings.
# Both values can be overridden through environment variables so the notebook
# does not have to be edited when the broker address changes; when the
# variables are unset the original hard-coded values are used.
BOOTSTRAP_SERVERS = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "confluent-local-broker-1:51169"
)
TOPIC = os.environ.get("KAFKA_TOPIC", "wikimedia_events")

In [26]:
# Build the streaming Kafka source one step at a time: pick the format,
# point it at the broker(s), subscribe to the topic, then load.
kafka_reader = spark.readStream.format('kafka')
kafka_reader = kafka_reader.option('kafka.bootstrap.servers', BOOTSTRAP_SERVERS)
kafka_reader = kafka_reader.option('subscribe', TOPIC)
kafka_stream_df = kafka_reader.load()

In [27]:
def output_console(df, output_mode='complete'):
    """Start a streaming query that prints ``df`` to the console sink.

    Args:
        df: Streaming DataFrame to write.
        output_mode: Output mode passed to ``writeStream.outputMode``.
            Defaults to ``'complete'``, the mode this helper always used.

    Returns:
        The handle returned by ``start()``. The query keeps running after
        this function returns, so the caller needs the handle to stop or
        await it.
    """
    # Return the handle instead of dropping it: without it the caller has no
    # way to stop the query that was just started.
    return df.writeStream.outputMode(output_mode).format('console').start()

In [28]:
def test(df, sleep_time=5, output_mode='append'):
    import time
    query = df.writeStream.outputMode(output_mode).format('console').start()
    time.sleep(sleep_time)
    query.stop()

In [29]:
# Schema used to parse the JSON payload of each Kafka message.
# Only the fields listed here are extracted from the event.

# Nested object under "meta": just the domain.
meta_schema = StructType([StructField('domain', StringType())])

# Nested object under "length": the old and new lengths.
length_schema = StructType([
    StructField('old', IntegerType()),
    StructField('new', IntegerType()),
])

schema = StructType([
    StructField('timestamp', IntegerType()),
    StructField('bot', BooleanType()),
    StructField('minor', BooleanType()),
    StructField('user', StringType()),
    StructField('meta', meta_schema),
    StructField('length', length_schema),
])

In [30]:
# Flatten the raw Kafka stream into one row per event.
# Written as a single expression starting from kafka_stream_df, so `df` is
# never rebound to a half-built intermediate frame.
length_diff_col = F.col('new_length') - F.col('old_length')

df = (
    kafka_stream_df
    # The Kafka `value` column is cast to a string so it can be parsed as JSON.
    .select(F.col('value').cast('string').alias('value'))
    .select(F.from_json(F.col('value'), schema).alias('data'))
    .select(
        'data.timestamp',
        'data.bot',
        'data.minor',
        'data.user',
        'data.meta.domain',
        F.col('data.length.old').alias('old_length'),
        F.col('data.length.new').alias('new_length'),
    )
    .withColumn('length_diff', length_diff_col)
    .withColumn(
        'length_diff_percent',
        # Guard the division: when old_length is 0 (or NULL) the percentage is
        # undefined, so emit NULL instead of dividing. With ANSI SQL mode
        # enabled an unguarded division by zero raises and fails the query.
        F.when(
            F.col('old_length') != 0,
            F.col('length_diff') / F.col('old_length') * 100,
        ),
    )
)

In [31]:
# top_five_domains = df.groupBy('domain').count().orderBy(F.desc('count')).limit(5)
# test(top_five_domains, sleep_time=30, output_mode='complete')

In [32]:
# spark-1  | +--------------------+-----+
# spark-1  | |              domain|count|
# spark-1  | +--------------------+-----+
# spark-1  | |    www.wikidata.org|  280|
# spark-1  | |commons.wikimedia...|  236|
# spark-1  | |    en.wikipedia.org|  196|
# spark-1  | |   mg.wiktionary.org|   18|
# spark-1  | |    zh.wikipedia.org|   12|
# spark-1  | +--------------------+-----+

In [33]:
# top_five_users = df.groupBy('user').agg(F.sum('length_diff').alias('length_diff_sum')).orderBy(F.desc('length_diff_sum')).limit(5)
# test(top_five_users, sleep_time=30, output_mode='complete')

In [34]:
# spark-1  | +-------------+---------------+
# spark-1  | |         user|length_diff_sum|
# spark-1  | +-------------+---------------+
# spark-1  | |    Emijrpbot|          38457|
# spark-1  | |     DrThneed|          18450|
# spark-1  | |       Cewbot|          15238|
# spark-1  | |SchlurcherBot|          13888|
# spark-1  | |        Scann|          12814|
# spark-1  | +-------------+---------------+


In [35]:
# summary = df.agg(
#     F.count('timestamp').alias('total_count'),
#     (F.count_if(F.col('bot') == True) / F.count('bot')).alias('bot_percent'),
#     F.mean('length_diff').alias('average_length_diff'),
#     F.min('length_diff').alias('min_length_diff'),
#     F.max('length_diff').alias('max_length_diff')
# )
# test(summary, sleep_time=30, output_mode='complete')

In [36]:
# spark-1  | +-----------+------------------+-------------------+---------------+---------------+
# spark-1  | |total_count|       bot_percent|average_length_diff|min_length_diff|max_length_diff|
# spark-1  | +-----------+------------------+-------------------+---------------+---------------+
# spark-1  | |        598|0.3695652173913043|  307.1011235955056|          -5864|           9851|
# spark-1  | +-----------+------------------+-------------------+---------------+---------------+

In [37]:
# Write the flattened event stream to the CSV sink for a short, bounded run.
RUN_SECONDS = 20  # how long to let the query run before stopping it

# NOTE(review): the checkpoint location ('output') is the parent directory of
# the sink path ('output/wikimedia_events.csv'), so checkpoint data and CSV
# output share a directory tree; consider a dedicated checkpoint directory.
query = (
    df.writeStream
    .outputMode('append')
    .option('checkpointLocation', 'output')
    .format('csv')
    .option('path', 'output/wikimedia_events.csv')
    .option('header', True)
    .trigger(processingTime='10 seconds')
    .start()
)
try:
    # awaitTermination(timeout) waits up to RUN_SECONDS but, unlike a plain
    # sleep, re-raises the query's exception if the stream fails in the
    # meantime, so a broken sink/source is not silently ignored.
    query.awaitTermination(RUN_SECONDS)
finally:
    # Always stop the query, including when the cell is interrupted.
    query.stop()