In [None]:
from pathlib import Path

import httpx
import base64
import json
from io import BytesIO
from pdf2image import convert_from_path

import instructor
from openai import OpenAI
from dotenv import dotenv_values
from pydantic import BaseModel, Field
from typing import Optional, Literal, List

In [None]:
MAIN_DIR_PATH = Path("../.")

DATA_PATH = MAIN_DIR_PATH / "data"
RAW_DATA_PATH = DATA_PATH / "raw"
PROCESSED_DATA_PATH = DATA_PATH / "processed"
PROCESSED_DATA_PATH.mkdir(parents=True, exist_ok=True)

In [None]:
env = dotenv_values(MAIN_DIR_PATH / ".env")
openai_client = instructor.from_openai(
    OpenAI(api_key=env["OPENAI_API_KEY"], timeout=httpx.Timeout(40.0))
)

In [None]:
# Marked: 1, 2, 6, 8, 10, 14, 15
FILE_PATH = RAW_DATA_PATH / "wzor-1.pdf"

# Marked: 8, 10
# FILE_PATH = RAW_DATA_PATH / "wzor-2.pdf"

# Marked: 8, 9, 10
# FILE_PATH = RAW_DATA_PATH / "wzor-3.pdf"

## PDF to JPEG pages

In [None]:
DPI = 300
OUTPUT_FORMAT = "JPEG"

pages = convert_from_path(FILE_PATH, dpi=DPI)
pages_base64 = []

for page in pages:
    buffer = BytesIO()
    page.save(buffer, format=OUTPUT_FORMAT)
    encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
    pages_base64.append(encoded_image)

In [None]:
# print(pages_base64[4])

## BaseModel

In [None]:
class ApplicationForm(BaseModel):
    surname: Optional[str] = Field(
        None, alias="Nazwisko", description="Nazwisko uczestnika"
    )
    name: Optional[str] = Field(None, alias="Imię", description="Imię uczestnika")
    sex: Optional[Literal["kobieta", "mężczyzna"]] = Field(
        None, alias="Płeć", description="Płeć zaznaczona na formularzu"
    )
    birth_date: Optional[str] = Field(
        None, alias="Data urodzenia", description="Data urodzenia uczestnika"
    )
    phone: Optional[str] = Field(
        None, alias="Numer telefonu", description="Numer telefonu kontaktowego"
    )
    email: Optional[str] = Field(
        None, alias="Adres e-mail", description="Adres e-mail uczestnika"
    )
    residence_or_workplace: Optional[str] = Field(
        None,
        alias="Podaj stałe miejsce zamieszkania (kod pocztowy, miejscowość)* Tj. min. 3 miesiące przed złożeniem fiszki zgłoszeniowej, we wskazanym miejscu. Przez stałe zamieszkanie należy rozumieć zamieszkanie w określonej miejscowości pod oznaczonym adresem z zamiarem stałego pobytu. O miejscu zamieszkania decydują występujące łącznie dwie przesłanki faktyczne: przebywanie w znaczeniu fizycznym w określonej miejscowości i zamiar stałego pobytu. Na stałość pobytu w danej miejscowości wskazuje skupienie w niej życiowej aktywności, związanej z pracą czy rodziną lub podaj miejsce pracy (kod pocztowy oraz miejscowość głównej siedziby lub oddziału firmy, w której jesteś zatrudniony)",
        description="Adres zamieszkania lub miejsce pracy",
    )
    is_entrepreneur: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy jesteś przedsiębiorcą w rozumieniu art. 4 ust. 1-2 ustawy Prawo przedsiębiorców? *",
        description="Czy uczestnik prowadzi działalność gospodarczą lub jest wspólnikiem spółki cywilnej",
    )
    career_counseling: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy deklarujesz udział w doradztwie zawodowym?",
        description="Deklaracja udziału w doradztwie zawodowym",
    )
    green_competencies: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy deklarujesz udział w usłudze rozwojowej prowadzącej do nabycia zielonych kompetencji/kwalifikacji?",
        description="Deklaracja udziału w usłudze prowadzącej do zielonych kwalifikacji",
    )
    accessibility_support: Optional[str] = Field(
        None,
        alias="Jeśli jesteś osobą z niepełnosprawnością, zaznacz właściwą opcję, z której chciałbyś korzystać w przypadku zakwalifikowania się do projektu:",
        description="Lista potrzebnych udogodnień dla osób z niepełnosprawnością",
    )
    question_12_mining_related: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy pracujesz w górnictwie, branży około górniczej lub opuściłaś/łeś którąś z tych branż po 1 stycznia 2021 r.?",
        description="Status zatrudnienia w branży górniczej po 01.01.2021",
    )
    question_13_disadvantaged_group: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy jesteś osobą spełniającą co najmniej jedno kryterium a-f, należącą do grupy osób znajdujących się w niekorzystnej sytuacji?",
        description="Przynależność do grupy w niekorzystnej sytuacji społecznej lub zawodowej",
    )
    question_14_residence_area: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy stale zamieszkujesz1 w województwie śląskim na terenie wskazanym w Rozporządzaniu Rady Ministrów z dnia 16 września 2024 r. w sprawie wprowadzenia stanu klęski żywiołowej...",
        description="Zamieszkanie na terenie objętym stanem klęski żywiołowej",
    )
    question_15_ris_scope: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy deklarujesz udział w usłudze rozwojowej, której zakres tematyczny wynika z Regionalnej Strategii Innowacji Województwa Śląskiego 2030...",
        description="Deklaracja udziału w usłudze zgodnej z RIS 2030",
    )
    question_16_certification_path: Optional[Literal["tak", "nie"]] = Field(
        None,
        alias="Czy deklarujesz udział w usłudze rozwojowej prowadzącej do nabycia kwalifikacji?",
        description="Deklaracja udziału w usłudze rozwojowej zakończonej certyfikacją",
    )
    legal_declaration: Optional[bool] = Field(
        None,
        alias="Świadoma/y odpowiedzialności karnej za podanie fałszywych informacji oświadczam, że:",
        description="Potwierdzenie zgodności danych i akceptacji regulaminu",
    )
    marketing_consent: Optional[Literal["TAK", "NIE"]] = Field(
        None,
        alias="Wyrażam zgodę na przetwarzanie moich danych osobowych podanych w umowie/we wniosku przez Fundusz Górnośląski S.A. w celach marketingowych",
        description="Zgoda na przetwarzanie danych w celach marketingowych",
    )
    marketing_channels: Optional[List[str]] = Field(
        None,
        alias="Zgoda marketingowa – formy kontaktu",
        description="Formy kontaktu marketingowego zaakceptowane przez uczestnika (e-mail, SMS, telefon)",
    )

## OpenAI connection

In [None]:
results: List[ApplicationForm] = []

for page in pages_base64:
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        response_model=ApplicationForm,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            "Wyodrębnij dane z formularza zgłoszeniowego do projektu "
                            "zgodnie z poniższym modelem. Każde pole dopasuj do jego zawartości. "
                            "Zwróć tylko dane obecne na tej stronie dokumentu PDF. "
                            "Jeśli dane nie są obecne, pomiń pole (zostanie uzupełnione z innej strony)."
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{page}"},
                    },
                ],
            }
        ],
        temperature=0.0,
    )
    results.append(response)

In [None]:
with open(PROCESSED_DATA_PATH/"raw_results.json", "w", encoding="utf-8") as f:
    json.dump([r.dict() for r in results], f, ensure_ascii=False, indent=2)

## Result

In [None]:
def merge_form_results(results_list):
    merged = {}

    # Convert ApplicationForm to dict
    dict_results = []
    for result in results_list:
        if hasattr(result, "dict"):  # pydantic model
            dict_results.append(result.dict())
        elif hasattr(result, "__dict__"):  # class
            dict_results.append(result.__dict__)
        else:  # dict
            dict_results.append(result)

    # Init with None
    for result in dict_results:
        for key in result.keys():
            if key not in merged:
                merged[key] = None

    # Fill value - take first unempty value
    for key in merged.keys():
        for result in dict_results:
            value = result.get(key)
            if value is not None and value != "" and value != []:
                merged[key] = value
                break

    return merged


def process_and_save_results(results, data_path):
    merged_result = merge_form_results(results)

    with open(data_path / "final_result.json", "w", encoding="utf-8") as f:
        json.dump(merged_result, f, ensure_ascii=False, indent=2)

    return merged_result

In [None]:
merged = process_and_save_results(results, PROCESSED_DATA_PATH)