In [1]:
import gc
import os
import re
import hashlib
import warnings
import pandas as pd
import numpy as np
from datetime import datetime
from sqlalchemy import create_engine, text
import urllib
import pyodbc
import json
import math
import fitz
import re
import pymupdf4llm
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
 

Consider using the pymupdf_layout package for a greatly improved page layout analysis.


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
driver = "ODBC Driver 17 for SQL Server"
server = "10.10.1.103"
database = "TrainingDB"
username = "EnterpriseApprovalUser"
password = "gcp@2017"
table_name = "ecat_filtered_data"

THRESHOLD_CSV = "Parameters_Test_Mapping.csv"
RUNCAT_XLSX = "Running_Catalysts.xlsx" 
REPORT_OUTPUT_DIR = r"REPORT_PDF"

BASE_COLS = [
    "SAMPLE_NUMBER", "DATE_RECEIVED", "DATE_TAKEN",
    "SUBMITTER_ID", "LAB_CODE", "SAMPLE_TYPE"
]
 

In [3]:
odbc_str = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password};"
quoted = urllib.parse.quote_plus(odbc_str)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={quoted}", fast_executemany=True)

In [4]:
def safe_str(x):
    return "" if pd.isna(x) else str(x).strip()

def try_float(x):
    try:
        return float(x)
    except Exception:
        return None

def normalize_name(name):
    if pd.isna(name):
        return ""
    s = str(name).strip().upper()
    return re.sub(r"\s+", "", s)

def clean_columns(df):
    df.columns = (
        df.columns.str.strip()
                  .str.replace(" ", "_")
                  .str.replace("-", "_")
                  .str.upper()
    )
    return df

def to_iso_string(x):
    try:
        dt = pd.to_datetime(x)
        return dt.strftime("%Y-%m-%d %H:%M:%S") if not pd.isna(dt) else ""
    except Exception:
        return str(x) if x else ""


In [16]:

def get_engine(driver, server, database, username, password):
    odbc_str = (
        f"DRIVER={driver};SERVER={server};DATABASE={database};"
        f"UID={username};PWD={password};"
    )
    quoted = urllib.parse.quote_plus(odbc_str)
    return create_engine(
        f"mssql+pyodbc:///?odbc_connect={quoted}",
        fast_executemany=True
    )

def get_existing_report_samples(output_dir):
    """
    Reads output folder and extracts sample numbers
    from existing PDF reports
    """
    output_dir = os.path.abspath(output_dir)

    if not os.path.isdir(output_dir):
        return set()

    existing_samples = set()

    for f in os.listdir(output_dir):
        if f.lower().endswith(".pdf"):
            # Extract numeric sample number from filename
            match = re.search(r"(\d+)", f)
            if match:
                existing_samples.add(match.group(1))

    print(f"Found {len(existing_samples)} existing reports — will SKIP these samples.")
    return existing_samples

def get_non_buying_units(runcat_path):
    try:
        runcat_df = pd.read_excel(runcat_path)
        runcat_df.columns = runcat_df.columns.str.strip().str.upper()

        required_cols = {"RUNCAT_UNIT_ID", "RUNCAT_ON_DATE", "RUNCAT_END_DATE"}
        if not required_cols.issubset(runcat_df.columns):
            return set()

        runcat_df["RUNCAT_ON_DATE"] = pd.to_datetime(
            runcat_df["RUNCAT_ON_DATE"], errors="coerce"
        )

        latest = (
            runcat_df.sort_values("RUNCAT_ON_DATE", ascending=False)
            .groupby("RUNCAT_UNIT_ID", as_index=False)
            .first()
        )

        non_buying = set(
            latest[latest["RUNCAT_END_DATE"].notna()]["RUNCAT_UNIT_ID"].astype(str)
        )

        print(f"Found {len(non_buying)} non-buying units — will KEEP these units.")
        return non_buying

    except Exception as e:
        print(f"Failed to read Runcat file: {e}")
        return set()


def load_table_to_df(
    driver,
    server,
    database,
    username,
    password,
    table_name,
    runcat_path=None
):
   
    engine = get_engine(driver, server, database, username, password)
    print(f"Reading table: {table_name}")

    existing_samples = get_existing_report_samples(REPORT_OUTPUT_DIR)

    cols = (
        "SAMPLE_NUMBER, UNIT_ID, TEST_NAME, RESULT, "
        "DATE_RECEIVED, DATE_TAKEN, LAB_CODE, SAMPLE_TYPE, SUBMITTER_ID"
    )

    q = f"""
        SELECT {cols}
        FROM {table_name}
    """

    chunks = []
    non_buying_units = get_non_buying_units(runcat_path) if runcat_path else set()

    with engine.connect() as conn:
        for i, chunk in enumerate(
            pd.read_sql_query(
                text(q),
                conn,
                chunksize=400000
            )
        ):
            print(f"Loaded chunk {i+1} ({len(chunk)} rows)")

            # Skip samples with existing PDFs
            chunk["SAMPLE_NUMBER"] = chunk["SAMPLE_NUMBER"].astype(str)
            chunk = chunk[
                ~chunk["SAMPLE_NUMBER"].isin(existing_samples)
            ]

            if chunk.empty:
                print("All samples in this chunk already processed — skipping.")
                continue

            # Optional unit filter (disabled per business rule)
            # if non_buying_units:
            #     chunk = chunk[chunk["UNIT_ID"].astype(str).isin(non_buying_units)]

            chunks.append(chunk)
            gc.collect()

    if not chunks:
        print("No new samples found to process.")
        return pd.DataFrame()

    df = pd.concat(chunks, ignore_index=True)

    print(f"Final dataset contains {len(df)} new rows for report generation.")
    return df


In [17]:
def load_thresholds(threshold_csv):
    th_df = pd.read_csv(threshold_csv)
    th_df.columns = [c.strip() for c in th_df.columns]
    if "Test_Name" not in th_df.columns and "Test Name" in th_df.columns:
        th_df = th_df.rename(columns={"Test Name": "Test_Name"})
    th_df["Test_Key"] = th_df["Test_Name"].apply(normalize_name)
    thresholds = {}
    for _, row in th_df.iterrows():
        try:
            thresholds[row["Test_Key"]] = (float(row["Deviation_UL"]), float(row["Deviation_LL"]))
        except Exception:
            continue
    
    return thresholds

# Statistics
def compute_test_stats_past_year(raw_df, past_days=365):
    raw = clean_columns(raw_df.copy())
    raw["DATE_RECEIVED"] = pd.to_datetime(raw["DATE_RECEIVED"], errors="coerce")
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=past_days)
    recent = raw[raw["DATE_RECEIVED"] >= cutoff]
    recent["RESULT_NUM"] = pd.to_numeric(recent["RESULT"], errors="coerce")
    recent["TEST_KEY"] = recent["TEST_NAME"].apply(normalize_name)
    return recent.groupby("TEST_KEY").agg(
        max_value=("RESULT_NUM", "max"),
        min_value=("RESULT_NUM", "min")
    ).to_dict(orient="index")

# Pivot 
def pivot_and_compute_diffs(df, base_cols, threshold_csv):
    
    df = clean_columns(df)
    BASE_COLS = base_cols.copy()

    for col in BASE_COLS:
        if col not in df.columns:
            df[col] = pd.NA

    if "UNIT_ID" not in df.columns:
        raise RuntimeError("UNIT_ID column missing in source table.")
    if "TEST_NAME" not in df.columns or "RESULT" not in df.columns:
        raise RuntimeError("TEST_NAME and RESULT columns are required.")

    df["DATE_RECEIVED"] = pd.to_datetime(df["DATE_RECEIVED"], errors="coerce")
    df["RESULT"] = df["RESULT"].replace("", np.nan)

    # Added Filter: Keep only TEST_NAMEs that exist in threshold CSV 
    try:
        threshold_df = pd.read_csv(threshold_csv)
        threshold_df.columns = [c.strip() for c in threshold_df.columns]
        if "Test_Name" in threshold_df.columns:
            valid_tests = set(threshold_df["Test_Name"].astype(str).str.strip().str.upper())
            df = df[df["TEST_NAME"].astype(str).str.strip().str.upper().isin(valid_tests)]
    except Exception as e:
        print(f" Warning: Failed to read threshold CSV for filtering: {e}")
    

    pivoted_per_unit = {}
    for unit, group in df.groupby("UNIT_ID"):
        try:
            pivot_df = group.pivot_table(
                index=BASE_COLS, columns="TEST_NAME", values="RESULT",
                aggfunc="first", sort=False
            ).reset_index()
            pivot_df.columns.name = None
            pivot_df["UNIT_ID"] = unit
            pivot_df["DATE_RECEIVED"] = pd.to_datetime(pivot_df["DATE_RECEIVED"], errors="coerce")
            pivot_df = pivot_df.sort_values("DATE_RECEIVED", ascending=False)

            test_cols = [c for c in pivot_df.columns if c not in BASE_COLS + ["UNIT_ID"]]
            numeric = pivot_df[test_cols].apply(pd.to_numeric, errors='coerce')
            diff_df = pd.DataFrame(index=pivot_df.index, columns=test_cols, dtype="float64")

            if "SAMPLE_TYPE" in pivot_df.columns:
                sample_types = pivot_df["SAMPLE_TYPE"].fillna("").unique()
            else:
                sample_types = [""]

            for st in sample_types:
                idx = pivot_df[pivot_df["SAMPLE_TYPE"].fillna("") == (st if st is not None else "")].index
                if len(idx) <= 1:
                    continue
                sub_numeric = numeric.loc[idx]
                sub_diff = sub_numeric.diff(periods=-1)
                diff_df.loc[idx, :] = sub_diff.values

            diff_df.columns = [f"{c}_diff" for c in diff_df.columns]
            combined = pd.concat([pivot_df.reset_index(drop=True), diff_df.reset_index(drop=True)], axis=1)
            pivoted_per_unit[str(unit)] = combined
        except Exception as e:
            print(f" Error pivoting unit {unit}: {e}")
    
    return pivoted_per_unit

# Runcat 
def load_runcat_data(excel_path):
    runcat = pd.read_excel(excel_path)
    runcat = clean_columns(runcat)
    for col in ["RUNCAT_ON_DATE", "RUNCAT_END_DATE"]:
        if col in runcat.columns:
            runcat[col] = pd.to_datetime(runcat[col], errors="coerce")
    
    return runcat


In [18]:
import pandas as pd
import math

PARAMETER_MAPPING_CSV = r"C:/Users/Machine/Documents/POC_ECAT/ECAT_LANGGRAPH/Parameters_Test_Mapping.csv"


def normalize_key(s):
    """
    Normalizes keys for safe matching
    """
    return (
        str(s)
        .strip()
        .lower()
        .replace(" ", "")
        .replace("_", "")
    )


def load_parameter_mapping(csv_path):
    """
    Builds lookup:
    normalized_test_name -> {display_name, unit}
    """
    df = pd.read_csv(csv_path)

    df["Test_Name_norm"] = df["Test_Name"].apply(normalize_key)

    return {
        row["Test_Name_norm"]: {
            "display_name": row["Parameter"],
            "unit": row["Unit"]
        }
        for _, row in df.iterrows()
    }


PARAMETER_MAP = load_parameter_mapping(PARAMETER_MAPPING_CSV)


In [20]:


SAMPLE_TYPE_MAP = {
    "E": "Ecat",
    "F": "Fines",
    "H": "Hopper",
    "M": "Misc.",
    "N": "Non-routine",
    "P": "Purchased",
    "S": "Spent",
    "K": "FeedStock",
    "B": "Scrubber Fines Sample",
    "Y": "Slurry"
}


def is_nan_value(val):

    if val is None:
        return True

    # float NaN
    if isinstance(val, float) and math.isnan(val):
        return True

    # string NaN
    if isinstance(val, str) and val.strip().lower() == "nan":
        return True

    if isinstance(val, dict):
        if "$numberDouble" in val and str(val["$numberDouble"]).lower() == "nan":
            return True

    return False

def resolve_parameter_name_and_unit(test_param):
    """
    Given Test_Parameter (e.g. AL2O3),
    returns (display_name_from_csv, unit)
    """
    if not test_param:
        return test_param, "N/A"

    key = normalize_key(test_param)

    meta = PARAMETER_MAP.get(key)
    if not meta:
        # fallback: keep original name
        return test_param, "N/A"

    return meta["display_name"], meta["unit"]

def resolve_sample_type(row, SAMPLE_TYPE_MAP):
    raw_value = row.get("SAMPLE_TYPE")

    if raw_value is None:
        return "N/A"

    raw_str = str(raw_value).strip().upper()

    if not raw_str or raw_str == "NAN":
        return "N/A"

    return SAMPLE_TYPE_MAP.get(raw_str, "N/A")


def generate_fused_summary(sample_obj):
    """
    Convert ECAT sample object directly into a fused natural-language summary.
    """

    unit_name = sample_obj.get("Unit_Name", "N/A")
    sample_no = sample_obj.get("Sample_Number", "N/A")
    sample_type = sample_obj.get("Sample_Type", "N/A")
    date_taken = sample_obj.get("Date_Taken", "N/A")
    date_received = sample_obj.get("Date_Received", "N/A")
    lab_code = sample_obj.get("Lab_Code", "N/A")
    unit_id = sample_obj.get("Unit_Id", "N/A")

    header = (
        f"Sample {sample_no} Sample Type {sample_type} was taken on {date_taken} "
        f"and received on {date_received}. It belongs to unit {unit_name} "
        f"(Unit ID: {unit_id}) and was analyzed under lab code '{lab_code}'.\n"
    )

    parameter_lines = []
    anomalies = []

    for result in sample_obj.get("Results", []):
        raw_param = result.get("Test_Parameter", "").strip()
        value = result.get("Captured_Value")

        if is_nan_value(value):
            continue

        deviation = result.get("Deviation", "N/A")
        y_min = result.get("Y_Min", "N/A")
        y_max = result.get("Y_Max", "N/A")
        within = result.get("isWithinLimit", True)
        last_vals = result.get("lastSampleValues", [])

        formatted_last_vals = (
            ", ".join(str(v) for v in last_vals) if last_vals else "No data"
        )
       
        display_param, unit = resolve_parameter_name_and_unit(raw_param)

        line = (
            f"- Parameter {display_param} ({unit}): captured value {value}, deviation {deviation}, "
            f"isWithinLimit {within}. Y_Min: {y_min}, Y_Max: {y_max}. "
            f"Last 5 sample values: {formatted_last_vals}."
        )

        parameter_lines.append(line)

        if not within:
            anomalies.append(display_param)

    if anomalies:
        anomaly_section = (
            "\nAnomalous parameters (outside acceptable limits): " + ", ".join(anomalies)+ ".\n")
    else:
        anomaly_section = "\nAll parameters are within acceptable limits.\n"

    sample_summary_text = (
        " SAMPLE INFORMATION\n"
        + header
        + "\n PARAMETER DETAILS\n"
        + "\n".join(parameter_lines)
        + "\n\n ANOMALY OVERVIEW\n"
        + anomaly_section
    )

    return sample_summary_text


def build_and_generate_fused_summaries(
    pivoted_per_unit, thresholds, stats_map, runcat_df, BASE_COLS
):
    
    results = []

    def get_unit_metadata(unit_id):
        sub = runcat_df[runcat_df["RUNCAT_UNIT_ID"].astype(str) == str(unit_id)]
        if sub.empty:
            return "", False
        latest = sub.sort_values("RUNCAT_ON_DATE", ascending=False).iloc[0]
        return safe_str(latest.get("RUNCAT_UNIT_NAME")), bool(not latest.get("RUNCAT_END_DATE"))

    for unit, pivot_df in pivoted_per_unit.items():
        unit_name, is_non_buying = get_unit_metadata(unit)

        test_cols = [
            c for c in pivot_df.columns
            if c not in BASE_COLS + ["UNIT_ID"] and not c.endswith("_diff")
        ]

        for idx, row in pivot_df.iterrows():
            sample_no = safe_str(row.get("SAMPLE_NUMBER"))
            if not sample_no:
                continue

            sample_type = resolve_sample_type(row, SAMPLE_TYPE_MAP)

            sample_obj = {
                "Unit_Id": str(unit),
                "Sample_Number": sample_no,
                "Date_Received": to_iso_string(row.get("DATE_RECEIVED")),
                "Date_Taken": to_iso_string(row.get("DATE_TAKEN")),
                "Submitter_Id": safe_str(row.get("SUBMITTER_ID")),
                "Lab_Code": safe_str(row.get("LAB_CODE")),
                "Sample_Type": sample_type,
                "Unit_Name": unit_name,
                "isNonBuying": is_non_buying,
                "Results": []
            }

            for test in test_cols:
                raw_val = row.get(test)
                if is_nan_value(raw_val):
                    continue

                val = try_float(raw_val)
                if val is None:
                    continue

                diff_val = try_float(row.get(f"{test}_diff"))
                tkey = normalize_name(test)
                ul_ll = thresholds.get(tkey)
                deviation = diff_val if diff_val is not None else None

                stat = stats_map.get(tkey, {})
                y_max = stat.get("max_value")
                y_min = stat.get("min_value")

                within_y = y_min is not None and y_max is not None and y_min < val < y_max
                within_t = ul_ll is not None and diff_val is not None and ul_ll[1] <= diff_val <= ul_ll[0]
                is_within = within_y and within_t

                current_idx = pivot_df.index.get_loc(idx)
                prev_rows = pivot_df.iloc[current_idx + 1: current_idx + 6]

                last_values = []
                if test in prev_rows.columns:
                    for _, prow in prev_rows.iterrows():
                        pv_raw = prow.get(test)
                        if is_nan_value(pv_raw):
                            continue
                        pv = try_float(pv_raw)
                        if pv is not None:
                            last_values.append(pv)

                sample_obj["Results"].append({
                    "Test_Parameter": test,

                    "Captured_Value": val,
                    "Deviation": deviation,
                    "Y_Max": y_max,
                    "Y_Min": y_min,
                    "isWithinLimit": is_within,
                    "lastSampleValues": last_values
                })

            fused_summary = generate_fused_summary(sample_obj)

            results.append({
                "fused_summary": fused_summary,                
            })

    return results



In [21]:
def main():
    try:        
        df_raw = load_table_to_df(driver,server,database,username,password,table_name,runcat_path=RUNCAT_XLSX)
        if df_raw.empty:
            print(" No valid data to process.")
            return
              
        stats_map = compute_test_stats_past_year(df_raw)        
        
        thresholds = load_thresholds(THRESHOLD_CSV)
        
        pivoted = pivot_and_compute_diffs(df_raw,BASE_COLS,THRESHOLD_CSV)
        
        runcat_df = load_runcat_data(RUNCAT_XLSX)
               
        pipeline_results = build_and_generate_fused_summaries(pivoted_per_unit=pivoted,thresholds=thresholds,stats_map=stats_map,runcat_df=runcat_df,BASE_COLS=BASE_COLS)

        return pipeline_results
    except Exception as e:
        print(" PIPELINE FAILED")
        print(" Error:", e)
        raise

main()

Reading table: ecat_filtered_data
Found 1 existing reports — will SKIP these samples.
Found 489 non-buying units — will KEEP these units.
Loaded chunk 1 (11919 rows)
Final dataset contains 11863 new rows for report generation.


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  recent["RESULT_NUM"] = pd.to_numeric(recent["RESULT"], errors="coerce")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  recent["TEST_KEY"] = recent["TEST_NAME"].apply(normalize_name)


[{'fused_summary': " SAMPLE INFORMATION\nSample 41011461 Sample Type Fines was taken on 2024-10-28 00:00:00 and received on 2024-10-31 00:00:00. It belongs to unit CITGO - CORPUS CHRISTI (NEW), TX, US (Unit ID: 313) and was analyzed under lab code 'C'.\n\n PARAMETER DETAILS\n- Parameter Al₂O₃ (wt%): captured value 48.73351, deviation -0.9930499999999967, isWithinLimit False. Y_Min: 32.76446, Y_Max: 44.50726. Last 5 sample values: 52.40013, 49.72656, 52.79158, 49.82718, 53.32167.\n- Parameter CaO (wt%): captured value 0.24141, deviation 0.02829000000000001, isWithinLimit False. Y_Min: 0.03, Y_Max: 0.17. Last 5 sample values: 0.0915, 0.21312, 0.09904, 0.1923199999999999, 0.1000999999999999.\n- Parameter Co (ppm): captured value 21.41198, deviation -1.9723299999999995, isWithinLimit False. Y_Min: 27.76159, Y_Max: 50.82113. Last 5 sample values: 21.34817, 23.38431, 23.57444, 25.80823, 25.90799.\n- Parameter Cu (ppm): captured value 21.84488, deviation -1.7517600000000009, isWithinLimit Tru