In [None]:
#%pip install wikitextparser

import json
import os
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
from heapq import heappush,heappop

import pandas as pd
import wikitextparser as wtp
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from tqdm import tqdm

In [2]:
# Parse the whole dump file into an in-memory element tree.
tree = ET.parse('ruwiki-20241201-pages-articles-multistream1.xml')
root = tree.getroot()
# Number of direct children of the root element
# (presumably the <page> elements plus one header element - TODO confirm).
len(root)

130478

In [3]:
# XML namespace in "{uri}" form; it is prepended to every tag name looked up in the dump.
prefix = '{http://www.mediawiki.org/xml/export-0.11/}'

In [13]:
# Output directory for the generated files.
output_dir = "docx2json_example"

# exist_ok=True keeps the cell idempotent and avoids the check-then-create race
# of a separate os.path.exists() test.
os.makedirs(output_dir, exist_ok=True)
	

In [None]:
# Preprocessing: collect the indexes of the pages whose wikitext contains
# table markup ("{|").

# Looked up here as well so this cell does not depend on a later cell
# having been executed first.
pages = root.findall(f"{prefix}page")

# `in` is used instead of str.find(): find() returns -1 (truthy) when the
# marker is absent and 0 (falsy) when the text starts with it, so its result
# cannot be used as a boolean. findtext() yields None/"" for pages without
# text, which the `or ""` turns into "no match".
indexies = [
	i
	for i, page in enumerate(pages)
	if "{|" in (page.findtext(f"{prefix}revision/{prefix}text") or "")
]


'\nfor i in range(len(pages)):\n\ttext = pages[i].find(f"{prefix}revision").find(f"{prefix}text").text\n\tif(text and text.find("{|")):\n\t\tindexies.append(i)\n'

In [None]:
def parse_to_docx(parsed : wtp.WikiText, title : str, filename : str) -> None:
	"""Write the section headings, tables and lists of a parsed page to a .docx file.

	The elements are emitted in the order of their start offsets in the
	wikitext. The document is saved as ``filename`` inside ``output_dir``.

	Args:
		parsed: Parsed wikitext of the page.
		title: Page title; it is placed in the level-1 heading of the document.
		filename: Name of the output file inside ``output_dir``.
	"""
	sections,tables,lists = parsed.get_sections(),parsed.get_tables(),parsed.get_lists()

	# (start offset, kind, index) triples; sorting them restores document order.
	elements = sorted(
		[(sec.span[0], "section", i) for i, sec in enumerate(sections)]
		+ [(tbl.span[0], "table", i) for i, tbl in enumerate(tables)]
		+ [(lst.span[0], "list", i) for i, lst in enumerate(lists)]
	)

	doc = Document()
	doc.add_heading(f"Содержимое страницы: {title}", level = 1)

	for _, kind, index in elements:
		if kind == "section":
			sec = sections[index]

			# Sections without a title are skipped.
			if sec.title:
				secText = wtp.remove_markup(sec.title).strip()
				if secText:
					doc.add_heading(secText, level=sec.level)
		elif kind == "table":
			tbl = tables[index]

			data = tbl.data()

			# Skip empty tables and tables whose first row has at most one cell.
			if len(data) <= 0 or len(data[0]) <= 1:
				continue

			# Rows may differ in length; size the grid by the widest row so
			# table.cell(i, j) never indexes past the last column.
			width = max(len(row) for row in data)
			table = doc.add_table(rows=len(data), cols=width)

			# The caption may be missing; do not print the literal "None".
			caption = wtp.remove_markup(tbl.caption).strip() if tbl.caption else ""
			captionText = f"Таблица. {caption}" if caption else "Таблица."
			doc.add_paragraph(captionText).alignment = WD_ALIGN_PARAGRAPH.CENTER

			for i, row in enumerate(data):
				for j, cell in enumerate(row):
					if not cell:
						continue

					cellText = wtp.remove_markup(cell).strip()

					if cellText:
						table.cell(i, j).text = cellText
		elif kind == "list":
			lst = lists[index]

			for item in lst.items:
				itemText = wtp.remove_markup(item).strip()

				if itemText:
					doc.add_paragraph(itemText, style="List Bullet")

	doc.save(os.path.join(output_dir, filename))


In [None]:
def parse_to_json(parsed : wtp.WikiText, title : str, filename : str) -> None:
	"""Write the section headings, tables and lists of a parsed page to a JSON file.

	The file is saved as ``filename`` inside ``output_dir`` and contains one
	object with the keys:

	* ``"title"``    - the page title;
	* ``"sections"`` - ``{"title", "level"}`` objects for every titled section;
	* ``"tables"``   - ``{"caption", "rows"}`` objects, ``rows`` being a list
	  of lists of cell texts;
	* ``"lists"``    - one list of item texts per wiki list.

	Wiki markup is stripped from every text value.

	Args:
		parsed: Parsed wikitext of the page.
		title: Page title.
		filename: Name of the output file inside ``output_dir``.
	"""
	section_records = []
	for sec in parsed.get_sections():
		# Sections without a title are skipped.
		if not sec.title:
			continue

		secText = wtp.remove_markup(sec.title).strip()
		if secText:
			section_records.append({"title": secText, "level": sec.level})

	table_records = []
	for tbl in parsed.get_tables():
		rows = [
			[wtp.remove_markup(cell).strip() if cell else "" for cell in row]
			for row in tbl.data()
		]
		if not rows:
			continue

		caption = wtp.remove_markup(tbl.caption).strip() if tbl.caption else ""
		table_records.append({"caption": caption, "rows": rows})

	list_records = []
	for lst in parsed.get_lists():
		items = [wtp.remove_markup(item).strip() for item in lst.items]
		items = [itemText for itemText in items if itemText]
		if items:
			list_records.append(items)

	data = {
		"title": title,
		"sections": section_records,
		"tables": table_records,
		"lists": list_records
	}

	# The file is opened only after the data is complete, so a parsing error
	# does not leave an empty file behind. ensure_ascii=False keeps non-ASCII
	# text readable instead of \uXXXX escapes.
	with open(os.path.join(output_dir, filename),"w", encoding="utf-8") as jf:
		json.dump(data, jf, ensure_ascii=False, indent=2)


In [None]:
def _markdown_cell(cell) -> str:
	"""Return the plain text of a table cell in a form that fits into one Markdown table row."""
	if not cell:
		return ""

	# Collapse line breaks and whitespace runs: a table row must stay on one line.
	cellText = " ".join(wtp.remove_markup(cell).split())

	# A bare "|" inside a cell would be read as a column separator.
	return cellText.replace("|", "\\|")


def parse_to_markdown(parsed : wtp.WikiText, title : str, filename : str) -> None:
	"""Write the section headings, tables and lists of a parsed page to a Markdown file.

	The elements are emitted in the order of their start offsets in the
	wikitext. The file is saved as ``filename`` inside ``output_dir``.

	Args:
		parsed: Parsed wikitext of the page.
		title: Page title; it is placed in the top-level heading.
		filename: Name of the output file inside ``output_dir``.
	"""
	sections,tables,lists = parsed.get_sections(),parsed.get_tables(),parsed.get_lists()

	# (start offset, kind, index) triples; sorting them restores document order.
	elements = sorted(
		[(sec.span[0], "section", i) for i, sec in enumerate(sections)]
		+ [(tbl.span[0], "table", i) for i, tbl in enumerate(tables)]
		+ [(lst.span[0], "list", i) for i, lst in enumerate(lists)]
	)

	with open(os.path.join(output_dir, filename),"w", encoding="utf-8") as mdf:
		# The trailing blank line keeps the next element out of the heading line.
		mdf.write(f"# Содержимое страницы: {title}\n\n")

		for _, kind, index in elements:
			if kind == "section":
				sec = sections[index]

				# Sections without a title are skipped.
				if sec.title:
					secText = wtp.remove_markup(sec.title).strip()
					if secText:
						mdf.write(f"{'#' * sec.level} {secText}\n")
			elif kind == "table":
				data = tables[index].data()

				if not data:
					continue

				width = max(len(row) for row in data)
				if width == 0:
					continue

				# Clean the cells before building the frame and pad short rows,
				# so that every row has exactly `width` columns.
				rows = [
					[_markdown_cell(cell) for cell in row] + [""] * (width - len(row))
					for row in data
				]

				# The first row is used as the table header.
				df = pd.DataFrame(rows[1:], columns=rows[0])

				# Blank lines around the table separate it from neighbouring blocks.
				mdf.write('\n')
				mdf.write(df.to_markdown(index=False))
				mdf.write('\n\n')
			elif kind == "list":
				lst = lists[index]

				for item in lst.items:
					itemText = wtp.remove_markup(item).strip()

					if itemText:
						mdf.write(f"- {itemText}\n")

		mdf.write("\n---\n")

In [22]:
def parse_page(page : ET.Element) -> None:
	"""Convert one <page> element of the dump into a .docx file.

	Pages without revision text are skipped. The output file name is built
	from the alphanumeric characters of the page title.

	Args:
		page: A <page> element of the dump.
	"""
	# findtext() returns None when an element on the path is missing instead of
	# raising AttributeError, so one malformed page cannot abort a batch run.
	title = page.findtext(f"{prefix}title") or ""
	text = page.findtext(f"{prefix}revision/{prefix}text")

	if not text:
		return

	stem = "".join(x for x in title if x.isalnum())

	if not stem:
		# A title without alphanumeric characters would give the bare name
		# ".docx"; fall back to the page id (assumes the page has an <id>
		# child - TODO confirm against the dump schema).
		page_id = page.findtext(f"{prefix}id")
		stem = f"page_{page_id}" if page_id else "untitled"

	filename = stem + ".docx"

	parsed = wtp.parse(text)

	parse_to_docx(parsed,title,filename)

In [23]:
# Every <page> element directly under the root of the dump.
pages = root.findall(prefix + 'page')

# Smoke test on two pages before the full run.
for sample_index in (1, 2):
	parse_page(pages[sample_index])

In [None]:
# Convert every page using a pool of worker threads, with a progress bar.
with ThreadPoolExecutor(max_workers=10) as executor:
	conversions = executor.map(parse_page, pages)
	progress = tqdm(conversions, desc='Создание страниц', total=len(pages))
	# Exhaust the iterator so that all results are retrieved and the bar advances.
	list(progress)

Создание страниц:: 100%|██████████| 130477/130477 [1:22:33<00:00, 26.34it/s]  


In [16]:
# Experiment: rewrite every wiki list of a page as a one-column wikitable and
# print the first table of the result. Only the first page that has text, is
# not a redirect and contains a list is processed.
for page in root.findall(f'{prefix}page'):
	title = page.find(f"{prefix}title").text

	text = page.find(f"{prefix}revision").find(f"{prefix}text").text

	if not text:
		continue

	# NOTE(review): this matches only the upper-case English keyword, anywhere
	# in the text; other redirect spellings are not skipped - confirm whether
	# that matters for this dump.
	if text.find("#REDIRECT") >= 0:
		continue

	parsed = wtp.parse(text)

	# Sorted by start offset so the text can be rebuilt left to right
	# (the library may already return them in this order - not relied upon).
	lists = sorted(parsed.get_lists(), key=lambda lst: lst.span[0])

	if not lists:
		continue

	start = 0

	chunks = []

	for lst in lists:
		# Skip a list that starts inside an already replaced span.
		if lst.span[0] < start:
			continue

		# Text between the previous list and this one is copied unchanged.
		chunks.append(text[start:lst.span[0]])

		rows = "".join(f'|{item}\n|-\n' for item in lst.items)

		chunks.append('{|class="wikitable"\n|+Список\n' + rows + "|}")

		start = lst.span[1]

	# The text after the last list belongs to the rebuilt page as well; it was
	# previously appended to an unused variable and therefore lost.
	chunks.append(text[start:])

	new_text = "".join(chunks)

	new_parsed = wtp.parse(new_text)

	tbl = new_parsed.get_tables()[0]

	for row in tbl.data():
		print(row)

	display(tbl.caption)

	break


['[[Алитусский уезд]]']
['[[Вильнюсский уезд]]']
['[[Каунасский уезд]]']
['[[Клайпедский уезд]]']
['[[Мариямпольский уезд]]']
['[[Паневежский уезд]]']
['[[Таурагский уезд]]']
['[[Тельшяйский уезд]]']
['[[Утенский уезд]]']
['[[Шяуляйский уезд]]']


'Список'