In [3]:
import os, re, json, time, argparse
from typing import List, Dict, Any, Optional, Tuple
import pdfplumber, pandas as pd
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# ---------------------------------------------------------------------------
# Pipeline defaults. run_notebook_defaults() passes these directly; run_cli()
# uses them as the defaults of its command-line flags.
# ---------------------------------------------------------------------------
PDF_DEFAULT               = "amazon.pdf"          # input PDF
FACTS_CSV_DEFAULT         = "Output.csv"          # destination CSV for LLM-extracted facts
TABLES_CSV_DEFAULT        = "tables.csv"          # destination CSV for pdfplumber tables
MODEL_DEFAULT             = "qwen-turbo"          # chat model name sent with each request
BASE_URL_DEFAULT          = "https://dashscope.aliyuncs.com/compatible-mode/v1"  # OpenAI-compatible endpoint
API_KEY_ENV_DEFAULT       = "DASHSCOPE_API_KEY"   # NAME of the env var holding the key, not the key itself
TEMPERATURE_DEFAULT       = 0.0                   # sampling temperature for extraction calls
MAX_CALLS_DEFAULT         = None                  # cap on LLM calls; None (or 0) means no limit

# System message sent with every extraction request.
# NOTE(review): OpenAI-compatible JSON mode (requested in LLMClient.extract) expects
# the prompt to mention JSON — keep that word if this text is edited.
SYSTEM_PROMPT = (
    "You are a structured information extractor for financial/business PDFs. "
    "Return STRICT JSON only (no prose). Extract quantitative facts and attach a short, grounded comment.\n"
)

# User message template, filled with USER_TMPL.format(page=..., chunk=...).
# Braces of the JSON example are doubled ({{ }}) so str.format emits literal { }.
# NOTE(review): this asks for a JSON *array*, but extract() requests
# response_format={"type": "json_object"} on the new SDK, which yields a top-level
# object — the reply therefore has to be unwrapped before use.
USER_TMPL = """Extract facts as a JSON array following the schema below.

Rules:
- Focus on quantitative facts (numbers/%, currency) and their meaning (metric). Add period if present.
- currency_symbol: "$"|"€"|"£"|null. unit: "USD"|"EUR"|"GBP"|"UNSPEC".
- Percent: set is_percent=true and value as numeric 12 for "12%".
- Convert 'million'/'billion'/'trillion' to absolute base units in 'value' (e.g., "$12 billion" → 12000000000).
- topic: one of ["revenue","operating_income","net_income","eps","aws","ads","fcf","margin","guidance","costs","capex","general"].
- evidence: short phrase (<=140 chars) anchoring the metric; comment (<=200 chars) — concise, non-speculative.
- Always include the integer 'page' field.

Example:
[
  {{
    "metric": "AWS revenue",
    "value": 90700000000,
    "unit": "USD",
    "currency_symbol": "$",
    "is_percent": false,
    "topic": "aws",
    "period": "FY2024",
    "comment": "Revenue grew on migration of enterprise workloads to cloud",
    "evidence": "AWS revenue increased ...",
    "page": {page}
  }}
]

TEXT (page {page}):
\"\"\"{chunk}\"\"\""""

def split_sentences(text: str) -> List[str]:
    """Split ``text`` into sentences after collapsing every whitespace run to one space.

    A sentence boundary is whitespace that follows ``.``, ``!`` or ``?`` and precedes
    an uppercase letter, a digit or ``(``. Blank input yields ``[]``.
    """
    flat = " ".join(text.split())
    boundary = r"(?<=[\.\!\?])\s+(?=[A-Z0-9\(])"
    sentences = []
    for piece in re.split(boundary, flat):
        piece = piece.strip()
        if piece:
            sentences.append(piece)
    if not sentences and flat.strip():
        return [flat]
    return sentences

def chunkify(sents: List[str], max_sents: int = 10) -> List[str]:
    """Group consecutive sentences into space-joined chunks of at most ``max_sents`` each."""
    chunks: List[str] = []
    for start in range(0, len(sents), max_sents):
        window = sents[start:start + max_sents]
        chunks.append(" ".join(window))
    return chunks

# Prefer the openai>=1.0 client object; fall back to the legacy module-level API
# (openai<1.0), where `from openai import OpenAI` fails with ImportError. Only
# ImportError is caught so unrelated failures while importing the SDK surface
# instead of being silently routed to the legacy code path.
try:
    from openai import OpenAI
    _HAS_NEW_SDK = True
except ImportError:
    import openai as openai_legacy
    _HAS_NEW_SDK = False

class LLMClient:
    """Chat-completions client that turns a text chunk into a list of fact dicts.

    Works with both the ``openai>=1.0`` SDK (an ``OpenAI`` client object) and the
    legacy ``openai<1.0`` module-level API; the module-level ``_HAS_NEW_SDK`` flag
    decides which one is used.
    """

    # Fields guaranteed on every cleaned fact, added in this order when missing
    # (the order determines column order if the facts are turned into a DataFrame
    # without an explicit column list). "page" is always set per call.
    _FIELD_DEFAULTS = (
        ("metric", None),
        ("value", None),
        ("unit", None),
        ("currency_symbol", None),
        ("is_percent", False),
        ("topic", "general"),
        ("period", None),
        ("comment", None),
        ("evidence", None),
    )
    _KNOWN_UNITS = {"USD", "EUR", "GBP", "UNSPEC"}
    _SYMBOL_TO_UNIT = {"$": "USD", "€": "EUR", "£": "GBP"}

    def __init__(self, api_key_env: str, model: str, base_url: Optional[str]):
        """Read the API key from the environment and build the SDK client.

        Args:
            api_key_env: name of the environment variable holding the API key.
            model: chat model name sent with every request.
            base_url: optional OpenAI-compatible endpoint URL.

        Raises:
            RuntimeError: if the environment variable is unset or empty.
        """
        api_key = os.getenv(api_key_env)
        if not api_key:
            raise RuntimeError(f"Env var {api_key_env} not set")
        self.model = model
        self.base_url = base_url
        self.api_key = api_key

        if _HAS_NEW_SDK:
            self.client = OpenAI(api_key=api_key, base_url=base_url)
        else:
            openai_legacy.api_key = api_key
            if base_url:
                # openai<1.0 reads the endpoint from `api_base`; assigning `base_url`
                # has no effect there and requests (with this key) would go to
                # api.openai.com instead of the configured endpoint.
                openai_legacy.api_base = base_url
            self.client = openai_legacy

    # Any exception (network error, rate limit, unparseable reply) triggers up to
    # 5 attempts with exponential backoff; the last exception is re-raised.
    @retry(
        retry=retry_if_exception_type(Exception),
        wait=wait_exponential(multiplier=1, min=1, max=20),
        stop=stop_after_attempt(5),
        reraise=True
    )
    def extract(self, *, page: int, chunk: str, temperature: float = 0.0) -> List[Dict[str, Any]]:
        """Ask the model for facts contained in ``chunk`` and return them cleaned.

        Args:
            page: 1-based PDF page the chunk came from; injected into the prompt and
                stamped on every returned fact.
            chunk: text to extract facts from.
            temperature: sampling temperature.

        Returns:
            List of fact dicts normalized by ``_clean_item`` (possibly empty).
        """
        user = USER_TMPL.format(page=page, chunk=chunk)
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user},
        ]
        if _HAS_NEW_SDK:
            resp = self.client.chat.completions.create(
                model=self.model,
                temperature=temperature,
                messages=messages,
                # JSON mode returns a top-level object, so the array requested by the
                # prompt usually arrives wrapped under some key (see _coerce_list).
                response_format={"type": "json_object"},
            )
            txt = resp.choices[0].message.content
        else:
            resp = self.client.ChatCompletion.create(
                model=self.model,
                temperature=temperature,
                messages=messages,
            )
            txt = resp.choices[0].message["content"]

        data = self._coerce_list(self._parse_json(txt))
        return [self._clean_item(it, page) for it in data if isinstance(it, dict)]

    @staticmethod
    def _parse_json(txt: Optional[str]) -> Any:
        """Parse the model reply as JSON, tolerating prose or code fences around it.

        Returns ``[]`` for an empty/None reply or one with no bracketed span. When a
        ``[...]`` or ``{...}`` span exists but none parses, the JSONDecodeError is
        raised so the retry on ``extract`` can request a fresh reply.
        """
        if not txt:
            return []
        try:
            return json.loads(txt)
        except json.JSONDecodeError:
            pass
        last_err: Optional[Exception] = None
        # Fall back to the outermost [...] span, then the outermost {...} span.
        for open_ch, close_ch in (("[", "]"), ("{", "}")):
            s, e = txt.find(open_ch), txt.rfind(close_ch)
            if s == -1 or e <= s:
                continue
            try:
                return json.loads(txt[s:e + 1])
            except json.JSONDecodeError as err:
                last_err = err
        if last_err is not None:
            raise last_err
        return []

    @staticmethod
    def _coerce_list(obj: Any) -> List[Any]:
        """Normalize a parsed reply into a list of candidate fact objects.

        Accepts a bare list; an object wrapping the list under ``"items"`` or any other
        key; or a single fact object (one containing "metric" or "value"). Anything
        else, including ``{}``, yields ``[]`` rather than an all-empty fact.
        """
        if isinstance(obj, list):
            return obj
        if not isinstance(obj, dict):
            return []
        if isinstance(obj.get("items"), list):
            return obj["items"]
        if "metric" in obj or "value" in obj:
            return [obj]
        # Model-chosen wrapper keys ("facts", "data", ...): take every list value;
        # non-dict elements are dropped by the caller.
        return [x for v in obj.values() if isinstance(v, list) for x in v]

    @staticmethod
    def _to_float(val: Any) -> Optional[float]:
        """Convert a model-supplied value to float; None when not numeric.

        Thousands separators in strings ("12,000") are removed first; booleans are
        rejected rather than silently becoming 0.0/1.0.
        """
        if val is None or isinstance(val, bool):
            return None
        if isinstance(val, str):
            val = val.replace(",", "").strip()
        try:
            return float(val)
        except (TypeError, ValueError, OverflowError):
            return None

    @classmethod
    def _clean_item(cls, it: Dict[str, Any], page: int) -> Dict[str, Any]:
        """Fill defaults and normalize one fact dict in place; returns the same dict.

        Guarantees every key in ``_FIELD_DEFAULTS`` plus "page". "unit" becomes one of
        USD/EUR/GBP/UNSPEC (derived from "currency_symbol" when the model's unit is not
        recognized), "value" a float or None, "is_percent" a bool, and
        "comment"/"evidence" are capped at 200/140 characters.
        """
        for key, default in cls._FIELD_DEFAULTS:
            it.setdefault(key, default)
        # The chunk came from exactly this page, so the caller's page is authoritative.
        it["page"] = page

        unit = str(it.get("unit") or "").upper()
        if unit not in cls._KNOWN_UNITS:
            sym = it.get("currency_symbol")
            unit = cls._SYMBOL_TO_UNIT.get(sym, "UNSPEC") if isinstance(sym, str) else "UNSPEC"
        it["unit"] = unit

        it["value"] = cls._to_float(it.get("value"))

        pct = it.get("is_percent")
        if isinstance(pct, str):
            # "false" is a truthy string; interpret textual booleans explicitly.
            pct = pct.strip().lower() in {"true", "1", "yes"}
        it["is_percent"] = bool(pct)

        if it["comment"]:
            it["comment"] = str(it["comment"])[:200]
        if it["evidence"]:
            it["evidence"] = str(it["evidence"])[:140]
        return it

_SCALE_PATTERN = re.compile(r"\b(trillion|billion|million|tn|bn|mn)\b", re.I)

# Multiplier for each (lower-cased) scale word matched by _SCALE_PATTERN.
_SCALE_MULTIPLIERS = {
    "million": 1e6, "mn": 1e6,
    "billion": 1e9, "bn": 1e9,
    "trillion": 1e12, "tn": 1e12,
}


def normalize_value_scale(row: Dict[str, Any]) -> Dict[str, Any]:
    """Scale a currency value by a million/billion/trillion word found in its text.

    Safety net for replies where the model returned the printed mantissa (``5`` for
    "$5 million") instead of the absolute amount the prompt asks for. The scale word
    is the first match of ``_SCALE_PATTERN`` in ``evidence`` followed by ``comment``.
    Only values whose magnitude is below that multiplier are scaled: a value that is
    already at least as large (e.g. 5_000_000 for "$5 million") is treated as
    converted and left alone, and negative amounts are handled by magnitude too.

    Rows without a value, percent rows, non-numeric values, and rows with neither a
    known unit nor a currency symbol are returned unchanged.

    Mutates and returns ``row``.
    """
    raw = row.get("value", None)
    if raw is None or row.get("is_percent", False):
        return row
    try:
        val = float(raw)
    except (TypeError, ValueError, OverflowError):
        return row

    unit = str(row.get("unit") or "UNSPEC").upper()
    sym = row.get("currency_symbol") or ""
    if unit == "UNSPEC" and sym not in ("$", "€", "£"):
        return row

    hint_text = " ".join([str(row.get("evidence") or ""), str(row.get("comment") or "")])
    m = _SCALE_PATTERN.search(hint_text)
    if not m:
        return row

    mult = _SCALE_MULTIPLIERS[m.group(1).lower()]
    if abs(val) < mult:
        row["value"] = val * mult
    return row

def parse_pages_arg(pages_str: str, total_pages: int) -> List[int]:
    """Parse a 1-based page selection such as ``"1-5,7,10-12"``.

    An empty/None selection means every page. Malformed parts and descending ranges
    ("5-3") are ignored, pages outside ``1..total_pages`` are dropped, and duplicates
    are removed while keeping first-seen order.

    Returns:
        List of selected page numbers (may be empty if nothing valid was selected).
    """
    if not pages_str:
        return list(range(1, total_pages + 1))
    out: List[int] = []
    seen = set()
    for part in pages_str.split(","):
        part = part.strip()
        try:
            if "-" in part:
                a_str, b_str = part.split("-", 1)
                # Clamp before materializing so "1-1000000000" stays cheap.
                lo, hi = max(int(a_str), 1), min(int(b_str), total_pages)
                candidates = range(lo, hi + 1)
            else:
                candidates = [int(part)]
        except ValueError:
            continue
        for p in candidates:
            if 1 <= p <= total_pages and p not in seen:
                seen.add(p)
                out.append(p)
    return out

def ai_extract_pdf_to_rows(
    pdf_path: str,
    model: str = MODEL_DEFAULT,
    api_key_env: str = API_KEY_ENV_DEFAULT,
    base_url: Optional[str] = BASE_URL_DEFAULT,
    temperature: float = TEMPERATURE_DEFAULT,
    max_calls: Optional[int] = MAX_CALLS_DEFAULT,
    pages: Optional[str] = None,
    topics_filter: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Extract LLM-structured facts from the (selected) pages of a PDF.

    Each page's text is split into sentences and grouped into chunks of 10; every
    chunk is one ``LLMClient.extract`` call.

    Args:
        pdf_path: path to the PDF file.
        model, api_key_env, base_url, temperature: forwarded to ``LLMClient``.
        max_calls: stop after this many LLM calls; None or 0 means unlimited.
        pages: 1-based page filter such as "1-5,7"; None/"" means all pages. A filter
            that selects no valid page processes nothing.
        topics_filter: keep only facts whose topic (case-insensitive) is listed;
            blank entries are ignored.

    Returns:
        Fact dicts from ``LLMClient.extract``, each with "pdf_file" (basename of
        ``pdf_path``) added and values passed through ``normalize_value_scale``.
    """
    client = LLMClient(api_key_env=api_key_env, model=model, base_url=base_url)
    rows: List[Dict[str, Any]] = []
    pdf_file = os.path.basename(pdf_path)
    # Normalized once instead of per chunk; an empty set disables topic filtering.
    topics_set = {t.strip().lower() for t in (topics_filter or []) if t.strip()}

    with pdfplumber.open(pdf_path) as pdf:
        # None = no page filter. An explicit filter that matches nothing gives an
        # empty set, which must not be mistaken for "no filter" (that would send
        # every page to the model).
        allowed_pages = set(parse_pages_arg(pages, len(pdf.pages))) if pages else None
        calls = 0
        for page_idx, page in enumerate(pdf.pages, start=1):
            if allowed_pages is not None and page_idx not in allowed_pages:
                continue

            txt = page.extract_text() or ""
            if not txt.strip():
                continue

            for ch in chunkify(split_sentences(txt), max_sents=10):
                items = client.extract(page=page_idx, chunk=ch, temperature=temperature)
                for it in items:
                    it["pdf_file"] = pdf_file
                items = [normalize_value_scale(it) for it in items]
                if topics_set:
                    items = [it for it in items if str(it.get("topic", "general")).lower() in topics_set]

                rows.extend(items)
                calls += 1
                if max_calls and calls >= max_calls:
                    return rows
                time.sleep(0.10)  # light throttle between consecutive LLM calls
    return rows

def _table_header_names(header_row: List[Any]) -> Optional[List[str]]:
    """Return column names if ``header_row`` looks like a header row, else None.

    The row qualifies when at least max(2, 60% of its cells) are non-blank and the
    resulting names are unique. Blank/None cells (pdfplumber uses None for empty
    cells) become ``col_{i}`` instead of the literal string "None"; internal
    whitespace, including line breaks in multi-line header cells, is collapsed.
    """
    cells = ["" if x is None or pd.isnull(x) else " ".join(str(x).split()) for x in header_row]
    non_blank = sum(1 for c in cells if c)
    if non_blank < max(2, int(0.6 * len(cells))):
        return None
    names = [c if c else f"col_{i}" for i, c in enumerate(cells)]
    if len(set(names)) != len(names):
        return None
    return names


def extract_tables_simple(pdf_path: str) -> pd.DataFrame:
    """Extract every table pdfplumber detects into one long DataFrame.

    The first row of a table is promoted to column names when
    ``_table_header_names`` accepts it; otherwise columns are named ``col_0..``.
    A leading "page" column records the 1-based source page. Tables with different
    columns are concatenated column-wise (missing cells become NaN).

    Returns:
        The concatenated tables, or an empty DataFrame when none are found.
    """
    frames: List[pd.DataFrame] = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_idx, page in enumerate(pdf.pages, start=1):
            for tbl in page.extract_tables() or []:
                df = pd.DataFrame(tbl)
                if df.empty:
                    continue

                names = _table_header_names(df.iloc[0].tolist())
                if names is None:
                    df.columns = [f"col_{i}" for i in range(df.shape[1])]
                else:
                    df.columns = names
                    df = df.drop(index=0).reset_index(drop=True)

                df.insert(0, "page", page_idx)
                frames.append(df)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

# Column order of the facts CSV. A fixed list keeps the schema stable across runs
# (extra keys the model invents are dropped) and still writes a header row when no
# facts were extracted.
FACT_COLUMNS = [
    "pdf_file", "page", "metric", "value", "unit", "currency_symbol",
    "is_percent", "topic", "period", "comment", "evidence",
]


def _save_facts(facts: List[Dict[str, Any]], csv_path: str) -> pd.DataFrame:
    """Write ``facts`` to ``csv_path`` with the FACT_COLUMNS schema and report the row count."""
    df = pd.DataFrame(facts, columns=FACT_COLUMNS)
    df.to_csv(csv_path, index=False)
    print(f"Saved AI facts → {csv_path} ({len(df)} rows)")
    return df


def _save_tables(pdf_path: str, csv_path: str, empty_msg: str) -> pd.DataFrame:
    """Extract tables from ``pdf_path`` into ``csv_path``; print ``empty_msg`` if none were found."""
    tdf = extract_tables_simple(pdf_path)
    if tdf.empty:
        print(empty_msg)
    else:
        tdf.to_csv(csv_path, index=False)
        print(f"Saved tables → {csv_path} ({len(tdf)} rows)")
    return tdf


def run_notebook_defaults():
    """Run the whole pipeline with the module-level ``*_DEFAULT`` settings.

    Entry point used by the ``__main__`` guard when a Jupyter kernel is detected;
    no command-line parsing happens here.
    """
    facts = ai_extract_pdf_to_rows(
        pdf_path=PDF_DEFAULT,
        model=MODEL_DEFAULT,
        api_key_env=API_KEY_ENV_DEFAULT,
        base_url=BASE_URL_DEFAULT,
        temperature=TEMPERATURE_DEFAULT,
        max_calls=MAX_CALLS_DEFAULT
    )
    _save_facts(facts, FACTS_CSV_DEFAULT)
    _save_tables(
        PDF_DEFAULT,
        TABLES_CSV_DEFAULT,
        "ℹ️ No tables detected by pdfplumber. For complex tables consider camelot/tabula.",
    )


def run_cli(argv: Optional[List[str]] = None):
    """Parse command-line flags and run the pipeline.

    Args:
        argv: arguments to parse instead of ``sys.argv[1:]`` (None = use sys.argv);
            lets the CLI be driven from a notebook or a test.
    """
    ap = argparse.ArgumentParser(description="AI-enhanced ETL: PDF → facts.csv + tables.csv (Qwen Turbo via DashScope)")
    ap.add_argument("--pdf", default=PDF_DEFAULT, help="Path to PDF")
    ap.add_argument("--facts_csv", default=FACTS_CSV_DEFAULT, help="Output CSV for AI facts")
    ap.add_argument("--tables_csv", default=TABLES_CSV_DEFAULT, help="Output CSV for tables")
    ap.add_argument("--model", default=MODEL_DEFAULT, help="LLM model (default: qwen-turbo)")
    ap.add_argument("--api_key_env", default=API_KEY_ENV_DEFAULT, help="Env var for API key (default: DASHSCOPE_API_KEY)")
    ap.add_argument("--base_url", default=BASE_URL_DEFAULT, help="Base URL (DashScope compatible mode)")
    ap.add_argument("--temperature", type=float, default=TEMPERATURE_DEFAULT)
    ap.add_argument("--max_calls", type=int, default=MAX_CALLS_DEFAULT)
    ap.add_argument("--pages", type=str, default=None, help='Pages filter like "1-5,7,10-12"')
    ap.add_argument("--topics", type=str, default=None, help='Comma list of topics (e.g., "aws,revenue,fcf")')
    ap.add_argument("--no_tables", action="store_true", help="Skip table extraction")
    args = ap.parse_args(argv)

    # Blank entries (e.g. a trailing comma in "aws,") are dropped.
    topics_filter = [t.strip() for t in args.topics.split(",") if t.strip()] if args.topics else None

    facts = ai_extract_pdf_to_rows(
        pdf_path=args.pdf,
        model=args.model,
        api_key_env=args.api_key_env,
        base_url=args.base_url,
        temperature=args.temperature,
        max_calls=args.max_calls,
        pages=args.pages,
        topics_filter=topics_filter or None
    )
    _save_facts(facts, args.facts_csv)

    if not args.no_tables:
        _save_tables(
            args.pdf,
            args.tables_csv,
            "No tables detected by pdfplumber. For complex tables consider camelot/tabula.",
        )

if __name__ == "__main__":
    import sys
    # Inside a Jupyter kernel sys.argv presumably holds the kernel launcher's own
    # arguments (e.g. ipykernel_launcher), which run_cli's parser would not accept,
    # so the notebook entry point with hard-coded defaults is used instead.
    launched_by_kernel = any(("ipykernel" in arg) or ("jupyter" in arg) for arg in sys.argv)
    if not launched_by_kernel:
        run_cli()
    else:
        run_notebook_defaults()

Saved AI facts → Output.csv (1047 rows)
Saved tables → tables.csv (434 rows)


In [1]:
pip install pdfplumber pandas python-dotenv langchain langchain-openai

Collecting pdfplumberNote: you may need to restart the kernel to use updated packages.

  Downloading pdfplumber-0.11.7-py3-none-any.whl.metadata (42 kB)
Collecting pdfminer.six==20250506 (from pdfplumber)
  Downloading pdfminer_six-20250506-py3-none-any.whl.metadata (4.2 kB)
Collecting pypdfium2>=4.18.0 (from pdfplumber)
  Downloading pypdfium2-4.30.0-py3-none-win_amd64.whl.metadata (48 kB)
Downloading pdfplumber-0.11.7-py3-none-any.whl (60 kB)
Downloading pdfminer_six-20250506-py3-none-any.whl (5.6 MB)
   ---------------------------------------- 0.0/5.6 MB ? eta -:--:--
   - -------------------------------------- 0.3/5.6 MB ? eta -:--:--
   ----- ---------------------------------- 0.8/5.6 MB 2.4 MB/s eta 0:00:03
   ------------- -------------------------- 1.8/5.6 MB 3.4 MB/s eta 0:00:02
   ------------------ --------------------- 2.6/5.6 MB 3.5 MB/s eta 0:00:01
   ------------------------ --------------- 3.4/5.6 MB 3.5 MB/s eta 0:00:01
   ----------------------------- ---------- 4.2/

In [3]:
pip install pdfplumber pandas langchain-openai langchain-core openai

Note: you may need to restart the kernel to use updated packages.


In [1]:
import os
from getpass import getpass

# Never hardcode the API key in a cell: the .ipynb file stores it permanently and it
# leaks through version control or sharing. Run this cell before the extraction
# cell; it prompts only when the variable is not already set in the environment.
# TODO(security): the key previously pasted here is exposed — revoke and rotate it.
if not os.environ.get("DASHSCOPE_API_KEY"):
    os.environ["DASHSCOPE_API_KEY"] = getpass("DASHSCOPE_API_KEY: ")