## Process dump

In [1]:
%pip install pyarrow pandas tqdm pyre2

Note: you may need to restart the kernel to use updated packages.


In [7]:
from os.path import join
from sys import stdout
from subprocess import check_output
from tqdm import tqdm
import xml.etree.ElementTree as ET
from os.path import join
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

try:
    import re2 as re
except ImportError:
    import re
    print('[WARN] Not using re2')



def count_pages(filename: str) -> int:
    """Count the lines of a decompressed dump that contain a ``<page>`` tag.

    The count is delegated to ``grep -wc '<page>'`` run on
    ``./multistream/decompressed/<filename>``.

    Args:
        filename: Name of the file inside ``./multistream/decompressed/``.

    Returns:
        The number of matching lines; ``0`` when there is none.

    Raises:
        subprocess.CalledProcessError: If grep fails for any reason other
            than "no line matched" (for example, the file does not exist).
    """
    # Local import: only `check_output` is imported at file level.
    from subprocess import CalledProcessError

    print(f'[INFO] Counting how many "<page>" in \'{filename}\'')
    command = ['grep', '-wc', '<page>', join('./multistream/decompressed/', filename)]

    try:
        output = check_output(command)
    except CalledProcessError as err:
        # grep exits with status 1 when no line matched, yet it still prints
        # the count ("0"); any other non-zero status is a genuine failure.
        if err.returncode != 1:
            raise
        output = err.output

    # `stdout.encoding` may be None (e.g. a replaced stream); the output is
    # plain ASCII digits, so UTF-8 is a safe fallback.
    return int(output.decode(stdout.encoding or 'utf-8').strip())


def revision(text: str):
    if text:
        # findall(r'\[\[(.*?)\]\]')                            > find all [[tag]]
        # sub(r'<ref>\[\[')                                    > remove <ref>([[tag]])
        # sub(r'<quot>\[\[')                                   > remove <quot>([[tag]])
        # sub(r'\[\[([^\[\]\n]*?\[\[.*?\]\][^\[\[\n]*?)+\]\]') > remove [[tag [[tag]]]]    
        # sub(r'\(.*?\)')                                      > remove ([[tag]])
        
        while True:
            i = start = text.find('[[')
            while i != -1:
                if text[i + 2:].find('[[') < text[i + 2:].find(']]'):
                    i = text[i + 2:].find(']]') + i + 2
                else:
                    break

            if start == i:
                break
            else:
                text = text[:start] + text[text[i + 2:].find(']]') + i + 4:]

        return re.findall(r'(?<!>)\[\[(.*?)\]\]', \
                          re.sub(r'{{.*?}}', '', \
                                 re.sub(r'\(.*?\)', '', text)))
    else:
        return []


def index_pages(filename: str, wikinamedate: str) -> None:
    total_pages = 2_632_633 #count_pages(filename)
    context = iter(ET.iterparse(join('./multistream/decompressed/', filename), events=('end',)))
    # Initialize variables
    chunk_size = 100_000  # Define the size of each chunk
    rows = []
    pqwriter = None

    # Create dir recursively
    # https://docs.python.org/3/library/os.html#os.makedirs
    os.makedirs(join('./output/', wikinamedate.replace('/', '-')), exist_ok=True)
    
    with tqdm(total=total_pages, unit=' pages', unit_scale=True, desc='[INFO] Processing pages', initial=0, file=stdout) as pbar:
        title, id, namespace = [None] * 3
        
        for event, elem in context:
            match elem.tag:
                case '{http://www.mediawiki.org/xml/export-0.11/}title':
                    title = elem.text
                case '{http://www.mediawiki.org/xml/export-0.11/}ns':
                    namespace = elem.text
                case '{http://www.mediawiki.org/xml/export-0.11/}id':
                    if id == None:
                        id = elem.text
                case '{http://www.mediawiki.org/xml/export-0.11/}text':
                    rows.append([title, id, namespace, revision(elem.text)])
                    id = None
                    
                    pbar.update()

            elem.clear()

            if len(rows) >= chunk_size:
                df_chunk = pd.DataFrame(rows, columns=['Page Title', 'Page ID', 'Page Namespace', 'Page References'])
                table = pa.Table.from_pandas(df_chunk)
                
                # Append to Parquet file
                if pqwriter == None:
                    pqwriter = pq.ParquetWriter(join('./output/', wikinamedate.replace('/', '-'), 'raw.parquet'), table.schema) 
                    
                pqwriter.write_table(table)
                
                rows = []  # Clear rows to free memory


    # Save remaining rows if any
    if rows:
        df_chunk = pd.DataFrame(rows, columns=['Page Title', 'Page ID', 'Page Namespace', 'Page References'])
        table = pa.Table.from_pandas(df_chunk)
    
        if pqwriter == None:
            pqwriter = pq.ParquetWriter(join('./output/', wikinamedate.replace('/', '-'), 'raw.parquet'), table.schema) 

        pqwriter.write_table(table)

    if pqwriter:
        pqwriter.close()

    del title, id, namespace, event, elem, context, rows, pqwriter, chunk_size, df_chunk, table

In [4]:
%%time
# Restore the `filename` and `wikinamedate` variables stored from a different Jupyter notebook
%store -r filename wikinamedate

index_pages(filename, wikinamedate)

[INFO] Processing pages: 100%|██████████| 2.63M/2.63M [19:04<00:00, 2.30k pages/s]
CPU times: user 18min 48s, sys: 23.5 s, total: 19min 12s
Wall time: 19min 5s
