In [None]:
!pip -q install -U transformers accelerate sentencepiece spacy
!python -m spacy download en_core_web_sm -q
!pip -q install -U transformers accelerate sentencepiece
!pip -q install -U sentence-transformers scikit-learn joblib matplotlib
!pip -q install -U transformers accelerate sentencepiece
!pip -q install -U transformers accelerate sentencepiece spacy
!python -m spacy download en_core_web_sm -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m32.6 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.8/52.8 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.9/8.9 MB[0m [31m56.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.7/8.7 MB[0m [31m48.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m103.9 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and instal

In [1]:
# SprintPilotAI FR01 - CELL 1
# One-time setup + model loading


import json
import re
from datetime import datetime, timezone
from typing import Any, Dict, List

import spacy
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM


# Setup (NLP + Local LLM)

nlp = spacy.load("en_core_web_sm")

MODEL_NAME = "google/flan-t5-large"  # FREE local model from Hugging Face
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

print(" Loaded:", MODEL_NAME, "| device:", device)



# Basic text cleaning

def clean_text(text: str) -> str:
    if not isinstance(text, str):
        raise ValueError("Goal must be a string.")
    text = text.strip()
    text = re.sub(r"\s+", " ", text)
    # keep useful punctuation + currency symbols
    text = re.sub(r"[^\w\s\-,.()/%:$€£]", "", text)
    return text



#  spaCy feature extraction

def spacy_features(goal: str) -> Dict[str, Any]:
    doc = nlp(goal)

    entities = [
        {"text": ent.text, "label": ent.label_}
        for ent in doc.ents
        if ent.label_ in {"DATE", "TIME", "MONEY", "ORG", "PRODUCT", "GPE", "NORP", "EVENT"}
    ]

    noun_chunks: List[str] = []
    for chunk in doc.noun_chunks:
        t = chunk.text.strip().lower()
        if len(t) >= 3 and not t.isdigit():
            noun_chunks.append(t)

    keywords: List[str] = []
    for tok in doc:
        if tok.is_stop or tok.is_punct:
            continue
        if not (tok.is_alpha or tok.like_num):
            continue
        lemma = tok.lemma_.lower().strip()
        if len(lemma) >= 3:
            keywords.append(lemma)

    def uniq(seq: List[str]) -> List[str]:
        seen = set()
        out = []
        for x in seq:
            if x not in seen:
                seen.add(x)
                out.append(x)
        return out

    return {
        "entities": entities,
        "noun_chunks": uniq(noun_chunks)[:20],
        "keywords": uniq(keywords)[:25],
    }



#  Required FR01 Output Template (for FR02 input)

def required_template(clean_goal: str) -> Dict[str, Any]:
    return {
        "goal_id": "GOAL-REPLACED-LATER",
        "timestamp_utc": "REPLACED-LATER",
        "raw_goal": "REPLACED-LATER",
        "clean_goal": clean_goal,

        "domain": "Other",
        "problem_summary": "",
        "deliverables": [],
        "assumptions": [],

        "constraints": {
            "time_text": None,
            "time_days_approx": None,
            "budget_text": None,
            "budget_value_raw": None,
            "platform_hints": [],
            "non_functional": []
        },

        "stakeholders": [],
        "features_high_level": [],
        "keywords": [],

        "confidence": 0.75,
        "validation_error": None
    }



# Deterministic (reliable) constraint extraction

TIME_RE = re.compile(r"\b(?:in|within|by|for)\s+(\d+(?:\.\d+)?)\s*(day|days|week|weeks|month|months|year|years)\b", re.I)

PLATFORM_RE = re.compile(
    r"\b(android|ios|mobile|web|website|frontend|backend|api|cloud|desktop|windows|linux|mac)\b",
    re.I
)

BUDGET_RE = re.compile(
    r"\b(?:budget|cost|under|below|less than)\s*(?:is|:)?\s*([$€£]?\s*\d+(?:,\d{3})*(?:\.\d+)?)\b",
    re.I
)

def normalize_time_to_days(value: float, unit: str) -> int:
    unit = unit.lower()
    if unit.startswith("day"):
        return int(round(value))
    if unit.startswith("week"):
        return int(round(value * 7))
    if unit.startswith("month"):
        return int(round(value * 30))
    if unit.startswith("year"):
        return int(round(value * 365))
    return int(round(value))

def extract_time_platform_budget(clean_goal: str) -> Dict[str, Any]:
    out = {
        "time_text": None,
        "time_days_approx": None,
        "platform_hints": [],
        "budget_text": None,
        "budget_value_raw": None
    }

    m = TIME_RE.search(clean_goal)
    if m:
        num = float(m.group(1))
        unit = m.group(2)
        out["time_text"] = f"{m.group(1)} {unit}"
        out["time_days_approx"] = normalize_time_to_days(num, unit)

    plats = sorted({p.lower() for p in PLATFORM_RE.findall(clean_goal)})
    out["platform_hints"] = plats

    b = BUDGET_RE.search(clean_goal)
    if b:
        out["budget_text"] = b.group(0)
        out["budget_value_raw"] = b.group(1).replace(" ", "")

    return out

def classify_domain_rule(clean_goal: str) -> str:
    g = clean_goal.lower()

    if any(x in g for x in ["android", "ios", "mobile app", "mobile application", "mobile"]):
        return "Mobile App"

    if any(x in g for x in ["web", "website", "web app", "frontend", "dashboard"]):
        return "Web Application"

    if any(x in g for x in ["api", "rest", "graphql", "microservice", "backend"]):
        return "API/Backend"

    if any(x in g for x in ["desktop", "windows app", "linux app", "mac app", "electron"]):
        return "Desktop Application"

    if any(x in g for x in ["machine learning", "ml", "ai", "model", "prediction", "nlp", "llm"]):
        return "ML/AI System"

    return "Other"



# FREE local LLM enrichment (FLAN-T5)

def llm_enrich_fields(clean_goal: str) -> Dict[str, Any]:
    prompt = f"""
You are SprintPilotAI FR01.
Given the goal, produce ONLY these lines (no extra text):

SUMMARY: <one sentence>
DELIVERABLES: <3-6 items separated by ';'>
FEATURES: <3-8 items separated by ';'>
STAKEHOLDERS: <2-6 items separated by ';'>
NON_FUNCTIONAL: <0-6 items separated by ';'>
ASSUMPTIONS: <0-6 items separated by ';'>
CONFIDENCE: <0.0 to 1.0>
VALIDATION_ERROR: <null or short message>

Now do the same for:
Goal: {clean_goal}
""".strip()

    inputs = tokenizer(prompt, return_tensors="pt", truncation=True).to(device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=260,
            do_sample=False
        )

    text = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()

    kv = {}
    for ln in text.splitlines():
        if ":" in ln:
            k, v = ln.split(":", 1)
            kv[k.strip().upper()] = v.strip()

    return kv



# FR01 main parsing

def fr01_parse_goal(goal_text: str) -> Dict[str, Any]:
    raw_goal = goal_text
    cleaned = clean_text(goal_text)

    parsed = required_template(cleaned)

    parsed["goal_id"] = f"GOAL-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}"
    parsed["timestamp_utc"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    parsed["raw_goal"] = raw_goal
    parsed["clean_goal"] = cleaned

    hints = spacy_features(cleaned)
    parsed["keywords"] = hints.get("keywords", [])

    parsed["domain"] = classify_domain_rule(cleaned)

    tp = extract_time_platform_budget(cleaned)
    parsed["constraints"]["time_text"] = tp["time_text"]
    parsed["constraints"]["time_days_approx"] = tp["time_days_approx"]
    parsed["constraints"]["platform_hints"] = tp["platform_hints"]
    parsed["constraints"]["budget_text"] = tp["budget_text"]
    parsed["constraints"]["budget_value_raw"] = tp["budget_value_raw"]

    kv = llm_enrich_fields(cleaned)

    parsed["problem_summary"] = kv.get("SUMMARY") or "Parsed project goal into structured fields for FR02."

    if kv.get("DELIVERABLES"):
        parsed["deliverables"] = [x.strip() for x in kv["DELIVERABLES"].split(";") if x.strip()]

    if kv.get("FEATURES"):
        parsed["features_high_level"] = [x.strip() for x in kv["FEATURES"].split(";") if x.strip()]

    if kv.get("STAKEHOLDERS"):
        parsed["stakeholders"] = [x.strip() for x in kv["STAKEHOLDERS"].split(";") if x.strip()]

    if kv.get("NON_FUNCTIONAL"):
        parsed["constraints"]["non_functional"] = [x.strip() for x in kv["NON_FUNCTIONAL"].split(";") if x.strip()]

    if kv.get("ASSUMPTIONS"):
        parsed["assumptions"] = [x.strip() for x in kv["ASSUMPTIONS"].split(";") if x.strip()]

    if kv.get("CONFIDENCE"):
        try:
            parsed["confidence"] = max(0.0, min(1.0, float(kv["CONFIDENCE"])))
        except Exception:
            parsed["confidence"] = 0.75

    verr = (kv.get("VALIDATION_ERROR") or "null").strip().lower()
    parsed["validation_error"] = None if verr in {"null", "none", ""} else kv.get("VALIDATION_ERROR")

    if len(cleaned) < 10:
        parsed["validation_error"] = "Goal is too short. Add more details."

    if parsed["constraints"]["time_text"] is None and parsed["validation_error"] is None:
        parsed["validation_error"] = "Consider adding a timeline (e.g., within 8 weeks) for better planning."

    return parsed



# Save output

def save_goal_json(parsed: Dict[str, Any], filepath: str = "fr01_goal_output.json") -> None:
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(parsed, f, indent=2, ensure_ascii=False)

print(" Cell 1 ready. Run Cell 2 to enter a goal and generate FR01 JSON.")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/662 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/3.13G [00:00<?, ?B/s]

Loading weights:   0%|          | 0/558 [00:00<?, ?it/s]



generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

 Loaded: google/flan-t5-large | device: cpu
 Cell 1 ready. Run Cell 2 to enter a goal and generate FR01 JSON.


In [2]:

# SprintPilotAI FR02 - CELL 1
# One-time setup + model loading

import json
import re
from datetime import datetime, timezone
from typing import Dict, Any, List, Tuple

import spacy
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM


# Setup (NLP + Local LLM)

nlp = spacy.load("en_core_web_sm")

MODEL_NAME = "google/flan-t5-large"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

print("FR02 Loaded:", MODEL_NAME, "| device:", device)


# Load FR01 output

def load_fr01_json(filepath: str = "fr01_goal_output.json") -> Dict[str, Any]:
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)

# Helpers

def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def uniq_keep_order(items: List[str]) -> List[str]:
    seen = set()
    out = []
    for x in items:
        if x not in seen:
            seen.add(x)
            out.append(x)
    return out

def guess_epic(domain: str) -> str:
    if domain == "Web Application":
        return "Web App Delivery"
    if domain == "Mobile App":
        return "Mobile App Delivery"
    if domain == "API/Backend":
        return "Backend/API Delivery"
    if domain == "Desktop Application":
        return "Desktop App Delivery"
    if domain == "ML/AI System":
        return "ML System Delivery"
    return "General Project Delivery"

def safe_int(x, default=3):
    try:
        return int(float(x))
    except Exception:
        return default

# Deterministic fallback templates

BASE_TASKS = [
    ("Requirements & Scope", 3, []),
    ("Architecture & Tech Stack", 3, ["Requirements & Scope"]),
    ("Project Setup (Repo, CI, Environments)", 2, ["Architecture & Tech Stack"]),
    ("Testing Strategy & QA Plan", 2, ["Requirements & Scope"]),
    ("Deployment Plan", 2, ["Architecture & Tech Stack"]),
]

DOMAIN_TASKS = {
    "Web Application": [
        ("UI/UX Wireframes", 3, ["Requirements & Scope"]),
        ("Frontend Development", 8, ["UI/UX Wireframes", "Project Setup (Repo, CI, Environments)"]),
        ("Backend Development", 8, ["Architecture & Tech Stack", "Project Setup (Repo, CI, Environments)"]),
        ("Integration (Frontend + Backend)", 4, ["Frontend Development", "Backend Development"]),
    ],
    "Mobile App": [
        ("UI/UX Wireframes", 3, ["Requirements & Scope"]),
        ("Mobile App Development", 10, ["UI/UX Wireframes", "Project Setup (Repo, CI, Environments)"]),
        ("Backend/API Integration", 5, ["Mobile App Development"]),
    ],
    "API/Backend": [
        ("API Design (Endpoints, Contracts)", 4, ["Architecture & Tech Stack"]),
        ("API Implementation", 8, ["API Design (Endpoints, Contracts)", "Project Setup (Repo, CI, Environments)"]),
        ("API Documentation", 2, ["API Design (Endpoints, Contracts)"]),
    ],
    "Desktop Application": [
        ("UI/UX Wireframes", 3, ["Requirements & Scope"]),
        ("Desktop App Development", 10, ["UI/UX Wireframes", "Project Setup (Repo, CI, Environments)"]),
    ],
    "ML/AI System": [
        ("Dataset Collection", 5, ["Requirements & Scope"]),
        ("Data Cleaning & Preprocessing", 5, ["Dataset Collection"]),
        ("Model Training Baseline", 7, ["Data Cleaning & Preprocessing"]),
        ("Model Evaluation & Metrics", 4, ["Model Training Baseline"]),
        ("Model Packaging (Save/Load + Inference)", 3, ["Model Evaluation & Metrics"]),
    ],
}

FEATURE_TASKS = {
    "authentication": [
        ("User Authentication (Login/Signup)", 4, ["Backend Development"]),
        ("Auth UI Screens", 3, ["Frontend Development"]),
    ],
    "login": [
        ("User Authentication (Login/Signup)", 4, ["Backend Development"]),
        ("Auth UI Screens", 3, ["Frontend Development"]),
    ],
    "payment": [
        ("Payment Gateway Integration", 5, ["Backend Development"]),
        ("Payment UI Flow", 3, ["Frontend Development"]),
    ],
    "database": [
        ("Database Schema Design", 3, ["Architecture & Tech Stack"]),
        ("Database Implementation & Migrations", 3, ["Database Schema Design"]),
    ],
    "dashboard": [
        ("Dashboard UI", 4, ["Frontend Development"]),
        ("Dashboard Backend APIs", 3, ["Backend Development"]),
    ],
    "prediction": [
        ("Define Prediction Target + Features", 3, ["Requirements & Scope"]),
        ("Train Prediction Model", 6, ["Data Cleaning & Preprocessing"]),
    ],
    "deployment": [
        ("Deploy to Staging", 2, ["Integration (Frontend + Backend)", "Deployment Plan"]),
        ("Deploy to Production", 2, ["Deploy to Staging"]),
    ],
}

def add_feature_tasks(keywords: List[str]) -> List[Tuple[str, int, List[str]]]:
    tasks: List[Tuple[str, int, List[str]]] = []
    kw_set = set([k.lower() for k in keywords])
    for k, tlist in FEATURE_TASKS.items():
        if k in kw_set:
            tasks.extend(tlist)
    return tasks

def build_tasks_from_templates(templates: List[Tuple[str, int, List[str]]]) -> List[Dict[str, Any]]:
    tasks = []
    for i, (title, est, deps) in enumerate(templates, start=1):
        tasks.append({
            "id": f"T-{i:03d}",
            "title": title,
            "estimate_days": est,
            "depends_on": deps,
            "type": "task",
            "source": "fallback_templates"
        })
    return tasks

def task_titles(task_list: List[Dict[str, Any]]) -> set:
    return {t["title"] for t in task_list}

def ensure_dependency_exists(task_list: List[Dict[str, Any]], dep: str) -> None:
    existing = task_titles(task_list)
    if dep not in existing:
        task_list.append({
            "id": f"T-{len(task_list)+1:03d}",
            "title": dep,
            "estimate_days": 3,
            "depends_on": [],
            "type": "auto_added_dependency",
            "source": "dependency_fix"
        })

def fix_dependencies(task_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    titles = task_titles(task_list)
    for t in task_list:
        for dep in t.get("depends_on", []):
            if dep not in titles:
                ensure_dependency_exists(task_list, dep)
                titles = task_titles(task_list)
    return task_list


# LLM-based decomposition

def llm_generate_tasks(fr01: Dict[str, Any]) -> List[Dict[str, Any]]:
    domain = fr01.get("domain", "Other")
    goal = fr01.get("raw_goal", "")
    keywords = fr01.get("keywords", [])
    constraints = fr01.get("constraints", {})

    prompt = f"""
You are SprintPilotAI FR02.
Decompose the project goal into sprint tasks.

Return ONLY lines in this exact format:
TASK: <title> | <estimate_days_integer> | <dependency1,dependency2 or none>

Rules:
- Produce 8 to 14 tasks.
- Dependencies must refer to earlier tasks by title (or 'none').
- Estimates should be small integers (1-10).
- Include at least: Requirements, Design/Wireframes, Development, Testing, Deployment.
- Use the domain to choose tasks: {domain}

Goal: {goal}
Keywords: {", ".join(keywords[:20])}
Time constraint (days): {constraints.get("time_days_approx")}

Example lines:
TASK: Requirements & Scope | 3 | none
TASK: UI/UX Wireframes | 3 | Requirements & Scope
""".strip()

    inputs = tokenizer(prompt, return_tensors="pt", truncation=True).to(device)
    with torch.no_grad():
        out = model.generate(**inputs, max_new_tokens=360, do_sample=False)

    text = tokenizer.decode(out[0], skip_special_tokens=True).strip()

    tasks: List[Dict[str, Any]] = []
    for line in text.splitlines():
        line = line.strip()
        if not line.startswith("TASK:"):
            continue
        try:
            rest = line[len("TASK:"):].strip()
            parts = [p.strip() for p in rest.split("|")]
            if len(parts) != 3:
                continue
            title = parts[0]
            days = safe_int(parts[1], default=3)
            deps_raw = parts[2].strip().lower()

            if deps_raw in {"none", "null", ""}:
                deps = []
            else:
                deps = [d.strip() for d in parts[2].split(",") if d.strip()]

            tasks.append({
                "id": f"T-{len(tasks)+1:03d}",
                "title": title,
                "estimate_days": max(1, min(10, days)),
                "depends_on": deps,
                "type": "task",
                "source": "llm"
            })
        except Exception:
            continue

    return tasks


# Merge LLM tasks with fallback

def fallback_decompose(fr01: Dict[str, Any]) -> List[Dict[str, Any]]:
    domain = fr01.get("domain", "Other")
    keywords = fr01.get("keywords", [])

    templates: List[Tuple[str, int, List[str]]] = []
    templates.extend(BASE_TASKS)
    templates.extend(DOMAIN_TASKS.get(domain, []))
    templates.extend(add_feature_tasks(keywords))

    seen = set()
    uniq_templates = []
    for t in templates:
        if t[0] not in seen:
            seen.add(t[0])
            uniq_templates.append(t)

    tasks = build_tasks_from_templates(uniq_templates)
    tasks = fix_dependencies(tasks)
    return tasks

def choose_tasks(fr01: Dict[str, Any]) -> List[Dict[str, Any]]:
    llm_tasks = llm_generate_tasks(fr01)

    if len(llm_tasks) < 6:
        return fallback_decompose(fr01)

    llm_tasks = fix_dependencies(llm_tasks)
    return llm_tasks


# FR02 main function

def fr02_decompose(fr01: Dict[str, Any]) -> Dict[str, Any]:
    domain = fr01.get("domain", "Other")
    keywords = fr01.get("keywords", [])
    constraints = fr01.get("constraints", {})

    epic = guess_epic(domain)
    tasks = choose_tasks(fr01)

    time_days = constraints.get("time_days_approx")
    if isinstance(time_days, int) and time_days > 0:
        sprint_count = max(1, int(round(time_days / 14)))
    else:
        sprint_count = None

    output = {
        "decomposition_id": f"DEC-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}",
        "timestamp_utc": utc_now_iso(),
        "source_goal_id": fr01.get("goal_id"),
        "epic": epic,
        "domain": domain,
        "original_goal": fr01.get("raw_goal"),
        "keywords_used": keywords,
        "constraints": constraints,
        "estimated_sprint_count_approx": sprint_count,
        "tasks": tasks,
    }
    return output


# Save FR02 output

def save_fr02_json(fr02_output: Dict[str, Any], filepath: str = "fr02_tasks_output.json") -> None:
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(fr02_output, f, indent=2, ensure_ascii=False)

print(" Cell 1 ready. Run Cell 2 to load FR01 JSON and generate FR02 tasks.")


Loading weights:   0%|          | 0/558 [00:00<?, ?it/s]



FR02 Loaded: google/flan-t5-large | device: cpu
 Cell 1 ready. Run Cell 2 to load FR01 JSON and generate FR02 tasks.


In [3]:

# SprintPilotAI FR03 - CELL 1
# One-time setup + model loading


# If needed in Colab (run once):
# !pip -q install -U transformers accelerate sentencepiece

import json
from datetime import datetime, timezone
from typing import Dict, Any, List, Tuple, Set

import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM


# 0) Load FREE local LLM (FLAN-T5)

MODEL_NAME = "google/flan-t5-large"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

print("FR03 Loaded:", MODEL_NAME, "| device:", device)


# Load FR02 output

def load_fr02_json(filepath: str = "fr02_tasks_output.json") -> Dict[str, Any]:
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)


# Helpers

def save_fr03_json(fr03_output: Dict[str, Any], filepath: str = "fr03_sprint_plan_output.json") -> None:
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(fr03_output, f, indent=2, ensure_ascii=False)

def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def build_maps(tasks: List[Dict[str, Any]]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, str]]:
    task_by_id = {t["id"]: t for t in tasks}
    id_by_title = {}
    for t in tasks:
        title = t.get("title", "").strip()
        if title:
            id_by_title[title] = t["id"]
    return task_by_id, id_by_title

def toposort_task_ids(tasks: List[Dict[str, Any]]) -> List[str]:
    task_by_id, id_by_title = build_maps(tasks)

    dependents: Dict[str, Set[str]] = {tid: set() for tid in task_by_id}
    indegree: Dict[str, int] = {tid: 0 for tid in task_by_id}

    for t in tasks:
        tid = t["id"]
        deps_titles = t.get("depends_on", [])
        for dep_title in deps_titles:
            dep_id = id_by_title.get(dep_title)
            if dep_id is None:
                continue
            dependents[dep_id].add(tid)
            indegree[tid] += 1

    queue = [tid for tid, deg in indegree.items() if deg == 0]
    queue.sort()

    order = []
    while queue:
        node = queue.pop(0)
        order.append(node)
        for nxt in sorted(dependents[node]):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
                queue.sort()

    # If cycle or mismatch, fall back to ID sort
    if len(order) != len(tasks):
        return sorted(task_by_id.keys())

    return order

def deps_satisfied(task: Dict[str, Any], done_titles: Set[str]) -> bool:
    for dep in task.get("depends_on", []):
        if dep not in done_titles:
            return False
    return True

def split_task_if_needed(task: Dict[str, Any], sprint_capacity: int) -> List[Dict[str, Any]]:
    est = int(task.get("estimate_days", 1))
    if est <= sprint_capacity:
        return [task]

    parts = []
    remaining = est
    part_num = 1
    base_id = task["id"]
    base_title = task["title"]

    prev_part_title = None
    while remaining > 0:
        chunk = min(remaining, sprint_capacity)
        new_title = f"{base_title} (Part {part_num})"
        new_id = f"{base_id}P{part_num}"

        if part_num == 1:
            new_deps = task.get("depends_on", []).copy()
        else:
            new_deps = [prev_part_title] if prev_part_title else []

        parts.append({
            "id": new_id,
            "title": new_title,
            "estimate_days": chunk,
            "depends_on": new_deps,
            "type": task.get("type", "task"),
            "split_from": base_id,
        })

        prev_part_title = new_title
        remaining -= chunk
        part_num += 1

    return parts



# LLM enrichment (per sprint)

def llm_enrich_sprint(goal: str, sprint_id: str, tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
    task_titles = [t["title"] for t in tasks]

    prompt = f"""
Write sprint notes for a software sprint.

Return ONLY 4 lines exactly:
SPRINT_GOAL: <short sentence>
RATIONALE: <short sentence>
RISKS: <risk1; risk2; risk3>
MITIGATIONS: <mitigation1; mitigation2; mitigation3>

Sprint: {sprint_id}
Goal: {goal}
Tasks: {", ".join(task_titles)}
""".strip()

    inputs = tokenizer(prompt, return_tensors="pt", truncation=True).to(device)
    with torch.no_grad():
        out = model.generate(**inputs, max_new_tokens=140, do_sample=False)

    text = tokenizer.decode(out[0], skip_special_tokens=True).strip()

    kv = {}
    for ln in text.splitlines():
        if ":" in ln:
            k, v = ln.split(":", 1)
            kv[k.strip().upper()] = v.strip()

    def split_list(v: str) -> List[str]:
        return [x.strip() for x in v.split(";") if x.strip()]

    # Fallback if model fails formatting
    if "SPRINT_GOAL" not in kv or "RATIONALE" not in kv:
        base_goal = f"Complete {len(task_titles)} planned tasks for {sprint_id}."
        rationale = "Tasks are grouped by dependency order and sprint capacity."
        risks = ["Underestimation of effort", "Dependency delays", "Scope changes"]
        mitigations = ["Time buffer in sprint", "Daily check on blockers", "Freeze scope for sprint"]

        return {
            "sprint_goal": base_goal,
            "sprint_rationale": rationale,
            "risks": risks,
            "mitigations": mitigations,
            "notes_source": "fallback_notes"
        }

    return {
        "sprint_goal": kv.get("SPRINT_GOAL", ""),
        "sprint_rationale": kv.get("RATIONALE", ""),
        "risks": split_list(kv.get("RISKS", "")),
        "mitigations": split_list(kv.get("MITIGATIONS", "")),
        "notes_source": "llm_local_flan_t5"
    }



# Sprint planning (topological + greedy packing)

def fr03_generate_sprint_plan(
    fr02: Dict[str, Any],
    sprint_length_days: int = 14,
    velocity_days_per_sprint: int = 14,
    enrich_with_llm: bool = True
) -> Dict[str, Any]:
    now_utc = datetime.now(timezone.utc)

    original_tasks = fr02.get("tasks", [])
    if not original_tasks:
        raise ValueError("No tasks found in FR02 JSON.")

    # Split oversized tasks (if any)
    expanded_tasks: List[Dict[str, Any]] = []
    for t in original_tasks:
        expanded_tasks.extend(split_task_if_needed(t, sprint_capacity=velocity_days_per_sprint))

    # Re-toposort after splitting
    order_ids = toposort_task_ids(expanded_tasks)
    task_by_id, _ = build_maps(expanded_tasks)

    # Create sprints greedily
    sprints: List[Dict[str, Any]] = []
    done_titles: Set[str] = set()

    remaining_ids = order_ids.copy()
    sprint_index = 1

    safety = 0
    while remaining_ids:
        safety += 1
        if safety > 1000:
            raise RuntimeError("Scheduling loop exceeded safety limit. Check dependencies.")

        capacity_left = velocity_days_per_sprint
        sprint_tasks: List[Dict[str, Any]] = []
        scheduled_this_sprint = set()

        progress = True
        while progress:
            progress = False
            for tid in list(remaining_ids):
                task = task_by_id[tid]
                title = task["title"]
                est = int(task.get("estimate_days", 1))

                if title in done_titles or title in scheduled_this_sprint:
                    continue
                if not deps_satisfied(task, done_titles.union(scheduled_this_sprint)):
                    continue

                if est <= capacity_left:
                    sprint_tasks.append(task)
                    capacity_left -= est
                    scheduled_this_sprint.add(title)
                    remaining_ids.remove(tid)
                    progress = True

        # If nothing fit, force schedule one task
        if not sprint_tasks and remaining_ids:
            tid = remaining_ids.pop(0)
            task = task_by_id[tid]
            sprint_tasks.append(task)
            scheduled_this_sprint.add(task["title"])
            capacity_left = max(0, capacity_left - int(task.get("estimate_days", 1)))

        for t in sprint_tasks:
            done_titles.add(t["title"])

        sprint_obj = {
            "sprint_id": f"SPRINT-{sprint_index:02d}",
            "sprint_length_days": sprint_length_days,
            "capacity_days": velocity_days_per_sprint,
            "used_days": sum(int(t.get("estimate_days", 1)) for t in sprint_tasks),
            "remaining_capacity_days": capacity_left,
            "tasks": [
                {
                    "id": t["id"],
                    "title": t["title"],
                    "estimate_days": int(t.get("estimate_days", 1)),
                    "depends_on": t.get("depends_on", []),
                    "type": t.get("type", "task"),
                    **({"split_from": t["split_from"]} if "split_from" in t else {})
                }
                for t in sprint_tasks
            ]
        }

        # LLM enrichment
        if enrich_with_llm:
            enrichment = llm_enrich_sprint(
                goal=fr02.get("original_goal", ""),
                sprint_id=sprint_obj["sprint_id"],
                tasks=sprint_obj["tasks"]
            )
            sprint_obj.update(enrichment)

        sprints.append(sprint_obj)
        sprint_index += 1

    total_days = sum(s["used_days"] for s in sprints)

    return {
        "sprint_plan_id": f"PLAN-{now_utc.strftime('%Y%m%d%H%M%S')}",
        "timestamp_utc": utc_now_iso(),
        "source_decomposition_id": fr02.get("decomposition_id"),
        "source_goal_id": fr02.get("source_goal_id"),
        "domain": fr02.get("domain"),
        "epic": fr02.get("epic"),
        "original_goal": fr02.get("original_goal"),
        "assumptions": {
            "sprint_length_days": sprint_length_days,
            "velocity_days_per_sprint": velocity_days_per_sprint,
            "planning_method": "topological + greedy packing",
            "notes": "Dependencies must be satisfied before a task can be scheduled. Oversized tasks are split."
        },
        "summary": {
            "num_sprints": len(sprints),
            "total_estimated_days": total_days,
            "avg_days_per_sprint": round(total_days / max(1, len(sprints)), 2)
        },
        "sprints": sprints
    }

print(" Cell 1 ready. Run Cell 2 to load FR02 JSON and generate FR03 sprint plan.")


Loading weights:   0%|          | 0/558 [00:00<?, ?it/s]



FR03 Loaded: google/flan-t5-large | device: cpu
 Cell 1 ready. Run Cell 2 to load FR02 JSON and generate FR03 sprint plan.


In [4]:

# SprintPilotAI FR04 - CELL 1
# One-time setup + model loading + functions

import json
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple

import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM


# FREE local LLM (optional)

MODEL_NAME = "google/flan-t5-large"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
print(" FR04 Loaded:", MODEL_NAME, "| device:", device)

def llm_suggest_task_changes(
    goal: str,
    existing_task_titles: List[str],
    action: str,
    current_title: str = "",
    current_est: int = 0,
    current_deps: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    LLM gives suggestions ONLY (never auto-applies).
    Output format lines:
      TITLE: ...
      ESTIMATE_DAYS: ...
      DEPENDS_ON: dep1; dep2; none
      SPRINT_HINT: SPRINT-01 / SPRINT-02 / none
      NOTE: ...
    """
    current_deps = current_deps or []
    titles_short = ", ".join(existing_task_titles[:25])

    prompt = f"""
You are SprintPilotAI FR04 assistant.
You suggest edits to tasks in a sprint plan.

Return ONLY these lines:
TITLE: <suggested title or keep similar>
ESTIMATE_DAYS: <integer 1-30>
DEPENDS_ON: <dep1; dep2; or none>
SPRINT_HINT: <SPRINT-01/SPRINT-02/SPRINT-03 or none>
NOTE: <1 sentence>

Context:
Project goal: {goal}

Existing task titles (choose dependencies only from these titles):
{titles_short}

Action: {action}
Current title: {current_title}
Current estimate days: {current_est}
Current deps: {", ".join(current_deps) if current_deps else "none"}
""".strip()

    inputs = tokenizer(prompt, return_tensors="pt", truncation=True).to(device)
    with torch.no_grad():
        out = model.generate(**inputs, max_new_tokens=160, do_sample=False)

    text = tokenizer.decode(out[0], skip_special_tokens=True).strip()

    kv = {}
    for ln in text.splitlines():
        if ":" in ln:
            k, v = ln.split(":", 1)
            kv[k.strip().upper()] = v.strip()

    # Parse
    title = kv.get("TITLE", "").strip()
    est_raw = kv.get("ESTIMATE_DAYS", "").strip()
    deps_raw = kv.get("DEPENDS_ON", "none").strip().lower()
    sprint_hint = kv.get("SPRINT_HINT", "none").strip()
    note = kv.get("NOTE", "").strip()

    try:
        est = int(float(est_raw))
        if est < 1 or est > 30:
            est = None
    except Exception:
        est = None

    if deps_raw in {"none", "null", ""}:
        deps = []
    else:
        deps = [d.strip() for d in deps_raw.split(";") if d.strip()]

    # Keep only valid deps that exist
    valid_set = set(existing_task_titles)
    deps = [d for d in deps if d in valid_set]

    return {
        "title": title if title else None,
        "estimate_days": est,
        "depends_on": deps,
        "sprint_hint": sprint_hint if sprint_hint else "none",
        "note": note
    }


# IO helpers

def load_json(filepath: str) -> Dict[str, Any]:
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)

def save_json(data: Dict[str, Any], filepath: str) -> None:
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

def now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


# Plan utilities

def recompute_sprint_capacity(sprint: Dict[str, Any]) -> None:
    used = 0
    for t in sprint.get("tasks", []):
        if t.get("status") == "removed":
            continue
        used += int(t.get("estimate_days", 1))
    sprint["used_days"] = used
    sprint["remaining_capacity_days"] = max(0, int(sprint.get("capacity_days", 14)) - used)

def recompute_all(plan: Dict[str, Any]) -> None:
    for sp in plan.get("sprints", []):
        recompute_sprint_capacity(sp)

    total = 0
    for sp in plan.get("sprints", []):
        total += int(sp.get("used_days", 0))

    plan.setdefault("summary", {})
    plan["summary"]["num_sprints"] = len(plan.get("sprints", []))
    plan["summary"]["total_estimated_days"] = total
    plan["summary"]["avg_days_per_sprint"] = round(total / max(1, plan["summary"]["num_sprints"]), 2)

def find_task(plan: Dict[str, Any], task_id: str) -> Optional[Dict[str, Any]]:
    for sp in plan.get("sprints", []):
        for t in sp.get("tasks", []):
            if t.get("id") == task_id:
                return t
    return None

def find_task_location(plan: Dict[str, Any], task_id: str) -> Optional[int]:
    for i, sp in enumerate(plan.get("sprints", [])):
        for t in sp.get("tasks", []):
            if t.get("id") == task_id:
                return i
    return None

def all_tasks_flat(plan: Dict[str, Any]) -> List[Dict[str, Any]]:
    out = []
    for sp in plan.get("sprints", []):
        for t in sp.get("tasks", []):
            out.append(t)
    return out

def title_to_task(plan: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    d = {}
    for t in all_tasks_flat(plan):
        title = (t.get("title") or "").strip()
        if title:
            d[title] = t
    return d

def title_to_sprint_index(plan: Dict[str, Any]) -> Dict[str, int]:
    d = {}
    for i, sp in enumerate(plan.get("sprints", [])):
        for t in sp.get("tasks", []):
            title = (t.get("title") or "").strip()
            if title:
                d[title] = i
    return d


# Validation

def validate_plan(plan: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    errors: List[str] = []
    warnings: List[str] = []

    tmap = title_to_task(plan)
    sprint_idx = title_to_sprint_index(plan)

    # missing/removed deps
    for t in all_tasks_flat(plan):
        if t.get("status") == "removed":
            continue
        title = t.get("title", "")
        deps = t.get("depends_on", []) or []
        for dep_title in deps:
            dep_task = tmap.get(dep_title)
            if dep_task is None:
                errors.append(f"Task '{title}' depends on missing task title '{dep_title}'.")
                continue
            if dep_task.get("status") == "removed":
                errors.append(f"Task '{title}' depends on REMOVED task '{dep_title}'.")

    # sprint ordering for deps
    for t in all_tasks_flat(plan):
        if t.get("status") == "removed":
            continue
        title = t.get("title", "")
        my_idx = sprint_idx.get(title)
        if my_idx is None:
            continue
        for dep_title in (t.get("depends_on", []) or []):
            dep_idx = sprint_idx.get(dep_title)
            if dep_idx is None:
                continue
            if dep_idx > my_idx:
                errors.append(
                    f"Sprint ordering error: '{title}' is in Sprint {my_idx+1} but dependency '{dep_title}' is in Sprint {dep_idx+1}."
                )

    # capacity warnings
    for sp in plan.get("sprints", []):
        if sp.get("used_days", 0) > sp.get("capacity_days", 14):
            warnings.append(
                f"Capacity warning: {sp.get('sprint_id')} used_days={sp.get('used_days')} > capacity_days={sp.get('capacity_days')}."
            )

    return errors, warnings


# Safe typed inputs

def ask_int(prompt: str, default: int, min_v: int = None, max_v: int = None) -> int:
    while True:
        v = input(f"{prompt} [{default}]: ").strip()
        if not v:
            x = default
        else:
            try:
                x = int(v)
            except ValueError:
                print("   Enter an integer.")
                continue
        if min_v is not None and x < min_v:
            print(f"   Must be >= {min_v}")
            continue
        if max_v is not None and x > max_v:
            print(f"   Must be <= {max_v}")
            continue
        return x


# Audit log

def audit(audit_log: List[Dict[str, Any]], action: str, by: str, **kwargs) -> None:
    entry = {"timestamp_utc": now_utc_iso(), "action": action, "by": by}
    entry.update(kwargs)
    audit_log.append(entry)


# Actions

def approve_plan(plan: Dict[str, Any], audit_log: List[Dict[str, Any]], approver: str = "Project Manager") -> bool:
    errors, warnings = validate_plan(plan)
    if errors:
        print("\n Cannot approve plan due to ERRORS:")
        for e in errors:
            print(" -", e)
        return False

    if warnings:
        print("\n Warnings (approval allowed):")
        for w in warnings:
            print(" -", w)

    plan["status"] = "approved"
    plan["approved_by"] = approver
    plan["approved_at_utc"] = now_utc_iso()
    plan["last_modified_utc"] = now_utc_iso()
    audit(audit_log, "approve_plan", approver)
    return True

def modify_task(plan: Dict[str, Any], audit_log: List[Dict[str, Any]], task_id: str, editor: str = "Project Manager") -> None:
    t = find_task(plan, task_id)
    if t is None:
        raise ValueError(f"Task '{task_id}' not found.")

    before = {"title": t.get("title"), "estimate_days": t.get("estimate_days"), "depends_on": t.get("depends_on", [])}

    use_llm = input("Use AI suggestions for edits? (y/n) [n]: ").strip().lower() == "y"
    if use_llm:
        titles = [x.get("title","") for x in all_tasks_flat(plan) if x.get("title")]
        sug = llm_suggest_task_changes(
            goal=plan.get("original_goal",""),
            existing_task_titles=titles,
            action="modify_task",
            current_title=t.get("title",""),
            current_est=int(t.get("estimate_days", 1)),
            current_deps=t.get("depends_on", [])
        )
        print("\n🤖 AI Suggestions:")
        print(" - title:", sug["title"])
        print(" - estimate_days:", sug["estimate_days"])
        print(" - depends_on:", sug["depends_on"])
        print(" - sprint_hint:", sug["sprint_hint"])
        print(" - note:", sug["note"])
        print("")

    new_title = input("New title (Enter to skip): ").strip()
    new_est = input("New estimate days (Enter to skip): ").strip()

    if new_title:
        t["title"] = new_title.strip()

    if new_est:
        try:
            new_est_i = int(new_est)
            if new_est_i <= 0:
                raise ValueError
            t["estimate_days"] = new_est_i
        except ValueError:
            print(" Invalid estimate_days. Keeping old value.")

    dep_edit = input("Edit depends_on titles? (comma separated, Enter to skip): ").strip()
    if dep_edit:
        deps = [d.strip() for d in dep_edit.split(",") if d.strip()]
        t["depends_on"] = deps

    audit(audit_log, "modify_task", editor, task_id=task_id, before=before,
          after={"title": t.get("title"), "estimate_days": t.get("estimate_days"), "depends_on": t.get("depends_on", [])})

    recompute_all(plan)

def move_task(plan: Dict[str, Any], audit_log: List[Dict[str, Any]], task_id: str, target_sprint_id: str, editor: str = "Project Manager") -> None:
    old_idx = find_task_location(plan, task_id)
    if old_idx is None:
        raise ValueError(f"Task location not found for '{task_id}'.")

    old_sprint = plan["sprints"][old_idx]
    task_obj = None
    for i, tt in enumerate(old_sprint["tasks"]):
        if tt["id"] == task_id:
            task_obj = old_sprint["tasks"].pop(i)
            break
    if task_obj is None:
        raise ValueError("Task object not found during move.")

    target = None
    for sp in plan["sprints"]:
        if sp.get("sprint_id") == target_sprint_id:
            target = sp
            break
    if target is None:
        old_sprint["tasks"].append(task_obj)
        raise ValueError(f"Sprint '{target_sprint_id}' not found.")

    target["tasks"].append(task_obj)
    audit(audit_log, "move_task", editor, task_id=task_id, from_sprint=old_sprint.get("sprint_id"), to_sprint=target_sprint_id)
    recompute_all(plan)

def remove_task(plan: Dict[str, Any], audit_log: List[Dict[str, Any]], task_id: str, reason: str, remover: str = "Project Manager") -> None:
    t = find_task(plan, task_id)
    if t is None:
        raise ValueError(f"Task '{task_id}' not found.")
    t["status"] = "removed"
    t["removed_reason"] = reason.strip()
    audit(audit_log, "remove_task", remover, task_id=task_id, reason=reason.strip())
    recompute_all(plan)

def add_task(plan: Dict[str, Any], audit_log: List[Dict[str, Any]], sprint_id: str, added_by: str = "Project Manager") -> None:
    target = None
    for sp in plan.get("sprints", []):
        if sp.get("sprint_id") == sprint_id:
            target = sp
            break
    if target is None:
        raise ValueError(f"Sprint '{sprint_id}' not found.")

    title = input("Task title: ").strip()

    use_llm = input("Use AI suggestions for this new task? (y/n) [n]: ").strip().lower() == "y"
    if use_llm:
        titles = [x.get("title","") for x in all_tasks_flat(plan) if x.get("title")]
        sug = llm_suggest_task_changes(
            goal=plan.get("original_goal",""),
            existing_task_titles=titles,
            action="add_task",
            current_title=title,
            current_est=2,
            current_deps=[]
        )
        print("\n AI Suggestions:")
        print(" - title:", sug["title"])
        print(" - estimate_days:", sug["estimate_days"])
        print(" - depends_on:", sug["depends_on"])
        print(" - sprint_hint:", sug["sprint_hint"])
        print(" - note:", sug["note"])
        print("")

    est = ask_int("Estimate days", 2, 1, 200)
    deps_raw = input("Depends on (comma-separated task TITLES, Enter for none): ").strip()
    deps = [d.strip() for d in deps_raw.split(",") if d.strip()] if deps_raw else []

    existing_ids = [t.get("id", "") for t in all_tasks_flat(plan)]
    new_id = f"H-{len(existing_ids)+1:03d}"

    new_task = {
        "id": new_id,
        "title": title,
        "estimate_days": est,
        "depends_on": deps,
        "type": "human_override",
        "status": "active"
    }
    target["tasks"].append(new_task)
    audit(audit_log, "add_task", added_by, task_id=new_id, sprint_id=sprint_id,
          task={"title": title, "estimate_days": est, "depends_on": deps})
    recompute_all(plan)


# Display helpers

def print_plan_summary(plan: Dict[str, Any]) -> None:
    recompute_all(plan)
    print("\n=== CURRENT PLAN SUMMARY ===")
    print("Goal:", plan.get("original_goal"))
    print("Status:", plan.get("status", "draft"))
    print("Sprints:", plan["summary"]["num_sprints"])
    print("Total Estimated Days:", plan["summary"]["total_estimated_days"])
    for sp in plan.get("sprints", []):
        print(f"  {sp['sprint_id']}: used {sp['used_days']} / {sp['capacity_days']} (remaining {sp['remaining_capacity_days']})")

def print_sprint(plan: Dict[str, Any], sprint_id: str) -> None:
    for sp in plan.get("sprints", []):
        if sp.get("sprint_id") == sprint_id:
            print(f"\n--- {sprint_id} TASKS ---")
            for t in sp.get("tasks", []):
                status = t.get("status", "active")
                print(f"{t['id']} | {t['title']} | {t['estimate_days']}d | status={status} | deps={t.get('depends_on', [])}")
            return
    print("Sprint not found.")

def print_task_details(plan: Dict[str, Any], task_id: str) -> None:
    t = find_task(plan, task_id)
    if t is None:
        print("Task not found.")
        return
    print("\n--- TASK DETAILS ---")
    print("ID:", t.get("id"))
    print("Title:", t.get("title"))
    print("Estimate days:", t.get("estimate_days"))
    print("Depends on:", t.get("depends_on", []))
    print("Status:", t.get("status", "active"))
    if "removed_reason" in t:
        print("Removed reason:", t.get("removed_reason"))

print(" Cell 1 ready. Run Cell 2 to open FR03 plan and start FR04 review UI.")


Loading weights:   0%|          | 0/558 [00:00<?, ?it/s]



 FR04 Loaded: google/flan-t5-large | device: cpu
 Cell 1 ready. Run Cell 2 to open FR03 plan and start FR04 review UI.


In [5]:
# SprintPilotAI - FR05: Task Success Prediction (TRAIN + SAVE)
# Google Colab version
# Model: RandomForestClassifier (better accuracy than LogisticRegression)
# Dataset: /content/drive/MyDrive/Colab Notebooks/FYP/task_success.csv


from google.colab import drive
drive.mount('/content/drive')

import os
import json
import pandas as pd
import numpy as np
import joblib
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, ConfusionMatrixDisplay
)

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer

from sklearn.ensemble import RandomForestClassifier


# Load dataset

CSV_PATH = "/content/drive/MyDrive/Colab Notebooks/FYP/task_success.csv"

df = pd.read_csv(CSV_PATH)
print("Loaded:", df.shape)
print("Columns:", df.columns.tolist())


# Define label + safe feature columns

TARGET = "task_success"

DROP_COLS = [
    "task_id", "project_id", "sprint_id", "task_title",
    "status",
    "actual_hours",
    "success_probability"
]
DROP_COLS = [c for c in DROP_COLS if c in df.columns]

df = df.dropna(subset=[TARGET]).copy()

X = df.drop(columns=[TARGET] + DROP_COLS, errors="ignore")
y = df[TARGET].astype(int)

print("\nUsing features:", X.columns.tolist())
print("X shape:", X.shape, "y shape:", y.shape)
print("\nLabel distribution:\n", y.value_counts())


# Train/Val/Test split

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.30, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, random_state=42, stratify=y_temp
)

print("\nSplit sizes:")
print("Train:", X_train.shape, "Val:", X_val.shape, "Test:", X_test.shape)


# Preprocessing

cat_cols = X.select_dtypes(include=["object", "bool"]).columns.tolist()
num_cols = [c for c in X.columns if c not in cat_cols]

numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore"))
])

preprocess = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, num_cols),
        ("cat", categorical_transformer, cat_cols)
    ]
)


# Model: Random Forest

clf = RandomForestClassifier(
    n_estimators=600,
    max_depth=None,
    min_samples_split=10,
    min_samples_leaf=4,
    class_weight="balanced",
    random_state=42,
    n_jobs=-1
)

model = Pipeline(steps=[
    ("preprocess", preprocess),
    ("clf", clf)
])


# Train

model.fit(X_train, y_train)


# Evaluate helper

THRESHOLD = 0.5

def evaluate_split(name, X_split, y_split):
    proba = model.predict_proba(X_split)[:, 1]
    pred = (proba >= THRESHOLD).astype(int)

    acc = accuracy_score(y_split, pred)
    prec = precision_score(y_split, pred, zero_division=0)
    rec = recall_score(y_split, pred, zero_division=0)
    f1 = f1_score(y_split, pred, zero_division=0)

    auc = roc_auc_score(y_split, proba) if len(np.unique(y_split)) == 2 else float("nan")

    print(f"\n=== {name} Performance (Threshold={THRESHOLD}) ===")
    print("Accuracy :", acc)
    print("Precision:", prec)
    print("Recall   :", rec)
    print("F1-score :", f1)
    print("ROC-AUC  :", auc)
    return pred, proba

val_pred, val_proba = evaluate_split("Validation", X_val, y_val)
test_pred, test_proba = evaluate_split("Test", X_test, y_test)


# Save model + metadata to Drive

OUT_DIR = "/content/drive/MyDrive/Colab Notebooks/FYP/fr05_model_artifacts"
os.makedirs(OUT_DIR, exist_ok=True)

MODEL_PATH = f"{OUT_DIR}/fr05_task_success_model.joblib"
META_PATH  = f"{OUT_DIR}/fr05_feature_columns.json"

joblib.dump(model, MODEL_PATH)

json_meta = {
    "target": TARGET,
    "drop_cols": DROP_COLS,
    "feature_columns_used": X.columns.tolist(),
    "categorical_columns": cat_cols,
    "numeric_columns": num_cols,
    "threshold": THRESHOLD,
    "model": "RandomForestClassifier",
    "model_params": clf.get_params()
}

with open(META_PATH, "w", encoding="utf-8") as f:
    json.dump(json_meta, f, indent=2)

print("\n Saved:")
print("Model:", MODEL_PATH)
print("Meta :", META_PATH)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Loaded: (30000, 24)
Columns: ['task_id', 'project_id', 'sprint_id', 'task_title', 'task_type', 'priority_moscow', 'assignee_role', 'experience_years', 'team_size', 'sprint_length_days', 'story_points', 'estimated_hours', 'actual_hours', 'dependencies_count', 'blockers_count', 'requirement_changes', 'communication_volume', 'sentiment_score', 'ai_suggestion_used', 'ai_acceptance_rate', 'override_requested', 'status', 'task_success', 'success_probability']

Using features: ['task_type', 'priority_moscow', 'assignee_role', 'experience_years', 'team_size', 'sprint_length_days', 'story_points', 'estimated_hours', 'dependencies_count', 'blockers_count', 'requirement_changes', 'communication_volume', 'sentiment_score', 'ai_suggestion_used', 'ai_acceptance_rate', 'override_requested']
X shape: (30000, 16) y shape: (30000,)

Label distribution:
 task_success
1    20793

In [None]:

# SprintPilotAI FR01 - CELL 2
# Inference / interactive run

# Re-initialize the model and tokenizer for FR01, as they may have been overwritten by other FR cells.

import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

MODEL_NAME = "google/flan-t5-large"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
print(" FR01 Model re-initialized.")

print("=== SprintPilotAI FR01 (NLP + FREE local LLM) ===")
goal = input("Enter your high-level project goal: ").strip()

parsed = fr01_parse_goal(goal)

print("\n=== STRUCTURED OUTPUT (for FR02) ===")
print(json.dumps(parsed, indent=2, ensure_ascii=False))

save_goal_json(parsed, "fr01_goal_output.json")
print("\n Saved to: fr01_goal_output.json")

Loading weights:   0%|          | 0/558 [00:00<?, ?it/s]



 FR01 Model re-initialized.
=== SprintPilotAI FR01 (NLP + FREE local LLM) ===
Enter your high-level project goal: create website of shopping mall using react in 2 weeks

=== STRUCTURED OUTPUT (for FR02) ===
{
  "goal_id": "GOAL-20260204154006",
  "timestamp_utc": "2026-02-04T15:40:06.252406Z",
  "raw_goal": "create website of shopping mall using react in 2 weeks",
  "clean_goal": "create website of shopping mall using react in 2 weeks",
  "domain": "Web Application",
  "problem_summary": "Parsed project goal into structured fields for FR02.",
  "deliverables": [],
  "assumptions": [],
  "constraints": {
    "time_text": "2 weeks",
    "time_days_approx": 14,
    "budget_text": null,
    "budget_value_raw": null,
    "platform_hints": [
      "website"
    ],
    "non_functional": []
  },
  "stakeholders": [],
  "features_high_level": [],
  "keywords": [
    "create",
    "website",
    "shopping",
    "mall",
    "react",
    "week"
  ],
  "confidence": 0.75,
  "validation_error": null

In [None]:

# SprintPilotAI FR02 - CELL 2
# Inference / interactive run


print("=== SprintPilotAI FR02 (NLP + FREE local LLM): Task Decomposition ===")

# Load FR01 output file (must exist)
fr01_data = load_fr01_json("fr01_goal_output.json")

# Run FR02 decomposition
fr02_output = fr02_decompose(fr01_data)

print("\n=== DECOMPOSED TASKS OUTPUT (for FR03/FR04) ===")
print(json.dumps(fr02_output, indent=2, ensure_ascii=False))

# Save
save_fr02_json(fr02_output, "fr02_tasks_output.json")
print("\n Saved to: fr02_tasks_output.json")


=== SprintPilotAI FR02 (NLP + FREE local LLM): Task Decomposition ===

=== DECOMPOSED TASKS OUTPUT (for FR03/FR04) ===
{
  "decomposition_id": "DEC-20260204154049",
  "timestamp_utc": "2026-02-04T15:40:49.631249Z",
  "source_goal_id": "GOAL-20260204154006",
  "epic": "Web App Delivery",
  "domain": "Web Application",
  "original_goal": "create website of shopping mall using react in 2 weeks",
  "keywords_used": [
    "create",
    "website",
    "shopping",
    "mall",
    "react",
    "week"
  ],
  "constraints": {
    "time_text": "2 weeks",
    "time_days_approx": 14,
    "budget_text": null,
    "budget_value_raw": null,
    "platform_hints": [
      "website"
    ],
    "non_functional": []
  },
  "estimated_sprint_count_approx": 1,
  "tasks": [
    {
      "id": "T-001",
      "title": "Requirements & Scope",
      "estimate_days": 3,
      "depends_on": [],
      "type": "task",
      "source": "fallback_templates"
    },
    {
      "id": "T-002",
      "title": "Architecture &

In [None]:

# SprintPilotAI FR03 - CELL 2
# Inference / interactive run


print("=== SprintPilotAI FR03: Sprint Plan Generation (with FREE local LLM notes) ===")

# Load FR02 output file (must exist)
fr02_data = load_fr02_json("fr02_tasks_output.json")

# Tune these (optional)
SPRINT_LENGTH_DAYS = 14
VELOCITY_DAYS = 14

fr03_plan = fr03_generate_sprint_plan(
    fr02_data,
    sprint_length_days=SPRINT_LENGTH_DAYS,
    velocity_days_per_sprint=VELOCITY_DAYS,
    enrich_with_llm=True
)

print("\n=== SPRINT PLAN OUTPUT ===")
print(json.dumps(fr03_plan, indent=2, ensure_ascii=False))

save_fr03_json(fr03_plan, "fr03_sprint_plan_output.json")
print("\n Saved to: fr03_sprint_plan_output.json")


=== SprintPilotAI FR03: Sprint Plan Generation (with FREE local LLM notes) ===

=== SPRINT PLAN OUTPUT ===
{
  "sprint_plan_id": "PLAN-20260204154419",
  "timestamp_utc": "2026-02-04T15:45:45.912249Z",
  "source_decomposition_id": "DEC-20260204154049",
  "source_goal_id": "GOAL-20260204154006",
  "domain": "Web Application",
  "epic": "Web App Delivery",
  "original_goal": "create website of shopping mall using react in 2 weeks",
  "assumptions": {
    "sprint_length_days": 14,
    "velocity_days_per_sprint": 14,
    "planning_method": "topological + greedy packing",
    "notes": "Dependencies must be satisfied before a task can be scheduled. Oversized tasks are split."
  },
  "summary": {
    "num_sprints": 3,
    "total_estimated_days": 35,
    "avg_days_per_sprint": 11.67
  },
  "sprints": [
    {
      "sprint_id": "SPRINT-01",
      "sprint_length_days": 14,
      "capacity_days": 14,
      "used_days": 12,
      "remaining_capacity_days": 2,
      "tasks": [
        {
          "

In [None]:

# SprintPilotAI FR04 - CELL 2
# Interactive review / approve runner

print("=== SprintPilotAI FR04 (LLM-ASSISTED): Human Review / Approve ===")

IN_PATH = "fr03_sprint_plan_output.json"
OUT_PLAN = "fr04_final_plan_output.json"
OUT_AUDIT = "fr04_audit_log.json"

plan = load_json(IN_PATH)
audit_log: List[Dict[str, Any]] = []

plan["status"] = plan.get("status", "draft")
plan["last_modified_utc"] = now_utc_iso()
recompute_all(plan)

while True:
    print_plan_summary(plan)

    errors, warnings = validate_plan(plan)
    if errors:
        print("\n Validation errors:")
        for e in errors:
            print(" -", e)
    if warnings:
        print("\n Warnings:")
        for w in warnings:
            print(" -", w)

    print("\nChoose an action:")
    print("1) View sprint tasks")
    print("2) View single task details")
    print("3) Modify a task (title/estimate/deps) [AI optional]")
    print("4) Move a task to another sprint")
    print("5) Remove a task")
    print("6) Add a new task (human override) [AI optional]")
    print("7) Save draft & Exit")
    print("8) Approve plan & Save")
    print("0) Exit without saving")

    choice = input("Enter choice: ").strip()

    try:
        if choice == "1":
            sid = input("Enter sprint id (e.g., SPRINT-01): ").strip()
            print_sprint(plan, sid)

        elif choice == "2":
            tid = input("Enter task id (e.g., T-007): ").strip()
            print_task_details(plan, tid)

        elif choice == "3":
            tid = input("Enter task id to modify: ").strip()
            modify_task(plan, audit_log, tid)
            plan["last_modified_utc"] = now_utc_iso()

        elif choice == "4":
            tid = input("Enter task id to move: ").strip()
            sid = input("Move to sprint id (e.g., SPRINT-02): ").strip()
            move_task(plan, audit_log, tid, sid)
            plan["last_modified_utc"] = now_utc_iso()

        elif choice == "5":
            tid = input("Enter task id to remove: ").strip()
            reason = input("Reason for removal: ").strip()
            remove_task(plan, audit_log, tid, reason)
            plan["last_modified_utc"] = now_utc_iso()

        elif choice == "6":
            sid = input("Add to sprint id (e.g., SPRINT-01): ").strip()
            add_task(plan, audit_log, sid)
            plan["last_modified_utc"] = now_utc_iso()

        elif choice == "7":
            plan["status"] = "draft"
            plan["last_modified_utc"] = now_utc_iso()
            save_json(plan, OUT_PLAN)
            save_json({"audit_log": audit_log}, OUT_AUDIT)
            print("\n Draft saved:")
            print(" -", OUT_PLAN)
            print(" -", OUT_AUDIT)
            break

        elif choice == "8":
            ok = approve_plan(plan, audit_log)
            if ok:
                save_json(plan, OUT_PLAN)
                save_json({"audit_log": audit_log}, OUT_AUDIT)
                print("\n Approved and saved:")
                print(" -", OUT_PLAN)
                print(" -", OUT_AUDIT)
                break

        elif choice == "0":
            print("Exiting without saving.")
            break

        else:
            print("Invalid choice.")

    except Exception as e:
        print(" Error:", str(e))


=== SprintPilotAI FR04 (LLM-ASSISTED): Human Review / Approve ===

=== CURRENT PLAN SUMMARY ===
Goal: create website of shopping mall using react in 2 weeks
Status: draft
Sprints: 3
Total Estimated Days: 35
  SPRINT-01: used 12 / 14 (remaining 2)
  SPRINT-02: used 11 / 14 (remaining 3)
  SPRINT-03: used 12 / 14 (remaining 2)

Choose an action:
1) View sprint tasks
2) View single task details
3) Modify a task (title/estimate/deps) [AI optional]
4) Move a task to another sprint
5) Remove a task
6) Add a new task (human override) [AI optional]
7) Save draft & Exit
8) Approve plan & Save
0) Exit without saving
Enter choice: 8

 Approved and saved:
 - fr04_final_plan_output.json
 - fr04_audit_log.json


In [None]:

# SprintPilotAI - FR05: Inference (LOAD + INPUT + PREDICT)
# Uses saved:
#   fr05_task_success_model.joblib
#   fr05_feature_columns.json


from google.colab import drive
drive.mount('/content/drive')

import json, joblib
import numpy as np
import pandas as pd


# Paths

OUT_DIR = "/content/drive/MyDrive/Colab Notebooks/FYP/fr05_model_artifacts"
MODEL_PATH = f"{OUT_DIR}/fr05_task_success_model.joblib"
META_PATH  = f"{OUT_DIR}/fr05_feature_columns.json"


# Load model + meta

model = joblib.load(MODEL_PATH)
with open(META_PATH, "r", encoding="utf-8") as f:
    meta = json.load(f)

feature_cols = meta["feature_columns_used"]
threshold = float(meta.get("threshold", 0.5))

print("Loaded model:", meta.get("model"))
print("Threshold:", threshold)
print("Features needed:", len(feature_cols))


# Helper: prompt for inputs

def prompt_value(colname: str):
    raw = input(f"{colname}: ").strip()

    # blank => NaN (imputer will handle)
    if raw == "":
        return np.nan

    # try numeric
    try:
        if "." in raw:
            return float(raw)
        return int(raw)
    except:
        # otherwise keep string (categorical)
        return raw

def build_input_row():
    print("\n=== Enter values for each feature (blank = missing) ===")
    row = {}
    for c in feature_cols:
        row[c] = prompt_value(c)
    return pd.DataFrame([row], columns=feature_cols)


# Predict (interactive loop)

while True:
    cmd = input("\nType 'p' to predict, or 'q' to quit: ").strip().lower()
    if cmd == "q":
        break
    if cmd != "p":
        print("Type only 'p' or 'q'.")
        continue

    X_in = build_input_row()

    proba = float(model.predict_proba(X_in)[:, 1][0])
    pred = int(proba >= threshold)

    print("\n=== FR05 Prediction ===")
    print("Success Probability:", round(proba, 4))
    print("Predicted Label (1=Success, 0=Fail):", pred)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Loaded model: RandomForestClassifier
Threshold: 0.5
Features needed: 16

Type 'p' to predict, or 'q' to quit: p

=== Enter values for each feature (blank = missing) ===
task_type: Bug
priority_moscow: Could
assignee_role: Developer
experience_years: 3
team_size: 3
sprint_length_days: 12
story_points: 8
estimated_hours: 40
dependencies_count: 0
blockers_count: 0
requirement_changes: 12
communication_volume: 0
sentiment_score: 1
ai_suggestion_used: 1
ai_acceptance_rate: 0.5
override_requested: 0

=== FR05 Prediction ===
Success Probability: 0.6322
Predicted Label (1=Success, 0=Fail): 1

Type 'p' to predict, or 'q' to quit: q
