In [2]:
!pip install fastapi uvicorn faiss-cpu sentence-transformers openai --quiet
!nohup uvicorn threat_explainer_mvp:app --host 0.0.0.0 --port 8000 --reload &
!pip install uvicorn
!pip install fastapi
!pip install sentence-transformers
!pip install faiss-cpu
!pip install openai
!pip install gradio

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/95.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.2/95.2 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.5/62.5 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.3/31.3 MB[0m [31m61.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m72.0/72.0 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m73.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m58.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m 

In [3]:
# threat_explainer_mvp.py (Optimized)

import os
import json
import urllib.request
import faiss
import numpy as np
from typing import List
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI
from pydantic import BaseModel
import openai
from google.colab import userdata
from google.colab import drive

# Mount Google Drive
# DATA_PATH below lives under this mount point, so the mount has to happen first.
drive.mount('/content/drive')

# --- Configuration ---
# Location of the cached CVE list (JSON) on the mounted Drive.
DATA_PATH = "/content/drive/MyDrive/cves.json"
# Sentence-transformers model used to embed both CVE descriptions and queries.
MODEL_NAME = "all-MiniLM-L6-v2"
# Dimension handed to faiss.IndexFlatL2 in build_faiss_index.
# NOTE(review): presumably must equal MODEL_NAME's output size — re-check if the model changes.
EMBEDDING_DIM = 384
# NVD CVE API endpoint; the query string asks for 5 results per page.
NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0?resultsPerPage=5"
# NOTE(review): the Colab secret is named for OpenRouter, yet it is stored as the
# openai library's key and no API base URL is set in this block — confirm the
# endpoint this key is meant for.
openai.api_key = userdata.get('OPEN_ROUTER_API_KEY')

# Initialize model and API
# Module-level singletons: `model` is used by the embedding/search helpers below,
# `app` is the FastAPI application the /explain route is registered on.
model = SentenceTransformer(MODEL_NAME)
app = FastAPI()

# --- Pydantic Schemas ---
class CVEItem(BaseModel):
    """Schema for one CVE record.

    The field names match the keys of the dicts built by
    load_or_download_cve_data. The class is not referenced elsewhere in the
    visible part of this notebook.
    """
    # CVE identifier string, e.g. "CVE-2024-12345".
    cve_id: str
    # Free-text description of the vulnerability.
    description: str
    # Reference URLs associated with the CVE.
    references: List[str]

class Query(BaseModel):
    """Request body accepted by the POST /explain endpoint."""
    # The question; it is the text used for the semantic CVE search.
    question: str
    # Optional description of the caller's situation; it is prepended to the
    # question when the LLM input is assembled. Defaults to an empty string.
    user_context: str = ""

# --- Load or Download CVE Data ---
def load_or_download_cve_data(path=DATA_PATH):
    """Return the list of CVE records, using `path` as an on-disk JSON cache.

    If `path` exists its content is returned as-is. Otherwise the records are
    downloaded from NVD_API_URL; when the download or parsing fails, a single
    hard-coded fallback record is used instead. In both cases the result is
    written to `path` on a best-effort basis: a cache-write failure is
    reported but never discards data that was already obtained.

    Args:
        path: Location of the JSON cache file. A bare filename (no directory
            component) is supported.

    Returns:
        A list of dicts with the keys "cve_id", "description" and "references".
    """

    def _description_of(cve):
        # NVD description entries carry a "lang" tag; prefer the English one
        # rather than blindly taking whichever entry happens to be first.
        entries = cve.get("descriptions") or []
        for entry in entries:
            if entry.get("lang") == "en":
                return entry.get("value", "")
        return entries[0].get("value", "") if entries else ""

    def _write_cache(records):
        # Best effort: the caller still gets `records` if the disk is unavailable.
        try:
            parent = os.path.dirname(path)
            if parent:  # os.makedirs("") raises, so skip it for bare filenames
                os.makedirs(parent, exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(records, f, indent=2)
        except OSError as e:
            print(f"Could not cache CVE data at {path}: {e}")

    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    print("No local CVE file found. Downloading from NVD...")
    try:
        # A timeout keeps the notebook from hanging forever on a stalled connection.
        with urllib.request.urlopen(NVD_API_URL, timeout=30) as response:
            data = json.loads(response.read().decode("utf-8"))
        cves = [
            {
                "cve_id": item["cve"]["id"],
                "description": _description_of(item["cve"]),
                "references": [ref["url"] for ref in item["cve"].get("references", [])]
            }
            for item in data.get("vulnerabilities", [])
        ]
    except Exception as e:
        # Deliberately broad: any network or parsing problem degrades to the fallback.
        print(f"Download failed: {e}, using fallback.")
        cves = [{
            "cve_id": "CVE-2024-12345",
            "description": "A remote code execution vulnerability in OpenSSL...",
            "references": ["https://nvd.nist.gov/vuln/detail/CVE-2024-12345"]
        }]

    _write_cache(cves)
    return cves

# --- Build FAISS Index ---
def build_faiss_index(cve_data):
    """Embed every CVE description and return a FAISS L2 index over the vectors.

    The index dimension is taken from the embeddings themselves, so it cannot
    drift out of sync with the embedding model.

    Args:
        cve_data: Non-empty list of dicts, each with a 'description' string.

    Returns:
        A faiss.IndexFlatL2 holding one vector per entry, in `cve_data` order.

    Raises:
        ValueError: If `cve_data` is empty (there is nothing to index).
    """
    descriptions = [item['description'] for item in cve_data]
    if not descriptions:
        raise ValueError("Cannot build a FAISS index from an empty CVE list.")

    embeddings = model.encode(descriptions, convert_to_numpy=True)
    # FAISS expects a C-contiguous float32 matrix of shape (n, d).
    embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)

    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    return index

# Load data and build index
# Both statements run when this cell/module is executed. They define the
# module-level `cve_data` and `faiss_index` that search_similar_cves reads.
cve_data = load_or_download_cve_data()
faiss_index = build_faiss_index(cve_data)

# --- Semantic Search ---
def search_similar_cves(query: str, top_k=5):
    """Return up to `top_k` CVE records whose descriptions are closest to `query`.

    Args:
        query: Free-text question to embed and search with.
        top_k: Maximum number of records to return.

    Returns:
        A list of CVE dicts ordered from nearest to farthest. It is shorter
        than `top_k` when the index holds fewer vectors.
    """
    if top_k <= 0:
        return []
    query_embedding = model.encode([query], convert_to_numpy=True)
    _, neighbor_ids = faiss_index.search(query_embedding, top_k)
    # FAISS pads missing neighbours with -1; indexing with it would silently
    # return the *last* record (Python negative indexing), so drop those slots.
    return [
        cve_data[int(i)]
        for i in neighbor_ids[0]
        if 0 <= int(i) < len(cve_data)
    ]

# --- LLM Prompting ---
def explain_threat_with_llm(user_input: str, retrieved_cves: List[dict]) -> str:
    """Ask the LLM to explain a threat, given the user's input and related CVEs.

    The request is sent to OpenRouter's OpenAI-compatible API, because the key
    stored in `openai.api_key` is an OpenRouter secret. Both the openai>=1.0
    client interface and the legacy 0.x module-level interface are supported.

    Args:
        user_input: The user's context and question as one string.
        retrieved_cves: CVE dicts with at least 'cve_id' and 'description'.

    Returns:
        The model's answer with surrounding whitespace removed (an empty
        string if the model returned no content).
    """
    context_block = "\n\n".join([f"{c['cve_id']}: {c['description']}" for c in retrieved_cves])
    prompt = f"""
    You are a cybersecurity assistant.
    Analyze the following context and describe:
    1. The nature of the threat
    2. Affected systems or users
    3. Whether the user is likely at risk
    4. Suggested mitigation steps

    User Input:
    {user_input}

    Related CVEs:
    {context_block}

    Respond concisely and clearly for a non-technical user.
    """
    messages = [{"role": "user", "content": prompt}]
    api_base = "https://openrouter.ai/api/v1"
    llm_model = "openai/gpt-4"  # OpenRouter ids are namespaced by provider

    if hasattr(openai, "OpenAI"):
        # openai>=1.0: `openai.ChatCompletion` was removed; use a client object.
        client = openai.OpenAI(api_key=openai.api_key, base_url=api_base)
        response = client.chat.completions.create(
            model=llm_model,
            messages=messages,
            temperature=0.4,
            max_tokens=500
        )
    else:
        # Legacy openai 0.x interface.
        response = openai.ChatCompletion.create(
            model=llm_model,
            messages=messages,
            temperature=0.4,
            max_tokens=500,
            api_key=openai.api_key,
            api_base=api_base
        )
    content = response.choices[0].message.content
    return (content or "").strip()

# --- FastAPI Endpoint ---
@app.post("/explain")
def explain(query: Query):
    """Handle POST /explain: retrieve related CVEs and have the LLM explain them.

    Only the question is used for retrieval; the LLM receives the user context
    followed by the question. The response carries the generated answer and
    the ids of the CVEs that were supplied to the model.
    """
    matches = search_similar_cves(query.question)
    # Context first, then the question, separated by a single newline.
    combined_input = "{}\n{}".format(query.user_context, query.question)
    llm_answer = explain_threat_with_llm(combined_input, matches)

    related_ids = []
    for match in matches:
        related_ids.append(match['cve_id'])

    return {
        "answer": llm_answer,
        "related_cves": related_ids,
    }


Mounted at /content/drive


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [6]:
# gradio_ui.py (OpenRouter version with user concern/context support)
import gradio as gr
import json
import os
import urllib.request
import requests
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

# --- Config ---
# These constants re-declare the values from the earlier FastAPI cell so this
# cell carries its own configuration.
# Cached CVE list (JSON) on the mounted Google Drive.
DATA_PATH = "/content/drive/MyDrive/cves.json"
# NVD CVE API endpoint; the query string asks for 5 results per page.
NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0?resultsPerPage=5"
# Sentence-transformers model used to embed CVE descriptions and queries.
MODEL_NAME = "all-MiniLM-L6-v2"
# Dimension given to faiss.IndexFlatL2 below.
# NOTE(review): presumably must equal MODEL_NAME's output size — re-check if the model changes.
EMBEDDING_DIM = 384
# Chat-completions URL that query_threat_assistant posts to.
OPENROUTER_ENDPOINT = "https://openrouter.ai/api/v1/chat/completions"

# --- Load model ---
# Rebinds the module-level `model`, creating a new SentenceTransformer instance.
model = SentenceTransformer(MODEL_NAME)

# --- CVE Data Loader ---
def create_or_download_cves():
    """Fetch CVE records from NVD, save them to DATA_PATH, and return them.

    When the download or parsing fails, two hard-coded fallback records are
    used instead. Saving is best effort: a write failure is reported but the
    records are still returned.

    Returns:
        A list of dicts with the keys "cve_id", "description" and "references".
    """

    def _description_of(cve):
        # NVD description entries carry a "lang" tag; prefer the English one
        # rather than blindly taking whichever entry happens to be first.
        entries = cve.get("descriptions") or []
        for entry in entries:
            if entry.get("lang") == "en":
                return entry.get("value", "")
        return entries[0].get("value", "") if entries else ""

    try:
        # A timeout keeps the notebook from hanging forever on a stalled connection.
        with urllib.request.urlopen(NVD_API_URL, timeout=30) as response:
            data = json.loads(response.read().decode("utf-8"))
        cves = [
            {
                "cve_id": item["cve"]["id"],
                "description": _description_of(item["cve"]),
                "references": [ref["url"] for ref in item["cve"].get("references", [])],
            }
            for item in data.get("vulnerabilities", [])
        ]
        print("Downloaded CVE data from NVD.")
    except Exception as e:
        # Deliberately broad: any network or parsing problem degrades to the fallback.
        print(f"Failed to download CVEs from NVD. Using fallback. Reason: {e}")
        cves = [
            {
                "cve_id": "CVE-2024-5678",
                "description": "Zohocorp ManageEngine Applications Manager versions 170900 and below are vulnerable to the authenticated admin-only SQL Injection in the Create Monitor feature.",
                "references": ["https://nvd.nist.gov/vuln/detail/CVE-2024-5678"]
            },
            {
                # The id now agrees with this record's own reference URL.
                "cve_id": "CVE-2023-31999",
                "description": "All versions of @fastify/oauth2 used a statically generated state parameter at startup time and were used across all requests for all users. The purpose of the Oauth2 state parameter is to prevent Cross-Site-Request-Forgery attacks. ",
                "references": ["https://nvd.nist.gov/vuln/detail/CVE-2023-31999"]
            }
        ]

    try:
        parent = os.path.dirname(DATA_PATH)
        if parent:  # os.makedirs("") raises, so skip it for bare filenames
            os.makedirs(parent, exist_ok=True)
        with open(DATA_PATH, "w", encoding="utf-8") as f:
            json.dump(cves, f, indent=2)
    except OSError as e:
        print(f"Could not save CVE data to {DATA_PATH}: {e}")
    return cves

# --- Load data and build FAISS index ---
# Reuse the cached CVE file when it is present; otherwise fetch the data
# (or fall back to the built-in records), which also creates the cache file.
if os.path.exists(DATA_PATH):
    with open(DATA_PATH, "r") as f:
        cve_data = json.load(f)
else:
    cve_data = create_or_download_cves()

# One embedding per CVE description, indexed for L2 nearest-neighbour search.
# Row i of the index corresponds to cve_data[i].
descriptions = [entry['description'] for entry in cve_data]
embeddings = model.encode(descriptions, convert_to_numpy=True)
faiss_index = faiss.IndexFlatL2(EMBEDDING_DIM)
faiss_index.add(embeddings)

# --- Semantic search ---
def search_similar_cves(query, top_k=5):
    """Return up to `top_k` CVE records whose descriptions are closest to `query`.

    Args:
        query: Free-text question to embed and search with.
        top_k: Maximum number of records to return.

    Returns:
        A list of CVE dicts ordered from nearest to farthest. It is shorter
        than `top_k` when the index holds fewer vectors.
    """
    if top_k <= 0:
        return []
    query_embedding = model.encode([query], convert_to_numpy=True)
    _, neighbor_ids = faiss_index.search(query_embedding, top_k)
    # FAISS pads missing neighbours with -1; indexing with it would silently
    # return the *last* record (Python negative indexing), so drop those slots.
    return [
        cve_data[int(i)]
        for i in neighbor_ids[0]
        if 0 <= int(i) < len(cve_data)
    ]

# --- LLM-powered explanation using OpenRouter ---
def _openrouter_api_key():
    """Return the OpenRouter API key from Colab secrets or the environment, else None."""
    try:
        # Imported here so this cell does not depend on an earlier cell's import.
        from google.colab import userdata
        secret = userdata.get('OPEN_ROUTER_API_KEY')
        if secret:
            return secret
    except Exception:
        # Not running in Colab, or the secret is missing/inaccessible:
        # fall through to the environment variable instead of crashing.
        pass
    return os.environ.get('OPEN_ROUTER_API_KEY')


def _reference_link(cve):
    """Return one link for a CVE: its first non-NVD reference, else its NVD page."""
    preferred = next(
        (ref for ref in cve.get("references") or [] if "nvd.nist.gov" not in ref),
        None,
    )
    return preferred or f"https://nvd.nist.gov/vuln/detail/{cve['cve_id']}"


def query_threat_assistant(user_question, user_context):
    """Answer a threat question with the OpenRouter LLM, grounded in similar CVEs.

    Args:
        user_question: The CVE or threat-related question; also used for retrieval.
        user_context: Optional description of the user's situation.

    Returns:
        A pair `(explanation, related_summary)` of strings. On failure the
        first item is an error message and the second is an empty string.
    """
    api_key = _openrouter_api_key()
    if not api_key:
        return "Missing OPEN_ROUTER_API_KEY in environment", ""

    top_cves = search_similar_cves(user_question)
    docs = "\n\n".join([f"{item['cve_id']}: {item['description']}" for item in top_cves])

    prompt = f"""
You are a cybersecurity assistant.
You will be given a user concern or context, and a vulnerability they are asking about.

Your job is to:
1. Briefly explain what the CVE(s) are
2. Determine if the user's concern indicates they may be affected
3. Suggest what action they should take

User Concern:
{user_context or '[No context provided]'}

User Question:
{user_question}

Relevant CVEs:
{docs}

Give a clear and concise answer.
"""

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    data = {
        "model": "openai/gpt-4o-mini",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.2,
        "max_tokens": 1500
    }

    try:
        # A timeout keeps the UI from hanging forever on a stalled connection.
        response = requests.post(OPENROUTER_ENDPOINT, headers=headers, json=data, timeout=60)
        result = response.json()

        # Error responses have no "choices"; surface the API's own message
        # instead of an opaque KeyError.
        if not isinstance(result, dict) or not result.get("choices"):
            error = result.get("error") if isinstance(result, dict) else None
            detail = error.get("message") if isinstance(error, dict) else error
            if not detail:
                detail = f"unexpected response (HTTP {response.status_code})"
            return f"Error: {detail}", ""

        answer = (result["choices"][0]["message"]["content"] or "").strip()

        # One entry per CVE id, keeping the first occurrence and retrieval order.
        unique_cves = []
        seen = set()
        for cve in top_cves:
            if cve['cve_id'] not in seen:
                seen.add(cve['cve_id'])
                unique_cves.append(cve)

        cve_links = [f"- {cve['cve_id']}: {_reference_link(cve)}" for cve in unique_cves]
        related_summary = "\n".join(
            f"- {c['cve_id']}: {c['description'][:80].strip()}..." for c in unique_cves
        )

        return answer + "\n\n References:\n" + "\n".join(cve_links), related_summary

    except Exception as e:
        # UI boundary: report the failure in the output box rather than raising.
        return f"Error: {str(e)}", ""

# --- Gradio UI ---
# Input widgets, in the positional order of query_threat_assistant's parameters.
question_input = gr.Textbox(label="Enter the CVE or threat-related question")
context_input = gr.Textbox(label="Describe your concern or system context (optional)")

# Output widgets, matching the (explanation, related summary) pair it returns.
explanation_output = gr.Textbox(label="Explanation from Assistant", lines=10)
related_output = gr.Textbox(label="Related CVEs")

demo = gr.Interface(
    fn=query_threat_assistant,
    inputs=[question_input, context_input],
    outputs=[explanation_output, related_output],
    title="Security Threat Explanation Assistant",
    description="Ask about a vulnerability and describe your situation. The assistant will tell you what it means and if it affects you.",
)

# Start the web UI for the interface defined above.
demo.launch()

Downloaded CVE data from NVD.
It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://1fcebee14aebff61d8.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


