## Imports

In [109]:
import numpy as np 
import pandas as pd
import urllib
import re
from collections import defaultdict

import os
import shutil
import pickle

import wikitextprocessor as wtp

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ArrayType
from pyspark.accumulators import AccumulatorParam

# Spark settings for a local run with 10 worker threads; spark-xml is pulled in so the
# XML dump can be read with format('com.databricks.spark.xml').
# NOTE(review): spark.driver.maxResultSize (20G) is larger than spark.driver.memory (5g), so a very
# large collect() may fail with a driver OOM before the maxResultSize guard triggers — confirm.
# NOTE(review): Spark's docs ask for spark.executor.heartbeatInterval to be significantly smaller
# than spark.network.timeout; 60s vs 61s leaves almost no margin — confirm these values.
SPARK_SETTINGS = [
    ('spark.jars.packages', 'com.databricks:spark-xml_2.12:0.8.0'),
    ('spark.executor.memory', '4g'),
    ('spark.driver.memory', '5g'),
    ('spark.driver.maxResultSize', '20G'),
    ('spark.executor.heartbeatInterval', '60s'),
    ('spark.network.timeout', '61s'),
]
conf = pyspark.SparkConf().setMaster("local[10]").setAll(SPARK_SETTINGS)

# Session (DataFrame API) and its SparkContext (RDD API, accumulators)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

22/05/01 21:42:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# sc.setLogLevel('DEBUG')

In [38]:
# Display the active SparkSession
spark

In [4]:
# Input: Wikimedia Commons XML dump (both category and file pages are read from it)
COMMONS_DUMP = '/scratch/WikipediaImagesTaxonomy/dumps/commonswiki-20220220-pages-articles-multistream.xml.bz2'
# Smaller dump shards, handy for quick test runs:
# COMMONS_DUMP_REDUCED = '../../commonswiki-20220220-pages-articles-multistream1.xml-p1p1500000.bz2'
# COMMONS_DUMP_REDUCED = '../../commonswiki-20220220-pages-articles-multistream6.xml-p114543930p115400363.bz2'

# Intermediate cache of the category pages (ns 14) extracted from COMMONS_DUMP.
# NOTE: despite its .xml.bz2 suffix, this path is written and read as Parquet;
# it is not an input dump.
CATEGORIES_DUMP = '/scratch/WikipediaImagesTaxonomy/dumps/commonswiki-20220220-categories-multistream.xml.bz2'

# Outputs (Parquet)
CATEGORIES_PATH = '/scratch/WikipediaImagesTaxonomy/commonswiki-20220220-category-network.parquet'
FILES_PATH = '/scratch/WikipediaImagesTaxonomy/commonswiki-20220220-files.parquet'

In [56]:
# Adapted from https://github.com/epfl-dlab/WikiPDA/blob/master/PaperAndCode/TopicsExtractionPipeline/GenerateDataframes.py
def normalize_title(title, dumps=True):
    """Normalize a MediaWiki page title so titles from different sources compare equal.

    Steps: URL-unquote, drop the namespace prefix (only when ``dumps`` is True),
    replace underscores with spaces, drop any ``#anchor``, trim surrounding
    whitespace and upper-case the first character.

    Args:
        title: raw title, e.g. ``'Category:Foo_bar'`` from a dump, or
            ``'Foo_bar.jpg'`` taken from an image URL.
        dumps: if True, ``title`` is expected to start with a namespace prefix
            (everything up to the first ``':'``), which is removed.

    Returns:
        The normalized title, or ``''`` when ``dumps`` is True and ``title``
        contains no ``':'``.
    """
    # `import urllib` alone does not guarantee `urllib.parse` is loaded. Importing it here
    # also makes it available on Spark Python workers, where this function runs inside
    # RDD maps and UDFs.
    import urllib.parse

    title = urllib.parse.unquote(title)
    if dumps:
        _, sep, title = title.partition(':')
        # Currently happens only for broken cross-namespace redirects
        if not sep:
            return ''
    # Underscores and spaces are interchangeable in titles and the anchor is not part of
    # the page name; trim only after both so neither leaves stray whitespace behind.
    title = title.replace('_', ' ').split('#', 1)[0].strip()
    if title:
        title = title[0].upper() + title[1:]
    return title

## Categories

In [6]:
# Parse every <page> of the Commons dump and keep only the Category namespace (ns 14)
commons_categories_raw = (
    spark.read.format('com.databricks.spark.xml')
    .options(rowTag='page')
    .load(COMMONS_DUMP)
    .filter("ns = '14'")
)
# commons_categories_raw.persist()

                                                                                

In [7]:
# Inspect the schema spark-xml derived for the category pages
commons_categories_raw.printSchema()

root
 |-- id: long (nullable = true)
 |-- ns: long (nullable = true)
 |-- redirect: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _title: string (nullable = true)
 |-- revision: struct (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _deleted: string (nullable = true)
 |    |-- contributor: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _deleted: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |-- format: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- minor: string (nullable = true)
 |    |-- model: string (nullable = true)
 |    |-- parentid: long (nullable = true)
 |    |-- sha1: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |  

In [10]:
# Cache the category pages as Parquet so later cells read them without re-parsing the XML dump.
# NOTE: CATEGORIES_DUMP is used as a Parquet output path here, despite its .xml.bz2 suffix.
commons_categories_raw.write.mode("overwrite").parquet(CATEGORIES_DUMP)

                                                                                

In [70]:
# Reload the cached category pages from the Parquet copy
commons_categories_raw = spark.read.parquet(CATEGORIES_DUMP)

In [71]:
# Map each redirect category to its target (old_title -> redirect_title), both normalized.
# Only the two needed fields are collected to the driver, not the whole page rows.
category_redirect_pairs = (
    commons_categories_raw
    .filter('redirect is not null')
    .select('title', col('redirect._title').alias('target'))
)
category_redirects = {normalize_title(r.title): normalize_title(r.target)
                      for r in category_redirect_pairs.collect()}
len(category_redirects)

                                                                                

751

In [72]:
# Category links: [[Category:Name]] or [[Category:Name|sort key]]; the single group captures
# 'Category:Name'. The lookbehinds skip links directly preceded by wpTextbox1.value=' or
# wpTextbox1.value+=' (presumably text injected into the edit box by a script).
# Raw strings keep the regex escapes (\. \[ \| ...) without invalid-escape warnings.
categories_regex = re.compile(r"(?<!wpTextbox1\.value\+=')(?<!wpTextbox1\.value=')\[\[(Category:[^\|]*?)(?:\|.*?)*\]\]")
# Markers of hidden / maintenance categories: __HIDDENCAT__, {{Hiddencat}}, {{User category...}},
# {{Global maintenance category}}, or a link to a "Categories for discussion..." or
# "Media contributed by..." category.
hiddencat_regex = re.compile(r'__HIDDENCAT__'
                             r'|\{\{[hH]iddencat\}\}'
                             r'|\{\{[uU]ser category.*?\}\}'
                             r'|\{\{[gG]lobal maintenance category\}\}'
                             r'|\[\[(Category:Categories for discussion[^\|]*?)(?:\|.*?)*\]\]'
                             r'|\[\[(Category:Media contributed by[^\|]*?)(?:\|.*?)*\]\]')

In [73]:
class ChildsAccumulator(AccumulatorParam):
    '''
    Spark accumulator merging {category: [child, ...]} dictionaries.

    Child lists that share a category key are concatenated, so the driver-side
    value maps each category to the children reported by all tasks.
    '''

    def zero(self, value):
        # Start from an empty dict of lists; the template `value` is not used.
        return defaultdict(list)

    def addInPlace(self, val1, val2):
        # Append val2's children to val1's lists; val1 is mutated and returned.
        for category in val2:
            val1[category] += val2[category]
        return val1

In [74]:
def extract_category(row):
    '''
    Turn a raw category page Row into a processed category Row.

    Parents come from the [[Category:...]] links in the page text; each one is
    normalized and, when it is a redirect, replaced by its redirect target.
    Side effect: the global accumulator `acc` receives {parent: [title]} for
    every parent, which is later turned into each category's children.

    Returns Row(id, title, parents, hiddencat).
    '''
    # NOTE(review): `acc` is updated inside a transformation, so Spark may apply these
    # updates again whenever this map is recomputed or a task is retried.
    global acc

    title = normalize_title(row.title)
    text = row.revision.text._VALUE

    parents = []
    hiddencat = False
    if text:
        for link in re.findall(categories_regex, text):
            parent = normalize_title(link)
            parents.append(category_redirects.get(parent, parent))
        hiddencat = re.search(hiddencat_regex, text) is not None

    if parents:
        acc += {parent: [title] for parent in parents}

    return Row(
        id=row.id,
        title=title,
        parents=parents,
        hiddencat=hiddencat
    )

In [75]:
# Schema of the processed categories DataFrame: one field per attribute of the
# Row returned by extract_category.
# NOTE(review): the raw dump stores `id` as long; IntegerType cannot hold ids above 2**31 - 1.
schema_cat = StructType([
    StructField("id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("parents", ArrayType(StringType()), True),
    StructField("hiddencat", BooleanType(), True),
])

In [76]:
# Redirect categories are skipped; parents that are redirects get remapped to their
# targets by extract_category, which also fills the `acc` accumulator.
acc = sc.accumulator(defaultdict(list), ChildsAccumulator())
categories_nonredirect = commons_categories_raw.filter('redirect is null')
categories_clean = spark.createDataFrame(
    categories_nonredirect.rdd.map(extract_category).filter(lambda r: r is not None),
    schema=schema_cat,
)

# commons_categories_raw.unpersist()
# categories_clean.persist();

In [77]:
# `acc` is only filled once an action has run the extract_category map over every row,
# and its value is read in the following cells. A no-op action triggers that computation
# without writing (and then deleting) a throw-away XML copy of the whole DataFrame.
# NOTE(review): re-running this cell re-runs the map and adds every child to `acc` again.
categories_clean.foreach(lambda _: None)

                                                                                

In [78]:
# Schema of the children DataFrame built from the accumulator: (parent title, child titles)
schema_childs = StructType([
    StructField('title', StringType(), True),
    StructField('childs', ArrayType(StringType(), True), True),
])

In [79]:
# One row per parent category with the titles of its children, read from the accumulator.
# NOTE(review): the whole dict is shipped from the driver as local data; this presumably explains
# the "task of very large size" warning printed when `categories` is written.
childs_df = spark.createDataFrame(acc.value.items(), schema=schema_childs)

In [80]:
# Attach to every category the list of its children (null when it has none)
categories = (
    categories_clean.alias('c')
    .join(childs_df.alias('ch'), col('c.title') == col('ch.title'), how='left')
    .select('c.*', 'ch.childs')
)
# categories_clean.unpersist();
# categories.persist();

In [81]:
# Persist the category network (id, title, parents, hiddencat, childs)
categories.write.mode("overwrite").parquet(CATEGORIES_PATH)

22/05/01 21:25:40 WARN TaskSetManager: Stage 7 contains a task of very large size (254960 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [82]:
# Reload the category network; the following cells work from this Parquet copy
categories = spark.read.parquet(CATEGORIES_PATH)

In [83]:
# Preview the first rows of the category network
categories.show()

+---------+--------------------+--------------------+---------+------+
|       id|               title|             parents|hiddencat|childs|
+---------+--------------------+--------------------+---------+------+
| 37673751|"Bioagra" plant i...|[Goświnowice, Fac...|    false|  null|
|113462211|"Dancing Dervishe...|[Kamal-ud-din Bih...|    false|  null|
| 33938219|"Dardanelles", Po...|[Polkemmet collie...|    false|  null|
| 12707820|"Der Verwalter", ...|[Buildings in Dor...|    false|  null|
|112648762|"Doktorhaus" (Got...|[Cultural propert...|    false|  null|
|102833104|"Evaluarea impact...|[Photos from Parl...|    false|  null|
| 66089937|"Forever alive" m...|                  []|    false|  null|
| 89849682|"Gracias" rainbow...|[Rainbows, COVID-19]|    false|  null|
| 18536700|"Kniende" (Karl T...|[Statues in Berli...|    false|  null|
|109840454|     "Krym" roadster|[Roadsters by brand]|    false|  null|
| 88168182|"La Villa", Schwe...|[Houses in Schwei...|    false|  null|
|10827

In [84]:
# Number of (non-redirect) categories
categories.count()

11029650

In [85]:
# Titles of the hidden/maintenance categories. Kept as a set: extract_file tests membership
# for every category of every file, and `in` on a list of 100k+ titles is a linear scan.
hidden_categories = set(
    categories.filter('hiddencat is True').select('title').rdd.flatMap(lambda x: x).collect()
)

In [86]:
# Number of hidden categories
len(hidden_categories)

121711

## Files

In [110]:
# File pages (ns 6) of the same dump. The schema of the category pages is passed explicitly
# so that spark-xml does not have to infer one from the dump.
commons_files_raw = (
    spark.read.format('com.databricks.spark.xml')
    .option("inferSchema", "false")
    .schema(commons_categories_raw.schema)
    .options(rowTag='page')
    .load(COMMONS_DUMP)
    .filter("ns = '6'")
)
# commons_files_raw.persist();

In [58]:
# Map each redirect file page to its target (old_title -> redirect_title), both normalized.
# Only the two needed fields are collected: there are millions of redirect file pages,
# and collecting whole page rows would also ship their revision data to the driver.
file_redirect_pairs = (
    commons_files_raw
    .filter('redirect is not null')
    .select('title', col('redirect._title').alias('target'))
)
file_redirects = {normalize_title(r.title): normalize_title(r.target)
                  for r in file_redirect_pairs.collect()}
len(file_redirects)

1903071

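For now, we consider only the images used in en.wikipedia (according to the WIT dataset) and discard all the others. Redirect file pages are ignored as well: WIT image names that point to a redirect are first remapped to the redirect target.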

In [61]:
# The ten gzipped TSV shards of the WIT training set
WIT_DATASET = [f'/scratch/WIT_Dataset/wit_v1.train.all-{i:05d}-of-00010.tsv.gz' for i in range(10)]
# WIT_DATASET = ['/scratch/WIT_Dataset/wit_v1.train.all-1percent_sample.tsv.gz']

# Pickle cache of the normalized image names used in en.wikipedia
WIT_NAMES = '/scratch/WikipediaImagesTaxonomy/wit_names.pkl'

In [58]:
# Normalized file names (last segment of each image URL) of the images used by en.wikipedia
# pages. Only the two columns used here are parsed from each WIT shard, so the remaining
# columns are never loaded into memory.
wiki_image_names = []

for chunk in WIT_DATASET:
    wiki_image_names += pd.read_csv(chunk, sep="\t", usecols=['language', 'image_url'])\
                          .query("language == 'en'")\
                          .image_url.apply(lambda url: normalize_title(url.split('/')[-1], False))\
                          .tolist()

In [65]:
# Deduplicate the names, replacing each file redirect by its target
wiki_image_names = {file_redirects.get(name, name) for name in set(wiki_image_names)}

In [67]:
# Cache the image names so later sessions can skip reading the WIT shards
with open(WIT_NAMES, 'wb') as f:
    pickle.dump(wiki_image_names, f)

In [68]:
# Reload the cached image names.
# NOTE: pickle.load can execute arbitrary code; only load this locally produced cache.
with open(WIT_NAMES, 'rb') as f:
    wiki_image_names = pickle.load(f)

In [111]:
# Number of distinct (redirect-resolved) image names used in en.wikipedia
len(wiki_image_names)

3935543

In [112]:
# Data type of the dump's title column, reused for the image-names DataFrame
commons_files_raw.schema['title'].dataType

StringType

In [113]:
# Single-column DataFrame (column `value`) of the image names, typed like the dump titles
title_type = commons_files_raw.schema['title'].dataType
wiki_image_names_df = spark.createDataFrame(list(wiki_image_names), title_type)

In [115]:
# Sanity check: should equal len(wiki_image_names)
wiki_image_names_df.count()

22/05/01 21:44:35 WARN TaskSetManager: Stage 0 contains a task of very large size (15492 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

3935543

In [207]:
# Spark UDF running normalize_title in dump mode (namespace prefix such as 'File:' removed)
normalize_title_udf = udf(normalize_title)

In [218]:
# Keep only the file pages whose normalized title is one of the WIT image names
wit_files_raw = (
    commons_files_raw
    .withColumn('title_norm', normalize_title_udf(col('title')))
    .join(wiki_image_names_df, col('title_norm') == wiki_image_names_df.value)
)

In [88]:
def extract_file(row):
    '''
    Turn a raw file page Row into a processed file Row.

    Categories come from the [[Category:...]] links in the page text. Each one is
    normalized, replaced by its redirect target when it is a redirect, and dropped
    when the linked name or its redirect target is a hidden category.

    Returns Row(id, title, categories).
    '''
    text = row.revision.text._VALUE
    links = re.findall(categories_regex, text) if text else []

    visible = []
    for link in links:
        name = normalize_title(link)
        if name in hidden_categories:
            continue
        if name in category_redirects:
            name = category_redirects[name]
            if name in hidden_categories:
                continue
        visible.append(name)

    return Row(
        id=row.id,
        title=normalize_title(row.title),
        categories=visible
    )

In [36]:
# Schema of the processed files DataFrame: one field per attribute of the
# Row returned by extract_file.
schema_files = StructType([
    StructField("id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("categories", ArrayType(StringType()), True),
])

In [37]:
# As for categories, redirect file pages are skipped
files_nonredirect = wit_files_raw.filter('redirect is null')
files = spark.createDataFrame(
    files_nonredirect.rdd.map(extract_file).filter(lambda r: r is not None),
    schema=schema_files,
)
# commons_files_raw.unpersist();

In [38]:
# Persist the processed files (id, title, categories)
files.write.mode("overwrite").parquet(FILES_PATH)

                                                                                

In [39]:
# Reload the processed files from the Parquet copy
files = spark.read.parquet(FILES_PATH)

In [40]:
# Preview the first rows of the processed files
files.show()

+---------+--------------------+--------------------+
|       id|               title|          categories|
+---------+--------------------+--------------------+
|114792323|PATRICK MAGO (179...|[Players of Mount...|
|114792324|RETURN TO REDFERN...|[Photographs by N...|
|114792325|View from the Sev...|         [Arlingham]|
|114792326|RETURN TO REDFERN...|[Photographs by N...|
|114792328|RETURN TO REDFERN...|[Photographs by N...|
|114792329| Marina Goliasse.jpg|                  []|
|114792330|Downtown Ferndale...|                  []|
|114792331|MATT PLACE (18392...|[Players of Mount...|
|114792332|Benchmark on Citi...|[Richmond, North ...|
|114792334|Portrait photogra...|  [Joséphin Péladan]|
|114792335|SONNY BRISTOW (18...|[Players of Mount...|
|114792336|The River Ayr - g...|[Ayr (civil paris...|
|114792337|MATT PLACE (18370...|[Players of Mount...|
|114792338|Path to Coton - g...|[Coton, Cambridge...|
|114792339|MICHAEL MORRIS (1...|[Players of Mount...|
|114792340|On top of A'Bhuid

## Categories (part 2): flag the categories used in en.wikipedia

In [41]:
# Distinct categories used by at least one en.wikipedia (WIT) file, flagged in_en_wiki=True.
# explode + distinct runs entirely in the JVM instead of round-tripping every row
# through Python workers as rdd.flatMap(...).toDF() did.
categories_in_wikipedia = (
    files.select(explode('categories').alias('title'))
    .distinct()
    .withColumn('in_en_wiki', lit(True))
)

                                                                                

In [42]:
# Preview a few of the categories used in en.wikipedia
categories_in_wikipedia.show(5)

+--------------------+----------+
|               title|in_en_wiki|
+--------------------+----------+
| Santon, Isle of Man|      true|
|Christmas 2020 in...|      true|
|Pacific Sogo Depa...|      true|
|Players (men) by ...|      true|
|Songs of the Beatles|      true|
+--------------------+----------+
only showing top 5 rows



In [43]:
# Number of distinct categories used by en.wikipedia files
categories_in_wikipedia.count()

190009

In [44]:
# Flag the categories used by at least one en.wikipedia file (False for all the others).
# Any existing in_en_wiki column is dropped first (a no-op when absent), so re-running the
# cell does not produce a duplicate column.
# NOTE(review): the recorded outputs show 190009 distinct categories used by the files but
# only 1746 flagged categories — TODO check why most of them match no title here.
categories = (
    categories.drop('in_en_wiki').alias('c')
    .join(categories_in_wikipedia, 'title', 'left')
    .select('c.*', categories_in_wikipedia.in_en_wiki)
    .na.fill(False, subset=["in_en_wiki"])
)

In [45]:
# Number of categories flagged as used in en.wikipedia
categories.filter('in_en_wiki == True').count()

1746

In [48]:
# `categories` is read lazily from CATEGORIES_PATH, so it cannot be overwritten in place:
# write it to a sibling temp directory first, then swap that in.
# os.path.splitext only strips the final extension, so the temp path stays next to
# CATEGORIES_PATH even when the path has no extension or dots in directory names.
temp_path = os.path.splitext(CATEGORIES_PATH)[0] + '-temp.parquet'
categories.write.mode("overwrite").parquet(temp_path)

shutil.rmtree(CATEGORIES_PATH)
os.rename(temp_path, CATEGORIES_PATH)

## Close

In [108]:
# Stop the Spark session and release its resources
spark.stop()

22/05/01 21:42:28 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$3@490b6c5c rejected from java.util.concurrent.ThreadPoolExecutor@751468e9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1299]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
	at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:61)
	at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:815)
	at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:791)
	at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalS