In [1]:
## Running list of issues

# can't seem to write the full raw event to hdfs (probably because how set up the schema)
# respect different between timestamp and event time - for knowing when to handle watermarks
# not really sure what the windowing is telling us or how might be used optimally in analysis, would need to join with fill denormalized tables?
# issues with writing to kafka stream
# add vandalism as topic: "vandalism" in comment
# capture diffs: https://en.wikipedia.org/w/index.php?title=1828_in_the_United_States&diff=827995885&oldid=827995860

In [2]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import re

In [3]:
sc.setLogLevel("ERROR")

In [4]:
def prune_event(df_IN, filter_IN):
    df_OUT = df_IN.select("parsed_wiki_values.*") \
            .select("id", \
                    "user", \
                    "timestamp", \
                    "bot", \
                    "comment", \
                    "server_name", \
                    "wiki", \
                    "title", \
                    "type", \
                    "log_action", \
                    "log_action_comment", \
                    "log_type", \
                    "minor", \
                    "namespace", \
                    "parsedcomment", \
                    "patrolled", \
                    col("meta.dt").alias("event_date"), \
                    col("meta.schema_uri").alias("event_schema_uri"), \
                    col("meta.uri").alias("wikipage_uri"), \
                    col("meta.domain").alias("event_domain"), \
                    col("length.old").alias("len_old"), \
                    col("length.new").alias("len_new"), \
                    col("revision.old").alias("rev_old"), \
                    col("revision.new").alias("rev_new"), \
                   ).where(col("server_name")==filter_IN)
    return df_OUT

In [5]:
def write_to_hdfs(stream, location):
    pathout = "hdfs://sandbox.hortonworks.com:8020/tmp/{}".format(location)
    return stream.writeStream \
    .format("parquet") \
    .option("startingOffsets", "earliest") \
    .option("path", pathout) \
    .option("checkpointLocation", pathout) \
    .start()

In [6]:
def windowed_counts(df_IN, col_IN):
    return df_IN.groupBy(
        window(df_IN[col_IN], "1 minutes", "30 seconds"),
        df_IN.user
    ).count()

In [7]:
def match_anonymous(userid):
    '''It is assumed in wikipedia that anonymous users are
       given the userid of the IP address from which their
       traffic is coming from. IPs can be ipv4 or ipv6'''
    
    ANONYMOUS = False
    
    # ipv4 and ipv6 expressions used from: http://nbviewer.jupyter.org/github/rasbt/python_reference/blob/master/tutorials/useful_regex.ipynb
    ipv4_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
    ipv6_pattern = r'^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$'
    ipv4_ans = bool(re.match(ipv4_pattern, userid))
    ipv6_ans = bool(re.match(ipv6_pattern, userid))
    if ipv4_ans is True:
        ANONYMOUS = True
    elif ipv6_ans is True:
        ANONYMOUS = True
    else:
        ANONYMOUS = False
    return ANONYMOUS

matchAnonUDF = udf(match_anonymous, BooleanType())

In [8]:
df = spark.readStream.format("kafka") \
                     .option("kafka.bootstrap.servers","sandbox.hortonworks.com:6667") \
                     .option("subscribe", "wiki-rc-stream") \
                     .option("startingOffsets", "earliest") \
                     .load()

In [9]:
df.printSchema 

<bound method DataFrame.printSchema of DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]>

In [10]:
data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [11]:
data

DataFrame[key: string, value: string]

In [12]:
jsonschema = StructType().add("bot", BooleanType()) \
                         .add("comment", StringType()) \
                         .add("id", IntegerType()) \
                         .add("length", StructType() \
                            .add("new", IntegerType()) \
                            .add("old", IntegerType())) \
                         .add("meta", StructType() \
                            .add("domain", StringType()) \
                            .add("dt", StringType()) \
                            .add("id", StringType()) \
                            .add("request_id", StringType()) \
                            .add("schema_uri", StringType()) \
                            .add("topic", StringType()) \
                            .add("partition", IntegerType()) \
                            .add("uri", StringType()) \
                            .add("offset", IntegerType())) \
                         .add("minor",  BooleanType()) \
                         .add("namespace", IntegerType()) \
                         .add("parsedcomment", StringType()) \
                         .add("patrolled", BooleanType()) \
                         .add("revision", StructType() \
                            .add("new", IntegerType()) \
                            .add("old", IntegerType())) \
                         .add("server_name", StringType()) \
                         .add("server_script_path", StringType()) \
                         .add("server_url", StringType()) \
                         .add("timestamp", StringType()) \
                         .add("title", StringType()) \
                         .add("type", StringType()) \
                         .add("user", StringType()) \
                         .add("wiki", StringType()) \
                         .add("log_action", StringType()) \
                         .add("log_action_comment", StringType()) \
                         .add("log_id", IntegerType()) \
                         .add("log_params", StructType()) \
                         .add("log_type", StringType())

In [13]:
wiki_raw = df.select(from_json(col("value") \
                                .cast("string"), jsonschema) \
                                .alias("parsed_wiki_values"))

In [14]:
wiki_raw.printSchema 

<bound method DataFrame.printSchema of DataFrame[parsed_wiki_values: struct<bot:boolean,comment:string,id:int,length:struct<new:int,old:int>,meta:struct<domain:string,dt:string,id:string,request_id:string,schema_uri:string,topic:string,partition:int,uri:string,offset:int>,minor:boolean,namespace:int,parsedcomment:string,patrolled:boolean,revision:struct<new:int,old:int>,server_name:string,server_script_path:string,server_url:string,timestamp:string,title:string,type:string,user:string,wiki:string,log_action:string,log_action_comment:string,log_id:int,log_params:struct<>,log_type:string>]>

In [15]:
en_wiki = prune_event(wiki_raw, "en.wikipedia.org") \
                        .withColumn("anonymous", matchAnonUDF("user")) \
                        .withColumn("timestamp_dt", from_unixtime("timestamp", "yyyy-MM-dd HH:mm:ss.SSS"))

In [16]:
en_wiki.printSchema 

<bound method DataFrame.printSchema of DataFrame[id: int, user: string, timestamp: string, bot: boolean, comment: string, server_name: string, wiki: string, title: string, type: string, log_action: string, log_action_comment: string, log_type: string, minor: boolean, namespace: int, parsedcomment: string, patrolled: boolean, event_date: string, event_schema_uri: string, wikipage_uri: string, event_domain: string, len_old: int, len_new: int, rev_old: int, rev_new: int, anonymous: boolean, timestamp_dt: string]>

In [17]:
wikidata = prune_event(wiki_raw, "www.wikidata.org") \
                        .withColumn("anonymous", matchAnonUDF("user")) \
                        .withColumn("timestamp_dt", from_unixtime("timestamp", "yyyy-MM-dd HH:mm:ss.SSS"))

In [18]:
wikidata.printSchema

<bound method DataFrame.printSchema of DataFrame[id: int, user: string, timestamp: string, bot: boolean, comment: string, server_name: string, wiki: string, title: string, type: string, log_action: string, log_action_comment: string, log_type: string, minor: boolean, namespace: int, parsedcomment: string, patrolled: boolean, event_date: string, event_schema_uri: string, wikipage_uri: string, event_domain: string, len_old: int, len_new: int, rev_old: int, rev_new: int, anonymous: boolean, timestamp_dt: string]>

In [None]:
query3 = en_wiki.writeStream.outputMode("append").format("console") \
                                             .start()
query3.awaitTermination(timeout=10)

In [None]:
query4 = wikidata.writeStream.outputMode("append").format("console") \
                                             .start()
query4.awaitTermination(timeout=10)

In [None]:
query_en_wiki_hdfs = write_to_hdfs(en_wiki, "en_wiki")
query_en_wiki_hdfs.awaitTermination(timeout=15)

In [None]:
query_wikidata_hdfs = write_to_hdfs(wikidata, "wikidata")
query_wikidata_hdfs.awaitTermination(timeout=15)

In [19]:
windowedEnWikiCounts = windowed_counts(en_wiki, 'timestamp_dt')

In [20]:
windowedWikidataCounts = windowed_counts(wikidata, 'timestamp_dt')

In [None]:
query5 = windowedEnWikiCounts.writeStream.outputMode("complete").format("console") \
                                             .start()
query5.awaitTermination(timeout=20)

In [None]:
query6 = windowedWikidataCounts.writeStream.outputMode("complete").format("console") \
                                             .start()
query6.awaitTermination(timeout=20)

In [21]:
# TODO: write to individual kafka streams (need other details?)
# enWikiCounts
# wikidataCounts
# 
'''
streamQuery1 = windowedEnWikiCounts.select(
    col("user").cast("string").alias("key"),
    col("count").cast("string").alias("value")) \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()
    
'''
'''
streamQuery1 = windowedEnWikiCounts.select(
    to_json(struct("user", "window")).alias("key"),
    col("count").cast("string").alias("value")) \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()
'''

streamQuery1 = windowedEnWikiCounts.select(
    to_json(struct("user", "window")).alias("key"),
    col("count").cast("string").alias("value")) \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "sandbox.hortonworks.com:6667") \
    .option("topic", "enWikiCounts") \
    .option("checkpointLocation", "hdfs://sandbox.hortonworks.com:8020/tmp/enWikiCounts") \
    .outputMode("complete") \
    .start()


streamQuery1.awaitTermination(timeout=20)

StreamingQueryException: 'Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 4, localhost, executor driver): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$3()Lscala/Option;\n\tat org.apache.spark.sql.kafka010.KafkaWriteTask.createProjection(KafkaWriteTask.scala:112)\n\tat org.apache.spark.sql.kafka010.KafkaWriteTask.<init>(KafkaWriteTask.scala:39)\n\tat org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:90)\n\tat org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)\n\tat org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)\n\tat org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n\nDriver stacktrace:\n=== Streaming Query ===\nIdentifier: [id = e3841b70-201b-4c79-8510-1ea71b52d4ce, runId = d743dfa0-657b-4f8e-9e21-619c2c292d50]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaSource[Subscribe[wiki-rc-stream]]: {"wiki-rc-stream":{"2":70541,"1":69974,"0":70011}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [cast(key#381 as string) AS key#386, cast(value#382 as string) AS value#387]\n+- Project [structtojson(named_struct(user, user#39, window, window#311)) AS key#381, cast(count#340L as string) AS value#382]\n   +- Aggregate [window#341, user#39], [window#341 AS window#311, user#39, count(1) AS count#340L]\n      +- Filter ((cast(timestamp_dt#139 as timestamp) >= window#341.start) && (cast(timestamp_dt#139 as timestamp) < window#341.end))\n         +- Expand [ArrayBuffer(named_struct(start, ((((CEIL((cast((precisetimestamp(cast(timestamp_dt#139 as timestamp)) - 0) as double) / cast(30000000 as double))) + cast(0 as bigint)) - cast(2 as bigint)) * 30000000) + 0), end, (((((CEIL((cast((precisetimestamp(cast(timestamp_dt#139 as timestamp)) - 0) as double) / cast(30000000 as double))) + cast(0 as bigint)) - cast(2 as bigint)) * 30000000) + 0) + 60000000)), id#25, user#39, timestamp#36, bot#23, comment#24, server_name#33, wiki#40, title#37, type#38, log_action#41, log_action_comment#42, log_type#45, minor#28, namespace#29, parsedcomment#30, patrolled#31, event_date#70, event_schema_uri#71, wikipage_uri#72, event_domain#73, len_old#74, len_new#75, rev_old#76, rev_new#77, anonymous#112, timestamp_dt#139), ArrayBuffer(named_struct(start, ((((CEIL((cast((precisetimestamp(cast(timestamp_dt#139 as timestamp)) - 0) as double) / cast(30000000 as double))) + cast(1 as bigint)) - cast(2 as bigint)) * 30000000) + 0), end, (((((CEIL((cast((precisetimestamp(cast(timestamp_dt#139 as timestamp)) - 0) as double) / cast(30000000 as double))) + cast(1 as bigint)) - cast(2 as bigint)) * 30000000) + 0) + 60000000)), id#25, user#39, timestamp#36, bot#23, comment#24, server_name#33, wiki#40, title#37, type#38, log_action#41, log_action_comment#42, log_type#45, minor#28, namespace#29, parsedcomment#30, patrolled#31, event_date#70, event_schema_uri#71, wikipage_uri#72, event_domain#73, len_old#74, len_new#75, rev_old#76, rev_new#77, anonymous#112, timestamp_dt#139), ArrayBuffer(named_struct(start, ((((CEIL((cast((precisetimestamp(cast(timestamp_dt#139 as timestamp)) - 0) as double) / cast(30000000 as double))) + cast(2 as bigint)) - cast(2 as bigint)) * 30000000) + 0), end, (((((CEIL((cast((precisetimestamp(cast(timestamp_dt#139 as timestamp)) - 0) as double) / cast(30000000 as double))) + cast(2 as bigint)) - cast(2 as bigint)) * 30000000) + 0) + 60000000)), id#25, user#39, timestamp#36, bot#23, comment#24, server_name#33, wiki#40, title#37, type#38, log_action#41, log_action_comment#42, log_type#45, minor#28, namespace#29, parsedcomment#30, patrolled#31, event_date#70, event_schema_uri#71, wikipage_uri#72, event_domain#73, len_old#74, len_new#75, rev_old#76, rev_new#77, anonymous#112, timestamp_dt#139)], [window#341, id#25, user#39, timestamp#36, bot#23, comment#24, server_name#33, wiki#40, title#37, type#38, log_action#41, log_action_comment#42, log_type#45, minor#28, namespace#29, parsedcomment#30, patrolled#31, event_date#70, event_schema_uri#71, wikipage_uri#72, event_domain#73, len_old#74, len_new#75, rev_old#76, ... 3 more fields]\n            +- Project [id#25, user#39, timestamp#36, bot#23, comment#24, server_name#33, wiki#40, title#37, type#38, log_action#41, log_action_comment#42, log_type#45, minor#28, namespace#29, parsedcomment#30, patrolled#31, event_date#70, event_schema_uri#71, wikipage_uri#72, event_domain#73, len_old#74, len_new#75, rev_old#76, rev_new#77, ... 2 more fields]\n               +- Project [id#25, user#39, timestamp#36, bot#23, comment#24, server_name#33, wiki#40, title#37, type#38, log_action#41, log_action_comment#42, log_type#45, minor#28, namespace#29, parsedcomment#30, patrolled#31, event_date#70, event_schema_uri#71, wikipage_uri#72, event_domain#73, len_old#74, len_new#75, rev_old#76, rev_new#77, match_anonymous(user#39) AS anonymous#112]\n                  +- Filter (server_name#33 = en.wikipedia.org)\n                     +- Project [id#25, user#39, timestamp#36, bot#23, comment#24, server_name#33, wiki#40, title#37, type#38, log_action#41, log_action_comment#42, log_type#45, minor#28, namespace#29, parsedcomment#30, patrolled#31, meta#27.dt AS event_date#70, meta#27.schema_uri AS event_schema_uri#71, meta#27.uri AS wikipage_uri#72, meta#27.domain AS event_domain#73, length#26.old AS len_old#74, length#26.new AS len_new#75, revision#32.old AS rev_old#76, revision#32.new AS rev_new#77]\n                        +- Project [parsed_wiki_values#20.bot AS bot#23, parsed_wiki_values#20.comment AS comment#24, parsed_wiki_values#20.id AS id#25, parsed_wiki_values#20.length AS length#26, parsed_wiki_values#20.meta AS meta#27, parsed_wiki_values#20.minor AS minor#28, parsed_wiki_values#20.namespace AS namespace#29, parsed_wiki_values#20.parsedcomment AS parsedcomment#30, parsed_wiki_values#20.patrolled AS patrolled#31, parsed_wiki_values#20.revision AS revision#32, parsed_wiki_values#20.server_name AS server_name#33, parsed_wiki_values#20.server_script_path AS server_script_path#34, parsed_wiki_values#20.server_url AS server_url#35, parsed_wiki_values#20.timestamp AS timestamp#36, parsed_wiki_values#20.title AS title#37, parsed_wiki_values#20.type AS type#38, parsed_wiki_values#20.user AS user#39, parsed_wiki_values#20.wiki AS wiki#40, parsed_wiki_values#20.log_action AS log_action#41, parsed_wiki_values#20.log_action_comment AS log_action_comment#42, parsed_wiki_values#20.log_id AS log_id#43, parsed_wiki_values#20.log_params AS log_params#44, parsed_wiki_values#20.log_type AS log_type#45]\n                           +- Project [jsontostruct(StructField(bot,BooleanType,true), StructField(comment,StringType,true), StructField(id,IntegerType,true), StructField(length,StructType(StructField(new,IntegerType,true), StructField(old,IntegerType,true)),true), StructField(meta,StructType(StructField(domain,StringType,true), StructField(dt,StringType,true), StructField(id,StringType,true), StructField(request_id,StringType,true), StructField(schema_uri,StringType,true), StructField(topic,StringType,true), StructField(partition,IntegerType,true), StructField(uri,StringType,true), StructField(offset,IntegerType,true)),true), StructField(minor,BooleanType,true), StructField(namespace,IntegerType,true), StructField(parsedcomment,StringType,true), StructField(patrolled,BooleanType,true), StructField(revision,StructType(StructField(new,IntegerType,true), StructField(old,IntegerType,true)),true), StructField(server_name,StringType,true), StructField(server_script_path,StringType,true), StructField(server_url,StringType,true), StructField(timestamp,StringType,true), StructField(title,StringType,true), StructField(type,StringType,true), StructField(user,StringType,true), StructField(wiki,StringType,true), StructField(log_action,StringType,true), StructField(log_action_comment,StringType,true), StructField(log_id,IntegerType,true), StructField(log_params,StructType(),true), StructField(log_type,StringType,true), cast(value#1 as string)) AS parsed_wiki_values#20]\n                              +- StreamingExecutionRelation KafkaSource[Subscribe[wiki-rc-stream]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n'

In [None]:
# TODO: write to a topic called en_wikipedia_hydrate to notify it's time to add onto that with raw change text and push to HDFS
