In [10]:
import pandas as pd
import numpy as np
from typing import Iterable, Dict, Any, List, Optional
import requests
from functools import lru_cache
from collections import Counter
import re 


In [None]:
import re, sqlite3, threading, time
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import closing
from functools import lru_cache

import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Root of the RxNav REST API; ndcproperties() and rx_allprops() build their URLs from it.
BASE = "https://rxnav.nlm.nih.gov/REST"
# SQLite file used as the persistent NDC11 enrichment cache (default path for the cache helpers).
DB_PATH = "rx_cache.sqlite"

# ---------- Cleaning ----------
def clean_csv(path, keep=("State","NDC","Units Reimbursed","Number of Prescriptions"),
              drop_xx=True, add_year=True):
    """Load one SDUD-style CSV and return a tidy DataFrame.

    Only the ``keep`` columns are read. ``State`` and ``NDC`` are read as
    strings, so NDC leading zeros survive. When present:

    - ``State`` is trimmed and upper-cased.
    - ``Units Reimbursed`` / ``Number of Prescriptions`` are coerced to
      numbers; unparseable values become NaN.

    Rows with a missing value in any ``keep`` column are then dropped.

    Args:
        path: CSV path (anything ``pd.read_csv`` accepts).
        keep: Columns to read; every one must be present and non-null.
        drop_xx: Drop rows whose ``State`` is ``"XX"`` (skipped when
            ``State`` is not among the read columns).
        add_year: Add a string ``Year`` column taken from the first
            standalone 4-digit run in the file's *base name*
            (``data/SDUD2017.csv`` -> ``"2017"``). The value is ``None`` when
            the name has no such run.

    Returns:
        A new DataFrame with the ``keep`` columns (in ``keep`` order) plus
        ``Year`` when ``add_year`` is true.
    """
    import os  # local: only needed to take the file's base name

    wanted = set(keep)  # built once instead of once per header column
    # No chunksize: read_csv returns a DataFrame rather than an iterator.
    df = pd.read_csv(
        path,
        usecols=lambda c: c in wanted,
        dtype={"State": "string", "NDC": "string"},
        low_memory=False,
    )
    if "State" in df.columns:
        df["State"] = df["State"].str.strip().str.upper()
    for c in ("Units Reimbursed", "Number of Prescriptions"):
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=list(keep))
    if drop_xx and "State" in df.columns:
        df = df[df["State"] != "XX"]
    if add_year:
        # Slicing str(path)[4:8] only worked for a bare "SDUDyyyy.csv";
        # a directory prefix shifted the slice. Search the base name instead.
        name = os.path.basename(str(path))
        match = re.search(r"(?<![0-9])([0-9]{4})(?![0-9])", name)
        # assign() returns a new frame, avoiding SettingWithCopy on the slice above.
        df = df.assign(Year=match.group(1) if match else None)
    # Keep tidy cols
    cols = [c for c in (*keep, "Year") if c in df.columns]
    return df.loc[:, cols].copy()

# ---------- NDC normalize ----------
def ndc11_candidates(ndc):
    """Return the possible 11-digit (5-4-2) forms of an NDC, in priority order.

    - Missing input (``None``, ``NaN``, ``pd.NA``) -> ``[]``.
    - A hyphenated ``labeler-product-package`` code whose segments fit
      5/4/2 digits is unambiguous: each segment is zero-padded, giving a
      single candidate.
    - Otherwise every non-digit is stripped:

      * 11 digits -> ``[digits]``.
      * 10 digits -> the 4-4-2, 5-3-2 and 5-4-1 paddings, in that order,
        with duplicates removed.
      * Any other length -> ``[]``.
    """
    if ndc is None:
        return []
    # `ndc or ""` raised TypeError on pd.NA ("boolean value of NA is ambiguous").
    if not isinstance(ndc, str) and pd.api.types.is_scalar(ndc) and pd.isna(ndc):
        return []
    text = str(ndc).strip()

    # Hyphens reveal which segment is short, so there is no need to guess.
    hyphenated = re.fullmatch(r"([0-9]{1,5})-([0-9]{1,4})-([0-9]{1,2})", text)
    if hyphenated:
        labeler, product, package = hyphenated.groups()
        return [labeler.zfill(5) + product.zfill(4) + package.zfill(2)]

    s = re.sub(r"[^0-9]", "", text)
    if len(s) == 11:
        return [s]
    if len(s) == 10:
        candidates = [
            "0" + s,               # 4-4-2: pad the labeler
            s[:5] + "0" + s[5:],   # 5-3-2: pad the product
            s[:9] + "0" + s[9:],   # 5-4-1: pad the package
        ]
        return list(dict.fromkeys(candidates))  # de-dupe, keep order
    return []

# ---------- HTTP session w/ retries ----------
def make_sess():
    """Build a requests Session with a fixed User-Agent and HTTPS retries.

    The retry policy allows up to 5 retries with backoff factor 0.4. It
    retries on HTTP 429/500/502/503/504 and is mounted only for
    ``https://`` URLs.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=0.4,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    session = requests.Session()
    session.headers.update({"User-Agent": "ndc-enrichment/parallel/1.0"})
    session.mount("https://", HTTPAdapter(max_retries=retry_policy))
    return session

# One requests.Session per thread. Keep-alive connections are reused across
# lookups instead of opening (and TLS-handshaking) a new connection for every
# request, and no Session is shared between threads. A thread's session is
# released with the thread.
_http_local = threading.local()


def _thread_session():
    """Return this thread's HTTP session, creating it with make_sess() on first use."""
    sess = getattr(_http_local, "session", None)
    if sess is None:
        sess = make_sess()
        _http_local.session = sess
    return sess


@lru_cache(maxsize=100_000)
def ndcproperties(ndc11):
    """Return RxNav's ndcProperty list for *ndc11* (``[]`` when none).

    Results are memoized; the returned list is shared by all callers, so do
    not mutate it. HTTP errors (``raise_for_status``) and exhausted retries
    propagate as requests exceptions.
    """
    r = _thread_session().get(
        f"{BASE}/ndcproperties.json",
        params={"id": ndc11, "ndcstatus": "ALL"},
        timeout=10,
    )
    r.raise_for_status()
    return (r.json().get("ndcPropertyList") or {}).get("ndcProperty") or []


@lru_cache(maxsize=100_000)
def rx_allprops(rxcui):
    """Return the propConcept list from RxNav allProperties (prop=names) for *rxcui*.

    Memoized and shared like ndcproperties(); ``[]`` when RxNav returns none.
    """
    r = _thread_session().get(
        f"{BASE}/rxcui/{rxcui}/allProperties.json",
        params={"prop": "names"},
        timeout=10,
    )
    r.raise_for_status()
    return (r.json().get("propConceptGroup") or {}).get("propConcept") or []

def props_dict(item):
    """Flatten an ndcProperty item's propertyConcept list into ``{PROPNAME: value}``.

    Property names are upper-cased and values stripped. A missing name or
    value becomes ``""``. If a name occurs more than once, the later entry wins.
    """
    container = item.get("propertyConceptList") or {}
    concepts = container.get("propertyConcept") or []
    flat = {}
    for concept in concepts:
        key = (concept.get("propName") or "").upper()
        flat[key] = (concept.get("propValue") or "").strip()
    return flat

# ---------- SQLite cache ----------
# Schema of the persistent enrichment cache: one row per NDC11 holding its
# RxCUI, names, family (IN/SCD/BN) values, labeler, and fetched_at (set from
# time.time() by upsert_row). Both statements use IF NOT EXISTS, so running
# this script repeatedly against the same file is safe.
DDL = """
CREATE TABLE IF NOT EXISTS ndc_map (
  ndc11 TEXT PRIMARY KEY,
  rxcui TEXT,
  brand_name TEXT,
  generic_name TEXT,
  family_in TEXT,
  family_scd TEXT,
  family_bn TEXT,
  labeler TEXT,
  fetched_at REAL
);
CREATE INDEX IF NOT EXISTS idx_rxcui ON ndc_map(rxcui);
"""

def init_db(path=DB_PATH):
    """Create the ndc_map cache table and its rxcui index in *path* if missing."""
    connection = sqlite3.connect(path, check_same_thread=False)
    try:
        connection.executescript(DDL)
        connection.commit()
    finally:
        connection.close()

def get_cached(ndc11_list, path=DB_PATH, chunk_size=900):
    """Return the cached ndc_map rows for the given NDC11 keys.

    Keys are de-duplicated and queried in batches of ``chunk_size``, so each
    ``IN (?, ...)`` list stays under SQLite's host-parameter limit
    (SQLITE_MAX_VARIABLE_NUMBER: 999 before SQLite 3.32.0, 32766 after).
    One statement with tens of thousands of placeholders fails with "too
    many SQL variables".

    Args:
        ndc11_list: Iterable of NDC11 strings (``None``/empty -> empty result).
        path: SQLite database file.
        chunk_size: Maximum placeholders per query (values < 1 are treated as 1).

    Returns:
        DataFrame with every ndc_map column for the keys found, at most one row
        per key. For empty input, a DataFrame with only an ``ndc11`` column.
    """
    keys = [] if ndc11_list is None else list(dict.fromkeys(ndc11_list))
    if not keys:
        return pd.DataFrame(columns=["ndc11"])
    chunk_size = max(1, int(chunk_size))
    frames = []
    with closing(sqlite3.connect(path, check_same_thread=False)) as con:
        for start in range(0, len(keys), chunk_size):
            batch = keys[start:start + chunk_size]
            qmarks = ",".join("?" * len(batch))
            frames.append(pd.read_sql_query(
                f"SELECT * FROM ndc_map WHERE ndc11 IN ({qmarks})", con, params=batch
            ))
    if len(frames) == 1:
        return frames[0]
    # Drop empty batches so concat doesn't warn about empty inputs. Keep one
    # frame if all are empty so the column schema survives.
    nonempty = [f for f in frames if not f.empty] or frames[:1]
    return pd.concat(nonempty, ignore_index=True)

# Serializes cache writes coming from the warm-up worker threads.
_lock = threading.Lock()

# Column order of the ndc_map INSERT below (fetched_at is appended separately).
_UPSERT_FIELDS = ("ndc11", "rxcui", "brand_name", "generic_name",
                  "family_in", "family_scd", "family_bn", "labeler")

def upsert_row(row, path=DB_PATH):
    """Insert *row* into ndc_map, or overwrite the existing row with the same ndc11.

    *row* must provide every key in _UPSERT_FIELDS; fetched_at is stamped with
    time.time(). The write runs under _lock on a fresh connection and is
    committed immediately.
    """
    with _lock, closing(sqlite3.connect(path, check_same_thread=False)) as con:
        params = tuple(row[field] for field in _UPSERT_FIELDS) + (time.time(),)
        con.execute("""
            INSERT INTO ndc_map (ndc11,rxcui,brand_name,generic_name,family_in,family_scd,family_bn,labeler,fetched_at)
            VALUES (?,?,?,?,?,?,?,?,?)
            ON CONFLICT(ndc11) DO UPDATE SET
              rxcui=excluded.rxcui,
              brand_name=excluded.brand_name,
              generic_name=excluded.generic_name,
              family_in=excluded.family_in,
              family_scd=excluded.family_scd,
              family_bn=excluded.family_bn,
              labeler=excluded.labeler,
              fetched_at=excluded.fetched_at
        """, params)
        con.commit()

# ---------- Worker (fetch one NDC11) ----------
def fetch_one(ndc11):
    """Look up one NDC11 on RxNav and build a cache row, or return None.

    Only the first ndcproperties() item is used. Brand, generic and labeler
    come from its property list. When the item carries an RxCUI, the first
    IN, SCD and BN entries from rx_allprops() fill the family_* fields, and a
    missing NONPROPRIETARYNAME falls back to the SCD value, then the IN value.

    NOTE(review): rx_allprops() requests prop=names. Confirm RxNav actually
    reports propName values "IN"/"SCD"/"BN" there; if not, family_* stay None.

    Returns:
        A dict keyed like the ndc_map columns (without fetched_at), or None
        when RxNav returns no ndcProperty for *ndc11*.
    """
    properties = ndcproperties(ndc11)
    if not properties:
        return None
    head = properties[0]
    flat = props_dict(head)
    rxcui = head.get("rxcui")

    brand = flat.get("PROPRIETARYNAME")
    generic = flat.get("NONPROPRIETARYNAME")
    labeler = flat.get("LABELER") or flat.get("LABELERNAME")

    family_in = family_scd = family_bn = None
    if rxcui:
        concepts = rx_allprops(rxcui)

        def first_value(kind):
            # Stripped value of the first concept whose upper-cased propName is `kind`.
            return next(
                ((c.get("propValue") or "").strip()
                 for c in concepts
                 if (c.get("propName") or "").upper() == kind),
                None,
            )

        family_in, family_scd, family_bn = (first_value(k) for k in ("IN", "SCD", "BN"))
        generic = generic or family_scd or family_in

    return {
        "ndc11": head.get("ndcItem") or ndc11,
        "rxcui": rxcui,
        "brand_name": brand,
        "generic_name": generic,
        "family_in": family_in,
        "family_scd": family_scd,
        "family_bn": family_bn,
        "labeler": labeler,
    }

# ---------- Warm cache in parallel ----------
def warm_cache_from_ndcs(ndc_series, max_workers=12, pause=0.0):
    """Fetch RxNav data for every not-yet-cached NDC11 candidate and store it.

    Steps:

    1. Expand the raw NDCs with ndc11_candidates().
    2. Skip candidates already in the SQLite cache.
    3. Look up the rest with fetch_one() on a thread pool and upsert each hit.

    Candidates RxNav does not know are not stored, so they are looked up
    again on the next call.

    Args:
        ndc_series: pandas Series of raw NDC values (NaN allowed).
        max_workers: Thread-pool size.
        pause: Seconds each worker sleeps before each request (0 disables).
            The sleep runs inside the worker, so it actually spaces out requests.

    Lookup or storage failures are counted and summarized (first few shown).
    The function stays best-effort and does not raise for them.
    """
    init_db(DB_PATH)

    # 1) Unique raw NDCs -> all candidate NDC11s (each one is tested)
    raw = ndc_series.dropna().astype(str).str.strip().unique().tolist()
    cands = sorted({c for ndc in raw for c in ndc11_candidates(ndc)})

    # 2) Remove already-cached
    cached = set(get_cached(cands)["ndc11"].tolist())
    todo = [c for c in cands if c not in cached]
    print(f"Unique NDC11 candidates: {len(cands)} | Cached: {len(cached)} | To fetch: {len(todo)}")

    if not todo:
        return

    def task(ndc11):
        # Throttle in the worker: sleeping in the result loop below would run
        # only after the requests were already issued.
        if pause:
            time.sleep(pause)
        return fetch_one(ndc11)

    # 3) Parallel fetch; record failures instead of silently dropping them
    stored = not_found = 0
    failures = []
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(task, ndc11): ndc11 for ndc11 in todo}
        for i, fut in enumerate(as_completed(futures), 1):
            ndc11 = futures[fut]
            try:
                row = fut.result()
                if row:
                    upsert_row(row)
                    stored += 1
                else:
                    not_found += 1
            except Exception as exc:  # best-effort: keep going, report at the end
                failures.append((ndc11, exc))
            if i % 200 == 0:
                print(f"  fetched {i}/{len(todo)}")

    print(f"Done: stored {stored} | not found {not_found} | failed {len(failures)}")
    for ndc11, exc in failures[:5]:
        print(f"  failed {ndc11}: {type(exc).__name__}: {exc}")

# ---------- Join fast from cache ----------
def enrich_from_cache(df):
    """Attach cached RxNav fields to *df* by NDC, without any HTTP calls.

    Each row's ``NDC`` is expanded with ndc11_candidates(). The row's
    ``ndc11`` is chosen as follows:

    - the first candidate (in ndc11_candidates order) that exists in the cache;
    - otherwise the first candidate;
    - ``None`` when the NDC yields no candidate.

    The cached columns are then left-joined on ``ndc11``; rows with no cache
    hit get missing values for them.

    Returns a new DataFrame; *df* itself is not modified.
    """
    out = df.copy()
    cand_lists = [ndc11_candidates(x) for x in out["NDC"]]
    # Query every candidate, not just the first. A 10-digit NDC's 4-4-2 /
    # 5-3-2 / 5-4-1 paddings are ambiguous, and warm_cache_from_ndcs fetches
    # all of them, so the right one is whichever actually resolved.
    cached = get_cached(sorted({c for cands in cand_lists for c in cands}))
    in_cache = set(cached["ndc11"])

    chosen = []
    for cands in cand_lists:
        hit = next((c for c in cands if c in in_cache), None)
        chosen.append(hit if hit is not None else (cands[0] if cands else None))
    # Explicit object dtype so the merge key matches the TEXT column even when empty.
    out["ndc11"] = pd.Series(chosen, index=out.index, dtype=object)
    return out.merge(cached, on="ndc11", how="left")

# 1) Clean fast
cleaned = clean_csv("SDUD2017.csv")

# 2) Warm the cache *once per set of files* (parallel + persistent)
#    Tweak max_workers to your network/CPU; start with 8–12.
warm_cache_from_ndcs(cleaned["NDC"], max_workers=12, pause=0.0)

# 3) Enrich instantly from local SQLite (no HTTP for already-cached NDCs)
enriched = enrich_from_cache(cleaned)

# 4) Repeat steps 1 & 3 for other files — step 2 will be tiny because cache is warm.

Unique NDC11 candidates: 29607 | Cached: 0 | To fetch: 29607
  fetched 200/29607
  fetched 400/29607
  fetched 600/29607
  fetched 800/29607
  fetched 1000/29607
  fetched 1200/29607
  fetched 1400/29607
  fetched 1600/29607
  fetched 1800/29607
  fetched 2000/29607
