# PREAMBLE

In [None]:
import cfg

import gzip
import json
import os
import re

import parallel_stream as ps
import progress_bar as pb

# READ THE WIKIPEDIA DUMP TO EXTRACT TITLES AND BOLDS

In [None]:
from BeautifulSoup import BeautifulSoup
from HTMLParser import HTMLParser

import wikiextractor.WikiExtractor as WE

## CUSTOMIZED PAGE PARSER

In [None]:
# Module-level HTMLParser instance; pages_from only uses its unescape() to
# decode HTML entities in redirect titles.
_html_parser = HTMLParser()

# procedure used to read the input, copied (and adapted) from WikiExtractor
def pages_from(row_iterator):
    """
    Scan XML dump rows and yield one tuple per complete ``<page>`` element.

    :param row_iterator: iterable of dump lines; lines that are not
        ``WE.text_type`` are decoded as UTF-8.
    :return: generator of
        ``(redirect_title, page_id, rev_id, page_title, page_rows)`` tuples.
        ``redirect_title`` is ``None`` unless the page has a ``<redirect>`` tag.
        Within a page, the first ``<id>`` is taken as ``page_id`` and the
        second as ``rev_id``. ``page_rows`` is the list of lines of the
        ``<text>`` content. A page whose id equals the previously yielded id
        is skipped with a warning.
    """
    # we collect individual lines, since str.join() is significantly faster than concatenation

    # page details
    page_rows = []
    page_id = None
    page_title = None
    rev_id = None
    redirect_title = None

    # support variables
    last_page_id = None
    inside_page = False
    inside_text = False

    # loop over the rows
    for line in row_iterator:
        if not isinstance(line, WE.text_type):
            line = line.decode('utf-8')
        # check if the line can contain a tag
        if '<' not in line:  # faster than doing re.search()
            if inside_text:
                page_rows.append(line)
            continue

        # check if the line contain a tag
        m = WE.tagRE.search(line)
        if not m:
            continue
        tag = m.group(2)

        if tag == 'page':
            if inside_page:
                WE.logging.warning("pages_from: After page_id {} nested page tag found".format(last_page_id))
            page_rows = []
            page_id = None
            page_title = None
            rev_id = None
            redirect_title = None
            inside_page = True
            # a new page never starts inside the previous page's <text>: without
            # this reset an unclosed <text> would swallow the next page's tags
            inside_text = False

        elif not inside_page:
            WE.logging.warning("pages_from: After page_id {} found a tag out of the page. Line: {}".format(last_page_id, line))
            continue

        elif tag == '/text':
            if not inside_text:
                WE.logging.warning("pages_from: After page_id {} the tag </text> has been found, but not <text>".format(last_page_id))
            if m.group(1):
                page_rows.append(m.group(1))
            inside_text = False

        elif inside_text:
            page_rows.append(line)

        elif tag == 'text':
            if m.lastindex == 3 and line[m.start(3)-2] == '/': # self closing: <text xml:space="preserve" />
                continue
            page_rows.append(line[m.start(3):m.end(3)])
            inside_text = (m.lastindex != 4)  # open-close

        elif tag == 'id':
            # first <id> in the page is the page id, the second the revision id
            if page_id is None:
                page_id = m.group(3)
            elif rev_id is None:
                rev_id = m.group(3)

        elif tag == 'title':
            if page_title is not None:
                WE.logging.warning("pages_from: After page_id {} page_title was already set ({} => {})".format(last_page_id, page_title, m.group(3)))
            page_title = m.group(3)

        elif tag == 'redirect':
            # the redirect target is the text between the first and last double quote
            l, r = line.find("\"")+1, line.rfind("\"")
            if l < r:
                if redirect_title is not None:
                    WE.logging.warning("pages_from: After page_id {} redirect_title was already set ({} => {})".format(last_page_id, redirect_title, line[l:r]))
                redirect_title = _html_parser.unescape(line[l:r])

        elif tag == '/page':
            if page_id != last_page_id:
                # the third element must be the revision id: consumers unpack
                # (redirect_title, page_id, rev_id, page_title, page_rows)
                yield (redirect_title, page_id, rev_id, page_title, page_rows)
                last_page_id = page_id
            else:
                WE.logging.warning("pages_from: After page_id {} there is another page with the same id (can it be inside?)".format(last_page_id))
            inside_page = False


## CUSTOMIZED EXTRACTOR

In [None]:
class MyExtractor(WE.Extractor):
    """WikiExtractor ``Extractor`` that harvests bold / bold-italic mentions.

    Only the page rows before the first line matching ``headings_re`` (a
    ``== ... ==`` heading) are kept. Bold-italic and bold markup, as matched by
    ``WE.bold_italic`` and ``WE.bold``, is rewritten into plain-text markers
    (``[START_MENTION_BI]...[END_MENTION_BI]`` and
    ``[START_MENTION_B]...[END_MENTION_B]``) before the rows are handed to the
    base class. ``write_output`` then collects the marked spans into a set
    instead of writing text.
    """

    # raw strings: the patterns are unchanged, but "\s" and "\[" are no longer
    # (invalid) string escape sequences
    headings_re = re.compile(r"^\s*==(.*?)==\s*$")
    mention_bold_re = re.compile(r"\[START_MENTION_B\](.*?)\[END_MENTION_B\]")
    mention_bolditalic_re = re.compile(r"\[START_MENTION_BI\](.*?)\[END_MENTION_BI\]")

    def __init__(self, page_id, page_revid, page_title, page_rows):
        """
        :param page_id: page id, forwarded to ``WE.Extractor``.
        :param page_revid: revision id, forwarded to ``WE.Extractor``.
        :param page_title: page title, forwarded to ``WE.Extractor``.
        :param page_rows: lines of the page wikitext.
        """
        new_page_rows = []
        for line in page_rows:
            if MyExtractor.headings_re.match(line):
                # stop at the first section heading: keep only the text above it
                break
            # bold-italic is substituted first (presumably so that its quote
            # run is not partially consumed by the bold pattern)
            line = WE.bold_italic.sub(r'[START_MENTION_BI]\1[END_MENTION_BI]', line)
            line = WE.bold.sub(r'[START_MENTION_B]\1[END_MENTION_B]', line)
            new_page_rows.append(line)
        super(MyExtractor, self).__init__(page_id, page_revid, page_title, new_page_rows)

    def write_output(self, out, text):
        """Add the marked bold / bold-italic mentions found in ``text`` to ``out``.

        :param out: set-like object, updated in place with the stripped
            mentions (tabs replaced by spaces, empty strings dropped).
        :param text: iterable of text lines — assumed to be what the base
            class's ``extract`` passes to ``write_output``.
        """
        matches = set(
            (
                BeautifulSoup(match).getText() if "<" in match else match  # fix for nested markup inside templates
            ).strip().replace("\t", " ")
            for line in text
            for match in (MyExtractor.mention_bold_re.findall(line) + MyExtractor.mention_bolditalic_re.findall(line))
        )
        matches.discard("")

        out.update(matches)

## INIT THE LIBRARY

In [None]:
# Ensure the output directories for the raw Wikipedia and Wikidata extractions exist.
for _raw_subdir in ("wikipedia_raw/", "wikidata_raw/"):
    if not os.path.isdir(cfg.processed_dir + _raw_subdir):
        os.mkdir(cfg.processed_dir + _raw_subdir)
del _raw_subdir

In [None]:
# Per-part path templates: the total number of parts is filled in now, while
# the remaining "{}" placeholder receives the part index later on.
_n_parts = cfg.wiki_preprocessing_split_into
path_template_input = cfg.processed_dir + "enwiki-latest-pages-articles_parts/part_{{}}_{}.xml.gz".format(_n_parts)
path_template_aliases = cfg.processed_dir + "wikipedia_raw/aliases.part_{{}}_{}.tsv.gz".format(_n_parts)
path_template_redirects = cfg.processed_dir + "wikipedia_raw/redirects.part_{{}}_{}.tsv.gz".format(_n_parts)
del _n_parts

In [None]:
%%time
# collect siteinfo to set the options (copied from wikiextractor).
# The dump part is opened in a `with` block so the handle is closed as soon as
# </siteinfo> has been read (the original never closed it).
with gzip.open(path_template_input.format(1), "r") as siteinfo_file:
    for line in siteinfo_file:
        # When an input file is .bz2 or .gz, line can be a bytes even in Python 3.
        if not isinstance(line, WE.text_type):
            line = line.decode('utf-8')
        m = WE.tagRE.search(line)
        if not m:
            continue
        tag = m.group(2)
        if tag == 'base':
            # discover urlbase from the xml dump file
            # /mediawiki/siteinfo/base
            base = m.group(3)
            WE.options.urlbase = base[:base.rfind("/")]
        elif tag == 'namespace':
            # record the namespace (tag content) with its key attribute, or ''
            mk = WE.keyRE.search(line)
            WE.options.knownNamespaces[m.group(3)] = mk.group(1) if mk else ''
            if 'key="10"' in line:  # template namespace
                WE.options.templateNamespace = m.group(3)
                WE.options.templatePrefix = WE.options.templateNamespace + ':'
            elif 'key="828"' in line:  # module namespace
                WE.options.moduleNamespace = m.group(3)
                WE.options.modulePrefix = WE.options.moduleNamespace + ':'
        elif tag == '/siteinfo':
            break

# set the other options
WE.options.expand_templates = True
WE.options.filter_disambig_pages = True
WE.options.toHTML = False
# set the logger level to avoid warning (mainly due to template substitutions)
# NOTE: logging.basicConfig does nothing if the root logger already has handlers
WE.logging.basicConfig(filename="/tmp/wikipedia_processing.log", filemode="w", level=WE.logging.WARN, format="%(asctime)s|%(levelname)s|%(message)s")

## LOAD THE TEMPLATES USED TO EXPAND THE MACROS

Because of a bug in wikiextractor, this otherwise useful option is disabled.

## EXTRACT RAW REDIRECTS, TITLES AND BOLDS INTO INTERMEDIATE FILES

In [None]:
import signal

# Integer types accepted as a timeout: ``long`` exists only on Python 2.
try:
    _TIMER_INTEGER_TYPES = (int, long)
except NameError:
    _TIMER_INTEGER_TYPES = (int,)


class Timer(object):
    """Context manager that raises ``Timer.TimeoutException`` after a delay.

    It is built on ``signal.alarm`` / ``SIGALRM``, so (per the ``signal``
    module documentation) it is Unix-only and must be entered from the main
    thread. On exit the pending alarm is cancelled and the SIGALRM handler
    that was installed before ``__enter__`` is restored.
    """

    class TimeoutException(Exception):
        """Raised from the SIGALRM handler when the timer expires."""

        def __init__(self, message="TimeoutException"):
            super(Timer.TimeoutException, self).__init__(message)

    def __init__(self, seconds):
        """
        :param seconds: positive integer number of seconds before the alarm fires.
        """
        assert isinstance(seconds, _TIMER_INTEGER_TYPES) and seconds > 0
        self._seconds = seconds
        self._previous_handler = None

    def __enter__(self):
        # install our handler, remembering the old one so __exit__ can restore it
        self._previous_handler = signal.signal(signal.SIGALRM, self.raise_timeout_exception)
        # set alarm
        signal.alarm(self._seconds)
        return self

    def __exit__(self, *args):
        # disable the alarm first, so it cannot fire once the old handler is back
        signal.alarm(0)
        # signal.signal() returns None when the previous handler was not
        # installed from Python; there is then nothing we can restore
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None
        # returning None lets any exception raised in the body propagate

    def raise_timeout_exception(self, *args):
        """SIGALRM handler: log the timeout and raise ``Timer.TimeoutException``."""
        WE.logging.error(u"TimeoutException raised")
        raise Timer.TimeoutException()

In [None]:
def _get_emitter(part):
    """Build the emitter that streams one compressed dump part to the workers.

    The returned ``_emitter(outqueue)`` reads ``path_template_input.format(part)``
    and puts lists of raw lines on ``outqueue``. A list is flushed only right
    before a ``"  <page>\\n"`` line, and only once the summed ``len`` of its
    lines reaches 256 * 1024. Any non-empty remainder is flushed at the end of
    the file. For part 1, every line up to and including
    ``"  </siteinfo>\\n"`` is skipped.
    """
    chunk_limit = 256 * 1024

    def _emitter(outqueue):
        with gzip.open(path_template_input.format(part), "r") as infile:
            if part == 1:
                # drop the siteinfo header, which only the first part contains
                for header_line in infile:
                    if header_line == "  </siteinfo>\n":
                        break

            pending, pending_size = [], 0
            for line in infile:
                # cut chunks only at page boundaries
                if line == "  <page>\n" and pending_size >= chunk_limit:
                    outqueue.put(pending)
                    pending, pending_size = [], 0
                pending.append(line)
                pending_size += len(line)

            if pending_size > 0:
                outqueue.put(pending)

    return _emitter

def _worker(worker_id, inqueue, outqueue):
    """Turn chunks of dump rows into TSV records for the collector.

    :param worker_id: identifier supplied by parallel_stream (unused here).
    :param inqueue: iterable of row lists, as produced by the emitter.
    :param outqueue: receives ``(op, utf8_line)`` tuples. ``op`` is 0 for an
        article line ``page_id<TAB>title<TAB>alias...`` and 1 for a redirect
        line ``page_id<TAB>title<TAB>redirect_target``.

    A page that raises (including an extractor timeout) is logged and skipped.
    """
    # one Timer, reused for every page, stops the Extractor when it gets stuck
    # in a self-loop (issue of the library)
    timer = Timer(seconds=60)
    # support set used to accumulate the aliases of the current page
    aliases_raw = set()

    for buffer_rows in inqueue:
        # pages_from goes over the portion of xml and yields page_data each time a page is recognized
        for page_data in pages_from(buffer_rows):
            (page_redirect_title, page_id, page_revid, page_title, page_rows) = page_data

            try:
                if page_redirect_title is None:  # article: collect its aliases
                    aliases_raw.clear()
                    with timer:
                        extractor = MyExtractor(page_id, page_revid, page_title, page_rows)
                        extractor.extract(aliases_raw)
                    del extractor  # free memory
                    aliases_raw.discard(page_title)

                    # titles and aliases are written raw: no normalization here
                    op = 0
                    line = u"{}\t{}\t{}\n".format(page_id, page_title, u"\t".join(aliases_raw))
                else:  # redirect
                    op = 1
                    line = u"{}\t{}\t{}\n".format(page_id, page_title, page_redirect_title)

                outqueue.put((op, line.encode('utf-8')))
            except Timer.TimeoutException:
                WE.logging.error(u"TimeoutException in the worker at page_id '{}' (page_title '{}')".format(page_id, page_title))
            except Exception as exc:
                # repr() is ASCII-safe; on Python 2, str() of an exception with
                # a non-ASCII message raises inside this handler and would
                # kill the worker instead of logging the failure
                WE.logging.error(u"Exception in the worker at page_id '{}' (page_title '{}'): '{}'".format(page_id, page_title, repr(exc)))

            # free memory before the next page is parsed
            del page_rows
            del page_data
        del buffer_rows

def _get_collector(part, pb, total_count):
    """Build the collector that writes the workers' records for one dump part.

    :param part: index of the dump part; selects the two output files.
    :param pb: progress bar, advanced once per received record.
    :param total_count: number of records processed in earlier parts; the
        running count continues from it.
    :return: ``_collector(inqueue)``. It writes each ``(op, line)`` record to
        the aliases file (``op == 0``) or to the redirects file (otherwise),
        and returns the updated running count.
    """
    def _collector(inqueue):
        # copy into a local: rebinding ``total_count`` itself inside this
        # closure would make it a local name. See:
        # https://stackoverflow.com/questions/7535857/why-doesnt-this-closure-modify-the-variable-in-the-enclosing-scope/7535919#7535919
        internal_count = total_count
        with gzip.open(path_template_aliases.format(part), "w") as aliases_file, \
                gzip.open(path_template_redirects.format(part), "w") as redirects_file:
            for op, line in inqueue:
                (aliases_file if op == 0 else redirects_file).write(line)
                pb.increase()
                internal_count += 1

                if internal_count % 50000 == 0:
                    # logging.warn is a deprecated alias of logging.warning
                    WE.logging.warning("Processed {} pages".format(internal_count))
        return internal_count
    return _collector

# start the parallel computation: one parallel_stream run per dump part
pb_files = pb.ProgressBar(size=cfg.wiki_preprocessing_split_into, labeling_fun={"prefix":"Files"})
pb_collector = pb.ProgressBar(every=500)
try:
    # there are about 17M pages in the dump (17.146.932 <page> 17.152.607 </page> 17.150.956 </text>)
    overall_count = 0
    for part in xrange(1, cfg.wiki_preprocessing_split_into + 1):
        WE.logging.warning("Start to process part {} of {}".format(part, cfg.wiki_preprocessing_split_into))

        # the collector is seeded with overall_count and returns the running
        # total (assumes parallel_stream returns the collector's result)
        iter_count = ps.parallel_stream(
            _get_emitter(part),
            _worker,
            _get_collector(part, pb_collector, overall_count),
            emitter_output_chunk_size=1,
            worker_output_chunk_size=100,
            emitter_queue_size=100,
            collector_queue_size=100,
            fork_collector=False,
            n_jobs=-1
        )
        WE.logging.warning("Processed {} pages in the current part. Total processed: {}".format(iter_count - overall_count, iter_count))
        overall_count = iter_count
        pb_files.increase()
    pb_files.stop(True)
    pb_collector.stop(True)
except BaseException:
    # explicit form of the former bare `except:`: mark both progress bars as
    # failed on any error, including KeyboardInterrupt, then propagate it
    pb_files.stop(False)
    pb_collector.stop(False)
    raise

# it lasts 1h 14min

In [None]:
# Drop the wikiextractor module reference; the Wikidata cells below do not use it.
del WE

# PROCESS WIKIDATA RAW FILE

In [None]:
# https://www.mediawiki.org/wiki/Wikibase/DataModel/JSON#Claims_and_Statements
# Fail fast if the Wikidata dump is missing. An explicit raise replaces
# `assert`, because asserts are stripped when Python runs with -O.
if not os.path.isfile(cfg.raw_dir + "wikidata-all.json.gz"):
    raise IOError("Wikidata dump not found: {}".format(cfg.raw_dir + "wikidata-all.json.gz"))

In [None]:
def _emitter(outqueue):
    """Stream the Wikidata dump to the workers, one raw line per item.

    The first line of the file is skipped; presumably it is the opening ``[``
    of the JSON array (the worker separately ignores the closing ``]``).
    """
    dump_path = cfg.raw_dir + "wikidata-all.json.gz"
    with gzip.open(dump_path, "r") as dump:
        dump.readline()  # skip the header line
        for raw_line in dump:
            outqueue.put(raw_line)

def _worker(worker_id, inqueue, outqueue):
    aliases_raw = set()
    for line in inqueue:
        line = line.rstrip(",\n")
        if line == "]":
            continue
        entity = json.loads(line)

        # get entity id
        entity_id = entity['id'].encode("ascii")

        # get entity label
        label = ""
        if "en" in entity['labels']:
            label = entity['labels']["en"]["value"].replace("\t", " ")
            # I put the label into the set to avoid aliases which are duplicates
            aliases_raw.add(label)

        # get aliases
        if "en" in entity["aliases"]:
            aliases_raw.update(alias_obj["value"].replace("\t", " ") for alias_obj in entity["aliases"]["en"])

        if len(aliases_raw) == 0:
            continue

        # discard the label from the aliases, because it is put a part
        aliases_raw.discard(label)

        line = u"{}\t{}\t{}\n".format(entity_id, label, u"\t".join(alias_raw for alias_raw in aliases_raw))

        # send this aliases to the collector
        outqueue.put(line.encode('utf-8'))

        # reset the support set
        aliases_raw.clear()

def _collector(inqueue):
    """Write every encoded line received from the workers to the Wikidata aliases file, reporting progress."""
    output_path = cfg.processed_dir + "wikidata_raw/aliases.tsv.gz"
    with gzip.open(output_path, "w") as aliases_file:
        write = aliases_file.write
        for encoded_line in pb.iter_progress(inqueue):
            write(encoded_line)

# Pipeline settings for the Wikidata pass: emitter -> workers -> in-process collector.
_wikidata_stream_options = {
    "emitter_output_chunk_size": 100,
    "worker_output_chunk_size": 100,
    "emitter_queue_size": 100,
    "collector_queue_size": 100,
    "fork_collector": False,
    "n_jobs": -1,
}
ps.parallel_stream(_emitter, _worker, _collector, **_wikidata_stream_options)
# it lasts 1hour 2min