Simple algorithm to prioritize CVE

In [3]:
!pip install --upgrade epss
!pip install nvdlib

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting epss
  Downloading epss-0.1.1-py2.py3-none-any.whl (4.9 kB)
Installing collected packages: epss
Successfully installed epss-0.1.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting nvdlib
  Downloading nvdlib-0.5.7-py3-none-any.whl (11 kB)
Installing collected packages: nvdlib
Successfully installed nvdlib-0.5.7


In [4]:
import requests
import pandas as pd
import matplotlib.pyplot as plt 

Assuming we have a list of vulnerabilities in our environment below

In [5]:
MY_VULNS = ['CVE-2022-0540','CVE-2022-24160','CVE-2010-0379','CVE-2022-1813']

In [6]:
from epss import epss
import nvdlib
import logging

logging.basicConfig(level=logging.INFO)

# key (str) – NVD API Key. Allows for a request every 0.6 seconds instead of 6 seconds.

def add_cvss(cveid,API_KEY=None):
  try:
    r = nvdlib.getCVE(cveid,key=API_KEY)
    scores = r.score
    return scores
  except LookupError as e:
    return [None,None,None]

client = epss.EPSS()

found = []
for cve in MY_VULNS:
  epss_df,status=client.get(cve=cve)
  if epss_df.shape[0]>0:
    found.append(epss_df)
epss_df = pd.concat(found)

# All the vulns that are not in NVD
Potentially very dangerous we know very little.

In [61]:
epss_df[epss_df.score.isna()]

Unnamed: 0,cve,epss,percentile,date,version,score,severity


## Then consider the ones with both scores

In [62]:
epss_df[~epss_df.score.isna()]

Unnamed: 0,cve,epss,percentile,date,version,score,severity
0,CVE-2022-0540,0.2217,0.96373,2022-05-30,V3,9.8,CRITICAL
1,CVE-2022-24160,0.00885,0.24937,2022-05-30,V3,7.5,HIGH
2,CVE-2010-0379,0.56371,0.98701,2022-05-30,V2,9.3,HIGH
3,CVE-2022-1813,0.02055,0.78213,2022-05-30,V3,9.8,CRITICAL


In [None]:
epss_df.reset_index(inplace=True)
# remember that the NVD has throttling so this will be slow
epss_df[['version','score','severity']]=epss_df.apply(lambda x:add_cvss(x['cve']),axis=1, result_type="expand")

In [10]:
epss_df

Unnamed: 0,cve,epss,percentile,date,version,score,severity
0,CVE-2022-0540,0.2217,0.96373,2022-05-30,V3,9.8,CRITICAL
1,CVE-2022-24160,0.00885,0.24937,2022-05-30,V3,7.5,HIGH
2,CVE-2010-0379,0.56371,0.98701,2022-05-30,V2,9.3,HIGH
3,CVE-2022-1813,0.02055,0.78213,2022-05-30,V3,9.8,CRITICAL


# Current ranking prioritization

When all CVE have EPSS and CVSS scoring you can follow the diagonal.

In [11]:
priority_df = epss_df[epss_df.version.notna()].sort_values(by=['epss','score'],ascending = False)
priority_df.sort_index(inplace=True)
# top priority is 0 and then so on...
priority_df

Unnamed: 0,cve,epss,percentile,date,version,score,severity
0,CVE-2022-0540,0.2217,0.96373,2022-05-30,V3,9.8,CRITICAL
1,CVE-2022-24160,0.00885,0.24937,2022-05-30,V3,7.5,HIGH
2,CVE-2010-0379,0.56371,0.98701,2022-05-30,V2,9.3,HIGH
3,CVE-2022-1813,0.02055,0.78213,2022-05-30,V3,9.8,CRITICAL


# Identify vuln movers and prioritize them

Very naive change point detection and keep the last delta as a way to prioritize them.

More advanced strategies here will be to apply RLE,SAX/PAX, change point detection and other better methods to infer the trend.

In [59]:
import numpy as np

moved = []
for idx,row in priority_df.iterrows():
  print(f'Time series of CVE {row["cve"]}')
  ts_df,status=client.get(cve=row['cve'],scope='time-series')
  ts_df['epss'] = ts_df['epss'].astype('float')
  ts_df['percentile'] = ts_df['percentile'].astype('float')

  epss_range =ts_df['epss'].max()-ts_df['epss'].min()
  perc_range =ts_df['epss'].max()-ts_df['epss'].min()

  if epss_range != 0.0:
    ts_df['date'] = pd.to_datetime(ts_df['date'])
    ts_df.sort_values(by='date',inplace=True)
    ts_df.reset_index(inplace=True)
    ts_df['depss'] = ts_df['epss'] - ts_df['epss'].shift(1)
    ts_df.replace(np.nan, 0.0,inplace=True)
    all_up = (ts_df['depss'] >= 0.0).all(axis=0)
    all_down = (ts_df['depss'] <= 0.0).all(axis=0) 

    chg_df = ts_df[(ts_df.depss!=0.0) & (ts_df.depss.notna())]
    # quick check is it monotoic increasing or decreasing
    last_chg = chg_df.iloc[-1]
    if all_up:
      moved.append({'CVE':row['cve'],'Trend':'variable up','Delta':last_chg['depss']})
    elif all_down:
      moved.append({'CVE':row['cve'],'Trend':'variable down','Delta':last_chg['depss']})
    else:
      moved.append({'CVE':row['cve'],'Trend':'variable steady','Delta':last_chg['depss']})
  else:
    moved.append({'CVE':row['cve'],'Trend':'static','Delta':0.0})
moved_df = pd.DataFrame(moved)
moved_df.sort_values(by='Delta',ascending=False,inplace=True)

Time series of CVE CVE-2022-0540
Time series of CVE CVE-2022-24160
Time series of CVE CVE-2010-0379
Time series of CVE CVE-2022-1813


In [60]:
moved_df

Unnamed: 0,CVE,Trend,Delta
0,CVE-2022-0540,variable up,0.21034
3,CVE-2022-1813,variable up,0.0105
1,CVE-2022-24160,static,0.0
2,CVE-2010-0379,variable down,-0.01944
