**Bulk Open PageRank Fetcher — Developer & Operator Guide**
*(version 2025‑06‑16)*

---

## 1  What this module does

* Queries the **Open PageRank (OPR) REST API** for PageRank®‐like scores.
* Accepts any mixture of URLs, host‑names or raw strings, extracts **syntactically valid host names** (with a leading `www.` stripped) and ignores the rest.
* Caches results locally (`cache.json`) and refreshes them **only when OPR publishes a newer dataset**.
* Obeys the service’s quotas automatically:

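  * ≤ 10 000 HTTP calls per hour, tracked in a fixed one‑hour window that starts when the module is imported.
  * ≤ 4 300 000 domains retrieved per 24 h, tracked in a fixed window in the same way. Both counters live in memory and reset when the process restarts.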
* Runs a **background thread** that can refresh the entire cache after a dataset update, while still serving the foreground request.
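
The "refresh only when OPR publishes a newer dataset" rule comes down to comparing two ISO dates. A minimal sketch of that comparison, assuming each cached record stores the dataset date it was fetched under (the helper name `is_stale` is illustrative and not part of the module):

```python
import datetime
from typing import Optional


def is_stale(cached_lu: Optional[str], global_lu: str) -> bool:
    """Return True when a cached record predates the latest dataset date."""
    if not cached_lu:
        return True  # record has no dataset date, so fetch it
    try:
        cached = datetime.date.fromisoformat(cached_lu)
        latest = datetime.date.fromisoformat(global_lu)
    except ValueError:
        return True  # unreadable date: refetch to be safe
    return cached < latest


print(is_stale("2025-05-01", "2025-06-01"))  # True
print(is_stale("2025-06-01", "2025-06-01"))  # False
```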

---

## 2  High‑level architecture

```
                +-------------+
input strings → | extractor   |-- invalid → drop
                +-------------+
                       │ valid domains
                       ▼
               +------------------+
               | cache look‑up    |
               +------------------+
          hit  │              │ miss /
               │              │ stale
               ▼              ▼
      return cached   +-------------------+
         (O(µs))      | fetch_opr_batch   |—→ OPR API
                      +-------------------+
                             │
                             ▼
                    +------------------+
                    | quota checkers   |  (hourly / daily)
                    +------------------+
                             │
                             ▼
                 update cache & return result
```

A separate scheduler thread runs once per `schedule_full_refresh` call: it passes the stale records to the same `fetch_opr_batch` in **chunks of 100** until every one is refreshed, throttled by the same quota monitors.
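
The chunking step can be sketched on its own. This is an illustrative helper (`chunked` is a hypothetical name), assuming the 100‑domain batch size quoted in this guide:

```python
from typing import Iterator, List

DOMAINS_PER_CALL = 100  # batch size quoted in this guide


def chunked(items: List[str], size: int = DOMAINS_PER_CALL) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


domains = [f"site{i}.example" for i in range(250)]
sizes = [len(chunk) for chunk in chunked(domains)]
print(sizes)  # [100, 100, 50]
```

Each yielded slice is small enough to hand to one batch call.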

---

## 3  Installation prerequisites

| Component | Version / Note                       |
| --------- | ------------------------------------ |
| Python    |  3.8 +                               |
| Packages  |  `requests` *(pip install requests)* |

No external databases or message brokers are required; the cache is a single JSON file, `cache.json`, created in the current working directory unless you pass another path.

---

## 4  Configuration

| Name                  | Where                                                                | Meaning                                                                       |
| --------------------- | -------------------------------------------------------------------- | ----------------------------------------------------------------------------- |
| `API_KEY`             | hard‑coded in `__main__` or supplied from an env‑var that you inject | Your personal key from [https://openpagerank.com/](https://openpagerank.com/) |
| `cache.json`          | current working directory (default `path` of `load_cache` / `save_cache`) | Persistent store for all domains already fetched                              |
| `MAX_CALLS_PER_HOUR`  | module constant                                                      | Leave at 10 000 unless OPR changes its policy                                 |
| `MAX_DOMAINS_PER_DAY` | module constant                                                      | Leave at 4 300 000                                                            |
| `DOMAINS_PER_CALL`    | module constant                                                      | Must stay at 100 (API limit)                                                  |
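
If you prefer the environment‑variable route for `API_KEY`, a small guard avoids sending requests with an empty key. A minimal sketch, assuming the variable is called `OPR_KEY` as in the embedding example of this guide (the helper `read_api_key` is hypothetical):

```python
import os


def read_api_key(var_name: str = "OPR_KEY") -> str:
    """Return the API key from the environment, or fail with a clear message."""
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {var_name} environment variable to your OPR API key"
        )
    return key
```

Calling `read_api_key()` once at start‑up surfaces a missing key immediately rather than as a failed batch later on.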

---

## 5  Public entry points

### 5.1 `process_urls_in_batches(...)`

| Parameter   | Type              | Description                                                   |
| ----------- | ----------------- | ------------------------------------------------------------- |
| `urls`      | `List[Any]`       | Arbitrary list of strings / URLs / host‑names                 |
| `cache`     | `Dict[str, dict]` | The in‑memory cache object (usually loaded by `load_cache()`) |
| `api_key`   | `str`             | Your OPR key                                                  |
| `global_lu` | `str`             | Latest **global** dataset date (`YYYY‑MM‑DD`)                 |

**Returns** `Dict[str, Any]` – mapping every *valid* domain found in `urls` to a float PageRank value or `None` if the API had no data.

> Use this when you want a **synchronous** answer for an ad‑hoc list of URLs.

---

### 5.2 `schedule_full_refresh(...)`

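Spawns a **daemon thread** that calls `refresh_full_dataset` and then `save_cache`. You normally call this once at application start‑up *after* you have learnt `global_lu` from a cheap single‑domain probe (`google.com` is used in the template). Because the thread is a daemon, it is stopped when the main program exits, so a long refresh only completes inside a long‑running process.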
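
If a short‑lived script needs the refresh to finish, one option is to keep a handle on the thread and join it before exiting. The sketch below is not the module's API: `start_refresh` and `fake_refresh` are hypothetical stand‑ins that only illustrate the pattern with the standard `threading` module.

```python
import threading
import time
from typing import Callable, List


def start_refresh(work: Callable[..., None], *args) -> threading.Thread:
    """Run `work(*args)` on a daemon thread and return the Thread object."""
    worker = threading.Thread(
        target=work, args=args, daemon=True, name="opr_full_refresh"
    )
    worker.start()
    return worker


def fake_refresh(log: List[str]) -> None:
    """Stand-in for the real refresh; sleeps briefly, then records completion."""
    time.sleep(0.1)
    log.append("refreshed")


log: List[str] = []
worker = start_refresh(fake_refresh, log)
worker.join(timeout=5)          # wait for the refresh, but not forever
print(log, worker.is_alive())   # ['refreshed'] False
```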

---

### 5.3 Helper utilities

| Function                                       | Purpose                                                                                 |
| ---------------------------------------------- | --------------------------------------------------------------------------------------- |
| `load_cache(path="cache.json")` / `save_cache` | JSON persistence                                                                        |
| `extract_domain(obj)`                          | Converts heterogeneous input to a bare domain or returns `None`                         |
| `is_valid_domain(s)`                           | RFC‑ish regex check                                                                     |
| `parse_opr_date(s)`                            | Converts the `last_updated` field of OPR’s JSON response (“25th Dec 2024” or “1st June 2025”) into an ISO date |
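
The date conversion can be illustrated in isolation. This is a simplified sketch rather than the module's function: the name `to_iso` is hypothetical, and unlike `parse_opr_date` it raises on bad input instead of falling back to today's date. It also assumes an English locale for month names.

```python
import datetime
import re


def to_iso(opr_date: str) -> str:
    """Convert strings such as '25th Dec 2024' to 'YYYY-MM-DD'."""
    day, month, year = opr_date.split()
    day = re.sub(r"(st|nd|rd|th)$", "", day)  # drop the ordinal suffix
    cleaned = f"{day} {month} {year}"
    # Try the abbreviated month name first, then the full month name
    for fmt in ("%d %b %Y", "%d %B %Y"):
        try:
            return datetime.datetime.strptime(cleaned, fmt).date().isoformat()
        except ValueError:
            continue
    raise ValueError(f"unrecognised date: {opr_date!r}")


print(to_iso("25th Dec 2024"))  # 2024-12-25
print(to_iso("1st June 2025"))  # 2025-06-01
```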

---

## 6  How to embed in your own project

```python
import os

from opr_fetcher import (
    load_cache, save_cache,
    fetch_opr_batch, process_urls_in_batches,
    schedule_full_refresh
)

API_KEY = os.getenv("OPR_KEY")

cache = load_cache()

# discover latest dataset date once
global_lu = fetch_opr_batch(["google.com"], API_KEY)["google.com"]["last_updated"]

# background auto‑refresh (non‑blocking)
schedule_full_refresh(cache, API_KEY, global_lu)

# anywhere in your code:
urls = ["https://www.nytimes.com", "foo", "bar.co.uk/?x=1"]
scores = process_urls_in_batches(urls, cache, API_KEY, global_lu)
print(scores)       # {'nytimes.com': 7.9, 'bar.co.uk': 4.4}

# flush cache to disk on app shutdown
save_cache(cache)
```

You can safely run multiple foreground batches per process; all share the same thread‑safe quota counters.
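
For the "flush cache to disk on app shutdown" step, one possible approach is to register the save with `atexit` and write through a temporary file so that an interrupted write cannot leave a half‑written cache behind. This is a sketch under stated assumptions: `save_cache_atomic` is a hypothetical replacement for `save_cache`, and the demo writes to the system temp directory rather than to `cache.json`.

```python
import atexit
import json
import os
import tempfile
from typing import Any, Dict


def save_cache_atomic(cache: Dict[str, Any], path: str) -> None:
    """Write the cache to a temp file, then rename it over `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump(cache, fh)
    os.replace(tmp_path, path)  # atomic rename on the same filesystem


# Illustrative data; in a real application this would come from load_cache()
cache = {"example.com": {"page_rank_decimal": 6.56}}
cache_path = os.path.join(tempfile.gettempdir(), "opr_cache_demo.json")

# Run the save automatically when the interpreter shuts down normally
atexit.register(save_cache_atomic, cache, cache_path)
```

Note that `atexit` handlers run on normal interpreter exit only; they are skipped if the process is killed by an unhandled signal.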

---

## 7  Operational characteristics

| Aspect                | Detail                                                                                                                                                                                                                |
| --------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Throughput**        | Up to 1 000 000 domains per hour (10 000 calls × 100 domains) until the daily 4.3 M cap is hit.                                                                                                                       |
| **Thread‑safety**     | Internal counters (`RL_LOCK`) protect against races **inside one Python process**. If you run multiple processes or containers, use an external lock (Redis, DB row) around `check_rate_limit` & `check_daily_limit`. |
| **Idempotency**       | The cache ensures duplicate domains are fetched once per dataset version.                                                                                                                                             |
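| **Failure behaviour** | Network errors: 3 attempts with a 5 s pause between them; if all fail, the error is logged and every domain in the batch is recorded with `page_rank=None` and today’s date, so it is not retried until OPR publishes a newer dataset. Date‑parsing errors: logged, and today’s date is used instead. |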
| **Disk usage**        | Roughly 90 B per domain in `cache.json`. One million domains ≈ 90 MB (about 85 MiB).                                                                                                                                  |
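
The throughput row follows from simple arithmetic on the three constants. A quick check, using the values quoted in this guide:

```python
MAX_CALLS_PER_HOUR = 10_000
MAX_DOMAINS_PER_DAY = 4_300_000
DOMAINS_PER_CALL = 100

# Upper bound on domains per hour when every call carries a full batch
domains_per_hour = MAX_CALLS_PER_HOUR * DOMAINS_PER_CALL

# How long full-speed fetching lasts before the daily cap is reached
hours_to_daily_cap = MAX_DOMAINS_PER_DAY / domains_per_hour

# Number of full batches that fit into the daily cap
calls_to_daily_cap = MAX_DOMAINS_PER_DAY // DOMAINS_PER_CALL

print(domains_per_hour)     # 1000000
print(hours_to_daily_cap)   # 4.3
print(calls_to_daily_cap)   # 43000
```

In other words, at full speed the daily cap is reached a little over four hours in.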

---

## 8  Typical console session

```
$ python opr_fetcher.py
[INFO] Fetching OPR global last_updated via 'google.com' …
[INFO] Global OPR last_updated = 2025-06-01
[SCHEDULER] Cache already fresh – nothing to do.
[INFO] Processing request …
   example.com                    → 6.56
   kaveh.com                      → 0
   fbk.eu                         → 4.53
   hafez.it                       → 2.01
   google.com                     → 10
[INFO] Done.
[INFO] Cache saved; background refresh continues (if needed).
```

---

## 9  Extending the module

* **Custom persistence** — swap `load_cache`/`save_cache` with a Postgres table or Redis hash if you need multi‑process coherence.
* **Advanced scheduling** — replace the bare thread with `APScheduler`, Celery beat, or a cron job if you prefer.
* **Metrics & logging** — wrap log prints with your favourite logger and export Prometheus counters from `RATE_LIMIT` and `DAILY_LIMIT`.
* **More validation rules** — adjust `DOMAIN_RE` for stricter or laxer hostnames (IDN Unicode, private TLDs, etc.).
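
As one example of adjusting validation, Unicode IDN names can be converted to punycode before the regex check. The sketch below assumes Python's built‑in `idna` codec (which implements IDNA 2003) is acceptable for your inputs; `to_ascii_domain` is a hypothetical helper, and the pattern is copied from the module's `DOMAIN_RE`.

```python
import re
from typing import Optional

DOMAIN_RE = re.compile(
    r"^(?:[a-z0-9-]{1,63}\.)+[a-z0-9-]{2,63}$",
    re.IGNORECASE,
)


def to_ascii_domain(name: str) -> Optional[str]:
    """Return the punycode form of `name` if it passes the regex, else None."""
    try:
        ascii_name = name.strip().lower().encode("idna").decode("ascii")
    except UnicodeError:
        return None  # empty label, over-long label, or otherwise unencodable
    return ascii_name if DOMAIN_RE.match(ascii_name) else None


print(to_ascii_domain("bücher.ch"))    # xn--bcher-kva.ch
print(to_ascii_domain("example.com"))  # example.com
print(to_ascii_domain("mail"))         # None
```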

---

## 10  Limitations & caveats

* Designed around **OpenPageRank’s** current limits (100‑domain batch, 10k calls/h, 4.3 M domains/day). If the service changes these, update the constants.
* Quota tracking is per process: threads share the counters, but separate OS processes will not coordinate without extra work.
* Relies on the `last_updated` field in OPR’s JSON response; if the API removes or renames it, `fetch_opr_batch` and `parse_opr_date` will need an update.
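
The quota counters in the module use fixed windows that reset once the window has elapsed. If a rolling window is a better match for your needs, a deque of timestamps is one common way to build it. The class below is a hypothetical sketch, not part of the module; the injectable `clock` exists only to make the demo deterministic.

```python
import collections
import threading
import time
from typing import Callable, Deque


class RollingWindowLimiter:
    """Allow at most `max_events` within any `window_seconds` span."""

    def __init__(self, max_events: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_events = max_events
        self.window = window_seconds
        self.clock = clock
        self._events: Deque[float] = collections.deque()
        self._lock = threading.Lock()

    def try_acquire(self) -> bool:
        """Record an event and return True, or return False if over quota."""
        with self._lock:
            now = self.clock()
            # Forget events that have aged out of the window
            while self._events and now - self._events[0] >= self.window:
                self._events.popleft()
            if len(self._events) >= self.max_events:
                return False
            self._events.append(now)
            return True


fake_now = [0.0]  # mutable holder so the lambda sees updates
limiter = RollingWindowLimiter(2, 10.0, clock=lambda: fake_now[0])
print(limiter.try_acquire(), limiter.try_acquire(), limiter.try_acquire())
# True True False
fake_now[0] = 10.0
print(limiter.try_acquire())  # True
```

A caller that receives `False` can sleep briefly and retry, which avoids holding the lock while waiting.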

---

Happy ranking!


#!/usr/bin/env python3
"""
Bulk Open‑PageRank fetcher with caching, per‑hour and per‑day limits,
background full refresh, and strict domain validation.

Author: Kaveh
Updated: 2025‑06‑16
"""


import datetime
import json
import re
import threading
import time
from typing import Any, Dict, List, Union
from urllib.parse import urlparse

import requests

# ------------------------- #
#   RATE‑ & QUOTA LIMITS    #
# ------------------------- #
MAX_CALLS_PER_HOUR = 10_000                  # OPR limit
MAX_DOMAINS_PER_DAY = 4_300_000              # OPR daily dataset limit
DOMAINS_PER_CALL = 100                       # OPR batch size

RATE_LIMIT = {
    "start_time": time.time(),               # start of the current hour window
    "calls": 0                               # batches counted in this window (retries are not counted)
}

DAILY_LIMIT = {
    "day_start": time.time(),                # start of current 24‑h window
    "domains": 0                             # domains fetched in this window
}

RL_LOCK = threading.Lock()  # ensures atomic updates when threads are used


def check_rate_limit() -> None:
    """Block if the script would exceed 10 000 calls within an hour."""
    with RL_LOCK:
        now = time.time()
        elapsed = now - RATE_LIMIT["start_time"]

        # reset hourly window
        if elapsed >= 3600:
            RATE_LIMIT.update(start_time=now, calls=0)

        # wait out the hour if quota exhausted
        if RATE_LIMIT["calls"] >= MAX_CALLS_PER_HOUR:
            remaining = int(3600 - elapsed)
            print(f"[RATE LIMIT] Hourly quota reached — sleeping {remaining} s …")
            time.sleep(max(0, remaining))
            RATE_LIMIT.update(start_time=time.time(), calls=0)

        RATE_LIMIT["calls"] += 1


def check_daily_limit(domains_to_fetch: int) -> None:
    """Block if the script would exceed 4.3 M domains within the current day."""
    with RL_LOCK:
        now = time.time()
        elapsed = now - DAILY_LIMIT["day_start"]

        # reset daily window
        if elapsed >= 86_400:
            DAILY_LIMIT.update(day_start=now, domains=0)

        # wait out the day if quota exhausted
        if DAILY_LIMIT["domains"] + domains_to_fetch > MAX_DOMAINS_PER_DAY:
            remaining = int(86_400 - elapsed)
            print(f"[DAILY LIMIT] Daily quota reached — sleeping {remaining} s …")
            time.sleep(max(0, remaining))
            DAILY_LIMIT.update(day_start=time.time(), domains=0)

        DAILY_LIMIT["domains"] += domains_to_fetch


# -------------------------------- #
#     HELPER – DATE CONVERSION     #
# -------------------------------- #
def parse_opr_date(date_str: str) -> str:
    """
    Convert Open‑PageRank 'last_updated' strings to 'YYYY‑MM‑DD'.

    Handles both abbreviated months ('25th Dec 2024') and full month
    names ('1st June 2025').  If parsing fails, today's date is returned.
    """
    # Split into components and strip ordinal suffixes
    try:
        day, mon, year = date_str.split()
    except ValueError:
        print(f"[WARN] Could not parse OPR date '{date_str}' (unexpected shape)")
        return datetime.date.today().isoformat()

    day = re.sub(r"(st|nd|rd|th)$", "", day)
    cleaned = f"{day} {mon} {year}"

    # Try both formats: abbreviated month (%b) and full month (%B)
    for fmt in ("%d %b %Y", "%d %B %Y"):
        try:
            return datetime.datetime.strptime(cleaned, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue

    # Last resort
    print(f"[WARN] Could not parse OPR date '{date_str}'")
    return datetime.date.today().isoformat()



# -------------------------------- #
#       HELPER – HTTP RETRIES      #
# -------------------------------- #
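def safe_get(url: str,
             params: Union[Dict[str, Any], List[Any]],
             headers: Dict[str, str]) -> requests.Response:
    """GET wrapper with three attempts and a 5‑second pause between them.

    `params` is a dict or a list of (key, value) pairs (for `domains[]`).
    """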
    backoff, retries = 5, 3
    for attempt in range(1, retries + 1):
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=30)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as exc:
            if attempt < retries:
                print(f"[WARN] Attempt {attempt} failed ({exc}) – retrying in "
                      f"{backoff}s …")
                time.sleep(backoff)
            else:
                print(f"[ERROR] GET failed after {retries} attempts: {exc}")
                raise


# -------------------------------- #
#     DOMAIN VALIDATION & PARSE    #
# -------------------------------- #
DOMAIN_RE = re.compile(
    # e.g. example.co.uk, foo-bar.io, xn--bcher-kva.ch (IDN punycode)
    r"^(?:[a-z0-9-]{1,63}\.)+[a-z0-9-]{2,63}$",
    re.IGNORECASE
)


def is_valid_domain(domain: str) -> bool:
    """Return True if the string looks like a real domain."""
    return bool(DOMAIN_RE.match(domain))


def extract_domain(url_candidate: Any) -> Union[str, None]:
    """
    Parse the candidate into a bare domain.
    Returns None if the candidate is not a valid registrable domain.
    """
    if not isinstance(url_candidate, str):
        return None

    candidate = url_candidate.strip()
    if not candidate:
        return None

    # prepend scheme for urlparse
    candidate = candidate if "://" in candidate else f"http://{candidate}"

    try:
        # .hostname drops any port or user-info and is already lower-cased
        host = urlparse(candidate).hostname or ""
        if host.startswith("www."):
            host = host[4:]
        return host if is_valid_domain(host) else None
    except Exception:  # noqa: BLE001
        return None


# -------------------------------- #
#         BATCH OPR FETCH          #
# -------------------------------- #
def fetch_opr_batch(domains: List[str],
                    api_key: str) -> Dict[str, Dict[str, Any]]:
    """
    Fetch PageRank for ≤ 100 domains in one call.
    Returns {domain: {"page_rank", "last_updated"}}.
    """
    if len(domains) > DOMAINS_PER_CALL:
        raise ValueError("fetch_opr_batch handles max 100 domains.")

    # observe quotas
    check_daily_limit(len(domains))
    check_rate_limit()

    base_url = "https://openpagerank.com/api/v1.0/getPageRank"
    params = [("domains[]", d) for d in domains]
    headers = {"API-OPR": api_key}
    result: Dict[str, Dict[str, Any]] = {}

    try:
        data = safe_get(base_url, params, headers).json()
        batch_lu = parse_opr_date(data.get("last_updated", ""))

        for item in data.get("response", []):
            domain = item.get("domain")
            result[domain] = {
                "page_rank": item.get("page_rank_decimal"),
                "last_updated": batch_lu
            }

        # placeholders for domains missing in response
        for d in domains:
            result.setdefault(d, {"page_rank": None, "last_updated": batch_lu})

    except Exception as exc:    # noqa: BLE001
        print(f"[ERROR] Batch fetch failed for {domains}: {exc}")
        today = datetime.date.today().isoformat()
        for d in domains:
            result[d] = {"page_rank": None, "last_updated": today}

    return result


# -------------------------------- #
#        CACHE FRESHNESS TEST      #
# -------------------------------- #
def should_refresh_domain(cached: Dict[str, Any],
                          global_lu: str) -> bool:
    """True if cached record is older than OPR's global `last_updated`."""
    cached_lu = cached.get("opr_last_updated")
    if not cached_lu:
        return True
    try:
        return (datetime.date.fromisoformat(cached_lu) <
                datetime.date.fromisoformat(global_lu))
    except ValueError:
        return True


# -------------------------------- #
#      MAIN BATCH PROCESSOR        #
# -------------------------------- #
def process_urls_in_batches(urls: List[Any],
                            cache: Dict[str, Dict[str, Any]],
                            api_key: str,
                            global_lu: str) -> Dict[str, Any]:
    """
    Resolve a heterogeneous list of inputs to domains, pull PageRank where
    required, update cache, and return {domain: page_rank}.
    """
    results: Dict[str, Any] = {}
    domains = {extract_domain(u) for u in urls}
    domains.discard(None)  # remove invalids

    fetch_list: List[str] = []
    for d in domains:
        c = cache.get(d)
        if c and not should_refresh_domain(c, global_lu):
            results[d] = c["page_rank_decimal"]
        else:
            fetch_list.append(d)

    # fetch in chunks
    for i in range(0, len(fetch_list), DOMAINS_PER_CALL):
        chunk = fetch_list[i:i + DOMAINS_PER_CALL]
        batch = fetch_opr_batch(chunk, api_key)
        today = datetime.date.today().isoformat()

        for d in chunk:
            pr = batch[d]["page_rank"]
            lu = batch[d]["last_updated"]
            cache[d] = {
                "page_rank_decimal": pr,
                "last_checked": today,
                "opr_last_updated": lu
            }
            results[d] = pr

    return results


# -------------------------------- #
#       FULL‑DATASET REFRESH       #
# -------------------------------- #
def refresh_full_dataset(cache: Dict[str, Dict[str, Any]],
                         api_key: str,
                         global_lu: str) -> None:
    """
    Refresh every domain in cache that is older than the current global_lu.
    Respects hourly and daily quotas; may take hours to finish.
    """
    # snapshot the items first: the foreground thread may add keys meanwhile
    stale = [d for d, rec in list(cache.items())
             if should_refresh_domain(rec, global_lu)]

    if not stale:
        print("[SCHEDULER] Cache already fresh – nothing to do.")
        return

    print(f"[SCHEDULER] {len(stale):,} stale domains to refresh …")
    for i in range(0, len(stale), DOMAINS_PER_CALL):
        chunk = stale[i:i + DOMAINS_PER_CALL]
        batch = fetch_opr_batch(chunk, api_key)
        today = datetime.date.today().isoformat()

        for d in chunk:
            pr = batch[d]["page_rank"]
            lu = batch[d]["last_updated"]
            cache[d] = {
                "page_rank_decimal": pr,
                "last_checked": today,
                "opr_last_updated": lu
            }

        # save progress after the first batch and then every 100 batches
        if i % (DOMAINS_PER_CALL * 100) == 0:
            save_cache(cache)

    print("[SCHEDULER] Full refresh completed.")


def schedule_full_refresh(cache: Dict[str, Dict[str, Any]],
                          api_key: str,
                          global_lu: str) -> None:
    """
    Spawn a background thread that runs `refresh_full_dataset` and then
    saves the cache.  The thread is a daemon, so it is stopped when the
    main program exits.
    """
    def _worker() -> None:
        refresh_full_dataset(cache, api_key, global_lu)
        save_cache(cache)

    threading.Thread(
        target=_worker,
        daemon=True,
        name="opr_full_refresh"
    ).start()


# -------------------------------- #
#         CACHE SERIALISERS        #
# -------------------------------- #
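def load_cache(path: str = "cache.json") -> Dict[str, Dict[str, Any]]:
    """Load the cache from `path`; return an empty dict if missing or corrupt."""
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def save_cache(cache: Dict[str, Dict[str, Any]],
               path: str = "cache.json") -> None:
    """Write the whole cache to `path` as indented JSON."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(cache, fh, indent=2)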




# -------------------------------- #
#        DEMO / ENTRY‑POINT        #
# -------------------------------- #
def example_usage(urls: List[Any],
                  cache: Dict[str, Dict[str, Any]],
                  api_key: str,
                  global_lu: str) -> None:
    print("[INFO] Processing request …")
    results = process_urls_in_batches(urls, cache, api_key, global_lu)

    for domain, pr in results.items():
        print(f"   {domain:<30} → {pr}")

    print("[INFO] Done.")


if __name__ == "__main__":
    API_KEY = "Your OPR key"   # replace with your key, or read it from an environment variable

    # 1) load existing cache
    cache_dict = load_cache()

    # 2) get current global last_updated using a cheap single‑domain call
    print("[INFO] Fetching OPR global last_updated via 'google.com' …")
    global_lu = fetch_opr_batch(["google.com"], API_KEY)["google.com"]["last_updated"]
    print(f"[INFO] Global OPR last_updated = {global_lu}")

    # 3) schedule the full‑dataset refresh (non‑blocking)
    schedule_full_refresh(cache_dict, API_KEY, global_lu)

    # 4) handle the “foreground” request
    sample_inputs = [
        "https://www.google.com/search?q=somequery",
        "hafez.it",
        "fbk.eu",
        "example.com",
        "mail",          # rejected by validator
        "www.kaveh.com",
        12345            # rejected – not str
    ]
    example_usage(sample_inputs, cache_dict, API_KEY, global_lu)

    # 5) persist any immediate changes
    save_cache(cache_dict)
    print("[INFO] Cache saved; background refresh continues (if needed).")



