# In [None]:
"""
===============================================================================
TRADENAME CLEANING & VALIDATION PIPELINE v5 (ENGLISH-ONLY, NO EMOJIS)
===============================================================================

Major changes from v4:

  1) Output CSV contains ONLY confirmed rows (is_tradename = True)
     - Not the whole dataset, not pending rows
     - Each batch is APPENDED to the existing file (no rewrite); only a first
       write (is_first_write=True) or a missing file recreates it with a header

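  2) Row order follows the original input CSV
     - Within each batch, appended rows keep their input-CSV order
     - Across batches, rows are grouped by batch, so the file is not a globally
       ordered subset of the input (a later batch may hold earlier input lines)
     - No alphabetical sorting
     - start_from / end_at operate on the same first-appearance order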

  3) Order is guaranteed when splitting across workers
     - Each worker processes a specific range from the SAME ordered-unique list
     - The unique list order is identical across machines

Output CSV:
  - All original columns
  - + new columns (tradename_cleaned, tradename_corrected, ...)
  - Only rows where is_tradename = True and validated
  - Each batch appends without rewriting the file

===============================================================================
"""

import glob
import json
import os
import re
import time
from datetime import datetime

import pandas as pd
import requests

# ==============================================================================
# SETTINGS
# ==============================================================================

BASE_DIR = "/content/drive/MyDrive/DataDoseClean/Tradename Clean"

INPUT_CSV = os.path.join(BASE_DIR, "DataDoseDataset_FinalV_Last.csv")

# This CSV is updated on every batch — always contains all confirmed rows
OUTPUT_CSV = os.path.join(BASE_DIR, "dataset_with_validated_tradenames.csv")

OUTPUT_JSON = os.path.join(BASE_DIR, "tradenames_validated.json")
PROGRESS_FILE = os.path.join(BASE_DIR, "tradename_pipeline_progress.json")
LOG_FILE = os.path.join(BASE_DIR, "tradename_pipeline_log.txt")

# Keys are masked here for security (use your real keys locally)
GROQ_API_KEYS = [
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
    "gsk_***",
]

_current_key_idx = 0
_processed_count = 0
ROTATE_EVERY = 10

GROQ_MODEL = "llama-3.1-8b-instant"
GROQ_URL = "https://api.groq.com/openai/v1/chat/completions"

CONFIDENCE_THRESHOLD = 0.85

# ==============================================================================
# LOGGING
# ==============================================================================


def log(msg: str, level: str = "INFO"):
    """
    Emit a timestamped log line to stdout and append it to LOG_FILE.

    Args:
        msg: Message text.
        level: Severity tag shown in brackets (e.g. "INFO", "WARN", "ERROR").

    The file write is best-effort: any failure (missing mount, permissions,
    undefined path) is ignored so logging can never stop the pipeline.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = "[{}] [{}] {}".format(stamp, level, msg)
    print(entry)
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as sink:
            sink.write(f"{entry}\n")
    except Exception:
        # Deliberately swallowed: stdout already has the message.
        pass


# ==============================================================================
# STEP 1 — DEEP REGEX CLEANING
# ==============================================================================
#
# Goal:
#   - Keep:   Brand name + concentration
#   - Remove: dosage form (tabs, caps, syrup, cream, F.C.tabs, ...)
#   - Remove: pack counts / total volume (20 tabs, 120 ml, 100 gm, ...)
#   - Remove: leading serial numbers (1 2 3 ...)
#   - Remove: number words (one, two, three ...)
#   - Remove: quality terms USP/BP/NA (u.s.p.xxiii, n/a yet, ...)
#
# Concentration examples to KEEP:
#   500mg | 100mg/ml | 0.5% | 1000iu | 10meq | 250mcg | 50mg/5ml
#
# Pack/volume examples to REMOVE:
#   120 ml | 100 gm | 30 tabs | 10 caps | 5 sachets
#
# Approach:
#   - Temporarily replace concentrations with placeholders before removal steps,
#     then restore them at the end.
# ==============================================================================

# Dosage-form words, route/age qualifiers and quality notes to strip from names.
# The group is closed with (?!\w) rather than \b: after an alternative that ends
# in "." (u.s.p., b.p., syr., inj., f.c.tabs.) a \b only succeeds when a WORD
# character follows, so standalone "u.s.p."/"b.p." were never removed and the
# trailing dot of abbreviations was left behind. After a word character,
# (?!\w) behaves exactly like \b.
_DOSAGE_FORM_RE = re.compile(
    r"""
    \b(?:
      # Tablets
      f\.?c\.?tab(?:let)?s?\.?   |
      film[- ]coated[- ]tab(?:let)?s?  |
      tab(?:let)?s?              |

      # Capsules
      caps?(?:ule)?s?            |

      # Suppositories
      supp(?:ositorie)?s?        |
      suppository                |

      # Liquids / other forms
      syrup | syr\.?             |
      suspension | susp\.?       |
      solution | sol\.? | soln\.? |
      drops?                     |
      injection | inj\.?         |
      infusion                   |

      # Ampoules / vials
      amp(?:oule)?s?             |
      vial?s?                    |

      # Sachets / effervescent (the phrase is listed first so it can match as one unit)
      sachet?s?                  |
      effervescent[- ]tablet?s?  |
      effervescent               |

      # Topicals
      cream | gel                |
      oint(?:ment)?              |
      lotion | spray             |
      patch(?:es)?               |

      # Other forms
      powder | granule?s?        |
      lozenge?s?                 |
      inhal(?:er|ation)?         |
      nebuli[sz]er               |

      # Qualifiers (not part of the core name)
      nasal | ophthalmic | otic  |
      topical | oral             |
      paed(?:iatric)?            |
      pediatric | adult          |

      # Quality abbreviations / notes
      u\.s\.p\.(?:xxiii)?        |
      b\.p\.                     |
      n/?a[- ]?yet
    )(?!\w)
""",
    re.IGNORECASE | re.VERBOSE,
)

# Concentration token to KEEP: a number (integer or decimal) not glued to a
# preceding word character, at most one whitespace, then a strength unit:
# mg (optionally "/<number>ml" or "/<number>mg", e.g. 100mg/ml, 50mg/5ml),
# mcg / µg / ug, iu, meq, mmol, or %. Group 1 = number, group 2 = unit.
# There is no boundary after the unit, so a unit that merely prefixes a longer
# word (e.g. "5 mgx") also matches.
_CONCENTRATION_RE = re.compile(
    r"(?<!\w)"
    r"(\d+(?:\.\d+)?)"
    r"\s{0,1}"
    r"("
    r"mg(?:/\d*\.?\d*\s*m[lg])?"
    r"|mcg|µg|ug"
    r"|iu|meq|mmol"
    r"|%"
    r")",
    re.IGNORECASE,
)

# Pack count / total volume to REMOVE: leading whitespace, a number, optional
# whitespace, then a volume/weight/count unit (ml, l, g, gm, gms, kg, pc, pcs,
# unit, units) or a multiplier such as "x 10" / "* 10".
_COUNT_VOLUME_RE = re.compile(
    r"""
    \s+
    \d+(?:\.\d+)?
    \s*
    (?:
      ml\b | l\b      |
      gm?\b | gms?\b  |
      kg\b            |
      pcs?\b          |
      units?\b        |
      x\s*\d+         |
      \*\s*\d+
    )
""",
    re.IGNORECASE | re.VERBOSE,
)

# English number words (one..ten, hundred) as whole words, case-insensitive.
_NUMBER_WORDS_RE = re.compile(
    r"\b(?:one|two|three|four|five|six|seven|eight|nine|ten|hundred)\b",
    re.IGNORECASE,
)

# One or more serial numbers at the very start, e.g. "1. ", "2-", "3)" or "1 2 3 ".
_LEADING_NUMS_RE = re.compile(r"^\s*(?:\d+[\.\-\)]\s*|\d+\s+)+")

# A bare integer at the end of the string (preceded by whitespace).
# NOTE(review): not referenced in the visible code; possibly unused.
_TRAILING_COUNT_RE = re.compile(r"\s+\d+\s*$")


def clean_tradename(raw) -> str:
    """
    Clean a drug name while preserving concentration.

    Steps (in order): strip HTML entities (named and numeric), drop unexpected
    symbols, protect concentrations behind placeholders, remove pack/volume
    counts, dosage-form words, number words and leading serial numbers, drop
    numeric-only parentheses and stray numbers, then restore the concentrations
    and apply a "smart" title case.

    Args:
        raw: A trade-name cell value. Missing values (None, NaN, pd.NA, NaT)
            yield "".

    Returns:
        "Brand Name [concentration]", or "" when nothing alphabetic remains or
        fewer than two characters survive.

    Examples:
      "Abilify 15mg 30 F.c.tabs."              -> "Abilify 15mg"
      "Abramox 100mg/ml Syrup"                 -> "Abramox 100mg/ml"
      "Abrammune 50 Mg 10 Caps.(n/a Yet)"      -> "Abrammune 50mg"
      "Abimol Extra 20"                        -> "Abimol Extra"
      "1 2 3 (one Two Three) Syrup 120 Ml"     -> ""
    """
    if raw is None:
        return ""
    # pd.isna covers NaN, pd.NA and NaT; without this pd.NA would become "NA".
    if not isinstance(raw, str) and pd.api.types.is_scalar(raw) and pd.isna(raw):
        return ""

    t = str(raw).strip()
    if not t:
        return ""

    # 1) Remove HTML entities, including numeric ones such as "&#39;" / "&#x27;"
    #    (otherwise step 2 strips "&#;" and leaves the digits inside the name).
    t = re.sub(r"&(?:[a-zA-Z]+|#\d+|#[xX][0-9a-fA-F]+);", " ", t)

    # 2) Remove unwanted symbols (keep: letters, digits, whitespace, - . / % mg/ml basics)
    t = re.sub(r"[^\w\s\-\./®™°%,()]", " ", t)

    # 3) Temporarily store concentrations as placeholders
    concentrations: list[str] = []

    def _save_conc(m: re.Match) -> str:
        # m.string is the text being substituted; slice it rather than relying
        # on the enclosing variable still holding that same text.
        prefix = m.string[: m.start()].rstrip()
        # Only protect a concentration that starts the string or follows the
        # name (a letter or ")"); numbers after other numbers are left alone.
        if prefix and not re.search(r"[a-zA-Z\)]$", prefix):
            return m.group(0)

        token = re.sub(r"(\d)\s+(\w)", r"\1\2", m.group(0).strip())
        concentrations.append(token.lower())
        return f" __CONC_{len(concentrations) - 1}__ "

    t = _CONCENTRATION_RE.sub(_save_conc, t)

    # 4) Remove total volume/pack count patterns
    t = _COUNT_VOLUME_RE.sub(" ", t)

    # 5) Remove dosage form words
    t = _DOSAGE_FORM_RE.sub(" ", t)

    # 6) Remove number words
    t = _NUMBER_WORDS_RE.sub(" ", t)

    # 7) Remove leading serial numbers
    t = _LEADING_NUMS_RE.sub("", t)

    # 8) Remove empty parentheses or parentheses containing only numbers/punctuation.
    #    The second pass catches "( )" left behind by nested parentheses.
    t = re.sub(r"\(\s*[\d\s\.\-]*\s*\)", " ", t)
    t = re.sub(r"\(\s*\)", " ", t)

    # 9) Normalize whitespace
    t = re.sub(r"\s+", " ", t).strip()

    # 10) Remove leftover standalone numbers (not concentration placeholders, not part of a name)
    t = re.sub(
        r"\s+(\d+)(?!\s*(?:mg|mcg|iu|meq|mmol|%|/|__CONC|\w))",
        "",
        t,
    )
    t = re.sub(r"\s+", " ", t).strip()

    # 10b) If the result contains no letters (placeholders count via "_"), return empty
    if t and not re.search(r"[a-zA-Z_]", t):
        return ""

    # 11) Trim punctuation from ends
    t = t.strip(" .-/()")

    # 12) Restore saved concentrations
    def _restore_conc(m: re.Match) -> str:
        idx = int(m.group(1))
        return concentrations[idx] if idx < len(concentrations) else ""

    t = re.sub(r"__CONC_(\d+)__", _restore_conc, t)

    # 13) Normalize whitespace again
    t = re.sub(r"\s+", " ", t).strip()
    t = t.strip(" .-/()")

    # 14) Smart Title Case: lower-case numeric/strength tokens, keep short
    #     all-caps acronyms, keep letter+digits codes, capitalize the rest.
    words = []
    for w in t.split():
        if re.match(r"^\d", w) or re.match(r".*\d.*(?:mg|ml|mcg|iu|meq|%)", w, re.I):
            words.append(w.lower())
        elif w.isupper() and 1 < len(w) <= 5:
            words.append(w)
        elif re.match(r"^[A-Za-z]\d+$", w):
            words.append(w[0].upper() + w[1:])
        else:
            words.append(w.capitalize())

    t = " ".join(words).strip()
    return t if len(t) >= 2 else ""


# ==============================================================================
# EGYPT MARKET CONTEXT — INCLUDED IN EVERY PROMPT
# ==============================================================================

# Shared market-context preamble prepended to each GROQ_PROMPT_* system prompt.
# NOTE(review): the manufacturer/brand examples below are prompt hints only;
# nothing in the code validates them.
_EGYPT_CONTEXT = """
EGYPT PHARMACEUTICAL MARKET CONTEXT:
- Regulator: Egyptian Drug Authority (EDA), formerly NODCAR
- Major LOCAL manufacturers (brands may be unknown globally):
    EIPICO, Pharco, Eva Pharma, CID, Amriya, Alexandria Pharma,
    Minapharm, Nile Pharma, Mash Premiere, Global Napi, MUP (Medical Union Pharmaceuticals),
    Rameda, Adwia, Sigma Tec, Egyptian Int'l Pharmaceutical Industries (EIPI),
    Delta Pharma, October Pharma, Chemipharm, Arab Drug Company
- Multinational brands registered in Egypt:
    Glaxo Egypt (GSK), Sanofi Egypt, Novartis Egypt, AstraZeneca Egypt,
    Pfizer Egypt, Abbott Egypt, Roche Egypt, Bayer Egypt, Hikma Egypt
- Egypt registers BOTH originator brands AND local copies under brand names
- Local Egyptian brand name patterns:
    - Often end in: -ox, -mox, -cin, -ol, -al, -in, -cil, -ex, -ac, -ix, -cap
    - Prefix with manufacturer: Pharco-, Eva-, Nile-, Alex-, Egy-, Sigma-
    - Example local brands: Abimol (paracetamol by MUP), Abramox (amoxicillin by Abram),
      Acapril (captopril/enalapril), Abrammune (azathioprine), Accord Long (metformin),
      Abelia, Abelcet (amphotericin B lipid complex)
- Egypt uses BOTH INN names and local brand names in prescriptions
- Concentration: Egyptian market commonly uses mg, mg/ml, % in labeling
"""


# ==============================================================================
# GROQ PROMPTS — EGYPT-AWARE (3 ROUNDS)
# ==============================================================================

# Round-1 system prompt (validate_and_verify): classify the name, fix spelling,
# and report Egypt presence plus a confidence score as a JSON object.
# NOTE: the "< 0.85" rule in the text mirrors CONFIDENCE_THRESHOLD — keep in sync.
GROQ_PROMPT_VALIDATE = _EGYPT_CONTEXT + """
---
You are a senior pharmaceutical expert specializing in the EGYPTIAN drug market.

INPUT FORMAT: "Brand Name [concentration]"
  - Dosage FORM (tabs, caps, syrup…) and pack COUNT already removed.
  - Concentration may remain (500mg, 100mg/ml, 0.5%…) — KEEP IT.

YOUR TASK:
1. Is the NAME a real BRAND/TRADE name registered in EGYPT or by a manufacturer selling in Egypt?
2. Generic/INN names (amoxicillin, paracetamol, aripiprazole…) -> flag as is_generic_name
3. Fix typos or capitalization -> correct ONLY the name, keep concentration unchanged
4. egypt_market: true if confirmed or very likely in Egyptian market; false if not found

RULES:
- KEEP concentration exactly as-is
- DO NOT add dosage form
- DO NOT change to generic name
- Consider Egyptian local brands (Abimol, Abramox, Acapril…) as valid trade names
- If unsure whether it's in Egypt specifically -> set egypt_market: false, confidence < 0.85

RESPOND WITH ONLY valid JSON — no markdown:
{
  "input": "the name you received",
  "is_tradename": true or false,
  "is_generic_name": true or false,
  "egypt_market": true or false,
  "corrected_tradename": "Brand Name [concentration]" or null,
  "correction_type": "none" | "capitalization" | "typo_fix" | "full_correction" | "not_a_brand",
  "correction_note": "what changed + egypt market note" or null,
  "confidence": 0.0 to 1.0
}
"""

# Round-2 system prompt: definitive re-check after low confidence, a typo/full
# correction, or an unconfirmed Egypt presence in round 1.
GROQ_PROMPT_VERIFY = _EGYPT_CONTEXT + """
---
You are a pharmaceutical verification expert for the EGYPTIAN drug market.

A previous AI check gave LOW CONFIDENCE or a CORRECTION for this trade name.
Do a DEFINITIVE check focused on:
  1. Is this brand name ACTUALLY registered in Egypt (EDA database)?
  2. Is the spelling correct for the Egyptian market version?
  3. Some brands have different names in Egypt vs. globally — use the EGYPTIAN name.

RESPOND WITH ONLY valid JSON — no extra text:
{
  "input": "name being verified",
  "final_corrected_tradename": "correct Egyptian market brand name [concentration]" or null,
  "is_confirmed_tradename": true or false,
  "is_confirmed_generic": true or false,
  "egypt_market": true or false,
  "verification_note": "specific reasoning about Egyptian market presence",
  "final_confidence": 0.0 to 1.0
}
"""

# Round-3 system prompt: yes/no Egyptian-market confirmation plus manufacturer
# and generic (INN) name.
# NOTE(review): "manufacturer_in_egypt" shows "or null" INSIDE the quotes, so the
# model may reply with the literal string "null" — callers should normalise it.
GROQ_PROMPT_EGYPT_CONFIRM = _EGYPT_CONTEXT + """
---
You are an EGYPT DRUG MARKET specialist with deep knowledge of the EDA drug registry.

FINAL TASK: Give a definitive YES/NO answer — is this brand name present in the Egyptian market?

Check specifically:
- EDA (Egyptian Drug Authority) registered products
- Egyptian pharmacy formulary / price lists
- Products manufactured by Egyptian companies OR imported and registered in Egypt
- Include products that may be discontinued but were historically available

RESPOND WITH ONLY valid JSON — no extra text:
{
  "input": "the name",
  "egypt_market_confirmed": true or false,
  "final_corrected_tradename": "exact name as used in Egypt [concentration]" or null,
  "manufacturer_in_egypt": "company name or null",
  "generic_name": "INN/generic name of the active ingredient",
  "egypt_confidence": 0.0 to 1.0,
  "egypt_note": "specific evidence for Egyptian market presence or absence"
}
"""


# ==============================================================================
# KEY ROTATION & RATE LIMIT HANDLING
# ==============================================================================

# Per-key cooldown deadlines used by _call_groq: {key index -> time.time()
# value before which that key should not be used}. Set on HTTP 429 (from the
# response's retry headers) and on HTTP 401 (two hours).
_key_cooldown_until: dict[int, float] = {}


def _get_retry_after(resp) -> int:
    """Compute retry delay from response headers (bounded)."""
    try:
        val = resp.headers.get("retry-after") or resp.headers.get(
            "x-ratelimit-reset-requests", "60"
        )
        return min(int(float(val)), 120)
    except Exception:
        return 60


def rotate_key_if_needed():
    """
    Count one processed trade name and advance the starting API key when due.

    After every ROTATE_EVERY calls, _current_key_idx moves to the next key
    (wrapping around the list), so _call_groq starts from a different key.
    """
    global _current_key_idx, _processed_count
    _processed_count += 1
    if _processed_count % ROTATE_EVERY != 0:
        return

    previous_idx = _current_key_idx
    _current_key_idx = (previous_idx + 1) % len(GROQ_API_KEYS)
    log(
        f"Key rotation: key[{previous_idx}] -> key[{_current_key_idx}] "
        f"(after {_processed_count} trade names)"
    )


def _call_groq(system_prompt: str, user_message: str, max_retries: int = 2) -> dict | None:
    """
    Send one chat-completion request to Groq and return the parsed JSON object.

    Keys are tried in rotation starting at ``_current_key_idx`` for up to
    ``len(GROQ_API_KEYS) * max_retries`` attempts:
      - HTTP 429 puts the key on a cooldown derived from the response headers;
      - HTTP 401 disables the key for two hours;
      - other failures (HTTP errors, timeouts, bad JSON) move on to the next key.
    A key on cooldown is skipped while any other key is usable. The call only
    blocks when EVERY key is cooling down, and then waits just for the soonest
    one. Cooldowns are never cleared wholesale, so a key disabled after a 401
    stays disabled.

    Args:
        system_prompt: System message (one of the GROQ_PROMPT_* templates).
        user_message: User message containing the trade name to check.
        max_retries: Number of passes over the key list before giving up.

    Returns:
        The model reply parsed as a JSON object (dict), or None when all
        attempts fail or no key is configured.
    """
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        "response_format": {"type": "json_object"},
        "temperature": 0.05,
        "max_tokens": 300,
    }

    n_keys = len(GROQ_API_KEYS)
    if n_keys == 0:
        log("No Groq API keys configured", "ERROR")
        return None

    for attempt in range(n_keys * max_retries):
        key_idx = (_current_key_idx + attempt) % n_keys
        now = time.time()

        if _key_cooldown_until.get(key_idx, 0) > now:
            if any(_key_cooldown_until.get(i, 0) <= now for i in range(n_keys)):
                # Another key is usable right now; a later attempt will reach it.
                continue
            # Every key is cooling down: wait for the soonest one and use it.
            key_idx = min(range(n_keys), key=lambda i: _key_cooldown_until.get(i, 0))
            min_wait = _key_cooldown_until.get(key_idx, 0) - now
            log(f"All keys are on cooldown. Waiting {min_wait:.0f}s", "WARN")
            time.sleep(min_wait + 1)

        headers = {
            "Authorization": f"Bearer {GROQ_API_KEYS[key_idx]}",
            "Content-Type": "application/json",
        }

        try:
            resp = requests.post(GROQ_URL, headers=headers, json=payload, timeout=25)

            if resp.status_code == 200:
                raw = resp.json()["choices"][0]["message"]["content"]
                parsed = json.loads(raw)
                # Callers use .get(); a JSON array/string/number would crash them.
                if isinstance(parsed, dict):
                    return parsed
                log(f"Non-object JSON reply on key[{key_idx}]", "WARN")
                continue

            if resp.status_code == 429:
                wait = _get_retry_after(resp)
                _key_cooldown_until[key_idx] = time.time() + wait
                log(f"HTTP 429 on key[{key_idx}]. Cooldown {wait}s", "WARN")
                continue

            if resp.status_code == 401:
                _key_cooldown_until[key_idx] = time.time() + 7200
                log(f"HTTP 401 on key[{key_idx}]. Disabling for 2 hours", "ERROR")
                continue

            log(f"Groq HTTP {resp.status_code}", "WARN")
            time.sleep(2)

        except json.JSONDecodeError:
            log(f"JSON parse error on key[{key_idx}]", "WARN")
        except requests.exceptions.Timeout:
            log(f"Timeout on key[{key_idx}]", "WARN")
            time.sleep(3)
        except Exception as e:
            log(f"Error on key[{key_idx}]: {e}", "WARN")
            time.sleep(2)

    return None


# ==============================================================================
# 3-ROUND VALIDATION & VERIFICATION
# ==============================================================================


def _coerce_confidence(value, default: float) -> float:
    """
    Convert an LLM-reported confidence to a float clamped to [0.0, 1.0].

    null, non-numeric strings and NaN fall back to *default*.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return float(default)
    if number != number:  # NaN is the only value not equal to itself
        return float(default)
    return min(max(number, 0.0), 1.0)


def _coerce_bool(value, default: bool) -> bool:
    """
    Convert an LLM-reported flag to a real bool.

    JSON booleans and numbers are used directly. The strings "true"/"yes"/"1"
    and "false"/"no"/"0" are honoured; as raw values they would all be truthy.
    null or anything else falls back to *default*.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return bool(value)
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ("true", "yes", "1"):
            return True
        if lowered in ("false", "no", "0"):
            return False
    return bool(default)


def _coerce_text(value, default):
    """Return *value* stripped if it is a non-empty string other than "null"/"none", else *default*."""
    if isinstance(value, str):
        stripped = value.strip()
        if stripped and stripped.lower() not in ("null", "none"):
            return stripped
    return default


def validate_and_verify(tradename: str, groq_delay: float = 0.3) -> dict:
    """
    Validate one cleaned trade name with up to three Groq rounds (Egypt-aware).

    Round 1 (VALIDATE):
      - Is it a real trade name?
      - Is it present in the Egyptian market?
      - Spelling/capitalization correction

    Round 2 (VERIFY) triggers if:
      - confidence < CONFIDENCE_THRESHOLD
      - a typo_fix / full_correction was applied
      - it is a trade name but Egypt presence is not confirmed
      - it is a trade name but no corrected name is available

    Round 3 (EGYPT_CONFIRM) triggers if:
      - it is still a trade name after round 2 but Egypt presence is
        unconfirmed or confidence is below the threshold

    Every model field is coerced defensively (see _coerce_*), so a null or
    string confidence, or a "false" string, cannot crash the run on `:.2f`
    formatting or numeric comparison, or be misread as True.

    Args:
        tradename: Cleaned trade name (output of clean_tradename).
        groq_delay: Seconds to sleep after each Groq call.

    Returns:
        The record built by _build, or _fallback(tradename) if round 1 fails.
        ``verified`` is False whenever a round that was needed failed.
    """
    log(f"  Round 1 — Validate + Egypt check: '{tradename}'")
    r1 = _call_groq(
        GROQ_PROMPT_VALIDATE,
        f'Validate this drug trade name for the Egyptian market: "{tradename}"',
    )
    time.sleep(groq_delay)

    if not isinstance(r1, dict):
        log("  Round 1 failed. Falling back", "WARN")
        return _fallback(tradename)

    conf1 = _coerce_confidence(r1.get("confidence"), 0.0)
    corrected1 = _coerce_text(r1.get("corrected_tradename"), tradename)
    is_brand = _coerce_bool(r1.get("is_tradename"), False)
    is_generic = _coerce_bool(r1.get("is_generic_name"), False)
    egypt1 = _coerce_bool(r1.get("egypt_market"), False)
    corr_type = _coerce_text(r1.get("correction_type"), "none").lower()
    note1 = _coerce_text(r1.get("correction_note"), None)

    log(
        f"  Round 1 -> '{corrected1}' | conf={conf1:.2f} | "
        f"egypt={'YES' if egypt1 else 'NO'} | type={corr_type}"
    )

    needs_round2 = (
        conf1 < CONFIDENCE_THRESHOLD
        or corr_type in ("typo_fix", "full_correction")
        or (is_brand and not egypt1)
        or (is_brand and not corrected1)
    )

    if not needs_round2:
        log(f"  Confirmed in 1 round. Egypt: {'YES' if egypt1 else 'NO'}")
        return _build(
            tradename,
            corrected1,
            is_brand,
            is_generic,
            conf1,
            note1,
            egypt1,
            None,
            True,
            1,
        )

    log(f"  Round 2 — Verify + Egypt: '{corrected1}' (conf={conf1:.2f})")
    r2 = _call_groq(
        GROQ_PROMPT_VERIFY,
        (
            "Verify this drug trade name for the Egyptian market:\n"
            f'Original input: "{tradename}"\n'
            f'Previous suggestion: "{corrected1}" (confidence {conf1:.2f})\n'
            f"Previous egypt_market: {egypt1}\n"
            "Give definitive answer."
        ),
    )
    time.sleep(groq_delay)

    if not isinstance(r2, dict):
        log("  Round 2 failed. Using Round 1 results", "WARN")
        return _build(
            tradename,
            corrected1,
            is_brand,
            is_generic,
            conf1,
            note1,
            egypt1,
            None,
            False,
            1,
        )

    # null / missing round-2 fields fall back to the round-1 values.
    corrected2 = _coerce_text(r2.get("final_corrected_tradename"), corrected1)
    conf2 = _coerce_confidence(r2.get("final_confidence"), conf1)
    egypt2 = _coerce_bool(r2.get("egypt_market"), egypt1)
    is_brand2 = _coerce_bool(r2.get("is_confirmed_tradename"), is_brand)
    is_gen2 = _coerce_bool(r2.get("is_confirmed_generic"), is_generic)
    note2 = _coerce_text(r2.get("verification_note"), note1)

    log(f"  Round 2 -> '{corrected2}' | conf={conf2:.2f} | egypt={'YES' if egypt2 else 'NO'}")

    needs_round3 = is_brand2 and (not egypt2 or conf2 < CONFIDENCE_THRESHOLD)

    if not needs_round3:
        log(f"  Confirmed in 2 rounds. Egypt: {'YES' if egypt2 else 'NO'}")
        return _build(
            tradename,
            corrected2,
            is_brand2,
            is_gen2,
            conf2,
            note2,
            egypt2,
            None,
            True,
            2,
        )

    log(f"  Round 3 — Egypt Market Confirm: '{corrected2}'")
    r3 = _call_groq(
        GROQ_PROMPT_EGYPT_CONFIRM,
        f'Confirm Egyptian market presence for: "{corrected2}"',
    )
    time.sleep(groq_delay)

    if not isinstance(r3, dict):
        log("  Round 3 failed. Using Round 2 results", "WARN")
        # Round 3 was needed precisely because round 2 did not confirm the name,
        # so these round-2 results are NOT verified (same as the round-2 failure path).
        return _build(
            tradename,
            corrected2,
            is_brand2,
            is_gen2,
            conf2,
            note2,
            egypt2,
            None,
            False,
            2,
        )

    egypt3 = _coerce_bool(r3.get("egypt_market_confirmed"), egypt2)
    corrected3 = _coerce_text(r3.get("final_corrected_tradename"), corrected2)
    conf3 = _coerce_confidence(r3.get("egypt_confidence"), conf2)
    # The prompt shows "company name or null" inside quotes, so a literal "null"
    # string is normalised to None here.
    manufacturer = _coerce_text(r3.get("manufacturer_in_egypt"), None)
    generic_name = _coerce_text(r3.get("generic_name"), None)
    note3 = _coerce_text(r3.get("egypt_note"), note2)

    log(
        f"  Round 3 -> Egypt={'YES' if egypt3 else 'NO'} | "
        f"conf={conf3:.2f} | manufacturer={manufacturer or '-'}"
    )

    return _build(
        tradename,
        corrected3,
        is_brand2,
        is_gen2,
        conf3,
        note3,
        egypt3,
        manufacturer,
        True,
        3,
        generic_name=generic_name,
    )


def _build(
    original,
    corrected,
    is_brand,
    is_generic,
    confidence,
    note,
    egypt_market,
    egypt_manufacturer,
    verified,
    rounds,
    generic_name=None,
) -> dict:
    """
    Assemble the standard per-trade-name validation record.

    ``corrected`` is kept only for trade names (``is_brand`` truthy) and is
    stored as None otherwise. ``confidence`` is rounded to three decimals.
    ``rounds`` is stored under the key ``groq_rounds``.
    """
    kept_correction = corrected if is_brand else None
    return dict(
        original=original,
        corrected=kept_correction,
        is_tradename=is_brand,
        is_generic=is_generic,
        confidence=round(confidence, 3),
        note=note,
        egypt_market=egypt_market,
        egypt_manufacturer=egypt_manufacturer,
        generic_name=generic_name,
        verified=verified,
        groq_rounds=rounds,
    )


def _fallback(tradename: str) -> dict:
    """
    Return the record used when Groq could not be reached for *tradename*.

    The name is kept unchanged and optimistically treated as a trade name
    (is_tradename=True) with confidence 0.0, verified=False and groq_rounds=0.
    NOTE(review): because is_tradename is True, these unverified names are
    written to the confirmed-rows CSV; filter on tradename_verified if needed.
    """
    unverified = dict(
        original=tradename,
        corrected=tradename,
        is_tradename=True,
        is_generic=False,
        confidence=0.0,
        note="Groq unavailable — kept as-is",
        egypt_market=False,
        egypt_manufacturer=None,
        generic_name=None,
        verified=False,
        groq_rounds=0,
    )
    return unverified


# ==============================================================================
# PROGRESS IO
# ==============================================================================


def load_progress(filepath: str | None = None) -> dict:
    """Load progress JSON if it exists; return empty dict otherwise."""
    path = filepath or PROGRESS_FILE
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            pass
    return {}


def save_progress(progress: dict, filepath: str | None = None):
    """Save progress JSON."""
    path = filepath or PROGRESS_FILE
    with open(path, "w", encoding="utf-8") as f:
        json.dump(progress, f, ensure_ascii=False, indent=2)


# ==============================================================================
# CORE — APPEND-ONLY CSV LOGIC (CONFIRMED ROWS ONLY)
# ==============================================================================

# Result columns added by _build_confirmed_rows, in the same order in which that
# function assigns them.
# NOTE(review): not referenced in the visible part of this file — presumably
# used further down (e.g. in run_full_pipeline); verify before changing.
NEW_COLS = [
    "tradename_cleaned",
    "tradename_corrected",
    "tradename_is_valid",
    "tradename_is_generic",
    "tradename_confidence",
    "tradename_correction_note",
    "tradename_egypt_market",
    "tradename_egypt_manufacturer",
    "tradename_generic_name",
    "tradename_verified",
    "tradename_groq_rounds",
]


def _build_confirmed_rows(
    df_original: pd.DataFrame,
    tradename_col: str,
    batch_validations: dict,
) -> pd.DataFrame | None:
    """
    Build a DataFrame of rows from df_original corresponding to this batch's
    confirmed trade names (is_tradename=True), preserving df_original order.

    batch_validations: { cleaned_tradename: validation_dict }
    """
    if not batch_validations:
        return None

    confirmed = {k: v for k, v in batch_validations.items() if v.get("is_tradename", False)}
    if not confirmed:
        return None

    mask = df_original["_tradename_clean"].isin(confirmed.keys())
    df_batch = df_original[mask].copy()

    if df_batch.empty:
        return None

    def _get(row, field, default=""):
        v = confirmed.get(row["_tradename_clean"], {})
        return v.get(field, default)

    df_batch["tradename_cleaned"] = df_batch["_tradename_clean"]
    df_batch["tradename_corrected"] = df_batch.apply(
        lambda r: confirmed.get(r["_tradename_clean"], {}).get("corrected") or r["_tradename_clean"],
        axis=1,
    )
    df_batch["tradename_is_valid"] = df_batch.apply(lambda r: _get(r, "is_tradename", True), axis=1)
    df_batch["tradename_is_generic"] = df_batch.apply(lambda r: _get(r, "is_generic", False), axis=1)
    df_batch["tradename_confidence"] = df_batch.apply(lambda r: _get(r, "confidence", 0.0), axis=1)
    df_batch["tradename_correction_note"] = df_batch.apply(lambda r: _get(r, "note", ""), axis=1)

    df_batch["tradename_egypt_market"] = df_batch.apply(
        lambda r: _get(r, "egypt_market", False), axis=1
    )
    df_batch["tradename_egypt_manufacturer"] = df_batch.apply(
        lambda r: _get(r, "egypt_manufacturer", ""), axis=1
    )
    df_batch["tradename_generic_name"] = df_batch.apply(
        lambda r: _get(r, "generic_name", ""), axis=1
    )

    df_batch["tradename_verified"] = df_batch.apply(lambda r: _get(r, "verified", False), axis=1)
    df_batch["tradename_groq_rounds"] = df_batch.apply(lambda r: _get(r, "groq_rounds", 0), axis=1)

    df_batch.drop(columns=["_tradename_clean"], inplace=True, errors="ignore")
    return df_batch


def append_confirmed_to_csv(
    df_original: pd.DataFrame,
    tradename_col: str,
    batch_validations: dict,
    is_first_write: bool,
) -> int:
    """
    Write this batch's confirmed rows to OUTPUT_CSV.

    Args:
        df_original: Full input frame with the ``_tradename_clean`` column.
        tradename_col: Raw trade-name column name (passed through).
        batch_validations: Mapping of cleaned trade name -> validation dict.
        is_first_write: True to (re)create the file with a header; False to
            append rows without a header. A missing file is always recreated.

    Returns:
        Number of rows written (0 when the batch has no confirmed rows).

    NOTE(review): if the caller saves progress only after this append, a crash
    in between could re-append the same batch on resume — verify at call site.
    """
    confirmed_rows = _build_confirmed_rows(df_original, tradename_col, batch_validations)
    if confirmed_rows is None or confirmed_rows.empty:
        return 0

    start_fresh = is_first_write or not os.path.exists(OUTPUT_CSV)
    csv_options = {
        "mode": "w" if start_fresh else "a",
        "header": start_fresh,
        "index": False,
        "encoding": "utf-8-sig",
    }
    confirmed_rows.to_csv(OUTPUT_CSV, **csv_options)
    return len(confirmed_rows)


# ==============================================================================
# ORDER GUARANTEE — ORDERED UNIQUE LIST BY FIRST APPEARANCE
# ==============================================================================


def get_ordered_unique(df: pd.DataFrame, clean_col: str = "_tradename_clean") -> list[str]:
    """
    List the distinct values of *clean_col* in order of first appearance.

    Missing values, empty strings and single-character values are skipped.
    The result follows the DataFrame's row order and is never sorted
    alphabetically, so the same input always yields the same list.
    """
    candidates = (name for name in df[clean_col].dropna() if name and len(name) > 1)
    # dicts keep insertion order, so fromkeys() de-duplicates while keeping first occurrences.
    return list(dict.fromkeys(candidates))


# ==============================================================================
# JSON EXPORT + SUMMARY
# ==============================================================================


def save_json(results: dict):
    """
    Write the compact validated-results JSON to OUTPUT_JSON.

    Only entries with ``status == "done"`` are exported. A missing or null
    ``validation`` is treated as an empty dict, matching print_summary, so a
    single incomplete record cannot abort the export.

    Args:
        results: Mapping of cleaned trade name -> progress record.

    Returns:
        The dict that was written.
    """
    json_out = {}
    for tn, data in results.items():
        if data.get("status") != "done":
            continue

        v = data.get("validation") or {}
        json_out[tn] = {
            "original": tn,
            "corrected": v.get("corrected"),
            "is_tradename": v.get("is_tradename", False),
            "is_generic": v.get("is_generic", False),
            "confidence": v.get("confidence", 0.0),
            "correction_note": v.get("note"),
            "egypt_market": v.get("egypt_market", False),
            "egypt_manufacturer": v.get("egypt_manufacturer", None),
            "generic_name": v.get("generic_name", None),
            "verified": v.get("verified", False),
            "groq_rounds": v.get("groq_rounds", 0),
        }

    with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
        json.dump(json_out, f, ensure_ascii=False, indent=2)

    log(f"JSON saved: {OUTPUT_JSON} ({len(json_out):,} entries)")
    return json_out


def print_summary(results: dict):
    """
    Log category counts for every entry in *results* whose status is "done".

    Each entry's ``validation`` dict (missing or null treated as {}) is
    classified as follows:
      - valid: is_tradename truthy (these rows are written to the CSV);
      - Egypt yes/no: split of the valid trade names only, so "no" can never
        go negative;
      - corrections: valid trade names whose corrected name differs from the
        original (a mere note, e.g. a round-2 verification note or the
        fallback note, is not a correction);
      - generics: is_generic but not is_tradename (the ones excluded from the CSV);
      - unknown: neither flag set.
    Valid + generics + unknown always equals the total.
    """
    validations = [
        (entry.get("validation") or {})
        for entry in results.values()
        if entry.get("status") == "done"
    ]
    total = len(validations)

    tradenames = [v for v in validations if v.get("is_tradename")]
    valid = len(tradenames)
    generics = sum(1 for v in validations if v.get("is_generic") and not v.get("is_tradename"))
    unknown = total - valid - generics

    corrected = sum(
        1
        for v in tradenames
        if v.get("corrected") and v.get("corrected") != v.get("original")
    )
    egypt_yes = sum(1 for v in tradenames if v.get("egypt_market"))
    egypt_no = valid - egypt_yes
    rounds_3 = sum(1 for v in validations if (v.get("groq_rounds") or 0) >= 3)
    rounds_2 = sum(1 for v in validations if (v.get("groq_rounds") or 0) == 2)

    log("=" * 70)
    log("Pipeline Summary:")
    log(f"   Total processed                       : {total:,}")
    log(f"   Valid trade names                     : {valid:,}  (included in CSV)")
    log(f"      Confirmed in Egypt market          : {egypt_yes:,}")
    log(f"      Not confirmed in Egypt market      : {egypt_no:,}")
    log(f"   Spelling corrections applied          : {corrected:,}")
    log(f"   Verified via Round 3 (Egypt Confirm)  : {rounds_3:,}")
    log(f"   Verified via Round 2 (Verify)         : {rounds_2:,}")
    log(f"   Generic names flagged                 : {generics:,}  (not included in CSV)")
    log(f"   Unknown                               : {unknown:,}  (not included in CSV)")
    log(f"   CSV Output                            : {OUTPUT_CSV}")
    log("=" * 70)


# ==============================================================================
# MAIN PIPELINE
# ==============================================================================


def run_full_pipeline(
    input_csv: str | None = None,
    groq_delay: float = 0.3,
    save_every: int = 10,
    max_tradenames: int | None = None,
    start_from: int | None = None,
    end_at: int | None = None,
    worker_name: str | None = None,
):
    """
    Pipeline v5 — append-only CSV with confirmed rows only, preserving input order.

    Unique list order = first appearance order in df_original.
    If start_from/end_at are provided, each worker processes a range of that same list.

    Args:
        input_csv: Source CSV path; defaults to INPUT_CSV.
        groq_delay: Forwarded to validate_and_verify() as ``groq_delay``.
        save_every: Checkpoint (CSV append, then progress save) after this many
            newly processed names. Must be >= 1.
        max_tradenames: Testing mode. Process only the first N unique names;
            start_from/end_at are ignored for the range when this is set.
        start_from: 1-based inclusive start index into the ordered unique list.
        end_at: 1-based inclusive end index; defaults to the end of the list.
        worker_name: Optional label shown in the log banner.

    Returns:
        (df_original, results): the input DataFrame (helper column removed) and
        the dict of completed progress records for this worker.

    Raises:
        ValueError: if save_every < 1 or no known trade-name column exists.
    """
    if input_csv is None:
        input_csv = INPUT_CSV
    if save_every < 1:
        # `processed % save_every` below would otherwise raise ZeroDivisionError.
        raise ValueError(f"save_every must be >= 1, got {save_every}")

    log("=" * 65)
    tag = f" [{worker_name}]" if worker_name else ""
    log(f"TRADENAME PIPELINE v5{tag}")
    log(f"save_every={save_every} | append-only confirmed rows")
    log("=" * 65)

    log(f"Reading CSV: {input_csv}")
    df_original = pd.read_csv(input_csv)
    log(f"Rows: {len(df_original):,} | Columns: {len(df_original.columns):,}")

    tradename_col = next(
        (
            c
            for c in (
                "Tradename",
                "tradename",
                "TradeName",
                "trade_name",
                "BrandName",
                "brandname",
                "brand_name",
                "DrugName",
            )
            if c in df_original.columns
        ),
        None,
    )
    if tradename_col is None:
        raise ValueError(
            "Tradename column not found.\n"
            f"Available columns: {list(df_original.columns)}"
        )

    log(f"Tradename column: '{tradename_col}'")

    df_original["_tradename_clean"] = df_original[tradename_col].apply(clean_tradename)

    ordered_unique = get_ordered_unique(df_original, "_tradename_clean")
    total_all = len(ordered_unique)

    log(f"Unique trade names (df_original order): {total_all:,}")
    log(f"First 5: {ordered_unique[:5]}")
    log(f"Last 5: {ordered_unique[-5:]}")

    if max_tradenames:
        work_list = ordered_unique[:max_tradenames]
        log(f"Testing mode: first {max_tradenames}")
    else:
        # Convert the 1-based inclusive range to a 0-based half-open slice.
        s = (start_from - 1) if start_from else 0
        e = end_at if end_at else total_all
        work_list = ordered_unique[s:e]
        log(f"Range: #{s + 1:,} -> #{e:,} ({len(work_list):,} unique names)")
        log(f"First 3 in range: {work_list[:3]}")

    # Workers with an explicit range keep their own progress file so they can
    # run in parallel; merge_all_workers() later globs these "_part" files.
    if start_from and not max_tradenames:
        worker_progress = PROGRESS_FILE.replace(
            ".json", f"_part{start_from}_{end_at or total_all}.json"
        )
    else:
        worker_progress = PROGRESS_FILE

    log(f"Progress file: {worker_progress}")

    progress = load_progress(worker_progress)
    done_before = sum(1 for v in progress.values() if v.get("status") == "done")
    if done_before:
        log(f"Resuming: {done_before:,} unique names already completed")

    results = {k: v for k, v in progress.items() if v.get("status") == "done"}
    total = len(work_list)
    processed = 0

    # A fresh run (or a missing output file) writes the CSV from scratch.
    is_first_write = (done_before == 0) or (not os.path.exists(OUTPUT_CSV))
    batch_validations = {}

    def _append_batch() -> int:
        """Append rows for the pending confirmed names; return rows written."""
        nonlocal is_first_write
        added = append_confirmed_to_csv(
            df_original,
            tradename_col,
            batch_validations,
            is_first_write,
        )
        # Once rows are in the file, later batches must append, not rewrite.
        if added > 0:
            is_first_write = False
        return added

    for idx, tradename in enumerate(work_list):
        if tradename in results:
            continue

        # An empty cleaned name (e.g. a row consisting only of pack/dosage
        # text) cannot be a trade name; don't spend API calls on it.
        if not isinstance(tradename, str) or not tradename.strip():
            log(f"\n[{idx + 1}/{total}] empty cleaned name -> skipped")
            continue

        log(f"\n[{idx + 1}/{total}] '{tradename}'")

        validation = validate_and_verify(tradename, groq_delay=groq_delay)

        if validation["is_tradename"]:
            log(
                f"  BRAND -> '{validation['corrected']}' "
                f"conf={validation['confidence']:.2f} "
                f"rounds={validation['groq_rounds']}"
            )
            batch_validations[tradename] = validation
        elif validation["is_generic"]:
            log("  GENERIC -> not appended to CSV")
        else:
            log(f"  UNKNOWN conf={validation['confidence']:.2f} -> not appended to CSV")

        results[tradename] = {"status": "done", "tradename": tradename, "validation": validation}

        processed += 1
        rotate_key_if_needed()

        if processed % save_every == 0:
            # Write the CSV *before* marking names done in the progress file.
            # If the append fails, the batch is retried on resume rather than
            # silently missing from the CSV. The reverse failure (append ok,
            # progress save fails) can only cause re-appended duplicates.
            added = _append_batch()
            save_progress(results, worker_progress)

            log(
                f"Batch checkpoint: +{added:,} rows appended to CSV | "
                f"progress: {processed}/{total}"
            )

            batch_validations = {}

    if batch_validations:
        added = _append_batch()
        log(f"\nFinal batch: +{added:,} rows appended to CSV")
    save_progress(results, worker_progress)

    save_json(results)
    print_summary(results)

    df_original.drop(columns=["_tradename_clean"], inplace=True, errors="ignore")

    log(f"\nPipeline complete. New processed unique names: {processed}")
    log(f"CSV Output: {OUTPUT_CSV}")
    return df_original, results


# ==============================================================================
# TEST HELPERS
# ==============================================================================


def test_clean(samples: list[tuple[str, str]] | None = None):
    """Check clean_tradename() against expected outputs without any API calls.

    Args:
        samples: ``(raw_input, expected_cleaned)`` pairs. A built-in set is
            used when None.

    Returns:
        ``(passed, failed)`` counts.
    """
    if samples is None:
        samples = [
            ("1 2 3 (one Two Three) Syrup 120 Ml", ""),
            ("1 2 3 (one Two Three) 20 F.c.tabs.", ""),
            ("A1 Cream 100 Gm", "A1"),
            ("Abelcet", "Abelcet"),
            ("Aripiprazole", "Aripiprazole"),
            ("Abilify 15mg 30 F.c.tabs.", "Abilify 15mg"),
            ("Abilia 15mg 30 F.c.tabs.", "Abilia 15mg"),
            ("Abimol Extra 20", "Abimol Extra"),
            ("Abimol", "Abimol"),
            ("Abrammune 50 Mg 10 Caps.(n/a Yet)", "Abrammune 50mg"),
            ("Abramox 100mg/ml Syrup", "Abramox 100mg/ml"),
            ("Abramox 50mg/ml Syrup", "Abramox 50mg/ml"),
            ("Accord Long 600mg 5 Effervescent Tablets", "Accord Long 600mg"),
            ("Acetaminophen 125mg Paed. Supp.u.s.p.xxiii", "Acetaminophen 125mg"),
            ("Acetaminophen 500mg 20 Tab", "Acetaminophen 500mg"),
            ("Acc 200mg 20 Sachets", "Acc 200mg"),
            ("Acapril", "Acapril"),
            ("Augmentin 625mg 14 Tabs", "Augmentin 625mg"),
            ("Panadol 500mg 24 Tablets", "Panadol 500mg"),
            ("Lipitor 20mg 30 F.C.Tabs", "Lipitor 20mg"),
            ("Voltaren 1% Gel 100gm", "Voltaren 1%"),
            ("HCl 0.9% 100ml Solution", "Hcl 0.9%"),
            ("Amoxil 250mg/5ml Suspension", "Amoxil 250mg/5ml"),
            ("Zithromax 500mg 3 Caps", "Zithromax 500mg"),
        ]

    # Column widths shared by the header and every row so they line up.
    rule = "=" * 98

    print("\n" + rule)
    print("clean_tradename() test | PASS=OK | FAIL=Mismatch")
    print(f"{'INPUT':<45} {'EXPECTED':<22} {'GOT':<22} STATUS")
    print(rule)

    passed = failed = 0
    for raw, expected in samples:
        got = clean_tradename(raw)
        ok = got == expected
        status = "PASS" if ok else "FAIL"
        passed += int(ok)
        failed += int(not ok)
        print(f"{raw:<45} {expected!r:<22} {got!r:<22} {status}")

    print(f"\nResult: {passed} pass | {failed} fail out of {passed + failed}")
    return passed, failed


def test_order(input_csv: str | None = None, n_workers: int = 4):
    """Print the ordered unique list and suggest contiguous worker ranges.

    The suggested ranges are 1-based and inclusive, matching the
    ``start_from``/``end_at`` arguments of run_full_pipeline(). Each worker
    gets ``n // n_workers`` names and the last worker also takes the remainder.

    Args:
        input_csv: CSV to read; defaults to INPUT_CSV.
        n_workers: Number of workers to split across (default 4).

    Returns:
        The ordered unique list, or None if no trade-name column was found.

    Raises:
        ValueError: if n_workers < 1.
    """
    if n_workers < 1:
        raise ValueError(f"n_workers must be >= 1, got {n_workers}")

    csv = input_csv or INPUT_CSV
    df = pd.read_csv(csv)
    candidates = (
        "Tradename",
        "tradename",
        "TradeName",
        "trade_name",
        "BrandName",
        "brandname",
        "brand_name",
        "DrugName",
    )
    col = next((c for c in candidates if c in df.columns), None)
    if not col:
        print("Tradename column not found")
        return None

    df["_tradename_clean"] = df[col].apply(clean_tradename)
    ordered = get_ordered_unique(df, "_tradename_clean")

    print(f"\nOrdered unique list ({len(ordered):,} items)")
    print(f"First 10: {ordered[:10]}")
    print(f"Last 10:  {ordered[-10:]}")

    n = len(ordered)
    if n == 0:
        print("\nNothing to split: the ordered unique list is empty.")
        return ordered

    # Never suggest more workers than names. With n < n_workers, the chunk
    # size n // n_workers would be 0 and yield end_at=0, which
    # run_full_pipeline treats as "until the end of the list".
    workers = min(n_workers, n)
    chunk = n // workers

    print(f"\nSuggested split across {workers} workers:")
    for i in range(workers):
        start = i * chunk + 1
        end = n if i == workers - 1 else (i + 1) * chunk
        print(f"Worker {i + 1}: start_from={start}, end_at={end}")
    return ordered


def test_single(tradename: str, delay: float = 0.5):
    """Clean one raw trade name and run the full validation on it.

    Prints the raw input, the cleaned form and the validation result as JSON.
    Returns the validation dict, or None if cleaning produced an empty string.
    """
    print(f"\nTesting: '{tradename}'")
    cleaned = clean_tradename(tradename)
    print(f"Cleaned: '{cleaned}'")

    if cleaned:
        outcome = validate_and_verify(cleaned, groq_delay=delay)
        print(json.dumps(outcome, indent=2, ensure_ascii=False))
        return outcome

    # Nothing left to validate after cleaning.
    print("Cleaned result is empty")
    return None


def test_sample(n: int = 5, delay: float = 1.0):
    """Smoke-run the pipeline on the first ``n`` unique names.

    Checkpoints every 2 names so the CSV/progress path is exercised quickly.
    Returns whatever run_full_pipeline() returns.
    """
    sample_settings = {"max_tradenames": n, "groq_delay": delay, "save_every": 2}
    return run_full_pipeline(**sample_settings)


def show_csv_stats():
    """Show basic stats for the current OUTPUT_CSV."""
    if not os.path.exists(OUTPUT_CSV):
        print("No saved CSV yet.")
        return

    df = pd.read_csv(OUTPUT_CSV)
    total = len(df)
    verified_2 = (
        (df.get("tradename_groq_rounds", pd.Series(dtype=int)) >= 2).sum()
        if "tradename_groq_rounds" in df.columns
        else 0
    )

    print(f"\nCSV Stats — {OUTPUT_CSV}")
    print(f"Total rows          : {total:,}")
    print(f"Verified >= 2 rounds: {verified_2:,}")
    print(f"Columns             : {list(df.columns)}")

    if "tradename_corrected" in df.columns:
        changed = (df["tradename_corrected"] != df.get("tradename_cleaned", "")).sum()
        print(f"Changed names       : {changed:,}")


# ==============================================================================
# MERGE — AFTER ALL WORKERS FINISH
# ==============================================================================


def merge_all_workers(input_csv: str | None = None):
    """
    After all workers finish:
      1) Merge all worker progress JSON files
      2) Build final OUTPUT_CSV in correct df_original order (single rebuild)

    Worker "_part*" files are merged first, then PROGRESS_FILE if it exists.
    On key collisions, a later file wins, except that an entry that is not
    "done" never replaces one already marked "done". Unreadable or
    non-object JSON files are logged and skipped.

    Args:
        input_csv: Original dataset CSV; defaults to INPUT_CSV.

    Returns:
        The merged progress dict, or None if no progress files exist or no
        trade-name column is found.
    """
    pattern = PROGRESS_FILE.replace(".json", "_part*.json")
    part_files = sorted(glob.glob(pattern))
    all_files = part_files + ([PROGRESS_FILE] if os.path.exists(PROGRESS_FILE) else [])

    if not all_files:
        log("No progress files found", "ERROR")
        return None

    log(f"Merging {len(all_files)} progress files")
    merged = {}

    def _is_done(entry) -> bool:
        return isinstance(entry, dict) and entry.get("status") == "done"

    for path in all_files:
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSONDecodeError
            log(f"Failed to load {path}: {e}", "ERROR")
            continue

        if not isinstance(data, dict):
            log(
                f"Failed to load {path}: expected a JSON object, "
                f"got {type(data).__name__}",
                "ERROR",
            )
            continue

        before = len(merged)
        for tn, entry in data.items():
            # Don't let an unfinished record from a later file erase a
            # completed validation from an earlier one.
            if _is_done(merged.get(tn)) and not _is_done(entry):
                continue
            merged[tn] = entry
        log(f"Loaded {os.path.basename(path)}: {len(data):,} (+{len(merged) - before:,})")

    save_progress(merged)
    log(f"Total after merge: {len(merged):,}")

    csv_path = input_csv or INPUT_CSV
    df = pd.read_csv(csv_path)

    col = next(
        (
            c
            for c in [
                "Tradename",
                "tradename",
                "TradeName",
                "trade_name",
                "BrandName",
                "brandname",
                "brand_name",
                "DrugName",
            ]
            if c in df.columns
        ),
        None,
    )
    if not col:
        log("Tradename column not found", "ERROR")
        return None

    df["_tradename_clean"] = df[col].apply(clean_tradename)

    all_confirmed = {
        tn: entry["validation"]
        for tn, entry in merged.items()
        if _is_done(entry) and (entry.get("validation") or {}).get("is_tradename", False)
    }

    log(f"Confirmed trade names: {len(all_confirmed):,}")

    # is_first_write=True: rebuild the whole CSV from the merged set in one pass.
    added = append_confirmed_to_csv(df, col, all_confirmed, is_first_write=True)
    log(f"Final CSV rebuilt: {OUTPUT_CSV} ({added:,} rows)")

    save_json(merged)
    print_summary(merged)
    log("Merge complete")
    return merged


# ==============================================================================
# ENTRY POINT
# ==============================================================================

if __name__ == "__main__":
    # This worker handles unique names #2000..#3999 (1-based, inclusive) of the
    # first-appearance-ordered list. Other workers should take non-overlapping
    # ranges; merge_all_workers() combines their progress files afterwards.
    # NOTE(review): in a notebook cell __name__ is "__main__", so executing
    # this cell starts the run immediately.
    worker_settings = {
        "worker_name": "User",
        "start_from": 2000,
        "end_at": 3999,
        "save_every": 10,
        "groq_delay": 0.3,
    }
    run_full_pipeline(**worker_settings)