Code created by: Yamid Felipe Quiroga González

In [1]:
# The imported libraries and modules serve the following purposes:
# - json: for encoding and decoding JSON data
# - requests: for making HTTP requests
# - os: for interacting with the operating system
# - JSONDecodeError: for handling errors when decoding JSON data
# - datetime: for manipulating date and time
# - MongoClient and ServerApi: for connecting and interacting with a MongoDB database
# - get_installed_software: for fetching a list of installed software on a Windows machine
# - uuid: for generating universally unique identifiers (UUIDs)
Feel free to provide more code or specify particular sections where you'd like m
import json, requests, os
from json.decoder import JSONDecodeError
from datetime import datetime
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from datetime import datetime
from windows_tools.installed_software import get_installed_software
import uuid

In [2]:
# URI for connecting to the MongoDB database
uri = "[INSERT_MONGO_DB_CONNECTION]"

# Create a new MongoClient instance and connect to the server
# The ServerApi '1' indicates that the connection will use the stable API version 1
client = MongoClient(uri, server_api=ServerApi('1'))

# Try to send a ping command to the MongoDB server to confirm a successful connection
try:
    client.admin.command('ping')  # 'ping' command checks the connection to the server
    print("Pinged your deployment. You successfully connected to MongoDB!")  # Success message
except Exception as e:
    # Print the exception if there is an error while connecting to MongoDB
    print(e)

# Comment mentioning what this line is related to (possibly the collection name for vulnerabilities and software CVEs)
# Vulnerabilities.PcSoftwareCVE

# Select the database you want to use
db = client['[DATABASE]']  # Replace '[DATABASE]' with the name of your database

# Select the collection you want to use
collection = db['[CONTAINER]']  # Replace '[CONTAINER]' with the name of your collection

# Fetch all documents from the selected collection
db = collection.find()  # Retrieves all documents from the collection


Pinged your deployment. You successfully connected to MongoDB!


In [3]:
"""
    Save a JSON document to a MongoDB collection.

    Parameters:
    json_data (dict): The JSON data to be saved into the MongoDB collection.

"""
def save_Mongo(json_data):
    try:
        # Attempt to insert the JSON document into the collection
        collection.insert_one(json_data)
        print("JSON guardado exitosamente en MongoDB.")  # Success message indicating the document was saved
    except Exception as e:
        # Print the exception if there is an error during insertion
        print(f"Error al guardar JSON en MongoDB: {e}")

In [5]:
"""
Retrieves information about a specific CVE (Common Vulnerabilities and Exposures) entry from the MITRE CVE database.

Args:
    cve (str): The CVE ID to retrieve information for.

Returns:
    dict: A dictionary containing information about the specified CVE entry in JSON format.

"""
def apiCVE(cve):
    try:
        response = requests.get(f"https://cveawg.mitre.org/api/cve/{str(cve)}")
        
        # Verifica si la solicitud fue exitosa (código de estado 200)
        if response.status_code == 200:
            try:
                # Intenta decodificar la respuesta JSON
                data = response.json()
                return data
            except ValueError:
                # Si no se pudo decodificar la respuesta JSON, imprime un mensaje de error y devuelve None
                print("La respuesta no contiene datos JSON válidos.")
                return None
        elif response.status_code == 404:
            # Si el código de estado es 404, el recurso no fue encontrado
            print(f"El recurso para CVE {cve} no fue encontrado.")
            return None
        else:
            # Si la solicitud falla por alguna otra razón, imprime el código de estado y devuelve None
            print(f"Error en la solicitud. Código de estado: {response.status_code}")
            return None
    except requests.exceptions.RequestException as e:
        # Si ocurre un error durante la solicitud, imprime el mensaje de error y devuelve None
        print("Error durante la solicitud:", e)
        return None

In [6]:
"""
Retrieves the list of affected products for a given CVE (Common Vulnerabilities and Exposures) entry.

Args:
    cve (str): The CVE ID to retrieve affected products for.

Returns:
    list or str: A list containing the names of affected products, or 'not found' if no affected products are found.

"""
def get_affected_products(cve):
    # Load the JSON data for the CVE
    dataCVE = apiCVE(cve)

    # Check if the JSON data is None (indicating an error in fetching the data)
    if dataCVE is None:
        return 'not found'
    
    # Check if the JSON data contains an error message
    if 'error' in dataCVE and dataCVE['error'] == 'CVE_RECORD_DNE':
        return 'not found'
    
    # Check if the JSON data contains the list of affected products
    if 'containers' in dataCVE and 'cna' in dataCVE['containers'] and 'affected' in dataCVE['containers']['cna']:
        # Get the list of affected products from the JSON data
        affected_products = dataCVE['containers']['cna']['affected']
        
        # Create a list to store the product names
        product_list = []
        
        # Iterate over the list of affected products and extract the product names
        for affected_product in affected_products:
            if "product" in affected_product:
                product_list.append(affected_product['product'])
        
        # Return the list of products
        return product_list
    
    # If the JSON data doesn't contain the expected structure, return 'not found'
    return 'not found'

In [7]:
"""
Retrieves CVSS (Common Vulnerability Scoring System) metrics for a given CVE (Common Vulnerabilities and Exposures) entry from the OpenCVE API.

Args:
    cve (str): The CVE ID to retrieve CVSS metrics for.

Returns:
    dict: A dictionary containing CVSS metrics for the specified CVE entry in JSON format.
"""
def apiCVEmetric(cve):

    payload = {}
    headers = {
        #'Authorization': 'Basic WTRNMUQ6WWFtaWRPcGVuLjIwMDI='
        'Authorization': 'Basic NTdyb2xsZXI6NTdSb2xsZXIhMjAwMQ=='
    }
    url = f"https://www.opencve.io/api/cve/{cve}"
    response = requests.request("GET", url, headers=headers, data=payload)
    return response.json()


In [8]:
"""
Retrieves combined CVSS metrics for a given CVE (Common Vulnerabilities and Exposures) entry from the OpenCVE API.

Args:
    cve (str): The CVE ID to retrieve combined CVSS metrics for.

Returns:
    dict: A dictionary containing combined CVSS metrics for the specified CVE entry.

"""
def getCVSSmetrics(cve):

    response = apiCVEmetric(cve)

    baseScore = 0
    baseSeverity = "Null"
    
    # Primera opción: cvssMetricV31
    if "cvssMetricV31" in response["raw_nvd_data"]["metrics"]:
        cvss_metrics = response["raw_nvd_data"]["metrics"]["cvssMetricV31"][0]
        baseScore = cvss_metrics["cvssData"]["baseScore"]
        baseSeverity = cvss_metrics["cvssData"]["baseSeverity"]
    # Segunda opción: cvssMetricV30
    elif "cvssMetricV30" in response["raw_nvd_data"]["metrics"]:
        cvss_metrics = response["raw_nvd_data"]["metrics"]["cvssMetricV30"][0]
        baseScore = cvss_metrics["cvssData"]["baseScore"]
        baseSeverity = cvss_metrics["cvssData"]["baseSeverity"]
    # Tercera opción: cvssMetricV2
    elif "cvssMetricV2" in response["raw_nvd_data"]["metrics"]:
        cvss_metrics = response["raw_nvd_data"]["metrics"]["cvssMetricV2"][0]
        baseScore = cvss_metrics["cvssData"]["baseScore"]
        baseSeverity = cvss_metrics["baseSeverity"]
    else:
        print("No se encontró información de métricas CVSS.")
        cvss_metrics = None

    scoreMetric = {
        "baseScore": baseScore,
        "baseSeverity": baseSeverity
    }
    
    return scoreMetric

In [9]:
"""
Retrieves CVSS (Common Vulnerability Scoring System) metrics for a given CVE (Common Vulnerabilities and Exposures) entry from the OpenCVE API.

Args:
    cve (str): The CVE ID to retrieve CVSS metrics for.

Returns:
    dict: A dictionary containing CVSS metrics for the specified CVE entry in JSON format.
"""
def apiCVEmetric2(cve):
    url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve}"
    payload = {}
    headers = {}

    try:
        response = requests.request("GET", url, headers=headers, data=payload)
        response_json = response.json()
    except JSONDecodeError:
        print(f"Error:")
        response_json = {}  # Valor predeterminado: un diccionario vacío
    
    return response_json

In [10]:
"""
Retrieves combined CVSS metrics for a given CVE (Common Vulnerabilities and Exposures) entry from the OpenCVE API.

Args:
    cve (str): The CVE ID to retrieve combined CVSS metrics for.

Returns:
    dict: A dictionary containing combined CVSS metrics for the specified CVE entry.

"""
def getCVSSmetrics2(cve):

    response = apiCVEmetric2(cve)

    baseScore = 0
    baseSeverity = "Null"
    
    if "vulnerabilities" in response:
        vulnerabilities = response["vulnerabilities"]
        for vulnerability in vulnerabilities:
            if "cve" in vulnerability and "metrics" in vulnerability["cve"]:
                metrics = vulnerability["cve"]["metrics"]
                if "cvssMetricV31" in metrics:
                    baseScore = metrics["cvssMetricV31"][0]["cvssData"]["baseScore"]
                    baseSeverity = metrics["cvssMetricV31"][0]["cvssData"]["baseSeverity"]
                elif "cvssMetricV30" in metrics:
                    baseScore = metrics["cvssMetricV30"][0]["cvssData"]["baseScore"]
                    baseSeverity = metrics["cvssMetricV30"][0]["cvssData"]["baseSeverity"]
                elif "cvssMetricV2" in metrics:
                    baseScore = metrics["cvssMetricV2"][0]["cvssData"]["baseScore"]
                    baseSeverity = metrics["cvssMetricV2"][0]["baseSeverity"]
                    

    scoreMetric = {
        "baseScore": baseScore,
        "baseSeverity": baseSeverity
    }
    
    return scoreMetric

In [12]:
"""
    Retrieves a list of CVE identifiers related to the specified software from the Vulners API.

    Args:
        software (str): The name of the software to search for vulnerabilities.

    Returns:
        list: A list of CVE identifiers related to the specified software.
"""
def getVulnersCVE(software):
    # URL for the Vulners API search endpoint
    url = 'https://vulners.com/api/v3/search/lucene/'

    # Construct the query string to search for the specified software
    query = f"software:'{software}'"

    # Define the parameters for the API request
    params = {
        "query": query,  # Query parameter
        "size" : 4       # Number of results to return
    }

    try:
        # Make the GET request to the Vulners API with the specified URL and parameters
        response = requests.get(url, params=params)
        
        # Raise an exception if the response indicates an HTTP error
        response.raise_for_status()

        # Parse the response JSON data
        data = response.json()
        
        # Check if the response contains the expected data structure
        if "data" in data and "search" in data["data"]:
            cve_list = []  # Initialize an empty list to store CVE identifiers

            # Iterate over each item in the search results
            for item in data["data"]["search"]:
                # Retrieve the list of CVE identifiers from the "_source" field
                cvelist = item.get("_source", {}).get("cvelist", [])
                # Extend the cve_list with the retrieved CVE identifiers
                cve_list.extend(cvelist)
                
            return cve_list  # Return the list of CVE identifiers
        else:
            # Print a message if the response does not contain the expected data
            print("La respuesta no contiene los datos esperados:", data)
            return []  # Return an empty list
    except requests.RequestException as e:
        # Print an error message if there is an issue with the request
        print("Error al realizar la solicitud:", e)
        return []  # Return an empty list

In [13]:
"""
    Searches for vulnerabilities related to the specified software product.

    Args:
        product (str): The name of the software product to search for vulnerabilities.

    Returns:
        list: A list of dictionaries, each containing details about a specific vulnerability,
              including the CVE identifier, affected products, and CVSS metrics.
"""
def seachSWVulnerabilities(product):
    # Retrieve a list of CVE identifiers related to the specified product
    cve_list = getVulnersCVE(product)

    # Initialize an empty list to store detailed vulnerability information
    swVulnerabilities = []

    # Access each element within 'cve_list'
    for cve in cve_list:
        print(cve)  # Print the CVE identifier for debugging or logging purposes
        
        # Create a dictionary with detailed information about the vulnerability
        vulnerability = {
            "cve": cve,  # The CVE identifier
            "affect": get_affected_products(cve),  # Retrieve affected products for the CVE
            "metrics": getCVSSmetrics2(cve)  # Retrieve CVSS metrics for the CVE
        }

        # Append the vulnerability dictionary to the list of software vulnerabilities
        swVulnerabilities.append(vulnerability) 

    # Return the list of detailed vulnerabilities
    return swVulnerabilities

In [14]:
"""
    Retrieves data from a MongoDB database using a specified endpoint.

    Returns:
        dict: A dictionary containing the response data retrieved from the MongoDB database.
"""
def getPcDataBase():
    # URL for the MongoDB API endpoint to retrieve data
    url = "https://us-east-1.aws.data.mongodb-api.com/app/data-bcurboy/endpoint/data/v1/action/find"

    # Payload specifying the collection, database, dataSource, and filter for the MongoDB query
    payload = {
        "collection": "softwareByIP",   # Collection to query
        "database": "vulnsData",         # Database to query
        "dataSource": "Cluster0",        # Data source to use
        "filter": {}                      # Filter criteria (empty for fetching all data)
    }

    # Headers for the HTTP request, including content type and API key
    headers = {
        'Content-Type': 'application/ejson',  # Specify the content type as EJSON
        'apiKey': '8zblZqSy1gtnZqmSd8YCsyHqXuMmzCfMdKaa22xY5lc8RzVQMGfEF3W3Lwt6Y2G1'  # API key for authentication
    }

    # Make a POST request to the MongoDB API endpoint with the specified URL, headers, and payload
    response = requests.post(url, headers=headers, json=payload)

    # Return the response data in JSON format
    return response.json()


In [15]:
"""
    Save JSON data as a text file.

    Args:
        json_data (dict): The JSON data to be saved.

    Returns:
        None
"""
def save_txt(json_data):

    # Define the file path where the text file will be saved
    ruta = r"C:\Users\{USER}}\Documents\archivo.txt"

    try:
        # Open the file in write mode and write the JSON data to it
        with open(ruta, 'w') as archivo:
            # Convert the JSON data to a string with indentation and write it to the file
            archivo.write(json.dumps(json_data, indent=4))
        print("JSON guardado como archivo de texto en:", ruta)  # Print a success message
    except Exception as e:
        # Print an error message if there is an issue with saving the JSON data to a text file
        print("Error al guardar el JSON como archivo de texto:", e)


In [18]:
"""
    Searches for vulnerabilities related to the software installed on a given PC.

    Args:
        pc (dict): Dictionary containing information about the PC and its installed software.

    Returns:
        None
"""
def seachPCSoftwareVulnerabilities(pc):
    # Build the JSON document to be saved in the database
    swVulnerabilities = {
        'pc': f"{pc['hostname']}",              # PC hostname
        'operativeSystem': pc["operativeSystem"],  # Operating system of the PC
        "softwares": [],                        # List to store software vulnerabilities
        "analysisDate": datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # Current analysis date
    }
    
    print(pc["hostname"])  # Print the PC hostname for debugging or logging purposes
    
    # Iterate over each entry in the 'softwareData' of the PC
    for software_entry in pc["softwareData"]:
        name = software_entry["Name"]  # Get the name of the software
        print(name)  # Print the name of the software for debugging or logging purposes
        
        # Check if the software is not "Atmel Segger USB Drivers (501e)"
        if name != "Atmel Segger USB Drivers (501e)":
            # Search for vulnerabilities related to the software and get the results
            vulnerabilities = seachSWVulnerabilities(name)
            
            # Create a dictionary for the software entry with its vulnerabilities
            software = {
                "software": name,                # Name of the software
                "vulnerabilities": vulnerabilities  # List of vulnerabilities for the software
            }
            
            # Append the software entry to the list of software vulnerabilities
            swVulnerabilities['softwares'].append(software)

    # Print the JSON result for debugging or logging purposes
    print(json.dumps(swVulnerabilities, indent=4))
    
    # Save the data to MongoDB
    save_Mongo(swVulnerabilities)
    
    # Save the data to a text file
    save_txt(swVulnerabilities)

In [None]:
# Iterate over a range of 18 (from 0 to 17)
for i in range(0, 18):
    # Retrieve data from the MongoDB database using getPcDataBase function
    pc_data = getPcDataBase()['documents'][i]
    
    # Call the seachPCSoftwareVulnerabilities function with the retrieved PC data
    seachPCSoftwareVulnerabilities(pc_data)