# PubMed Processing + Quick Relevance Check

## Notebook outline

| Cell | Purpose |
|------|---------|
| 1 | Load .txt paths from data/pubmed_fetch/, parse each into a record dict, build records list |
| 2 | Define signal terms and has_signal; compute summary stats (abstracts, signal match %, journals) |
| 3 | Spot-check: titles that did not match signal terms (first 10) |
| 4 | Define normalize_whitespace and normalize_date for export |
| 5 | Build record_docs (id, text, metadata), write JSONL to data/pubmed_records_YYYYMMDD.jsonl |
| 6 | Optional: archive existing JSONL under the S3 prefix, then upload the new JSONL to the S3 processed/ prefix (if S3_BUCKET set) |
| 7 | Start Bedrock KB ingestion job (requires BEDROCK_KB_ID, BEDROCK_KB_DATA_SOURCE_ID) |

---

**Prerequisites.** Run the search notebook first so `data/pubmed_fetch/` exists.

This notebook reads the fetched `.txt` records from `data/pubmed_fetch/`, normalizes them into a simple structure, and runs a lightweight relevance check to see if the search filter is producing useful articles. It is intentionally simple and fast to run locally before investing time in chunking/embedding.

**Env (optional).** Cells 6–7 use: `S3_BUCKET`, `S3_PREFIX`, `S3_ARCHIVE_PREFIX` (cell 6); `BEDROCK_KB_ID`, `BEDROCK_KB_DATA_SOURCE_ID` (cell 7). Set in `.env` or your shell.

In [30]:
# Cell 1: Load .txt paths from data/pubmed_fetch/, parse each into a record dict, build records list.
import glob
import os
import re
from pathlib import Path
from typing import Any


def load_env(reload: bool = False) -> str | None:
    """Load .env if present; returns the resolved path when found."""
    try:
        from dotenv import find_dotenv, load_dotenv

        dotenv_path = find_dotenv(usecwd=True) or str(Path(".env").resolve())
        load_dotenv(dotenv_path=dotenv_path, override=reload)
        return dotenv_path
    except Exception:
        return None


# Load .env (if available) and remember which path was used.
DOTENV_PATH = load_env()

# Directory holding the fetched .txt records (one file is parsed into one record).
FETCH_DIR: str = "data/pubmed_fetch"
paths: list[str] = glob.glob(os.path.join(FETCH_DIR, "*.txt"))
paths.sort()  # stable, name-ordered processing

# Fail early with a pointer to the prerequisite notebook instead of exporting nothing.
if len(paths) == 0:
    raise FileNotFoundError(
        f"No records found in {FETCH_DIR}. Run the search notebook first."
    )


# Single-line header labels of the fetch format and the record key each one fills.
_HEADER_FIELDS: tuple[tuple[str, str], ...] = (
    ("PMID: ", "pmid"),
    ("Title: ", "title"),
    ("Authors: ", "authors"),
    ("Journal: ", "journal"),
    ("Date: ", "date"),
)


def parse_record(text: str) -> dict[str, Any]:
    """Parse one fetched .txt record into a flat dict.

    The format is line-oriented: ``PMID: ``, ``Title: ``, ``Authors: ``,
    ``Journal: `` and ``Date: `` each occupy one line, and ``Abstract:`` opens
    a multi-line section that runs to the end of the text.

    Only the *leading* label is removed from a line, so a value that happens to
    contain the label text again (e.g. a title containing "Title: ") is kept
    intact.

    Args:
        text: Full contents of one record file.

    Returns:
        Dict with keys ``pmid`` (``None`` when absent), ``title``, ``authors``,
        ``journal``, ``date`` and ``abstract`` (empty strings when absent).
        Blank lines inside the abstract are dropped; the remaining lines are
        joined with newlines.
    """
    record: dict[str, Any] = {
        "pmid": None,
        "title": "",
        "authors": "",
        "journal": "",
        "date": "",
        "abstract": "",
    }
    abstract_lines: list[str] = []
    in_abstract = False

    for line in text.splitlines():
        # Header labels are checked on every line, including lines after
        # "Abstract:", so a later header line still sets its field.
        for prefix, key in _HEADER_FIELDS:
            if line.startswith(prefix):
                record[key] = line.removeprefix(prefix).strip()
                break
        else:
            if line.startswith("Abstract:"):
                in_abstract = True
                # The first abstract line may share the line with the label.
                abstract_lines.append(line.removeprefix("Abstract:").lstrip())
            elif in_abstract:
                abstract_lines.append(line)

    record["abstract"] = "\n".join(part for part in abstract_lines if part).strip()
    return record


# Read every fetched file as UTF-8 text and parse it into one record dict.
records: list[dict[str, Any]] = []
for path in paths:
    raw_text = Path(path).read_text(encoding="utf-8")
    records.append(parse_record(raw_text))

len(records)

500

In [31]:
# Cell 2: Define signal terms and has_signal; compute summary stats (abstracts, signal match %, journals).
from collections import Counter
from typing import Any


def normalize(text: str | None) -> str:
    """Collapse whitespace and lowercase for simple term matching."""
    return re.sub(r"\s+", " ", text or "").strip().lower()


# Lowercase phrases that mark a record as on-topic. has_signal matches them as
# plain substrings of the normalized (lowercased) title + abstract, so
# "alzheimer" also hits "Alzheimer's".
# NOTE(review): under substring matching "clinical decision support" adds
# nothing, since "decision support" already matches any text that contains it.
signal_terms: list[str] = [
    "caregiver",
    "caregiving",
    "decision support",
    "clinical decision support",
    "cdss",
    "dementia",
    "alzheimer",
    "mild cognitive impairment",
]


def has_signal(rec: dict[str, Any]) -> bool:
    """Report whether a record mentions at least one signal term.

    The title and abstract are joined with a space, normalized (whitespace
    collapsed, lowercased), and each entry of ``signal_terms`` is tested as a
    substring of the result.
    """
    title = rec.get("title", "")
    abstract = rec.get("abstract", "")
    haystack: str = normalize(f"{title} {abstract}")
    for term in signal_terms:
        if term in haystack:
            return True
    return False


# Records that carry a non-empty abstract, and records that match a signal term.
with_abstract: int = len([rec for rec in records if rec.get("abstract")])
signal_hits: list[dict[str, Any]] = list(filter(has_signal, records))

# max(..., 1) keeps every division below defined even for an empty records list.
_total_abstract_chars = sum(len(rec.get("abstract", "")) for rec in records)
avg_abstract_len: float = _total_abstract_chars / max(len(records), 1)

# Tally records per journal name (surrounding whitespace stripped).
journal_counts: Counter[str] = Counter()
journal_counts.update(rec.get("journal", "").strip() for rec in records)

summary: dict[str, Any] = dict(
    total_records=len(records),
    with_abstract=with_abstract,
    with_abstract_pct=round(with_abstract / max(len(records), 1) * 100, 1),
    signal_match_pct=round(len(signal_hits) / max(len(records), 1) * 100, 1),
    avg_abstract_len_chars=int(avg_abstract_len),
    top_journals=journal_counts.most_common(5),
)

summary

{'total_records': 500,
 'with_abstract': 500,
 'with_abstract_pct': 100.0,
 'signal_match_pct': 99.8,
 'avg_abstract_len_chars': 1866,
 'top_journals': [("Alzheimer's & dementia : the journal of the Alzheimer's Association",
   37),
  ("Journal of Alzheimer's disease : JAD", 21),
  ('The Gerontologist', 21),
  ('BMC geriatrics', 19),
  ('BMJ open', 16)]}

In [32]:
# Cell 3: Spot-check — show titles that did not match signal terms (first 10).
# Collect the titles of records without any signal term, for a manual look.
no_signal_titles: list[str | None] = []
for rec in records:
    if has_signal(rec):
        continue
    no_signal_titles.append(rec.get("title"))
no_signal_titles[:10]

["Emotion recognition in people with Huntington's disease: A comprehensive systematic review."]

In [33]:
# Cell 4: Define normalize_whitespace and normalize_date for export.
import json
from datetime import datetime


def normalize_whitespace(text: str | None) -> str:
    """Collapse whitespace to single spaces and strip; used for export fields."""
    return re.sub(r"\s+", " ", text or "").strip()


def normalize_date(value: str | None) -> str:
    """Best-effort normalization to YYYY-MM-DD; handles '2026 Jan 7', '2025 Dec', '2025'. Falls back to original if we can't parse."""
    value = (value or "").strip()
    if not value:
        return ""
    # Examples observed: "2026 Jan 7", "2025 Dec", "2025"
    try:
        return datetime.strptime(value, "%Y %b %d").strftime("%Y-%m-%d")
    except ValueError:
        pass
    try:
        return datetime.strptime(value, "%Y %b").strftime("%Y-%m-01")
    except ValueError:
        pass
    if re.fullmatch(r"\d{4}", value):
        return f"{value}-01-01"
    return value

# Confirmation message: this cell only defines helpers, so print something visible.
print("normalize_whitespace and normalize_date ready.")

normalize_whitespace and normalize_date ready.


In [34]:
# Cell 5: Build record_docs (id, text, metadata), write JSONL to data/pubmed_records_YYYYMMDD.jsonl.
from typing import Any

from datetime import datetime, timezone

OUTPUT_DIR: str = "data"
# Stamp the export with the current UTC date. datetime.utcnow() is deprecated
# (it emits a DeprecationWarning on recent Python versions), so take an aware
# "now" in UTC instead; the formatted YYYYMMDD value is the same.
RUN_DATE: str = datetime.now(timezone.utc).strftime("%Y%m%d")
OUTPUT_PATH: str = os.path.join(OUTPUT_DIR, f"pubmed_records_{RUN_DATE}.jsonl")

# Create the output directory up front; a no-op when it already exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)

def _to_doc(rec: dict[str, Any]) -> dict[str, Any]:
    """Shape one parsed record as an export document (id, text, metadata)."""
    pmid = rec.get("pmid")
    clean_title = normalize_whitespace(rec.get("title", ""))
    clean_abstract = normalize_whitespace(rec.get("abstract", ""))
    # Title first, then abstract; empty parts are dropped so no stray newline appears.
    text_parts = [part for part in (clean_title, clean_abstract) if part]
    return {
        "id": pmid,
        "text": "\n".join(text_parts),
        "metadata": {
            "pmid": pmid,
            "title": clean_title,
            "journal": rec.get("journal"),
            "authors": rec.get("authors"),
            "date": normalize_date(rec.get("date")),
            "source": "pubmed_fetch",
        },
    }


record_docs: list[dict[str, Any]] = [_to_doc(rec) for rec in records]

# One JSON document per line; ensure_ascii=True escapes non-ASCII characters.
with open(OUTPUT_PATH, "w", encoding="utf-8") as handle:
    handle.writelines(
        json.dumps(doc, ensure_ascii=True) + "\n" for doc in record_docs
    )

OUTPUT_PATH, len(record_docs)

  RUN_DATE: str = datetime.utcnow().strftime("%Y%m%d")


('data/pubmed_records_20260130.jsonl', 500)

In [35]:
# Cell 6: Optional — archive existing JSONL under the S3 prefix, then upload the new JSONL (skipped if S3_BUCKET not set).
load_env(reload=True)
S3_BUCKET: str = os.getenv("S3_BUCKET", "")
S3_PREFIX: str = os.getenv("S3_PREFIX", "processed/")
ARCHIVE_PREFIX: str = os.getenv("S3_ARCHIVE_PREFIX", "processed/archive/")

if not S3_BUCKET:
    print("S3_BUCKET not set; skipping upload. Processed output is in OUTPUT_PATH.")
else:
    import boto3

    s3 = boto3.client("s3")
    # Make both prefixes end in "/" so key arithmetic below is unambiguous.
    source_prefix = S3_PREFIX if S3_PREFIX.endswith("/") else f"{S3_PREFIX}/"
    archive_prefix = (
        ARCHIVE_PREFIX if ARCHIVE_PREFIX.endswith("/") else f"{ARCHIVE_PREFIX}/"
    )
    # NOTE(review): with the defaults the archive sits under the source prefix;
    # if the KB data source covers the whole source prefix, archived files may
    # still be ingested — confirm against the data source configuration.

    # Move every JSONL currently under the source prefix into the archive
    # (copy + delete), so only the new export remains there afterwards.
    paginator = s3.get_paginator("list_objects_v2")
    moved = 0
    for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=source_prefix):
        for obj in page.get("Contents", []):
            key = obj.get("Key", "")
            if not key.endswith(".jsonl"):
                continue
            # The listing also returns objects already inside the archive when
            # the archive prefix is nested under the source prefix (the default).
            # Skip them; otherwise every run would re-move old archives one
            # level deeper (archive/archive/...).
            if key.startswith(archive_prefix):
                continue
            # Swap only the leading source prefix for the archive prefix.
            dest_key = archive_prefix + key.removeprefix(source_prefix)
            s3.copy_object(
                Bucket=S3_BUCKET,
                CopySource={"Bucket": S3_BUCKET, "Key": key},
                Key=dest_key,
            )
            s3.delete_object(Bucket=S3_BUCKET, Key=key)
            moved += 1

    print(f"Archived {moved} JSONL files to s3://{S3_BUCKET}/{archive_prefix}")

    # Upload the fresh export under the source prefix, keeping its file name.
    upload_key = f"{source_prefix}{os.path.basename(OUTPUT_PATH)}"
    s3.upload_file(OUTPUT_PATH, S3_BUCKET, upload_key)
    print(f"Uploaded to s3://{S3_BUCKET}/{upload_key}")

Archived 2 JSONL files to s3://pubmed-rag-data/processed/archive/
Uploaded to s3://pubmed-rag-data/processed/pubmed_records_20260130.jsonl


In [36]:
# Cell 7: Start Bedrock KB ingestion job (requires BEDROCK_KB_ID, BEDROCK_KB_DATA_SOURCE_ID).
import json
import os
import subprocess
from typing import Any

# Re-read .env so IDs added since the notebook started are picked up.
load_env(reload=True)

KB_ID: str = os.getenv("BEDROCK_KB_ID", "")
DATA_SOURCE_ID: str = os.getenv("BEDROCK_KB_DATA_SOURCE_ID", "")

# Filled in once a job has been started; stays None when ingestion is skipped,
# in which case the final expression of the cell displays nothing.
ingestion_summary: dict[str, str] | None = None

if not KB_ID or not DATA_SOURCE_ID:
    print(
        "BEDROCK_KB_ID or BEDROCK_KB_DATA_SOURCE_ID not set; skipping ingestion. "
        "Set both in .env or your shell to start a KB sync job."
    )
else:
    try:
        # Argument list (no shell) so the IDs are passed through verbatim.
        result: subprocess.CompletedProcess[str] = subprocess.run(
            [
                "aws",
                "bedrock-agent",
                "start-ingestion-job",
                "--knowledge-base-id",
                KB_ID,
                "--data-source-id",
                DATA_SOURCE_ID,
            ],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        # Output is captured, so surface the CLI's own explanation before
        # re-raising; the bare exception only carries the exit status.
        print(
            f"aws bedrock-agent start-ingestion-job failed "
            f"(exit {err.returncode}):\n{err.stderr}"
        )
        raise

    # The CLI's stdout is parsed as JSON; the job id is read from
    # ingestionJob.ingestionJobId.
    payload: dict[str, Any] = json.loads(result.stdout)
    job_id: str = payload["ingestionJob"]["ingestionJobId"]
    print(f"Started ingestion job: {job_id}")
    ingestion_summary = {
        "knowledgeBaseId": KB_ID,
        "dataSourceId": DATA_SOURCE_ID,
        "ingestionJobId": job_id,
    }

# Last top-level expression, so the notebook displays the summary. (An
# expression nested inside the else-branch would be evaluated and discarded.)
ingestion_summary

Started ingestion job: FHQWRGQRLK
