In [1]:
# %%capture
%pip install nvdlib streamlit google-generativeai datetime

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import datetime
import re
import time

import streamlit as st
from dotenv import load_dotenv
from nvdlib import searchCVE

An AI assistant was used to generate a more comprehensive keyword list. The list drives a cheap pre-filter so that only plausibly OT-related CVEs are sent to the LLM, which helps the company keep API costs down **;)**

In [3]:
# Vocabulary used by the keyword pre-filter, grouped by category.
# The order of the flattened KEYWORDS list is general terms, protocols,
# vendors, then product lines.

# Generic OT / ICS terminology.
_GENERAL_TERMS = (
    "SCADA", "ICS", "Industrial Control", "HMI", "PLC", "RTU", "DCS",
    "SIS", "Process Control", "Operational Technology",
)

# Industrial communication protocols.
_PROTOCOLS = (
    "Modbus", "DNP3", "Profinet", "Profibus", "EtherNet/IP", "BACnet",
    "OPC UA", "IEC 61850", "EtherCAT", "CIP", "MMS",
)

# Major automation vendors.
_VENDORS = (
    "Siemens", "Rockwell", "Schneider", "ABB", "Honeywell",
    "Emerson", "Mitsubishi", "Omron", "Yokogawa", "General Electric", "Fanuc",
)

# Specific product lines, annotated with the vendor they belong to.
_PRODUCT_LINES = (
    "Simatic", "WinCC", "Tia Portal",   # Siemens
    "Logix", "FactoryTalk", "Rslinx",   # Rockwell/Allen-Bradley
    "DeltaV", "Ovation",                # Emerson
    "Triconex", "Foxboro",              # Schneider
    "Centum", "ProSafe",                # Yokogawa
    "Wonderware", "Citect",             # AVEVA/Schneider
)

KEYWORDS = [*_GENERAL_TERMS, *_PROTOCOLS, *_VENDORS, *_PRODUCT_LINES]

## Function to detect potential threats
This keyword check also filters out unrelated CVEs before they are sent to the LLM for analysis.

In [4]:
def _is_short_acronym(keyword):
  """Return True for short all-caps alphanumeric keywords such as "ICS" or "DNP3"."""
  return len(keyword) <= 4 and keyword.isalnum() and keyword.isupper()


def is_potential_ot(description, keywords=None):
  """Cheap keyword pre-filter: could this CVE description be OT/ICS related?

  Matching is case-insensitive. Short acronyms (see `_is_short_acronym`) must
  appear as a standalone token, optionally pluralised with a trailing "s", so
  that "SIS" does not fire on "analysis" nor "RTU" on "virtual". All other
  keywords are matched as plain substrings, which keeps hits such as "Logix"
  inside "ControlLogix".

  Args:
    description: CVE description text. Empty or non-string values yield False.
    keywords: optional iterable of keywords; defaults to the module-level KEYWORDS.

  Returns:
    True if at least one keyword matches, otherwise False.
  """
  if not description or not isinstance(description, str):
    return False
  if keywords is None:
    keywords = KEYWORDS

  text = description.lower()  # lowercase once instead of once per keyword
  for keyword in keywords:
    needle = keyword.lower()
    if _is_short_acronym(keyword):
      # The lookarounds require a non-alphanumeric neighbour on both sides.
      pattern = rf"(?<![a-z0-9]){re.escape(needle)}s?(?![a-z0-9])"
      if re.search(pattern, text):
        return True
    elif needle in text:
      return True
  return False

## Using Gemini to analyze the CVE descriptions

In [None]:
import json
import os
import google.generativeai as genai

# Load variables from a local .env file (python-dotenv), then read the Gemini key
# from the process environment so it never has to be written into the notebook.
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)

# Request JSON output so the response text can be passed straight to json.loads.
_GENERATION_CONFIG = {'response_mime_type': 'application/json'}

model = genai.GenerativeModel(
    model_name='gemini-2.5-flash-lite',
    generation_config=_GENERATION_CONFIG,
)

def _normalize_verdict(payload):
    """Validate the model's parsed JSON and coerce it to a predictable shape.

    Returns a dict whose "ot_related" is a real bool and whose "reason" is a str.
    Raises ValueError when the payload is not a JSON object carrying "ot_related".
    """
    if not isinstance(payload, dict) or "ot_related" not in payload:
        raise ValueError("model response is not a JSON object with an 'ot_related' key")

    flag = payload["ot_related"]
    if isinstance(flag, str):
        # The prompt spells the boolean as "True"/"False"; tolerate a quoted answer.
        flag = flag.strip().lower() == "true"

    return {**payload, "ot_related": bool(flag), "reason": str(payload.get("reason", ""))}


def analyze_with_gemini(description):
    """Ask Gemini whether a CVE description is OT/ICS/SCADA related.

    Args:
        description: CVE description text. Only the first 800 characters are sent.

    Returns:
        A dict with at least "ot_related" (bool) and "reason" (str). On any
        failure a safe verdict with "ot_related" set to False is returned
        instead of raising.

    Behaviour:
        Errors from the API call whose message contains "429" are retried after
        20s, 40s and 60s. A fourth, final attempt is then made without further
        backoff. Parsing and validation failures are never retried.
    """
    short_desc = (description or "")[:800]

    prompt=f"""
    You are an expert OT threat analyst with 160 IQ.
    Throughly analyze the following CVE description.
    Return ONLY a JSON object with exactly these keys:
    1. "ot_related" : boolean (True if OT/ICS/SCADA related, False otherwise).
    2. "reason" : string (an expert-level, detailed explanation of why. If "ot_related" is True, explain why this vulnerability is dangerous).

    ---------------------
    Description:
    ---------------------
    {short_desc}
    """

    max_backoffs = 3  # number of rate-limit retries before the final attempt
    for attempt in range(max_backoffs + 1):
        final_attempt = attempt == max_backoffs
        # Fallback reason strings are kept exactly as they were before the refactor.
        failure = {
            "ot_related": False,
            "reason": "Error processing request" if final_attempt else "Error",
        }

        try:
            response = model.generate_content(prompt)
        except Exception as e:
            if "429" in str(e) and not final_attempt:
                wait_time = (attempt + 1) * 20 # 20s, then 40s, then 60s
                print(f"Quota hit. Backing off for {wait_time}s...")
                time.sleep(wait_time)
                continue
            print(f"AI error: {e}")
            return failure

        # Parsing is kept outside the API try-block so that a JSON error whose
        # message happens to contain "429" (e.g. a character offset) is not
        # mistaken for a quota error.
        try:
            return _normalize_verdict(json.loads(response.text))
        except Exception as e:
            print(f"AI error: {e}")
            return failure

[REDACTED: an API key was printed in this cell's output. Clear the output before sharing the notebook and rotate the key.]



All support for the `google.generativeai` package has ended. It will no longer be receiving 
updates or bug fixes. Please switch to the `google.genai` package as soon as possible.
See README for more details:

https://github.com/google-gemini/deprecated-generative-ai-python/blob/main/README.md

  import google.generativeai as genai


## Main Program

In [None]:
seen_cves = set()        # ids of CVEs that have already been examined
approved_cves = dict()   # cve id -> details of every CVE confirmed as an OT threat
dashboard_data = dict()
NVD_API = os.environ.get('NVD_API_KEY')

# Where approved OT threats are persisted. Override with the OT_CVE_OUTPUT_PATH
# environment variable; the default is the location used previously.
OUTPUT_PATH = os.environ.get(
    'OT_CVE_OUTPUT_PATH',
    '/home/pierreramez/Hard Drive/Projects/ControlPoint Task/output_sample.json',
)
POLL_INTERVAL_SECONDS = 600  # pause between NVD polls
LLM_PAUSE_SECONDS = 5        # pause after each LLM call to stay under the API rate limit


def _first_description(cve):
    """Return the text of the CVE's first description, or '' when it has none."""
    try:
        return cve.descriptions[0].value
    except (AttributeError, IndexError, TypeError):
        return ''


def _cvss_v31(cve):
    """Return (baseScore, baseSeverity) from the first CVSS v3.1 metric.

    New CVEs might not have a severity score yet; in that case both values
    are 'N/A'.
    """
    try:
        cvss_data = cve.metrics.cvssMetricV31[0].cvssData
        return cvss_data.baseScore, cvss_data.baseSeverity
    except (AttributeError, IndexError, KeyError, TypeError):
        return 'N/A', 'N/A'


def _append_to_report(path, entry):
    """Append `entry` to the JSON list stored at `path`, creating it if needed.

    A missing, unreadable or malformed file is treated as an empty list. A file
    holding a single JSON object is wrapped into a one-element list.
    """
    try:
        with open(path, mode='r') as f:
            data = json.load(f)
    except (OSError, ValueError):  # json.JSONDecodeError is a ValueError
        data = []

    # checking and correcting the data type
    if isinstance(data, dict):
        data = [data]
    elif not isinstance(data, list):
        data = []

    data.append(entry)
    with open(path, mode='w') as f:
        json.dump(data, f, indent=4)


while True:
    end = datetime.datetime.now()
    start = end - datetime.timedelta(days=1)
    try:
        r = searchCVE(pubStartDate=start, pubEndDate=end, key=NVD_API)
    except Exception as e:
        # A transient NVD/network failure must not stop the monitor; retry next cycle.
        print(f"NVD fetch failed: {e}")
        time.sleep(POLL_INTERVAL_SECONDS)
        continue
    print(f"Fetched {len(r)} records.")

    for cve in r[:5]: # Print first 5 to check
        print(f"{cve.id}: {_first_description(cve)}")

    for cve in r:
        # Only handle CVEs that are new and were not rejected as a vulnerability.
        if cve.id in seen_cves or cve.vulnStatus == "Rejected":
            continue
        seen_cves.add(cve.id) # to avoid reprocessing the same cve

        try:
            description = _first_description(cve)
            if not is_potential_ot(description): # cheap keyword pre-filter
                continue

            print(f"New potential threat is detected: {cve.id}")
            print(f"sending {cve.id} description to LLM for analysis...")
            response = analyze_with_gemini(description)
            time.sleep(LLM_PAUSE_SECONDS)

            if response.get('ot_related') is not True: # keep only confirmed OT threats
                continue

            print(f"""
              ###################################
              APPROVED: {cve.id} is an OT threat!
              ###################################
              """)

            cvss, severity = _cvss_v31(cve)
            # Save every newly approved CVE.
            approved_cves[cve.id] = {'cvss': cvss,
                                     'severity': severity,
                                     'description': f"{description}",
                                     'ai_insight': str(response.get('reason', ''))}
            # Separate record, including the id, to pass to the dashboard.
            new_entry = {'cve_id': f'{cve.id}', **approved_cves[cve.id]}

            # Output the OT threat found to a JSON file.
            _append_to_report(OUTPUT_PATH, new_entry)

        except Exception as e:
            # Catch Exception rather than using a bare except so that a kernel
            # interrupt (KeyboardInterrupt) can still stop the loop, and report
            # the failure instead of hiding it.
            print(f"Skipping {cve.id}: {e}")
            continue
    time.sleep(POLL_INTERVAL_SECONDS)

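[REDACTED: a value that looks like the NVD API key was printed in this cell's output. Clear the output before sharing the notebook and rotate the key if it is real.]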
Fetched 246 records.
CVE-2025-68658: Open Source Point of Sale (opensourcepos) is a web based point of sale application written in PHP using CodeIgniter framework. opensourcepos 3.4.0 and 3.4.1 has a stored XSS vulnerability exists in the Configuration (Information) functionality. An authenticated user with the permission “Configuration: Change OSPOS's Configuration” can inject a malicious JavaScript payload into the Company Name field when updating Information in Configuration. The malicious payload is stored and later triggered when a user accesses /sales/complete. First select Sales, and choose New Item to create an item, then click on Completed . Due to insufficient input validation and output encoding, the payload is rendered and executed in the user’s browser, resulting in a stored XSS vulnerability. This vulnerability is fixed in 3.4.2.
CVE-2025-68947: NSecsoft 'NSecKrnl' is a Windows driver that allows a local, authenticated attacker to term