In [None]:
# !pip install beautifulsoup4 py-wikimarkup==2.1.3 html5lib lxml==4.9.1

In [None]:
import bisect
import bz2
import io
import os
import pickle
import re
import time
import traceback

import pandas as pd
import pyarrow as pa
import pyspark.pandas as ps
from bs4 import BeautifulSoup
from wikimarkup.parser import Parser

# Location of the English Wikipedia multistream dump and its stream-offset index.
WIKI_RAW_DIR = "/dbfs/wiki/raw"
DUMP_PREFIX = "enwiki-latest-pages-articles-multistream"
indexpath = f"{WIKI_RAW_DIR}/{DUMP_PREFIX}-index.txt"
DUMP_FILE = f"{WIKI_RAW_DIR}/{DUMP_PREFIX}.xml.bz2"

def get_page_from_byte_offset(start, end):
    """Return the ``<page>`` tags from the dump stream between byte offsets start and end.

    The stream is decompressed by ``get_xml_pages`` and the resulting XML
    fragment is parsed with BeautifulSoup's ``"lxml"`` parser.
    """
    xml_fragment = get_xml_pages(start, end)
    return BeautifulSoup(xml_fragment, "lxml").find_all("page")

  
def get_xml_pages(start, end):
    """Decompress the single bz2 stream occupying bytes [start, end) of DUMP_FILE.

    Args:
        start: byte offset where the stream begins (taken from the multistream index).
        end: exclusive end offset, i.e. where the next stream begins. It may lie
            past the end of the file for the final stream, in which case the read
            simply stops at EOF.

    Returns:
        The decompressed XML fragment, decoded as UTF-8 text.
    """
    decomp = bz2.BZ2Decompressor()
    with open(DUMP_FILE, 'rb') as f:
        f.seek(start)
        # `end` is exclusive, so the stream is exactly `end - start` bytes long.
        compressed = f.read(end - start)
    # BZ2Decompressor stops at the end of the first stream; any bytes after it
    # are kept in `decomp.unused_data` instead of being decoded.
    return decomp.decompress(compressed).decode('utf-8')


def get_tables(soup_page):
    """Extract every table from one wiki ``<page>`` element as a DataFrame.

    The page text is rendered to HTML with ``wikimarkup``'s ``Parser``. The
    ``<table>`` elements are then located with BeautifulSoup and converted with
    ``pd.read_html``.

    Args:
        soup_page: a BeautifulSoup ``<page>`` tag, e.g. one element of
            ``get_page_from_byte_offset(...)``.

    Returns:
        A list of ``(title, table_index, headings, dataframe)`` tuples.
        ``headings`` holds the text of the nearest preceding ``<h1>`` .. ``<h6>``
        sibling of each table, one level per line, with '' for absent levels.

        Returns ``[]`` when the markup parser raises ``TypeError``, and also when
        BeautifulSoup and pandas disagree on the number of tables. Any other
        exception is printed, and the tuples collected so far are returned
        (best effort).
    """
    parser = Parser()
    output = []
    try:
        try:
            # NOTE(review): `.text` is the concatenated text of the whole <page>
            # element (title, ids, revision metadata and wikitext), not only
            # its <text> child — confirm this is intended.
            html = parser.parse(soup_page.text)
        except TypeError:
            return []
        soup = BeautifulSoup(html, 'html.parser')
        tables = soup.find_all('table')
        try:
            # Wrap in StringIO: passing literal HTML to read_html is deprecated
            # since pandas 2.1.
            df_tables = pd.read_html(io.StringIO(html))
        except ValueError:
            # read_html raises ValueError when the document has no tables.
            df_tables = []
        if len(tables) != len(df_tables):
            # Headings could not be paired with the right DataFrame.
            print("tables offset..")
            return []
        for i, (table, df) in enumerate(zip(tables, df_tables)):
            # Nearest preceding sibling heading for each HTML level h1..h6.
            headings = []
            for level in range(1, 7):
                h = table.find_previous_sibling(f'h{level}')
                headings.append(str(h.text) if h else '')
            output.append((soup_page.title.text, i, "\n".join(headings), df))
    except Exception:
        print(traceback.format_exc())
        print(f"Failed to parse... {soup_page.title.text}")
    return output


def get_pages(start_byte, end_byte):
    """Extract all tables from the pages stored in one bz2 stream of the dump.

    Returns a flat list of ``(title, table_index, headers, payload)`` tuples.
    ``payload`` is the table's DataFrame converted to a ``pyarrow.Table`` and then
    serialized with ``pickle``. Only unpickle it from storage you trust.
    """
    rows = []
    for page in get_page_from_byte_offset(start_byte, end_byte):
        rows.extend(
            (title, i, headers, pickle.dumps(pa.Table.from_pandas(df)))
            for title, i, headers, df in get_tables(page)
        )
    return rows

In [None]:
# Collect the distinct, sorted stream start offsets from the multistream index.
# Only the integer before the first ':' on each line is used (presumably
# "offset:page_id:title"). Many pages share one stream, hence the set.
# The index is streamed line by line instead of loaded whole with readlines(),
# and decoded explicitly as UTF-8 so that titles don't depend on the locale.
with open(indexpath, "r", encoding="utf-8") as f:
    start_bytes = sorted({int(line.split(":", 1)[0]) for line in f if line.strip()})
# Sentinel end offset for the final stream, one byte past EOF. A read that
# extends past EOF simply stops there.
start_bytes.append(os.path.getsize(DUMP_FILE) + 1)

In [None]:
# One row per bz2 stream: it starts at start_byte, and end_byte is the start of
# the next stream (or the sentinel past EOF for the last one).
stream_starts = start_bytes[:-1]
stream_ends = start_bytes[1:]
offsets = pd.DataFrame(dict(start_byte=stream_starts, end_byte=stream_ends))

In [None]:
import ray
import sys
 
# Workaround so that ray.init() below can run in this notebook environment.
# It presumably exists because something in Ray's startup calls
# sys.stdout.fileno(), which the notebook's stdout object does not support.
# NOTE(review): False == 0, so any caller that uses the returned value as a file
# descriptor gets fd 0 (stdin), not stdout (fd 1). Confirm this is intended.
sys.stdout.fileno = lambda: False

In [None]:
# Python packages Ray installs into the workers' runtime environment; keep these
# in sync with the pip line in the first cell.
RAY_WORKER_PIP = [
    "beautifulsoup4",
    "py-wikimarkup==2.1.3",
    "html5lib",
    "lxml==4.9.1",
]
ray.init(runtime_env={"pip": RAY_WORKER_PIP})

In [None]:
# Fan get_pages out over Ray, one bz2 stream per task, and write the extracted
# tables to parquet in parts of BATCH_SIZE streams. Each part is named after the
# number of streams submitted so far.
BATCH_SIZE = 1000
TABLE_COLUMNS = ["title", "i", "headers", "pa_table_pickle"]


def save_batch(futures, n_streams_done):
    """Wait for a batch of Ray futures and write all their rows to one parquet part.

    Args:
        futures: ObjectRefs returned by ``get_pages_remote.remote``. Each resolves
            to the list of row tuples produced by ``get_pages``.
        n_streams_done: number of streams submitted so far; used in the file name.
    """
    st = time.time()
    print("Gathering... ")
    results = ray.get(futures)
    path = f"/dbfs/wiki/tables/wiki_table_part_{n_streams_done:06d}.parquet"
    print(f"Saving! {path} (( gathering took: {time.time() - st} ))")
    rows = [row for page_rows in results for row in page_rows]
    pd.DataFrame(rows, columns=TABLE_COLUMNS).to_parquet(path)


# Wrap once, outside the loop, rather than building a fresh RemoteFunction for
# every stream.
get_pages_remote = ray.remote(get_pages)

batch = []
n_streams_done = 0
for n_streams_done, (start_byte, end_byte) in enumerate(
    zip(offsets["start_byte"], offsets["end_byte"]), start=1
):
    batch.append(get_pages_remote.remote(start_byte, end_byte))
    if len(batch) >= BATCH_SIZE:
        save_batch(batch, n_streams_done)
        batch = []

# Flush the remainder only if there is one. When the stream count is a multiple
# of BATCH_SIZE (or there are no streams), everything has already been written.
if batch:
    save_batch(batch, n_streams_done)