# CoT + MCeT Class Diagram Pipeline

This notebook is an adaptation of your original **CoT + HITL** notebook. It replaces the HITL questionnaire step with a fully implemented **MCeT** pipeline (MCeT-A + MCeT-X) for class diagrams. It does **not** perform any automated evaluation metrics — it only runs the MCeT checks and produces structured reports.

**Instructions:**
- Replace `call_llm()` with your LLM call (or ensure `gemini_caller` is defined in the notebook and adjust `call_llm` to use it).
- Run cells in order. The notebook expects `requirements_text` and `cot_text` variables (strings) to be provided.


In [1]:

import requests
import json
import os
import re
from plantuml import PlantUML
# GEMINI_API_KEY2 = "AIzaSyC-G81Hhcw9HduAmGboYXBgDx_szPcNqLk"
# GEMINI_API_KEY= "AIzaSyDuTVudkjY-EgjYtejNhDkf0Tj_kerD6ns"
# GEMINI_API_KEY = "AIzaSyA3OqiW9wNl-yC0VUTBbq8Z3wA4jAbWA6o"
OPENAI_API_KEY = "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"
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"


server = PlantUML(url="http://www.plantuml.com/plantuml/png/")
OUTPUT_DIR = "cot-diagrams"
os.makedirs(OUTPUT_DIR, exist_ok=True)
PLANTUML_LOG = os.path.join(OUTPUT_DIR, "cot-plantuml.txt")
COT_PROCESS_LOG = os.path.join(OUTPUT_DIR, "cot-process.txt")

cot_log = []


title = "Movie-Shop"
desc = "♣ Design a system for a movie-shop, in order to handle ordering of movies and browsing of the catalogue of the store, and user subscriptions with rechargeable cards. ♣ Only subscribers are allowed hiring movies with their own card. ♣ Credit is updated on the card during rent operations. ♣ Both users and subscribers can buy a movie and their data are saved in the related order. ♣ When a movie is not available it is ordered ."

def gemini_caller(prompt, model="gpt-3.5-turbo", max_tokens=1500, temperature=0.2):
    """Make API call to Gemini"""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_API_KEY}"
    }
    payload = {
        "model": model,
        "messages": [
            {"role": "user", "content": prompt}
        ],
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    try:
        resp = requests.post(OPENAI_API_URL, headers=headers, data=json.dumps(payload), timeout=60)
        resp.raise_for_status()
    except requests.RequestException as e:
        print("OpenAI request failed:", e)
        return None

    j = resp.json()
    # Defensive access in case of unexpected response shape
    if "choices" in j and len(j["choices"]) > 0:
        # For chat completions, assistant text is in choices[0].message.content
        return j["choices"][0].get("message", {}).get("content")
    # older/other APIs might put text differently — try candidates fallback
    if "candidates" in j and len(j["candidates"]) > 0:
        return j["candidates"][0].get("content", {}).get("parts", [None])[0]
    print("OpenAI returned unexpected response:", j)
    return None

In [2]:
# Imports and LLM caller stub
import re, json, math, os, textwrap
from collections import Counter, defaultdict

# Replace this function with your LLM API integration.
def call_llm(prompt: str, system: str = None, model: str = "gpt-4o-mini", temperature: float = 0.7):
    
    return gemini_caller(prompt)
    


In [3]:
# Parsing CoT textual class output into a structured Diagram object
class ClassNode:
    def __init__(self, name: str):
        self.name = name
        self.attributes = []  # list of (name, type, desc)
        self.methods = []     # list of (signature, desc)
        self.description = ""

class Relationship:
    def __init__(self, left, right, rel_type, desc=""):
        self.left = left
        self.right = right
        self.type = rel_type
        self.desc = desc

class Diagram:
    def __init__(self):
        self.classes = {}  # name -> ClassNode
        self.relationships = []


def parse_cot_text(text: str) -> Diagram:
    diag = Diagram()
    # split blocks by '---' or '###' markers commonly used
    blocks = re.split(r'\n-{3,}\n|\n###', text.strip())
    for block in blocks:
        m = re.search(r'CLASS:\s*(.+)', block)
        if not m:
            continue
        cname = m.group(1).strip()
        node = ClassNode(cname)
        dm = re.search(r'Description:\s*(.*)', block)
        if dm:
            node.description = dm.group(1).strip()
        # attributes pattern: - name: Type - desc
        attrs = re.findall(r'-\s*([A-Za-z0-9_]+)\s*:\s*([A-Za-z0-9_\[\]]+)\s*-\s*(.*)', block)
        for a in attrs:
            node.attributes.append((a[0].strip(), a[1].strip(), a[2].strip()))
        # methods simple match: - sig(): desc
        methods = re.findall(r'-\s*([A-Za-z0-9_]+\([^\)]*\))\s*-\s*(.*)', block)
        for mth in methods:
            node.methods.append((mth[0].strip(), mth[1].strip()))
        # relationships
        rel_block = re.search(r'RELATIONSHIPS:\s*(.*)', block, flags=re.S)
        if rel_block:
            rel_lines = re.findall(r'-\s*(.+)', rel_block.group(1))
            for rl in rel_lines:
                t = re.search(r'(\w+).+?(\w+).*\(([^\)]+)\)', rl)
                if t:
                    left, right, rtype = t.group(1), t.group(2), t.group(3)
                    diag.relationships.append(Relationship(left, right, rtype, rl))
                else:
                    diag.relationships.append(Relationship('?', '?', 'association', rl))
        diag.classes[cname] = node
    return diag

# Small helper to pretty-print parsed diagram
def diagram_to_text(diag: Diagram) -> str:
    out = []
    for cname, c in diag.classes.items():
        out.append(f"CLASS: {cname}")
        if c.description:
            out.append(f"  Description: {c.description}")
        if c.attributes:
            out.append("  ATTRIBUTES:")
            for a in c.attributes:
                out.append(f"    - {a[0]}: {a[1]} - {a[2]}")
        if c.methods:
            out.append("  METHODS:")
            for m in c.methods:
                out.append(f"    - {m[0]} - {m[1]}")
    if diag.relationships:
        out.append("RELATIONSHIPS:")
        for r in diag.relationships:
            out.append(f"  - {r.left} --({r.type})--> {r.right} : {r.desc}")
    return "\n".join(out)


In [4]:
# PlantUML conversion (useful for LLM prompts)
def diagram_to_plantuml(diag: Diagram) -> str:
    lines = ["@startuml", "hide circle"]
    for cname, node in diag.classes.items():
        lines.append(f"class {cname} {{")
        for a in node.attributes:
            lines.append(f"  +{a[0]} : {a[1]}")
        for m in node.methods:
            lines.append(f"  {m[0]}")
        lines.append("}")
    for r in diag.relationships:
        rel = "--"
        t = r.type.lower()
        if 'composition' in t:
            rel = "*--"
        elif 'aggregation' in t:
            rel = "o--"
        elif 'inherit' in t:
            rel = "<|--"
        lines.append(f"{r.left} {rel} {r.right} : {r.desc}")
    lines.append("@enduml")
    return "\n".join(lines)


In [5]:
# MCeT prompt templates and voting helper
PROMPTS = {
    'P_split': """Split the following REQUIREMENTS into atomic requirement sentences. Return a JSON array of short requirement atoms (each <= 2 sentences).\n\nRequirements:\n{requirements}\n""",
    'P_holistic': """You are an expert software engineer. Compare the CLASS DIAGRAM (PlantUML) to the REQUIREMENTS.\nReturn JSON array of issues with fields: type(accuracy|completeness), element_type(class|attribute|relationship), element, explanation, localization.\n\nRequirements:\n{requirements}\n\nDiagram:\n{diagram}\n""",
    'P_class_atom': """Given REQUIREMENTS and the full CLASS DIAGRAM (PlantUML), examine the CLASS ATOM below and answer as JSON: {{exists_in_requirements: bool, missing_attributes: [], incorrect_attribute_types: [], missing_relationships: [], spurious: bool, explanation: str}}\n\nRequirements:\n{requirements}\n\nDiagram:\n{diagram}\n\nClass Atom:\n{atom}\n""",
    'P_req_atom_check': """For the requirement-atom below, check whether the CLASS DIAGRAM implements it. Return JSON: {{req_atom: <text>, implemented: bool, missing_elements: [], incorrect_elements: [], explanation: ''}}\n\nRequirement-atom:\n{req}\n\nDiagram:\n{diagram}\n"""
}


def llm_vote(prompt_key, fill, votes=5, **kwargs):
    prompt = PROMPTS[prompt_key].format(**fill)
    responses = []
    for i in range(votes):
        r = call_llm(prompt, **kwargs)
        responses.append(r.strip())
    # Try to parse responses as JSON; collect majority items
    parsed = []
    for r in responses:
        try:
            parsed.append(json.loads(r))
        except Exception:
            jmatch = re.search(r'(\[.*\]|\{.*\})', r, flags=re.S)
            if jmatch:
                try:
                    parsed.append(json.loads(jmatch.group(1)))
                except Exception:
                    parsed.append(r)
            else:
                parsed.append(r)
    # majority filter simplistic: keep items that appear >= ceil(votes/2)
    counts = Counter()
    items_by_resp = []
    for p in parsed:
        s = json.dumps(p, sort_keys=True) if not isinstance(p, str) else p
        counts[s] += 1
        items_by_resp.append(s)
    threshold = math.ceil(votes/2)
    kept = []
    for item, c in counts.items():
        if c >= threshold:
            try:
                kept.append(json.loads(item))
            except Exception:
                kept.append(item)
    return kept


In [6]:
# MCeT-A implementation: requirement split, holistic, class-atom, req-atom checks

def mceT_split_requirements(requirements, votes=5):
    kept = llm_vote('P_split', {'requirements': requirements}, votes=votes)
    if kept:
        # expect first kept to be a list
        for k in kept:
            if isinstance(k, list):
                return k
        # fallback: coerce first
        first = kept[0]
        if isinstance(first, str):
            return [s.strip() for s in first.split('\n') if s.strip()]
    # fallback naive splitting
    sents = re.split(r'(?<=[\.\n])\s+', requirements.strip())
    return [s for s in sents if len(s)>10]


def mceT_holistic(requirements, plantuml, votes=5):
    kept = llm_vote('P_holistic', {'requirements': requirements, 'diagram': plantuml}, votes=votes)
    if kept:
        # expect list-of-issues
        for k in kept:
            if isinstance(k, list):
                return k
    return []


def mceT_class_atoms(requirements, plantuml, diag, votes=5):
    results = {}
    for cname, node in diag.classes.items():
        atom = f"Class {cname} attrs: {[a[0] for a in node.attributes]} methods: {[m[0] for m in node.methods]}"
        kept = llm_vote('P_class_atom', {'requirements': requirements, 'diagram': plantuml, 'atom': atom}, votes=votes)
        results[cname] = kept[0] if kept else { 'exists_in_requirements': False, 'missing_attributes':[], 'incorrect_attribute_types':[], 'missing_relationships':[], 'spurious': False, 'explanation': '' }
    return results


def mceT_requirement_atoms_check(req_atoms, plantuml, votes=5):
    out = {}
    # for ra in req_atoms:
    #     kept = llm_vote('P_req_atom_check', {'req': ra, 'diagram': plantuml}, votes=votes)
    #     out[ra] = kept[0] if kept else { 'req_atom': ra, 'implemented': False, 'missing_elements': [], 'incorrect_elements': [], 'explanation': '' }
    # return out
    for ra in req_atoms:
    # Normalize ra (requirement atom) to a string
        if isinstance(ra, dict):
            ra_text = ra.get("req_atom") or ra.get("text") or json.dumps(ra)
        else:
            ra_text = str(ra)
        
        kept = llm_vote('P_req_atom_check', {'req': ra_text, 'diagram': plantuml}, votes=votes)
        
        out[ra_text] = kept[0] if kept else {
            'req_atom': ra_text,
            'implemented': False,
            'missing_elements': [],
            'incorrect_elements': [],
            'explanation': ''
        }

    return out



In [7]:
# MCeT-X: authority-based cross-check

def mceT_cross_check(holistic_issues, class_atom_results, req_atom_results):
    # Build confirmed set from requirement atoms implemented==True
    confirmed = set()
    for ra, info in req_atom_results.items():
        try:
            if info.get('implemented', False):
                # naive: extract capitalized tokens as class names
                for c in re.findall(r"\b[A-Z][A-Za-z0-9_]{1,30}\b", ra):
                    confirmed.add(('class', c))
        except Exception:
            pass
    kept = []
    for issue in holistic_issues:
        elem = issue.get('element','')
        etype = issue.get('element_type','')
        if elem and ('class', elem) in confirmed:
            # drop issue because requirement atom confirmed it
            continue
        kept.append(issue)
    # class atom derived issues
    for cname, cres in class_atom_results.items():
        if cres.get('spurious'):
            if ('class', cname) in confirmed:
                continue
            kept.append({'type':'accuracy','element_type':'class','element':cname,'explanation':cres.get('explanation','')})
        for ma in cres.get('missing_attributes', []):
            kept.append({'type':'completeness','element_type':'attribute','element':f"{cname}.{ma}", 'explanation': f"missing {ma}"})
        for it in cres.get('incorrect_attribute_types', []):
            name = it.get('name') if isinstance(it, dict) else str(it)
            kept.append({'type':'accuracy','element_type':'attribute','element':f"{cname}.{name}", 'explanation': 'incorrect type'})
    return kept


In [8]:
# Runner: Integrate CoT parse -> MCeT pipeline

def run_cot_mcet(requirements_text: str, cot_text: str, votes=5):
    # Parse CoT textual output
    diag = parse_cot_text(cot_text)
    plant = diagram_to_plantuml(diag)
    print("Parsed diagram with classes:", list(diag.classes.keys()))

    # MCeT-A: split requirements -> atoms
    req_atoms = mceT_split_requirements(requirements_text, votes=votes)
    print(f"Generated {len(req_atoms)} requirement atoms")

    # Holistic check
    holistic = mceT_holistic(requirements_text, plant, votes=votes)
    print(f"Holistic issues found (raw): {len(holistic)}")

    # Class-atom checks
    class_atoms = mceT_class_atoms(requirements_text, plant, diag, votes=votes)
    print("Completed class-atom checks")

    # Requirement-atom checks
    req_checks = mceT_requirement_atoms_check(req_atoms, plant, votes=votes)
    print("Completed requirement-atom checks")

    # MCeT-X cross-check
    final_issues = mceT_cross_check(holistic, class_atoms, req_checks)
    print(f"After cross-checking, {len(final_issues)} issues remain")

    # Return structured report (no evaluation metrics)
    return {
        'diagram': diag,
        'plantuml': plant,
        'req_atoms': req_atoms,
        'holistic_issues': holistic,
        'class_atom_results': class_atoms,
        'req_atom_results': req_checks,
        'cross_checked_issues': final_issues
    }




In [9]:
cot_text = """ 
CLASS: Movie
Description: Represents a movie available in the movie-shop's catalogue.

ATTRIBUTES:
- title: String - Title of the movie
- movieId: int - Unique identifier for the movie
- genre: String - Genre of the movie
- releaseDate: Date - Release date of the movie
- director: String - Director of the movie
- cast: String - Cast of the movie
- availableQuantity: int - Number of copies available in the store

METHODS:
- getTitle(): String - Returns the title of the movie
- getMovieId(): int - Returns the unique movie ID
- getGenre(): String - Returns the genre of the movie
- getReleaseDate(): Date - Returns the release date of the movie
- getDirector(): String - Returns the director of the movie
- getCast(): String - Returns the cast of the movie
- getAvailableQuantity(): int - Returns the available quantity of the movie
- setAvailableQuantity(quantity: int): void - Sets the available quantity of the movie

RELATIONSHIPS:
- Movie belongs to Catalogue [*] ---- [1] (aggregation)
- Movie is included in Order [1] ---- [1] (aggregation)
- Movie is related to MovieShop [*] ---- [1] (aggregation)

---
CLASS: User
Description: Represents a user of the movie-shop.

ATTRIBUTES:
- userId: int - Unique identifier for the user
- name: String - Name of the user
- address: String - Address of the user
- email: String - Email of the user

METHODS:
- getUserId(): int - Returns the user ID
- getName(): String - Returns the name of the user
- getAddress(): String - Returns the address of the user
- getEmail(): String - Returns the email of the user

RELATIONSHIPS:
- User places Order [*] ---- [1] (association)
- User is related to MovieShop [*] ---- [1] (aggregation)

---
CLASS: Subscriber
Description: Represents a subscriber, a special type of user who can rent movies.

ATTRIBUTES:
- userId: int - Unique identifier for the subscriber (inherited from User)
- name: String - Name of the subscriber (inherited from User)
- address: String - Address of the subscriber (inherited from User)
- email: String - Email of the subscriber (inherited from User)
- subscriptionStartDate: Date - Start date of the subscription
- subscriptionEndDate: Date - End date of the subscription
- rechargeableCard: RechargeableCard - The subscriber's rechargeable card

METHODS:
- getUserId(): int - Returns the user ID (inherited from User)
- getName(): String - Returns the name of the subscriber (inherited from User)
- getAddress(): String - Returns the address of the subscriber (inherited from User)
- getEmail(): String - Returns the email of the subscriber (inherited from User)
- getSubscriptionStartDate(): Date - Returns the subscription start date
- getSubscriptionEndDate(): Date - Returns the subscription end date
- getRechargeableCard(): RechargeableCard - Returns the rechargeable card
- rentMovie(movie: Movie): void - Allows the subscriber to rent a movie
- renewSubscription(): void - Renews the subscriber's subscription

RELATIONSHIPS:
- Subscriber is a User (inheritance)
- Subscriber has a RechargeableCard [1] ----◆ [1] (composition)
- Subscriber rents Movie [*] ---- [1] (association)

---
CLASS: RechargeableCard
Description: Represents a rechargeable card used by subscribers for payment.

ATTRIBUTES:
- cardId: int - Unique identifier for the card
- credit: double - Amount of credit available on the card

METHODS:
- getCardId(): int - Returns the card ID
- getCredit(): double - Returns the amount of credit
- addCredit(amount: double): void - Adds credit to the card
- deductCredit(amount: double): void - Deducts credit from the card

RELATIONSHIPS:
- RechargeableCard belongs to Subscriber [1] ----◆ [1] (composition)

---
CLASS: Order
Description: Represents an order placed by a user or subscriber.

ATTRIBUTES:
- orderId: int - Unique identifier for the order
- user: User - The user who placed the order
- movie: Movie - The movie that was ordered
- orderDate: Date - Date and time the order was placed
- totalAmount: double - Total amount of the order

METHODS:
- getOrderId(): int - Returns the order ID
- getUser(): User - Returns the user who placed the order
- getMovie(): Movie - Returns the movie in the order
- getOrderDate(): Date - Returns the order date
- getTotalAmount(): double - Returns the total amount
- calculateTotal(): double - Calculates the total amount of the order

RELATIONSHIPS:
- Order is placed by User [*] ---- [1] (association)
- Order contains Movie [1] ---- [1] (aggregation)

---
CLASS: MovieShop
Description: Represents the store and its main operations.

ATTRIBUTES:
- movies: List<Movie> - List of movies in the store
- users: List<User> - List of users of the store

METHODS:
- addMovie(movie: Movie): void - Adds a movie to the store
- removeMovie(movie: Movie): void - Removes a movie from the store
- addUser(user: User): void - Adds a user to the store
- removeUser(user: User): void - Removes a user from the store
- createOrder(user: User, movie: Movie): void - Creates an order for a user

RELATIONSHIPS:
- MovieShop manages Movies [1] ---- [*] (aggregation)
- MovieShop manages Users [1] ---- [*] (aggregation)
"""

requirements_text = """ ♣ Design a system for a movie-shop, in order to handle ordering of movies and browsing of the catalogue of the store, and user subscriptions with rechargeable cards. ♣ Only subscribers are allowed hiring movies with their own card. ♣ Credit is updated on the card during rent operations. ♣ Both users and subscribers can buy a movie and their data are saved in the related order. ♣ When a movie is not available it is ordered . """
report = run_cot_mcet(requirements_text, cot_text, votes=2)
print(report.keys())

Parsed diagram with classes: ['Movie', 'User', 'Subscriber', 'RechargeableCard', 'Order', 'MovieShop']
Generated 6 requirement atoms
Holistic issues found (raw): 3
Completed class-atom checks
Completed requirement-atom checks
After cross-checking, 4 issues remain
dict_keys(['diagram', 'plantuml', 'req_atoms', 'holistic_issues', 'class_atom_results', 'req_atom_results', 'cross_checked_issues'])


## How to use

1. Ensure you have an LLM caller available. Replace `call_llm()` in the notebook with your API function or ensure `gemini_caller(prompt)` is defined.
2. Provide `requirements_text` (string) and `cot_text` (string) variables, e.g. by reading from files or pasting them into cells.
3. Run the `run_cot_mcet()` function to obtain the MCeT report. The notebook does not compute precision/recall; it prepares the MCeT pipeline outputs ready for later evaluation.

If you want, I can now:
- Wire `call_llm()` to your existing `gemini_caller` implementation in the uploaded notebook and run the pipeline on one example, OR
- Add richer few-shot examples inside the PROMPTS to make the checks more deterministic.
