<a href="https://colab.research.google.com/github/Sounakray2003/Asmadiya-tech/blob/main/CodeValidator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================
#  FULL AUTO-REMEDIATION AUDITOR – ZERO CRITICAL
#  Removes dangerous code when needed
# ==============================

!pip install -q semgrep transformers torch accelerate bitsandbytes jinja2 > /dev/null 2>&1

import os, json, re, shutil, subprocess, html, ast
from pathlib import Path
from datetime import datetime
from jinja2 import Environment
from IPython.display import HTML, display
from google.colab import files
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig

# ------------------------------------------------------------------
# 1. Load LLM (4-bit)
# ------------------------------------------------------------------
# One model id shared by the tokenizer and the weights so the two cannot drift apart.
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"

print("Loading Mistral-7B-Instruct (4-bit)…")
# 4-bit quantised weights with float16 compute, passed to from_pretrained below.
quant_cfg = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype="float16")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    quantization_config=quant_cfg,
)
# return_full_text=False: otherwise "generated_text" starts with the prompt
# itself, and the later phases would try to compile the prompt together with
# the model's answer.  `temperature` is omitted because it has no effect when
# do_sample=False (greedy decoding).
llm = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=2000,
    do_sample=False,
    return_full_text=False,
)

# ------------------------------------------------------------------
# 2. Upload
# ------------------------------------------------------------------
print("\nUpload .py file:")
uploaded = files.upload()
extract_dir = "/content/uploaded_code"
# Recreate the directory so files left over from an earlier run are not picked up.
shutil.rmtree(extract_dir, ignore_errors=True)
os.makedirs(extract_dir)
for name, data in uploaded.items():
    dest = os.path.join(extract_dir, name)
    with open(dest, "wb") as f:
        f.write(data)

# Sorted so the audited file is deterministic when several files are uploaded.
py_files = sorted(Path(extract_dir).rglob("*.py"))
if not py_files:
    # Without this guard py_files[0] fails with an unexplained IndexError.
    raise FileNotFoundError("No .py file was uploaded – nothing to audit.")
if len(py_files) > 1:
    print(f"{len(py_files)} .py files uploaded – only the first one is audited.")
file_path = py_files[0]
# Path shown in the reports, relative to the parent of the upload directory.
rel_path = str(file_path.relative_to(Path(extract_dir).parent))
original_code = file_path.read_text(encoding="utf-8", errors="replace")
print(f"\nFile: {rel_path}")

# ------------------------------------------------------------------
# 3. FULL AUDIT FUNCTION
# ------------------------------------------------------------------
# CWE ids whose findings are always escalated to CRITICAL.
_CRITICAL_CWES = frozenset({"CWE-78", "CWE-89", "CWE-502", "CWE-94", "CWE-95"})
# Semgrep's ERROR/WARNING/INFO labels mapped onto the labels the reports count.
_SEMGREP_SEVERITY = {"ERROR": "HIGH", "WARNING": "MEDIUM", "INFO": "LOW"}


def _bound_names(tree):
    """Return every name bound anywhere in *tree* plus the builtins.

    Returns None when the module uses ``from x import *``, because the set of
    bound names cannot be known statically in that case.
    """
    import builtins

    names = set(dir(builtins))
    for node in ast.walk(tree):
        kind = type(node).__name__
        if isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Store):
                names.add(node.id)
        elif isinstance(node, ast.arg):
            names.add(node.arg)
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            names.add(node.name)
        elif isinstance(node, ast.alias):
            if node.name == "*":
                return None
            # "import a.b" binds "a"; "import a.b as c" binds "c".
            names.add((node.asname or node.name).split(".")[0])
        elif isinstance(node, ast.ExceptHandler):
            if node.name:
                names.add(node.name)
        elif isinstance(node, (ast.Global, ast.Nonlocal)):
            names.update(node.names)
        elif kind in ("MatchAs", "MatchStar"):
            if node.name:
                names.add(node.name)
        elif kind == "MatchMapping":
            if node.rest:
                names.add(node.rest)
    return names


def _undefined_returns(tree):
    """Return a LogicBug issue for each ``return <name>`` whose name is never bound."""
    bound = _bound_names(tree)
    if bound is None:
        return []
    found = [
        {"line": node.lineno, "type": "LogicBug", "severity": "MEDIUM",
         "message": f"Undefined return: {node.value.id}", "source": "AST"}
        for node in ast.walk(tree)
        if isinstance(node, ast.Return)
        and isinstance(node.value, ast.Name)
        and node.value.id not in bound
    ]
    return sorted(found, key=lambda issue: issue["line"])


def _semgrep_issues(path):
    """Run Semgrep's p/r2c-security-audit ruleset on *path* and return its findings.

    Any failure to run Semgrep or to read its output is printed and yields an
    empty list instead of aborting the audit.
    """
    cmd = ["semgrep", "scan", "--config=p/r2c-security-audit", "--json", "--quiet", path]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:  # e.g. the semgrep executable is not installed
        print(f"Semgrep could not be started: {exc}")
        return []
    if proc.returncode != 0:
        print(f"Semgrep exited with code {proc.returncode}; security scan skipped.")
        return []
    try:
        data = json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        print(f"Semgrep output was not valid JSON: {exc}")
        return []

    results = data.get("results", []) if isinstance(data, dict) else []
    found = []
    for r in results:
        extra = r.get("extra", {})
        # The "cwe" metadata is handled both as a list and as a single string;
        # indexing a string with [0] would yield only its first character.
        raw_cwe = extra.get("metadata", {}).get("cwe") or ["N/A"]
        if isinstance(raw_cwe, str):
            raw_cwe = [raw_cwe]
        cwe_ids = [str(c).split(":")[0].strip() for c in raw_cwe]
        critical = [c for c in cwe_ids if c in _CRITICAL_CWES]
        sev = str(extra.get("severity", "MEDIUM")).upper()
        sev = "CRITICAL" if critical else _SEMGREP_SEVERITY.get(sev, sev)
        found.append({
            "line": r.get("start", {}).get("line", 0),
            "cwe": (critical or cwe_ids)[0],
            "severity": sev,
            "message": extra.get("message", ""),
            "rule": r.get("check_id", "—"),
            "source": "Semgrep",
        })
    return found


def run_full_audit(code, temp_path="/tmp/audit.py"):
    """Audit *code* and return a list of issue dicts.

    Three checks run in order:
      1. ``compile()`` – a SyntaxError becomes the single CRITICAL issue and
         the remaining checks are skipped, since they need parseable source.
      2. An AST pass that reports ``return <name>`` where the name is never
         bound anywhere in the module (MEDIUM "LogicBug").
      3. Semgrep on a copy of the code written to *temp_path*.

    Args:
        code: Python source text to audit.
        temp_path: File the source is written to for Semgrep; also used as
            the file name passed to ``compile()``.

    Returns:
        A list of dicts. Every dict has "line", "severity", "message" and
        "source"; Python/AST issues also carry "type", Semgrep issues carry
        "cwe" and "rule".
    """
    try:
        compile(code, temp_path, "exec")
    except SyntaxError as e:
        return [{"line": e.lineno or 0, "type": "SyntaxError", "severity": "CRITICAL",
                 "message": f"SyntaxError: {e.msg}", "source": "Python"}]

    issues = []
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError, RecursionError) as exc:
        print(f"AST check skipped: {exc}")
    else:
        issues.extend(_undefined_returns(tree))

    with open(temp_path, "w", encoding="utf-8") as f:
        f.write(code)
    issues.extend(_semgrep_issues(temp_path))
    return issues

# ------------------------------------------------------------------
# 4. PHASE 1: ORIGINAL AUDIT + REPORT
# ------------------------------------------------------------------
from datetime import timedelta, timezone  # only needed for the report timestamp

print("\nPHASE 1: Running original audit…")
initial_issues = run_full_audit(original_code)
# Number of findings per severity label shown in the summary box.
counts = {s: sum(1 for i in initial_issues if i["severity"] == s) for s in ["CRITICAL","HIGH","MEDIUM","LOW"]}

# Autoescape stays off because this environment is reused for the final report,
# which receives code that is already html.escape()d.  Values that come from
# the scanner or from the uploaded file name are escaped in the template with
# the `e` filter instead.
env = Environment()
ORIGINAL_REPORT = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Original Audit</title>
<style>
  body{font-family:Arial;margin:2rem;background:#f9f9fb;}
  .summary{background:#f8d7da;padding:1rem;border-radius:8px;margin-bottom:2rem;}
  pre{background:#eee;padding:8px;border-radius:4px;overflow:auto;font-size:0.9em;}
  table{width:100%;border-collapse:collapse;margin-top:1rem;}
  th,td{border:1px solid #ddd;padding:8px;text-align:left;}
  .critical{background:#c0392b;color:white;}
</style></head><body>

<div class="summary">
  <h1>Original Audit Report</h1>
  <p><strong>File:</strong> {{ rel_path | e }}</p>
  <p><strong>Time:</strong> {{ timestamp }}</p>
  <p><strong>Severity:</strong>
     <span class="critical">Critical: {{ counts.CRITICAL }}</span> |
     High: {{ counts.HIGH }} | Medium: {{ counts.MEDIUM }} | Low: {{ counts.LOW }}
  </p>
</div>

<h2>Original Code</h2>
<pre>{{ escaped_original }}</pre>

{% if initial_issues %}
<h2>Findings</h2>
<table>
  <tr><th>Severity</th><th>Type</th><th>Source</th><th>Line</th><th>Message</th></tr>
{% for i in initial_issues %}
  <tr{% if i.severity == 'CRITICAL' %} class="critical"{% endif %}>
    <td>{{ i.severity | e }}</td>
    <td>{{ i.get('type', i.get('rule', '—')) | e }}</td>
    <td>{{ i.source | e }}</td>
    <td>{{ i.line }}</td>
    <td>{{ i.message | e }}</td>
  </tr>
{% endfor %}
</table>
{% else %}
<p><strong>No issues found.</strong></p>
{% endif %}
</body></html>
"""

tmpl = env.from_string(ORIGINAL_REPORT)
original_html = tmpl.render(
    rel_path=rel_path,
    # datetime.now() without a tz is the runtime's local time, so the "IST"
    # label was only correct on a machine set to IST; pin UTC+05:30 explicitly.
    timestamp=datetime.now(timezone(timedelta(hours=5, minutes=30))).strftime("%Y-%m-%d %H:%M:%S IST"),
    counts=counts,
    escaped_original=html.escape(original_code),
    initial_issues=initial_issues
)

print("\n" + "="*70)
print("PHASE 1 COMPLETE – ORIGINAL AUDIT")
print("="*70)
display(HTML(original_html))

# ------------------------------------------------------------------
# 5. PHASE 2: AUTO-FIX – REMOVE DANGEROUS CODE
# ------------------------------------------------------------------
# Only CRITICAL findings trigger the LLM rewrite; corrected_code stays equal to
# the original unless a rewrite is produced that at least compiles.
critical_issues = [i for i in initial_issues if i["severity"] == "CRITICAL"]
corrected_code = original_code
fixed = False

if critical_issues:
    print(f"\nPHASE 2: Fixing {len(critical_issues)} CRITICAL issue(s) – removing dangerous routes…")
    issue_list = "\n".join(
        f"Line {i.get('line','?')}: {i['message']} ({i.get('type','Semgrep')})"
        for i in critical_issues
    )

    FIX_PROMPT = f"""You are a senior security engineer.
Fix ALL critical issues in the Flask app below. **Remove any route that cannot be made safe**.

CRITICAL ISSUES TO FIX:
{issue_list}

ORIGINAL CODE:
{original_code}

INSTRUCTIONS:
- **REMOVE** `/calc` (eval) – cannot be safe
- **REMOVE** `/login` (open redirect) – cannot validate all URLs safely
- **REMOVE** `/load` if pickle used – replace with JSON-only
- Fix SQLi → use parameterized query
- Fix Cmd Inj → use subprocess.run([...])
- Fix SSRF → strict whitelist + http(s) only
- Remove debug mode
- Remove hardcoded secrets
- Remove app.run() in production
- Keep only safe routes

Return ONLY the fixed code. No markdown. No comments.
"""

    try:
        # return_full_text=False: otherwise the answer is prefixed with the
        # prompt and the text handed to compile() can never be valid Python.
        out = llm(FIX_PROMPT, return_full_text=False)[0]["generated_text"].strip()
        m = re.search(r"```(?:python)?\s*(.*?)\s*```", out, re.DOTALL)
        candidate = (m.group(1) if m else out).strip()
        if not candidate:
            # An empty string compiles fine and would be reported as a clean fix.
            raise ValueError("LLM returned no code")
        compile(candidate, "fixed.py", "exec")
    except Exception as e:
        # corrected_code is untouched, so Phase 3 re-audits the original code.
        print(f"Fix failed: {e}")
    else:
        corrected_code = candidate
        fixed = True
        # Compiling proves nothing about security; the re-audit decides that.
        print("LLM produced a compilable rewrite – Phase 3 will verify it.")
else:
    print("\nPHASE 2: No critical issues.")

# ------------------------------------------------------------------
# 6. PHASE 3: FINAL AUDIT
# ------------------------------------------------------------------
import difflib  # only needed to show what the auto-fix changed

print("\nPHASE 3: Re-running audit on fixed code…")
final_issues = run_full_audit(corrected_code, "/tmp/fixed.py")
final_counts = {s: sum(1 for i in final_issues if i["severity"] == s) for s in ["CRITICAL","HIGH","MEDIUM","LOW"]}

# Unified diff between the audited code and the rewrite, shown in the "diff"
# box (which previously just repeated the final code a second time).
diff_text = "\n".join(difflib.unified_diff(
    original_code.splitlines(),
    corrected_code.splitlines(),
    fromfile="original",
    tofile="fixed",
    lineterm="",
))

FINAL_REPORT = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Final Audit</title>
<style>
  body{font-family:Arial;margin:2rem;background:#d5f4e6;}
  .summary{background:#d4edda;padding:1rem;border-radius:8px;margin-bottom:2rem;}
  pre{background:#eee;padding:8px;border-radius:4px;overflow:auto;font-size:0.9em;}
  table{width:100%;border-collapse:collapse;margin-top:1rem;}
  th,td{border:1px solid #ddd;padding:8px;text-align:left;}
  .critical{background:#c0392b;color:white;}
  .diff{background:#fff3cd;padding:1rem;border-left:5px solid #f39c12;margin:1rem 0;}
</style></head><body>

<div class="summary">
  <h1>Final Audit Report (After Auto-Fix)</h1>
  <p><strong>File:</strong> {{ rel_path | e }}</p>
  <p><strong>Fix Applied:</strong> {{ 'YES' if fixed else 'NO' }}</p>
  <p><strong>Before:</strong> Critical: {{ counts.CRITICAL }} | <strong>After:</strong> Critical: {{ final_counts.CRITICAL }}</p>
</div>

{% if fixed %}
<div class="diff"><strong>Changes made by the auto-fix (unified diff):</strong><pre>{{ escaped_diff }}</pre></div>
{% endif %}

<h2>Final Code</h2>
<pre>{{ escaped_fixed }}</pre>

{% if final_issues %}
<h2>Remaining Issues</h2>
<table>
  <tr><th>Severity</th><th>Source</th><th>Line</th><th>Message</th></tr>
{% for i in final_issues %}
  <tr {% if i.severity == 'CRITICAL' %}class="critical"{% endif %}>
    <td>{{ i.severity | e }}</td>
    <td>{{ i.source | e }}</td>
    <td>{{ i.line }}</td>
    <td>{{ i.message | e }}</td>
  </tr>
{% endfor %}
</table>
{% else %}
<p><strong>No issues were reported by the final audit.</strong></p>
{% endif %}
</body></html>
"""

tmpl = env.from_string(FINAL_REPORT)
final_html = tmpl.render(
    rel_path=rel_path,
    fixed=fixed,
    counts=counts,
    final_counts=final_counts,
    escaped_fixed=html.escape(corrected_code),
    escaped_diff=html.escape(diff_text),
    final_issues=final_issues
)

print("\n" + "="*70)
print("PHASE 3 COMPLETE – FINAL AUDIT")
print("="*70)
display(HTML(final_html))

In [None]:
# ==============================
#  FULL AUTO-REMEDIATION AUDITOR – ZERO CRITICAL + SYNTAX FIX + LLM CODE EXTRACTION
#  Removes dangerous code when needed
# ==============================

!pip install -q semgrep transformers torch accelerate bitsandbytes jinja2 > /dev/null 2>&1

import os, json, re, shutil, subprocess, html, ast, textwrap
from pathlib import Path
from datetime import datetime
from jinja2 import Environment
from IPython.display import HTML, display
from google.colab import files
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig

# ------------------------------------------------------------------
# 1. Load LLM (4-bit)
# ------------------------------------------------------------------
# One model id shared by the tokenizer and the weights so the two cannot drift apart.
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"

print("Loading Mistral-7B-Instruct (4-bit)…")
# 4-bit quantised weights with float16 compute, passed to from_pretrained below.
quant_cfg = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype="float16")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    quantization_config=quant_cfg,
)
# return_full_text=False: otherwise "generated_text" starts with the prompt
# itself, and the later phases would try to compile the prompt together with
# the model's answer.  `temperature` is omitted because it has no effect when
# do_sample=False (greedy decoding).
llm = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=3000,
    do_sample=False,
    return_full_text=False,
)

# ------------------------------------------------------------------
# 2. Upload
# ------------------------------------------------------------------
print("\nUpload .py file:")
uploaded = files.upload()
extract_dir = "/content/uploaded_code"
# Recreate the directory so files left over from an earlier run are not picked up.
shutil.rmtree(extract_dir, ignore_errors=True)
os.makedirs(extract_dir)
for name, data in uploaded.items():
    dest = os.path.join(extract_dir, name)
    with open(dest, "wb") as f:
        f.write(data)

# Sorted so the audited file is deterministic when several files are uploaded.
py_files = sorted(Path(extract_dir).rglob("*.py"))
if not py_files:
    # Without this guard py_files[0] fails with an unexplained IndexError.
    raise FileNotFoundError("No .py file was uploaded – nothing to audit.")
if len(py_files) > 1:
    print(f"{len(py_files)} .py files uploaded – only the first one is audited.")
file_path = py_files[0]
# Path shown in the reports, relative to the parent of the upload directory.
rel_path = str(file_path.relative_to(Path(extract_dir).parent))
original_code = file_path.read_text(encoding="utf-8", errors="replace")
print(f"\nFile: {rel_path}")

# ------------------------------------------------------------------
# 3. PHASE 0: SYNTAX AUTO-CORRECTION (unchanged)
# ------------------------------------------------------------------
def fix_syntax_with_llm(code):
    """Ask the LLM to repair syntax errors in *code*.

    Args:
        code: Python source text.

    Returns:
        ``(source, was_fixed)``. When *code* already compiles it is returned
        unchanged with ``False``. When the LLM's answer compiles, that answer
        is returned with ``True``. Otherwise the original *code* is returned
        with ``False``.
    """
    print("\nPHASE 0: Checking & fixing syntax errors with LLM…")
    try:
        compile(code, "<string>", "exec")
    except SyntaxError as e:
        syntax_error = e  # keep a reference; `e` is unbound after the except block
    else:
        print("No syntax errors detected.")
        return code, False

    print(f"SyntaxError found: {syntax_error}")

    prompt = f"""You are an expert Python developer. Fix ONLY the syntax errors in the following code.
Do NOT change any logic, imports, or functionality. Only correct indentation, missing colons, parentheses, quotes, commas, etc.

Error reported: {syntax_error}

Code:
{textwrap.dedent(code)}

Return ONLY the corrected Python code. No markdown, no explanations, no ```python blocks."""

    # return_full_text=False: otherwise the prompt is echoed in front of the
    # answer, and the "```python" inside the prompt would be taken by the
    # regex below as the opening of a code fence.
    result = llm(prompt, return_full_text=False)[0]["generated_text"].strip()

    # Use the contents of the first code fence if the model added one anyway.
    m = re.search(r"```(?:python)?\s*(.*?)\s*```", result, re.DOTALL)
    candidate = (m.group(1) if m else result).strip()

    if not candidate:
        # An empty string compiles, so it must be rejected explicitly.
        print("LLM failed to fully fix syntax: empty response")
        print("Falling back to original (with syntax error).")
        return code, False

    try:
        compile(candidate, "<fixed>", "exec")
    except (SyntaxError, ValueError) as e2:
        print(f"LLM failed to fully fix syntax: {e2}")
        print("Falling back to original (with syntax error).")
        return code, False

    print("Syntax successfully fixed by LLM!")
    return candidate, True

# Phase 0 returns (source, changed_flag); every later phase audits that source.
code_under_audit, syntax_was_fixed = fix_syntax_with_llm(original_code)
syntax_fixed_code = code_under_audit  # the same string under its second name

# ------------------------------------------------------------------
# 4. FULL AUDIT FUNCTION (unchanged)
# ------------------------------------------------------------------
# CWE ids whose findings are always escalated to CRITICAL.
_CRITICAL_CWES = frozenset({"CWE-78", "CWE-89", "CWE-502", "CWE-94", "CWE-95"})
# Semgrep's ERROR/WARNING/INFO labels mapped onto the labels the reports count.
_SEMGREP_SEVERITY = {"ERROR": "HIGH", "WARNING": "MEDIUM", "INFO": "LOW"}


def _bound_names(tree):
    """Return every name bound anywhere in *tree* plus the builtins.

    Returns None when the module uses ``from x import *``, because the set of
    bound names cannot be known statically in that case.
    """
    import builtins

    names = set(dir(builtins))
    for node in ast.walk(tree):
        kind = type(node).__name__
        if isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Store):
                names.add(node.id)
        elif isinstance(node, ast.arg):
            names.add(node.arg)
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            names.add(node.name)
        elif isinstance(node, ast.alias):
            if node.name == "*":
                return None
            # "import a.b" binds "a"; "import a.b as c" binds "c".
            names.add((node.asname or node.name).split(".")[0])
        elif isinstance(node, ast.ExceptHandler):
            if node.name:
                names.add(node.name)
        elif isinstance(node, (ast.Global, ast.Nonlocal)):
            names.update(node.names)
        elif kind in ("MatchAs", "MatchStar"):
            if node.name:
                names.add(node.name)
        elif kind == "MatchMapping":
            if node.rest:
                names.add(node.rest)
    return names


def _undefined_returns(tree):
    """Return a LogicBug issue for each ``return <name>`` whose name is never bound."""
    bound = _bound_names(tree)
    if bound is None:
        return []
    found = [
        {"line": node.lineno, "type": "LogicBug", "severity": "MEDIUM",
         "message": f"Undefined return: {node.value.id}", "source": "AST"}
        for node in ast.walk(tree)
        if isinstance(node, ast.Return)
        and isinstance(node.value, ast.Name)
        and node.value.id not in bound
    ]
    return sorted(found, key=lambda issue: issue["line"])


def _semgrep_issues(path):
    """Run Semgrep's p/r2c-security-audit ruleset on *path* and return its findings.

    Any failure to run Semgrep or to read its output is printed and yields an
    empty list instead of aborting the audit.
    """
    cmd = ["semgrep", "scan", "--config=p/r2c-security-audit", "--json", "--quiet", path]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:  # e.g. the semgrep executable is not installed
        print(f"Semgrep could not be started: {exc}")
        return []
    if proc.returncode != 0:
        print(f"Semgrep exited with code {proc.returncode}; security scan skipped.")
        return []
    try:
        data = json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        print(f"Semgrep output was not valid JSON: {exc}")
        return []

    results = data.get("results", []) if isinstance(data, dict) else []
    found = []
    for r in results:
        extra = r.get("extra", {})
        # The "cwe" metadata is handled both as a list and as a single string;
        # indexing a string with [0] would yield only its first character.
        raw_cwe = extra.get("metadata", {}).get("cwe") or ["N/A"]
        if isinstance(raw_cwe, str):
            raw_cwe = [raw_cwe]
        cwe_ids = [str(c).split(":")[0].strip() for c in raw_cwe]
        critical = [c for c in cwe_ids if c in _CRITICAL_CWES]
        sev = str(extra.get("severity", "MEDIUM")).upper()
        sev = "CRITICAL" if critical else _SEMGREP_SEVERITY.get(sev, sev)
        found.append({
            "line": r.get("start", {}).get("line", 0),
            "cwe": (critical or cwe_ids)[0],
            "severity": sev,
            "message": extra.get("message", ""),
            "rule": r.get("check_id", "—"),
            "source": "Semgrep",
        })
    return found


def run_full_audit(code, temp_path="/tmp/audit.py"):
    """Audit *code* and return a list of issue dicts.

    Three checks run in order:
      1. ``compile()`` – a SyntaxError becomes the single CRITICAL issue and
         the remaining checks are skipped, since they need parseable source.
      2. An AST pass that reports ``return <name>`` where the name is never
         bound anywhere in the module (MEDIUM "LogicBug").
      3. Semgrep on a copy of the code written to *temp_path*.

    Args:
        code: Python source text to audit.
        temp_path: File the source is written to for Semgrep; also used as
            the file name passed to ``compile()``.

    Returns:
        A list of dicts. Every dict has "line", "severity", "message" and
        "source"; Python/AST issues also carry "type", Semgrep issues carry
        "cwe" and "rule".
    """
    try:
        compile(code, temp_path, "exec")
    except SyntaxError as e:
        return [{"line": e.lineno or 0, "type": "SyntaxError", "severity": "CRITICAL",
                 "message": f"SyntaxError: {e.msg}", "source": "Python"}]

    issues = []
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError, RecursionError) as exc:
        print(f"AST check skipped: {exc}")
    else:
        issues.extend(_undefined_returns(tree))

    with open(temp_path, "w", encoding="utf-8") as f:
        f.write(code)
    issues.extend(_semgrep_issues(temp_path))
    return issues

# ------------------------------------------------------------------
# 5. PHASE 1: ORIGINAL AUDIT + REPORT (unchanged)
# ------------------------------------------------------------------
from datetime import timedelta, timezone  # only needed for the report timestamp

print("\nPHASE 1: Running original audit…")
initial_issues = run_full_audit(code_under_audit)
# Number of findings per severity label shown in the summary box.
counts = {s: sum(1 for i in initial_issues if i["severity"] == s) for s in ["CRITICAL","HIGH","MEDIUM","LOW"]}

# Autoescape stays off because this environment is reused for the final report,
# which receives code that is already html.escape()d.  Values that come from
# the scanner or from the uploaded file name are escaped in the template with
# the `e` filter instead.
env = Environment()
ORIGINAL_REPORT = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Original Audit</title>
<style>
  body{font-family:Arial;margin:2rem;background:#f9f9fb;}
  .summary{background:#f8d7da;padding:1rem;border-radius:8px;margin-bottom:2rem;}
  pre{background:#eee;padding:8px;border-radius:4px;overflow:auto;font-size:0.9em;}
  table{width:100%;border-collapse:collapse;margin-top:1rem;}
  th,td{border:1px solid #ddd;padding:8px;text-align:left;}
  .critical{background:#c0392b;color:white;}
  .note{background:#fff3cd;padding:0.8rem;border-left:4px solid #f39c12;margin:1rem 0;}
</style></head><body>

<div class="summary">
  <h1>Original Audit Report</h1>
  <p><strong>File:</strong> {{ rel_path | e }}</p>
  <p><strong>Time:</strong> {{ timestamp }}</p>
  {% if syntax_was_fixed %}<div class="note">⚠️ Syntax errors were automatically corrected before audit.</div>{% endif %}
  <p><strong>Severity:</strong>
     <span class="critical">Critical: {{ counts.CRITICAL }}</span> |
     High: {{ counts.HIGH }} | Medium: {{ counts.MEDIUM }} | Low: {{ counts.LOW }}
  </p>
</div>

<h2>Code Under Audit (Syntax-Fixed if Needed)</h2>
<pre>{{ escaped_original }}</pre>

{% if initial_issues %}
<h2>Findings</h2>
<table>
  <tr><th>Severity</th><th>Type</th><th>Source</th><th>Line</th><th>Message</th></tr>
{% for i in initial_issues %}
  <tr{% if i.severity == 'CRITICAL' %} class="critical"{% endif %}>
    <td>{{ i.severity | e }}</td>
    <td>{{ i.get('type', i.get('rule', '—')) | e }}</td>
    <td>{{ i.source | e }}</td>
    <td>{{ i.line }}</td>
    <td>{{ i.message | e }}</td>
  </tr>
{% endfor %}
</table>
{% else %}
<p><strong>No issues found.</strong></p>
{% endif %}
</body></html>
"""

tmpl = env.from_string(ORIGINAL_REPORT)
original_html = tmpl.render(
    rel_path=rel_path,
    # datetime.now() without a tz is the runtime's local time, so the "IST"
    # label was only correct on a machine set to IST; pin UTC+05:30 explicitly.
    timestamp=datetime.now(timezone(timedelta(hours=5, minutes=30))).strftime("%Y-%m-%d %H:%M:%S IST"),
    syntax_was_fixed=syntax_was_fixed,
    counts=counts,
    escaped_original=html.escape(code_under_audit),
    initial_issues=initial_issues
)

print("\n" + "="*70)
print("PHASE 1 COMPLETE – ORIGINAL AUDIT")
print("="*70)
display(HTML(original_html))

# ------------------------------------------------------------------
# 6. PHASE 2: AUTO-FIX – REMOVE DANGEROUS CODE + LLM-BASED EXTRACTION
# ------------------------------------------------------------------
def _extract_code(text):
    """Return the contents of the first ``` fence in *text*, else *text*; stripped."""
    m = re.search(r"```(?:python)?\s*(.*?)\s*```", text, re.DOTALL)
    return (m.group(1) if m else text).strip()


def _compiles(candidate):
    """Return True when *candidate* is non-empty and syntactically valid Python."""
    if not candidate:
        return False
    try:
        compile(candidate, "<fixed>", "exec")
    except (SyntaxError, ValueError):
        return False
    return True


# Only CRITICAL findings trigger the LLM rewrite; corrected_code stays equal to
# the audited code unless a rewrite is produced that at least compiles.
critical_issues = [i for i in initial_issues if i["severity"] == "CRITICAL"]
corrected_code = code_under_audit
fixed = False

if critical_issues:
    print(f"\nPHASE 2: Fixing {len(critical_issues)} CRITICAL issue(s)…")
    issue_list = "\n".join(
        f"Line {i.get('line','?')}: {i['message']} ({i.get('type','Semgrep')})"
        for i in critical_issues
    )

    # Detect if it's likely a web app (plain substring match on the source text)
    is_web_app = any(keyword in code_under_audit.lower() for keyword in ['flask', 'django', 'fastapi', '@app.route', 'def get', 'def post'])

    # The prompts no longer contain literal "<s>" / "</s>": the tokenizer is
    # expected to add the start token itself, and an end-of-sequence marker
    # right after [/INST] closes the turn before the model has answered.
    if is_web_app:
        fix_prompt = f"""[INST] You are a senior security engineer. Fix ALL critical issues in this Flask/Django/FastAPI app by removing or replacing dangerous code. Preserve as much safe functionality as possible. Do not remove safe routes like '/' or API endpoints without vulnerabilities.

CRITICAL ISSUES:
{issue_list}

ORIGINAL CODE:
{code_under_audit}

FIXES (apply surgically):
- Replace os.system(path) with subprocess.run(['tar', '-czf', 'backup.tar.gz', path], shell=False)
- Replace pickle.loads(data) with json.loads(data) and remove /load route if unsafe
- Remove /calc route entirely (eval cannot be safely fixed)
- For SQL: Use cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
- For redirects: from urllib.parse import urlparse; if urlparse(next_url).netloc == 'yourdomain.com': redirect(next_url) else: redirect(url_for('index'))
- Set app.debug = False
- Replace hardcoded secrets: API_KEY = os.getenv('API_KEY')
- For file read: filename = os.path.basename(request.args.get('file')); open(os.path.join('/app/data', filename))
- For SSRF: only fetch http(s) URLs whose host is in an explicit allow-list of external hosts; reject localhost and private addresses
- For hash: Use hashlib.sha256(pwd.encode()).hexdigest()
- Remove insecure chmod and random; use secrets.token_hex(16)
- Rename shadowing vars (sum -> total)
- app.run(host='127.0.0.1', port=5000)
- Remove PII exposure (comment out or use env vars)

Output ONLY the complete fixed Python code. Start with imports. No other text. [/INST]"""
    else:
        fix_prompt = f"""[INST] You are an expert Python developer. Fix ONLY the critical syntax/logic issues. Do not rewrite or add anything new.

CRITICAL ISSUES:
{issue_list}

ORIGINAL CODE:
{code_under_audit}

FIXES:
- Correct syntax (parentheses, colons, etc.)
- Remove eval/exec/pickle if present

Output ONLY the fixed Python code. No text before or after. [/INST]"""

    try:
        # return_full_text=False: otherwise the prompt (including the whole
        # original code) is echoed in front of the model's answer.
        fix_out = llm(fix_prompt, return_full_text=False)[0]["generated_text"].strip()
        print(f"Raw fix output preview: {fix_out[:200]}...")

        # Deterministic extraction first; the second LLM pass can alter or
        # truncate the code, so it is only a fallback.
        candidate = _extract_code(fix_out)
        if not _compiles(candidate):
            # LLM-BASED EXTRACTION: the response to clean up is placed inside
            # the instruction block so the model actually sees it as input.
            extract_prompt = f"""[INST] Extract ONLY the complete Python code from the following response. Ignore all instructions, explanations, or non-code text. Return just the code, starting from imports or first line, ending at the last line. No markdown, no extra text.

Response:
{fix_out} [/INST]"""
            extract_out = llm(extract_prompt, return_full_text=False)[0]["generated_text"]
            candidate = _extract_code(extract_out)

        if not candidate:
            # An empty string compiles fine and would be reported as a clean fix.
            raise ValueError("LLM returned no code")
        # Validate
        compile(candidate, "<fixed>", "exec")
    except Exception as e:
        print(f"Fix failed: {e}")
        corrected_code = code_under_audit
    else:
        corrected_code = candidate
        print(f"Fixed code extracted successfully (length: {len(corrected_code)} chars)")
        fixed = True
else:
    print("\nPHASE 2: No critical issues.")

# ------------------------------------------------------------------
# 7. PHASE 3: FINAL AUDIT (unchanged)
# ------------------------------------------------------------------
import difflib  # only needed to show what the auto-fix changed

print("\nPHASE 3: Re-running audit on fixed code…")
final_issues = run_full_audit(corrected_code, "/tmp/fixed.py")
final_counts = {s: sum(1 for i in final_issues if i["severity"] == s) for s in ["CRITICAL","HIGH","MEDIUM","LOW"]}

# Unified diff between the audited code and the rewrite, shown in the "diff"
# box (which previously just repeated the final code a second time).
diff_text = "\n".join(difflib.unified_diff(
    code_under_audit.splitlines(),
    corrected_code.splitlines(),
    fromfile="audited",
    tofile="fixed",
    lineterm="",
))

FINAL_REPORT = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Final Audit</title>
<style>
  body{font-family:Arial;margin:2rem;background:#d5f4e6;}
  .summary{background:#d4edda;padding:1rem;border-radius:8px;margin-bottom:2rem;}
  pre{background:#eee;padding:8px;border-radius:4px;overflow:auto;font-size:0.9em;}
  table{width:100%;border-collapse:collapse;margin-top:1rem;}
  th,td{border:1px solid #ddd;padding:8px;text-align:left;}
  .critical{background:#c0392b;color:white;}
  .diff{background:#fff3cd;padding:1rem;border-left:5px solid #f39c12;margin:1rem 0;}
  .note{background:#fff3cd;padding:0.8rem;border-left:4px solid #f39c12;margin:1rem 0;}
</style></head><body>

<div class="summary">
  <h1>Final Audit Report (After Auto-Fix)</h1>
  <p><strong>File:</strong> {{ rel_path | e }}</p>
  <p><strong>Fix Applied:</strong> {{ 'YES' if fixed else 'NO' }}</p>
  {% if syntax_was_fixed %}<div class="note">⚠️ Syntax errors were automatically corrected before audit.</div>{% endif %}
  <p><strong>Before:</strong> Critical: {{ counts.CRITICAL }} | <strong>After:</strong> Critical: {{ final_counts.CRITICAL }}</p>
</div>

{% if fixed %}
<div class="diff"><strong>Changes made by the auto-fix (unified diff):</strong><pre>{{ escaped_diff }}</pre></div>
{% endif %}

<h2>Final Code</h2>
<pre>{{ escaped_fixed }}</pre>

{% if final_issues %}
<h2>Remaining Issues</h2>
<table>
  <tr><th>Severity</th><th>Source</th><th>Line</th><th>Message</th></tr>
{% for i in final_issues %}
  <tr {% if i.severity == 'CRITICAL' %}class="critical"{% endif %}>
    <td>{{ i.severity | e }}</td>
    <td>{{ i.source | e }}</td>
    <td>{{ i.line }}</td>
    <td>{{ i.message | e }}</td>
  </tr>
{% endfor %}
</table>
{% else %}
<p><strong>No issues were reported by the final audit.</strong></p>
{% endif %}
</body></html>
"""

tmpl = env.from_string(FINAL_REPORT)
final_html = tmpl.render(
    rel_path=rel_path,
    syntax_was_fixed=syntax_was_fixed,
    fixed=fixed,
    counts=counts,
    final_counts=final_counts,
    escaped_fixed=html.escape(corrected_code),
    escaped_diff=html.escape(diff_text),
    final_issues=final_issues
)

print("\n" + "="*70)
print("PHASE 3 COMPLETE – FINAL AUDIT")
print("="*70)
display(HTML(final_html))

In [None]:
# ==============================
#  FULL AUTO-REMEDIATION AUDITOR – ZERO CRITICAL + SYNTAX FIX + ROBUST EXTRACTION
#  Removes dangerous code when needed
# ==============================

!pip install -q semgrep transformers torch accelerate bitsandbytes jinja2 > /dev/null 2>&1

import os, json, re, shutil, subprocess, html, ast, textwrap
from pathlib import Path
from datetime import datetime
from jinja2 import Environment
from IPython.display import HTML, display
from google.colab import files
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig

# ------------------------------------------------------------------
# 1. Load LLM (4-bit)
# ------------------------------------------------------------------
# One model id shared by the tokenizer and the weights so the two cannot drift apart.
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"

print("Loading Mistral-7B-Instruct (4-bit)…")
# 4-bit quantised weights with float16 compute, passed to from_pretrained below.
quant_cfg = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype="float16")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    quantization_config=quant_cfg,
)
# return_full_text=False: otherwise "generated_text" starts with the prompt
# itself, and the later phases would try to compile the prompt together with
# the model's answer.  `temperature` is omitted because it has no effect when
# do_sample=False (greedy decoding).
llm = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=3000,
    do_sample=False,
    return_full_text=False,
)

# ------------------------------------------------------------------
# 2. Upload
# ------------------------------------------------------------------
print("\nUpload .py file:")
uploaded = files.upload()
extract_dir = "/content/uploaded_code"
# Recreate the directory so files left over from an earlier run are not picked up.
shutil.rmtree(extract_dir, ignore_errors=True)
os.makedirs(extract_dir)
for name, data in uploaded.items():
    dest = os.path.join(extract_dir, name)
    with open(dest, "wb") as f:
        f.write(data)

# Sorted so the audited file is deterministic when several files are uploaded.
py_files = sorted(Path(extract_dir).rglob("*.py"))
if not py_files:
    # Without this guard py_files[0] fails with an unexplained IndexError.
    raise FileNotFoundError("No .py file was uploaded – nothing to audit.")
if len(py_files) > 1:
    print(f"{len(py_files)} .py files uploaded – only the first one is audited.")
file_path = py_files[0]
# Path shown in the reports, relative to the parent of the upload directory.
rel_path = str(file_path.relative_to(Path(extract_dir).parent))
original_code = file_path.read_text(encoding="utf-8", errors="replace")
print(f"\nFile: {rel_path}")

# ------------------------------------------------------------------
# 3. PHASE 0: SYNTAX AUTO-CORRECTION
# ------------------------------------------------------------------
def fix_syntax_with_llm(code):
    """Ask the LLM to repair syntax errors in *code*.

    Args:
        code: Python source text.

    Returns:
        ``(source, was_fixed)``. When *code* already compiles it is returned
        unchanged with ``False``. When the LLM's answer compiles, that answer
        is returned with ``True``. Otherwise the original *code* is returned
        with ``False``.
    """
    print("\nPHASE 0: Checking & fixing syntax errors with LLM…")
    try:
        compile(code, "<string>", "exec")
    except SyntaxError as e:
        syntax_error = e  # keep a reference; `e` is unbound after the except block
    else:
        print("No syntax errors detected.")
        return code, False

    print(f"SyntaxError found: {syntax_error}")

    # No literal "<s>" / "</s>" in the prompt: the tokenizer is expected to add
    # the start token itself, and an end-of-sequence marker right after
    # [/INST] closes the turn before the model has answered.
    prompt = f"""[INST] You are an expert Python developer. Fix ONLY the syntax errors in the following code.
Do NOT change any logic, imports, or functionality. Only correct indentation, missing colons, parentheses, quotes, commas, etc.

Error reported: {syntax_error}

Code:
{textwrap.dedent(code)}

Output ONLY the corrected Python code. Start directly with the code. No markdown, no explanations. [/INST]"""

    # return_full_text=False: otherwise the prompt (with the broken code) is
    # echoed in front of the answer and the result can never compile.
    result = llm(prompt, return_full_text=False)[0]["generated_text"].strip()

    # Use the contents of the first code fence if the model added one anyway.
    m = re.search(r"```(?:python)?\s*(.*?)\s*```", result, re.DOTALL)
    candidate = (m.group(1) if m else result).strip()

    if not candidate:
        # An empty string compiles, so it must be rejected explicitly.
        print("LLM failed to fully fix syntax: empty response")
        print("Falling back to original (with syntax error).")
        return code, False

    try:
        compile(candidate, "<fixed>", "exec")
    except (SyntaxError, ValueError) as e2:
        print(f"LLM failed to fully fix syntax: {e2}")
        print("Falling back to original (with syntax error).")
        return code, False

    print("Syntax successfully fixed by LLM!")
    return candidate, True

# Phase 0 returns (source, changed_flag); every later phase audits that source.
code_under_audit, syntax_was_fixed = fix_syntax_with_llm(original_code)
syntax_fixed_code = code_under_audit  # the same string under its second name

# ------------------------------------------------------------------
# 4. FULL AUDIT FUNCTION
# ------------------------------------------------------------------
# CWE classes that are always escalated to CRITICAL, whatever Semgrep reports.
_CRITICAL_CWES = frozenset({"CWE-78", "CWE-89", "CWE-502", "CWE-94", "CWE-95"})
# Semgrep's classic severity names mapped onto the CRITICAL/HIGH/MEDIUM/LOW
# scale that the report counters use; other values pass through unchanged.
_SEMGREP_SEVERITY = {"ERROR": "HIGH", "WARNING": "MEDIUM", "INFO": "LOW"}


def _bound_names(tree):
    """Collect every name the module binds anywhere, plus the builtins.

    Returns None when a ``from x import *`` makes the set unknowable.
    """
    import builtins

    names = set(dir(builtins))
    for node in ast.walk(tree):
        if isinstance(node, ast.Name):
            # Store context covers assignments, loops, with-as, unpacking, ...
            if isinstance(node.ctx, ast.Store):
                names.add(node.id)
        elif isinstance(node, ast.arg):
            names.add(node.arg)
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            names.add(node.name)
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                if alias.name == "*":
                    return None
                names.add(alias.asname or alias.name.split(".")[0])
        elif isinstance(node, ast.ExceptHandler):
            if node.name:
                names.add(node.name)
        elif type(node).__name__ in ("MatchAs", "MatchStar"):
            # Pattern captures; matched by class name so older Pythons
            # without these node types still work.
            if node.name:
                names.add(node.name)
        elif type(node).__name__ == "MatchMapping":
            if node.rest:
                names.add(node.rest)
    return names


def _semgrep_issues(temp_path):
    """Run Semgrep on ``temp_path`` and convert its findings to issue dicts."""
    try:
        proc = subprocess.run(
            ["semgrep", "scan", "--config=p/r2c-security-audit", "--json", "--quiet", temp_path],
            capture_output=True, text=True
        )
    except OSError as e:
        # Semgrep missing or not executable: keep the other checks' results.
        print(f"Semgrep could not be run: {e}")
        return []
    if proc.returncode != 0:
        print(f"Semgrep exited with status {proc.returncode}; skipping its findings.")
        return []
    try:
        data = json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        print(f"Semgrep output was not valid JSON: {e}")
        return []

    found = []
    for r in data.get("results", []):
        extra = r.get("extra", {})
        # Rule metadata stores "cwe" either as a list or as a single string.
        cwe_meta = extra.get("metadata", {}).get("cwe") or ["N/A"]
        if isinstance(cwe_meta, str):
            cwe_meta = [cwe_meta]
        cwe = str(cwe_meta[0]).split(":")[0]
        raw_sev = str(extra.get("severity", "INFO")).upper()
        sev = "CRITICAL" if cwe in _CRITICAL_CWES else _SEMGREP_SEVERITY.get(raw_sev, raw_sev)
        found.append({
            "line": r.get("start", {}).get("line", 0), "cwe": cwe, "severity": sev,
            "message": extra.get("message", ""), "rule": r.get("check_id", ""), "source": "Semgrep"
        })
    return found


def run_full_audit(code, temp_path="/tmp/audit.py"):
    """Audit Python source with a compile check, an AST check and Semgrep.

    Args:
        code: Python source text to audit.
        temp_path: File the source is written to so Semgrep can scan it.

    Returns:
        A list of issue dicts. Every issue has "line", "severity", "message"
        and "source"; Python/AST issues also carry "type", Semgrep issues
        carry "cwe" and "rule". When the code does not compile, the list
        holds only the SyntaxError issue and the other checks are skipped.
    """
    try:
        compile(code, temp_path, "exec")
    except SyntaxError as e:
        return [{"line": e.lineno or 0, "type": "SyntaxError", "severity": "CRITICAL",
                 "message": f"SyntaxError: {e.msg}", "source": "Python"}]

    issues = []

    # AST check: a bare name that is returned but never bound anywhere.
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError, RecursionError) as e:
        print(f"AST check skipped: {e}")
    else:
        bound = _bound_names(tree)
        if bound is not None:
            returns = [n for n in ast.walk(tree) if isinstance(n, ast.Return)]
            # ast.walk is breadth-first; report findings in source order.
            for n in sorted(returns, key=lambda r: r.lineno):
                if isinstance(n.value, ast.Name) and n.value.id not in bound:
                    issues.append({"line": n.lineno, "type": "LogicBug", "severity": "MEDIUM",
                                   "message": f"Undefined return: {n.value.id}", "source": "AST"})

    # Semgrep scans files, so the source has to be written out first.
    with open(temp_path, "w", encoding="utf-8") as f:
        f.write(code)
    issues.extend(_semgrep_issues(temp_path))
    return issues

# ------------------------------------------------------------------
# 5. PHASE 1: ORIGINAL AUDIT + REPORT
# ------------------------------------------------------------------
print("\nPHASE 1: Running original audit…")
initial_issues = run_full_audit(code_under_audit)

# Tally findings per severity level; levels without findings stay at 0 and
# severities outside these four are not counted.
counts = dict.fromkeys(["CRITICAL","HIGH","MEDIUM","LOW"], 0)
for finding in initial_issues:
    level = finding["severity"]
    if level in counts:
        counts[level] += 1

# Jinja2 environment used to render the HTML report templates.
env = Environment()
# Jinja2 template for the phase 1 report.
# - Only CRITICAL rows get the "critical" class, so other rows keep the
#   default dark-on-light colours instead of white text on a light background.
# - The file name and finding fields pass through the `e` filter so markup in
#   them cannot be injected into the page. `escaped_original` is already
#   HTML-escaped by the caller and is therefore inserted as-is.
ORIGINAL_REPORT = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Original Audit</title>
<style>
  body{font-family:Arial;margin:2rem;background:#f9f9fb;}
  .summary{background:#f8d7da;padding:1rem;border-radius:8px;margin-bottom:2rem;}
  pre{background:#eee;padding:8px;border-radius:4px;overflow:auto;font-size:0.9em;}
  table{width:100%;border-collapse:collapse;margin-top:1rem;}
  th,td{border:1px solid #ddd;padding:8px;text-align:left;}
  .critical{background:#c0392b;color:white;}
  .note{background:#fff3cd;padding:0.8rem;border-left:4px solid #f39c12;margin:1rem 0;}
</style></head><body>

<div class="summary">
  <h1>Original Audit Report</h1>
  <p><strong>File:</strong> {{ rel_path | e }}</p>
  <p><strong>Time:</strong> {{ timestamp }}</p>
  {% if syntax_was_fixed %}<div class="note">⚠️ Syntax errors were automatically corrected before audit.</div>{% endif %}
  <p><strong>Severity:</strong>
     <span class="critical">Critical: {{ counts.CRITICAL }}</span> |
     High: {{ counts.HIGH }} | Medium: {{ counts.MEDIUM }} | Low: {{ counts.LOW }}
  </p>
</div>

<h2>Code Under Audit (Syntax-Fixed if Needed)</h2>
<pre>{{ escaped_original }}</pre>

{% if initial_issues %}
<h2>Findings</h2>
<table>
  <tr><th>Severity</th><th>Type</th><th>Source</th><th>Line</th><th>Message</th></tr>
{% for i in initial_issues %}
  <tr {% if i.severity == 'CRITICAL' %}class="critical"{% endif %}>
    <td>{{ i.severity | e }}</td>
    <td>{{ i.get('type', i.get('rule', '—')) | e }}</td>
    <td>{{ i.source | e }}</td>
    <td>{{ i.line | e }}</td>
    <td>{{ i.message | e }}</td>
  </tr>
{% endfor %}
</table>
{% else %}
<p><strong>No issues found.</strong></p>
{% endif %}
</body></html>
"""

# The timestamp is labelled "IST", so it must be computed in IST rather than
# in the machine's local zone. IST is a fixed UTC+05:30 offset with no DST.
from datetime import timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))

# Render the phase 1 report; the audited code is HTML-escaped here because
# the template inserts it verbatim inside <pre>.
tmpl = env.from_string(ORIGINAL_REPORT)
original_html = tmpl.render(
    rel_path=rel_path,
    timestamp=datetime.now(IST).strftime("%Y-%m-%d %H:%M:%S IST"),
    syntax_was_fixed=syntax_was_fixed,
    counts=counts,
    escaped_original=html.escape(code_under_audit),
    initial_issues=initial_issues
)

print("\n" + "="*70)
print("PHASE 1 COMPLETE – ORIGINAL AUDIT")
print("="*70)
display(HTML(original_html))

# ------------------------------------------------------------------
# 6. PHASE 2: AUTO-FIX – REMOVE DANGEROUS CODE
# ------------------------------------------------------------------
# Phase 2 asks the LLM to rewrite the code so the CRITICAL findings go away.
# `corrected_code` is only replaced once a candidate has been validated, so a
# failed attempt leaves the audited code untouched for phase 3.
critical_issues = [i for i in initial_issues if i["severity"] == "CRITICAL"]
corrected_code = code_under_audit
fixed = False


def _completion_only(generated, prompt):
    """Return the model's completion with the echoed prompt removed.

    Text-generation pipelines return prompt + completion by default; keeping
    the prompt would feed the instructions back in as if they were code.
    """
    if generated.startswith(prompt):
        generated = generated[len(prompt):]
    return generated.strip()


if critical_issues:
    print(f"\nPHASE 2: Fixing {len(critical_issues)} CRITICAL issue(s) – removing dangerous routes…")
    issue_list = "\n".join([
        f"Line {i.get('line','?')}: {i['message']} ({i.get('type','Semgrep')})"
        for i in critical_issues
    ])

    # Detect if it's likely a web app (keyword heuristic on the source text)
    is_web_app = any(keyword in code_under_audit.lower() for keyword in ['flask', 'django', 'fastapi', '@app.route', 'def get', 'def post'])

    if is_web_app:
        FIX_PROMPT = f"""<s>[INST] You are a senior security engineer.
Fix ALL critical issues in the Flask app below. **Remove any route that cannot be made safe**.

CRITICAL ISSUES TO FIX:
{issue_list}

ORIGINAL CODE:
{code_under_audit}

INSTRUCTIONS:
- **REMOVE** `/calc` (eval) – cannot be safe
- **REMOVE** `/login` (open redirect) – cannot validate all URLs safely
- **REMOVE** `/load` if pickle used – replace with JSON-only
- Fix SQLi → use parameterized query
- Fix Cmd Inj → use subprocess.run([...])
- Fix SSRF → strict whitelist + http(s) only
- Remove debug mode
- Remove hardcoded secrets
- Remove app.run() in production
- Keep only safe routes

Output ONLY the fixed code. Start directly with imports. No markdown. No comments. [/INST]</s>"""
    else:
        FIX_PROMPT = f"""<s>[INST] You are an expert Python developer. Fix ONLY the critical issues below in this Python code.
Do NOT add new features, imports, or rewrite the entire script. Preserve original logic and structure.

CRITICAL ISSUES TO FIX:
{issue_list}

ORIGINAL CODE:
{code_under_audit}

INSTRUCTIONS:
- Fix syntax errors (e.g., missing parentheses, colons, indentation)
- Remove any use of eval(), exec(), pickle.loads() if present
- For non-web code, ensure safe practices only

Output ONLY the fixed code. No markdown. No explanations. [/INST]</s>"""

    try:
        out = _completion_only(llm(FIX_PROMPT)[0]["generated_text"], FIX_PROMPT)
        print(f"Raw LLM output preview: {out[:200]}...")  # Debug

        # LLM-BASED EXTRACTION: Use another LLM call to clean/extract the code reliably
        extract_prompt = f"""<s>[INST] Extract ONLY the complete Python code from the following response. Ignore all instructions, explanations, or non-code text. Return just the code, starting from imports or first line, ending at the last line. No markdown, no extra text. [/INST]</s>

Response:
{out}"""
        extract_out = _completion_only(llm(extract_prompt)[0]["generated_text"], extract_prompt)

        # Simple cleanup: prefer a fenced block if the model emitted one
        m = re.search(r"```(?:python)?\s*(.*?)\s*```", extract_out, re.DOTALL)
        candidate = m.group(1).strip() if m else extract_out

        # An empty string compiles, so reject it explicitly: otherwise "no
        # code" would be reported as a successful fix.
        if not candidate:
            raise ValueError("LLM returned no code")
        compile(candidate, "fixed.py", "exec")

        # Validation passed: only now replace the code audited in phase 3.
        corrected_code = candidate
        fixed = True
        print("LLM fixed all critical issues – dangerous routes removed!")
    except Exception as e:
        # Top-level boundary: report the failure and continue with the
        # unmodified code so the final audit still runs.
        print(f"Fix failed: {e}")
else:
    print("\nPHASE 2: No critical issues.")

# ------------------------------------------------------------------
# 7. PHASE 3: FINAL AUDIT
# ------------------------------------------------------------------
print("\nPHASE 3: Re-running audit on fixed code…")
final_issues = run_full_audit(corrected_code, "/tmp/fixed.py")

# Tally the remaining findings per severity level; levels without findings
# stay at 0 and severities outside these four are not counted.
final_counts = dict.fromkeys(["CRITICAL","HIGH","MEDIUM","LOW"], 0)
for remaining in final_issues:
    remaining_level = remaining["severity"]
    if remaining_level in final_counts:
        final_counts[remaining_level] += 1

# Jinja2 template for the phase 3 report.
# - The stylesheet defines `.critical`, which the findings table applies to
#   CRITICAL rows; without the rule those rows were not highlighted.
# - The file name and finding fields pass through the `e` filter so markup in
#   them cannot be injected into the page. `escaped_fixed` is already
#   HTML-escaped by the caller and is therefore inserted as-is.
FINAL_REPORT = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Final Audit</title>
<style>
  body{font-family:Arial;margin:2rem;background:#d5f4e6;}
  .summary{background:#d4edda;padding:1rem;border-radius:8px;margin-bottom:2rem;}
  pre{background:#eee;padding:8px;border-radius:4px;overflow:auto;font-size:0.9em;}
  table{width:100%;border-collapse:collapse;margin-top:1rem;}
  th,td{border:1px solid #ddd;padding:8px;text-align:left;}
  .critical{background:#c0392b;color:white;}
  .diff{background:#fff3cd;padding:1rem;border-left:5px solid #f39c12;margin:1rem 0;}
  .note{background:#fff3cd;padding:0.8rem;border-left:4px solid #f39c12;margin:1rem 0;}
</style></head><body>

<div class="summary">
  <h1>Final Audit Report (After Auto-Fix)</h1>
  <p><strong>File:</strong> {{ rel_path | e }}</p>
  <p><strong>Fix Applied:</strong> {{ 'YES' if fixed else 'NO' }}</p>
  {% if syntax_was_fixed %}<div class="note">⚠️ Syntax errors were automatically corrected before audit.</div>{% endif %}
  <p><strong>Before:</strong> Critical: {{ counts.CRITICAL }} | <strong>After:</strong> Critical: {{ final_counts.CRITICAL }}</p>
</div>

{% if fixed %}
<div class="diff"><strong>Fixed Code (Safe Version):</strong><pre>{{ escaped_fixed }}</pre></div>
{% endif %}

<h2>Final Code</h2>
<pre>{{ escaped_fixed }}</pre>

{% if final_issues %}
<h2>Remaining Issues</h2>
<table>
  <tr><th>Severity</th><th>Source</th><th>Line</th><th>Message</th></tr>
{% for i in final_issues %}
  <tr {% if i.severity == 'CRITICAL' %}class="critical"{% endif %}>
    <td>{{ i.severity | e }}</td>
    <td>{{ i.source | e }}</td>
    <td>{{ i.line | e }}</td>
    <td>{{ i.message | e }}</td>
  </tr>
{% endfor %}
</table>
{% else %}
<p><strong>All critical issues eliminated! Code is secure.</strong></p>
{% endif %}
</body></html>
"""

# Gather the template variables first, then render the phase 3 report in one
# call. The code is HTML-escaped here because the template inserts it as-is.
final_context = {
    "rel_path": rel_path,
    "syntax_was_fixed": syntax_was_fixed,
    "fixed": fixed,
    "counts": counts,
    "final_counts": final_counts,
    "escaped_fixed": html.escape(corrected_code),
    "final_issues": final_issues,
}
tmpl = env.from_string(FINAL_REPORT)
final_html = tmpl.render(**final_context)

banner = "="*70
print("\n" + banner)
print("PHASE 3 COMPLETE – FINAL AUDIT")
print(banner)
display(HTML(final_html))