In [None]:
!pip install scikit-learn xgboost pandas



In [None]:
!git clone https://github.com/abhinavuser/UAV-UAS /content/repo

Cloning into '/content/repo'...
remote: Enumerating objects: 183076, done.[K
remote: Counting objects: 100% (19/19), done.[K
remote: Compressing objects: 100% (18/18), done.[K
remote: Total 183076 (delta 1), reused 18 (delta 1), pack-reused 183057 (from 1)[K
Receiving objects: 100% (183076/183076), 103.31 MiB | 26.01 MiB/s, done.
Resolving deltas: 100% (145612/145612), done.


In [None]:
from textwrap import dedent
from pathlib import Path

base = Path("/content")

generate_dataset_py = dedent("""
#!/usr/bin/env python3
\"\"\"
generate_dataset.py
----------------------------------
Builds a *realistic* coverage dataset from a source repo using:
- Heuristics (always available)
- Optional open‑source scanners if installed: hadolint, trivy, osv-scanner, semgrep, radon

Each row in the CSV corresponds to an artifact (Dockerfile, dependency manifest, or code module).
It outputs the schema your ML expects:
[name, module, priority, risk_score, complexity_score, status, business_impact]

USAGE:
  python generate_dataset.py --repo /path/to/repo --out coverage_dataset.csv
  # Optional: --prefer-tools to use CLI scanners when available
  # Optional: --llm (experimental) to summarize risks if `ollama` is installed

Install suggestions (optional):
  hadolint, trivy, osv-scanner, semgrep, radon, gitleaks, trufflehog, ollama
\"\"\"

import os
import re
import json
import math
import argparse
import subprocess
from pathlib import Path
from collections import defaultdict

# ----------------------------
# Utilities
# ----------------------------

def which(cmd: str) -> bool:
    '''Return True when an executable named `cmd` can be located via shutil.which.'''
    import shutil
    located = shutil.which(cmd)
    return located is not None

def run(cmd):
    '''
    Execute `cmd` (an argument list) and capture its output as text.

    Returns a tuple (returncode, stdout, stderr) with both streams stripped.
    Never raises: any exception is reported as (-1, "", <exception text>).
    '''
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
        outcome = (proc.returncode, proc.stdout.strip(), proc.stderr.strip())
    except Exception as exc:
        outcome = (-1, "", str(exc))
    return outcome

def top_level_module(path: Path, repo: Path) -> str:
    '''
    Return the first path component of `path` relative to `repo`.

    If `path` cannot be made relative to `repo`, the first component of
    `path` itself is used. When there are no components at all (for example
    when `path` equals `repo`) the literal "root" is returned.
    '''
    try:
        components = path.relative_to(repo).parts
    except Exception:
        components = path.parts
    if not components:
        return "root"
    return components[0]

def load_text(path: Path) -> str:
    '''Read `path` as UTF-8 (undecodable bytes dropped); return an empty string on any error.'''
    try:
        content = path.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        content = ""
    return content

# ----------------------------
# Scoring helpers
# ----------------------------

# Keyword tables are scanned in insertion order; the first level with a match wins.
PRIORITY_KEYWORDS = {
    "high": ["auth", "payment", "payments", "security", "infra", "gateway", "checkout"],
    "medium": ["admin", "product", "order", "catalog", "search", "inventory"],
    "low": ["ui", "docs", "styles", "examples", "demo", "sample"]
}

IMPACT_KEYWORDS = {
    "Critical": ["payment", "payments", "auth", "security", "secrets", "keys", "pii"],
    "High": ["checkout", "orders", "infra", "gateway"],
    "Medium": ["product", "catalog", "search", "email"],
    "Low": ["docs", "ui", "theme"]
}

# Keywords shorter than this must equal a whole token; longer ones may also
# match as a token prefix (so "auth" still covers "authentication" and
# "order" still covers "orders").
_MIN_PREFIX_KEYWORD_LEN = 4

def _name_tokens(text: str) -> list:
    '''Split `text` into lowercase alphanumeric tokens, also breaking camelCase boundaries.'''
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", text)
    return [tok for tok in re.split(r"[^a-z0-9]+", spaced.lower()) if tok]

def _first_level_match(name: str, module: str, table: dict, default: str) -> str:
    '''Return the first level in `table` with a keyword matching a token of name/module.'''
    tokens = _name_tokens(f"{name} {module}")
    for level, kws in table.items():
        for kw in kws:
            allow_prefix = len(kw) >= _MIN_PREFIX_KEYWORD_LEN
            for tok in tokens:
                if tok == kw or (allow_prefix and tok.startswith(kw)):
                    return level
    return default

def infer_priority(name: str, module: str) -> str:
    '''
    Guess a priority ("high", "medium" or "low") from the artifact name and module.

    Matching is token based rather than a raw substring search: a plain
    substring test lets the keyword "ui" hit "requirements", "build" or
    "guide", which labelled every requirements.txt as low priority.
    Returns "medium" when no keyword matches.
    '''
    return _first_level_match(name, module, PRIORITY_KEYWORDS, "medium")

def infer_impact(name: str, module: str) -> str:
    '''
    Guess a business impact ("Critical", "High", "Medium" or "Low") from the
    artifact name and module, using the same token-based matching as
    infer_priority. Returns "Medium" (a slightly conservative default) when no
    keyword matches.
    '''
    return _first_level_match(name, module, IMPACT_KEYWORDS, "Medium")

def normalize_1_to_5(value, max_value, fallback=1.0):
    '''
    Scale `value` relative to `max_value` onto a 5-point range, clamped to [1.0, 5.0].

    Returns `fallback` when `max_value` is not positive or when any
    conversion/comparison raises.
    '''
    try:
        if max_value <= 0:
            return fallback
        scaled = float(value) * 5.0 / float(max_value)
        capped = min(5.0, scaled)
        return max(1.0, capped)
    except Exception:
        return fallback

# Risk = f(vulns, lint findings, secrets, outdated deps)
def combine_risk(vuln_score, lint_score, secret_score, misc_score) -> float:
    '''
    Fold the four raw signal scores into a single risk value capped at 5.0.

    The signals are weighted (secrets 1.5, vulnerabilities 1.2, lint 0.8,
    misc 0.7), summed, dampened with a base-2 logarithm and shifted by 1.
    '''
    weighted = vuln_score*1.2 + lint_score*0.8 + secret_score*1.5 + misc_score*0.7
    dampened = 1.0 + math.log(1.0 + weighted + 1e-6, 2)
    return dampened if dampened < 5.0 else 5.0

# Complexity = f(code size, cyclomatic complexity if available)
def combine_complexity(sloc, cc_avg) -> float:
    '''
    Derive a complexity value capped at 5.0 from code size and, when
    available, average cyclomatic complexity.

    With `cc_avg` set to None only the size term (1 + log base 5 of 1 + sloc)
    is used; otherwise size contributes 70% and the cyclomatic term 30%.
    '''
    size_term = min(5.0, 1.0 + math.log(1 + sloc, 5))
    if cc_avg is not None:
        cc_term = min(5.0, 1.0 + math.log(1 + cc_avg, 2))
        return min(5.0, 0.7*size_term + 0.3*cc_term)
    return size_term

def infer_status(tests_count: int, sloc: int) -> str:
    '''
    Classify coverage as "covered", "partial" or "uncovered".

    The number of matching test files is compared with a rough expectation
    of one test per 50 source lines (never fewer than 10): a ratio of at
    least 1.0 is "covered", at least 0.3 is "partial", anything else
    (including zero tests) is "uncovered".
    '''
    if tests_count == 0:
        return "uncovered"
    expected_tests = max(10, sloc/50)  # rough proxy
    coverage_ratio = tests_count / expected_tests
    for threshold, label in ((1.0, "covered"), (0.3, "partial")):
        if coverage_ratio >= threshold:
            return label
    return "uncovered"

# ----------------------------
# Heuristic analyzers (no external tools)
# ----------------------------

# (regex, weight, description). Every pattern is searched with re.MULTILINE,
# so ^ anchors to the start of a line.
DOCKER_ISSUE_PATTERNS = [
    (r"FROM .*:latest", 5, "Using latest tag"),
    (r"^USER root", 4, "Running as root"),
    # Flag apk add only on lines that do NOT pass --no-cache; the recommended
    # form must not add to the score.
    (r"^(?!.*--no-cache).*apk add", 1, "apk add without --no-cache"),
    # Accept -y directly after install as well as after the package list.
    (r"apt-get install (.* )?-y", 2, "apt-get install potentially caching"),
    (r"curl .*\\|\\s*sh", 5, "Curl pipe to shell"),
    # Anchor to the start of an instruction so ADD inside comments or
    # arguments is not counted.
    (r"^[ ]*ADD ", 2, "Prefer COPY over ADD"),
]

def analyze_dockerfile_heuristic(text: str):
    '''
    Score a Dockerfile using the built-in regex heuristics.

    Each pattern in DOCKER_ISSUE_PATTERNS that occurs at least once in `text`
    adds its weight exactly once. Returns the summed weight (0 when nothing
    matches).
    '''
    return sum(
        weight
        for pat, weight, _desc in DOCKER_ISSUE_PATTERNS
        if re.search(pat, text, flags=re.MULTILINE)
    )

def sloc_count(text: str) -> int:
    '''Count the lines of `text` that contain something other than whitespace.'''
    non_blank = [line for line in text.splitlines() if line.strip()]
    return len(non_blank)

def guess_tests_for_path(path: Path, repo: Path) -> int:
    '''
    Crude test-coverage heuristic for an artifact.

    Counts the files below <repo>/tests whose lower-cased text contains the
    lower-cased top-level module name of `path`. Returns 0 when the tests
    directory does not exist; unreadable files are skipped.
    '''
    tests_root = repo / "tests"
    if not tests_root.exists():
        return 0
    needle = top_level_module(path, repo).lower()
    hits = 0
    for dirpath, _dirnames, filenames in os.walk(tests_root):
        for filename in filenames:
            candidate = Path(dirpath) / filename
            try:
                haystack = candidate.read_text(encoding="utf-8", errors="ignore").lower()
            except Exception:
                continue
            if needle in haystack:
                hits += 1
    return hits

# ----------------------------
# Optional tool-based analyzers
# ----------------------------

def hadolint_findings(path: Path) -> int:
    '''
    Return a severity-weighted count of hadolint findings for one Dockerfile.

    Weights: error 4, warning 2, info 1, style 1 (unknown levels count 1).
    Returns 0 when hadolint is not installed, produced no output, or its
    output is not the expected JSON list.
    '''
    if not which("hadolint"):
        return 0
    # hadolint exits non-zero whenever it reports violations, so the exit code
    # cannot distinguish "findings" from "failure"; rely on parseable JSON.
    _code, out, _ = run(["hadolint", "-f", "json", str(path)])
    if not out:
        return 0
    try:
        findings = json.loads(out)
    except Exception:
        return 0
    if not isinstance(findings, list):
        return 0
    # weight by severity
    sev_w = {"error": 4, "warning": 2, "info": 1, "style": 1}
    score = 0
    for item in findings:
        if not isinstance(item, dict):
            continue
        level = str(item.get("level") or "info").lower()
        score += sev_w.get(level, 1)
    return score

def trivy_fs_findings(repo: Path) -> int:
    '''
    Return a severity-weighted count of vulnerabilities from `trivy fs` over `repo`.

    Weights: CRITICAL 8, HIGH 5, MEDIUM 3, LOW 1, UNKNOWN 1 (anything else 1).
    Returns 0 when trivy is missing, exits non-zero, prints nothing, or its
    JSON cannot be processed.
    '''
    if not which("trivy"):
        return 0
    code, out, _ = run(["trivy", "fs", "--quiet", "--format", "json", str(repo)])
    if code != 0 or not out:
        return 0
    weights = {"CRITICAL": 8, "HIGH": 5, "MEDIUM": 3, "LOW": 1, "UNKNOWN": 1}
    try:
        report = json.loads(out)
        total = 0
        for result in report.get("Results", []):
            # a missing or null "Vulnerabilities" entry means nothing to count
            for vuln in (result.get("Vulnerabilities") or []):
                total += weights.get(vuln.get("Severity", "LOW").upper(), 1)
        return total
    except Exception:
        return 0

def _osv_vuln_weight(vuln: dict, label_weights: dict) -> int:
    '''Weight one OSV vulnerability record: numeric CVSS first, then severity label, else 1.'''
    cvss_max = 0.0
    for entry in vuln.get("severity") or []:
        try:
            cvss_max = max(cvss_max, float(entry.get("score", 0)))
        except Exception:
            # the score is frequently a CVSS vector string rather than a number
            pass
    if cvss_max >= 9.0:
        return 8
    if cvss_max >= 7.0:
        return 5
    if cvss_max >= 4.0:
        return 3
    if cvss_max > 0.0:
        return 1
    # No numeric score: fall back to a textual label if the record carries one
    # (presumably GHSA-style database_specific.severity -- confirm against the
    # scanner version in use). A reported vulnerability of unknown severity
    # still counts 1, matching how trivy's UNKNOWN severity is weighted.
    details = vuln.get("database_specific") or {}
    label = str(details.get("severity") or "").upper()
    return label_weights.get(label, 1)

def osv_findings(repo: Path) -> int:
    '''
    Return a severity-weighted count of vulnerabilities reported by osv-scanner for `repo`.

    Returns 0 when osv-scanner is missing, fails, prints nothing, or its JSON
    cannot be processed.
    '''
    if not which("osv-scanner"):
        return 0
    code, out, _ = run(["osv-scanner", "--json", str(repo)])
    # osv-scanner exits with 1 when vulnerabilities were found, so that exit
    # code carries results instead of signalling a failure.
    if code not in (0, 1) or not out:
        return 0
    sev_w = {"CRITICAL": 8, "HIGH": 5, "MODERATE": 3, "MEDIUM": 3, "LOW": 1}
    try:
        obj = json.loads(out)
        score = 0
        for r in obj.get("results", []):
            for p in r.get("packages", []):
                for v in p.get("vulnerabilities", []):
                    score += _osv_vuln_weight(v, sev_w)
        return score
    except Exception:
        return 0

def semgrep_findings(repo: Path) -> int:
    '''
    Return a severity-weighted count of semgrep findings for `repo`.

    Weights: ERROR 4, WARNING 2, INFO 1 (anything else 1). Returns 0 when
    semgrep is missing, exits non-zero, prints nothing, or its JSON cannot be
    processed.
    '''
    if not which("semgrep"):
        return 0
    # Do not pass --error: it makes semgrep exit 1 whenever there are
    # findings, which the exit-code check below would read as a failure.
    # --severity is an exact filter, so ERROR has to be requested alongside
    # WARNING for the ERROR weight to ever apply.
    # NOTE(review): no --config is given; confirm the installed semgrep
    # version has a usable default rule set.
    code, out, _ = run([
        "semgrep", "--json", "--quiet",
        "--severity", "WARNING", "--severity", "ERROR",
        str(repo),
    ])
    if code != 0 or not out:
        return 0
    try:
        obj = json.loads(out)
        score = 0
        for r in obj.get("results", []):
            sev = r.get("extra", {}).get("severity","WARNING").upper()
            score += {"ERROR":4, "WARNING":2, "INFO":1}.get(sev, 1)
        return score
    except Exception:
        return 0

def radon_cc_avg(repo: Path, module_name: str) -> float | None:
    '''
    Return the mean cyclomatic complexity radon reports for <repo>/<module_name>.

    Returns None whenever no average is available: radon is not installed, the
    path does not exist, radon fails or prints nothing, its JSON cannot be
    parsed, or it contains no complexity entries. None (rather than 0) is the
    value combine_complexity treats as "no cyclomatic data".
    '''
    if not which("radon"):
        return None
    # radon cc -s -j <path>
    path = repo / module_name
    if not path.exists():
        return None
    code, out, _ = run(["radon", "cc", "-s", "-j", str(path)])
    if code != 0 or not out:
        return None
    try:
        obj = json.loads(out)
        totals = 0
        count = 0
        for entries in obj.values():
            for e in entries:
                try:
                    totals += int(e.get("complexity", 0))
                    count += 1
                except Exception:
                    # entries that are not block records are skipped
                    pass
        return (totals / count) if count else None
    except Exception:
        # Unparseable output means "unknown", not "zero complexity".
        return None

# ----------------------------
# (Optional) LLM summarizer
# ----------------------------
def ollama_summarize(text: str) -> str | None:
    '''
    Ask a local ollama model (llama3.1) for a short list of security risks in `text`.

    Only the first 6000 characters of `text` are sent. Returns the model
    output, or None when ollama is not installed or the command exits non-zero.
    '''
    if which("ollama"):
        prompt = (
            "You are a security auditor. Read the following Dockerfile or code snippet "
            "and list 3-5 concrete security risks or best-practice gaps in short bullet points:\\n\\n"
            + text[:6000]
        )
        code, out, _ = run(["ollama", "run", "llama3.1", prompt])
        if code == 0:
            return out
    return None

# ----------------------------
# Dataset assembly
# ----------------------------

def main():
    '''
    Command-line entry point: scan a repository and write the coverage dataset CSV.

    One row is emitted per Dockerfile, per dependency manifest and per
    non-hidden top-level directory, with the columns
    name, module, priority, risk_score, complexity_score, status, business_impact.

    Flags:
      --repo          path to the repository root (required)
      --out           output CSV path (default: coverage_dataset.csv)
      --prefer-tools  run the external scanners when they are installed
      --llm           also write an ollama risk summary for each Dockerfile
    '''
    ap = argparse.ArgumentParser()
    ap.add_argument("--repo", required=True, help="Path to repo root")
    ap.add_argument("--out", default="coverage_dataset.csv", help="Output CSV path")
    ap.add_argument("--prefer-tools", action="store_true", help="Use external scanners if present")
    ap.add_argument("--llm", action="store_true", help="Use ollama to summarize risks if installed")
    args = ap.parse_args()

    repo = Path(args.repo).resolve()
    if not repo.is_dir():
        # Fail with a usage error instead of a traceback from iterdir() below.
        ap.error(f"--repo must be an existing directory: {repo}")

    manifest_names = (
        "requirements.txt", "package.json", "yarn.lock", "pnpm-lock.yaml",
        "poetry.lock", "pyproject.toml", "go.mod", "pom.xml",
    )

    # Gather artifacts in a single, deterministically ordered walk.
    dockerfiles = []
    manifests = []
    for root, dirs, files in os.walk(repo):
        # Skip git metadata: ref files there can be named like artifacts
        # (e.g. a branch called "dockerfile-fix").
        dirs[:] = sorted(d for d in dirs if d != ".git")
        for f in sorted(files):
            lf = f.lower()
            if lf.startswith("dockerfile"):
                dockerfiles.append(Path(root) / f)
            if lf in manifest_names:
                manifests.append(Path(root) / f)

    # Repo-wide scanner baselines do not depend on the artifact, so run each
    # scanner once instead of once per Dockerfile / module.
    use_tools = args.prefer_tools
    trivy_score = trivy_fs_findings(repo) if use_tools else 0
    semgrep_score = semgrep_findings(repo) if use_tools else 0
    dep_score_repo = osv_findings(repo) if use_tools else 0

    rows = []

    # Analyze Dockerfiles
    for dpath in dockerfiles:
        rel = dpath.relative_to(repo)
        module = top_level_module(dpath, repo)
        text = load_text(dpath)
        name = f"Dockerfile:{rel}"

        lint_score = hadolint_findings(dpath) if use_tools else 0
        heur_score = analyze_dockerfile_heuristic(text)
        secret_score = 0  # placeholder; could add gitleaks/trufflehog

        risk = combine_risk(trivy_score, lint_score + heur_score, secret_score, 0)
        sloc = sloc_count(text)
        complexity = combine_complexity(sloc, None)  # no CC for Dockerfile

        # test heuristic
        tests = guess_tests_for_path(dpath, repo)
        status = infer_status(tests, sloc)

        priority = infer_priority(name, module)
        impact = infer_impact(name, module)

        rows.append([name, module, priority, round(risk,2), round(complexity,2), status, impact])

        if args.llm:
            summary = ollama_summarize(text)
            if summary:
                # Sidecar note named after the relative path, so Dockerfiles
                # with the same file name in different folders do not
                # overwrite each other's notes.
                note_stem = "_".join(rel.parts)
                note = repo / f".dataset_notes_{note_stem}.txt"
                try:
                    note.write_text(summary, encoding="utf-8")
                except Exception as exc:
                    # best effort: the dataset itself does not depend on the note
                    print(f"Warning: could not write {note}: {exc}")

    # Analyze manifests (dependencies)
    for mpath in manifests:
        module = top_level_module(mpath, repo)
        text = load_text(mpath)
        name = f"Deps:{mpath.relative_to(repo)}"

        # repo-wide OSV results; no lint, secret or misc signal for manifests
        risk = combine_risk(dep_score_repo, 0, 0, 0)
        sloc = sloc_count(text)
        complexity = combine_complexity(sloc, None)
        tests = guess_tests_for_path(mpath, repo)
        status = infer_status(tests, sloc)

        priority = infer_priority(name, module)
        impact = infer_impact(name, module)

        rows.append([name, module, priority, round(risk,2), round(complexity,2), status, impact])

    # Fallback: add one row per top-level module folder with heuristics.
    # Sorted so the row order is identical from run to run.
    top_levels = sorted(
        (p for p in repo.iterdir() if p.is_dir() and not p.name.startswith(".")),
        key=lambda p: p.name,
    )
    for mod in top_levels:
        total_sloc = sum(
            sloc_count(load_text(fp)) for fp in mod.rglob("*.*") if fp.is_file()
        )
        cc = radon_cc_avg(repo, mod.name) if use_tools else None
        tests = guess_tests_for_path(mod, repo)
        # risk from trivy/semgrep across repo as a baseline signal
        risk = combine_risk(trivy_score + semgrep_score, 0, 0, 0)
        complexity = combine_complexity(total_sloc, cc)

        name = f"Module:{mod.name}"
        module = mod.name
        priority = infer_priority(name, module)
        impact = infer_impact(name, module)
        status = infer_status(tests, total_sloc)

        rows.append([name, module, priority, round(risk,2), round(complexity,2), status, impact])

    # Write CSV
    import csv
    out_path = Path(args.out).resolve()
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["name","module","priority","risk_score","complexity_score","status","business_impact"])
        writer.writerows(rows)

    print(f"Wrote dataset with {len(rows)} rows to {out_path}")
    if not (which("hadolint") or which("trivy") or which("osv-scanner") or which("semgrep")):
        print("Tip: Install scanners (hadolint, trivy, osv-scanner, semgrep) for richer, more realistic risk signals.")
    if not rows:
        print("No artifacts found. Ensure the repo path is correct and contains code or Dockerfiles/manifests.")

if __name__ == "__main__":
    main()
""")

train_and_save_py = dedent("""
#!/usr/bin/env python3
\"\"\"
train_and_save.py
----------------------------------
Trains multiple models on the generated coverage dataset and saves artifacts:
- Supervised classifiers: DecisionTree, RandomForest, XGBoost
- Unsupervised clustering: KMeans
- Scaler + LabelEncoder

USAGE:
  python train_and_save.py --csv coverage_dataset.csv
\"\"\"

import argparse
import pickle
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.cluster import KMeans
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.metrics import classification_report, accuracy_score

def main():
    '''
    Command-line entry point: train the models on the coverage dataset and pickle them.

    Reads the CSV given by --csv, encodes priority and business impact as
    numbers, and predicts the `status` column from
    priority_num, risk_score, complexity_score and impact_num.

    The tree-based classifiers are fitted on the unscaled features; the
    StandardScaler is fitted on the training fold and only feeds KMeans.
    Artifacts are written to the current working directory.

    Raises ValueError when no train/test split containing every class in the
    training fold can be produced.
    '''
    ap = argparse.ArgumentParser()
    ap.add_argument("--csv", required=True, help="Path to coverage dataset CSV")
    args = ap.parse_args()

    df = pd.read_csv(args.csv)

    # Encode categories
    priority_map = {"high": 3, "medium": 2, "low": 1}
    impact_map = {"Critical": 5, "High": 4, "Medium": 3, "Low": 2}

    df["priority_num"] = df["priority"].map(priority_map).fillna(2)
    df["impact_num"] = df["business_impact"].map(impact_map).fillna(3)

    # Features: DO NOT include status-derived signal to avoid leakage
    X = df[["priority_num", "risk_score", "complexity_score", "impact_num"]].astype(float)
    y = df["status"].astype(str)

    # Label encode target
    le = LabelEncoder()
    y_enc = le.fit_transform(y)

    n_classes = len(le.classes_)
    all_labels = np.arange(n_classes)
    # For tiny datasets, set test_size to 0.5 for more balanced splits
    test_size = 0.2 if len(df) > 12 else 0.5

    # Try a fixed sequence of seeds so the split is reproducible run to run.
    split = None
    for seed in range(42, 52):
        try:
            candidate = train_test_split(
                X, y_enc, test_size=test_size, random_state=seed, stratify=y_enc
            )
        except ValueError:
            # Stratification is impossible (a class with a single row, or a
            # fold smaller than the number of classes): fall back to a plain
            # shuffle and let the check below validate the result.
            candidate = train_test_split(
                X, y_enc, test_size=test_size, random_state=seed
            )
        # Every class must be present in the training fold.
        if len(np.unique(candidate[2])) == n_classes:
            split = candidate
            break
    if split is None:
        raise ValueError("Could not create a split with all classes present in train. Please add more data.")
    X_train, X_test, y_train, y_test = split

    # Scale features
    scaler = StandardScaler()
    X_train_s = scaler.fit_transform(X_train)
    X_test_s = scaler.transform(X_test)

    # Models
    dtree = DecisionTreeClassifier(max_depth=5, random_state=42)
    dtree.fit(X_train, y_train)

    rf = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=42)
    rf.fit(X_train, y_train)

    # multi:softprob / mlogloss for >2 classes, otherwise binary:logistic / logloss
    xgb = None
    if n_classes > 1 and len(X_train) > 1:
        multiclass = n_classes > 2
        try:
            # use_label_encoder is no longer passed: current XGBoost ignores it
            # with a warning, and the labels are already encoded as 0..k-1.
            xgb = XGBClassifier(
                n_estimators=300, max_depth=6, learning_rate=0.1,
                subsample=0.8, colsample_bytree=0.8, random_state=42,
                eval_metric="mlogloss" if multiclass else "logloss",
                objective="multi:softprob" if multiclass else "binary:logistic",
            )
            xgb.fit(X_train, y_train)
        except Exception as e:
            print(f"XGBoost training failed: {e}")
            xgb = None
    else:
        print("Skipping XGBoost: not enough samples or only one class present in train set.")

    # Evaluate
    models = [("Decision Tree", dtree), ("Random Forest", rf)]
    if xgb is not None:
        models.append(("XGBoost", xgb))
    for name, model in models:
        y_pred = model.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        print(f"{name} Accuracy: {acc:.3f}")
        # Pass the full label set: the test fold may lack a class, and
        # target_names must line up with the labels being reported.
        print(classification_report(
            y_test, y_pred, labels=all_labels, target_names=le.classes_, zero_division=0
        ))
        if hasattr(model, "feature_importances_"):
            print(f"Feature importances for {name}:")
            for col, imp in zip(["priority","risk","complexity","impact"], model.feature_importances_):
                print(f"  {col}: {imp:.3f}")

    # Clustering on scaled features (never more clusters than training rows)
    kmeans = KMeans(n_clusters=min(n_classes, 3, len(X_train_s)), random_state=42, n_init=10)
    kmeans.fit(X_train_s)

    # Save artifacts
    artifacts = {
        "scaler.pkl": scaler,
        "label_encoder.pkl": le,
        "decision_tree.pkl": dtree,
        "random_forest.pkl": rf,
        "kmeans_model.pkl": kmeans,
    }
    if xgb is not None:
        artifacts["xgboost.pkl"] = xgb
    for filename, obj in artifacts.items():
        with open(filename, "wb") as f:
            pickle.dump(obj, f)

    print("\\nModels saved: scaler.pkl, label_encoder.pkl, decision_tree.pkl, random_forest.pkl, " + ("xgboost.pkl, " if xgb is not None else "") + "kmeans_model.pkl")

if __name__ == "__main__":
    main()
""")

# Each literal starts with a newline after the opening quotes; strip it so the
# shebang is the first line of the generated script.
for script_name, script_source in (
    ("generate_dataset.py", generate_dataset_py),
    ("train_and_save.py", train_and_save_py),
):
    (base / script_name).write_text(script_source.lstrip("\n"), encoding="utf-8")

print("Files created in /content/: generate_dataset.py, train_and_save.py")

Files created in /content/: generate_dataset.py, train_and_save.py


In [None]:
# Build the dataset from the cloned repo. --prefer-tools is not passed, so only
# the built-in heuristics run and no external scanner is invoked.
!python3 /content/generate_dataset.py --repo /content/repo --out /content/coverage_dataset.csv

Wrote dataset with 16 rows to /content/coverage_dataset.csv
Tip: Install scanners (hadolint, trivy, osv-scanner, semgrep) for richer, more realistic risk signals.


In [None]:
# Train on the generated CSV. The script opens its .pkl artifacts with relative
# paths, so they are written to the current working directory.
!python3 /content/train_and_save.py --csv /content/coverage_dataset.csv

Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Decision Tree Accuracy: 1.000
              precision    recall  f1-score   support

     covered       1.00      1.00      1.00         1
   uncovered       1.00      1.00      1.00         3

    accuracy                           1.00         4
   macro avg       1.00      1.00      1.00         4
weighted avg       1.00      1.00      1.00         4

Feature importances for Decision Tree:
  priority: 0.000
  risk: 0.000
  complexity: 1.000
  impact: 0.000
Random Forest Accuracy: 1.000
              precision    recall  f1-score   support

     covered       1.00      1.00      1.00         1
   uncovered       1.00      1.00      1.00         3

    accuracy                           1.00         4
   macro avg       1.00      1.00      1.00         4
weighted avg       1.00      1.00      1.00         4

Feature importances for Random Forest:
  priority: 0.232
  risk: 0.000
  complexity: