<a href="https://colab.research.google.com/github/Akshansh-Kulshrestha/ArchiveInfotech/blob/main/CVE_DATA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import files
import os
import json

# Create a directory for CVE data
CVE_DATA_PATH = "./cve_data"
os.makedirs(CVE_DATA_PATH, exist_ok=True)

# Upload CVE JSON files (opens the Colab upload widget and blocks until done)
print("Upload your CVE JSON files:")
uploaded = files.upload()

# Save uploaded files; each item is written as raw bytes under CVE_DATA_PATH
for filename, content in uploaded.items():
    # Keep only the final path component so an unusual upload name can never
    # place a file outside the CVE data directory.
    target_path = os.path.join(CVE_DATA_PATH, os.path.basename(filename))
    with open(target_path, "wb") as f:
        f.write(content)

# Only claim success when something was actually uploaded.
if uploaded:
    print("Files uploaded successfully!")
else:
    print("No files were uploaded.")


In [None]:
from IPython import get_ipython
from IPython.display import display
import os
import json
import unittest
import tkinter as tk
from tkinter import ttk, messagebox
import networkx as nx
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
import threading
import csv
import pygraphviz
import seaborn as sns
import requests
import time


# Local CVE data path: directory holding one "<CVE-ID>.json" file per CVE.
CVE_DATA_PATH = "./cve_data"  # Change this path to the folder containing your local CVE data

# Load CVE data from the local machine
def load_cve_data(cve_id):
    """
    Load CVE details from local storage, falling back to the NVD server.

    A local file ``<CVE_DATA_PATH>/<cve_id>.json`` is returned as-is when it
    exists. Otherwise the CVE is requested from the NVD CVE API 2.0, reduced
    to an ``{"id", "details"}`` record, cached to that same file and returned.

    Args:
        cve_id (str): The CVE ID to load.
    Returns:
        dict: CVE details. On any network/HTTP/decoding failure a placeholder
        ``{"id": cve_id, "details": "No data available"}`` is returned (and
        not cached), so callers always receive a dict.
    """
    file_path = os.path.join(CVE_DATA_PATH, f"{cve_id}.json")
    if os.path.exists(file_path):
        with open(file_path, "r") as file:
            return json.load(file)

    print(f"Warning: {cve_id} not found locally. Fetching from NVD...")
    time.sleep(1)  # Brief pause so back-to-back lookups do not hammer the API

    fallback = {"id": cve_id, "details": "No data available"}

    # Keep the try body limited to the calls that can actually fail remotely.
    try:
        # API 2.0 selects a single CVE through the ``cveId`` query parameter,
        # not through a path segment.
        response = requests.get(
            "https://services.nvd.nist.gov/rest/json/cves/2.0",
            params={"cveId": cve_id},
            timeout=30,  # never hang forever on a stalled connection
        )
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        nvd_data = response.json()
    except requests.exceptions.HTTPError as errh:
        status_code = getattr(errh.response, "status_code", None)
        if status_code == 403:
            print(f"Error fetching {cve_id} from NVD: 403 Forbidden. This CVE might be deprecated or rate limited.")
        else:
            print(f"Error fetching {cve_id} from NVD: {errh}")
        # Every HTTP failure (403 included) yields the placeholder record
        # instead of an implicit None.
        return fallback
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {cve_id} from NVD: {e}")
        return fallback
    except ValueError as e:
        # Older versions of requests raise a plain ValueError for a non-JSON body.
        print(f"Error fetching {cve_id} from NVD: {e}")
        return fallback

    cve_details = {
        "id": cve_id,
        "details": _extract_nvd_description(nvd_data),
        # Add other details you need here (e.g., severity, impact, etc.)
    }

    # Save the fetched CVE data to local storage (create the folder if needed)
    os.makedirs(CVE_DATA_PATH, exist_ok=True)
    with open(file_path, "w") as f:
        json.dump(cve_details, f, indent=4)

    return cve_details


def _extract_nvd_description(nvd_data):
    """Return the description text from an NVD response, or a default string.

    Understands the API 2.0 layout (``vulnerabilities[].cve.descriptions[]``,
    preferring the English entry) and falls back to the legacy 1.0 layout
    (``result.CVE_Items[].cve.description.description_data[]``). Empty lists
    and unexpected shapes yield "No description available" instead of raising.
    """
    default = "No description available"
    if not isinstance(nvd_data, dict):
        return default

    vulnerabilities = nvd_data.get("vulnerabilities") or []
    if vulnerabilities and isinstance(vulnerabilities[0], dict):
        cve_record = vulnerabilities[0].get("cve") or {}
        descriptions = [
            entry for entry in (cve_record.get("descriptions") or [])
            if isinstance(entry, dict)
        ]
        for entry in descriptions:
            if entry.get("lang") == "en" and entry.get("value"):
                return entry["value"]
        if descriptions:
            return descriptions[0].get("value") or default
        return default

    legacy_items = (nvd_data.get("result") or {}).get("CVE_Items") or []
    if legacy_items and isinstance(legacy_items[0], dict):
        legacy_cve = legacy_items[0].get("cve") or {}
        description_data = (legacy_cve.get("description") or {}).get("description_data") or []
        if description_data and isinstance(description_data[0], dict):
            return description_data[0].get("value") or default

    return default


def test_load_cve_data_manual():
    """Interactively exercise load_cve_data, one CVE ID per prompt round.

    For each ID the tester may type the text expected inside the returned
    "details". When left blank, the "details" of an existing local file is
    used as the expectation, or no expectation at all if there is no file.
    Any local file for the ID is then deleted so that load_cve_data has to
    go through its NVD path. Entering 'q' ends the session.
    """
    while True:
        cve_id = input("Enter CVE ID (or 'q' to quit): ")
        if cve_id.lower() == 'q':
            return

        # Optional expectation typed in by the tester
        expected_details = input("Enter expected details (or leave blank for default): ")
        cached_file = os.path.join(CVE_DATA_PATH, f"{cve_id}.json")

        if not expected_details:
            if not os.path.exists(cached_file):
                # Nothing local to compare with; the record will come from NVD
                expected_details = None
            else:
                # Use the stored "details" (or a marker when the key is absent)
                with open(cached_file, "r") as handle:
                    stored_record = json.load(handle)
                expected_details = stored_record.get("details", "No details in file")

        # Drop the local copy (if present) to force the NVD fetch path
        if os.path.exists(cached_file):
            os.remove(cached_file)

        cve_data = load_cve_data(cve_id)
        assert cve_data is not None
        assert "id" in cve_data
        assert cve_data["id"] == cve_id

        if expected_details is None:
            continue
        # Partial match: the expected text must appear within the details
        assert expected_details in cve_data["details"]

# Construct tree for a single CVE
def construct_vulnerability_tree_from_local(cve_id, depth=1):
    """
    Construct a vulnerability tree starting with a given CVE ID.

    Each visited CVE is loaded through load_cve_data and stored as a node
    whose ``details`` attribute holds the loaded record. Every ID listed
    under the record's "related_cves" key becomes a child edge and is
    expanded recursively.

    Args:
        cve_id (str): Root CVE ID.
        depth (int): Number of levels to expand; the root is level 1. Related
            IDs found on the last expanded level are still added as nodes
            (through their edge) but carry no ``details`` attribute.
    Returns:
        networkx.DiGraph: Directed graph of CVE IDs.
    """
    tree = nx.DiGraph()
    visited = set()

    def add_node_and_related(current_id, current_depth):
        if current_id in visited or current_depth > depth:
            return
        visited.add(current_id)

        cve_data = load_cve_data(current_id)
        if not isinstance(cve_data, dict):
            # A failed lookup must not abort the whole tree build.
            cve_data = {"id": current_id, "details": "No data available"}
        tree.add_node(current_id, details=cve_data)

        # ``or []`` also covers an explicit null stored under "related_cves".
        for related_cve in cve_data.get("related_cves") or []:
            tree.add_edge(current_id, related_cve)
            add_node_and_related(related_cve, current_depth + 1)

    add_node_and_related(cve_id, 1)
    return tree

# Batch processing
def construct_batch_vulnerability_tree(cve_ids, depth=1):
    """
    Build one graph that combines the vulnerability trees of several CVEs.

    A tree is constructed for every ID in ``cve_ids`` (each expanded to
    ``depth`` levels) and composed, in order, into a single directed graph.
    An empty ``cve_ids`` yields an empty graph.
    """
    combined = nx.DiGraph()
    for root_id in cve_ids:
        print(f"Building tree for CVE: {root_id}")
        combined = nx.compose(
            combined,
            construct_vulnerability_tree_from_local(root_id, depth),
        )
    return combined

# Severity levels in ascending order; the position drives the node size.
_SEVERITY_LEVELS = ("LOW", "MEDIUM", "HIGH", "CRITICAL")

# Colours used when no module-level ``severity_palette`` has been defined.
_DEFAULT_SEVERITY_PALETTE = {
    "LOW": "mediumseagreen",
    "MEDIUM": "gold",
    "HIGH": "darkorange",
    "CRITICAL": "crimson",
    "Unknown": "gray",
}


def _node_severity(node_attrs):
    """Return the CVSS v3 base severity stored in a node's attributes, or "Unknown"."""
    current = node_attrs.get("details")
    for key in ("impact", "baseMetricV3", "cvssV3"):
        if not isinstance(current, dict):
            return "Unknown"
        current = current.get(key)
    if isinstance(current, dict):
        return current.get("baseSeverity", "Unknown")
    return "Unknown"


# Visualization with Matplotlib
def visualize_tree_with_matplotlib(tree):
    """
    Visualize the vulnerability tree using Matplotlib with a colorful palette.

    Nodes are sized and coloured by their CVSS v3 base severity (read from
    ``details.impact.baseMetricV3.cvssV3.baseSeverity``); each edge takes the
    colour of its target node. Nodes without that information are drawn as
    "Unknown" in gray at the smallest size.

    Args:
        tree (networkx.DiGraph): Graph whose nodes may carry a ``details`` dict.
    """
    # Use a palette defined at module level when one exists; otherwise fall
    # back to the built-in colours so the function never fails on a missing name.
    palette = globals().get("severity_palette", _DEFAULT_SEVERITY_PALETTE)
    size_rank = {level: rank for rank, level in enumerate(_SEVERITY_LEVELS)}

    plt.figure(figsize=(12, 10))
    pos = nx.nx_agraph.graphviz_layout(tree, prog="dot")

    # Resolve each node's severity once and reuse it for size, colour and label.
    severities = {node: _node_severity(attrs) for node, attrs in tree.nodes(data=True)}

    node_sizes = [1000 + 1000 * size_rank.get(severities[node], 0) for node in tree.nodes()]
    node_colors = [palette.get(severities[node], "gray") for node in tree.nodes()]
    edge_colors = [palette.get(severities[target], "gray") for _, target in tree.edges()]

    # Default labels are switched off: the custom labels drawn below already
    # contain the node ID, and drawing both would overlap the text.
    nx.draw(tree, pos, with_labels=False, node_size=node_sizes, node_color=node_colors,
            edge_color=edge_colors, width=2)

    labels = {node: f"{node}\nSeverity: {severities[node]}" for node in tree.nodes()}
    nx.draw_networkx_labels(tree, pos, labels, font_size=8, font_color="black")

    plt.title("Vulnerability Tree Visualization", fontsize=15)
    plt.show()

# Export tree to JSON
def export_tree_to_json(tree, file_name="vulnerability_tree.json"):
    """
    Write the vulnerability tree to ``file_name`` as JSON.

    The document has two keys: "nodes", a list of ``{"id", "details"}``
    records (``details`` defaults to an empty object), and "edges", the
    list of (source, target) pairs.
    """
    node_records = []
    for node_id, attrs in tree.nodes(data=True):
        node_records.append({"id": node_id, "details": attrs.get("details", {})})

    payload = {"nodes": node_records, "edges": list(tree.edges())}

    with open(file_name, "w") as out:
        json.dump(payload, out, indent=4)
    print(f"Tree data exported to {file_name}.")

# Export tree to CSV
def export_tree_to_csv(tree, file_name="vulnerability_tree.csv"):
    """
    Write the vulnerability tree to ``file_name`` as CSV.

    After a header row, one row is written per node: the node ID, its
    ``details`` attribute (empty dict when absent) and the list of its
    successor nodes. The last two are written in their string form.
    """
    header = ["Node ID", "Details", "Connected To"]
    with open(file_name, "w", newline="") as out:
        sheet = csv.writer(out)
        sheet.writerow(header)
        sheet.writerows(
            [node_id, attrs.get("details", {}), list(tree.successors(node_id))]
            for node_id, attrs in tree.nodes(data=True)
        )
    print(f"Tree data exported to {file_name}.")

# GUI Code
def start_analysis():
    """
    Start analysis based on user input.

    Reads the CVE IDs, tree depth and export choice from the GUI inputs,
    builds and visualizes the merged vulnerability tree, then exports it if
    "JSON" or "CSV" is selected. The work runs on a separate thread.

    NOTE(review): ``entry_cves``, ``entry_depth`` and ``export_option`` are
    not defined in the visible source; presumably they are Tk widgets or
    variables created by GUI setup code elsewhere - confirm before use.
    """
    def run_analysis():
        # Strip surrounding whitespace and drop empty items so that input
        # such as "CVE-A, CVE-B," does not produce IDs like " CVE-B" or "".
        cve_ids = [part.strip() for part in entry_cves.get().split(",") if part.strip()]
        depth = int(entry_depth.get())

        # Severity filtering is not applied here, so the severity input is
        # intentionally not read.
        tree = construct_batch_vulnerability_tree(cve_ids, depth)

        # Always use Matplotlib for visualization in this environment
        visualize_tree_with_matplotlib(tree)

        chosen_format = export_option.get()
        if chosen_format == "JSON":
            export_tree_to_json(tree)
        elif chosen_format == "CSV":
            export_tree_to_csv(tree)

    threading.Thread(target=run_analysis).start()

# Input fields
# The prompt asks for comma-separated IDs, so split on commas; treating
# whitespace as a separator too keeps space-separated input working and
# drops stray blanks such as the one in "CVE-A, CVE-B".
cve_ids = input("Enter CVE IDs (comma-separated): ").replace(",", " ").split()
depth = int(input("Enter tree depth (1-3): "))
# NOTE: severity_filter is collected but not applied; filtering is disabled below.
severity_filter = input("Enter severity levels (LOW, MEDIUM, HIGH, ALL): ").upper().split(",")
# Tolerate accidental surrounding spaces in the chosen format.
export_format = input("Choose export format (JSON, CSV, None): ").strip().upper()

# Analysis
tree = construct_batch_vulnerability_tree(cve_ids, depth)
#filtered_tree1 = filter_tree_by_severity(tree, severity_filter)

# Visualization
visualize_tree_with_matplotlib(tree)

# Export
if export_format == "JSON":
    export_tree_to_json(tree)
elif export_format == "CSV":
    export_tree_to_csv(tree)

# Create a test suite
suite = unittest.TestSuite()

# Add the interactive test function to the suite (it prompts on stdin and
# removes the local file of every CVE ID entered)
suite.addTest(unittest.FunctionTestCase(test_load_cve_data_manual))

# Create a test runner and run the suite
runner = unittest.TextTestRunner()
runner.run(suite)

print("Analysis completed successfully!")