# RomKorr – Scraper (clean)

Dieses Notebook lädt Metadaten zu Briefen von **briefe-der-romantik.de** und schreibt zwei Dateien:

- `data/raw/rom_korr_full.csv` – alles, was gefunden wird
- `data/processed/rom_korr_full_website.csv` – gefiltert: ohne Zeilen, bei denen *alle* Kernfelder `Unknown` sind oder bei denen Dispatch und Destination beide `Unknown` sind

> Hinweis: Bitte mit Maß scrapen (Rate-Limit beachten) und im Zweifel mit einem kleinen `END_ID` starten.

In [None]:
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import pandas as pd
import requests
from bs4 import BeautifulSoup

In [None]:
# =========================
# Configuration
# =========================

BASE_URL = "https://briefe-der-romantik.de/letters/view/"
START_ID = 1                  # first letter ID to scrape
END_ID = 21000                # last letter ID to scrape (inclusive)

# Politeness / stability
SLEEP_SECONDS = 0.5           # pause between requests
TIMEOUT_SECONDS = 30          # per-request timeout
MAX_RETRIES = 3               # attempts per letter before giving up
BACKOFF_FACTOR = 1.5          # base of the exponential backoff between retries
USER_AGENT = "RomKorr/1.0 (research; contact: you@example.com)"  # please adapt

# Output locations, all below the current working directory
PROJECT_ROOT = Path.cwd()
RAW_OUT = PROJECT_ROOT.joinpath("data", "raw", "rom_korr_full.csv")
PROCESSED_OUT = PROJECT_ROOT.joinpath("data", "processed", "rom_korr_full_website.csv")
CACHE_DIR = PROJECT_ROOT.joinpath("data", "cache", "html")  # optional: one cached HTML file per ID

# If True: stores HTML as data/cache/html/<id>.html and reuses that cache on restart
USE_HTML_CACHE = True

# If True: builds the URL without the query_id parameter (more robust if query_id changes)
DROP_QUERY_ID = True

In [None]:
# =========================
# Helpers
# =========================

@dataclass
class LetterMeta:
    """Metadata record for a single letter.

    Only ``letter_id`` is required; every other field defaults to ``None``
    until it is filled in.
    """

    letter_id: int
    Date: Optional[str] = None
    Sender: Optional[str] = None
    Recipient: Optional[str] = None
    Place_of_Dispatch: Optional[str] = None
    Place_of_Destination: Optional[str] = None
    Dispatch_GeoNames: Optional[str] = None
    Destination_GeoNames: Optional[str] = None
    link: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the record as a dict keyed by output column name.

        The two place fields are exported with spaces instead of
        underscores; all other keys equal the attribute names.
        """
        # (output column, attribute) pairs, in output column order
        column_to_attr = (
            ("letter_id", "letter_id"),
            ("Date", "Date"),
            ("Sender", "Sender"),
            ("Recipient", "Recipient"),
            ("Place of Dispatch", "Place_of_Dispatch"),
            ("Place of Destination", "Place_of_Destination"),
            ("Dispatch_GeoNames", "Dispatch_GeoNames"),
            ("Destination_GeoNames", "Destination_GeoNames"),
            ("link", "link"),
        )
        return {column: getattr(self, attr) for column, attr in column_to_attr}


def clean_kdata_text(text: str) -> str:
    """Return only the segment before the first '·' (e.g. without '· GND').

    Whitespace around the returned segment is stripped. If *text* contains
    no '·', the whole stripped text is returned.
    """
    # str.partition splits on the first separator only; the surrounding
    # whitespace is removed by strip(), so no regular expression is needed.
    return text.partition("·")[0].strip()


def build_letter_url(letter_id: int) -> str:
    """Return the view URL for *letter_id*.

    With ``DROP_QUERY_ID`` set, the URL carries only ``left=text``;
    otherwise the fixed ``query_id`` parameter is appended as well.
    """
    if not DROP_QUERY_ID:
        return f"{BASE_URL}{letter_id}?left=text&query_id=6773ee556fe85"
    # deliberately minimal: query_id is often unstable
    return f"{BASE_URL}{letter_id}?left=text"

In [None]:
def get_session() -> requests.Session:
    """Create a ``requests.Session`` preloaded with the scraper's default headers."""
    default_headers = {
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "de-DE,de;q=0.9,en;q=0.7",
    }
    session = requests.Session()
    session.headers.update(default_headers)
    return session


def fetch_html(session: requests.Session, letter_id: int) -> Optional[str]:
    """Load the HTML page of one letter, with retries and an optional local cache.

    When ``USE_HTML_CACHE`` is set and ``CACHE_DIR/<letter_id>.html`` exists,
    that file is returned without any network request. Otherwise the page is
    requested up to ``MAX_RETRIES`` times with exponential backoff, and a
    successful download is stored in the cache.

    Args:
        session: Session used for the GET request.
        letter_id: ID of the letter whose page is loaded.

    Returns:
        The page HTML, or ``None`` if the server answered 404 or every
        attempt failed (a warning is printed in the latter case).
    """
    url = build_letter_url(letter_id)
    cache_file = CACHE_DIR / f"{letter_id}.html"

    if USE_HTML_CACHE and cache_file.exists():
        return cache_file.read_text(encoding="utf-8", errors="ignore")

    last_err: Optional[Exception] = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = session.get(url, timeout=TIMEOUT_SECONDS)
            if resp.status_code == 404:
                return None
            resp.raise_for_status()
            html = resp.text
        except requests.RequestException as err:
            # Only network/HTTP failures are retried; anything else is a bug
            # and should surface instead of being hidden behind a retry.
            last_err = err
            if attempt < MAX_RETRIES:
                # back off only when another attempt follows
                time.sleep((BACKOFF_FACTOR ** (attempt - 1)) * SLEEP_SECONDS)
            continue

        if USE_HTML_CACHE:
            try:
                CACHE_DIR.mkdir(parents=True, exist_ok=True)
                # Write to a temporary file and rename it, so an interrupted
                # run never leaves a truncated cache file that a restart
                # would treat as a complete page.
                tmp_file = cache_file.with_name(cache_file.name + ".tmp")
                tmp_file.write_text(html, encoding="utf-8")
                tmp_file.replace(cache_file)
            except OSError as err:
                # A cache failure must not cost us the page we already have.
                print(f"[WARN] Could not cache letter_id={letter_id}: {err}")

        return html

    print(f"[WARN] Failed letter_id={letter_id}: {last_err}")
    return None

In [None]:
def parse_metadata(html: str, letter_id: int) -> Optional[LetterMeta]:
    """Extract the letter metadata from the page's metadata tab.

    Returns ``None`` when the page has no ``<div id="metadata-tab-pane">``.
    Otherwise returns a ``LetterMeta`` with ``letter_id`` and ``link`` set
    and the remaining fields filled from the labels found on the page.
    """
    # page label -> (attribute for the text, attribute for the GeoNames link or None)
    targets = {
        "Date": ("Date", None),
        "Sender": ("Sender", None),
        "Recipient": ("Recipient", None),
        "Place of Dispatch": ("Place_of_Dispatch", "Dispatch_GeoNames"),
        "Place of Destination": ("Place_of_Destination", "Destination_GeoNames"),
    }

    pane = BeautifulSoup(html, "html.parser").find("div", id="metadata-tab-pane")
    if not pane:
        return None

    meta = LetterMeta(letter_id=letter_id, link=build_letter_url(letter_id))

    # Expected DOM: <li><span class="label">...</span><span class="kdata">...</span></li>
    for item in pane.find_all("li"):
        label_tag = item.find("span", class_="label")
        data_tag = item.find("span", class_="kdata")
        if not (label_tag and data_tag):
            continue

        target = targets.get(label_tag.get_text(strip=True))
        if target is None:
            # label that is not part of our schema
            continue
        text_attr, geo_attr = target

        text = data_tag.get_text(strip=True)
        setattr(meta, text_attr, clean_kdata_text(text) if text else None)

        if geo_attr is None:
            continue

        # For places, also keep the first link that points to geonames.org.
        geo_href = next(
            (
                anchor["href"]
                for anchor in data_tag.find_all("a", href=True)
                if "geonames.org" in anchor["href"]
            ),
            None,
        )
        if geo_href is not None:
            setattr(meta, geo_attr, geo_href)

    return meta

In [None]:
def scrape_range(start_id: int, end_id: int) -> pd.DataFrame:
    """Scrape the metadata of all letters from *start_id* to *end_id* (inclusive).

    IDs whose page cannot be loaded or has no metadata section are skipped.
    Progress is printed for every ID divisible by 100.

    Returns:
        A DataFrame with one row per successfully parsed letter.
    """
    rows = []
    # The context manager closes the session's connections when done.
    with get_session() as session:
        for letter_id in range(start_id, end_id + 1):
            if letter_id % 100 == 0:
                print(f"[INFO] at letter_id={letter_id}")

            # The pause only exists to rate-limit requests to the server.
            # A page served from the local cache causes no request, so it
            # needs no pause. The path mirrors the one fetch_html uses.
            served_from_cache = (
                USE_HTML_CACHE and (CACHE_DIR / f"{letter_id}.html").exists()
            )

            html = fetch_html(session, letter_id)
            if html:
                meta = parse_metadata(html, letter_id)
                if meta:
                    rows.append(meta.to_dict())

            if not served_from_cache:
                time.sleep(SLEEP_SECONDS)

    return pd.DataFrame(rows)


def filter_for_website(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* without the rows that carry no usable information.

    A field counts as unknown when it is missing (NaN/None) or equals the
    string ``"Unknown"``. A row is removed when all five core fields are
    unknown, or when both place fields are unknown. The number of removed
    rows is printed.

    The input frame is not modified. Core columns missing from *df* are
    added (filled with ``None``) to the returned frame only.
    """
    needed = ["Sender", "Recipient", "Date", "Place of Dispatch", "Place of Destination"]
    place_cols = ["Place of Dispatch", "Place of Destination"]

    # Work on a copy so that adding missing columns does not leak into the
    # caller's DataFrame.
    work = df.copy()
    for col in needed:
        if col not in work.columns:
            work[col] = None

    is_unknown = work[needed].isna() | work[needed].eq("Unknown")

    # The first mask is implied by the second; both are kept so the code
    # mirrors the documented filter rule.
    mask_all_unknown = is_unknown.all(axis=1)
    mask_place_unknown = is_unknown[place_cols].all(axis=1)

    mask_combined = mask_all_unknown | mask_place_unknown
    print(f"[INFO] rows removed by filter: {int(mask_combined.sum())} / {len(df)}")
    return work.loc[~mask_combined].copy()

In [None]:
# =========================
# Run
# =========================

# Make sure both output directories exist before anything is written.
RAW_OUT.parent.mkdir(parents=True, exist_ok=True)
PROCESSED_OUT.parent.mkdir(parents=True, exist_ok=True)

# Scrape the configured ID range (START_ID..END_ID).
df = scrape_range(START_ID, END_ID)

# Persist the unfiltered result first, then derive and store the filtered one.
df.to_csv(RAW_OUT, index=False)
df_website = filter_for_website(df)
df_website.to_csv(PROCESSED_OUT, index=False)

# Notebook display: first rows of both tables.
df.head(), df_website.head()