# Grype vulnerabilities collector

Starting from a set of <b>organizations</b> already present in the database, this notebook collects vulnerabilities with <b>Grype</b> and stores them in the database.
<hr>

In [None]:
organizations = ["opendatatrentino"] # Set here the GitHub username(s) of the organization(s) to process

# In order to obtain further information about vulnerabilities, the NVD API is used. The public rate limit (without an API key)
# is 5 requests in a rolling 30 second window, thus in this notebook the default wait_time is set to 6 sec between two requests.
# If you want to speed up the process (up to 50 requests in a rolling 30 second window), you can get an API key by following the
# instructions at https://nvd.nist.gov/developers/start-here#:~:text=to%20in%20sequence.-,Request%20an%20API%20Key,-On%20the%20API.

# Once the key is obtained, it is sufficient to paste it in the following variable to increase the rate limit used in this notebook.
# If you do not want to use a key, simply leave the following variable EMPTY.
# Replace the "<NVD-API-KEY>" placeholder either with your key or with an empty string.
nvd_api_key = "<NVD-API-KEY>" 

#### Requirements

In [None]:
# In order to execute Grype, wsl (Windows subsystem for Linux) is required along with an Ubuntu distro.
# Install WSL and Ubuntu (if it does not work, enable the WSL features from Windows settings: 'Turn Windows features on or off'):
#! wsl --install -d ubuntu
#! wsl --set-default Ubuntu

#Install Grype (if it does not work from PowerShell (or from the console used by the notebook), try to install it with Command Prompt):
#! wsl curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | wsl sh -s -- -b /usr/local/bin

<hr>

#### Logger set up

In [None]:
import datetime
import logging
import os
import sys
from pathlib import Path

# The file handler below writes into 'logs', so the folder has to exist first.
Path('logs').mkdir(parents=True, exist_ok=True)

# One log file per day, named after the current date (day-month-year).
_log_date = datetime.datetime.now().strftime("%d-%m-%Y")
_log_file = os.path.join('logs', 'log-' + _log_date + '.log')

# Logging levels: DEBUG, INFO, WARNING, ERROR, CRITICAL
# Every record goes both to the daily log file and to standard output.
logging.basicConfig(
    handlers=[
        logging.FileHandler(_log_file),
        logging.StreamHandler(sys.stdout),
    ],
    format='%(asctime)s |:| LEVEL:%(levelname)-2s |:| FILE:notebook_3 (grype_vulns).ipynb:%(lineno)-s |:| %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.DEBUG,
)

# Keep urllib3's records from propagating to the root handlers configured above.
logging.getLogger("urllib3").propagate = False

#### Database connection

In [None]:
from lib.sqlite_utils import DBConnection

# Location of the SQLite database, relative to the notebook's working directory.
_db_file = os.path.join('database', 'database.sqlite')

# Stop early when the database file is missing: log the problem, then raise.
if not os.path.exists(_db_file):
    _missing_db_msg = 'Database does not exists! You need to create it first (db_builder.ipynb)'
    logging.critical(_missing_db_msg)
    raise Exception(_missing_db_msg)

conn = DBConnection(_db_file)
logging.info('Connected with "database/database.sqlite" database.')

#### Checking organization existence

In [None]:
# Resolve every configured GitHub username to its 'organization' row in the database.
# A new list is built instead of removing items from `organizations` while iterating
# over it by index: removing in place shifts the remaining items, so entries get
# skipped and the loop ends up indexing past the end of the shortened list.
found_organizations = []
for org_name in organizations:
    try:
        org_row = conn.get_rows('organization',{'url':'https://github.com/{}'.format(org_name)})[0]
    except IndexError:
        # No matching row: warn and leave this organization out of the run.
        logging.warning('Cannot find organization "{}" in the database!'.format(org_name))
        continue
    logging.info('Found organization "{}" in the database!'.format(org_row['user_name']))
    found_organizations.append(org_row)

# From here on `organizations` holds the rows returned by the database, not usernames.
organizations = found_organizations

#### Grype execution for each repository of the organization:

In [None]:
import os
from pathlib import PureWindowsPath, PurePosixPath
from lib.vuln_utils import get_grype_vulns

# Pair every repository row with the organization row it belongs to.
repositories = list()
for organization in organizations:
    repositories.extend([(repo,organization) for repo in conn.get_rows('repository', {'organization':organization['url']})])

# Rows of Grype's own 'vulnerability' table, passed to get_grype_vulns as `extended_vulns`.
# They are optional: without the file the list stays empty and only a warning is logged.
grype_vulns = []
if not os.path.exists(os.path.join('grype_db','vulnerability.db')):
    logging.warning('Cannot find Grype "vulnerability.db" in "grype_db" folder. Execution will continue without storing additional information for vulnerabilities!')
else:
    grype_db_conn=DBConnection(os.path.join('grype_db','vulnerability.db'))
    grype_vulns = grype_db_conn.get_rows('vulnerability')

# Announce every organization that is going to be processed. Logging inside a loop avoids
# relying on the variable leaked by the loop above, which is undefined when no organization
# was found and otherwise names only the last one.
for organization in organizations:
    logging.info('Starting Grype vulnerabilities collection for organization "{}"'.format(organization['user_name']))

# Vulnerabilities collected over all repositories (may contain duplicates).
all_vulns = list()

for repo,organization in repositories:
    logging.info('Collecting Grype vulnerabilities for repository "{}" ...'.format(repo['name']))

    # Candidate SBoM files for this repository; the 'par_' file is preferred over the 'man_' one.
    # NOTE(review): both paths point into the 'sbom/manifest' folder - confirm that the
    # 'par_' files are really stored there.
    sbom_dir = os.path.join('sbom','manifest',organization['user_name'])
    man_sbom_path = os.path.join(sbom_dir,'man_{}_{}_sbom.json'.format(organization['user_name'],repo['name']))
    par_sbom_path = os.path.join(sbom_dir,'par_{}_{}_sbom.json'.format(organization['user_name'],repo['name']))

    if os.path.isfile(par_sbom_path):
        sbom_path = par_sbom_path
    elif os.path.isfile(man_sbom_path):
        sbom_path = man_sbom_path
    else:
        logging.warning('Cannot find SBoM files for repository "{}". Skipping to the next repository...'.format(repo['name']))
        continue

    # Name and purl of every package the repository depends on (manifest and parsed dependencies).
    # NOTE(review): the repository URL is interpolated directly into the SQL text. If
    # DBConnection.query supports bound parameters, prefer them over string formatting.
    packages = [dict({'name':p[0],'purl':p[1]}) for p in conn.query("""SELECT p.name,p.purl FROM 
                                                                      (SELECT * FROM manifest_dependency UNION SELECT * FROM parsed_dependency) dep
                                                                       LEFT JOIN package p ON p.purl = dep.package
                                                                       WHERE dep.repository = "{}" """.format(repo['url']))]
    vulns,affections,cpe_vulns,cpe_affections = [],[],[],[]

    # Grype receives the SBoM path with POSIX separators; the conversion is done once for both runs.
    posix_sbom_path = str(PurePosixPath(PureWindowsPath(sbom_path)))

    # First run: default matching. A failure is logged and the repository keeps empty results.
    try:
        vulns, affections = get_grype_vulns(sbom_path=posix_sbom_path, packages=packages, extended_vulns=grype_vulns)
    except Exception as e:
        logging.warning('An error occur while executing Grype for "{}" repository. Execution will continue with no Grype vulnerabilities for that repository. Error: \n{}'.format(repo['name'],e))
    # Second run: same SBoM with add_cpes_if_none=True; its results are stored in a separate table.
    try:
        cpe_vulns, cpe_affections = get_grype_vulns(sbom_path=posix_sbom_path, packages=packages, extended_vulns=grype_vulns, add_cpes_if_none=True)
    except Exception as e:
        logging.warning('An error occur while executing Grype with -add-cpes-if-none parameter for "{}" repository. Execution will continue with no Grype_CPE vulnerabilities for that repository. Error: \n {}'.format(repo['name'],e))

    # Vulnerabilities from both runs end up in the same 'vulnerability' table.
    vulns.extend(cpe_vulns)
    all_vulns.extend(vulns)

    logging.info('Storing Grype vulnerabilities for repository "{}" in the database ...'.format(repo['name']))
    for vuln in vulns:
        conn.add_or_update('vulnerability',vuln)
    logging.info('Storing Grype affections for repository "{}" in the database ...'.format(repo['name']))
    for affection in affections:
        conn.add_or_update('grype_potential_affection',affection)
    logging.info('Storing Grype CPE affections for repository "{}" in the database ...'.format(repo['name']))
    for cpe_affection in cpe_affections:
        conn.add_or_update('grype_cpe_potential_affection',cpe_affection)


#### Drop duplicate vulnerabilities

In [None]:
# Key every vulnerability by its 'id': a later duplicate replaces the earlier entry
# while the position of the first occurrence is kept.
_vulns_by_id = dict((entry['id'], entry) for entry in all_vulns)
all_vulns = list(_vulns_by_id.values())

#### Use NVD API to store more info about vulnerabilities collected with Grype

In [None]:
from lib.vuln_utils import extend_vulns_with_nvdapi

# Treat an empty value and the untouched "<NVD-API-KEY>" template placeholder as "no key".
# Otherwise the placeholder would be sent as a key together with the short wait time
# meant for authenticated requests.
_nvd_key = (nvd_api_key or '').strip()
if _nvd_key in ('', '<NVD-API-KEY>'):
    _nvd_key = None

logging.info('Getting more info about vulnerabilities with NVD API')
# 6 s between requests without a key, 0.6 s with one (see the rate limits in the configuration cell).
all_vulns = extend_vulns_with_nvdapi(all_vulns,wait_time=6 if _nvd_key is None else 0.6, logger=logging,nvd_api_key=_nvd_key)

#### Update database

In [None]:
# Write each entry of `all_vulns` back to the 'vulnerability' table (insert or update).
for enriched_vuln in all_vulns:
    conn.add_or_update('vulnerability', enriched_vuln)

#### Close databases

In [None]:
conn.close()

# `grype_db_conn` is only created when grype_db/vulnerability.db exists, so it is looked up
# defensively: closing it unconditionally raises NameError when the file was missing.
_grype_conn = globals().get('grype_db_conn')
if _grype_conn is not None:
    _grype_conn.close()