In [1]:
from pymongo import MongoClient

uri = "mongodb://localhost:27017"
client = MongoClient(uri)

# MongoClient connects lazily, so issue a cheap "ping" command to verify that
# the server is actually reachable before the rest of the notebook runs.
try:
    client.admin.command("ping")
    print("Connected to MongoDB!")
except Exception as exc:
    # Catch Exception instead of using a bare `except:` so KeyboardInterrupt
    # and SystemExit still propagate, and print the error so the reason for
    # the failure is visible instead of being silently discarded.
    print("❌ Could not connect to MongoDB:", exc)



Connected to MongoDB!


In [2]:
# Enumerate every database visible to the connected client
db_list = client.list_database_names()
print("Databases:", db_list)

# Verify that the expected database is present, then look inside it
db_name = "nvdcve"
if db_name not in db_list:
    print(f"❌ Database '{db_name}' does not exist.")
else:
    db = client[db_name]
    print(f"✅ Database '{db_name}' exists.")

    # Enumerate the collections stored in that database
    collection_list = db.list_collection_names()
    print("Collections:", collection_list)

    # Report whether the expected collection is among them
    collection_name = "nvdcve"
    print(
        f"✅ Collection '{collection_name}' exists."
        if collection_name in collection_list
        else f"❌ Collection '{collection_name}' does not exist."
    )


Databases: ['admin', 'config', 'foss_github_data', 'local', 'nvdcve']
✅ Database 'nvdcve' exists.
Collections: ['nvdcve']
✅ Collection 'nvdcve' exists.


In [3]:
### Handle to the CVE database ###
CVE_DB = client["nvdcve"]

### Handle to the NVD CVE collection inside that database ###
NVD_CVE_COLLECTION = CVE_DB["nvdcve"]


### Peek at the collection: print at most one document (nothing if it is empty)
sample_cursor = NVD_CVE_COLLECTION.find().limit(1)
for doc in sample_cursor:
    print(doc)


{'_id': ObjectId('68000746fe426ba2c401febe'), 'cve': {'data_type': 'CVE', 'data_format': 'MITRE', 'data_version': '4.0', 'CVE_data_meta': {'ID': 'CVE-1999-0001', 'ASSIGNER': 'cve@mitre.org'}, 'problemtype': {'problemtype_data': [{'description': [{'lang': 'en', 'value': 'CWE-20'}]}]}, 'references': {'reference_data': [{'url': 'http://www.openbsd.org/errata23.html#tcpfix', 'name': 'http://www.openbsd.org/errata23.html#tcpfix', 'refsource': '', 'tags': []}, {'url': 'http://www.openbsd.org/errata23.html#tcpfix', 'name': 'http://www.openbsd.org/errata23.html#tcpfix', 'refsource': '', 'tags': []}, {'url': 'http://www.osvdb.org/5707', 'name': '5707', 'refsource': '', 'tags': []}, {'url': 'http://www.osvdb.org/5707', 'name': '5707', 'refsource': '', 'tags': []}]}, 'description': {'description_data': [{'lang': 'en', 'value': 'ip_input.c in BSD-derived TCP/IP implementations allows remote attackers to cause a denial of service (crash or hang) via crafted packets.'}]}}, 'configurations': {'CVE_da

In [6]:
import re

### Regex pattern for getting vendor:product:version from CPE
pattern = re.compile(r'cpe:2\.3:[aho]:([^:]+):([^:]+):([^:]+)')


def _extract_cwes(doc):
    """Return the non-empty CWE values from the first problemtype entry of `doc`.

    Returns an empty list when the expected keys or the first entry are missing.
    """
    try:
        descriptions = doc["cve"]["problemtype"]["problemtype_data"][0]["description"]
    except (KeyError, IndexError):
        return []
    return [entry.get("value") for entry in descriptions if entry.get("value")]


def _extract_description(doc):
    """Return the first description text of `doc`, or None when it is absent."""
    try:
        return doc["cve"]["description"]["description_data"][0]["value"]
    except (KeyError, IndexError):
        return None


def _extract_cpes(doc):
    """Map "vendor product" -> list of unique versions found in `doc`'s CPE matches.

    Only `cpe_match` entries directly under `configurations.nodes` are read.
    Versions are de-duplicated per vendor/product while keeping the order in
    which they were first seen. Parsing is best-effort: on an unexpected error
    a message is printed and whatever was collected so far is returned.
    """
    # A dict used as an insertion-ordered set: version -> None
    versions_by_product = {}
    try:
        for node in doc.get("configurations", {}).get("nodes", []):
            # NOTE(review): nested node lists (e.g. a "children" key), if the
            # feed uses them, are not traversed here -- confirm against the schema.
            for match in node.get("cpe_match", []):
                parsed = pattern.match(match.get("cpe23Uri", ""))
                if not parsed:
                    continue
                vendor, product, version = parsed.groups()
                versions_by_product.setdefault(f"{vendor} {product}", {})[version] = None
    except Exception as e:
        print("Error parsing CPEs:", e)
    return {key: list(versions) for key, versions in versions_by_product.items()}


# Build one flat record per CVE document in the collection
results = []
for doc in NVD_CVE_COLLECTION.find({}):
    results.append({
        "cve_id": doc.get("cve", {}).get("CVE_data_meta", {}).get("ID", None),
        "cwe_val": _extract_cwes(doc),
        "cve_description": _extract_description(doc),
        "cpes_with_versions": _extract_cpes(doc),
        "foss_vec_matches": []  # Will be populated later
    })




In [7]:
### Write data to JSON ###
import json

# Dump the collected records as pretty-printed UTF-8 JSON; non-ASCII
# characters are written as-is rather than escaped.
with open("cve-cpe-foss_data.json", "w", encoding="utf-8") as out_file:
    json.dump(obj=results, fp=out_file, indent=2, ensure_ascii=False)

In [None]:
"""
Target record schema (one record per CVE):

cve_id: id,
cwe_val: list of CWE values,
cve_description: text,
cpes_with_versions: set of unique CPEs with versions, as dict[vendor product : list[versions]],
foss_vec_matches: list[dict[name of model (Nomic): {'foss_project_name': 'FFmpeg FFmpeg',
  'vector_certainty': 0.9790698885917664,
  'vector_distance': 0.041860222816467285}]]

  *** Goal: get the top 3 results across all models. ***
                        
                            

"""

In [None]:
### Create a new mongoDB collection with this new data. ###

# Now, you can insert `results` into your MongoDB collection
##NEW_COLLECTION.insert_many(result