In [2]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType

In [3]:
# Reuse the active SparkSession if one exists, otherwise create it.
# NOTE(review): the 'kafka' source used below needs the spark-sql-kafka connector
# on the classpath; this notebook presumably relies on the cluster providing it.
spark = (
    SparkSession.builder
    .appName('StructuredStreamingKafka')
    .getOrCreate()
)

In [4]:
# Kafka connection settings. The defaults match the local development broker; the
# port looks like an ephemeral one from a local Confluent setup, so allow overriding
# both values via environment variables instead of editing the notebook.
BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'confluent-local-broker-1:50459')
TOPIC = os.environ.get('KAFKA_TOPIC', 'wikimedia_events')

In [5]:
# Unbounded streaming DataFrame subscribed to TOPIC. The message payload is read
# from the `value` column further down (cast to string before JSON parsing).
kafka_stream_df = (
    spark.readStream
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': BOOTSTRAP_SERVERS,
        'subscribe': TOPIC,
    })
    .load()
)

In [6]:
def test(df, sleep_time=5, output_mode='append'):
    import time
    query = df.writeStream.outputMode(output_mode).format('console').start()
    time.sleep(sleep_time)
    query.stop()

In [7]:
# Subset of the event JSON that gets parsed; from_json ignores fields not listed here.
# NOTE(review): `timestamp` is presumably Unix epoch seconds; IntegerType stops fitting
# such values after 2038-01-19 -- consider LongType if that matters.
schema = (
    StructType()
    .add('timestamp', IntegerType())
    .add('bot', BooleanType())
    .add('minor', BooleanType())
    .add('user', StringType())
    .add('meta', StructType().add('domain', StringType()))
    .add('length', StructType()
         .add('old', IntegerType())
         .add('new', IntegerType()))
)

In [8]:
# Decode the Kafka payload: cast `value` to string, parse it with `schema`, then
# flatten the nested struct into top-level columns
# timestamp, bot, minor, user, domain, old, new.
# Payloads that are not valid JSON come through as a null struct (all columns null).
df = (
    kafka_stream_df
    .select(F.from_json(F.col('value').cast('string'), schema).alias('data'))
    .select(
        F.col('data.timestamp'),
        F.col('data.bot'),
        F.col('data.minor'),
        F.col('data.user'),
        F.col('data.meta.domain'),
        F.col('data.length.old'),
        F.col('data.length.new'),
    )
)

In [9]:
# Absolute size change of the edit; null whenever `old` or `new` is null.
df = df.withColumn('length_diff', F.col('new') - F.col('old'))
# Relative change in percent, rounded to 2 decimals. Guarded so an `old` of 0 yields
# null instead of failing the query with DIVIDE_BY_ZERO when spark.sql.ansi.enabled
# is true (the default from Spark 4.0); a null `old` still yields null.
df = df.withColumn(
    'length_diff_percent',
    F.round(
        F.when(F.col('old') != 0, F.col('length_diff') / F.col('old') * 100),
        2,
    ),
)

In [10]:
# Peek at the parsed row-level stream (non-aggregated, so append mode).
test(df, output_mode='append', sleep_time=5)

In [25]:
# Five domains with the most events seen so far.
top_domain = (
    df.groupBy('domain')
    .count()
    .orderBy(F.col('count').desc())
    .limit(5)
)

In [26]:
# Sorted aggregation: needs complete mode to re-emit the whole ranking each batch.
test(top_domain, output_mode='complete', sleep_time=5)

In [27]:
# Five users with the largest summed `length_diff`. Note `added_sum` is a net sum:
# negative diffs (removals) reduce it, and rows with a null `length_diff` are skipped.
top_users = (
    df.groupBy('user')
    .agg(F.sum('length_diff').alias('added_sum'))
    .orderBy(F.col('added_sum').desc())
    .limit(5)
)

In [28]:
# Sorted aggregation: complete mode, like the domain ranking above it in the notebook.
test(top_users, output_mode='complete', sleep_time=5)

In [42]:
# Running global statistics over the whole stream.
calc_df = df.agg(
    # Counts rows with a non-null `timestamp` (rows that failed parsing are excluded).
    F.count('timestamp').alias('total_count'),
    # Percentage of non-null `bot` flags that are true. avg(cast(bot as int)) equals
    # count_if(bot) / count(bot), but returns null instead of dividing by zero
    # (an error under ANSI mode) when no non-null `bot` value has arrived yet.
    F.round(F.avg(F.col('bot').cast('int')) * 100, 2).alias('percent_bot'),
    F.round(F.mean('length_diff'), 2).alias('average_length_diff'),
    F.min('length_diff').alias('min_length_diff'),
    F.max('length_diff').alias('max_length_diff'),
)

In [43]:
# Global (ungrouped) aggregation: complete mode re-prints the single summary row.
test(calc_df, output_mode='complete', sleep_time=5)