# Unpaywall Adapter (DOI → OA discovery & license enrichment)

Purpose:
- Query Unpaywall for a DOI to discover OA/full-text URLs and license metadata.
- Enrich an existing `health_document` record (from PubMed or PMC adapter) with `access` and `license` fields according to the internal schema.
- Save raw Unpaywall responses for traceability.

## Dependencies

In [18]:
import requests
import time
import json
import requests
from typing import Dict, Any, Optional, List
from urllib.parse import quote
from datetime import datetime,timezone
from pathlib import Path

## Set the storage directory for output files

In [42]:
# Files written by this notebook go into a "storage" folder under the current working directory.
STORAGE_DIR = Path.cwd().joinpath("storage")
# Create the folder if it is missing; an already-existing folder is not an error.
STORAGE_DIR.mkdir(exist_ok=True)
STORAGE_DIR  # bare expression: the notebook cell displays the resolved path

WindowsPath('C:/Users/Aman Sheikh/Desktop/Projects/VeriFact/Model/harvester/storage')

## Configuration

In [20]:
# REQUIRED: provide an email address to identify your application to Unpaywall.
# It is sent as the `email` query parameter and embedded in the User-Agent below.
UNPAYWALL_EMAIL = "abamsheikh@gmail.com"

UNPAYWALL_BASE = "https://api.unpaywall.org/v2"
# polite default: 1 request / second (adjust if higher quota).
# Value is in seconds; it is also the initial backoff delay between retries.
REQUEST_DELAY = 1.0

# Retry/backoff config
MAX_RETRIES = 4        # retries after the first attempt (so up to MAX_RETRIES + 1 requests)
BACKOFF_FACTOR = 1.5   # the retry delay is multiplied by this after every failed attempt

# Headers for requests (polite identification).
# The contact address is derived from UNPAYWALL_EMAIL so the two can never drift apart.
HEADERS = {
    "User-Agent": f"health-harvester-unpaywall/1.0 ({UNPAYWALL_EMAIL})",
    "Accept": "application/json",
}

REQUEST_TIMEOUT = 30  # seconds

## Helpers

In [21]:
def now_iso():
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()

def sha256_bytes(b: bytes) -> str:
    """Return the SHA-256 digest of ``b`` as a lowercase hexadecimal string."""
    from hashlib import sha256

    hasher = sha256()
    hasher.update(b)
    return hasher.hexdigest()

In [22]:
def extract_doi(identifiers: List[Dict[str,str]]) -> Optional[str]:
    """
    Extract the DOI value from an identifiers list.

    Entries are scanned in order; the ``value`` of the first entry whose
    ``type`` is 'doi' (case-insensitive) and whose value is non-empty is
    returned.

    Args:
        identifiers: list of dicts with ``type`` and ``value`` keys. ``None``
            or an empty list is accepted and treated as "no identifiers".

    Returns:
        The DOI string, or None when no usable DOI entry exists.
    """
    for ident in identifiers or []:
        if not isinstance(ident, dict):
            continue
        # `type` may be missing or explicitly None; normalise before lower().
        ident_type = str(ident.get("type") or "").lower()
        value = ident.get("value")
        # Skip DOI entries with an empty value so a later, usable one can win.
        if ident_type == "doi" and value:
            return value
    return None

## Fetcher

In [23]:
def fetch_unpaywall(doi: str, email: str = UNPAYWALL_EMAIL,
                    max_retries: int = MAX_RETRIES, timeout: int = REQUEST_TIMEOUT) -> Dict[str, Any]:
    """
    Query the Unpaywall API for a DOI and return the parsed JSON on success.

    Network errors and non-4xx failures (5xx in practice) are retried with
    exponential backoff: the delay starts at REQUEST_DELAY and is multiplied
    by BACKOFF_FACTOR after every failed attempt. Client errors (4xx) are
    never retried.

    Args:
        doi: the DOI to look up; it is percent-encoded before being placed in the URL path.
        email: contact address sent as the ``email`` query parameter.
        max_retries: number of retries after the first attempt.
        timeout: per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        The decoded JSON body for HTTP 200, or
        ``{"found": False, "status_code": 404}`` when Unpaywall does not know the DOI.

    Raises:
        ValueError: if ``doi`` is empty.
        requests.exceptions.HTTPError: immediately, for any 4xx status other than 404.
        requests.exceptions.RequestException: if the final attempt fails at the network level.
        RuntimeError: if every attempt ended in a retryable HTTP status.
    """
    if not doi:
        raise ValueError("DOI required")
    # Ensure DOI is URL-encoded for use in path
    doi_quoted = quote(doi, safe='')
    url = f"{UNPAYWALL_BASE}/{doi_quoted}"
    params = {"email": email}

    delay = REQUEST_DELAY
    for attempt in range(max_retries + 1):
        is_last_attempt = attempt == max_retries
        # Only the network call lives in the try block, so the HTTPError raised
        # for 4xx responses below is not swallowed and retried by mistake.
        try:
            resp = requests.get(url, headers=HEADERS, params=params, timeout=timeout)
        except requests.exceptions.RequestException:
            if is_last_attempt:
                raise
            time.sleep(delay)
            delay *= BACKOFF_FACTOR
            continue

        # Respect simple rate-limiting between calls
        time.sleep(REQUEST_DELAY)

        if resp.status_code == 200:
            return resp.json()
        if resp.status_code == 404:
            # DOI not found in Unpaywall
            return {"found": False, "status_code": 404}
        if 400 <= resp.status_code < 500:
            # Client error (bad doi etc.) — do not retry
            resp.raise_for_status()

        # Anything else (5xx server error) — retry, but do not sleep after the
        # final attempt since no further request will follow.
        if not is_last_attempt:
            time.sleep(delay)
            delay *= BACKOFF_FACTOR

    # If loop ends without return:
    raise RuntimeError("Unpaywall fetch failed after retries")

## Parse & choose best OA locations

In [24]:
def _normalize_oa_location(raw: Any, evidence: str) -> Optional[Dict[str, Any]]:
    """Map one Unpaywall location object to the internal form; None if it is unusable or has no URL."""
    if not isinstance(raw, dict):
        return None
    pdf_url = raw.get("url_for_pdf")
    url = raw.get("url") or pdf_url
    if not url:
        return None
    return {
        "url": url,
        # A URL exists at this point, so it is a PDF link or (by elimination) an HTML page.
        "format": "pdf" if pdf_url else "html",
        "source": raw.get("host_type") or "other",
        "version": raw.get("version"),
        "license": raw.get("license"),
        "evidence": evidence,
    }


def pick_oa_locations(unpay_json: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extract candidate OA locations from an Unpaywall response in normalized form:
    {"url": ..., "format": "pdf|html", "source": <host_type or "other">,
     "version": ..., "license": ..., "evidence": "best_oa_location|oa_locations"}

    The 'best_oa_location' entry (if it has a URL) comes first, followed by the
    entries of 'oa_locations' in their original order. Entries without a URL are
    skipped, and an entry whose URL was already emitted is dropped.

    Returns an empty list when the response is empty, is not a dict, or is the
    ``{"found": False, ...}`` marker for an unknown DOI.
    """
    if not isinstance(unpay_json, dict) or not unpay_json or unpay_json.get("found") is False:
        return []

    locations: List[Dict[str, Any]] = []
    seen_urls = set()  # O(1) duplicate check instead of rescanning `locations`

    candidates = [(unpay_json.get("best_oa_location"), "best_oa_location")]
    # add all oa_locations (may include repositories and publisher)
    candidates.extend((raw, "oa_locations") for raw in unpay_json.get("oa_locations") or [])

    for raw, evidence in candidates:
        entry = _normalize_oa_location(raw, evidence)
        if entry is None or entry["url"] in seen_urls:
            continue
        seen_urls.add(entry["url"])
        locations.append(entry)

    return locations


## Enrichment function

In [31]:
def enrich_document_with_unpaywall(document: Dict[str,Any], unpay_json: Dict[str,Any],
                                   raw_ref: Optional[str] = None, raw_bytes: Optional[bytes] = None,
                                   adapter_name: str = "unpaywall_adapter", adapter_version: str = "1.0") -> Dict[str,Any]:
    """
    Return an enriched deep copy of ``document`` (health_document schema).

    The input document and its nested dicts/lists are left untouched.

    - ``access``: appends every location from ``pick_oa_locations`` to
      ``fulltext_urls`` (skipping exact duplicates), recomputes ``has_fulltext``
      and sets ``access_type`` to "open" when Unpaywall reports ``is_oa`` or any
      full-text URL is known.
    - ``license``: sets ``type`` from the first available of: top-level
      'license', best_oa_location license, first location carrying a license.
    - ``tags``: adds "oa" when ``is_oa``; otherwise "oa_candidate" when a
      full-text URL is known.
    - ``ingestion``: records check time, adapter name/version, the Unpaywall URL
      for the document's DOI (None without a DOI), and the raw payload
      reference, size and SHA-256.
    - ``processing.canonical_id``: the lower-cased DOI, when one is present.
    - ``last_updated``: the current UTC timestamp.

    Args:
        document: the record to enrich.
        unpay_json: parsed Unpaywall response; anything that is not a dict
            (e.g. None) is treated as an empty response.
        raw_ref: reference (typically a path) to the stored raw payload;
            stored as a string.
        raw_bytes: raw payload bytes, used only for size and hash.
        adapter_name: adapter name recorded in the ingestion block.
        adapter_version: adapter version recorded in the ingestion block.

    Returns:
        The enriched copy of the document.
    """
    # Local import (same style as sha256_bytes) so no file-level import is required.
    import copy

    # Deep copy: a shallow copy would let the nested access/license/ingestion
    # mutations below leak into the caller's document.
    doc = copy.deepcopy(document)
    now = now_iso()

    if not isinstance(unpay_json, dict):
        unpay_json = {}

    # Unpaywall summary fields
    is_oa = bool(unpay_json.get("is_oa"))
    best_loc = unpay_json.get("best_oa_location")
    if not isinstance(best_loc, dict):
        best_loc = None
    locations = pick_oa_locations(unpay_json)

    # --- access -----------------------------------------------------------
    access = doc.get("access") or {
        "has_fulltext": False,
        "access_type": "unknown",
        "fulltext_urls": []
    }
    fulltext_urls = access.get("fulltext_urls") or []
    access["fulltext_urls"] = fulltext_urls
    for loc in locations:
        entry = {
            "url": loc.get("url"),
            "format": loc.get("format") or "unknown",
            "source": f"unpaywall:{loc.get('source') or 'other'}",
        }
        if entry not in fulltext_urls:
            fulltext_urls.append(entry)

    access["has_fulltext"] = bool(fulltext_urls)
    if is_oa or access["has_fulltext"]:
        access["access_type"] = "open"
    else:
        access.setdefault("access_type", "unknown")

    # --- license ----------------------------------------------------------
    # Precedence: top-level license, then best_oa_location, then the first
    # location that carries one. Lower-priority sources never overwrite a
    # higher-priority hit.
    license_obj = doc.get("license") or {"type": "unknown", "url": None, "notes": None}
    top_license = unpay_json.get("license")
    best_license = best_loc.get("license") if best_loc else None
    if top_license:
        license_obj["type"] = top_license
        license_obj["url"] = None
    elif best_license:
        license_obj["type"] = best_license
    else:
        first_license = next((loc["license"] for loc in locations if loc.get("license")), None)
        if first_license:
            license_obj["type"] = first_license

    # --- tags -------------------------------------------------------------
    tags = list(doc.get("tags") or [])
    if is_oa:
        if "oa" not in tags:
            tags.append("oa")
    elif access["has_fulltext"] and "oa_candidate" not in tags:
        tags.append("oa_candidate")

    doc["access"] = access
    doc["license"] = license_obj
    doc["tags"] = tags

    # --- DOI lookup (first identifier whose type is 'doi') ------------------
    doi_val = None
    for ident in doc.get("identifiers") or []:
        ident_type = ident.get("type")
        if ident_type and ident_type.lower() == "doi":
            doi_val = ident.get("value")
            break

    # --- ingestion --------------------------------------------------------
    raw_size = len(raw_bytes) if raw_bytes is not None else None
    raw_sha = sha256_bytes(raw_bytes) if raw_bytes is not None else None
    # Build the URL from the DOI (not from whichever identifier happens to be
    # first) and encode it the same way fetch_unpaywall does.
    source_fetch_url = f"{UNPAYWALL_BASE}/{quote(doi_val, safe='')}" if doi_val else None

    ingestion = doc.get("ingestion") or {}
    ingestion.update({
        "last_unpaywall_checked_at": now,
        "unpaywall_adapter": {
            "name": adapter_name,
            "version": adapter_version,
            "fetched_at": now,
            "source_fetch_url": source_fetch_url
        },
        # Stored as text so the document stays JSON-serialisable even when a
        # pathlib.Path is passed in.
        "raw_unpaywall_ref": str(raw_ref) if raw_ref is not None else None,
        "raw_unpaywall_size_bytes": raw_size,
        "raw_unpaywall_sha256": raw_sha
    })
    doc["ingestion"] = ingestion

    # --- processing.canonical_id -------------------------------------------
    if doi_val:
        processing = doc.get("processing") or {}
        processing["canonical_id"] = doi_val.lower()
        doc["processing"] = processing

    # last_updated
    doc["last_updated"] = now

    return doc


## Save raw Unpaywall JSON (traceability)

In [53]:
def save_unpaywall_raw(doi: str, unpay_json: Dict[str,Any], out_dir: Optional[str] = None) -> Path:
    """
    Save the raw Unpaywall JSON to a file and return its path.

    Args:
        doi: DOI the payload belongs to; used to build the file name.
        unpay_json: parsed Unpaywall response to persist.
        out_dir: target directory. When omitted, STORAGE_DIR is used (the
            location this function has always written to). The directory is
            created if it does not exist.

    Returns:
        A ``pathlib.Path`` pointing at ``unpaywall_<safe doi>_raw.json``.

    The file name is made filesystem-safe by replacing every character that is
    invalid in Windows or POSIX file names (``< > : " / \\ | ? *`` and control
    characters) with an underscore; all other characters are kept.
    """
    import re  # local import: the file has no top-level `re` import

    target_dir = Path(out_dir) if out_dir is not None else STORAGE_DIR
    target_dir.mkdir(parents=True, exist_ok=True)

    safe_doi = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", doi)
    raw_path = target_dir / f"unpaywall_{safe_doi}_raw.json"
    with open(raw_path, "w", encoding="utf-8") as f:
        json.dump(unpay_json, f, indent=2)
    return raw_path


## Example Usage

In [55]:
# Example: assume normalized PubMed doc (from earlier cells)
# Example normalized_docs[0] should be available; otherwise load from file

# Path to your JSON file
# json_file_path = STORAGE_DIR/"pubmed_normalized.json"
#
# # Open and load the file
# with open(json_file_path, "r", encoding="utf-8") as f:
#     normalized_documents = json.load(f)
#
# # Check how many documents were loaded
# print(f"✅ Loaded {len(normalized_documents)} normalized PubMed documents")
# # print(f"ℹ️ First document preview:")
# # print(json.dumps(normalized_documents[0], indent=2))
# for doc in normalized_documents:
#     # 1) get DOI from document
#     doi_val = None
#     for ident in doc.get("identifiers", []):
#         if ident.get("type","").lower() == "doi":
#             doi_val = ident.get("value")
#             break
#
#     if doi_val:
#         print("DOI:", doi_val)
#         # 2) fetch unpaywall
#         unpay = fetch_unpaywall(doi_val)
#         # 3) save raw json
#         raw_path = save_unpaywall_raw(doi_val, unpay)
#         with open(raw_path, "rb") as f:
#             raw_bytes = f.read()
#         # 4) enrich
#         enriched = enrich_document_with_unpaywall(doc, unpay, raw_ref=raw_path, raw_bytes=raw_bytes)
#         print(json.dumps(enriched["access"], indent=2))
#         print("License:", enriched["license"])
#         print("Tags:", enriched["tags"])
#     else:
#         print("No DOI found in document; cannot call Unpaywall.")
# print(f"✅ Enrichment completed.")

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Aman Sheikh\\Desktop\\Projects\\VeriFact\\Model\\harvester\\adapters\\storage\\pubmed_normalized.json'