# Set up session

In [1]:
# Evaluate the session object so the notebook echoes it (quick check that it exists).
# NOTE(review): `spark` is never created in this notebook -- presumably injected by the kernel; confirm.
spark

In [2]:
# Session-level settings, issued as SQL SET statements.
shuffle_partitions_stmt = "SET spark.sql.shuffle.partitions = 1024"
# Non-strict mode: noted by the author as necessary for the dynamic wiki_db partition.
dynamic_partition_stmt = "SET hive.exec.dynamic.partition.mode = nonstrict"

spark.sql(shuffle_partitions_stmt)
spark.sql(dynamic_partition_stmt)

DataFrame[key: string, value: string]

In [32]:
import bz2
import os
import re

import pandas as pd

# Build Outlinks Dataset

In [2]:
# Shared configuration read by the query cells below.
snapshot = '2020-12'  # data will be current to this date -- e.g., 2020-05 means data is up to 30 April 2020 (at least)
wd_snapshot = '2020-12-07'  # closest Wikidata item-page-link to data snapshot
database = 'isaacj'  # Hive database that holds the output table
# Derive the table suffix from `snapshot` (e.g., '2020-12' -> '202012') so that changing the
# snapshot cannot silently INSERT OVERWRITE the table that belongs to a previous snapshot.
table_name = 'outlinks_allwikis_{0}'.format(snapshot.replace('-', ''))  # Hive table where data will be inserted

## Create table for data
You only have to do this once, but re-running it is safe: `CREATE TABLE IF NOT EXISTS` does nothing when the table already exists, so existing data is not overwritten.

In [5]:
# DDL for the outlinks table: one row per link from a source page to a target page in a wiki.
# `IF NOT EXISTS` makes this a no-op when the table is already present, so re-running the
# cell does not overwrite existing data. {0} = database, {1} = table_name.
create_table_query = """
    CREATE TABLE IF NOT EXISTS {0}.{1} (
        qid_from     STRING  COMMENT 'Wikidata item ID of source Wikipedia article (e.g., Q42)',
        pid_from     INT     COMMENT 'Wikipedia page ID of source Wikipedia article (e.g., 8091)',
        qid_to       STRING  COMMENT 'Wikidata item ID of target Wikipedia article (e.g., Q3107329)',
        pid_to       INT     COMMENT 'Wikipedia page ID of target Wikipedia article (e.g., 478921)',
        wiki_db      STRING  COMMENT 'Wikipedia language edition (e.g., enwiki)'
    )
    STORED AS PARQUET
    """.format(database, table_name)

# Last expression of the cell: the (empty) result DataFrame is echoed by the notebook.
spark.sql(create_table_query)

DataFrame[]

## Compile data and insert it
* Processing:
 * Only Wikipedia wikis (via `hostname like '%wikipedia%'` in `wikipedia_projects` and subsequent inner joins)
 * Resolve redirects -- e.g., Barack Obama -> Chicago, Illinois (redirect) -> Chicago becomes Barack Obama -> Chicago
 * Join in Wikidata items -- this is a left join so pages without Wikidata items are retained

In [6]:
# Build the outlinks table for the configured snapshot and (optionally) run it.
#
# Switches:
#   print_for_hive -- when True, print the query collapsed onto a single line (newlines turned
#                     into spaces, runs of spaces squeezed); otherwise print it as written.
#   do_execute     -- when True, run the query through spark.sql.
print_for_hive = False
do_execute = True

# Format placeholders: {0} = snapshot, {1} = database, {2} = table_name, {3} = wd_snapshot.
#
# CTEs, in order:
#   wikipedia_projects           -- distinct dbname values whose hostname contains 'wikipedia'.
#   title_to_id                  -- (page_id, page_title, wiki_db) for namespace-0 pages of those wikis.
#   redirects                    -- redirect source page ID -> page ID of the redirect's target, matched
#                                   by title within the same wiki (namespace-0 targets only).
#   pagelinks_reformatted        -- namespace-0 -> namespace-0 links with the target title resolved to a
#                                   page ID; the LEFT ANTI JOIN drops links whose source page is a redirect.
#   pagelinks_redirects_resolved -- when a link target is itself a redirect source, substitute the
#                                   redirect's target (a single hop); DISTINCT removes duplicate links.
#   wikidata_ids                 -- (wiki_db, page_id, item_id) for namespace-0 pages of those wikis.
#
# The final statement is an INSERT OVERWRITE, so the table's previous contents are replaced.
# Both Wikidata joins are LEFT JOINs: links are kept even when either side has no item
# (the corresponding qid column is then NULL).
query = """
WITH wikipedia_projects AS (
        SELECT DISTINCT dbname
          FROM wmf_raw.mediawiki_project_namespace_map
         WHERE snapshot = '{0}'
               AND hostname LIKE '%wikipedia%'
        ),
title_to_id AS (
    SELECT page_id,
           page_title,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     INNER JOIN wikipedia_projects wp
           ON (mp.wiki_db = wp.dbname)
     WHERE page_namespace = 0
           AND snapshot = '{0}'
),
redirects AS (
    SELECT mr.rd_from AS rd_from,
           tti.page_id AS rd_to,
           mr.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_redirect mr
     INNER JOIN title_to_id tti
           ON (mr.rd_title = tti.page_title
               AND mr.wiki_db = tti.wiki_db)
     WHERE mr.snapshot = '{0}'
           AND mr.rd_namespace = 0
),
pagelinks_reformatted AS (
    SELECT pl.pl_from AS pl_from,
           tti.page_id AS pl_to,
           pl.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_pagelinks pl
     INNER JOIN title_to_id tti
           ON (pl.pl_title = tti.page_title
               AND pl.wiki_db = tti.wiki_db)
      LEFT ANTI JOIN redirects r
           ON (pl.pl_from = r.rd_from
               AND pl.wiki_db = r.wiki_db)
     WHERE snapshot = '{0}'
           AND pl_from_namespace = 0
           AND pl_namespace = 0
),
pagelinks_redirects_resolved AS (
    SELECT DISTINCT pl.pl_from AS pl_from,
           COALESCE(r.rd_to, pl.pl_to) AS pl_to,
           pl.wiki_db AS wiki_db
      FROM pagelinks_reformatted pl
      LEFT JOIN redirects r
           ON (pl.pl_to = r.rd_from
               AND pl.wiki_db = r.wiki_db)
),
wikidata_ids AS (
    SELECT DISTINCT wiki_db,
           page_id,
           item_id
      FROM wmf.wikidata_item_page_link wd
     INNER JOIN wikipedia_projects p
           ON (wd.wiki_db = p.dbname)
     WHERE wd.snapshot = '{3}'
           AND wd.page_namespace = 0
    )
INSERT OVERWRITE TABLE {1}.{2}
SELECT wf.item_id AS qid_from,
       p.pl_from AS pid_from,
       wt.item_id AS qid_to,
       p.pl_to AS pid_to,
       p.wiki_db as wiki_db
  FROM pagelinks_redirects_resolved p
  LEFT JOIN wikidata_ids wf
       ON (p.pl_from = wf.page_id
           AND p.wiki_db = wf.wiki_db)
  LEFT JOIN wikidata_ids wt
       ON (p.pl_to = wt.page_id
           AND p.wiki_db = wt.wiki_db)
""".format(snapshot, database, table_name, wd_snapshot)

# Echo the exact SQL being run so the notebook keeps a record of it.
if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

# `result` is assigned but not displayed in this cell.
if do_execute:
    result = spark.sql(query)


WITH wikipedia_projects AS (
        SELECT DISTINCT dbname
          FROM wmf_raw.mediawiki_project_namespace_map
         WHERE snapshot = '2020-12'
               AND hostname LIKE '%wikipedia%'
        ),
title_to_id AS (
    SELECT page_id,
           page_title,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     INNER JOIN wikipedia_projects wp
           ON (mp.wiki_db = wp.dbname)
     WHERE page_namespace = 0
           AND snapshot = '2020-12'
),
redirects AS (
    SELECT mr.rd_from AS rd_from,
           tti.page_id AS rd_to,
           mr.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_redirect mr
     INNER JOIN title_to_id tti
           ON (mr.rd_title = tti.page_title
               AND mr.wiki_db = tti.wiki_db)
     WHERE mr.snapshot = '2020-12'
           AND mr.rd_namespace = 0
),
pagelinks_reformatted AS (
    SELECT pl.pl_from AS pl_from,
           tti.page_id AS pl_to,
           pl.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_pagelinks pl
     INNE

## Analytics

In [9]:
# What % of pages for each wiki have associated links?
#
# Switches:
#   print_for_hive -- when True, print the query collapsed onto a single line (newlines turned
#                     into spaces, runs of spaces squeezed); otherwise print it as written.
#   do_execute     -- when True, run the query through spark.sql and show the result.
print_for_hive = False
do_execute = True

# CTEs, in order:
#   wikipedia_projects -- distinct dbname values whose hostname contains 'wikipedia'.
#   all_pages          -- namespace-0, non-redirect pages of those wikis.
#   pages_with_links   -- per wiki, number of distinct pages in all_pages that are the source of
#                         at least one namespace-0 -> namespace-0 pagelink.
#   all_page_count     -- per wiki, number of pages in all_pages.
# The LEFT JOIN keeps wikis that have no qualifying links (num_pages_with_links is then NULL).
#
# Every CTE is introduced with an explicit `AS`, so the statement keeps the same form
# throughout and the text printed for Hive does not rely on the keyword being optional.
query = f"""
WITH wikipedia_projects AS (
        SELECT DISTINCT dbname
          FROM wmf_raw.mediawiki_project_namespace_map
         WHERE snapshot = '{snapshot}'
               AND hostname LIKE '%wikipedia%'
        ),
all_pages AS (
    SELECT page_id,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     INNER JOIN wikipedia_projects wp
           ON (mp.wiki_db = wp.dbname)
     WHERE page_namespace = 0
           AND NOT page_is_redirect
           AND snapshot = '{snapshot}'
),
pages_with_links AS (
    SELECT COUNT(DISTINCT(pl_from)) AS num_pages_with_links,
           pl.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_pagelinks pl
     INNER JOIN all_pages ap
           ON (pl.pl_from = ap.page_id
               AND pl.wiki_db = ap.wiki_db)
     WHERE snapshot = '{snapshot}'
           AND pl_from_namespace = 0
           AND pl_namespace = 0
     GROUP BY pl.wiki_db
),
all_page_count AS (
    SELECT wiki_db,
           COUNT(1) AS num_pages
      FROM all_pages
     GROUP BY wiki_db
)
SELECT ap.wiki_db,
       num_pages,
       num_pages_with_links
  FROM all_page_count ap
  LEFT JOIN pages_with_links pl
       ON (ap.wiki_db = pl.wiki_db)
  ORDER BY num_pages DESC
"""

# Echo the exact SQL being run so the notebook keeps a record of it.
if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)
    # Up to 500 rows, without truncating cell values.
    result.show(500, False)


WITH wikipedia_projects AS (
        SELECT DISTINCT dbname
          FROM wmf_raw.mediawiki_project_namespace_map
         WHERE snapshot = '2020-12'
               AND hostname LIKE '%wikipedia%'
        ),
all_pages AS (
    SELECT page_id,
           wiki_db
      FROM wmf_raw.mediawiki_page mp
     INNER JOIN wikipedia_projects wp
           ON (mp.wiki_db = wp.dbname)
     WHERE page_namespace = 0
           AND NOT page_is_redirect
           AND snapshot = '2020-12'
),
pages_with_links (
    SELECT COUNT(DISTINCT(pl_from)) AS num_pages_with_links,
           pl.wiki_db AS wiki_db
      FROM wmf_raw.mediawiki_pagelinks pl
     INNER JOIN all_pages ap
           ON (pl.pl_from = ap.page_id
               AND pl.wiki_db = ap.wiki_db)
     WHERE snapshot = '2020-12'
           AND pl_from_namespace = 0
           AND pl_namespace = 0
     GROUP BY pl.wiki_db
),
all_page_count AS (
    SELECT wiki_db,
           COUNT(1) AS num_pages
      FROM all_pages
     GROUP BY wiki_db
)
SEL

In [7]:
# Table-wide totals: distinct source QIDs (num_qids) and distinct target QIDs (vocab_size).
# {0} = database, {1} = table_name.
outlink_counts_template = """
SELECT
  COUNT(DISTINCT(qid_from)) AS num_qids,
  COUNT(DISTINCT(qid_to)) AS vocab_size
  FROM {0}.{1}
"""
outlink_counts_query = outlink_counts_template.format(database, table_name)
print(outlink_counts_query)

qid_counts = spark.sql(outlink_counts_query)
qid_counts.show(n=500)


SELECT
  COUNT(DISTINCT(qid_from)) AS num_qids,
  COUNT(DISTINCT(qid_to)) AS vocab_size
  FROM isaacj.outlinks_allwikis_202012

+--------+----------+
|num_qids|vocab_size|
+--------+----------+
|20119210|  18037708|
+--------+----------+



In [7]:
# Per-wiki breakdown: total links plus distinct source/target pages and QIDs.
# {0} = database, {1} = table_name.
per_wiki_counts_template = """
SELECT wiki_db,
       COUNT(1) AS num_outlinks,
       COUNT(DISTINCT(pid_from)) AS num_unique_src,
       COUNT(DISTINCT(qid_from)) AS num_src_with_qids,
       COUNT(DISTINCT(pid_to)) AS num_unique_trgt,
       COUNT(DISTINCT(qid_to)) AS num_trgt_with_qids
  FROM {0}.{1}
 GROUP BY wiki_db
 ORDER BY wiki_db
 LIMIT 1000
"""
outlink_counts_query = per_wiki_counts_template.format(database, table_name)
print(outlink_counts_query)

per_wiki_counts = spark.sql(outlink_counts_query)
per_wiki_counts.show(n=500)

+----------------+------------+--------------+-----------------+---------------+------------------+
|         wiki_db|num_outlinks|num_unique_src|num_src_with_qids|num_unique_trgt|num_trgt_with_qids|
+----------------+------------+--------------+-----------------+---------------+------------------+
|          abwiki|      212622|          6062|             6016|           5506|              5494|
|         acewiki|       56269|         10295|            10288|           2540|              2537|
|         adywiki|        1858|           398|              397|            370|               368|
|          afwiki|     3298881|         95425|            94959|          66321|             65952|
|          akwiki|        3252|          1160|             1139|            296|               296|
|         alswiki|      914355|         27476|            27403|          26164|             26106|
|          amwiki|      111860|         14733|            14715|          11459|             11420|


## Output to TSV

In [5]:
# Export one row per (wiki, source page): the set of target QIDs joined by single spaces.
# Links whose target has no QID are excluded; wiki_db is kept as a column.
# {0} = database, {1} = table_name.
gather_outlinks_template = """
SELECT wiki_db,
       pid_from,
       qid_from,
       CONCAT_WS(' ', COLLECT_SET(qid_to)) as outlinks
  FROM {0}.{1}
 WHERE qid_to IS NOT NULL
 GROUP BY wiki_db, pid_from, qid_from
"""
gather_outlinks_query = gather_outlinks_template.format(database, table_name)
print(gather_outlinks_query)

output_tsv = spark.sql(gather_outlinks_query)
# Tab-separated, bzip2-compressed output with a header row.
tsv_writer = output_tsv.write
tsv_writer.csv(
    path="/user/isaacj/all-outlinks-by-article",
    sep="\t",
    header=True,
    compression="bzip2",
)


SELECT wiki_db,
       pid_from,
       qid_from,
       CONCAT_WS(' ', COLLECT_SET(qid_to)) as outlinks
  FROM isaacj.outlinks_allwikis_202012
 WHERE qid_to IS NOT NULL
 GROUP BY wiki_db, pid_from, qid_from

