In [None]:
# extract_excel_summary.py
# Phase 2 - Step 4: Extract summary data from Excel-like calc files into JSON.
# - Scans a root folder for .xlsx/.xlsm/.xls
# - Finds "summary-like" tables (Actual/Allowable/RF + Component + Criterion + Output Set)
# - Outputs: outputs/extracted/excel_summary.json + excel_summary_log.csv

import os
import re
import sys
import json
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional

import pandas as pd
from openpyxl import load_workbook

# Lower-case fragments; a worksheet whose normalized name contains any of them
# is treated as a likely summary/results tab.
SHEET_NAME_HINTS = [
    "summary",
    "output",
    "results",
    "rf",
    "interaction",
    "bearing",
    "buckling",
    "joint",
    "panel",
]

# Lower-case file suffixes (leading dot included) of the workbooks the crawler reads.
SUPPORTED_EXT = {
    ".xlsx",
    ".xlsm",
    ".xls",
}

# Header vocabulary: logical field name -> lower-case fragments that identify a
# column header as carrying that field. Matching is plain substring containment
# against the normalized header text.
KEYWORDS = {
    # What is being checked (structural item / location).
    "component": [
        "component", "panel", "panel id", "joint",
        "location", "item", "part", "bay",
        "rib", "stringer", "upper panel", "lower panel",
    ],
    # Which check or failure mode the row reports.
    "criterion": [
        "criterion", "criteria", "check", "mode",
        "failure", "type", "buckling", "bearing",
        "net tension", "net-tension", "shear-out", "crippling",
        "irb", "inter-rivet", "tsai-wu", "interaction",
    ],
    # Applied / computed quantity.
    "actual": [
        "actual", "applied", "demand", "calc", "computed",
        "value", "load", "stress", "flux",
    ],
    # Permitted quantity.
    "allowable": [
        "allowable", "limit", "capacity", "capability", "allow",
        "ultimate", "ult", "allowable flux", "allowable stress",
    ],
    # Reserve factor / margin column.
    "rf": ["rf", "reserve factor", "mos", "margin"],
    # Load case / output set identifier column.
    "output_set": [
        "output set id", "output set", "load case",
        "case", "pdf case id", "osid",
    ],
}

def normalize(s: Any) -> str:
    """Return ``s`` as trimmed, lower-cased text with whitespace runs collapsed.

    ``None`` yields the empty string; any other value goes through ``str()``
    first, so numbers and other objects are accepted as well.
    """
    if s is None:
        return ""
    lowered = str(s).strip().lower()
    return re.sub(r"\s+", " ", lowered)

def matches_any(cell_text: str, field: str) -> bool:
    """Tell whether ``cell_text`` contains any keyword registered for ``field``.

    The text is normalized first and each keyword is tested by plain substring
    containment. ``field`` must be a key of ``KEYWORDS``; an unknown field
    raises ``KeyError``.
    """
    haystack = normalize(cell_text)
    return any(keyword in haystack for keyword in KEYWORDS[field])

def is_candidate_sheet(name: str) -> bool:
    """Return True when the normalized sheet name contains one of ``SHEET_NAME_HINTS``."""
    lowered = normalize(name)
    for hint in SHEET_NAME_HINTS:
        if hint in lowered:
            return True
    return False

def to_float(x) -> Optional[float]:
    """Best-effort conversion of a spreadsheet cell value to a finite float.

    Args:
        x: Raw cell value (number, string possibly carrying units, or None).

    Returns:
        The parsed number, or None when the value is missing, is a placeholder
        ("", "nan", "none", "-"), cannot be parsed, or is NaN/infinite.

    Separator handling for strings:
        * "3,5"        -> 3.5        (single comma is a decimal comma)
        * "1,234.56"   -> 1234.56    (comma before the dot is a thousands mark)
        * "1.234,56"   -> 1234.56    (dot before the comma is a thousands mark)
        * "1,234,567"  -> 1234567.0  (repeated commas are thousands marks)
    """
    import math  # local import: the file's import block does not provide math

    if x is None:
        return None

    if isinstance(x, (int, float)):
        try:
            number = float(x)
        except OverflowError:
            return None
        # NaN marks an empty cell in pandas frames; letting it through would
        # produce NaN reserve factors and "nan" identifiers downstream.
        return number if math.isfinite(number) else None

    text = str(x).strip()
    if text.lower() in {"", "nan", "none", "-"}:
        return None

    compact = text.replace(" ", "")
    last_comma = compact.rfind(",")
    last_dot = compact.rfind(".")
    if last_comma != -1 and last_dot != -1:
        if last_comma > last_dot:
            # European style: dots group thousands, the comma is the decimal mark.
            compact = compact.replace(".", "").replace(",", ".")
        else:
            # English style: commas group thousands.
            compact = compact.replace(",", "")
    elif compact.count(",") > 1:
        compact = compact.replace(",", "")
    else:
        compact = compact.replace(",", ".")  # support decimal commas

    compact = re.sub(r"[^0-9eE+\-.]", "", compact)  # remove units/symbols
    try:
        number = float(compact)
    except ValueError:
        return None
    return number if math.isfinite(number) else None

# -------- openpyxl path for xlsx/xlsm (with cell addresses) --------
def find_header_openpyxl(ws) -> Tuple[Optional[int], Dict[str, int]]:
    """Locate the header row of a summary-like table on an openpyxl worksheet.

    The first 25 rows are scanned. A row qualifies as the header when at least
    three different fields are recognised in it.

    Each header cell is assigned to at most ONE field (the first match in
    ``priority`` below), and each field keeps the FIRST column that claims it.
    Without this, a header such as "Allowable Stress" also matches the
    ``actual`` keyword "stress" and would overwrite the real "actual" column,
    and a single title cell like "Panel Buckling RF" would satisfy the
    three-field test on its own.

    Args:
        ws: openpyxl worksheet (regular or read-only).

    Returns:
        ``(row_number, {field: zero_based_column_index})`` with a 1-based row
        number, or ``(None, {})`` when no header row is found.
    """
    # Most specific meaning first: "Load Case" must resolve to output_set rather
    # than actual ("load"); "Allowable Stress" to allowable rather than actual.
    priority = ("output_set", "rf", "allowable", "actual", "criterion", "component")

    # Read-only worksheets may report max_row as None when the file carries no
    # dimension record, so fall back to the fixed scan depth in that case.
    last_row = ws.max_row
    max_scan_rows = 25 if last_row is None else min(25, last_row)
    if max_scan_rows < 1:
        return None, {}

    # iter_rows streams the rows once; per-cell ws.cell() lookups are very slow
    # on read-only worksheets.
    scanned = ws.iter_rows(min_row=1, max_row=max_scan_rows, values_only=True)
    for r, row_vals in enumerate(scanned, start=1):
        hits: Dict[str, int] = {}
        for c0, val in enumerate(row_vals):
            if val is None:
                continue
            text = str(val)
            field = next((f for f in priority if matches_any(text, f)), None)
            if field is not None and field not in hits:
                hits[field] = c0  # 0-based index
        if len(hits) >= 3:
            return r, hits
    return None, {}

def col_letter(idx_1based: int) -> str:
    """Convert a 1-based column number to spreadsheet letters (1 -> "A", 27 -> "AA").

    An input of 0 yields the empty string.
    """
    letters = []
    remaining = idx_1based
    while remaining:
        # Bijective base-26: shift by one so that 26 maps to "Z" rather than "A0".
        remaining, offset = divmod(remaining - 1, 26)
        letters.append(chr(ord("A") + offset))
    return "".join(reversed(letters))

def extract_openpyxl(path: Path) -> List[Dict]:
    """Extract summary rows from an .xlsx/.xlsm workbook, recording cell addresses.

    The workbook is opened read-only with ``data_only=True`` (stored values
    instead of formulas). Sheets whose name looks like a summary are preferred;
    if none match, every sheet is scanned. On each sheet the header row is
    located with ``find_header_openpyxl`` and the rows below it are read until
    two consecutive rows have no data in the mapped columns.

    Args:
        path: Workbook file to read.

    Returns:
        One dict per accepted row with keys ``component``, ``criterion``,
        ``actual``, ``allowable``, ``rf``, ``output_set_id``, ``source_file``,
        ``source_sheet`` and ``source_cells`` (field -> "A1"-style address).
        A row is accepted when it has a component or criterion AND either an RF
        or both actual and allowable.
    """
    wb = load_workbook(filename=str(path), data_only=True, read_only=True)
    results: List[Dict] = []

    try:
        # Prefer candidate sheets, else scan all
        sheets = [s for s in wb.sheetnames if is_candidate_sheet(s)]
        if not sheets:
            sheets = wb.sheetnames[:]

        for sh in sheets:
            ws = wb[sh]
            header_row, colmap = find_header_openpyxl(ws)
            if not header_row:
                continue

            first_data_row = header_row + 1
            blank_streak = 0
            # Stream the rows: in read-only mode ws.max_row may be None and
            # random-access ws.cell() is very slow, so neither is relied on here.
            data_rows = ws.iter_rows(min_row=first_data_row, values_only=True)
            for r, row_vals in enumerate(data_rows, start=first_data_row):
                if blank_streak >= 2:
                    break

                any_data = False
                row_dict: Dict[str, Any] = {
                    "component": None, "criterion": None,
                    "actual": None, "allowable": None, "rf": None,
                    "output_set_id": None,
                    "source_file": str(path),
                    "source_sheet": sh,
                    "source_cells": {}
                }
                for field, c0 in colmap.items():
                    # Streamed rows can be shorter than the header row.
                    val = row_vals[c0] if c0 < len(row_vals) else None
                    if field in {"actual", "allowable", "rf"}:
                        row_dict[field] = to_float(val)
                    elif field in {"component", "criterion"}:
                        row_dict[field] = str(val).strip() if val is not None else None
                    elif field == "output_set":
                        num = to_float(val)
                        if num is not None and float(num).is_integer():
                            row_dict["output_set_id"] = int(num)
                        else:
                            # Non-numeric identifiers are kept as text.
                            row_dict["output_set_id"] = str(val).strip() if val else None
                    row_dict["source_cells"][field] = f"{col_letter(c0 + 1)}{r}"
                    if val not in (None, "", "-"):
                        any_data = True

                if not any_data:
                    blank_streak += 1
                    continue
                blank_streak = 0

                # Compute RF if missing but possible (both values present and non-zero)
                if row_dict["rf"] is None and row_dict["actual"] and row_dict["allowable"]:
                    row_dict["rf"] = row_dict["allowable"] / row_dict["actual"]

                # Accept if we at least have component/criterion and RF (or actual+allowable)
                has_label = bool(row_dict["component"] or row_dict["criterion"])
                has_numbers = row_dict["rf"] is not None or (
                    row_dict["actual"] is not None and row_dict["allowable"] is not None
                )
                if has_label and has_numbers:
                    results.append(row_dict)
    finally:
        # Read-only workbooks keep the underlying file open until closed explicitly.
        wb.close()

    return results

# -------- pandas fallback for legacy .xls (no cell addresses) --------
def find_header_pandas(df: pd.DataFrame) -> Tuple[Optional[int], Dict[str, int]]:
    """Locate the header row of a summary-like table in a header-less DataFrame.

    The first 25 rows are scanned. A row qualifies when at least three different
    fields are recognised in it. Each cell is assigned to at most one field (the
    first match in ``priority``) and each field keeps the first column that
    claims it, so a header like "Allowable Stress" cannot overwrite the
    ``actual`` column through the generic "stress" keyword, and a single title
    cell cannot satisfy the three-field test on its own.

    Args:
        df: Sheet contents read with ``header=None``.

    Returns:
        ``(row_position, {field: column_position})`` using 0-based positions,
        or ``(None, {})`` when no header row is found.
    """
    # Most specific meaning first: "Load Case" must resolve to output_set rather
    # than actual ("load"); "Allowable Stress" to allowable rather than actual.
    priority = ("output_set", "rf", "allowable", "actual", "criterion", "component")

    max_scan_rows = min(25, len(df))
    for i in range(max_scan_rows):
        hits: Dict[str, int] = {}
        for c, val in enumerate(df.iloc[i]):
            text = normalize(val)
            if not text or text in {"nan", "none"}:
                continue
            field = next((f for f in priority if matches_any(text, f)), None)
            if field is not None and field not in hits:
                hits[field] = c
        if len(hits) >= 3:
            return i, hits
    return None, {}

def extract_pandas_xls(path: Path) -> List[Dict]:
    """Extract summary rows from a legacy .xls workbook via pandas/xlrd.

    No cell addresses are available on this path, so ``source_cells`` is None.
    Opening or reading failures are reported on stderr and yield no rows for
    the affected workbook/sheet (best effort, never raises for those).

    Args:
        path: Workbook file to read.

    Returns:
        One dict per accepted row with keys ``component``, ``criterion``,
        ``actual``, ``allowable``, ``rf``, ``output_set_id``, ``source_file``,
        ``source_sheet`` and ``source_cells``. A row is accepted when it has a
        component or criterion AND either an RF or both actual and allowable.
    """
    try:
        xls = pd.ExcelFile(path, engine="xlrd")
    except Exception as exc:
        print(f"[WARN] Could not open {path}: {exc}", file=sys.stderr)
        return []

    results: List[Dict] = []
    with xls:  # make sure the workbook handle is released
        # Prefer candidate sheets
        sheets = [s for s in xls.sheet_names if is_candidate_sheet(s)]
        if not sheets:
            sheets = xls.sheet_names[:]

        for sh in sheets:
            try:
                df = pd.read_excel(xls, sheet_name=sh, header=None, dtype=str)
            except Exception as exc:
                print(f"[WARN] Could not read sheet {sh!r} of {path}: {exc}", file=sys.stderr)
                continue
            if df.empty:
                continue
            header_row, colmap = find_header_pandas(df)
            if header_row is None:
                continue

            # Counted per sheet: a sparse row ends the table only once THIS sheet
            # has produced rows. Using the cumulative result list would abort
            # every later sheet on its first sparse row.
            accepted_on_sheet = 0
            for r in range(header_row + 1, len(df)):
                # Empty cells arrive as NaN; map them to None so they are not
                # turned into the string "nan" or a NaN number below.
                row = [None if pd.isna(v) else v for v in df.iloc[r]]
                nonempty = sum(1 for v in row if v is not None and str(v).strip() != "")
                if nonempty <= 1:
                    if accepted_on_sheet:
                        break
                    continue

                item = {
                    "component": None, "criterion": None,
                    "actual": None, "allowable": None, "rf": None,
                    "output_set_id": None,
                    "source_file": str(path),
                    "source_sheet": sh,
                    "source_cells": None
                }
                for field, c in colmap.items():
                    val = row[c] if c < len(row) else None
                    if field in {"actual", "allowable", "rf"}:
                        item[field] = to_float(val)
                    elif field in {"component", "criterion"}:
                        item[field] = str(val).strip() if val else None
                    elif field == "output_set":
                        num = to_float(val)
                        if num is not None and float(num).is_integer():
                            item["output_set_id"] = int(num)
                        else:
                            # Non-numeric identifiers are kept as text.
                            item["output_set_id"] = str(val).strip() if val else None

                # Compute RF if missing but possible (both values present and non-zero)
                if item["rf"] is None and item["actual"] and item["allowable"]:
                    item["rf"] = item["allowable"] / item["actual"]

                has_label = bool(item["component"] or item["criterion"])
                has_numbers = item["rf"] is not None or (
                    item["actual"] is not None and item["allowable"] is not None
                )
                if has_label and has_numbers:
                    results.append(item)
                    accepted_on_sheet += 1
    return results

def crawl_and_extract(root: Path) -> Tuple[List[Dict], List[Dict]]:
    """Walk ``root`` recursively and extract summary rows from every workbook.

    Args:
        root: Folder to scan for files whose suffix is in ``SUPPORTED_EXT``.

    Returns:
        ``(records, warnings)``. ``records`` holds the extracted row dicts;
        ``warnings`` holds ``{"file": ..., "warning": ...}`` entries for a
        missing root folder, for workbooks without a summary-like table, and
        for workbooks that raised while being read.
    """
    records: List[Dict] = []
    warnings: List[Dict] = []

    # A wrong path would otherwise just produce an empty result with no hint why.
    if not root.is_dir():
        warnings.append({"file": str(root), "warning": "Root folder does not exist or is not a directory"})
        return records, warnings

    # Sorted so the output order is reproducible between runs and platforms.
    for p in sorted(root.rglob("*")):
        if not p.is_file():
            continue
        suffix = p.suffix.lower()
        if suffix not in SUPPORTED_EXT:
            continue
        # "~$name.xlsx" files are the lock/owner files Office writes next to an
        # open workbook; they are not workbooks and only yield read errors.
        if p.name.startswith("~$"):
            continue
        try:
            if suffix in {".xlsx", ".xlsm"}:
                recs = extract_openpyxl(p)
            else:
                recs = extract_pandas_xls(p)  # .xls fallback
        except Exception as e:
            # Best effort: one unreadable workbook must not stop the crawl.
            warnings.append({"file": str(p), "warning": f"Error: {e}"})
            continue
        if recs:
            records.extend(recs)
        else:
            warnings.append({"file": str(p), "warning": "No summary-like table found"})
    return records, warnings

def main():
    """Command-line entry point for the Excel summary extraction.

    Usage: ``extract_excel_summary.py <root_calc_folder> [<out_json_path>]``.
    Writes the de-duplicated rows as JSON and the crawl warnings as
    ``excel_summary_log.csv`` next to the JSON file. Exits with status 1 when
    the root folder argument is missing.
    """
    if len(sys.argv) < 2:
        print("Usage: python extract_excel_summary.py <root_calc_folder> [<out_json_path>]")
        sys.exit(1)
    root = Path(sys.argv[1]).expanduser().resolve()
    out_json = Path(sys.argv[2]).resolve() if len(sys.argv) >= 3 else Path("outputs/extracted/excel_summary.json")
    out_log = out_json.with_name("excel_summary_log.csv")

    out_json.parent.mkdir(parents=True, exist_ok=True)

    records, warns = crawl_and_extract(root)

    # Simple de-dup. The output set is part of the key so that the same
    # component/criterion/RF reported for two different load cases is kept
    # as two rows instead of collapsing into one.
    seen = set()
    unique = []
    for r in records:
        key = (
            r.get("component"), r.get("criterion"), r.get("rf"),
            r.get("output_set_id"),
            r.get("source_file"), r.get("source_sheet"),
        )
        if key not in seen:
            seen.add(key)
            unique.append(r)

    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(unique, f, indent=2, ensure_ascii=False)

    # Explicit columns give a header-only CSV when there are no warnings,
    # instead of a placeholder row of empty values.
    pd.DataFrame(warns, columns=["file", "warning"]).to_csv(out_log, index=False)

    print(f"[OK] Extracted {len(unique)} rows  → {out_json}")
    print(f"[OK] Log                        → {out_log}")


if __name__ == "__main__":
    main()

[OK] Extracted 0 rows  → outputs\extracted\excel_summary.json
[OK] Log                        → outputs\extracted\excel_summary_log.csv


In [None]:
# Use this block instead of the "main()" function in your Notebook.
# It relies on names defined by the extraction cell above (json, crawl_and_extract).
from pathlib import Path

# --- DEFINE YOUR INPUT PATH HERE ---
# Point this to the folder containing your .xlsx files
input_folder = "inputs/excel"

# --- DEFINE YOUR OUTPUT PATH HERE ---
output_json = "outputs/extracted/excel_summary.json"

# Resolve both paths against the notebook's working directory
root_path = Path(input_folder).expanduser().resolve()
out_json_path = Path(output_json).resolve()

# A wrong input folder would otherwise just yield "0 rows" with no explanation.
if not root_path.is_dir():
    print(f"[WARN] Input folder not found: {root_path}")

# Create output directory if it doesn't exist
out_json_path.parent.mkdir(parents=True, exist_ok=True)

# Run the crawler
records, warns = crawl_and_extract(root_path)

# Simple de-dup. The output set is part of the key so that the same
# component/criterion/RF reported for two different load cases is kept.
seen = set()
unique = []
for r in records:
    key = (
        r.get("component"), r.get("criterion"), r.get("rf"),
        r.get("output_set_id"),
        r.get("source_file"), r.get("source_sheet"),
    )
    if key not in seen:
        seen.add(key)
        unique.append(r)

# Save to JSON
with open(out_json_path, "w", encoding="utf-8") as f:
    json.dump(unique, f, indent=2, ensure_ascii=False)

print(f"✅ Success! Extracted {len(unique)} rows from Excel files.")
print(f"📂 Data saved to: {out_json_path}")

# Surface skipped/failed workbooks instead of dropping the warnings silently.
if warns:
    print(f"[WARN] {len(warns)} file(s) reported problems:")
    for w in warns:
        print(f" - {w['file']}: {w['warning']}")

In [None]:
# extract_op2_sets.py
# Phase 2 - Step 5: List available Output Set IDs (subcases) from OP2 files.
# Output: outputs/extracted/op2_loads.json

import sys
import json
from pathlib import Path
from typing import Dict, List

from pyNastran.op2.op2 import OP2

def list_sets(op2_path: Path) -> List[int]:
    """Return the sorted subcase / output set IDs found in one OP2 file.

    The file is read with pyNastran and the keys of a handful of common result
    containers are collected. Containers that are missing on the OP2 object or
    are not dict-like are skipped.

    Args:
        op2_path: Path of the .op2 file to read.

    Returns:
        Sorted list of unique integer IDs (empty when no container has keys).

    Raises:
        Whatever ``OP2.read_op2`` raises for an unreadable file; the caller is
        expected to handle that.
    """
    op2 = OP2()
    op2.read_op2(str(op2_path), combine=True, build_dataframe=False, skip_undefined_matrices=True)

    # Result containers probed for subcase keys. Names that do not exist on the
    # OP2 object resolve to None and are skipped.
    container_names = (
        "cquad4_stress",
        "ctria3_stress",
        "cbar_stress",
        "gpforce",
        "grid_point_forces",
        "displacements",
    )

    set_ids = set()
    for name in container_names:
        container = getattr(op2, name, None)
        keys = getattr(container, "keys", None)
        if not callable(keys):
            continue  # absent or not dict-like
        for key in keys():
            # NOTE(review): result keys can be tuples whose first element is
            # presumably the subcase id (pyNastran uncombined keys) - confirm
            # against the pyNastran version in use.
            subcase = key[0] if isinstance(key, tuple) and key else key
            try:
                set_ids.add(int(subcase))
            except (TypeError, ValueError):
                continue  # key is not an integer-like subcase id

    return sorted(set_ids)

def main():
    """Command-line entry point: list the output set IDs of every OP2 file.

    Usage: ``extract_op2_sets.py <op2_root_folder> [<out_json>]``.
    Every ``*.op2`` file below the root folder is passed to ``list_sets``; the
    result is written as a JSON object mapping file path -> list of IDs. A file
    that fails to read is reported and recorded with an empty list. Exits with
    status 1 when the root folder argument is missing.
    """
    args = sys.argv
    if len(args) < 2:
        print("Usage: python extract_op2_sets.py <op2_root_folder> [<out_json>]")
        sys.exit(1)

    root = Path(args[1]).expanduser().resolve()
    if len(args) >= 3:
        out_json = Path(args[2]).resolve()
    else:
        out_json = Path("outputs/extracted/op2_loads.json")
    out_json.parent.mkdir(parents=True, exist_ok=True)

    result: Dict[str, List[int]] = {}
    for op2_file in root.rglob("*.op2"):
        file_key = str(op2_file)
        try:
            result[file_key] = list_sets(op2_file)
            print(f"[OK] {op2_file.name}: sets={result[file_key]}")
        except Exception as e:
            # Keep going: an unreadable file is recorded with no sets.
            result[file_key] = []
            print(f"[WARN] {op2_file.name}: {e}")

    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    print(f"[DONE] Wrote {out_json}")


if __name__ == "__main__":
    main()

In [None]:
# build_load_case_map.py
# Phase 2 - Step 6: Validate and emit the OutputSet -> PDF CaseID mapping JSON.
# Inputs:
#   - outputs/extracted/excel_summary.json (from Step 4)
#   - outputs/extracted/op2_loads.json     (from Step 5)  [optional check]
#   - inputs/load_case_map_seed.csv        (your manual seed)
# Output:
#   - outputs/extracted/load_case_map.json

import sys
import json
import pandas as pd
from pathlib import Path

def main():
    """Command-line entry point: validate the seed CSV and write the mapping JSON.

    Usage: ``build_load_case_map.py <excel_summary_json> <seed_csv> <out_json>
    [<op2_loads_json>]``.

    Each seed row needs an all-digit ``output_set_id`` and ``pdf_case_id``;
    other rows are reported and skipped. Valid rows are written to ``out_json``
    as ``{output_set_id: pdf_case_id}`` with both sides kept as strings.
    Validation notes (IDs not seen in the Excel summary or in the OP2 listing,
    conflicting duplicates) are printed but do not stop the run. Exits with
    status 1 when fewer than three arguments are given.
    """
    if len(sys.argv) < 4:
        print("Usage: python build_load_case_map.py <excel_summary_json> <seed_csv> <out_json> [<op2_loads_json>]")
        sys.exit(1)

    excel_summary = Path(sys.argv[1]).resolve()
    seed_csv      = Path(sys.argv[2]).resolve()
    out_json      = Path(sys.argv[3]).resolve()
    op2_json      = Path(sys.argv[4]).resolve() if len(sys.argv) >= 5 else None

    out_json.parent.mkdir(parents=True, exist_ok=True)

    # Load extracted Excel rows; only all-digit output set IDs take part in the check.
    with open(excel_summary, "r", encoding="utf-8") as f:
        rows = json.load(f)
    excel_sets = {
        int(r["output_set_id"])
        for r in rows
        if r.get("output_set_id") is not None and str(r["output_set_id"]).isdigit()
    }

    # Load optional OP2 sets for validation
    op2_sets = set()
    if op2_json and op2_json.exists():
        with open(op2_json, "r", encoding="utf-8") as f:
            op2_map = json.load(f)
        for sets in op2_map.values():
            for s in sets:
                if isinstance(s, int) or (isinstance(s, str) and s.isdigit()):
                    op2_sets.add(int(s))

    # Read every cell as text. With pandas' type inference a single blank cell
    # turns the whole ID column into floats ("101.0"), which then fail the
    # isdigit() validation below and every row is rejected.
    seed = pd.read_csv(seed_csv, dtype=str, keep_default_na=False)
    mapping = {}
    problems = []

    for raw_osid, raw_pcid in zip(seed["output_set_id"], seed["pdf_case_id"]):
        osid = str(raw_osid).strip()
        pcid = str(raw_pcid).strip()
        if not osid or not osid.isdigit() or not pcid or not pcid.isdigit():
            problems.append(f"Invalid row in seed: output_set_id={osid}, pdf_case_id={pcid}")
            continue
        osid_i = int(osid)
        if osid in mapping and mapping[osid] != pcid:
            # The later row still wins (as before), but no longer silently.
            problems.append(
                f"Seed maps OutputSet {osid_i} more than once "
                f"(pdf_case_id {mapping[osid]} replaced by {pcid})"
            )
        mapping[osid] = pcid

        # Optional: sanity checks
        if osid_i not in excel_sets:
            problems.append(f"Seed maps OutputSet {osid_i}, but it's not seen in excel_summary.json")
        if op2_sets and osid_i not in op2_sets:
            problems.append(f"Seed maps OutputSet {osid_i}, but it's not seen in op2_loads.json")

    # Emit JSON
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(mapping, f, indent=2)

    print(f"[OK] Wrote mapping → {out_json}")
    if problems:
        print("\n[WARN] Validation notes:")
        for p in problems:
            print(" - " + p)


if __name__ == "__main__":
    main()