<a href="https://colab.research.google.com/github/ds7389/CS_6233-Introduction_to_OS-Rearch_Project/blob/main/Int_to_OS_RESEARCH_main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

*Disclaimer:*
"This product uses data from the NVD API but is not endorsed or certified by the NVD."


All NIST publications are available in the public domain according to Title 17 of the United States Code, however services which utilize or access the NVD are asked to display the following notice prominently within the application:

"This product uses data from the NVD API but is not endorsed or certified by the NVD."

You may use the NVD name to identify the source of the data. You may not use the NVD name, to imply endorsement of any product, service, or entity, not-for-profit, commercial or otherwise. For information on how to the cite the NVD, including the database's Digital Object Identifier (DOI), please consult NIST's Public Data Repository.

Reference Google Sheets Workbook:  https://docs.google.com/spreadsheets/d/1MZ-RE-MddD1OQr2IyKb7IAD8-KXpCLZ1RZ18fqMDHZo/edit?usp=sharing

# Setup

In [1]:
###--- Mount Drive ---###
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
###-- Libraries ---###
import json
import requests
import os
import time
import datetime
import calendar # Import the calendar module to get the number of days in a month
import pandas as pd
import random



In [3]:
%pip install xlsxwriter

Collecting xlsxwriter
  Downloading xlsxwriter-3.2.5-py3-none-any.whl.metadata (2.7 kB)
Downloading xlsxwriter-3.2.5-py3-none-any.whl (172 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.3/172.3 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: xlsxwriter
Successfully installed xlsxwriter-3.2.5


## Helper Functions

In [4]:
################################################################################
### Helper Functions
################################################################################

def path_to_file_in_project_folder(file_name: str) -> str:
  """
  Takes the file name, and gives it a fully qualified path to the project folder.
  Returns:
    str: The fully qualified path to the file.
  """
  file_path = os.path.join('/content/drive/MyDrive/CY-GYV_6233/Reserach_Paper/DATA', file_name)
  return file_path

def path_to_file_in_top_level(file_name: str) -> str:
  """
  Takes the file name, and gives it a fully qualified path to the top level folder.
  Returns:
    str: The fully qualified path to the file.
  """
  file_path = os.path.join('/content/drive/MyDrive', file_name)
  return file_path


def json_to_dict(file_path: str) -> dict:
  """
  Take a json file name and convert it into a dictionary to manipulate.
  The default location is the DATA folder inside this project.
  """
  try:
      with open(file_path, 'r') as f:
          dict_name = json.load(f)
          return dict_name
  except FileNotFoundError:
      print(f"Error: The file {json_file} was not found.")
      dict_name = None # Set nist_data to None if the file is not found
  except json.JSONDecodeError:
      print(f"Error: Could not decode JSON from {json_file}. Check the file format.")
      dict_name = None # Set nist_data to None if there's a JSON decoding error

def dict_to_list(dict_name: dict) -> list:
  """
  Take a dictionary and convert it into a list to manipulate.
  """
  dict_list = []
  for year, entries in dict_name.items():
    dict_list.append(entries)
  return dict_list


def json_to_list(json_file: str) -> list:
  """
  Take a json file path and convert it into a list to manipulate.
  """
  file_path = os.path.join('/content/drive/MyDrive/CY-GY_6233/Reserach_Paper/DATA', json_file)
  try:
      with open(file_path, 'r') as f:
          data_list = json.load(f)
          if isinstance(data_list, list):
              return data_list
          else:
              print(f"Warning: JSON file {file_path} does not contain a list at the top level. Returning loaded data as list anyway.")
              forced_list = dict_to_list(data_list)
              return forced_list
              #return
              #return data_list # Return the loaded data even if it's not a list
  except FileNotFoundError:
      print(f"Error: The file {file_path} was not found.")
      return None # Return None if the file is not found
  except json.JSONDecodeError:
      print(f"Error: Could not decode JSON from {file_path}. Check the file format.")
      return None # Return None if there's a JSON decoding error


def remove_rejected_vulnerabilities(data):
  """
  Removes vulnerabilities with 'vulnStatus' equal to 'Rejected' from a dictionary of vulnerabilities.
  Args:
      data (dict): A dictionary where keys are years and values are lists of vulnerability dictionaries.
  Returns:
      dict: A new dictionary with 'Rejected' vulnerabilities removed.
  """
  cleaned_data = {}
  if data:
      for year, vulnerabilities in data.items():
          cleaned_data[year] = []
          for vulnerability in vulnerabilities:
              # Check if the vulnerability has the 'cve' key and 'vulnStatus' within 'cve'
              if 'cve' in vulnerability and 'vulnStatus' in vulnerability['cve']:
                  if vulnerability['cve']['vulnStatus'] != 'Rejected':
                      cleaned_data[year].append(vulnerability)
              else:
                  # If 'cve' or 'vulnStatus' is missing, keep the entry (or decide to discard based on requirements)
                  # For now, let's assume we keep entries that don't have this structure
                  cleaned_data[year].append(vulnerability)
  return cleaned_data





def filter_vulnerabilities_by_keyword(data: dict, keywords: list) -> dict:
  """
  Filters a dictionary of vulnerabilities by keywords in the description.
  Args:
      data (dict): A dictionary where keys are years and values are lists of vulnerability dictionaries.
      keywords (list): A list of strings representing keywords to filter by.
  Returns:
      dict: A new dictionary containing only the vulnerabilities that match the keywords.
  prompt:
      Create a helper function that takes the cleaned_data dict, and removes any entries that contain keywords from an array and output the results in a new dict called filtered_data.
  """
  filtered_data = {}
  for year, vulnerabilities in data.items():
      filtered_data[year] = []
      for vulnerability in vulnerabilities:
          # Check if the vulnerability has descriptions
          if 'cve' in vulnerability and 'descriptions' in vulnerability['cve']:
              # Concatenate all description values into a single string for searching
              description_text = " ".join([desc['value'] for desc in vulnerability['cve']['descriptions']])
              # Check if any of the keywords are present in the description (case-insensitive)
              if any(keyword.lower() in description_text.lower() for keyword in keywords):
                  filtered_data[year].append(vulnerability)
          # Optionally, you could add an else here to handle entries without descriptions
          # else:
          #     print(f"Entry missing descriptions: {vulnerability.get('cve', {}).get('id', 'N/A')}")
  return filtered_data



def filter_vulnerabilities_by_keyword(data: dict, keywords: list) -> dict:
  """
  Filters a dictionary of vulnerabilities by keywords in the description.
  Args:
      data (dict): A dictionary where keys are years and values are lists of vulnerability dictionaries.
      keywords (list): A list of strings representing keywords to filter by.
  Returns:
      dict: A new dictionary containing only the vulnerabilities that match the keywords.
  prompt:
      Create a helper function that takes the cleaned_data dict, and removes any entries that contain keywords from an array and output the results in a new dict called filtered_data.
  """
  filtered_data = {}
  for year, vulnerabilities in data.items():
      filtered_data[year] = []
      for vulnerability in vulnerabilities:
          # Check if the vulnerability has descriptions
          if 'cve' in vulnerability and 'descriptions' in vulnerability['cve']:
              # Concatenate all description values into a single string for searching
              description_text = " ".join([desc['value'] for desc in vulnerability['cve']['descriptions']])
              # Check if any of the keywords are present in the description (case-insensitive)
              if any(keyword.lower() in description_text.lower() for keyword in keywords):
                  filtered_data[year].append(vulnerability)
          # Optionally, you could add an else here to handle entries without descriptions
          # else:
          #     print(f"Entry missing descriptions: {vulnerability.get('cve', {}).get('id', 'N/A')}")
  return filtered_data


def dict_to_dataframe(data_dict: dict):
  """
  Converts a dictionary of vulnerability data (structured by year) into a pandas DataFrame.
  Args:
      data_dict (dict): A dictionary where keys are years and values are lists of vulnerability dictionaries.
  Returns:
      pd.DataFrame: A pandas DataFrame containing all vulnerabilities from the dictionary.
  """
  all_vulnerabilities = []
  for year, vulnerabilities in data_dict.items():
      all_vulnerabilities.extend(vulnerabilities)
  # Normalize the nested JSON data into a flat table
  df = pd.json_normalize(all_vulnerabilities)
  return df



#def dataframe_to_json(dataframe: DataFrame, file_path: str) -> file:
def dataframe_to_json(dataframe, file_path: str):
  """
  Saves a pandas DataFrame to a JSON file.
  Args:
      dataframe (pd.DataFrame): The pandas DataFrame to save.
      file_path (str): The path where the JSON file will be saved.
  """
  dataframe.to_json(file_path, indent=4, orient='records')
  print(f"Successfully saved DataFrame to {file_path}")



def remove_vulnerabilities_by_id(data: dict, cve_ids_to_remove: list) -> dict:
   """
   Removes entries from a dictionary of vulnerabilities based on a list of cve.ids.
   Args:
       data (dict): A dictionary where keys are years and values are lists of vulnerability dictionaries.
       cve_ids_to_remove (list): A list of strings representing the cve.ids to remove.
   Returns:
       dict: A new dictionary with the specified vulnerabilities removed.
   """
   cleaned_data = {}
   for year, vulnerabilities in data.items():
       cleaned_data[year] = []
       for vulnerability in vulnerabilities:
           cve_id = vulnerability.get('cve', {}).get('id')
           if cve_id and cve_id not in cve_ids_to_remove:
               cleaned_data[year].append(vulnerability)
           elif not cve_id:
                # Optionally, you could add an else here to handle entries without cve.id
                # print(f"Entry missing cve.id: {vulnerability}")
                cleaned_data[year].append(vulnerability)
   return cleaned_data



def get_cve_ids_by_keyword(data: dict, keywords: list) -> list:
  """
  Finds and returns the cve.ids of vulnerabilities that contain any of the specified keywords in their description.
  This works in conjunction with remove_vulnerabilities_by_id to identify entries to remove.
  Args:
      data (dict): A dictionary where keys are years and values are lists of vulnerability dictionaries.
      keywords (list): A list of strings representing keywords to search for.
  Returns:
      list: A list of cve.ids for the matching vulnerabilities.
  """
  matching_cve_ids = []
  for year, vulnerabilities in data.items():
      for vulnerability in vulnerabilities:
          # Check if the vulnerability has descriptions
          if 'cve' in vulnerability and 'descriptions' in vulnerability['cve']:
              # Concatenate all description values into a single string for searching
              description_text = " ".join([desc['value'] for desc in vulnerability['cve']['descriptions']])
              # Check if any of the keywords are present in the description (case-insensitive)
              if any(keyword.lower() in description_text.lower() for keyword in keywords):
                  cve_id = vulnerability.get('cve', {}).get('id')
                  if cve_id:
                      matching_cve_ids.append(cve_id)
  return matching_cve_ids


def count_entries(data_dict: dict) -> tuple[dict, int]:
  """
  Counts the number of entries per year and the total number of entries in a dictionary.
  Args:
      data_dict (dict): A dictionary where keys are years and values are lists of entries.
  Returns:
      tuple: A tuple containing:
          - dict: A dictionary with years as keys and entry counts as values.
          - int: The total number of entries.
  """
  entries_per_year = {}
  total_entries = 0
  for year, entries in data_dict.items():
    count = len(entries)
    entries_per_year[year] = count
    total_entries += count
  return entries_per_year, total_entries


def print_entries_total_and_per_year(data_dict: dict):
  """
  Prints the total number of entries and the number of entries per year in a formatted way.
  """

  entries_per_year_count, total_entries_count = count_entries(data_dict)
  print("\nNumber of entries per year:")
  for year, count in entries_per_year_count.items():
      print(f"Year {year}: {count} entries")
  print(f"\nTotal number of entries: {total_entries_count}")


def display_random_cve_ids_and_descriptions(data_dict: dict, num_entries: int = 10):
  """
  Randomly selects a specified number of entries from a dictionary of vulnerabilities,
  extracts their CVE ID and English description, and displays them in a pandas DataFrame.
  Args:
      data_dict (dict): A dictionary where values are lists of entries.
      num_entries (int): The number of random entries to display.
  """
  if not data_dict:
      print("Error: Input dictionary is empty.")
      return
  # Flatten the dictionary into a single list of all entries
  all_entries = []
  for key, entries in data_dict.items():
      all_entries.extend(entries)
  # Check if the flattened list is empty
  if not all_entries:
      print("Dataset is empty after flattening. Cannot display random entries.")
      return
  # Select random entries from the flattened list
  # Use min() to handle cases where the list has fewer than num_entries
  random_entries = random.sample(all_entries, min(num_entries, len(all_entries)))
  # Extract CVE ID and English description for the selected entries
  extracted_data = []
  for entry in random_entries:
      cve_id = entry.get('cve', {}).get('id')
      english_description = None
      descriptions = entry.get('cve', {}).get('descriptions', [])
      for desc in descriptions:
          if desc.get('lang') == 'en':
              english_description = desc.get('value')
              break  # Assuming there's only one English description
      if cve_id and english_description:
          extracted_data.append({'CVE ID': cve_id, 'English Description': english_description})
      elif cve_id:
          extracted_data.append({'CVE ID': cve_id, 'English Description': 'N/A (No English description found)'})
  # Create a pandas DataFrame from the extracted data
  if extracted_data:
      df = pd.DataFrame(extracted_data)
      print(f"\nDisplaying {len(df)} random CVE IDs and English descriptions:")
      display(df)
  else:
      print("No entries with both CVE ID and English description found in the random sample.")




## Project Controls

In [5]:
###---  Control Varibales ---###
DEBUG = False               # Set to True if need to debug funcationality
ORIGINAL_DATA = False       # Set to True if you want to repull the original dataset.

In [6]:
# Define the file path in Google Drive
# You can change 'nvd_vulnerabilities.json' to your desired filename
json_file_path = path_to_file_in_top_level('nvd_vulnerabilities.json')



print(json_file_path)

/content/drive/MyDrive/nvd_vulnerabilities.json


# Data Work

## Load Dataset

In [7]:
################################################################################
### Load the saved data from Google Drive and store as nvd_data
################################################################################

# Function
nvd_data = json_to_dict(json_file_path)

# Output
print_entries_total_and_per_year(nvd_data)



Number of entries per year:
Year 2020: 19222 entries
Year 2021: 21950 entries
Year 2022: 26431 entries
Year 2023: 30949 entries
Year 2024: 40704 entries
Year 2025: 28675 entries

Total number of entries: 167931


## Filter Data Set:

In [8]:
################################################################################
# Remove rejected vulnerabilities from nvd_data
################################################################################

# Function
cleaned_data = remove_rejected_vulnerabilities(nvd_data)


# Output
print_entries_total_and_per_year(cleaned_data)



Number of entries per year:
Year 2020: 18322 entries
Year 2021: 20150 entries
Year 2022: 25074 entries
Year 2023: 28818 entries
Year 2024: 39970 entries
Year 2025: 27562 entries

Total number of entries: 159896


In [9]:
################################################################################
# Filter the cleaned_data  for IOT related CVEs using the helper function
################################################################################


# Define a list of keywords related to IoT (can be expanded)
iot_keywords = ["IoT", "Internet of Things", "smart home", "connected device", "embedded device", "wireless sensor", "industrial IoT", "IIoT"]

# Function
filtered_data = filter_vulnerabilities_by_keyword(cleaned_data, iot_keywords)

# Output
print_entries_total_and_per_year(filtered_data)


Number of entries per year:
Year 2020: 661 entries
Year 2021: 726 entries
Year 2022: 544 entries
Year 2023: 79 entries
Year 2024: 121 entries
Year 2025: 71 entries

Total number of entries: 2202


In [10]:
################################################################################
# Get the cve.ids of entries containing the keyword "biblioteca" from the
#   filtered_data and save as a list matching_ids
#
#     "bibl(IOT)teca" is a false positive result
#
################################################################################

# Define a list of keywords to search for
search_keywords = ["biblioteca"] # Replace with your desired keywords


# Function
matching_ids = get_cve_ids_by_keyword(filtered_data, search_keywords)


# Output
print(f"\nNumber of matching entries: {len(matching_ids)}")


Number of matching entries: 1057


In [11]:
################################################################################
### Remove all the  CVE's with biblioteca
################################################################################

# Function
data_with_removed_ids = remove_vulnerabilities_by_id(filtered_data, matching_ids)


# Output
print_entries_total_and_per_year(data_with_removed_ids)


Number of entries per year:
Year 2020: 341 entries
Year 2021: 295 entries
Year 2022: 295 entries
Year 2023: 65 entries
Year 2024: 95 entries
Year 2025: 54 entries

Total number of entries: 1145


In [12]:
################################################################################
# Cross reference the data_with_removed_ids with Memory Keywords
################################################################################

# Keywords or CWEs for Memory issues (this might need refinement based on actual data)
# Common memory-related CWEs include:
# CWE-119: Improper Restriction of Operations within the Bounds of a Memory Buffer
# CWE-120: Buffer Copy without Checking Size of Input ('Classic Buffer Overflow')
# CWE-125: Out-of-bounds Read
# CWE-416: Use After Free
# CWE-787: Out-of-bounds Write
memory_keywords = ["memory", "buffer overflow", "out-of-bounds read", "out-of-bounds write", "use after free", "CWE-119", "CWE-120", "CWE-125", "CWE-416", "CWE-787"]

# Function
cross_referenced_data = filter_vulnerabilities_by_keyword(data_with_removed_ids, memory_keywords)

# Output
print_entries_total_and_per_year(cross_referenced_data)







Number of entries per year:
Year 2020: 134 entries
Year 2021: 112 entries
Year 2022: 118 entries
Year 2023: 13 entries
Year 2024: 14 entries
Year 2025: 7 entries

Total number of entries: 398


In [13]:
################################################################################
# Display random CVE ID's with their English Descriptions in a Pandas Table
################################################################################


# Example usage (assuming cross_referenced_data is defined):
display_random_cve_ids_and_descriptions(cross_referenced_data, 25)



Displaying 25 random CVE IDs and English descriptions:


Unnamed: 0,CVE ID,English Description
0,CVE-2019-10603,Use after free issue occurs If the real device...
1,CVE-2021-22547,"In IoT Devices SDK, there is an implementation..."
2,CVE-2022-35886,Four format string injection vulnerabilities e...
3,CVE-2022-36054,"Contiki-NG is an open-source, cross-platform o..."
4,CVE-2022-41873,"Contiki-NG is an open-source, cross-platform o..."
5,CVE-2021-1923,Incorrect pointer argument passed to trusted a...
6,CVE-2020-11308,Buffer overflow occurs when trying to convert ...
7,CVE-2021-30349,Improper access control sequence for AC databa...
8,CVE-2020-18735,A heap buffer overflow in /src/dds_stream.c of...
9,CVE-2021-1983,Possible buffer overflow due to improper handl...


In [14]:
################################################################################
# Count the CVE id's that pertain to Snapdragon
################################################################################

search_keywords = ["snapdragon"] # Replace with your desired keywords

# Function
matching_ids = get_cve_ids_by_keyword(cross_referenced_data, search_keywords)

# Output
#print("\nCVE IDs of entries containing the keywords:")
#display(matching_ids)
print(f"\nNumber of matching entries: {len(matching_ids)}")


################################################################################
### Remove all the  CVE's related to Snapdragon
################################################################################

# Remove entries from the cleaned_data using the helper function
extra_small_data = remove_vulnerabilities_by_id(cross_referenced_data, matching_ids)

# Output
print_entries_total_and_per_year(extra_small_data)


Number of matching entries: 305

Number of entries per year:
Year 2020: 17 entries
Year 2021: 17 entries
Year 2022: 25 entries
Year 2023: 13 entries
Year 2024: 14 entries
Year 2025: 7 entries

Total number of entries: 93
