In [1]:
from pathlib import Path


# Base command for WikiExtractor: JSON output, written to stdout ("-o -").
# The active command carries no --no-templates flag; the commented-out variant adds it,
# which skips template expansion in favour of speed.
wikiextractor_cmd_base = "python -m wikiextractor.WikiExtractor --json -o -"
# wikiextractor_cmd_base = "python -m wikiextractor.WikiExtractor --no-templates --json -o -"

# A page is kept when any of these keywords occurs (case-insensitively) inside one of its category names.
filter_categories_by_keywords = [
    "Flight",
    "Travel",
    "Tourism",
    "Aerospace",
    "Airlines",
    "Airports",
    "Airfields",
    "Aviation",
    "Transport",
]

# Destination of the merged CSV export.
csv_file = '/home/dnk8n/Downloads/travel-wiki-extract-full-templates-processed.csv'

# Directory holding the downloaded dump, and the glob pattern selecting the file(s) to process.
# The pattern is chosen to exclude recombined gz and index files.
wiki_dir = Path("/home/dnk8n/wikipedia")
# pattern = "enwiki-20210720-pages-articles-multistream[0-9]*.xml*.bz2"  # selecting multiple files bugs out for some reason
pattern = "enwiki-20210720-pages-articles-multistream.xml.bz2"  # full Wikipedia dump
# pattern = "enwiki-20210720-pages-articles-multistream16.xml-p20460153p20570392.bz2"  # smallest file, handy for development

In [2]:
import subprocess
from typing import Pattern, Union
import xml.sax
from xml.sax import SAXParseException
from xml.sax.expatreader import ExpatParser

In [3]:
import re

# Matches a category link with or without a sort key:
#   [[Category:Name]]   or   [[Category:Name|Sort key]]
# Group 1 is the category name. A raw string is used so the backslashes reach
# the regex engine instead of being treated as (invalid) string escapes.
RE_CATEGORY = re.compile(r'\[\[Category:([^\[\]|]+)(?:\|[^\[\]]*)?\]\]')


class WikiXmlHandler(xml.sax.handler.ContentHandler):
    """Content handler for Wiki XML data using SAX.

    For every ``<page>`` element the handler gathers the page id, the revision
    id (``revid``), the ``timestamp`` and the category names linked from the
    ``<text>`` body. Finished pages are stored in ``matching_docs``, keyed by
    page id.

    Args:
        filter_categories_by: Optional list of keywords. When given, a page is
            kept only if at least one keyword occurs (case-insensitively)
            inside one of its category names; pages without categories are
            dropped. When empty or ``None`` every page is kept.

    Attributes are documented inline in ``__init__``.
    """

    # Leaf elements whose character data is collected.
    _CAPTURED_TAGS = frozenset({"id", "timestamp", "text"})

    def __init__(self, filter_categories_by=None):
        super().__init__()
        # Stack of the currently open element names (innermost last).
        self._current_tag = []
        # Id of the page being read.
        self._current_id = ""
        # Metadata collected so far for the page being read.
        self._temp_holding_dict = {}
        self._filter_categories_by = filter_categories_by or []
        # Character data of the captured element that is currently open.
        self._text_chunks = []
        # Result: page id -> {'revid', 'timestamp', 'categories'} (keys present when found).
        self.matching_docs = {}

    def startElement(self, name, attrs):
        """Opening tag of element"""
        self._current_tag.append(name)
        if name in self._CAPTURED_TAGS:
            self._text_chunks = []

    def characters(self, content):
        """Characters between opening and closing tags.

        A SAX parser may split one element's text into several calls, so the
        chunks are only buffered here and interpreted in ``endElement``.
        """
        if self._current_tag and self._current_tag[-1] in self._CAPTURED_TAGS:
            self._text_chunks.append(content)

    def endElement(self, name):
        """Closing tag of element"""
        if not self._current_tag or name != self._current_tag[-1]:
            return
        ended_tag = self._current_tag.pop()
        if ended_tag in self._CAPTURED_TAGS:
            value = ''.join(self._text_chunks)
            self._text_chunks = []
            parent_tag = self._current_tag[-1] if self._current_tag else None
            self._store_value(ended_tag, parent_tag, value)
        elif ended_tag == "page":
            self._finish_page()

    def _store_value(self, tag, parent_tag, value):
        """Record the complete text of a captured element on the current page."""
        if tag == "text":
            names = (found.group(1).strip() for found in RE_CATEGORY.finditer(value))
            names = [category for category in names if category]
            if names:
                self._temp_holding_dict.setdefault('categories', []).extend(names)
            return

        value = value.strip()
        if not value:
            return
        if tag == "timestamp":
            self._temp_holding_dict['timestamp'] = value
        elif parent_tag == "page":
            self._current_id = value
        elif parent_tag == "revision":
            self._temp_holding_dict['revid'] = value
        # Any other <id> (e.g. inside <contributor>) is ignored.

    def _matches_filter(self, wiki_categories):
        """Return True when any filter keyword occurs in any category name (case-insensitive)."""
        lowered = [wiki_category.lower() for wiki_category in wiki_categories]
        return any(
            substr.lower() in wiki_category
            for substr in self._filter_categories_by
            for wiki_category in lowered
        )

    def _finish_page(self):
        """Store or drop the page that just closed, then reset the per-page state."""
        doc, self._temp_holding_dict = self._temp_holding_dict, {}
        # Reset the id as well so it cannot leak into a later page that has none.
        page_id, self._current_id = self._current_id, ""

        wiki_categories = doc.get('categories')
        if self._filter_categories_by:
            if not wiki_categories or not self._matches_filter(wiki_categories):
                return
        if wiki_categories:
            doc['categories'] = '\n'.join(wiki_categories)
        self.matching_docs[page_id] = doc

In [4]:
def manualextract_bz_dir_serial(bz_dir: Union[str, Path], pattern: Union[str, Pattern], parser: ExpatParser):
    """Run ``manualextract_bz_file`` on each file in ``bz_dir`` matching ``pattern``, one after another.

    Args:
        bz_dir: Directory to search; must exist (checked with ``assert``).
        pattern: Glob pattern handed to ``Path.glob``.
        parser: Accepted for the caller's convenience but not forwarded;
            ``manualextract_bz_file`` is called with the file path only.
    """
    directory = Path(bz_dir)
    assert directory.is_dir()
    matching_files = directory.glob(pattern)
    for dump_file in matching_files:
        manualextract_bz_file(dump_file)

In [5]:
def manualextract_bz_file(f: Union[str, Path], sax_parser=None):
    """Decompress one bz2 file with ``bzcat`` and feed it line by line to a SAX parser.

    Args:
        f: Path of the compressed file (``str`` or ``Path``); must exist.
        sax_parser: Incremental parser offering ``feed()`` and ``close()``.
            When omitted, the notebook-level ``parser`` is used, so existing
            one-argument calls behave as before.

    A ``SAXParseException`` is reported with ``print`` and ends processing of
    this file; it is not re-raised. A ``StopIteration`` raised while feeding
    (for example by a content handler) ends the file early without a message.
    """
    f_path = Path(f)
    print('Processing file: ', f)
    assert f_path.is_file()
    active_parser = parser if sax_parser is None else sax_parser

    completed = False
    # Both the compressed file and the bzcat process are released on exit,
    # whether the loop finishes or stops early.
    with f_path.open('rb') as compressed, subprocess.Popen(
        ["bzcat"],
        stdin=compressed,
        stdout=subprocess.PIPE
    ) as bzcat:
        try:
            for line in bzcat.stdout:
                active_parser.feed(line)
            completed = True
        except SAXParseException as e:
            print('error with file: ', f, e)
        except StopIteration:
            pass
        if not completed:
            # Nobody will read the rest of the output; stop bzcat instead of letting it block.
            bzcat.terminate()

    if completed and bzcat.returncode:
        print('error with file: ', f, 'bzcat exited with status', bzcat.returncode)

    # Finish the document so the parser starts a fresh one on the next feed();
    # without this a second file would be fed into the same document.
    try:
        active_parser.close()
    except SAXParseException as e:
        # After an early stop the document is incomplete by design (or the
        # error was already reported above), so only report new problems.
        if completed:
            print('error with file: ', f, e)

In [6]:
import json

def wikiextract_bz(bz_dir: Union[str, Path], pattern: Union[str, Pattern]):
    """Lazily yield the documents WikiExtractor produces for each matching dump file.

    For every file in ``bz_dir`` that matches the glob ``pattern``, the command
    in ``wikiextractor_cmd_base`` is started with the file path appended, and
    each non-blank line of its stdout is parsed as JSON and yielded.

    This is a generator: nothing runs (not even the directory check) until the
    first item is requested. When the consumer stops early or an error occurs,
    the running extractor process is terminated; in every case its pipe is
    closed and the process is waited for.

    Args:
        bz_dir: Directory to search; must exist (checked with ``assert``).
        pattern: Glob pattern handed to ``Path.glob``.

    Yields:
        One decoded JSON object per output line.
    """
    bz_dir_path = Path(bz_dir)
    assert bz_dir_path.is_dir()
    for f in bz_dir_path.glob(pattern):
        wikiextractor_cmd = wikiextractor_cmd_base.split() + [f.as_posix()]
        print('wikiextractor_cmd: ', wikiextractor_cmd)
        with subprocess.Popen(wikiextractor_cmd, stdout=subprocess.PIPE) as extractor:
            exhausted = False
            try:
                for line in extractor.stdout:
                    if not line.strip():
                        # A blank line is not a document and would make json.loads fail.
                        continue
                    yield json.loads(line)
                exhausted = True
            finally:
                if not exhausted:
                    # Consumer stopped early or a line failed to parse: do not
                    # leave the extractor blocked on a pipe nobody reads.
                    extractor.terminate()

In [7]:
def merge_docs(main_docs, meta_docs):
    """Join extracted documents with their metadata by document id.

    Args:
        main_docs: Iterable of dicts with the keys ``'id'``, ``'url'``,
            ``'title'`` and ``'text'``.
        meta_docs: Mapping of id -> metadata dict. It is not modified, so the
            merge can be repeated with the same mapping.

    Yields:
        One dict per document whose id is present in ``meta_docs``: the four
        document fields followed by all metadata fields. When the metadata has
        a ``'revid'``, ``'&oldid=<revid>'`` is appended to the URL.

    Iteration over ``main_docs`` stops as soon as every metadata entry has been
    matched, and ``main_docs`` is not touched at all when ``meta_docs`` is empty.
    """
    # Work on a shallow copy so the caller's mapping is not drained.
    pending = dict(meta_docs)
    if not pending:
        return
    for doc in main_docs:
        id_ = doc['id']
        meta = pending.pop(id_, None)
        if meta is None:
            continue
        url = doc['url']
        revid = meta.get('revid')
        if revid:
            url = url + '&oldid=' + revid
        yield {
            'id': id_,
            'url': url,
            'title': doc['title'],
            'text': doc['text'],
            **meta
        }
        if not pending:
            break

In [8]:
# Create the SAX parser, then attach a handler that keeps only the pages whose
# categories contain one of the configured keywords.
parser = xml.sax.make_parser()
handler = WikiXmlHandler(filter_categories_by=filter_categories_by_keywords)
parser.setContentHandler(handler)

In [9]:
# %%time
# # Parallelised
# from multiprocessing import Pool

# partitions = [f for f in wiki_dir.glob(pattern)]
# pool = Pool(processes = 3)
# pool.map(manualextract_bz_file, partitions)
# pool.close()
# pool.join()

In [10]:
%%time
# Serial
manualextract_bz_dir_serial(wiki_dir, pattern, parser)

Processing file:  /home/dnk8n/wikipedia/enwiki-20210720-pages-articles-multistream.xml.bz2
CPU times: user 1h 14min 15s, sys: 53.9 s, total: 1h 15min 9s
Wall time: 1h 31min 6s


In [11]:
%%time
# Naturally paralised
wikiextract_docs = wikiextract_bz(wiki_dir, pattern)

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 5.96 µs


In [12]:
%%time
import csv

csv_columns = ['id', 'url', 'title','text', 'revid', 'timestamp', 'categories']
try:
    with open(csv_file, 'w') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
        writer.writeheader()
        for data in merge_docs(wikiextract_docs, handler.matching_docs):
            writer.writerow(data)
except IOError:
    print("I/O error")

wikiextractor_cmd:  ['python', '-m', 'wikiextractor.WikiExtractor', '--json', '-o', '-', '/home/dnk8n/wikipedia/enwiki-20210720-pages-articles-multistream.xml.bz2']
CPU times: user 5min 44s, sys: 18.8 s, total: 6min 3s
Wall time: 3h 10min 27s


In [13]:
# TODO: Potentially locate category pages like below and traverse subcategories, thus expanding search of categories to filter by
#     https://en.wikipedia.org/wiki/Category:Flight
#     https://en.wikipedia.org/wiki/Category:Travel
#     https://en.wikipedia.org/wiki/Category:Tourism
#     https://en.wikipedia.org/wiki/Category:Aerospace
#     https://en.wikipedia.org/wiki/Category:Airlines
#     https://en.wikipedia.org/wiki/Category:Airports
#     https://en.wikipedia.org/wiki/Category:Airfields
#     https://en.wikipedia.org/wiki/Category:Aviation
#     https://en.wikipedia.org/wiki/Category:Transport
