In [2]:
from elasticsearch import Elasticsearch

client = Elasticsearch(
    "https://f0c40d3bde77471f83e6cace87537665.ap-southeast-1.aws.found.io",
    api_key="MU9oZlg1Y0JiN3JESEE4V0Y5aUg6N1lNZ0VaREtPU1pob3hYQWQtNU1zZw=="
)

index_name = "cve-index"

# Delete if exists (optional cleanup step for testing)
if client.indices.exists(index=index_name):
    client.indices.delete(index=index_name)

# Create index with proper mapping
mappings = {
    "mappings": {
        "properties": {
            "text": {
                "properties": {
                    "cve_id": {"type": "keyword"},
                    "description": {"type": "text"},
                    "severity": {"type": "keyword"}
                }
            }
        }
    }
}
client.indices.create(index=index_name, body=mappings)

# Sample document
doc = {
    "text": {
        "cve_id": "CVE-2024-12345",
        "description": "This is a test CVE vulnerability.",
        "severity": "High"
    }
}

# Index the document
response = client.index(index=index_name, document=doc)
print("Document indexed:", response)

Document indexed: {'_index': 'cve-index', '_id': 'wWBiX5cBOQ2nh5PuxeR9', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}


POST /_security/api_key
{
  "name": "my-api-key",
  "role_descriptors": {
    "my-role": {
      "cluster": ["all"],
      "index": [
        {
          "names": ["cve-index"],
          "privileges": ["all"]
        }
      ]
    }
  }
}

In [23]:
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
MODEL_NAME = "all-MiniLM-L6-v2"  # small and fast
ES_INDEX = "cve-index"
model = SentenceTransformer(MODEL_NAME)


  from .autonotebook import tqdm as notebook_tqdm


In [143]:
import re
from urllib.parse import unquote

def parse_cpe(cpe_uri):
    parts = cpe_uri.split(":")
    if len(parts) >= 7:
        return {
            "type": parts[2],         # a / o / h
            "vendor": unquote(parts[3]),
            "product": unquote(parts[4]),
            "version": unquote(parts[5])
        }
    return {}

def extract_cve_data(json_data, embed_model=None):
    cve_id = json_data.get("cveMetadata", {}).get("cveId", "")
    containers = json_data.get("containers", {})
    cna = containers.get("cna", {})

    # Description
    description = next(
        (d.get("value") for d in cna.get("descriptions", []) if d.get("lang") == "en"),
        ""
    )

    # CVSS metrics
    metrics = cna.get("metrics", [])
    cvss = {}
    for m in metrics:
        if "cvssV3_1" in m:
            cvss = m["cvssV3_1"]
            break

    # CWE
    cwes = []
    for pt in cna.get("problemTypes", []):
        for desc in pt.get("descriptions", []):
            if desc.get("lang") == "en":
                cwes.append({
                    "id": desc.get("cweId"),
                    "description": desc.get("description")
                })

    # Affected packages
    affected_packages = []
    for a in cna.get("affected", []):
        vendor = a.get("vendor", "")
        product = a.get("product", "")
        version_info = a.get("versions", [])
        for v in version_info:
            pkg = {
                "vendor": vendor,
                "product": product,
                "version": v.get("version"),
                "lessThanOrEqual": v.get("lessThanOrEqual"),
                "version_type": v.get("versionType"),
                "status": v.get("status")
            }
            affected_packages.append(pkg)

    # Infer fix version
    fix_version = None
    for pkg in affected_packages:
        if pkg.get("lessThanOrEqual"):
            fix_version = f"> {pkg['lessThanOrEqual']}"
            break

    # References
    references = [r.get("url") for r in cna.get("references", []) if "url" in r]

    # Timeline
    timeline = [
        {"date": t["time"], "event": t["value"]}
        for t in cna.get("timeline", []) if "time" in t and "value" in t
    ]

    # Credits
    credits = [c.get("value") for c in cna.get("credits", []) if c.get("lang") == "en"]

    # SSVC (CISA enrichment)
    ssvc_data = {}
    for adp in containers.get("adp", []):
        for metric in adp.get("metrics", []):
            if "other" in metric and metric["other"].get("type") == "ssvc":
                content = metric["other"].get("content", {})
                ssvc_data = {
                    "role": content.get("role"),
                    "version": content.get("version")
                }
                for opt in content.get("options", []):
                    ssvc_data.update(opt)
                break

    # Embedding (optional)
    vector = []
    if embed_model:
        vector = embed_model.encode(description).tolist()

    return {
        "cve_id": cve_id,
        "text": description,
        "vector": vector,
        "metadata": {
            "title": cve_id+" "+cna.get("title", ""),
            "published_date": json_data.get("cveMetadata", {}).get("datePublished", ""),
            "last_updated": json_data.get("cveMetadata", {}).get("dateUpdated", ""),
            "cvss_score": cvss.get("baseScore"),
            "cvss_vector": cvss.get("vectorString"),
            "severity": cvss.get("baseSeverity"),
            "cwe": cwes,
            "affected_packages": affected_packages,
            "fix_version": fix_version,
            "references": references,
            "timeline": timeline,
            "credits": credits,
            "assigner": json_data.get("cveMetadata", {}).get("assignerShortName"),
            "ssvc": ssvc_data
        }
    }

In [147]:
import os
from pathlib import Path

def convert_all(input_dir: str):
    input_path = Path(input_dir)

    all_json_files = list(input_path.rglob("*.json"))
    print(f"[i] Found {len(all_json_files)} JSON files in '{input_dir}'")

    for file_path in all_json_files[:1]:
        try:
            with file_path.open() as f:
                cve_json = json.load(f)
            doc = extract_cve_data(cve_json,model)
            print(doc)
            res=client.index(index="cve-index", id=doc["cve_id"], document=doc)
            print(doc)
            print(res)
        except Exception as e:
            print(f"[!] Failed to process {file_path}: {e}")



convert_all("/Users/ar2024/Downloads/cvelistV5-main/cves/2025/0xxx/")

[i] Found 738 JSON files in '/Users/ar2024/Downloads/cvelistV5-main/cves/2025/0xxx/'
{'cve_id': 'CVE-2025-0749', 'text': "The Homey theme for WordPress is vulnerable to authentication bypass in versions up to, and including, 2.4.3. This is due to the 'verification_id' value being set to empty, and the not empty check is missing in the dashboard user profile page. This makes it possible for unauthenticated attackers to log in to the first verified user.", 'vector': [0.010989012196660042, 0.06895221769809723, 0.006752158980816603, -0.053339775651693344, 0.16157951951026917, -0.05878076329827309, 0.01195523887872696, -0.06536251306533813, 0.0031834638211876154, 0.014222901314496994, 0.006453228648751974, 0.04073279723525047, 0.02417699061334133, -0.0053020985797047615, 0.05884591117501259, -0.018591605126857758, -0.015843136236071587, -0.07499281316995621, -0.020772196352481842, 0.028086723759770393, -0.13129417598247528, -0.008201041258871555, -0.012271127663552761, 0.07010074704885483, 

In [None]:
result

In [109]:
s=""
dir(s)

['__add__',
 '__class__',
 '__contains__',
 '__delattr__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__getnewargs__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__iter__',
 '__le__',
 '__len__',
 '__lt__',
 '__mod__',
 '__mul__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__rmod__',
 '__rmul__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 'capitalize',
 'casefold',
 'center',
 'count',
 'encode',
 'endswith',
 'expandtabs',
 'find',
 'format',
 'format_map',
 'index',
 'isalnum',
 'isalpha',
 'isascii',
 'isdecimal',
 'isdigit',
 'isidentifier',
 'islower',
 'isnumeric',
 'isprintable',
 'isspace',
 'istitle',
 'isupper',
 'join',
 'ljust',
 'lower',
 'lstrip',
 'maketrans',
 'partition',
 'removeprefix',
 'removesuffix',
 'replace',
 'rfind',
 'rindex',
 'rjust',
 'rpartition',
 'rsplit',
 'rstrip',
 'split',
 'splitlines',
 'startswith',
 'stri