In [1]:
import pdfplumber
import re

import math
import csv

In [2]:
# Open the source calendar PDF once; the cells below read the header
# positions and the page text through this handle.
# NOTE(review): no matching pdf.close() is visible in this notebook - confirm
# whether the handle should be closed once extraction is done.
pdf = pdfplumber.open("../data/calendario.pdf")

In [3]:
# headers = ["Gara N", "Squadra A", "Squadra B", "Giorno", "Data", "Ora"]
# coordinates = ["x0", "top", "x1", "bottom"]
# first_page = pdf.pages[0]

# for header in headers:
#     print(f"=== {header} ===")
#     search = first_page.search(header, case=False)[0]
#     for coordinate in coordinates:
#         print(f"{coordinate}: {search[coordinate]}")
#     print("\n")

In [4]:
first_page = pdf.pages[0]

# First match of every column header used as a layout landmark, looked up
# without case sensitivity. Each entry is the match dict returned by
# ``search`` (it carries the "x0", "x1" and "bottom" coordinates read below).
_header_boxes = {
    label: first_page.search(label, case=False)[0]
    for label in ("Gara N", "Squadra A", "Squadra B", "Giorno")
}

# Boundaries are rounded outwards: ceil for bottom/right edges, floor for left edges.
HEADER_BOTTOM = math.ceil(_header_boxes["Gara N"]["bottom"])
SQUADRA_A_LEFT = math.floor(_header_boxes["Squadra A"]["x0"])
SQUADRA_B_LEFT = math.floor(_header_boxes["Squadra B"]["x0"])
GIORNO_LEFT = math.floor(_header_boxes["Giorno"]["x0"])
GIORNO_RIGHT = math.ceil(_header_boxes["Giorno"]["x1"])

In [5]:
def filter_exclude_header(obj):
    """Tell whether ``obj`` sits past the table header row.

    ``obj`` is a mapping exposing a ``"bottom"`` coordinate. The object is
    kept only when that coordinate is strictly greater than the module-level
    ``HEADER_BOTTOM`` boundary.
    """
    past_header = HEADER_BOTTOM < obj["bottom"]
    return past_header


def filter_bold_chars(obj):
    """Tell whether ``obj`` is drawn with a bold font.

    The check is a case-insensitive search for ``"bold"`` in the object's
    ``"fontname"`` value.

    Objects without a usable ``"fontname"`` (the key is absent or its value is
    empty/None) are reported as not bold instead of raising ``KeyError``, so
    the predicate can be applied to any object a page filter hands over, not
    only to characters.
    """
    fontname = obj.get("fontname") or ""
    return "bold" in fontname.lower()


# def num_gara(obj):
#     return obj["x0"] < 88


def filter_squadra_a(obj):
    """Select the objects that make up the "Squadra A" column.

    An object qualifies when it passes ``filter_exclude_header`` and
    ``filter_bold_chars`` and lies strictly between the two column
    boundaries: ``x0`` greater than ``SQUADRA_A_LEFT`` and ``x1`` smaller than
    ``SQUADRA_B_LEFT``. The checks run in that order and stop at the first
    failure.
    """
    if not filter_exclude_header(obj):
        return False
    if not filter_bold_chars(obj):
        return False
    return SQUADRA_A_LEFT < obj["x0"] and obj["x1"] < SQUADRA_B_LEFT


def filter_squadra_b(obj):
    """Select the objects that make up the "Squadra B" column.

    An object qualifies when it passes ``filter_exclude_header`` and
    ``filter_bold_chars`` and lies strictly between the two column
    boundaries: ``x0`` greater than ``SQUADRA_B_LEFT`` and ``x1`` smaller than
    ``GIORNO_LEFT``. The checks run in that order and stop at the first
    failure.
    """
    if not filter_exclude_header(obj):
        return False
    if not filter_bold_chars(obj):
        return False
    starts_after_left_edge = obj["x0"] > SQUADRA_B_LEFT
    return starts_after_left_edge and obj["x1"] < GIORNO_LEFT


def filter_date_time(obj):
    """Select the objects that make up the date and time columns.

    An object qualifies when it passes ``filter_exclude_header`` and
    ``filter_bold_chars`` and its ``x0`` is strictly greater than
    ``GIORNO_RIGHT``, i.e. it starts after the right edge of the "Giorno"
    header.
    """
    shared_checks = (filter_exclude_header, filter_bold_chars)
    # the generator keeps the evaluation lazy: the bold check is skipped
    # as soon as the header check fails
    if not all(check(obj) for check in shared_checks):
        return False
    return obj["x0"] > GIORNO_RIGHT


def filter_address(obj):
    """Select the objects that belong to the court/address lines.

    An object qualifies when it passes ``filter_exclude_header`` and is *not*
    accepted by ``filter_bold_chars``; no horizontal bound is applied.
    """
    if filter_exclude_header(obj):
        return not filter_bold_chars(obj)
    return False

In [6]:
def _split_and_remove_spaces(text: str) -> list[str]:
    """Split extracted column text into entries with normalised whitespace.

    Args:
        text: raw text of one column, with entries separated by blank lines.

    Returns:
        One string per non-empty entry, in order of appearance. Leading and
        trailing whitespace is removed and every internal run of whitespace
        is collapsed to a single space. This includes the line break of an
        entry that wraps over two consecutive lines.
    """
    # there is always a blank line (empty or made of spaces only) between
    # consecutive entries, while a single entry can be on two consecutive lines
    chunks = re.split(r"^ *\n", text, flags=re.MULTILINE)
    # str.split() with no argument splits on any whitespace run and ignores
    # leading/trailing whitespace, so re-joining yields single-spaced text
    return [" ".join(chunk.split()) for chunk in chunks if chunk.strip()]


def postprocess_teams(text: str) -> list[str]:
    """Turn the raw text of a team column into the list of team names.

    Args:
        text: raw text extracted for one of the two team columns.

    Returns:
        The entries produced by ``_split_and_remove_spaces`` once the
        sub-header lines are blanked out, without the entries that consist of
        digits only.
    """
    # blank out sub-header lines such as "1 Giornata di Andata"
    subheader = re.compile(r"^.*Giornata di.*$", flags=re.MULTILINE | re.IGNORECASE)
    without_subheaders = subheader.sub("", text)

    kept = []
    for entry in _split_and_remove_spaces(without_subheaders):
        # with a two-digit sub-header such as "10 Giornata di ..." one of the
        # digits may be caught inside the team column: an entry that is empty
        # once its digits are removed is such a leftover and is dropped
        if re.sub(r"\d+", "", entry):
            kept.append(entry)
    return kept


def postprocess_date_time(text: str) -> list[tuple[str, str]]:
    """Turn the raw text of the date/time columns into ``(date, time)`` pairs.

    Args:
        text: raw text extracted for the date and time columns, one entry
            per match, entries separated by blank lines.

    Returns:
        One ``(date, time)`` tuple per entry, as the annotation promises. The
        date is the first space-separated field and the time the second one.
        When an entry has no second field the time is the empty string, so
        consumers can always index positions 0 and 1.
    """
    pairs = []
    # there is always a blank line between consecutive entries
    for entry in _split_and_remove_spaces(text):
        entry = entry.strip()
        if not entry:
            continue
        # split date (DD/MM/YYYY) and time (HH:MM) using spaces between them
        fields = re.split(r" +", entry)
        time = fields[1] if len(fields) > 1 else ""
        pairs.append((fields[0], time))
    return pairs


def postprocess_address(text: str) -> list[tuple[str, str]]:
    """Turn the raw text of the address lines into ``(court, address)`` pairs.

    Args:
        text: raw text extracted for the address lines, entries separated by
            blank lines.

    Returns:
        One ``(court, address)`` tuple per entry, as the annotation promises.
        Entries containing the word "note" (case-insensitive) are skipped.
        When an entry has no '-' separator the whole entry is the court name
        and the address is the empty string, so consumers can always index
        positions 0 and 1.
    """
    pairs = []
    # there is always a blank line between consecutive entries
    for entry in _split_and_remove_spaces(text):
        if "note" in entry.lower():
            continue
        # get court name and address using the first '-' as separator
        # (optionally with white space around it)
        fields = re.split(r" *- *", entry, maxsplit=1)
        address = fields[1] if len(fields) > 1 else ""
        pairs.append((fields[0], address))
    return pairs

In [7]:
# Column key -> predicate used to keep only that column's objects on a page.
filters_func = dict(
    squadra_a=filter_squadra_a,
    squadra_b=filter_squadra_b,
    date_time=filter_date_time,
    address=filter_address,
)

# Column key -> function that parses the column's raw text into entries.
# Both team columns share the same parser.
postprocess_func = dict(
    squadra_a=postprocess_teams,
    squadra_b=postprocess_teams,
    date_time=postprocess_date_time,
    address=postprocess_address,
)

In [8]:
# Per-column accumulator: every column starts with an empty string.
result = dict.fromkeys(("squadra_a", "squadra_b", "date_time", "address"), "")

In [9]:
# Start from empty strings on every run: without this reset, re-executing the
# cell would append the same pages a second time (or extend already parsed
# lists character by character).
result = dict.fromkeys(result, "")

for page in pdf.pages:
    for k in result:
        # keep only the objects of column ``k`` (the predicate is passed
        # directly, no wrapper needed), then render them as text
        filter_page = page.filter(filters_func[k])
        text = filter_page.extract_text(layout=True, use_text_flow=False)
        # pages are concatenated with a newline in between
        result[k] += f"\n{text}"


In [10]:
# Replace the raw text of every column with its parsed entries, using the
# parser registered under the same key.
for k in list(result):
    v = result[k]
    result[k] = postprocess_func[k](v)

In [11]:
# Sanity check: every column must hold the same number of entries,
# otherwise the columns cannot be combined row by row.
tot = list(map(len, result.values()))
N = tot[0]
if len(set(tot)) == 1:
    print(f"Found {N} entries")
else:
    print("Error: found different number of entries")
    # show the per-column counts to help locate the mismatch
    for k, v in result.items():
        print(f"  {k}: {len(v)}")
    raise ValueError("Different number of entries")

Found 240 entries


In [12]:
out_file = "../data/calendario.csv"
header = ["Home", "Away", "Date", "Time", "Court", "Address"]

# Only matches whose home or away team contains this text (case-insensitive)
# are exported; an empty string exports every match.
team = "arcella"
team = team.lower()

# newline="" is required by the csv module: the writer emits its own line
# endings, and without it every row is followed by a blank line on Windows.
# The explicit encoding keeps accented names independent of the platform default.
with open(out_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=header)

    writer.writeheader()
    for i in range(N):
        home = result["squadra_a"][i]
        away = result["squadra_b"][i]
        # skip matches that do not involve the selected team
        if team and team not in home.lower() and team not in away.lower():
            continue

        date_time = result["date_time"][i]
        address = result["address"][i]
        row = {
            "Home": home,
            "Away": away,
            "Date": date_time[0],
            "Time": date_time[1],
            "Court": address[0],
            "Address": address[1],
        }
        writer.writerow(row)