In [1]:
!pip install --quiet --upgrade openai tenacity pandas ipywidgets
!pip install tiktoken
!pip install sentence_transformers scikit-learn



In [12]:
# === Standard library ===
import os
import time
import json
import math
import re
from typing import List, Dict, Any, Optional, Tuple
import llm_utils

# === Third-party libraries ===
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, Markdown, FileLink

# === Local modules ===
from embed_utils import fetch_examples
from llm_utils import (
    chat_completion,
    OutlineStep,
    DetailStep,
    parse_llm_output
)
from validation import (
    MATERIAL_DATA,
    validate_plan,
    add_power_check,
    repair_power_overload
)

# === Global state ===
# Holds the most recently validated plan. It is assigned inside
# on_generate_clicked and read by on_export_clicked; None means no plan
# has been generated yet.
df_full_valid: pd.DataFrame | None = None  # Used for exporting CSV later


In [3]:
# === Define UI: Part description input ===
# Free-text area for the part description, pre-filled with a sample part.
# Its value is read by check_material_conflict and on_generate_clicked.
desc_input = widgets.Textarea(
    value="An aluminum gear with 20 teeth and a central bore.",
    placeholder="Describe the part here...",
    description="Part:",
    layout=widgets.Layout(width="100%", height="100px")
)

# === Load material options and aliases ===
# NOTE: this rebinds MATERIAL_DATA, which the imports cell also pulls in from
# `validation`; from this cell on the notebook uses the copy read from disk.
# Context managers guarantee both file handles are closed once parsing is
# done (a bare json.load(open(...)) leaves them open until garbage collection).
with open("data/materials.json", encoding="utf-8") as materials_file:
    MATERIAL_DATA = json.load(materials_file)
with open("data/material_aliases.json", encoding="utf-8") as aliases_file:
    MATERIAL_ALIASES = json.load(aliases_file)

# === Define UI: Material selector ===
# Drop-down listing every key of MATERIAL_DATA in alphabetical order.
# NOTE(review): the initial value assumes 'aluminum' is one of those keys —
# confirm against data/materials.json.
material_selector = widgets.Dropdown(
    options=sorted(MATERIAL_DATA.keys()), 
    value='aluminum',
    description='Material:',
    layout=widgets.Layout(width='30%')
)

# === Define UI: Conflict warning text ===
# Empty HTML placeholder; check_material_conflict writes the warning markup
# into its `value` and clears it again when no conflict is found.
warning_out = widgets.HTML()

def check_material_conflict(change=None):
    """
    Warn when the description names a material other than the selected one.

    Scans the part description for every alias in MATERIAL_ALIASES. The first
    alias found that belongs to a material different from the drop-down
    selection puts a red warning into `warning_out`; if none is found the
    warning is cleared.

    Aliases are matched case-insensitively and only as whole words, so a
    short alias is not triggered by a longer word that merely contains it
    (an alias "al" must not fire on "central", nor "pla" on "plate").

    Args:
        change: Trait-change payload supplied by `observe`; unused, and
            optional so the function can also be called directly.
    """
    desc = desc_input.value.lower()
    selected = material_selector.value

    for key, aliases in MATERIAL_ALIASES.items():
        if key == selected:
            # Aliases of the selected material can never be a conflict.
            continue
        for alias in aliases:
            if not alias:
                # An empty alias would match every description.
                continue
            # (?<!\w) / (?!\w) rather than \b so that aliases beginning or
            # ending with punctuation are still delimited correctly.
            pattern = rf"(?<!\w){re.escape(alias.lower())}(?!\w)"
            if re.search(pattern, desc):
                warning_out.value = (
                    f"<b style='color:red'>Material mentioned in description: “{alias}”, "
                    f"but the drop-down box selects “{selected}”. Please confirm!</b>"
                )
                return
    # No conflict detected
    warning_out.value = ""

# Re-run the conflict check whenever the `value` trait of either widget
# changes, i.e. when the description is edited or another material is chosen.
desc_input.observe(check_material_conflict, names='value')
material_selector.observe(check_material_conflict, names='value')

# === Define UI: Generate plan button ===
# Green ("success") button; on_generate_clicked is attached to it in the
# final cell of the notebook.
generate_button = widgets.Button(
    description=" Generate CNC Plan",
    button_style="success",
    layout=widgets.Layout(width="30%", margin="10px 0")
)

# === Define UI: Export to CSV button ===
# Neutral button with a download icon; wired to on_export_clicked below.
export_btn = widgets.Button(
    description="Export CSV",
    icon="download",
    button_style='',  # neutral gray
    layout=widgets.Layout(width='30%', margin='5px 0 15px 0')
)

# === Define output area and export callback ===
# Both click handlers clear this Output widget and write their messages and
# tables into it.
output_area = widgets.Output()
# Reset the shared plan holder (also declared in the imports cell) so that
# re-running this cell makes Export report "no plan" until one is generated.
df_full_valid = None

def on_export_clicked(b):
    """
    Write the current validated plan to a CSV file and show a download link.

    The file is named ``plan_<selected material>_<today's date>.csv`` and is
    written without the index column. When no plan has been generated yet
    (``df_full_valid`` is None) only a hint is printed. All output goes to
    ``output_area``, which is cleared first.

    Args:
        b: The button instance passed by ``on_click``; unused.
    """
    with output_area:
        output_area.clear_output()

        plan = df_full_valid
        if plan is not None:
            material = material_selector.value
            stamp = pd.Timestamp.today().date()
            csv_name = f"plan_{material}_{stamp}.csv"

            plan.to_csv(csv_name, index=False)
            print(f"Saved as {csv_name}")
            display(FileLink(csv_name, result_html_prefix="Download: "))
        else:
            print("Please generate a plan first.")

# Register on_export_clicked as the click handler of the Export CSV button.
export_btn.on_click(on_export_clicked)


In [4]:
# === Function: Generate process outline using LLM ===
def get_outline(part: str, material: str, max_retries: int = 3) -> List[Dict[str, str]]:
    """
    Ask the LLM for a high-level process outline (step + description).

    Args:
        part: Free-text description of the part to machine.
        material: Name of the workpiece material, inserted into the prompt.
        max_retries: Maximum number of LLM calls. Values below 1 make no call.

    Returns:
        Whatever ``parse_llm_output(raw, OutlineStep)`` yields for the first
        successful attempt (expected: a list of {"step": str,
        "description": str}), or an empty list if every attempt failed or no
        attempt was made.
    """
    system_msg = {
        "role": "system",
        "content": (
            "You are a CNC process planner. "
            "Return ONLY a JSON array. "
            "Each item must have keys 'step' and 'description'."
        )
    }

    user_msg = {
        "role": "user",
        "content": (
            f"The part is: {part}\n"
            f"The material is: {material}\n"
            "List the high-level manufacturing steps needed to machine this part.\n"
            "For each step, include:\n"
            "- step: the step name\n"
            "- description: a short description of what happens in this step\n\n"
            "Return ONLY a JSON array. No explanations, no markdown."
        )
    }
    messages = [system_msg, user_msg]

    # Retry loop in case of parse failure or API error
    for attempt in range(1, max_retries + 1):
        try:
            raw = chat_completion(messages=messages, verbose=False)
            return parse_llm_output(raw, OutlineStep)
        except Exception as e:
            # Deliberately broad: both API errors and parse errors are
            # reported and retried the same way.
            print(f"get_outline attempt {attempt} parsing failed: {e}")
            if attempt < max_retries:
                print("Retrying get_outline…")

    # Reached when all attempts failed, or when max_retries < 1 and the loop
    # never ran; always hand back a list so callers can test `if not outline`.
    return []


# === Function: Generate detailed steps (tool + operation + rpm + feed) ===
def get_detail(outline: List[Dict[str, str]] | str,
               part: str,
               material: str,
               max_retries: int = 3) -> List[Dict[str, Any]]:
    """
    Enrich each outline step with tool, operation, rpm, and feed.
    Uses similarity examples + material constraints + outline text.
    Returns a validated list of step dictionaries.
    """

    # --- Step 1: Format outline into bullet list (for LLM input) ---
    outline_text = (
        "\n".join(f"- {s['step']}" for s in outline if 'step' in s)
        if isinstance(outline, list) else outline
    )

    # --- Step 2: Get material-specific constraints ---
    limits = MATERIAL_DATA.get(material)
    if limits is None:
        # Fallback for unknown materials
        limits = {"rpm": (500, 5000), "feed": (100, 1000)}

    rpm_min, rpm_max = limits["rpm"]
    feed_min, feed_max = limits["feed"]

    material_constraints = (
        f"For **{material}**, spindle speed **must be {rpm_min}–{rpm_max} rpm**, "
        f"and feed rate **must be {feed_min}–{feed_max} mm/min**. "
        "Stay strictly within these ranges."
    )

    # --- Step 3: Insert top-k retrieved examples (via embedding) ---
    few_shot = fetch_examples(part, material, k=2)
    combined_examples = []
    for ex in few_shot:
        combined_examples.extend(ex)

    example_block = json.dumps(combined_examples, ensure_ascii=False)

    # --- Step 4: Construct prompt messages for chat_completion ---
    system_msg = {
        "role": "system",
        "content": (
            "You are a CNC process planner. "
            "Your job is to generate a machining process plan in JSON format. "
            "Each step must include keys: 'step', 'tool', 'operation', 'rpm', 'feed'. "
            "Do not include any explanations, comments, or markdown. Return ONLY a JSON array."
        )
    }

    user_msg = {
        "role": "user",
        "content": (
            f"The part is: {part}\n"
            f"The material is: {material}\n"
            "Here are similar part examples (JSON):\n"
            f"{example_block}\n"
            f"Here is the outline of steps:\n{outline_text}\n\n"
            "For EACH step output an object with: step, tool, operation, rpm, feed.\n"
            f"{material_constraints}\n"
            "For non-machining steps (e.g. setup, inspection) set rpm=0 and feed=0.\n\n"
            "Each item must have keys 'step', 'tool', 'operation', 'rpm', 'feed'.\n\n"
            "Even if the description contains fixed values (e.g. 'must be 6000 rpm'), do NOT follow them if they violate the material limits.\n"
            "Instead, always use values that are valid for the given material constraints.\n"
            "Return ONLY a JSON array of steps. No explanations. No markdown."

        )
    }

    # --- Step 5: Retry LLM call and parse JSON result ---
    for attempt in range(1, max_retries + 1):
        try:
            raw = chat_completion(messages=[system_msg, user_msg], verbose=False)
            return parse_llm_output(raw, DetailStep)
        except Exception as e:
            print(f"get_detail attempt {attempt} parsing failed: {e}")
            if attempt == max_retries:
                return []
            print("Retrying get_detail…")


In [5]:
def display_plan_table(df, corrected_indices=None):
    """
    Render the CNC process plan as a styled table under a Markdown heading.

    - Cells of the "RPM Valid", "Feed Valid" and "Power Valid" columns whose
      value is exactly ``False`` get a light-red background.
    - Rows whose index label is listed in ``corrected_indices`` get a
      light-yellow background on every cell.

    Args:
        df: Plan DataFrame containing the three validity columns.
        corrected_indices: Optional collection of row labels that were
            repaired; None or empty means no row is highlighted.
    """
    red_cell = "background-color:#FFD2D2"
    yellow_cell = "background-color:#FFF2AC"
    validity_columns = ["RPM Valid", "Feed Valid", "Power Valid"]

    def flag_cell(value):
        """Return the red style for a failed validity check, else no style."""
        if value is False:
            return red_cell
        return ""

    def flag_row(row):
        """Return one style per cell: yellow throughout for repaired rows."""
        was_repaired = bool(corrected_indices) and row.name in corrected_indices
        cell_style = yellow_cell if was_repaired else ""
        return [cell_style] * len(row)

    # Cell-level red first, then the row-level yellow pass.
    styler = df.style.map(flag_cell, subset=validity_columns)
    styler = styler.apply(flag_row, axis=1)

    display(Markdown("### CNC Process Plan"))
    display(styler)


In [6]:
# === Reflection summary of the generated plan ===
def reflect_summary(raw_json: str,
                    validated_df: pd.DataFrame,
                    material: str,
                    corrected_indices: Optional[list] = None,
                    repair_log: Optional[list] = None):
    """
    Display a markdown summary of:
    - Total steps
    - Invalid parameters (RPM / Feed / Power)
    - Number of repaired steps
    - Most recently modified exported file (if any ``plan_*`` file exists)
    - Whether human oversight is needed
    - LLM token usage

    Args:
        raw_json: Raw LLM result; accepted for interface compatibility, unused.
        validated_df: Plan with boolean "RPM Valid", "Feed Valid" and
            "Power Valid" columns.
        material: Material name, used to look up the power limit
            (5.0 kW when the material or its 'power' entry is missing).
        corrected_indices: Row labels that were auto-repaired, if any.
        repair_log: Accepted for interface compatibility, unused.
    """
    num_steps = len(validated_df)
    num_invalid_rpm   = (~validated_df["RPM Valid"]).sum()
    num_invalid_feed  = (~validated_df["Feed Valid"]).sum()
    num_invalid_power = (~validated_df["Power Valid"]).sum()

    # === Compose summary header ===
    comment = (
        "### Reflection Summary\n"
        f"- **Total Steps Generated**: {num_steps}\n"
        f"- **Invalid Spindle Speeds**: {num_invalid_rpm} step(s)\n"
        f"- **Invalid Feed Rates**: {num_invalid_feed} step(s)\n"
        f"- **Invalid Power**        : {num_invalid_power}\n"
        f"- **Power limit for {material}**: "
        f"{MATERIAL_DATA.get(material, {}).get('power', 5.0):.1f} kW\n"
    )

    # === Auto-repair info ===
    if corrected_indices:
        comment += f"- **Auto-repaired steps**: {len(corrected_indices)} step(s) → Highlighted in yellow.\n"
    else:
        comment += "- No steps required auto-repair.\n"

    # === Check for most recent exported file ===
    def _mtime(path):
        """Modification time of `path`; vanished files sort last."""
        try:
            return os.path.getmtime(path)
        except OSError:
            return float("-inf")

    # Rank by modification time: a plain max() over the names is alphabetical
    # (material name first), which is not the same as "most recent".
    exported = [p for p in os.listdir() if p.startswith("plan_")]
    latest = max(exported, key=_mtime, default=None)
    if latest:
        comment += f"- **Exported file**        : `{latest}`\n"

    # === Human oversight warnings ===
    if num_invalid_rpm or num_invalid_feed or num_invalid_power:
        comment += "- ** Human Oversight Needed**:\n"
        if num_invalid_rpm:
            comment += "  - Some spindle speeds out of range.\n"
        if num_invalid_feed:
            comment += "  - Some feed rates out of range.\n"
        if num_invalid_power:
            comment += "  - Some power values exceed machine limit (possible overload).\n"
            comment += "  - Suggest reducing feed rate or rpm to stay within power limits.\n"
    else:
        comment += "- All parameters are within expected machining constraints.\n"

    # === LLM token usage ===
    comment += f"- **Tokens used so far**: {llm_utils.TOKENS_USED}\n"

    display(Markdown(comment))


In [7]:
def extract_rpm_feed_from_prompt(text: str) -> Optional[Tuple[int, int]]:
    """
    Extract RPM and feed rate from natural language text.

    Only the "<number> <unit>" form is recognised, case-insensitively:
    a number of at least four digits followed by "rpm", and a number of at
    least two digits followed by "mm/min" (the slash and inner spaces are
    optional). Label-first forms such as "rpm: 1500" are not recognised.

    Example:
        "Finish turning at 1500000 RPM and 300 mm/min" -> (1500000, 300)

    Returns:
        Tuple (rpm: int, feed: int) when BOTH values are present,
        otherwise None.
    """
    if not text:
        return None

    text = text.lower()

    # The (?<!\d) guard plus an open-ended digit count makes the match cover
    # the whole number. With a bounded count and no guard, an over-long value
    # was silently truncated to its trailing digits ("15000000 rpm" was read
    # as 5000000, "1000000 mm/min" as 0).
    rpm_match = re.search(r'(?<!\d)([0-9]{4,})\s*rpm', text)
    feed_match = re.search(r'(?<!\d)([0-9]{2,})\s*mm\s*/?\s*min', text)

    if rpm_match and feed_match:
        return int(rpm_match.group(1)), int(feed_match.group(1))

    return None


In [13]:
# === 4. Button click logic ===
def on_generate_clicked(b):
    """
    Handle a click on the Generate button: build, validate, repair and show
    a CNC plan for the current description and material.

    Steps, all rendered into ``output_area`` (cleared first):
      1. Request an outline, then a detailed plan, from the LLM helpers.
      2. Validate rpm/feed, note user-supplied values that were out of range,
         and run the power check.
      3. For each step failing the power check, call
         ``repair_power_overload`` and write the new rpm/feed back.
      4. Show the full plan, the machining-only plan (rpm > 0 and feed > 0),
         the reflection summary and the repair log.

    Side effects: assigns the globals ``df_full_valid`` (read by the export
    handler) and ``df_cut_valid``.

    Args:
        b: The button instance passed by ``on_click``; unused.
    """
    global df_full_valid, df_cut_valid

    with output_area:
        output_area.clear_output()

        part = desc_input.value.strip()
        material = material_selector.value

        if not part:
            print("Please enter the part description.")
            return

        repair_log = []          # Track all repairs
        corrected_indices = []   # Row labels (in df_full) that were modified

        # --- 1. Get outline from LLM ---------------------------------
        outline = get_outline(part, material)
        if not outline:
            print("Failed to retrieve outline. Please check your network connection or API key.")
            return

        # --- 2. Generate full step-by-step plan from outline ----------
        raw_json = get_detail(outline, part, material)

        # --- 3. Parse LLM JSON -> DataFrame ---------------------------
        df_full = pd.DataFrame(raw_json).reset_index(drop=True)
        df_full.rename(columns={
            "rpm":  "Spindle Speed (RPM)",
            "feed": "Feed Rate (mm/min)"
        }, inplace=True)

        if df_full.empty:
            print("JSON parsing failed, raw output:\n", raw_json)
            return

        # --- 4. Validate RPM / Feed ----------------------------------
        df_full_valid = validate_plan(df_full, material)

        limits = MATERIAL_DATA[material]
        rpm_min, rpm_max = limits["rpm"]
        feed_min, feed_max = limits["feed"]
        power_limit = limits.get("power", 5.0)

        # === Check if user-supplied values are out of bounds ===
        rpm_feed_hint = extract_rpm_feed_from_prompt(part)

        if rpm_feed_hint:
            hinted_rpm, hinted_feed = rpm_feed_hint
            rpm_out = not (rpm_min <= hinted_rpm <= rpm_max)
            feed_out = not (feed_min <= hinted_feed <= feed_max)

            if rpm_out or feed_out:
                print("Your input values were out of limits. GPT auto-corrected them to within bounds.")

                # One log entry per generated step, comparing the value the
                # user asked for with what the LLM actually produced.
                for i, row in df_full.iterrows():
                    rpm = row["Spindle Speed (RPM)"]
                    feed = row["Feed Rate (mm/min)"]
                    repair_log.append({
                        "index": i,
                        "step": row.get("step", f"Step {i}"),
                        "before": {"rpm": hinted_rpm, "feed": hinted_feed},
                        "after": {"rpm": rpm, "feed": feed},
                        "diff": {"rpm": rpm - hinted_rpm, "feed": feed - hinted_feed},
                        "type": "fewshot_auto_fix",
                        "origin": "user_input"
                    })

        # === Power overload check and repair ======================
        df_full_valid = add_power_check(df_full_valid, material)

        power_invalid_mask = ~df_full_valid["Power Valid"]

        if power_invalid_mask.any():
            print(f"\n Detected {power_invalid_mask.sum()} steps with power overload. Attempting auto-repair...")

            for idx in df_full_valid[power_invalid_mask].index:
                old = df_full_valid.loc[idx, [
                    "step", "tool", "operation",
                    "Spindle Speed (RPM)", "Feed Rate (mm/min)"
                ]].to_dict()

                print(f"\n=== Repairing power at step {idx} — {old['step']} ===")
                print(f"Before: RPM={old['Spindle Speed (RPM)']}, Feed={old['Feed Rate (mm/min)']}")

                fixed = repair_power_overload(
                    old, material,
                    rpm_min, rpm_max,
                    feed_min, feed_max,
                    power_limit
                )

                print(f"After:  RPM={fixed['rpm']}, Feed={fixed['feed']}")

                repair_log.append({
                    "index": idx,
                    "step": old["step"],
                    "before": {"rpm": old["Spindle Speed (RPM)"], "feed": old["Feed Rate (mm/min)"]},
                    "after": {"rpm": fixed["rpm"], "feed": fixed["feed"]},
                    "type": "power_overload",
                    "origin": "power_check"
                })

                # Write the repaired values into the unvalidated frame; the
                # validity columns are recomputed from it below.
                df_full.loc[idx, "Spindle Speed (RPM)"] = fixed["rpm"]
                df_full.loc[idx, "Feed Rate (mm/min)"] = fixed["feed"]
                corrected_indices.append(idx)

            # Revalidate after repair
            df_full_valid = validate_plan(df_full, material)
            df_full_valid = add_power_check(df_full_valid, material)

        # === 5. Display full process plan (including non-cutting) ===
        display(Markdown("### Full Process Plan (incl. non-machining)"))
        display_plan_table(df_full_valid, corrected_indices)
        display(Markdown("---"))

        # === 6. Display machining-only plan ==========================
        cut_mask = (
            df_full["Spindle Speed (RPM)"].gt(0) &
            df_full["Feed Rate (mm/min)"].gt(0)
        )
        df_cut_labelled = df_full[cut_mask]

        # corrected_indices are row labels of the FULL plan. Dropping the
        # non-machining rows and resetting the index renumbers the rows, so
        # translate each repaired label to its new position; otherwise the
        # yellow highlight lands on the wrong row (or on none).
        new_position = {label: pos for pos, label in enumerate(df_cut_labelled.index)}
        corrected_cut_indices = [
            new_position[label] for label in corrected_indices if label in new_position
        ]

        df_cut = df_cut_labelled.reset_index(drop=True)

        df_cut_valid = validate_plan(df_cut, material)
        df_cut_valid = add_power_check(df_cut_valid, material)

        display(Markdown("### Machining-only Plan"))
        display_plan_table(df_cut_valid, corrected_cut_indices)

        # === 7. Reflection summary ===================================
        reflect_summary(raw_json, df_full_valid, material, corrected_indices, repair_log)

        # === 8. Print Repair Log (natural language) ==================
        if repair_log:
            print("### Repair Log\n")
            for log in repair_log:
                step_idx = log.get("index", "?")
                step_name = log.get("step", "?")
                origin = log.get("origin", "")
                rpm_b = log.get("before", {}).get("rpm", 0)
                rpm_a = log.get("after", {}).get("rpm", 0)
                feed_b = log.get("before", {}).get("feed", 0)
                feed_a = log.get("after", {}).get("feed", 0)

                # Determine cause
                if origin == "power_check":
                    reason = "Power overload."
                elif rpm_b == rpm_a and feed_b == feed_a:
                    reason = "No actual change."
                elif rpm_a == 0 and feed_a == 0:
                    reason = "Spindle/feed not needed for this step. Auto set to 0."
                elif origin == "user_input":
                    reason = "Value out of range. GPT auto-corrected to limit."
                else:
                    reason = "Auto adjustment performed."

                print(f"[Step {step_idx}] {step_name}")
                print(f"  RPM : {rpm_b} → {rpm_a}")
                print(f"  Feed: {feed_b} → {feed_a}")
                print(f"  Reason: {reason}\n")
        else:
            print("\nNo repairs needed.")


In [9]:
# === 5. Bind button to handler ===
# on_generate_clicked must already be defined when this cell runs.
generate_button.on_click(on_generate_clicked)

# === 6. Display UI (must come last) ===
# Render the heading and all widgets top to bottom; output_area is placed
# last, so handler output appears below the buttons.
display(
    Markdown("## CNC Process Planner"),
    desc_input,
    material_selector,
    warning_out,
    generate_button,
    export_btn,
    output_area
)


## CNC Process Planner

Textarea(value='An aluminum gear with 20 teeth and a central bore.', description='Part:', layout=Layout(height…

Dropdown(description='Material:', index=1, layout=Layout(width='30%'), options=('acrylic', 'aluminum', 'brass'…

HTML(value='')

Button(button_style='success', description=' Generate CNC Plan', layout=Layout(margin='10px 0', width='30%'), …

Button(description='Export CSV', icon='download', layout=Layout(margin='5px 0 15px 0', width='30%'), style=But…

Output()