In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Run your LangGraph workflow with one compiled graph per Ollama port, in parallel.

Prereqs:
  pip install langgraph langchain langchain-community langchain-ollama pandas openpyxl

Start multiple Ollama instances (examples):
  ollama serve -p 11434 &
  ollama serve -p 11435 &
  ollama serve -p 11436 &
  ollama serve -p 11437 &
  ollama serve -p 11438 &
Ensure the model (e.g., "llama3.1") is pulled on each instance.

Author: Daniel-ready multiport runner
"""

import os
import math
import time
import json
import traceback
from pathlib import Path
from typing import Annotated, List, TypedDict, Dict, Any, Optional

import pandas as pd

# LangGraph core
from langgraph.graph import StateGraph, START, END, add_messages

# Messages (LangChain core)
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

# ChatOllama integration (prefer the dedicated package, fallback to community)
try:
    from langchain_ollama import ChatOllama  # preferred
except Exception:
    from langchain_community.chat_models import ChatOllama  # fallback

In [2]:
from pathlib import Path
import nest_asyncio
import numpy as np
from tqdm.asyncio import tqdm_asyncio
nest_asyncio.apply()

In [3]:

# --------------------------------------------------------------------
# 1) State definition (adds "env" since nodes read it)
# --------------------------------------------------------------------
class State(TypedDict):
    messages: Annotated[List, add_messages]    # chat history, aggregated via add_messages
    structure: str
    objective: str
    testcases: str
    review_summary: str
    env: Dict[str, Any]                        # optional env payload (e.g., port, model, etc.)

# --------------------------------------------------------------------
# 2) NODES (YOUR EXACT NODE FUNCTIONS)
#     NOTE: They rely on a module-level `llm` which will be set per process.
# --------------------------------------------------------------------
# Global `llm` placeholder; each worker process will assign it.
llm = None

def eval_structure(state: State) -> Dict[str, Any]:
    testcases = state.get("testcases", "")
    environment = state.get("env", {})

    system_message = """You are a Senior Software QA Architect. Your task is to evaluate the structure of a test case against the provided Acceptance Criteria. You will determine whether the acceptance criteria has been appropriately satisfied:
    Acceptance Criteria:
    - Test case steps follow a logical, sequential path that ensures reproducibility.
    - Actions are documented where data collection is required, and data collection is accurate and complete.
    - Steps are clear, concise, and free of jargon; absolutes are avoided.
    - Steps are numbered or ordered for easy execution.
    - Outcome aligns with the expected results.

    You will provide: assessment_verdict, assessment_rationale, identified_gaps, actionable_recommendations, and test_case_improvements. Propose improvements detecting misalignment between test case objective and actual steps.

    Response Format (produce exactly this JSON structure):
    {
    "assessment_verdict": "complete|partial|inadequate",
    "assessment_rationale": "<description of how and why the value for assessment_verdict was chosen>",
    "identified_gaps": ["<gap 1>", "<gap 2>", ...],
    "recommendations": ["<recommendation 1>", "<recommendation 2>", ...],
    "test_case_improvements": ["<improvement 1>", "<improvement 2>", ...]
    }
    """
    user_message = f"""Task: Evaluate the completeness of a test case against the acceptance criteria for Test Case Structure.
    Input Variables:
    - Test Case:
    {testcases}
    Produce output strictly in the Response Format JSON. Do not use Markdown.

    Now perform the review on the provided Input Variables and return only the Response Format JSON.
    """
    resp = llm.invoke([
        SystemMessage(content=system_message),
        HumanMessage(content=user_message)
    ])

    ai_msg = AIMessage(content=resp.content, name="eval_structure")
    updates = {
        "messages": [ai_msg],
        "structure": resp.content,
    }
    return updates

def eval_objective(state: State) -> Dict[str, Any]:
    testcases = state.get("testcases", "")
    environment = state.get("env", {})

    system_message = """You are a Senior Software QA Architect. Your task is to evaluate the completeness of a test case against the provided Acceptance Criteria. You will determine whether the acceptance criteria has been appropriately satisfied:
    Acceptance Criteria:
        - Test case includes all required components: unique ID, descriptive title, preconditions, input data, expected results, and detailed steps.
        - Test case objective is clear, specific, and aligned with the requirement.
        - Test case meets its intended objective (e.g., verifies actual outcome, not just UI interaction).
        - Both positive and negative scenarios are included.
        - Expected results align with the requirement and provide sufficient evidence for verification.
        - Completeness score should meet or exceed 80%.

    You will provide: assessment_verdict, assessment_rationale, identified_gaps, actionable_recommendations, and test_case_improvements. Propose improvements detecting misalignment between test case objective and actual steps.

    Response Format (produce exactly this JSON structure):
    {
        "assessment_verdict": "complete|partial|inadequate",
        "assessment_rationale": "<description of how and why the value for assessment_verdict was chosen>",
        "identified_gaps": ["<gap 1>", "<gap 2>", ...],
        "recommendations": ["<recommendation 1>", "<recommendation 2>", ...],
        "test_case_improvements": ["<improvement 1>", "<improvement 2>", ...]
    }
    """
    user_message = f"""
    Task: Evaluate the completeness of a test case against the acceptance criteria for Test Case Completeness.
    Input Variables:
    - Test Case:
    {testcases}

    Acceptance Criteria:
    - Test case includes all required components: unique ID, descriptive title, preconditions, input data, expected results, and detailed steps.
    - Test case objective is clear, specific, and aligned with the requirement.
    - Test case meets its intended objective (e.g., verifies actual outcome, not just UI interaction).
    - Both positive and negative scenarios are included.
    - Expected results align with the requirement and provide sufficient evidence for verification.
    - Completeness score should meet or exceed 80%.

    Produce output strictly in the Response Format JSON. Do not use Markdown.
    """
    resp = llm.invoke([
        SystemMessage(content=system_message),
        HumanMessage(content=user_message)
    ])

    ai_msg = AIMessage(content=resp.content, name="eval_objective")
    updates = {
        "messages": [ai_msg],
        "objective": resp.content,
    }
    return updates

def get_review_summary(state: State) -> Dict[str, Any]:
    environment = state.get("env", {})

    all_messages_text = []
    for m in state.get("messages", []):
        content = getattr(m, "content", str(m))
        all_messages_text.append(content)

    system_message = """
    You are a Senior Test Verification Traceability Analyst with expertise in software quality assurance.
    You specialize in summarizing test case reviews, ensuring that critical improvements and recommendations are captured to provide a robust summary rationale.

    Response Format (produce exactly this JSON structure):

    {
        'testcase_review_summary': <summary of the outputs from test case review nodes>
    }
    """
    user_message = f"""
    Summarize the below test case review:

    ## Inputs
    Test Case Review: 
    {json.dumps(all_messages_text, ensure_ascii=False)}

    ## Notes 
    Produce output strictly in the described Response Format
    """
    resp = llm.invoke([
        SystemMessage(content=system_message),
        HumanMessage(content=user_message)
    ])

    ai_msg = AIMessage(content=resp.content, name="get_review_summary")
    updates = {
        "messages": [ai_msg],
        "review_summary": resp.content
    }

    return updates

# --------------------------------------------------------------------
# 3) Graph builder (compile with your nodes)
# --------------------------------------------------------------------
def compile_workflow() -> Any:
    graph = StateGraph(State)
    graph.add_node("eval_structure", eval_structure)
    graph.add_node("eval_objective", eval_objective)
    graph.add_node("get_review_summary", get_review_summary)

    graph.add_edge(START, "eval_structure")
    graph.add_edge(START, "eval_objective")
    graph.add_edge("eval_structure", "get_review_summary")
    graph.add_edge("eval_objective", "get_review_summary")
    graph.add_edge("get_review_summary", END)

    return graph.compile()

# --------------------------------------------------------------------
# 4) Multiport runner class
# --------------------------------------------------------------------
class LangGraphMultiPort:
    def __init__(
        self,
        input_df: pd.DataFrame,
        prompt_vars: List[str] = None,        # columns to include; must include 'testcases'
        model: str = "llama3.1",
        start_port: int = 11434,
        num_ports: int = 5,
        model_kwargs: Optional[Dict[str, Any]] = None,
        progress_every: int = 10,
        output_path: Optional[str] = "./output/langgraph_results.xlsx",
        enable_json_flatten: bool = True,     # flatten review_summary JSON if parsable
        json_key: Optional[str] = None,
        env_payload: Optional[Dict[str, Any]] = None,
    ):
        if prompt_vars is None:
            prompt_vars = input_df.columns.tolist()
        if "testcases" not in prompt_vars:
            raise ValueError("prompt_vars must include the 'testcases' column for these nodes.")

        self.df = input_df
        self.prompt_vars = prompt_vars
        self.model = model
        self.start_port = start_port
        self.num_ports = num_ports
        self.model_kwargs = model_kwargs or {"temperature": 0.1, "format": "json", "keep_alive": "30m"}
        self.progress_every = progress_every
        self.output_path = output_path
        self.enable_json_flatten = enable_json_flatten
        self.json_key = json_key
        self.env_payload = env_payload or {}

        # Prepare records with original index
        self.records = []
        for i, row in self.df.iterrows():
            rec = {k: row[k] for k in self.prompt_vars}
            rec["_index"] = i
            self.records.append(rec)

        self.ports = [self.start_port + i for i in range(self.num_ports)]

    @staticmethod
    def _chunk(items: List[Dict[str, Any]], n_chunks: int) -> List[List[Dict[str, Any]]]:
        if n_chunks <= 0:
            return [items]
        size = math.ceil(len(items) / n_chunks)
        return [items[i:i + size] for i in range(0, len(items), size)]

    @staticmethod
    def _prepare_state(item: Dict[str, Any], env: Dict[str, Any]) -> State:
        return State(
            messages=[],
            structure="",
            objective="",
            testcases=str(item.get("testcases", "")),
            review_summary="",
            env=env,
        )

    @staticmethod
    def _flatten_json(text: str, json_key: Optional[str]) -> Dict[str, Any]:
        out: Dict[str, Any] = {}
        try:
            data = json.loads(text)
            payload = data
            if json_key and isinstance(data, dict) and json_key in data:
                payload = data[json_key]
            if isinstance(payload, dict):
                for k, v in payload.items():
                    out[f"summary.{k}"] = v
            elif isinstance(payload, list):
                for i, d in enumerate(payload):
                    if isinstance(d, dict):
                        for k, v in d.items():
                            out[f"summary[{i}].{k}"] = v
                    else:
                        out[f"summary[{i}]"] = d
        except Exception:
            pass
        return out

    @staticmethod
    def _worker(args) -> List[Dict[str, Any]]:
        """
        Worker for one port. Sets the global llm, compiles the workflow, and processes items.
        """
        (port, model, model_kwargs, items, progress_every, enable_json_flatten, json_key, base_env) = args

        # Assign per-process llm (so node functions use the right instance)
        globals()['llm'] = ChatOllama(model=model, base_url=f"http://localhost:{port}", **(model_kwargs or {}))

        workflow = compile_workflow()
        results: List[Dict[str, Any]] = []
        total = len(items)
        start = time.time()
        print(f"[Port {port}] Processing {total} items...")

        for idx, item in enumerate(items, start=1):
            try:
                env = dict(base_env)
                env.update({"port": port, "model": model})
                state = LangGraphMultiPort._prepare_state(item, env)
                out_state: State = workflow.invoke(state)  # sync nodes

                row = {
                    "port": port,
                    "item_index": item.get("_index", idx - 1),
                    "structure": out_state.get("structure", ""),
                    "objective": out_state.get("objective", ""),
                    "review_summary": out_state.get("review_summary", ""),
                    "testcases": out_state.get("testcases", ""),
                }

                if enable_json_flatten:
                    # Flatten review_summary only (you can also parse structure/objective if desired)
                    row.update(LangGraphMultiPort._flatten_json(row["review_summary"], json_key))

                results.append(row)

                if progress_every and (idx % progress_every == 0 or idx == total):
                    elapsed = time.time() - start
                    print(f"[Port {port}] {idx}/{total} done ({elapsed:.1f}s)")
            except Exception as e:
                print(f"[Port {port}] ERROR on item idx={idx}: {e}\n{traceback.format_exc()}")
                results.append({
                    "port": port,
                    "item_index": item.get("_index", idx - 1),
                    "error": str(e),
                    "testcases": str(item.get("testcases", "")),
                })

        print(f"[Port {port}] Completed {total} items in {time.time()-start:.1f}s")
        return results

    def run(self) -> pd.DataFrame:
        # Split items across ports
        chunks = self._chunk(self.records, len(self.ports))

        # Build worker args
        worker_args = []
        for port, items in zip(self.ports, chunks):
            worker_args.append((
                port,
                self.model,
                self.model_kwargs,
                items,
                self.progress_every,
                self.enable_json_flatten,
                self.json_key,
                self.env_payload
            ))

        print(f"Spawning {len(worker_args)} workers for ports: {self.ports}")
        all_results: List[Dict[str, Any]] = []

        from concurrent.futures import ProcessPoolExecutor, as_completed
        with ProcessPoolExecutor(max_workers=len(worker_args)) as ex:
            futs = [ex.submit(LangGraphMultiPort._worker, args) for args in worker_args]
            for fut in as_completed(futs):
                res = fut.result()
                all_results.extend(res)

        # Sort and save
        all_results.sort(key=lambda r: r.get("item_index", 0))
        df_out = pd.DataFrame(all_results)

        if self.output_path:
            Path(self.output_path).parent.mkdir(parents=True, exist_ok=True)
            df_out.to_excel(self.output_path, index=False)
            print(f"Results written to: {self.output_path}")

        return df_out    

In [4]:
# --------------------------------------------------------------------
# 5) End-to-end usage
# --------------------------------------------------------------------
# Adjust to your file path
input_excel = "./ready_for_testcase_summarization_prompt_test.xlsx"
if not os.path.exists(input_excel):
    raise FileNotFoundError(f"Input file not found: {input_excel}")

df = pd.read_excel(input_excel, engine="openpyxl")
prompt_vars = ["testcases"]  # minimal set needed by your nodes

# Ports + model (mirrors the batch style)
num_ports = 5
start_port = 11436
model = "deepseek-r1" #"llama3.1"
model_kwargs = {"temperature": 0.1, "format": "json", "keep_alive": "30m"}

output_dir = Path("./output")
output_dir.mkdir(parents=True, exist_ok=True)
output_path = str(output_dir / "test_case_review_results_test_dec_8_2025.xlsx")

runner = LangGraphMultiPort(
    input_df=df,
    prompt_vars=prompt_vars,
    model=model,
    start_port=start_port,
    num_ports=num_ports,
    model_kwargs=model_kwargs,
    progress_every=10,
    output_path=output_path,
    enable_json_flatten=True,     # set False if you prefer raw JSON strings
    json_key=None,                # e.g., 'testcase_review_summary' to extract only that
    env_payload={"run_id": "structure_objective_summary_v1"}  # optional
)

t0 = time.time()
df_out = runner.run()
print(f"Processed {len(df_out)} rows in {time.time()-t0:.1f}s")


Spawning 5 workers for ports: [11436, 11437, 11438, 11439, 11440]
[Port 11440] Processing 1 items...[Port 11436] Processing 1 items...[Port 11437] Processing 1 items...[Port 11438] Processing 1 items...[Port 11439] Processing 1 items...




[Port 11438] 1/1 done (80.6s)
[Port 11438] Completed 1 items in 80.6s
[Port 11439] 1/1 done (115.5s)
[Port 11439] Completed 1 items in 115.5s
[Port 11437] 1/1 done (132.9s)
[Port 11437] Completed 1 items in 132.9s
[Port 11440] 1/1 done (284.3s)
[Port 11440] Completed 1 items in 284.3s
[Port 11436] 1/1 done (303.9s)
[Port 11436] Completed 1 items in 303.9s
Results written to: output/test_case_review_results_test_dec_8_2025.xlsx
Processed 5 rows in 304.1s
