In [None]:
from timing import setup

# Run the setup hook of the local `timing` module. Presumably this registers the
# `%%record-time` cell magic used further down — TODO confirm against timing.py.
setup()

In [None]:
import json
import subprocess

from pathlib import Path
from typing import Iterable, Optional, Union

from cattrs.preconf.json import make_converter
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer, LAParams

from dataclasses import dataclass
from decimal import Decimal
from typing import Iterable, List, Tuple


# Shared cattrs JSON converter; pdf_extract uses it to decode one Page per line
# of the external extractor's output.
converter = make_converter()


@dataclass
class Box:
    """Axis-aligned rectangle described by two corner coordinates.

    The lower bounds (``x0``, ``y0``) are inclusive and the upper bounds
    (``x1``, ``y1``) are exclusive when testing for containment.
    """

    x0: float
    y0: float
    x1: float
    y1: float

    def contains(self, xy: Tuple[float, float]) -> bool:
        """Return whether the point ``xy`` (x first, y second) lies in the box."""
        # Reject on the horizontal axis first; the vertical axis is only
        # inspected for points that pass it.
        if not (self.x0 <= xy[0] < self.x1):
            return False
        return self.y0 <= xy[1] < self.y1


@dataclass
class Page:
    """One page of a PDF as produced by ``pdf_extract``."""

    # Page number; the pdfminer path of pdf_extract numbers pages starting at 1.
    number: int
    # Words found on the page (forward reference: Word is declared below).
    words: "List[Word]"


@dataclass(order=True)
class Word:
    """A piece of text together with its position on a page.

    ``order=True`` makes instances comparable field by field in declaration
    order (``x``, then ``y``, then ``text``); ``pdf_extract`` relies on this
    when it sorts the words of a page.
    """

    # Horizontal position; the pdfminer path of pdf_extract uses element.bbox[0].
    x: float
    # Vertical position; the pdfminer path of pdf_extract uses element.bbox[1].
    y: float
    # Text content; the pdfminer path strips trailing whitespace.
    text: str


def pdf_extract(
    path: Union[Path, str], *, pdf_extract_bin: Optional[Union[Path, str]] = None
) -> Iterable["Page"]:
    """Lazily yield one ``Page`` per page of the PDF at ``path``.

    Args:
        path: Location of the PDF file.
        pdf_extract_bin: Optional path to an external extractor executable.
            When given, it is started with ``path`` as its only argument and
            every line it writes to stdout is decoded into a ``Page`` with the
            module-level ``converter``. When ``None``, pdfminer is used.

    Yields:
        ``Page`` objects in the order the backend produces them.

    Raises:
        subprocess.CalledProcessError: If the external extractor exits with a
            non-zero status. Pages decoded before the failure have already
            been yielded at that point.
    """
    if pdf_extract_bin is not None:
        with subprocess.Popen(
            [str(pdf_extract_bin), str(path)],
            stdout=subprocess.PIPE,
            encoding="utf8",
        ) as proc:
            for line in proc.stdout:
                yield converter.loads(line, Page)

        # Leaving the ``with`` block waits for the process, so the exit status
        # is available here. Without this check a crashing extractor would be
        # indistinguishable from a PDF with fewer (or no) pages.
        if proc.returncode != 0:
            raise subprocess.CalledProcessError(proc.returncode, proc.args)

    else:
        laparams = LAParams(
            line_margin=0,
            char_margin=0.1,
            word_margin=0.1,
        )
        for page_idx, page_layout in enumerate(extract_pages(path, laparams=laparams)):
            yield Page(
                # enumerate() is 0-based, page numbers are 1-based.
                number=page_idx + 1,
                # Sorted with Word's generated ordering (x, then y, then text).
                words=sorted(
                    Word(
                        x=element.bbox[0],
                        y=element.bbox[1],
                        text=element.get_text().rstrip(),
                    )
                    for element in page_layout
                    if isinstance(element, LTTextContainer)
                ),
            )

In [None]:
# Load the PDF paths to benchmark (indexed and iterated as a list further down).
# The encoding is given explicitly: JSON is UTF-8, while open() otherwise falls
# back to the locale's preferred encoding, which is not UTF-8 on every platform.
with open("data/pdf_files.json", "rt", encoding="utf-8") as fobj:
    pdf_files = json.load(fobj)

In [None]:
# Build the `pdf-parser` package in release mode; check=True raises
# CalledProcessError if cargo exits with a non-zero status.
subprocess.run(["cargo", "build", "-p", "pdf-parser", "--release"], check=True)

In [None]:
%%record-time parse-pdf-python
# Extraction only, via the pdfminer code path (no pdf_extract_bin), first listed PDF.
pages = list(pdf_extract(pdf_files[0]))

In [None]:
%%record-time parse-pdf-rust
# Same PDF, extracted through the external binary built by the cargo cell above.
# NOTE(review): the ".exe" suffix makes this path Windows-specific — confirm the
# notebook is only meant to run there.
pages = list(pdf_extract(pdf_files[0], pdf_extract_bin="./target/release/pdf-parser.exe"))

In [None]:
import datetime
import re

# A list of (page index, word) pairs, as built in parse_bank_statement.
Structure = List[Tuple[int, Word]]

# Texts of four consecutive structure words that find_account_start looks for;
# everything after them is kept.
account_block_start = ["Girokonto", "Buchungstag", "Valuta", "Alter"]
# Two-word sequence that find_item_starts skips instead of recording an item.
account_page_start = ["Buchungstag", "Valuta"]
# Word at which find_item_starts stops scanning.
account_block_end = ["Neuer"]

# Only words whose (x, y) lies inside this box count as structure elements
# (see filter_structure_elements). Arguments are x0, y0, x1, y1.
structure_box = Box(55, 55, 65, 720)

# Optional leading "A" followed by a DD.MM.YYYY date. parse_line applies it with
# .match(), so it is anchored at the start of the text only.
date_re = re.compile(r"(?P<ahead>A?)(?P<day>\d{2})\.(?P<month>\d{2})\.(?P<year>\d{4})")


def parse_bank_statement(
    path, pdf_extract_bin=None
) -> List[Tuple[datetime.date, Decimal]]:
    """Extract ``(date, amount)`` pairs from the statement PDF at ``path``.

    Args:
        path: PDF to read; handed to ``pdf_extract`` unchanged.
        pdf_extract_bin: Optional external extractor, forwarded to
            ``pdf_extract``.

    Returns:
        One ``parse_line`` result per item start that ``find_item_starts``
        marks as positive, in the order they were found.
    """
    pages = list(pdf_extract(path, pdf_extract_bin=pdf_extract_bin))

    # Collect the structure words of all pages, remembering the page of each.
    markers = []
    for page_idx, page in enumerate(pages):
        for word in filter_structure_elements(page.words):
            markers.append((page_idx, word))

    item_starts = find_item_starts(find_account_start(markers))

    entries = []
    for page_idx, height, is_pos in item_starts:
        if is_pos:
            # Gather every word on the item's line and read it left to right.
            line_words = list(filter_words_at_height(pages[page_idx].words, height))
            line_words.sort(key=lambda word: word.x)
            entries.append(parse_line(line_words))

    return entries


def find_account_start(structure: Structure) -> Structure:
    """Return the part of ``structure`` that follows the account block header.

    The header is the first run of four consecutive entries whose texts equal
    ``account_block_start``.

    Raises:
        ValueError: If no such run exists.
    """
    for start in range(len(structure)):
        stop = start + 4
        if get_text(structure[start:stop]) == account_block_start:
            return structure[stop:]

    raise ValueError("Could not find account start")


def find_item_starts(structure: Structure) -> List[Tuple[int, float, bool]]:
    """Walk ``structure`` two entries at a time and record where items begin.

    Returns:
        ``(page index, y position, flag)`` triples. The flag is ``True`` for
        every recorded entry except the one whose text equals
        ``account_block_end``; that entry is recorded with ``False`` and ends
        the scan. Pairs whose texts equal ``account_page_start`` are skipped.
    """
    starts = []
    idx = 0

    while idx < len(structure):
        page_idx, word = structure[idx]

        if get_text(structure[idx : idx + 1]) == account_block_end:
            starts.append((page_idx, word.y, False))
            break

        if get_text(structure[idx : idx + 2]) != account_page_start:
            starts.append((page_idx, word.y, True))

        idx += 2

    return starts


def parse_line(items_in_line) -> Tuple[datetime.date, Decimal]:
    """Turn the words of one line into a ``(date, amount)`` pair.

    The date is read from the first word's text via ``date_re`` and the
    amount from the last word's text via ``parse_number``. Asserts that the
    first word starts with a date and that the date has no leading "A".
    """
    match = date_re.match(items_in_line[0].text)
    assert match is not None
    assert match.group("ahead") == ""

    year, month, day = (int(match.group(part)) for part in ("year", "month", "day"))
    booked_on = datetime.date(year, month, day)

    return booked_on, parse_number(items_in_line[-1].text)


def get_text(structure: Structure) -> List[str]:
    """Return the ``text`` of every word in ``structure``, in order."""
    texts = []
    for _page_idx, word in structure:
        texts.append(word.text)
    return texts


def parse_number(val: str) -> Decimal:
    """Parse a number written with "." as grouping and "," as decimal mark.

    Every "." is removed and every "," becomes ".", then the result is handed
    to ``Decimal``.
    """
    # Single pass over the string: drop ".", map "," to ".".
    normalized = val.translate({ord("."): None, ord(","): "."})
    return Decimal(normalized)


def filter_structure_elements(elems: Iterable[Word]) -> List[Word]:
    """Select the structure words among ``elems``.

    Keeps words whose text is not blank and whose position lies inside
    ``structure_box``, ordered by descending ``y``.
    """
    non_blank = (word for word in elems if word.text.strip())
    in_box = filter_words_in_box(non_blank, structure_box)
    return sorted(in_box, key=lambda word: word.y, reverse=True)


def filter_words_in_box(words: Iterable[Word], box: Box) -> Iterable[Word]:
    """Lazily keep the words whose ``(x, y)`` position ``box`` contains."""

    def inside(word):
        return box.contains((word.x, word.y))

    return filter(inside, words)


def filter_words_at_height(
    words: Iterable[Word], y: float, delta: float = 5.0
) -> Iterable[Word]:
    """Lazily keep the words whose ``y`` is strictly less than ``delta`` away from ``y``."""

    def near(word):
        return abs(word.y - y) < delta

    return filter(near, words)

In [None]:
# Exploratory scan: for each page of the most recently extracted PDF, look for
# the account block header among the page's structure words.
# NOTE(review): account_start is reset for every page and never read afterwards,
# so this cell has no observable result; parse_bank_statement does the real
# search via find_account_start.
for page in pages:
    structure_elements = filter_structure_elements(page.words)
    structure_text = [word.text for word in structure_elements]

    account_start = None

    for i in range(len(structure_elements)):
        # Compare against the shared constant rather than a second copy of the
        # literal, so the two cannot drift apart.
        if structure_text[i : i + len(account_block_start)] == account_block_start:
            account_start = i

In [None]:
%%record-time convert-pdf-end-to-end-python

# Full pipeline (extraction plus statement parsing) over every listed PDF, using
# the pdfminer code path. The parsed results are discarded.
for path in pdf_files:
    parse_bank_statement(path)

In [None]:
%%record-time convert-pdf-end-to-end-rust

# Full pipeline over every listed PDF, with extraction delegated to the external
# binary. The parsed results are discarded.
# NOTE(review): the ".exe" suffix makes this path Windows-specific — confirm.
for path in pdf_files:
    parse_bank_statement(path, pdf_extract_bin="./target/release/pdf-parser.exe")