In [1]:
import pyspark
import re
import pyspark.sql
from pyspark.sql import *
import pandas as pd
import matplotlib.pyplot as plt
import hashlib
import os.path
from pyspark.sql.functions import *
from datetime import timedelta, date


%matplotlib inline
spark_hive = pyspark.sql.HiveContext(sc)

## Create alternative session ID to replace session_token

In [2]:
# Collect every distinct session token that appears in either event table.
pageloads = spark.sql("select event.session_token from event.citationusagepageload").distinct()
citationusage = spark.sql("select event.session_token from event.citationusage").distinct()

# zipWithIndex pairs each distinct token with a unique 0-based integer, which
# becomes the surrogate session_id that replaces the raw session_token.
all_tokens = pageloads.union(citationusage).distinct().rdd.zipWithIndex()
session_ids = spark.createDataFrame(
    all_tokens.map(lambda r: (r[1], r[0].session_token)),
    ["session_id", "session_token"])

# Persist the mapping, then reload it from disk. The indices produced by
# zipWithIndex are not guaranteed to be stable if the RDD is re-evaluated
# (e.g. after a cache eviction), so every later join must read the exact
# mapping that was written rather than the lazy frame.
# The write uses the default mode and fails if the path already exists, which
# prevents silently replacing a mapping that other outputs already depend on.
session_ids.write.parquet("session_ids.parquet")
session_ids = spark.read.parquet("session_ids.parquet")
session_ids.show()

+----------+--------------------+
|session_id|       session_token|
+----------+--------------------+
|         0|3adddfc964d9aba65fe0|
|         1|bfed69d0d82a4eb24941|
|         2|8c64ed41d4cad6f413fb|
|         3|fa5e00f7d40c56634bdc|
|         4|9ee275684ae7695caf2f|
|         5|2ca2db728b164b88029a|
|         6|0e0beb852de26e591dd7|
|         7|f51779fb5554954ba8ea|
|         8|9f0313fdc9cfe05cb79c|
|         9|ab31c6c7832b0f006414|
|        10|66ec2f5ed994e562bd9f|
|        11|98eadae74b3ec4f306ef|
|        12|57dfe01291db0742ae47|
|        13|558ce36605655c6b8e66|
|        14|6f486e0b8304f90afe4f|
|        15|57125428c659c7cf265c|
|        16|b4d2f094226bb5d2d4ec|
|        17|d622b70eb919f40f282c|
|        18|4f2f62fee98c122da400|
|        19|2e7ace684706b026cf58|
+----------+--------------------+
only showing top 20 rows



## Get anonymous edits

In [3]:
# Edits recorded in wmf.mediawiki_history for enwiki, restricted to rows flagged
# as anonymous, main namespace (page_namespace = 0), non-redirect pages, with a
# timestamp strictly between 2018-09-15 and 2018-11-15.
# Each edit is reduced to (page, year, month, day, hour, ip) so it can later be
# matched against pageloads at hour granularity.
# NOTE(review): event_user_text_historical is aliased to `ip`; presumably it
# holds the editor's IP address for anonymous edits — confirm against the table
# documentation.
query = """
SELECT page_id, year(event_timestamp) edit_year, month(event_timestamp) edit_month, 
        dayofmonth(event_timestamp) edit_day, hour(event_timestamp) edit_hour,
        event_user_text_historical ip
FROM wmf.mediawiki_history 
WHERE wiki_db = 'enwiki'
AND event_user_is_anonymous = TRUE
AND to_timestamp(event_timestamp) > '2018-09-15'
AND to_timestamp(event_timestamp) < '2018-11-15'
AND page_namespace = 0
AND page_is_redirect = FALSE
"""

# distinct() collapses several edits of the same page from the same ip within
# the same hour into a single row.
anonymous_edits = spark.sql(query).distinct()
anonymous_edits

DataFrame[page_id: bigint, edit_year: int, edit_month: int, edit_day: int, edit_hour: int, ip: string]

## Get page loads and clicks events

In [4]:
pageloads = spark.sql("select * from event.citationusagepageload")
pageloads

DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]

Total pageload events:

In [5]:
pageloads_original = pageloads.count()
pageloads_original

1677561540

Total unique sessions:

In [6]:
pageloads_original_sessions = pageloads.select("event.session_token").distinct().count()
pageloads_original_sessions

886214547

----

In [7]:
citationusage = spark.sql("select * from event.citationusage")
citationusage

DataFrame[dt: string, event: struct<action:string,citation_in_text_refs:bigint,dom_interactive_time:bigint,event_offset_time:bigint,ext_position:bigint,footnote_number:bigint,freely_accessible:boolean,in_infobox:boolean,link_occurrence:bigint,link_text:string,link_url:string,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,section_id:string,session_token:string,skin:string,citation_identifier_label:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]

Total click events:

In [8]:
citationusage_original = citationusage.count()
citationusage_original

152160328

Unique sessions:

In [9]:
citationusage_original_sessions = citationusage.select("event.session_token").distinct().count()
citationusage_original_sessions

100437517

----
## Get the session tokens with an edit

In [10]:
# A pageload session is linked to an anonymous edit when both share the same
# IP, the same calendar hour (year / month / day / hour) and the same page.
# All criteria are expressed as a single inner-join condition.
sessions_with_edits = (
    pageloads
    .join(
        anonymous_edits,
        (pageloads.ip == anonymous_edits.ip)
        & (pageloads.year == anonymous_edits.edit_year)
        & (pageloads.month == anonymous_edits.edit_month)
        & (pageloads.day == anonymous_edits.edit_day)
        & (pageloads.hour == anonymous_edits.edit_hour)
        & (pageloads.event.page_id == anonymous_edits.page_id),
    )
    .select("event.session_token")
    .distinct()
)

sessions_with_edits

DataFrame[session_token: string]

Count the sessions to exclude:

In [11]:
sessions_with_edits.cache().count()

119072

Left join to keep only the pageloads of the sessions without edits:

In [12]:
# Expose the sessions to exclude to Spark SQL. createOrReplaceTempView is the
# replacement for registerTempTable, which is deprecated since Spark 2.0.
sessions_with_edits.createOrReplaceTempView("sessions_with_edits")

# Anti-join: after the LEFT JOIN, s.session_token is NULL exactly for the
# pageloads whose session has no entry in sessions_with_edits, and the WHERE
# clause keeps only those rows.
query = """
SELECT p.*
FROM event.citationusagepageload p
LEFT JOIN sessions_with_edits s
ON p.event.session_token = s.session_token
WHERE s.session_token IS NULL
"""

pageloads_clean = spark.sql(query)
pageloads_clean

DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]

Get the number of pageload events after the cleaning:

In [13]:
pageloads_anonymized = pageloads_clean.count()

Get the number of individual sessions:

In [14]:
pageloads_anonymized_sessions = pageloads_clean.select("event.session_token").distinct().count()

Add the session id and drop the critical fields:

In [15]:
# Attach the surrogate session_id to every remaining pageload and parse the
# string column dt into a timestamp column named event_time. The join is an
# inner join on the raw session token.
anonymous_pageloads_nested = (
    pageloads_clean.alias("pl")
    .join(
        session_ids.alias("ids"),
        pageloads_clean.event.session_token == session_ids.session_token,
    )
    .select("pl.*", "ids.session_id", to_timestamp("pl.dt").alias("event_time"))
)

anonymous_pageloads_nested.printSchema()

root
 |-- dt: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- dom_interactive_time: long (nullable = true)
 |    |-- event_offset_time: long (nullable = true)
 |    |-- mode: string (nullable = true)
 |    |-- namespace_id: long (nullable = true)
 |    |-- page_id: long (nullable = true)
 |    |-- page_title: string (nullable = true)
 |    |-- page_token: string (nullable = true)
 |    |-- referrer: string (nullable = true)
 |    |-- revision_id: long (nullable = true)
 |    |-- session_token: string (nullable = true)
 |    |-- skin: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- recvfrom: string (nullable = true)
 |-- revision: long (nullable = true)
 |-- schema: string (nullable = true)
 |-- seqid: long (nullable = true)
 |-- useragent: struct (nullable = true)
 |    |-- browser_family: string (nullable = true)
 |    |-- browser_major: string (nullable = true)
 |    |-- browser_minor: string (nullable

In [16]:
# createOrReplaceTempView is the replacement for registerTempTable, which is
# deprecated since Spark 2.0.
anonymous_pageloads_nested.createOrReplaceTempView('anonymous_pageloads_nested')

# Flatten the nested `event` struct into top-level columns and keep only the
# listed envelope fields; dt, ip and useragent are not selected and therefore
# do not reach the output.
query = """
SELECT session_id, event_time, event.*, 
        recvfrom, revision, schema, seqid, 
        uuid, webhost, wiki, geocoded_data, 
        year, month, day, hour 
FROM anonymous_pageloads_nested
"""

# event.* also expands session_token, so drop it: session_id replaces it.
anonymous_pageloads = spark.sql(query).drop('session_token')
anonymous_pageloads.printSchema()

root
 |-- session_id: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- action: string (nullable = true)
 |-- dom_interactive_time: long (nullable = true)
 |-- event_offset_time: long (nullable = true)
 |-- mode: string (nullable = true)
 |-- namespace_id: long (nullable = true)
 |-- page_id: long (nullable = true)
 |-- page_title: string (nullable = true)
 |-- page_token: string (nullable = true)
 |-- referrer: string (nullable = true)
 |-- revision_id: long (nullable = true)
 |-- skin: string (nullable = true)
 |-- recvfrom: string (nullable = true)
 |-- revision: long (nullable = true)
 |-- schema: string (nullable = true)
 |-- seqid: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- webhost: string (nullable = true)
 |-- wiki: string (nullable = true)
 |-- geocoded_data: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: lon

Sanity check:

In [17]:
anonymous_pageloads_count = anonymous_pageloads.count()
anonymous_pageloads_count

1676585089

Sanity check, number of unique sessions:

In [18]:
anonymous_pageloads_sessions_count = anonymous_pageloads.select("session_id").distinct().count()
anonymous_pageloads_sessions_count

886095475

In [19]:
anonymous_pageloads.write.parquet("anonymous_pageloads.parquet")

----

## Anonymize the citation usage table

In [20]:
# Attach the surrogate session_id to every citation-usage event and parse the
# string column dt into a timestamp column named event_time. The join is an
# inner join on the raw session token.
anonymous_citationusage_nested = (
    citationusage.alias("cit")
    .join(
        session_ids.alias("ids"),
        citationusage.event.session_token == session_ids.session_token,
    )
    .select("cit.*", "ids.session_id", to_timestamp("cit.dt").alias("event_time"))
)

anonymous_citationusage_nested.printSchema()

root
 |-- dt: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- citation_in_text_refs: long (nullable = true)
 |    |-- dom_interactive_time: long (nullable = true)
 |    |-- event_offset_time: long (nullable = true)
 |    |-- ext_position: long (nullable = true)
 |    |-- footnote_number: long (nullable = true)
 |    |-- freely_accessible: boolean (nullable = true)
 |    |-- in_infobox: boolean (nullable = true)
 |    |-- link_occurrence: long (nullable = true)
 |    |-- link_text: string (nullable = true)
 |    |-- link_url: string (nullable = true)
 |    |-- mode: string (nullable = true)
 |    |-- namespace_id: long (nullable = true)
 |    |-- page_id: long (nullable = true)
 |    |-- page_title: string (nullable = true)
 |    |-- page_token: string (nullable = true)
 |    |-- referrer: string (nullable = true)
 |    |-- revision_id: long (nullable = true)
 |    |-- section_id: string (nullable = true)
 |    |-- sessi

In [21]:
# createOrReplaceTempView is the replacement for registerTempTable, which is
# deprecated since Spark 2.0.
anonymous_citationusage_nested.createOrReplaceTempView('anonymous_citationusage_nested')

# Flatten the nested `event` struct into top-level columns and keep only the
# listed envelope fields; dt, ip and useragent are not selected and therefore
# do not reach the output.
query = """
SELECT session_id, event_time, event.*, 
        recvfrom, revision, schema, seqid, 
        uuid, webhost, wiki, geocoded_data, 
        year, month, day, hour 
FROM anonymous_citationusage_nested
"""

# event.* also expands session_token, so drop it: session_id replaces it.
anonymous_citationusage = spark.sql(query).drop('session_token')
anonymous_citationusage.printSchema()

root
 |-- session_id: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- action: string (nullable = true)
 |-- citation_in_text_refs: long (nullable = true)
 |-- dom_interactive_time: long (nullable = true)
 |-- event_offset_time: long (nullable = true)
 |-- ext_position: long (nullable = true)
 |-- footnote_number: long (nullable = true)
 |-- freely_accessible: boolean (nullable = true)
 |-- in_infobox: boolean (nullable = true)
 |-- link_occurrence: long (nullable = true)
 |-- link_text: string (nullable = true)
 |-- link_url: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- namespace_id: long (nullable = true)
 |-- page_id: long (nullable = true)
 |-- page_title: string (nullable = true)
 |-- page_token: string (nullable = true)
 |-- referrer: string (nullable = true)
 |-- revision_id: long (nullable = true)
 |-- section_id: string (nullable = true)
 |-- skin: string (nullable = true)
 |-- citation_identifier_label: string (nullable = true)
 |-

Sanity check:

In [22]:
citationusage_anonymized = anonymous_citationusage.count()
citationusage_anonymized

152160328

Sanity check, number of unique sessions:

In [23]:
citationusage_anonymized_sessions = anonymous_citationusage.select("session_id").distinct().count()
citationusage_anonymized_sessions

100437517

In [24]:
anonymous_citationusage.write.parquet("anonymous_citationusage.parquet")

------
### Click events
Number of removed events:

In [38]:
# Compare the click-event count before and after anonymization.
print("Number of clicks events removed:")
print(citationusage_original - citationusage_anonymized)
# NOTE: the value printed under "Percentage" is a fraction between 0 and 1
# (removed / original); it is not multiplied by 100.
print("Percentage of the dataset removed:")
print((citationusage_original - citationusage_anonymized)/citationusage_original)

Number of clicks events removed:
0
Percentage of the dataset removed:
0.0


Number of removed sessions:

In [39]:
# Compare the number of distinct click sessions before and after anonymization.
print("Number of sessions removed:")
print(citationusage_original_sessions - citationusage_anonymized_sessions)
# NOTE: the value printed under "Percentage" is a fraction between 0 and 1
# (removed / original); it is not multiplied by 100.
print("Percentage of the dataset removed:")
print((citationusage_original_sessions - citationusage_anonymized_sessions)/citationusage_original_sessions)

Number of sessions removed:
0
Percentage of the dataset removed:
0.0


----
### Pageloads

In [40]:
# Compare the pageload event count before and after removing sessions with edits.
print("Number of pageload events removed:")
print(pageloads_original - pageloads_anonymized)
# NOTE: the value printed under "Percentage" is a fraction between 0 and 1
# (removed / original); it is not multiplied by 100.
print("Percentage of the dataset removed:")
print((pageloads_original - pageloads_anonymized)/pageloads_original)

Number of pageload removed removed:
976451
Percentage of the dataset removed:
0.0005820656808810722


In [41]:
# Compare the number of distinct pageload sessions before and after removing
# the sessions linked to an anonymous edit.
print("Number of sessions removed:")
print(pageloads_original_sessions - pageloads_anonymized_sessions)
# NOTE: the value printed under "Percentage" is a fraction between 0 and 1
# (removed / original); it is not multiplied by 100.
print("Percentage of the dataset removed:")
print((pageloads_original_sessions - pageloads_anonymized_sessions)/pageloads_original_sessions)

Number of sessions removed:
119072
Percentage of the dataset removed:
0.00013436024087291358


----

# Sanity checks

Release cache:

In [29]:
spark.catalog.clearCache()

In [30]:
anonymous_citationusage = spark.read.parquet("anonymous_citationusage.parquet")
anonymous_citationusage

DataFrame[session_id: bigint, event_time: timestamp, action: string, citation_in_text_refs: bigint, dom_interactive_time: bigint, event_offset_time: bigint, ext_position: bigint, footnote_number: bigint, freely_accessible: boolean, in_infobox: boolean, link_occurrence: bigint, link_text: string, link_url: string, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, section_id: string, skin: string, citation_identifier_label: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]

Check count:

In [31]:
anonymous_citationusage.count()

152160328

In [32]:
anonymous_pageloads = spark.read.parquet("anonymous_pageloads.parquet")
anonymous_pageloads

DataFrame[session_id: bigint, event_time: timestamp, action: string, dom_interactive_time: bigint, event_offset_time: bigint, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, skin: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]

Check count:

In [33]:
anonymous_pageloads.count()

1676585089

----
# Check on schema

In [34]:
anonymous_citationusage.printSchema()

root
 |-- session_id: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- action: string (nullable = true)
 |-- citation_in_text_refs: long (nullable = true)
 |-- dom_interactive_time: long (nullable = true)
 |-- event_offset_time: long (nullable = true)
 |-- ext_position: long (nullable = true)
 |-- footnote_number: long (nullable = true)
 |-- freely_accessible: boolean (nullable = true)
 |-- in_infobox: boolean (nullable = true)
 |-- link_occurrence: long (nullable = true)
 |-- link_text: string (nullable = true)
 |-- link_url: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- namespace_id: long (nullable = true)
 |-- page_id: long (nullable = true)
 |-- page_title: string (nullable = true)
 |-- page_token: string (nullable = true)
 |-- referrer: string (nullable = true)
 |-- revision_id: long (nullable = true)
 |-- section_id: string (nullable = true)
 |-- skin: string (nullable = true)
 |-- citation_identifier_label: string (nullable = true)
 |-

In [35]:
anonymous_pageloads.printSchema()

root
 |-- session_id: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- action: string (nullable = true)
 |-- dom_interactive_time: long (nullable = true)
 |-- event_offset_time: long (nullable = true)
 |-- mode: string (nullable = true)
 |-- namespace_id: long (nullable = true)
 |-- page_id: long (nullable = true)
 |-- page_title: string (nullable = true)
 |-- page_token: string (nullable = true)
 |-- referrer: string (nullable = true)
 |-- revision_id: long (nullable = true)
 |-- skin: string (nullable = true)
 |-- recvfrom: string (nullable = true)
 |-- revision: long (nullable = true)
 |-- schema: string (nullable = true)
 |-- seqid: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- webhost: string (nullable = true)
 |-- wiki: string (nullable = true)
 |-- geocoded_data: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: lon

# Check at session level

In [42]:
# createOrReplaceTempView is the replacement for registerTempTable, which is
# deprecated since Spark 2.0.
anonymous_pageloads.createOrReplaceTempView('anonymous_pageloads')

# Reduce each enwiki main-namespace pageload to (time, session, page) and tag
# it with the constant action 'pageLoad' so it can be merged with the
# citation-usage events, which carry their own action values.
pageloads_query = """
select event_time, session_id, page_id, 'pageLoad' as action
from anonymous_pageloads
where wiki = 'enwiki'
AND namespace_id = 0
"""

# NOTE: from here on `pageloads` refers to this reduced frame read back from
# the anonymized parquet, not to the raw event table loaded earlier.
pageloads = spark.sql(pageloads_query)
pageloads

DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]

In [43]:
pageloads.show(5)

+-------------------+----------+--------+--------+
|         event_time|session_id| page_id|  action|
+-------------------+----------+--------+--------+
|2018-10-13 01:56:53|   3298297|   20264|pageLoad|
|2018-10-13 01:56:41|   3298297|   20264|pageLoad|
|2018-10-13 01:56:22|   3298297|   20264|pageLoad|
|2018-10-02 15:17:57|   1353543|  563271|pageLoad|
|2018-10-01 16:35:38|    653142|18868131|pageLoad|
+-------------------+----------+--------+--------+
only showing top 5 rows



Get the unique session IDs to filter the "citationusage" table (the pageload data is a subsample).

In [44]:
unique_sessions = pageloads.select("session_id").distinct()

In [46]:
# createOrReplaceTempView is the replacement for registerTempTable, which is
# deprecated since Spark 2.0.
anonymous_citationusage.createOrReplaceTempView('anonymous_citationusage')
unique_sessions.createOrReplaceTempView('unique_sessions')

# Keep only the enwiki citation-usage events whose session also appears in the
# pageload sample (inner join on session_id).
# NOTE(review): unlike the pageloads query, this one has no namespace_id filter,
# so events on non-main-namespace pages stay in and can never find a matching
# pageLoad — confirm whether that is intended.
events_query = """
select event_time, cu.session_id, page_id, action
from anonymous_citationusage cu
join unique_sessions us
on us.session_id=cu.session_id
where wiki = 'enwiki'
"""

events = spark.sql(events_query)
events

DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]

Merge all the events:

In [47]:
# Build one record per session: gather the click events and the pageloads,
# group them by session_id, order each group by event_time and wrap the
# result in Rows so it can be turned into a DataFrame.
sessions_rdd = (
    events.rdd.union(pageloads.rdd)
    .map(lambda rec: (rec.session_id, [(rec.event_time, rec.page_id, rec.action)]))
    .reduceByKey(lambda left, right: left + right)
    .mapValues(lambda evs: sorted(evs, key=lambda ev: ev[0]))
    .map(lambda kv: Row(
        session_id=kv[0],
        events=[Row(event_time=ts, page_id=pid, action=act) for ts, pid, act in kv[1]]))
)

In [48]:
sessions_rdd.take(1)

[Row(events=[Row(action='pageLoad', event_time=datetime.datetime(2018, 10, 21, 5, 51, 27), page_id=19098431), Row(action='fnHover', event_time=datetime.datetime(2018, 10, 21, 5, 51, 52), page_id=19098431), Row(action='fnHover', event_time=datetime.datetime(2018, 10, 21, 5, 51, 53), page_id=19098431), Row(action='fnHover', event_time=datetime.datetime(2018, 10, 21, 5, 51, 55), page_id=19098431), Row(action='fnHover', event_time=datetime.datetime(2018, 10, 21, 5, 51, 58), page_id=19098431), Row(action='fnHover', event_time=datetime.datetime(2018, 10, 21, 5, 52, 1), page_id=19098431)], session_id=262144000)]

In [49]:
sessions = sqlContext.createDataFrame(sessions_rdd)
sessions.show(5)

+--------------------+----------+
|              events|session_id|
+--------------------+----------+
|[[pageLoad, 2018-...| 655360000|
|[[pageLoad, 2018-...| 458752000|
|[[pageLoad, 2018-...| 744238000|
|[[pageLoad, 2018-...| 784636000|
|[[pageLoad, 2018-...|  40398000|
+--------------------+----------+
only showing top 5 rows



In [50]:
def count_missing_load(row):
    """Flag a session containing an event that is not preceded by a pageLoad of its page.

    Args:
        row: a session record exposing ``events``, a sequence of items with
            ``action`` and ``page_id`` attributes. The check compares positions
            in that sequence, so it is only meaningful when ``events`` is
            already in chronological order.

    Returns:
        1 if at least one non-``pageLoad`` event refers to a page that has no
        ``pageLoad`` event at an earlier position in ``row.events``; 0
        otherwise (including when ``events`` is empty).
    """
    # Position of the first pageLoad seen for each page.
    first_load_index = {}
    for position, event in enumerate(row.events):
        if event.action == 'pageLoad':
            first_load_index.setdefault(event.page_id, position)

    for position, event in enumerate(row.events):
        # Compare by value: `is not` against a string literal tests object
        # identity, which depends on string interning and triggers a
        # SyntaxWarning on Python 3.8+.
        if event.action != 'pageLoad':
            loaded_at = first_load_index.get(event.page_id)
            if loaded_at is None or loaded_at > position:
                return 1
    return 0

errors_count = sessions.rdd.map(count_missing_load)

In [51]:
errors_rate = errors_count.sum()/unique_sessions.count()
errors_rate

8.213336208809798e-05

Number of sessions:

In [52]:
sessions.count()

886095469