In [2]:
from typing import List, Dict, Any, Optional
import json, re
from decimal import Decimal, InvalidOperation
from pprint import pprint as pp


In [67]:

def is_blank_or_na(v: Any) -> bool:
    """Tell whether *v* is a string that is empty or a "not applicable" marker.

    Only strings can be blank: any non-string value yields ``False``.
    The comparison ignores case and surrounding whitespace.
    """
    na_markers = ("", "na", "na.", "n/a", "n/a.", "n.a", "n.a.")
    return isinstance(v, str) and v.strip().lower() in na_markers


def iter_dict_rows(records):
    """Yield every dict found in *records*, descending into nested lists.

    Dicts come out depth-first, in the order they appear. Entries that are
    neither a dict nor a list are skipped.
    """
    # Explicit stack of iterators: the top one is the list being walked.
    pending = [iter(records)]
    while pending:
        for entry in pending[-1]:
            if isinstance(entry, dict):
                yield entry
            elif isinstance(entry, list):
                # Descend now; the parent iterator resumes once this is done.
                pending.append(iter(entry))
                break
        else:
            # Current level exhausted: go back to its parent.
            pending.pop()


def parse_quantity(s: Any) -> Optional[Decimal]:
    """Pull the first numeric quantity out of a free-text cell.

    A number directly followed by "Kg" (any letter case) is preferred;
    failing that, the first number anywhere in the text is used. Commas
    inside the number are dropped before conversion.

    Returns ``None`` when *s* is not a string, is blank/NA, contains the
    lowercase substring ``line``, or holds no parseable number.
    """
    if not isinstance(s, str):
        return None
    if is_blank_or_na(s) or 'line' in s:
        return None

    # (pattern, regex flags, text to search), tried in order.
    attempts = (
        (r'(\d[\d,]*\.?\d*)\s*Kg', re.I, s),
        (r'(\d[\d,]*\.?\d*)', 0, s.strip()),
    )
    for pattern, flags, text in attempts:
        found = re.search(pattern, text, flags)
        if found is None:
            continue
        try:
            return Decimal(found.group(1).replace(',', ''))
        except InvalidOperation:
            continue
    return None

def find_col(records: List[Dict[str, Any]], syns: List[str]) -> Optional[str]:
    """Return the first name in *syns* that is a key of some dict in *records*.

    Synonyms are tried in the order given, so earlier entries take priority.
    Non-dict records are ignored. Returns ``None`` when nothing matches.
    """
    for candidate in syns:
        for row in records:
            if isinstance(row, dict) and candidate in row:
                return candidate
    return None


# Absolute tolerance allowed between the required and observed quantity
# on "Ingredients" rows (see anomalies_for_page).
TOL = Decimal("0.001")

def extract_trolley_spec(records):
    """Derive the allowed trolley load range from a page's rows.

    Scans *records* for the cells "Number of bags per tray",
    "No of trays per trolley" and "Minimum load (One trolley)". For each
    cell the last value that parses to a number wins; blank, NA or otherwise
    unparseable cells never overwrite a value that was already found.

    Args:
        records: iterable of row dicts (non-dict entries are skipped).

    Returns:
        ``{"min_load": Decimal, "max_load": Decimal}`` where ``max_load`` is
        bags-per-tray times trays-per-trolley, or ``None`` when any of the
        three values is missing.
    """
    bags_key = "Number of bags per tray"
    trays_key = "No of trays per trolley"
    min_load_key = "Minimum load (One trolley)"

    found = {bags_key: None, trays_key: None, min_load_key: None}

    for row in records:
        if not isinstance(row, dict):
            continue
        for key in (bags_key, trays_key, min_load_key):
            if key not in row:
                continue
            # parse_quantity already returns None for blank/NA cells, so no
            # separate blank check is needed here.
            parsed = parse_quantity(row[key])
            if parsed is not None:
                found[key] = parsed

    bags = found[bags_key]
    trays = found[trays_key]
    min_load = found[min_load_key]
    if bags is None or trays is None or min_load is None:
        return None

    try:
        max_load = bags * trays
    except ArithmeticError:
        # Decimal signals (overflow, invalid operation) derive from
        # ArithmeticError; treat them as "no usable spec".
        return None

    return {
        "min_load": min_load,
        "max_load": max_load
    }

def trolley_load_anomalies_for_page(records, spec):
    """List trolley rows whose bag quantity lies outside the allowed range.

    Args:
        records: iterable of row dicts for one page.
        spec: mapping with "min_load" and "max_load" (inclusive bounds).

    Returns:
        One anomaly dict ("parameter", "observed_value", "standard_range")
        per offending row. Rows without a "Trolley" key, the
        "bags for sensor placement" row, and rows without a parseable
        quantity are skipped.
    """
    lower, upper = spec["min_load"], spec["max_load"]
    bag_count_columns = ("Quantity Nos. Bags", "Quantity Nos. Bag", "Quantity Bags")

    found = []
    for entry in records:
        if "Trolley" not in entry:
            continue

        label = str(entry.get("Trolley", "")).strip()
        if label.lower() == "bags for sensor placement":
            continue

        # Only the first quantity column present in the row is consulted.
        column = next((c for c in bag_count_columns if c in entry), None)
        if column is None:
            continue

        load = parse_quantity(entry.get(column))
        if load is None or lower <= load <= upper:
            continue

        found.append({
            "parameter": f"Trolley Load — {label}",
            "observed_value": float(load),
            "standard_range": f"{float(lower)} to {float(upper)} Bags"
        })

    return found

def _as_count(value):
    """Return *value* as an int when it is integral, otherwise as a float.

    Avoids silently truncating a fractional quantity when reporting counts.
    """
    if value == value.to_integral_value():
        return int(value)
    return float(value)


def anomalies_for_page(records, req_syns, obs_syns, trolley_spec=None):
    """Collect quantity-variance anomalies for one page.

    Args:
        records: page rows; dicts may be nested inside lists.
        req_syns: candidate column names for the required (standard) quantity.
        obs_syns: candidate column names for the observed quantity.
        trolley_spec: optional load range from ``extract_trolley_spec``; when
            given, trolley load anomalies are appended to the result.

    Returns:
        A list of anomaly dicts with "parameter", "observed_value" and
        "standard_range". Empty when the page has no dict rows.

    Row rules (first match wins; rows lacking a parseable required or
    observed quantity are skipped):
        1. "Item" mentions "twist-off port": quantities must be equal.
        2. Row has a "Material" key: quantities must be equal.
        3. Row has an "Ingredients" key: quantities must agree within TOL.
        4. Anything else: quantities must be equal.
    """
    flat = list(iter_dict_rows(records))
    if not flat:
        return []

    trolley_anoms = (
        trolley_load_anomalies_for_page(flat, trolley_spec) if trolley_spec else []
    )

    req_col = find_col(flat, req_syns)
    obs_col = find_col(flat, obs_syns)

    # If no standard/observed columns, do only trolley check
    if not req_col or not obs_col:
        return trolley_anoms

    out = []

    for row in flat:
        std = parse_quantity(row.get(req_col))
        obs = parse_quantity(row.get(obs_col))
        if std is None or obs is None:
            continue

        item = str(row.get("Item", ""))

        # Rule 1: Twist-off port
        if "twist-off port" in item.lower():
            if std != obs:
                out.append({
                    "parameter": f"Quantity Dispensed — {row.get('Item','N/A')}",
                    # Counts are reported as ints, but a fractional value is
                    # kept as-is so the mismatch stays visible.
                    "observed_value": _as_count(obs),
                    "standard_range": f"== {_as_count(std)} Nos."
                })
            continue

        # Rule 2: Material
        if "Material" in row:
            if std != obs:
                out.append({
                    "parameter": f"Issued Qty — {row.get('Material','N/A')}",
                    "observed_value": float(obs),
                    "standard_range": f"== {float(std)}"
                })
            continue

        # Rule 3: Ingredients
        if "Ingredients" in row:
            if abs(std - obs) > TOL:
                out.append({
                    "parameter": f"Quantity Dispensed — {row.get('Ingredients','N/A')}",
                    "observed_value": float(obs),
                    "standard_range": f"== {float(std)} (±{float(TOL)})"
                })
            continue

        # Fallback
        if std != obs:
            out.append({
                "parameter": f"Quantity Dispensed — "
                            f"{row.get('Item') or row.get('Material') or row.get('Ingredients') or 'N/A'}",
                "observed_value": float(obs),
                "standard_range": f"== {float(std)}"
            })

    # Add trolley anomalies
    out.extend(trolley_anoms)

    return out

def quant_variance_check(bmr_data: Dict[str, Any], keys: List[List[str]]) -> List[Dict[str, Any]]:
    """Run the quantity variance check on every page of a batch record.

    Args:
        bmr_data: mapping of page number to page dict; rows are read from
            each page's "records" entry (missing or null means no rows).
        keys: two lists of column-name synonyms, ``[required, observed]``.

    Returns:
        One result dict per page, in input order, with "page_no",
        "section_name", "check_name" and "anomaly_status" (1 when the page
        has at least one anomaly, otherwise 0).

    A trolley spec found on a page applies to that page and to the page
    immediately after it; beyond that it is dropped unless a page redefines it.
    """
    req_syns, obs_syns = keys[0], keys[1]

    trolley_spec = None
    carry_to_next_page = False
    results = []
    for page_no, page in bmr_data.items():
        # `or []` also covers "records": null, which .get's default would not.
        records = page.get("records") or []
        flat = list(iter_dict_rows(records))

        # ---- Extract trolley spec for this page ----
        new_spec = extract_trolley_spec(flat)

        if new_spec:
            trolley_spec = new_spec
            carry_to_next_page = True
        elif carry_to_next_page:
            # Reuse the previous page's spec once, then stop carrying it.
            carry_to_next_page = False
        else:
            trolley_spec = None

        # ---- Run anomaly checks ----
        anoms = anomalies_for_page(flat, req_syns, obs_syns, trolley_spec)

        results.append({
            "page_no": page_no,
            "section_name": "Process Parameters",
            "check_name": "Quantity Variance Check",
            "anomaly_status": 1 if anoms else 0
            })
    return results


In [65]:
if __name__ == "__main__":
    # Column-name synonyms: k1 for the required quantity, k2 for the
    # dispensed/issued quantity.
    k1 = ['Qty. Required', 'Quantity Req.', 'Quantity Required']
    k2 = ['Dispensed', 'Issued Qty', 'Quantity Dispensed']

    # Alternative input: '../JSON/BMR_61_FLATTEN.json'
    filepath = '../JSON/test.json'
    with open(filepath, "r", encoding="utf-8") as f:
        data = json.loads(f.read())

    results = quant_variance_check(data, [k1, k2])
    

In [66]:
# Pretty-print the per-page results returned by quant_variance_check.
pp(results)

[{'anomaly_status': 1,
  'check_name': 'Quantity Variance Check',
  'page_no': '31',
  'section_name': 'Process Parameters'},
 {'anomaly_status': 1,
  'check_name': 'Quantity Variance Check',
  'page_no': '114',
  'section_name': 'Process Parameters'},
 {'anomaly_status': 1,
  'check_name': 'Quantity Variance Check',
  'page_no': '115',
  'section_name': 'Process Parameters'}]


In [34]:
# Pretty-print the raw JSON input loaded from `filepath` for inspection.
pp(data)

{'113': {'BATCH NO.': 'AH250061',
         'BATCH SIZE': '4000 L/ 3960 kg/7606 Units',
         'BMR NO.': 'BMR-PA-048-10',
         'NAME OF PRODUCT': 'Multiple Electrolytes Injection Type 1 USP',
         'PAGE NO.': 'Page 111 of 145',
         'additional_copy': False,
         'balance_id': '',
         'batch_number': '',
         'date': '',
         'end_time': '',
         'error': [],
         'markdown_page': "<a id='bff3082c-2120-48ab-9a7f-23f927aa0b4e'></a> "
                          '<::attestation: Stamp Status: Master Copy Readable '
                          'Text: MASTER COPY Short description of visual '
                          'elements and positioning: A red rectangular stamp '
                          'with bold text, slightly tilted, centered on the '
                          'image.::> <a '
                          "id='ad1a52fc-2fc4-4e2f-95bc-67cdbe7c1d71'></a> "
                          '<table id="0-1"> <tr><td id="0-2">amneal '
                        