In [4]:
import json
import os
import time
from collections.abc import MutableMapping

import numpy as np
import pandas as pd
import requests
from tqdm import tqdm

In [7]:
# Transform the raw nested JSON file into a single level Dataframe compatible dict

def flatten_dict(d: MutableMapping, sep: str = '.') -> MutableMapping:
    """Collapse a nested mapping into a single-level dict.

    The mapping is run through ``pandas.json_normalize`` and read back as a
    record, so nested keys are joined with ``sep``.

    Args:
        d: The (possibly nested) mapping to flatten.
        sep: Separator placed between nested key names.

    Returns:
        The single flattened record as a dict.

    Raises:
        ValueError: If normalisation does not yield exactly one record.
    """
    normalized = pd.json_normalize(d, sep=sep)
    records = normalized.to_dict(orient='records')
    # Tuple-unpacking enforces that there is exactly one record.
    (only_record,) = records
    return only_record


def _fields_at(items, index, names):
    """Return ``items[index][name]`` for every name, or NaN for all of them.

    The lookup is all-or-nothing: if ``items`` is not indexable, has no entry
    at ``index``, or that entry lacks any of the requested names, every
    returned value is ``np.nan``.
    """
    try:
        entry = items[index]
        return tuple(entry[name] for name in names)
    except (IndexError, KeyError, TypeError):
        return (np.nan,) * len(names)


def fully_flatten(d):
    """Flatten one raw CVE record into a single-level, DataFrame-ready dict.

    On top of ``flatten_dict`` this expands the list-valued fields that a plain
    flatten leaves nested:

    * metrics        -> first entry of each CVSS version, expanded into
                        ``cve.metrics.cvssMetricV<ver>.<field>`` keys; a version
                        that is absent gets ``cve.metrics.cvssMetricV<ver> = NaN``
    * weaknesses     -> ``cve.weaknesses.{primary,secondary}.{source,description}``
    * descriptions   -> ``cve.descriptions.lang.{en,es}`` (taken by position)
    * references     -> ``cve.references.{0,1,2}.{url,source}``
    * configurations -> first node / first cpeMatch of the first configuration

    Missing pieces are reported as ``np.nan`` rather than raising, and the raw
    list-valued keys are always removed so every record has the same schema.

    Args:
        d: One vulnerability item (a nested dict with a top-level ``cve`` key).

    Returns:
        A flat dict whose values are scalars, NaN, or (for the list-form
        weakness description) the untouched description list.
    """
    flattened = flatten_dict(d)

    ## DEAL WITH METRICS
    for version in ['2', '30', '31']:
        key = f"cve.metrics.cvssMetricV{version}"
        entries = flattened.get(key)
        if isinstance(entries, list) and entries and isinstance(entries[0], dict):
            # Keep the nested dict: the second flatten_dict() call below turns
            # it into `<key>.<field>` entries. (Previously this value was
            # immediately overwritten with True, discarding all metric data.)
            flattened[key] = flatten_dict(entries[0])
        else:
            flattened[key] = np.nan

    ## DEAL WITH WEAKNESSES
    ## NOTE(review): the original comment mentioned a singular `cve.weakness`
    ## key, but the code reads `cve.weaknesses.description`, so that is the key
    ## tested here. The list form is `cve.weaknesses`.
    primary_source = primary_description = np.nan
    secondary_source = secondary_description = np.nan

    if "cve.weaknesses.description" in flattened:
        # Weakness given as a single mapping (already expanded by the first flatten).
        single_description = flattened.pop('cve.weaknesses.description')
        (primary_description,) = _fields_at(single_description, 0, ('value',))
    elif "cve.weaknesses" in flattened:
        # Weaknesses given as a list: first entry is "primary", second "secondary".
        weaknesses = flattened.pop('cve.weaknesses')
        primary_source, primary_description = _fields_at(
            weaknesses, 0, ('source', 'description'))
        secondary_source, secondary_description = _fields_at(
            weaknesses, 1, ('source', 'description'))

    flattened.update({
        "cve.weaknesses.primary.source":        primary_source,
        "cve.weaknesses.primary.description":   primary_description,
        "cve.weaknesses.secondary.source":      secondary_source,
        "cve.weaknesses.secondary.description": secondary_description,
    })

    ## DEAL WITH DESCRIPTIONS
    ## NOTE(review): languages are assigned by position (first = en,
    ## second = es), not by the entry's own language tag -- confirm this holds
    ## for the data being processed.
    descriptions = flattened.pop('cve.descriptions', None)
    (description_en,) = _fields_at(descriptions, 0, ('value',))
    (description_es,) = _fields_at(descriptions, 1, ('value',))
    flattened.update({
        "cve.descriptions.lang.en": description_en,
        "cve.descriptions.lang.es": description_es,
    })

    ## FLATTEN AGAIN TO DEAL WITH NEW NESTED DICTS
    flattened = flatten_dict(flattened)

    ## DEAL WITH REFERENCES
    # Only the first three references are kept; url/source are all-or-nothing.
    references = flattened.pop('cve.references', None)
    for i in range(3):
        url, source = _fields_at(references, i, ('url', 'source'))
        flattened.update({
            f"cve.references.{i}.url": url,
            f"cve.references.{i}.source": source
        })

    ## DEAL WITH CONFIGURATIONS
    # The raw list is popped up front so it never leaks into the output,
    # including when the extraction below falls back to NaN.
    configurations = flattened.pop('cve.configurations', None)
    try:
        node = configurations[0]["nodes"][0]
        cpe_match = node['cpeMatch'][0]
        configuration_fields = {
            "cve.configurations.nodes.operator": node['operator'],
            "cve.configurations.nodes.negate": node['negate'],
            "cve.configurations.nodes.cpeMatch.vulnerable": cpe_match['vulnerable'],
            "cve.configurations.nodes.cpeMatch.criteria": cpe_match['criteria'],
            "cve.configurations.nodes.cpeMatch.matchCriteriaId": cpe_match['matchCriteriaId']
        }
    except (IndexError, KeyError, TypeError):
        configuration_fields = {
            "cve.configurations.nodes.operator": np.nan,
            "cve.configurations.nodes.negate": np.nan,
            "cve.configurations.nodes.cpeMatch.vulnerable": np.nan,
            "cve.configurations.nodes.cpeMatch.criteria": np.nan,
            "cve.configurations.nodes.cpeMatch.matchCriteriaId": np.nan
        }
    flattened.update(configuration_fields)

    return flattened

In [None]:
# Scrape all the data from NVD and store it in a text file (one JSON document per line).
# LAST SCRAP DATE: 19-03-2023

NVD_CVE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0/"
N_PAGES = 106  # number of pages fetched; N_PAGES * resultsPerPage must cover the whole catalogue
resultsPerPage = 2000

# Never commit credentials: the key is read from the environment instead.
# NOTE(review): the key that used to be hard-coded here has been exposed in
# source and should be rotated.
API_KEY = os.environ["NVD_API_KEY"]

for i in range(N_PAGES):
    startIndex = i * resultsPerPage
    time.sleep(1)  # pause between requests
    response = requests.get(
        NVD_CVE_URL,
        params={"resultsPerPage": resultsPerPage, "startIndex": startIndex},
        headers={'apiKey': API_KEY},
        timeout=60,
    )
    # Fail loudly on HTTP errors (e.g. rate limiting) instead of crashing later
    # on a missing 'vulnerabilities' key.
    response.raise_for_status()
    content = response.json()['vulnerabilities']
    # Append page by page so an interrupted run keeps what was already fetched.
    # NOTE: because of append mode, re-running this cell duplicates records.
    with open("Data/api_cves.txt", "a") as f:
        for item in content:
            f.write(json.dumps(item) + "\n")

In [None]:
# Load data from file: one JSON document per line, at the same path the
# scraping cell appends to ("Data/api_cves.txt").
with open("Data/api_cves.txt", "r") as f:
    # Blank lines are skipped so a stray trailing newline does not break parsing.
    loaded_data = [json.loads(line) for line in f if line.strip()]

In [None]:
processed = []

# Wrap the list itself (not enumerate) so tqdm knows the total and shows real progress.
for cve in tqdm(loaded_data):
    if cve['cve']['vulnStatus'] not in ['Rejected','Deferred']:
        processed.append(fully_flatten(cve))

df = pd.DataFrame(processed)
print("df.shape: ", df.shape)

# Note: the "Rejected" figure below also includes records skipped as 'Deferred'.
print('total CVEs scrapped: ', len(loaded_data))
print('Rejected CVEs: ', len(loaded_data) - df.shape[0])
print('Useable CVEs: ', df.shape[0])

# Just in case: normalise every missing-value marker to np.nan
df = df.fillna(value=np.nan)
df = df.replace('NaN', np.nan)

# Type conversion error because of mixed types
df['cve.configurations.nodes.negate'] = df['cve.configurations.nodes.negate'].astype('string')
df['cve.configurations.nodes.cpeMatch.vulnerable'] = df['cve.configurations.nodes.cpeMatch.vulnerable'].astype('string')

# Presence flag per CVSS version, derived from its `<column>.<field>` sub-columns:
#   True  -> no sub-column is missing
#   False -> every sub-column is missing
#   "?"   -> partially filled
# The "all missing" threshold is the actual number of sub-columns of that
# version (it used to be a hard-coded 16, which only fits a version with
# exactly 16 sub-columns).
for metric in ['cvssMetricV2', 'cvssMetricV30', 'cvssMetricV31']:
    column = f'cve.metrics.{metric}'
    sub_columns = [name for name in df.columns if name.startswith(column + '.')]
    n_missing = df[sub_columns].isna().sum(axis=1)

    flags = pd.Series("?", index=df.index, dtype=object)
    flags[n_missing == len(sub_columns)] = False
    # Assigned last so "nothing missing" wins, matching the original branch
    # order (relevant when a version has no sub-columns at all).
    flags[n_missing == 0] = True
    df[column] = flags

df.to_csv('data/raw_cves.csv',index=False)

## TODO (translated from the original note): figure out how to integrate the code above

In [None]:
# Vulnerability status check
# Printed explicitly: a bare expression that is not the last line of a cell is discarded.
print(df['cve.vulnStatus'].value_counts())

for metric in ['cvssMetricV2', 'cvssMetricV30', 'cvssMetricV31']:
    print(metric,': ')
    # The flag columns carry the full 'cve.metrics.' prefix.
    print(df[f'cve.metrics.{metric}'].value_counts())

# Helper column: V2 presence with the partial marker "?" counted as present.
# Kept on the frame under its original name in case later cells read it.
df['tayech'] = df["cve.metrics.cvssMetricV2"].replace("?", True).astype(bool)

# (V3.0 flag, V3.1 flag, V2 flag) -> label. Any other combination -- including
# a "?" in either V3 column, or no version at all -- is labelled 'other'.
VERSION_LABELS = {
    (False, False, True):  "V2",
    (True,  False, False): "V3.0",
    (False, True,  False): "V3.1",
    (True,  True,  False): "V3.X",
    (False, True,  True):  'V2 & V3.1',
    (True,  False, True):  'V2 & V3.0',
    (True,  True,  True):  'all',
}

flag_rows = zip(
    df["cve.metrics.cvssMetricV30"],
    df["cve.metrics.cvssMetricV31"],
    df["tayech"],
)
df['version'] = [VERSION_LABELS.get(flags, 'other') for flags in flag_rows]