# Set up session

In [None]:
# Install/upgrade the packages this notebook imports: numpy, wmfdata (from the
# `release` ref of its GitHub repository) and findspark.
# NOTE(review): none of these installs pins a version, so a later re-run may pull
# different releases -- consider pinning once a known-good combination is found.
!pip install --upgrade numpy
!pip install git+https://github.com/wikimedia/wmfdata-python.git@release
!pip install findspark

In [1]:
import os
import re

import numpy as np
import wmfdata
import wmfdata.spark as wmfspark

import findspark
findspark.init('/usr/lib/spark2')
from graphframes import *
from pyspark.sql import SparkSession

In [2]:
# Environment variables consulted when PySpark is launched.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'notebook'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.5'

# JVM system properties that route HTTP(S) traffic from the driver through the
# web proxy -- presumably needed so the graphframes package listed below can be
# downloaded (TODO confirm).
proxy_settings = {
    'http.proxyHost': 'webproxy.eqiad.wmnet',
    'http.proxyPort': '8080',
    'https.proxyHost': 'webproxy.eqiad.wmnet',
    'https.proxyPort': '8080',
}
java_proxy_flags = ' '.join(
    '-D{}={}'.format(name, value) for name, value in proxy_settings.items()
)

# Extra Spark settings: the proxy flags plus the graphframes package itself.
spark_config = {
    'spark.driver.extraJavaOptions': java_proxy_flags,
    'spark.jars.packages': 'graphframes:graphframes:0.6.0-spark2.3-s_2.11',
}

# Replace <username> in app_name with your own username: it makes the job easy
# to spot at https://yarn.wikimedia.org/cluster/scheduler when tracking progress.
spark = wmfspark.get_session(
    app_name='Pyspark notebook (<username> -- pagerank)',
    type='large',
    extra_settings=spark_config,
)
spark

# Build Outlinks Dataset

In [3]:
# Notebook-wide parameters. Both values are substituted into the SQL queries in
# the cells below via str.format().
snapshot = '2020-07'  # data will be current to this date -- e.g., 2020-07 means data is up to 30 June 2020 (at least)
wiki = 'enwiki'  # wikidb you want to run pagerank for

# Fail fast on malformed values: because they are spliced directly into SQL text,
# a typo (e.g. '2020-7') would otherwise just match no partition and silently
# produce an empty graph, and stray quotes would corrupt the query.
if not re.fullmatch(r'\d{4}-\d{2}', snapshot):
    raise ValueError("snapshot must look like 'YYYY-MM', got {0!r}".format(snapshot))
if not re.fullmatch(r'[A-Za-z0-9_]+', wiki):
    raise ValueError("wiki must be a plain wiki database name such as 'enwiki', got {0!r}".format(wiki))

## Gather edges dataset

In [31]:
# Cell switches:
#   print_for_hive -- print the query collapsed onto a single line instead of as written
#   do_execute     -- run the query with Spark and register the result as the `src_dst` view
do_execute = True
print_for_hive = False

# Edge list query ({0} = snapshot, {1} = wiki):
#   title_to_id           -- namespace-0 pages of the wiki, used to map titles to page IDs
#   redirects             -- redirect page ID (rd_from) -> page ID of its namespace-0 target (rd_to)
#   pagelinks_reformatted -- namespace-0 -> namespace-0 links with the target title resolved to
#                            a page ID; links whose source page is a redirect are dropped, and
#                            the inner join drops links whose target title has no page row
#   final SELECT          -- distinct (src, dst) pairs, where a dst that is itself a redirect
#                            is replaced by the redirect's target
edge_query_template = """
WITH title_to_id AS (
    SELECT page_id,
           page_title,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     WHERE page_namespace = 0
           AND snapshot = '{0}'
           AND wiki_db = '{1}'
),
redirects AS (
    SELECT mr.rd_from AS rd_from,
           tti.page_id AS rd_to,
           mr.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_redirect mr
     INNER JOIN title_to_id tti
           ON (mr.rd_title = tti.page_title
               AND mr.wiki_db = tti.wiki_db)
     WHERE mr.snapshot = '{0}'
           AND mr.rd_namespace = 0
           AND mr.wiki_db = '{1}'
),
pagelinks_reformatted AS (
    SELECT pl.pl_from AS pl_from,
           tti.page_id AS pl_to,
           pl.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_pagelinks pl
     INNER JOIN title_to_id tti
           ON (pl.pl_title = tti.page_title
               AND pl.wiki_db = tti.wiki_db)
      LEFT ANTI JOIN redirects r
           ON (pl.pl_from = r.rd_from
               AND pl.wiki_db = r.wiki_db)
     WHERE snapshot = '{0}'
           AND pl_from_namespace = 0
           AND pl_namespace = 0
           AND pl.wiki_db = '{1}'
)
    SELECT DISTINCT pl.pl_from AS src,
           COALESCE(r.rd_to, pl.pl_to) AS dst
      FROM pagelinks_reformatted pl
      LEFT JOIN redirects r
           ON (pl.pl_to = r.rd_from
               AND pl.wiki_db = r.wiki_db)
"""
query = edge_query_template.format(snapshot, wiki)

if not print_for_hive:
    print(query)
else:
    # Flatten newlines, squeeze runs of spaces, and trim the ends.
    single_line = query.replace('\n', ' ')
    print(re.sub(' +', ' ', single_line).strip())

if do_execute:
    src_dst = spark.sql(query)
    src_dst.createOrReplaceTempView("src_dst")


WITH title_to_id AS (
    SELECT page_id,
           page_title,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     WHERE page_namespace = 0
           AND snapshot = '2020-07'
           AND wiki_db = 'enwiki'
),
redirects AS (
    SELECT mr.rd_from AS rd_from,
           tti.page_id AS rd_to,
           mr.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_redirect mr
     INNER JOIN title_to_id tti
           ON (mr.rd_title = tti.page_title
               AND mr.wiki_db = tti.wiki_db)
     WHERE mr.snapshot = '2020-07'
           AND mr.rd_namespace = 0
           AND mr.wiki_db = 'enwiki'
),
pagelinks_reformatted AS (
    SELECT pl.pl_from AS pl_from,
           tti.page_id AS pl_to,
           pl.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_pagelinks pl
     INNER JOIN title_to_id tti
           ON (pl.pl_title = tti.page_title
               AND pl.wiki_db = tti.wiki_db)
      LEFT ANTI JOIN redirects r
           ON (pl.pl_from = r.rd_from
               AND pl.wiki_

In [32]:
# Peek at the first ten (src, dst) edges.
src_dst.show(10)

+--------+----+
|     src| dst|
+--------+----+
|17178107|3149|
|59811191|4348|
|30270910|4348|
|11117831|4348|
|48534267|4348|
| 5795377|4348|
| 7928030|4348|
| 5765223|4348|
|42045679|4348|
| 6165691|9334|
+--------+----+
only showing top 10 rows



## Gather node metadata

In [33]:
# Cell switches:
#   print_for_hive -- print the query collapsed onto a single line instead of as written
#   do_execute     -- run the query with Spark and register the result as the `nodes` view
do_execute = True
print_for_hive = False

# Node metadata query ({0} = snapshot, {1} = wiki):
#   all_pageids     -- every distinct page ID that appears as a src or a dst in `src_dst`
#   pageid_to_title -- namespace-0 pages of the wiki, used to look up titles
#   final SELECT    -- one row per node ID; the LEFT JOIN keeps IDs that have no matching
#                      page row, leaving their page_title NULL
node_query_template = """
WITH all_pageids AS (
    SELECT DISTINCT(page_id)
      FROM (
        SELECT src as page_id
          FROM src_dst
         UNION ALL
        SELECT dst as page_id
          FROM src_dst
          ) p
),
pageid_to_title AS (
    SELECT page_id,
           page_title,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     WHERE page_namespace = 0
           AND snapshot = '{0}'
           AND wiki_db = '{1}'
)
SELECT p.page_id as id,
       t.page_title as page_title
  FROM all_pageids p
  LEFT JOIN pageid_to_title t
            ON (p.page_id = t.page_id)
"""
query = node_query_template.format(snapshot, wiki)

if not print_for_hive:
    print(query)
else:
    # Flatten newlines, squeeze runs of spaces, and trim the ends.
    single_line = query.replace('\n', ' ')
    print(re.sub(' +', ' ', single_line).strip())

if do_execute:
    nodes = spark.sql(query)
    nodes.createOrReplaceTempView("nodes")


WITH all_pageids AS (
    SELECT DISTINCT(page_id)
      FROM (
        SELECT src as page_id
          FROM src_dst
         UNION ALL
        SELECT dst as page_id
          FROM src_dst
          ) p
),
pageid_to_title AS (
    SELECT page_id,
           page_title,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     WHERE page_namespace = 0
           AND snapshot = '2020-07'
           AND wiki_db = 'enwiki'
)
SELECT p.page_id as id,
       t.page_title as page_title
  FROM all_pageids p
  LEFT JOIN pageid_to_title t
            ON (p.page_id = t.page_id)



In [34]:
# Peek at the first ten (id, page_title) node rows.
nodes.show(10)

+-----+--------------------+
|   id|          page_title|
+-----+--------------------+
| 1365|             Ammonia|
| 1990|            August_5|
| 2835|        Afghan_Hound|
| 2851|Abraham_Joshua_He...|
| 7312|          Chauvinism|
| 9762|  Ecumenical_council|
| 9890|   Electron_counting|
|10696|Military_of_the_F...|
|14392|            Howitzer|
|15392| Imperial_Conference|
+-----+--------------------+
only showing top 10 rows



## Run PageRank

In [36]:
## Build the graph: `nodes` supplies the vertices (column `id`),
## `src_dst` supplies the edges (columns `src` and `dst`).
g = GraphFrame(v=nodes, e=src_dst)

In [37]:
# Sanity check: in-degree (number of incoming edges) for ten vertices.
g.inDegrees.show(10)

+--------+--------+
|      id|inDegree|
+--------+--------+
|14625636|      72|
| 6815074|      95|
|   37299|   12926|
|  912025|    1900|
|34446095|     493|
|54645354|       1|
|  614284|     665|
|63426542|      31|
|  109900|      38|
|21830778|     320|
+--------+--------+
only showing top 10 rows



In [38]:
# API reference:
#   https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#graphframes.GraphFrame.pageRank
# Hyperparameters:
#   - resetProbability is the complement of the damping factor, i.e. 1 - d
#     (https://en.wikipedia.org/wiki/PageRank#Damping_factor); most sources suggest 0.15.
#   - maxIter=40 follows https://www.aifb.kit.edu/images/e/e5/Wikipedia_pagerank1.pdf
#     - a convergence tolerance (the `tol` argument, e.g. 0.01) could be used instead of
#       maxIter, but the original author did not know how long that takes for enwiki.
# Original author's notes: this shouldn't take more than 20-30 minutes for English
# Wikipedia, and one job per iteration shows up at https://yarn.wikimedia.org/cluster/scheduler
pr = g.pageRank(maxIter=40, resetProbability=0.15)

# Order vertices from highest to lowest score and expose them to SQL as `pagerank`.
ranked_vertices = pr.vertices
result = ranked_vertices.orderBy('pagerank', ascending=False)
result.createOrReplaceTempView('pagerank')

## Write to HDFS

In [39]:
# Attach page titles to the PageRank scores and save the table as bzip2-compressed TSV.
# NOTE(review): the `pagerank` view may already carry page_title from the vertex
# DataFrame, which would make this join redundant -- confirm before removing it.
query = """
SELECT pr.id as page_id,
       pr.pagerank as pagerank,
       n.page_title as page_title
  FROM pagerank pr
  LEFT JOIN nodes n
       ON (pr.id = n.id)
"""
results = spark.sql(query)

# NOTE: the directory below is hard-coded to the original author's HDFS home;
# change /user/isaacj to your own user directory before running.
output_path = "/user/isaacj/pagerank-{0}".format(wiki)

# Per the original author this produces 512 bzipped TSV part files; merge them
# afterwards in Python, or call .coalesce(1) on `results` to get a single file.
# To pull onto stat machines: stat100x$ hdfs dfs -copyToLocal /user/isaacj/pagerank-enwiki/part* .
writer = results.write
writer.csv(output_path, sep="\t", header=True, compression="bzip2")