In [12]:
import pyspark
import re
import pyspark.sql
from pyspark.sql import *
import pandas as pd
import matplotlib.pyplot as plt
import hashlib
import os.path
from pyspark.sql.functions import desc
from datetime import timedelta, date
import json

%matplotlib inline
#spark_hive = pyspark.sql.HiveContext(sc)

In [13]:
import findspark
findspark.init()

from pyspark.sql import dataframe
from pyspark.sql import functions as F

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

from pyspark.sql import SQLContext

# getOrCreate() hands back the already-running session when the cell is re-run.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Legacy SQL entry point, kept for the sqlContext.createDataFrame(...) calls below.
sqlContext = SQLContext(sc)

In [14]:
# import class XmlWiki
import os
import sys
my_class_dir = '../src/'
# Make the project-local XmlWiki module importable (added at most once).
already_on_path = my_class_dir in sys.path
if not already_on_path:
    sys.path.append(my_class_dir)
    
from XmlWiki import *

In [4]:
# required to iterate the range of dates
def daterange(start_date, end_date):
    for n in range(int ((end_date - start_date).days)):
        yield start_date + timedelta(n)

# Nas 
---

# Load the data 

Reading semi-structured files in Spark can be efficient if you know the **schema before accessing the data.** [link](https://szczeles.github.io/Reading-JSON-CSV-and-XML-files-efficiently-in-Apache-Spark/). 

In [15]:
DATA_DIR = 'data/'
# bz2-compressed XML dump of pawiki (2018-11-01), read from DATA_DIR.
WIKIPEDIA_XML_DUMP = '{}pawiki-20181101-pages-articles-multistream.xml.bz2'.format(DATA_DIR)

In [16]:
# Sanity check: confirm the project-local XmlWiki class is importable.
print(str(XmlWiki))

<class 'XmlWiki.XmlWiki'>


In [17]:
%%time
# initialize object from class XmlWikidump to load the file + save schema of file

WikiXML = XmlWiki(
    path = WIKIPEDIA_XML_DUMP, 
    path_schema = DATA_DIR + 'pawiki-20181101-pages-articles-multistream-schema',
    sampling_ratio=0.8)

wikipedia = WikiXML.dataframe

Saving schema in data/pawiki-20181101-pages-articles-multistream-schema 

root
 |-- _corrupt_record: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ns: long (nullable = true)
 |-- revision: struct (nullable = true)
 |    |-- comment: string (nullable = true)
 |    |-- contributor: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |-- format: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- model: string (nullable = true)
 |    |-- parentid: long (nullable = true)
 |    |-- sha1: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _space: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)

CPU times: user 4.51 ms, sys: 2.74 ms, total: 7.25 ms
Wall time: 17.7 s


In [27]:
# Namespace 10 holds templates. `like` is case-sensitive while infobox templates
# are written "Infobox ..." (cf. the "{{Infobox military conflict" search below),
# so match against the lower-cased title. `ns` is a long column: compare to a number.
wikipedia.filter("ns = 10").filter("lower(title) like '%infobox%'").show(5)

+---------------+---+---+--------+-----+
|_corrupt_record| id| ns|revision|title|
+---------------+---+---+--------+-----+
+---------------+---+---+--------+-----+



In [35]:
# Main-namespace (ns = 0) article texts that contain an "Infobox military conflict" template.
article_texts = wikipedia.filter("ns = '0'").select("revision.text._VALUE")
article_texts.filter("_VALUE like '%{{Infobox military conflict%'").show(2)

+--------------------+
|              _VALUE|
+--------------------+
|{{Infobox militar...|
|
{{Infobox milita...|
+--------------------+
only showing top 2 rows



In [30]:
# Pietro corrupt_id
# Number of rows whose _corrupt_record holds raw XML (presumably pages the XML
# reader could not map onto the schema).
wikipedia.where("_corrupt_record like '%<%'").count()

38197

In [31]:
# Total number of rows in the dataframe (selecting a column does not change the row count).
wikipedia.count()

65348

In [18]:
# Peek at the first ten values of _corrupt_record (None for rows that parsed).
wikipedia.select('_corrupt_record').limit(10).collect()

[Row(_corrupt_record=None),
 Row(_corrupt_record='<page>\n    <title>HomePage</title>\n    <ns>0</ns>\n    <id>2</id>\n    <redirect title="ਮੁੱਖ ਸਫ਼ਾ" />'),
 Row(_corrupt_record=None),
 Row(_corrupt_record='<page>\n    <title>ਵਿਕੀਪੀਡੀਆ:ਕੱਚਾ ਖਾਕਾ</title>\n    <ns>4</ns>\n    <id>785</id>\n    <revision>\n      <id>91046</id>\n      <parentid>91045</parentid>\n      <timestamp>2012-10-29T12:57:30Z</timestamp>\n      <contributor>\n        <username>Itar buttar</username>\n        <id>3341</id>\n      </contributor>\n      <minor />'),
 Row(_corrupt_record=None),
 Row(_corrupt_record=None),
 Row(_corrupt_record=None),
 Row(_corrupt_record=None),
 Row(_corrupt_record='<page>\n    <title>ਫਰਮਾ:ਹੋਰ ਵਿਕੀ</title>\n    <ns>10</ns>\n    <id>1034</id>\n    <revision>\n      <id>89392</id>\n      <parentid>87208</parentid>\n      <timestamp>2012-10-23T14:52:07Z</timestamp>\n      <contributor>\n        <username>Itar buttar</username>\n        <id>3341</id>\n      </contributor>\n      <minor />'),

## [PART 1] Get number of references per page

Using the loaded public XML dump, we parse every page and count its tags of type ```<ref>``` to get the number of references per page.

The analysis is based on work from *Research:Characterizing Wikipedia Citation Usage*: [MetaPageQueries](https://meta.wikimedia.org/wiki/Research:Characterizing_Wikipedia_Citation_Usage/First_Round_of_Analysis#Dimensions_of_Analysis)

Example of reference:
```html
<ref>{{cite web| url=http://geonames.nga.mil/ggmagaz/geonames4.asp 
    |title=NGA GeoName Database |publisher=[[National Geospatial Intelligence Agency]] 
    |accessdate=2008-07-05 
    |archiveurl = https://web.archive.org/web/20080608190852/http://geonames.nga.mil/ggmagaz/geonames4.asp 
    <!-- Bot retrieved archive --> |archivedate = 2008-06-08}}</ref>
```

Namespace `ns = 0` is the main namespace, i.e. regular wiki articles.

1. quantify the number of references per page
    * using regex expression to find the references in the page
    * counting the number of references

In [13]:
# quantify the number of references per page

# Compiled once and reused by get_refs_count. It matches an *opening* <ref> tag,
# with or without attributes (<ref>, <ref name="x">), but not:
#   * self-closing re-uses of a named reference (<ref name="x" />, <ref/>), and
#   * other tags that merely start with "ref" (<references>, <references />).
# Tag names are matched case-insensitively, as wikitext accepts <REF>.
just_ref_regex = re.compile(r'<ref(?:\s[^>]*)?(?<!/)>', re.IGNORECASE)

# find # references <ref> per page 
def get_refs_count(entity, regex_expression=just_ref_regex):
    """Count the references (regex matches) in the current revision of a page.

    Parameters
    ----------
    entity : Row
        Page row; must expose ``entity.id`` and ``entity.revision.text._VALUE``.
    regex_expression : compiled regex, default ``just_ref_regex``
        Pattern whose matches are counted as references.

    Returns
    -------
    Row
        ``Row(id=<page id>, refs_count=<number of matches>)``. A page with no
        text counts as 0 references.
    """
    text = entity.revision.text._VALUE or ""
    # Use the pattern passed by the caller. Nothing is printed: this runs once
    # per page on the executors.
    refs = regex_expression.findall(text)
    return Row(id=entity.id, refs_count=len(refs))

# Keep main-namespace pages (ns = 0) whose current revision has non-empty text.
# TODO .filter("redirect._title is null")
articles = (wikipedia
            .filter("ns = '0'")
            .filter("revision.text._VALUE is not null")
            .filter("length(revision.text._VALUE) > 0"))

In [None]:
# get the # of references per page
# Creates a DataFrame from an RDD, applying the regex counting function to every page.
# Cached because later cells reuse it many times (show, SQL, collect, toPandas), and
# each reuse would otherwise re-run the regex over the whole dump.
reference_count_page = sqlContext.createDataFrame(articles.rdd.map(get_refs_count)).cache()
reference_count_page.sort('refs_count', ascending=False).show(5)
# Expose the counts to SQL for the next cell. registerTempTable is deprecated and
# returns None, so there is nothing to assign.
reference_count_page.createOrReplaceTempView('reference_count_page_sql')

In [None]:
# quantify the # articles with # ref > 0, via the 'reference_count_page_sql' view
non_zero_ref_query = """
select count(DISTINCT id)
from reference_count_page_sql
where refs_count > 0
"""
non_zero_articles = spark.sql(non_zero_ref_query)
# The query returns one row with one column: the count itself.
non_zero_articles_pd = non_zero_articles.toPandas().iat[0, 0]
print("Number of articles with #references > 0: {}".format(non_zero_articles_pd))

**Number of pages having at least x references**

In [None]:
# Every distinct per-page reference count, in ascending order: the candidate
# thresholds x for "how many pages have at least x references?".
ref_numbers = (reference_count_page
               .select('refs_count')
               .distinct()
               .orderBy('refs_count')
               .collect())

In [None]:
# Largest number of references on a single page; the Row's only field is
# named "max(refs_count)".
max_ref_count_page = reference_count_page.agg(F.max('refs_count')).collect()[0]
step = 10
# Thresholds x = 1, 1 + step, 1 + 2*step, ... strictly below the maximum.
ref_numbers_pd = pd.Series(range(1, max_ref_count_page["max(refs_count)"], step))

In [None]:
# SQL view over the per-page counts. createOrReplaceTempView replaces the deprecated
# registerTempTable, whose None return value was being assigned.
reference_count_page.createOrReplaceTempView('references_sql_view')

# Number of pages for each (non-zero) reference count, smallest counts first.
ref_dist = """
select refs_count, count(*) as Frequency
from references_sql_view
where refs_count > 0
group by refs_count
order by refs_count
"""

result_ref_dist = spark.sql(ref_dist)
# show() prints the table itself and returns None, so it is not wrapped in print().
result_ref_dist.show(5)

**Distribution of number of references** We bin the per-page reference counts into a histogram (log-scaled y-axis): each bar gives the number of pages whose reference count falls in that bin.

In [None]:
just_refs_count = reference_count_page.select('refs_count').toPandas()
ax = just_refs_count.plot(kind="hist", bins=100, log=True, figsize=(12, 7),
                          title="Distribution of number of references", legend=False)
ax.set_xlabel('# References')
# Plain (non-cumulative) histogram: each bar counts the pages whose number of
# references falls in that bin, so the label must not say "at least x".
ax.set_ylabel('# Pages (log scale)')
plt.show()

---

# [PART 2] Look into references

In [5]:
%%time
# initialize object from class XmlWikidump to load the file + save schema of file

WikiXML_2 = XmlWiki(
    path = WIKIPEDIA_XML_DUMP, 
    path_schema = DATA_DIR + 'pawiki-20181101-pages-articles-multistream-schema',
    sampling_ratio=0.8)

wikipedia_2 = WikiXML_2.dataframe

Saving schema in data/pawiki-20181101-pages-articles-multistream-schema 

root
 |-- _corrupt_record: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ns: long (nullable = true)
 |-- revision: struct (nullable = true)
 |    |-- comment: string (nullable = true)
 |    |-- contributor: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |-- format: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- model: string (nullable = true)
 |    |-- parentid: long (nullable = true)
 |    |-- sha1: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _space: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)

CPU times: user 4.92 ms, sys: 3.24 ms, total: 8.16 ms
Wall time: 18.9 s


In [6]:
# get whole reference: group 1 is the opening <ref ...> tag, group 2 the template
# body between "{{" and "}}" that directly precedes "</ref".
full_ref_regex = re.compile(r'(<ref[^>]*[^/]>|<ref[ ]*>){{([^<]*)}}</ref')


def get_full_ref(entity, regex_ref=full_ref_regex):
    """Return every (opening tag, template body) pair found in the page's text."""
    return regex_ref.findall(entity.revision.text._VALUE)

# Main-namespace pages with non-empty text (same filter as in PART 1).
# TODO .filter("redirect._title is null")
articles_2 = (wikipedia_2
              .filter("ns = '0'")
              .filter("revision.text._VALUE is not null")
              .filter("length(revision.text._VALUE) > 0"))
references_2 = articles_2.rdd.map(get_full_ref)
references_2.take(2)

[[],
 [('<ref name=pibmumbai>',
   'cite press release | work=Press Information Bureau, Mumbai|publisher=Press Information Bureau, Government of India|url=http://pibmumbai.gov.in/scripts/detail.asp?releaseId=E2011IS3|title=India stats: Million plus cities in India as per Census 2011|date=31 October 2011')]]

In [7]:
def get_ref_info(entity, full_ref_regex=re.compile(r'(<ref[^>]*[^/]>|<ref[ ]*>){{([^<]*)}}</ref')):
    """
    Extract citation-template information from every reference of one page.

    Input:
    entity:             row of the dataframe loaded with the XmlWiki.py class from
                        data/**wiki-20181101-pages-articles-multistream-schema;
                        must expose entity.id and entity.revision.text._VALUE
    full_ref_regex:     compiled regex; group 2 must capture the template body
                        between "{{" and "}}" inside a <ref>...</ref>

    Return:
    result:             list of Row(id, template, url, title), one per matched
                        reference, meant to be used with rdd.flatMap. template is
                        lower-cased; url defaults to "" and title to None when the
                        template has no such named parameter. A page without text
                        yields an empty list.
    """
    text = entity.revision.text._VALUE
    if not text:
        return []
    # Remove HTML comments such as "<!-- Bot retrieved archive -->". DOTALL lets
    # "." cross newlines, so comments spanning several lines are removed as well.
    text = re.sub("(<!--.*?-->)", "", text, flags=re.DOTALL)
    result = []
    for ref in full_ref_regex.findall(text):
        # ref[1] is the template body, e.g. "cite web| url=... |title=..."
        ref_content = ref[1].split("|")
        template = ref_content.pop(0).strip()
        properties = {}
        for field in ref_content:
            field_name, sep, field_value = field.partition("=")
            if not sep:
                # Positional parameter (no "="): there is no name to store it under.
                continue
            properties[field_name.strip()] = field_value.strip()
        result.append(Row(id=entity.id,
                          template=template.lower(),
                          url=properties.get("url", ""),
                          title=properties.get("title")))
    return result

# Top domains in the references

In [8]:
# Flatten the per-page lists returned by get_ref_info into one RDD of Rows ...
references_rrd = articles_2.rdd.flatMap(get_ref_info)
# ... and let Spark infer the DataFrame schema from those Rows.
references = sqlContext.createDataFrame(data=references_rrd)

In [9]:
# Preview the first 20 extracted references (long values truncated).
references.show(n=20)

+----+------------------+--------------------+--------------------+
|  id|          template|               title|                 url|
+----+------------------+--------------------+--------------------+
|1050|cite press release|India stats: Mill...|http://pibmumbai....|
|1067|          cite web|''National Anthem...|http://india.gov....|
|1067|          cite web|Constituent Assem...|http://parliament...|
|1067|          cite web|''National Song''...|http://india.gov....|
|1067|          cite web|   India at a Glance|http://india.gov....|
|1067|          cite web|India at a glance...|http://censusindi...|
|1067|          cite web|               India|http://www.imf.or...|
|1067|          cite web|Field Listing - D...|https://www.cia.g...|
|1067|          cite web| Total Area of India|http://lcweb2.loc...|
|1067|          cite web|Ethlologue report...|http://www.ethnol...|
|1067|          cite web|           Hindustan|http://www.britan...|
|1080|         cite book|ਗੁਰ ਸ਼ਬਦ ਰਤਨਾਕਰ ਮ...|  

**URL parsing**

Parse a URL into six components, returning a 6-tuple. This corresponds to the general structure of a URL: scheme://netloc/path;parameters?query#fragment. Each tuple item is a string, possibly empty.

EXAMPLE: 
from urllib.parse import urlparse

> o = urlparse('http://www.cwi.nl:80/%7Eguido/Python.html')

> o  

ParseResult(scheme='http', netloc='www.cwi.nl:80', path='/%7Eguido/Python.html',
            params='', query='', fragment='')
> o.scheme

'http'

> o.port

80

> o.geturl()

'http://www.cwi.nl:80/%7Eguido/Python.html'

We want to find the top URLs. To do so, we use the `urlparse` function and apply it on each row of references.

In [11]:
# References that carry a non-empty url; peek at the first two URLs.
mapped_domains = references.where("length(url)>0")
mapped_domains.select("url").take(2)

[Row(url='http://pibmumbai.gov.in/scripts/detail.asp?releaseId=E2011IS3'),
 Row(url='http://india.gov.in/knowindia/national_anthem.php')]

In [None]:
# NOTE(review): this cell used to run
#     references.where("length(url)>0").rdd.map(get_domain).filter(lambda row: row is not None)
# before get_domain is defined (in the next cell), which raises NameError on a
# fresh kernel. The next cell runs the same statement right after defining
# get_domain, so it is kept here only as a comment.


In [None]:
from urllib.parse import urlparse
# only get netloc which is the web address
def get_domain(row, url_field='url'):
    """Map a reference row to a ``(netloc, 1)`` pair for per-domain counting.

    Parameters
    ----------
    row : Row or mapping
        Record holding the URL under ``url_field``.
    url_field : str, default 'url'
        Name of the field that contains the URL.

    Returns
    -------
    tuple or None
        ``(netloc, 1)``, where ``netloc`` is the network-location part of the URL
        (host plus optional port; '' when the URL has no scheme/host). Returns
        ``None`` when the field is missing or the URL cannot be parsed.
    """
    try:
        parsed_uri = urlparse(row[url_field])
    except (KeyError, IndexError, ValueError, TypeError, AttributeError):
        # Missing field or malformed URL (e.g. an unbalanced IPv6 bracket): skip
        # the record instead of failing the Spark job. Only these errors are
        # caught, so interrupts and real bugs still surface.
        return None
    return ('{uri.netloc}'.format(uri=parsed_uri), 1)
    
# (domain, 1) pairs for every reference with a url, unparsable urls dropped.
mapped_domains = (references.where("length(url)>0")
                  .rdd
                  .map(get_domain)
                  .filter(lambda pair: pair is not None))
# Sum per domain, drop empty netlocs (urls without a scheme/host), most cited first.
domains_count = (mapped_domains
                 .reduceByKey(lambda left, right: left + right)
                 .filter(lambda pair: len(pair[0]) > 0)
                 .sortBy(lambda pair: -pair[1]))
domains_count.take(30)

Total number of links:

In [None]:
from operator import add
# Total number of reference links: sum of the per-domain counts.
domains_count.values().reduce(add)

In [None]:
def to_row(rdd, total_links=18908198):
    """Turn a ``(domain, count)`` pair into a Row with the domain's share of links.

    Parameters
    ----------
    rdd : tuple
        ``(domain, count)`` pair from ``domains_count``. Despite the name, this is
        a single element, not an RDD.
    total_links : int
        Denominator for ``perc``. The default is the constant that was previously
        hard-coded (presumably the total of a different, larger dump). The code
        below passes the real total of this dump.

    Returns
    -------
    Row
        ``Row(domain, count, perc)``, where ``perc = count / total_links`` is a
        fraction, not a percentage.
    """
    domain, count = rdd
    return Row(domain=domain, count=count, perc=count / total_links)


# Computed rather than hard-coded so that `perc` refers to the dump actually loaded.
total_links = domains_count.values().sum()
domains_distribution = sqlContext.createDataFrame(
    domains_count.map(lambda pair: to_row(pair, total_links))
).sort("count", ascending=False)
domains_distribution.show()

In [None]:
# Bring the per-domain distribution (already sorted by count) to the driver for plotting.
dd = domains_distribution.select("*").toPandas()

In [None]:
# dd is sorted by count (descending), so head(15) gives the most cited domains.
ax = dd.head(15).plot(kind='bar',
                      x='domain',
                      y='perc',
                      figsize=(12, 7),
                      legend=False)
ax.set_title('Top 15 domains cited in references')
ax.set_xlabel('Domain')
ax.set_ylabel('Fraction of reference links (perc)')
plt.show()

---

# Get the pageviews by page

In this preliminary analysis, we use the table 'webrequest' to get the number of times a page is loaded and to evaluate the click rate. In the next data collection, we will have a 'page-load' event.

Get the pageviews by day (step 1) and aggregate (step 2).

In [None]:
# User must be NOT loggedIn, not a bot and the view must be in the English version of Wikipedia
# The views are aggregated by page, country, and access method

# Analysis window: daterange() yields 2018-06-29 ... 2018-07-08 (end date excluded).
start_date = date(2018, 6, 29)
end_date = date(2018, 7, 9)

# step 1: one aggregation query per day, formatted with (day, month, year)
links_query = """
select page_id, geocoded_data.continent, geocoded_data.country_code, access_method, count(*) as pageviews
from wmf.webrequest 
where day = {}
AND month = {}
AND year = {}
AND x_analytics_map['loggedIn'] is NULL
AND namespace_id = 0
AND agent_type = 'user'
AND is_pageview = TRUE
AND (uri_host = 'en.wikipedia.org' OR uri_host = 'en.m.wikipedia.org')
AND access_method <> 'mobile app'
group by page_id, geocoded_data.continent, geocoded_data.country_code, access_method
"""

# Concatenate the daily results into a single RDD of Rows.
pageviews = sc.emptyRDD()
for current_day in daterange(start_date, end_date):
    views_info = spark.sql(links_query.format(current_day.day, current_day.month, current_day.year))
    pageviews = pageviews.union(views_info.rdd)


def convert_name(row):
    """Return a copy of ``row`` whose access_method is collapsed to 'mobile' or 'desktop'.

    Any access method starting with "mobile" becomes 'mobile' and everything else
    becomes 'desktop', so that it can be joined with ``event.mode`` of the events
    table.
    """
    is_mobile = row['access_method'].startswith('mobile')
    return Row(page_id=row['page_id'],
               continent=row['continent'],
               country_code=row['country_code'],
               access_method='mobile' if is_mobile else 'desktop',
               pageviews=row['pageviews'])


# Daily per-page view counts with the access method collapsed to desktop/mobile.
# convert_name always returns a Row, so no None-filter is needed.
views_info_daily = sqlContext.createDataFrame(pageviews.map(convert_name))
# createOrReplaceTempView replaces the deprecated registerTempTable.
views_info_daily.createOrReplaceTempView("views_info_daily")

# step 2: sum the daily page views over the whole date window
aggregate_query = """
select page_id, continent, country_code, access_method, sum(pageviews) as pageviews
from views_info_daily
group by page_id, continent, country_code, access_method
"""

views_info = spark.sql(aggregate_query).cache()


# Count the number of visits (distinct sessions) with at least one extClick event


In [None]:
# Analysis window: daterange() yields 2018-06-29 ... 2018-07-08 (end date excluded).
start_date = date(2018, 6, 29)
end_date = date(2018, 7, 9)

# step 1: per day, number of distinct sessions with at least one extClick event
# (outside infoboxes, with a footnote_number), per page / continent / country / mode.
events_query = """
select event.page_id, geocoded_data.continent, geocoded_data.country_code, 
        event.mode, count(distinct(event.session_token)) total 
from event.citationusage
where wiki = 'enwiki'
AND day = {}
AND month = {}
AND year = {}
AND useragent.is_bot = FALSE
and event.in_infobox = FALSE
and event.footnote_number IS NOT NULL
and event.action = 'extClick'
group by event.page_id, geocoded_data.continent, geocoded_data.country_code, event.mode
"""

events_rdd = sc.emptyRDD()
for d in daterange(start_date, end_date):
    daily_events = spark.sql(events_query.format(d.day, d.month, d.year))
    events_rdd = events_rdd.union(daily_events.rdd)

events_merged = sqlContext.createDataFrame(events_rdd)
# createOrReplaceTempView replaces the deprecated registerTempTable.
events_merged.createOrReplaceTempView("events_merged")

# step 2: sum the daily session counts over the window.
# NOTE(review): a session that clicks on several days is counted once per day.
aggregate_query = """
select page_id, continent, country_code, mode, sum(total) extClick_count
from events_merged
group by page_id, continent, country_code, mode
"""

clicks_count = spark.sql(aggregate_query)

# Get the click rate

Step 1: Filter out all the articles that do NOT contain any reference.

In [None]:
views_info.createOrReplaceTempView('views_info')
# Per-page reference counts from PART 1 (columns: id, refs_count). No DataFrame
# called `references_count` exists in this notebook; reference_count_page holds
# these counts and is exposed to SQL under that view name.
reference_count_page.createOrReplaceTempView('references_count')

# join to filter out the pages without references
# NOTE(review): views_info counts views of en.wikipedia.org page ids, while
# reference_count_page was built from the pawiki dump. Page ids of different wikis
# are unrelated, so this join needs the matching enwiki dump — confirm.
pages_with_refs_sql = """
select vi.*, rc.refs_count
from views_info vi
join references_count rc
on vi.page_id = rc.id
where rc.refs_count > 0
"""

pages_with_refs = spark.sql(pages_with_refs_sql)
pages_with_refs.cache().show()

Step 2: left-join with the external-click counts, setting the click count to 0 for pages without any events.

In [None]:
clicks_count.createOrReplaceTempView('clicks_count')
pages_with_refs.createOrReplaceTempView('pages_with_refs')

# Left join so that groups without any extClick keep a count of 0 (COALESCE).
# `<=>` is null-safe equality. With `=`, groups whose continent or country_code
# is NULL would never match their click counts.
pages_with_refs_sql = """
select pr.*,
    COALESCE(ce.extClick_count, 0) AS extClick_count
from pages_with_refs pr
left join clicks_count ce
on pr.page_id = ce.page_id
and pr.continent <=> ce.continent
and pr.country_code <=> ce.country_code
and pr.access_method = ce.mode
"""

all_info = spark.sql(pages_with_refs_sql)

# Click rate of each group = external clicks / page views.
all_info.createOrReplaceTempView('all_info')
with_ratio_sql = """
select *, extClick_count / pageviews as clickrate
from all_info
"""

all_info = spark.sql(with_ratio_sql)
all_info.cache().show()

Get pages with events:

In [None]:
# Number of (page, continent, country, access method) groups with at least one extClick.
all_info.filter('extClick_count > 0').count()

### Get click rate by country 

In [None]:
# Register the version of all_info that includes the clickrate column.
all_info.createOrReplaceTempView('all_info')

# Unweighted mean of the per-group click rates, per country and access method.
# ORDER BY gives a global ordering; SORT BY only sorts within each partition.
avg_by_country = """
select country_code, access_method, avg(clickrate) clickrate_avg, count(*) count, sum(pageviews) total_views
from all_info
group by country_code, access_method
order by clickrate_avg desc
"""

clickrate_by_country = spark.sql(avg_by_country)


In [None]:
# Per-country click-rate summary as a pandas frame, for filtering and plotting.
cr = clickrate_by_country.select("*").toPandas()

Remove countries with too few views (threshold: 100 views over the analysed period, 2018-06-29 to 2018-07-08):

In [None]:
# Drop (country, access method) groups with fewer than MIN_TOTAL_VIEWS page views
# over the analysed window; presumably their averages are too noisy to compare.
MIN_TOTAL_VIEWS = 100
cr = cr[cr.total_views >= MIN_TOTAL_VIEWS].sort_values(by='clickrate_avg', ascending=False)

**Top 15**

In [None]:
# cr is sorted by clickrate_avg (descending), so head(15) gives the highest rates.
ax = cr[cr.access_method == 'desktop'].head(15).plot(kind='bar',
                                                     x='country_code',
                                                     y='clickrate_avg',
                                                     figsize=(12, 7),
                                                     legend=False)
ax.set_title('Desktop: 15 countries with the highest average click rate')
ax.set_xlabel('Country code')
ax.set_ylabel('Average click rate (extClick / page views)')
plt.show()

**Last 15**

In [None]:
# cr is sorted by clickrate_avg (descending), so tail(15) gives the lowest rates.
ax = cr[cr.access_method == 'desktop'].tail(15).plot(kind='bar',
                                                     x='country_code',
                                                     y='clickrate_avg',
                                                     figsize=(12, 7),
                                                     legend=False)
ax.set_title('Desktop: 15 countries with the lowest average click rate')
ax.set_xlabel('Country code')
ax.set_ylabel('Average click rate (extClick / page views)')
plt.show()

# Domains analysis - extClick events

In [None]:
from urllib.parse import urlparse

def get_domain(row, url_field='link_url'):
    """Map a clicked-link row to a ``(netloc, 1)`` pair for per-domain counting.

    Parameters
    ----------
    row : Row or mapping
        Record holding the URL under ``url_field``.
    url_field : str, default 'link_url'
        Name of the field that contains the URL (``event.link_url`` here).

    Returns
    -------
    tuple or None
        ``(netloc, 1)``, where ``netloc`` is the network-location part of the URL
        (host plus optional port; '' when the URL has no scheme/host). Returns
        ``None`` when the field is missing or the URL cannot be parsed.
    """
    try:
        parsed_uri = urlparse(row[url_field])
    except (KeyError, IndexError, ValueError, TypeError, AttributeError):
        # Missing field or malformed URL: skip the record instead of failing the
        # Spark job, without swallowing interrupts or unrelated bugs.
        return None
    return ('{uri.netloc}'.format(uri=parsed_uri), 1)

# Analysis window: daterange() yields 2018-06-29 ... 2018-07-08 (end date excluded).
start_date = date(2018, 6, 29)
end_date = date(2018, 7, 9)

# step 1: one row per extClick event (outside infoboxes, with a footnote_number)
events_query = """
select event.link_url, 1 as count
from event.citationusage
where wiki = 'enwiki'
AND day = {}
AND month = {}
AND year = {}
AND useragent.is_bot = FALSE
and event.in_infobox = FALSE
and event.footnote_number IS NOT NULL
and event.action = 'extClick'
"""

links_rdd = sc.emptyRDD()
for current_day in daterange(start_date, end_date):
    daily_events = spark.sql(events_query.format(current_day.day, current_day.month, current_day.year))
    links_rdd = links_rdd.union(daily_events.rdd)

# step 2: (domain, 1) pairs -> clicks per domain, dropping unparsable and empty domains
links_rdd = (links_rdd
             .map(get_domain)
             .filter(lambda pair: pair is not None)
             .reduceByKey(lambda left, right: left + right)
             .filter(lambda pair: len(pair[0]) > 0)
             .map(lambda pair: Row(domain=pair[0], count=pair[1])))

links_merged = sqlContext.createDataFrame(links_rdd).sort("count", ascending=False)
links_merged.show()

In [None]:
# Clicked domains (sorted by click count) as a pandas frame for plotting.
click_urls = links_merged.select("*").toPandas()

**Top 15**

In [None]:
# click_urls is sorted by count (descending), so head(15) gives the most clicked domains.
ax = click_urls.head(15).plot(kind='bar',
                              x='domain',
                              y='count',
                              figsize=(12, 7),
                              legend=False)
ax.set_title('Top 15 domains by number of extClick events')
ax.set_xlabel('Domain')
ax.set_ylabel('# extClick events')
plt.show()