In [25]:
import os
import json
import pprint
import re
import argparse
from statistics import mean
from collections import Counter, defaultdict

import numpy as np
import matplotlib.pyplot as plt


# Tools
sca_tools = ['Grype.txt', 'Snyk.txt', 'Trivy.txt']

# RICs
rics = ['ONOS', 'OSC']

repoWithError = []

# Packages to exclude in the RIC repos
test_package = re.compile(r'test/')
benchmark_package = re.compile(r'benchmark')
examples_package = re.compile(r'examples/')
testapplication_package = re.compile(r'testapplication/')
cert_package = re.compile(r'certs/')


# First we normalize the results from each tool
def format_sca_tool_data(repository, tool):
    if tool == "Grype.txt":
        return formatGrype(repository)
    elif tool == "Snyk.txt":
        return formatSnyk(repository)
    elif tool == "Trivy.txt":
        return formatTrivy(repository)

# This gets all the vulnerabilities in a normalized way.
# (In a list which includes all the vulnerabilities, which are not contained in a test package)
def formatGrype(repository):
    GrypeRepo = json.loads(repository)
    vulnArray = []
    for vuln in GrypeRepo["matches"]:
        path = vuln.get("artifact").get("locations")[0].get("path")
        if test_package.search(path) is not None:
            continue
        elif benchmark_package.search(path) is not None:
            continue
        elif examples_package.search(path) is not None:
            continue
        elif testapplication_package.search(path) is not None:
            continue
        else:
            vulnArray.append(vuln)
    return vulnArray

def formatSnyk(repository):
    content = json.loads(repository)
    vulnArray = []
    if "error" not in content:
        for target in content:
            if not isinstance(target, str):
                vulnList = target.get('vulnerabilities')
                path = target.get('displayTargetFile')
                if test_package.search(path) is not None:
                    print("Snyk: Skipping:" + path)
                    continue
                elif benchmark_package.search(path) is not None:
                    print("Snyk: Skipping:" + path)
                    continue
                elif examples_package.search(path) is not None:
                    print("Snyk: Skipping:" + path)
                    continue
                elif testapplication_package.search(path) is not None:
                    print("Snyk: Skipping:" + path)
                    continue
                else:
                    for vuln in vulnList:
                        vuln.pop('semver')
                        vulnArray.append(vuln)
            else:
                if target == 'vulnerabilities':
                    vulnList = content.get('vulnerabilities')
                    path = content.get('displayTargetFile')
                    print("Snyk path: {}".format(path))
                    if test_package.search(path) is not None:
                        print("Snyk: Skipping:" + path)
                        continue
                    elif benchmark_package.search(path) is not None:
                        print("Snyk: Skipping:" + path)
                        continue
                    elif examples_package.search(path) is not None:
                        print("Snyk: Skipping:" + path)
                        continue
                    elif testapplication_package.search(path) is not None:
                        print("Snyk: Skipping:" + path)
                        continue
                    else:
                        for vuln in vulnList:
                            vuln.pop('semver')
                            vulnArray.append(vuln)
                            print("1")
    else:
        global repoWithError
        repoWithError.append(os.path.basename(content['path']))
    return vulnArray

def formatTrivy(repository):
    index = repository.find("{")
    repo = repository[index:]
    TrivyRepo = json.loads(repo)
    results = TrivyRepo.get("Results")
    vulnArray = []
    if results is not None:
        for target in results:
            path = target.get("Target")
            if test_package.search(path) is not None:
                print("Trivy: Skipping:" + path)
                continue
            elif benchmark_package.search(path) is not None:
                print("Trivy: Skipping:" + path)
                continue
            elif examples_package.search(path) is not None:
                print("Trivy: Skipping:" + path)
                continue
            else:
                vulnTarget = target.get("Vulnerabilities", [])
                if not vulnTarget:
                    continue
                for vuln in vulnTarget:
                    vuln["Path"] = path
                vulnArray.extend(vulnTarget)
    return vulnArray

def get_vulnerabilities_by_directory(formatted_data, tool):
        vulnerabilities_by_directory = defaultdict(list)
        if tool == "Grype.txt":
            for vuln in formatted_data:
                path = vuln.get("artifact").get("locations")[0].get("path")
                directory = os.path.dirname(path)
                vulnerabilities_by_directory[directory].append(vuln)
            return vulnerabilities_by_directory
        elif tool == "Trivy.txt":
            for vuln in formatted_data:
                path = vuln.get("Target")
                directory = os.path.dirname(path)
                vulnerabilities_by_directory[directory].append(vuln)
            return vulnerabilities_by_directory
        elif tool == "Snyk.txt":     
            return vulnerabilities_by_directory

def save_vulnerabilities_by_directory(vulnerabilities_by_directory, tool, base_dir="./ONOS"):
    for directory, vulnerabilities in vulnerabilities_by_directory.items():
        
        clean_directory = re.sub(r'[^a-zA-Z0-9_\-]', '', directory)
        dir_path = os.path.join(base_dir, clean_directory)
        os.makedirs(dir_path, exist_ok=True)
        
        filename = f"{tool}"
        filepath = os.path.join(dir_path, filename)
        
        with open(filepath, 'w') as json_file:
            json.dump(vulnerabilities, json_file, separators=(',', ':'))
        
    

def dump_scan_results(rics, sca_tools):
    scan_results = dict.fromkeys(rics)
    onos_repos = []
    osc_repos = []
    for ric in rics:
        ric_dir = "./" + ric
        if not os.path.exists(ric_dir):
            os.makedirs(ric_dir)
        for repository in sorted(os.listdir(ric_dir)):
            if ric == "ONOS":
                onos_repos.append(repository)
            elif ric == "OSC":
                osc_repos.append(repository)
    for ric in rics:
        if ric == "ONOS":
            scan_results[ric] = dict.fromkeys(onos_repos)
        elif ric == "OSC":
            scan_results[ric] = dict.fromkeys(osc_repos)
        for repository in sorted(os.listdir("./" + ric)):
            scan_results[ric][repository] = dict.fromkeys(sca_tools)
            path_to_repository = os.path.join("./" + ric, repository)
            for sca_tool_file in sorted(os.listdir(path_to_repository)):
                sca_tool_file_path = os.path.join(path_to_repository, sca_tool_file)
                with open(sca_tool_file_path) as file:
                    vuln = file.read()
                scan_results[ric][repository][sca_tool_file] = vuln
    with open('sca_results.json', 'w') as file:
        json.dump(scan_results, file)
    print("Finished writing: " + 'sca_results.json')
    pprint.pprint(scan_results)
    return scan_results

In [70]:
with open('trivy-results-11.json', 'r') as file:
        data = file.read()



In [71]:
formatted_data = format_sca_tool_data(data, "Trivy.txt")
print(formatted_data)


Trivy: Skipping:onos-a1t/test/utils/xapp/go.mod
Trivy: Skipping:onos-config/benchmark/go.mod
Trivy: Skipping:onos-config/test/go.mod
Trivy: Skipping:onos-e2t/test/go.mod
Trivy: Skipping:onos-topo/test/go.mod
[{'VulnerabilityID': 'CVE-2021-43816', 'PkgID': 'github.com/containerd/containerd@v1.5.7', 'PkgName': 'github.com/containerd/containerd', 'PkgIdentifier': {'PURL': 'pkg:golang/github.com/containerd/containerd@1.5.7', 'UID': '968c48862f657096'}, 'InstalledVersion': '1.5.7', 'FixedVersion': '1.5.9', 'Status': 'fixed', 'Layer': {}, 'SeveritySource': 'ghsa', 'PrimaryURL': 'https://avd.aquasec.com/nvd/cve-2021-43816', 'DataSource': {'ID': 'ghsa', 'Name': 'GitHub Security Advisory Go', 'URL': 'https://github.com/advisories?query=type%3Areviewed+ecosystem%3Ago'}, 'Title': 'containerd: Unprivileged pod may bind mount any privileged regular file on disk', 'Description': 'containerd is an open source container runtime. On installations using SELinux, such as EL8 (CentOS, RHEL), Fedora, or SU

In [72]:


def get_vulnerabilities_by_directory(data, tool):
        # For grype we can work with the formatted data directly
        if tool == "Grype.txt":
            formatted_data = format_sca_tool_data(data, "Grype.txt")
        
        # For trivy we need to load the data into a json object first.
    # WE CAN USE THE FORMATTADED DATA DIRECTLY CHECK THIS 
    # IT could be done with reading the the path from the path key which is created after the format function!!!
        elif tool == "Trivy.txt":
            repo = json.loads(data)
            results = repo.get("Results")

        # For snyk we dont know yet
        elif tool == "Snyk.txt":
            skip = True

        vulnerabilities_by_directory = defaultdict(list)
        
        if tool == "Grype.txt":
            for vuln in formatted_data:
                path = vuln.get("artifact").get("locations")[0].get("path")
                directory = os.path.dirname(path)
                vulnerabilities_by_directory[directory].append(vuln)
            return vulnerabilities_by_directory
        
        elif tool == "Trivy.txt":
            for vuln in results:
                path = vuln.get("Target")
                if test_package.search(path) is not None:
                    continue
                elif benchmark_package.search(path) is not None:
                    continue
                elif examples_package.search(path) is not None:
                    continue
                elif cert_package.search(path) is not None:
                    continue
                else:
                    vulnTarget = vuln.get("Vulnerabilities", [])
                    if not vulnTarget:
                        continue
                    for vuln in vulnTarget:
                        vuln["Path"] = path
                        directory = os.path.dirname(path)
                        vulnerabilities_by_directory[directory].append(vuln)
            return vulnerabilities_by_directory
        elif tool == "Snyk.txt":
       
            return vulnerabilities_by_directory
        
        
get_vulnerabilities_by_directory(data, "Trivy.txt")

defaultdict(list,
            {'onos-a1t': [{'VulnerabilityID': 'CVE-2021-43816',
               'PkgID': 'github.com/containerd/containerd@v1.5.7',
               'PkgName': 'github.com/containerd/containerd',
               'PkgIdentifier': {'PURL': 'pkg:golang/github.com/containerd/containerd@1.5.7',
                'UID': '968c48862f657096'},
               'InstalledVersion': '1.5.7',
               'FixedVersion': '1.5.9',
               'Status': 'fixed',
               'Layer': {},
               'SeveritySource': 'ghsa',
               'PrimaryURL': 'https://avd.aquasec.com/nvd/cve-2021-43816',
               'DataSource': {'ID': 'ghsa',
                'Name': 'GitHub Security Advisory Go',
                'URL': 'https://github.com/advisories?query=type%3Areviewed+ecosystem%3Ago'},
               'Title': 'containerd: Unprivileged pod may bind mount any privileged regular file on disk',
               'Description': 'containerd is an open source container runtime. On install

In [74]:
def save_vulnerabilities_by_directory(vulnerabilities_by_directory, tool, base_dir="./ONOS"):
    for directory, vulnerabilities in vulnerabilities_by_directory.items():
        
        clean_directory = re.sub(r'[^a-zA-Z0-9_\-]', '', directory)
        dir_path = os.path.join(base_dir, clean_directory)
        os.makedirs(dir_path, exist_ok=True)
        
        filename = f"{tool}"
        filepath = os.path.join(dir_path, filename)
        
        with open(filepath, 'w') as json_file:
            json.dump(vulnerabilities, json_file, separators=(',', ':'))
        
        

vulnerabilities_by_directory = get_vulnerabilities_by_directory(data, "Trivy.txt")

save_vulnerabilities_by_directory(vulnerabilities_by_directory, "Trivy.txt")


dump_scan_results(['ONOS', 'OSC'], ['Trivy.txt'])


Finished writing: sca_results.json
{'ONOS': {'onos-a1t': {'Grype.txt': '[{"vulnerability":{"id":"GHSA-mvff-h3cj-wj9c","dataSource":"https://github.com/advisories/GHSA-mvff-h3cj-wj9c","namespace":"github:language:go","severity":"High","urls":["https://github.com/advisories/GHSA-mvff-h3cj-wj9c"],"description":"Unprivileged '
                                    'pod using `hostPath` can side-step active '
                                    'LSM when it is '
                                    'SELinux","cvss":[{"version":"3.1","vector":"CVSS:3.1/AV:N/AC:H/PR:H/UI:N/S:C/C:H/I:H/A:H","metrics":{"baseScore":8,"exploitabilityScore":1.3,"impactScore":6},"vendorMetadata":{"base_severity":"High","status":"N/A"}}],"fix":{"versions":["1.5.9"],"state":"fixed"},"advisories":[]},"relatedVulnerabilities":[{"id":"CVE-2021-43816","dataSource":"https://nvd.nist.gov/vuln/detail/CVE-2021-43816","namespace":"nvd:cpe","severity":"Critical","urls":["https://github.com/containerd/containerd/commit/a731039238c

{'ONOS': {'onos-a1t': {'Trivy.txt': '[{"VulnerabilityID":"CVE-2021-43816","PkgID":"github.com/containerd/containerd@v1.5.7","PkgName":"github.com/containerd/containerd","PkgIdentifier":{"PURL":"pkg:golang/github.com/containerd/containerd@1.5.7","UID":"968c48862f657096"},"InstalledVersion":"1.5.7","FixedVersion":"1.5.9","Status":"fixed","Layer":{},"SeveritySource":"ghsa","PrimaryURL":"https://avd.aquasec.com/nvd/cve-2021-43816","DataSource":{"ID":"ghsa","Name":"GitHub Security Advisory Go","URL":"https://github.com/advisories?query=type%3Areviewed+ecosystem%3Ago"},"Title":"containerd: Unprivileged pod may bind mount any privileged regular file on disk","Description":"containerd is an open source container runtime. On installations using SELinux, such as EL8 (CentOS, RHEL), Fedora, or SUSE MicroOS, with containerd since v1.5.0-beta.0 as the backing container runtime interface (CRI), an unprivileged pod scheduled to the node may bind mount, via hostPath volume, any privileged, regular fil

In [47]:
import json
import pprint

# Define the get_cves_cvss_dependencies function
def get_cves_cvss_dependencies(sca_tool, sca_tool_data):
    cves_cvss_dependencies = []
    cves = []
    cvss = []
    packages = []
    if sca_tool == "Grype.txt":
        for vulnerability in sca_tool_data:
            if vulnerability.get("vulnerability").get("id") not in cves:
                cves.append(vulnerability.get("vulnerability").get("id"))
                cvss_info = vulnerability.get("vulnerability").get("cvss")
                if cvss_info and len(cvss_info) > 0:
                    cvss.append(cvss_info[0].get("metrics").get("baseScore"))
                else:
                    cvss.append(None)  # or handle as you prefer
                    print(f"Vulnerability without CVSS: {vulnerability.get('vulnerability').get('id')}")
                vulnerability_match_details = vulnerability.get("matchDetails")
                for match_detail in vulnerability_match_details:
                    if "package" in match_detail["searchedBy"].keys():
                        packages.append(match_detail["searchedBy"]["package"]["name"])
                    elif "Package" in match_detail["searchedBy"].keys():
                        packages.append(match_detail["searchedBy"]["Package"]["name"])
            else:
                continue
        cves_cvss_dependencies = [cves, cvss, packages]
        return cves_cvss_dependencies
    elif sca_tool == "Snyk.txt":
        for vulnerability in sca_tool_data:
            if len(vulnerability.get("identifiers").get("CVE")) == 0:
                continue
            else:
                if vulnerability.get("identifiers").get("CVE")[0] not in cves:
                    cves.append(vulnerability.get("identifiers").get("CVE")[0])
                    cvss.append(vulnerability.get("cvssScore"))
                    packages.append(vulnerability.get("moduleName"))
        cves_cvss_dependencies = [cves, cvss, packages]
        return cves_cvss_dependencies
    elif sca_tool == "Trivy.txt":
        for vulnerability in sca_tool_data:
            print(vulnerability)
            if vulnerability.get("VulnerabilityID") not in cves:
                if vulnerability.get("CVSS") is not None:
                    cves.append(vulnerability.get("VulnerabilityID"))
                    packages.append(vulnerability.get("PkgName"))
                    nvd = vulnerability.get("CVSS").get("nvd")
                    ghsa = vulnerability.get("CVSS").get("ghsa")
                    if nvd is not None:
                        cvss.append(nvd.get("V3Score"))
                        continue
                    elif ghsa is not None:
                        cvss.append(ghsa.get("V3Score"))
                        continue
                else:
                    print(f"Vulnerability without CVSS: {vulnerability.get('VulnerabilityID')}")
                    continue
        cves_cvss_dependencies = [cves, cvss, packages]
        return cves_cvss_dependencies
    else:
        print("Unknown tool")
    return cves_cvss_dependencies

# Loading the sca_results.json and calling get_cves_cvss_dependencies
with open('sca_results.json', 'r') as file:
    sca_results = json.load(file)

# Specify the RIC, repository, and tool you want to analyze
ric = "ONOS"  # Replace with the actual RIC you are interested in
repository = "onos-a1t"  # Replace with your repository name
tool = "Trivy.txt"  # Replace with the actual tool name you are analyzing

# Extract the relevant data from the JSON
sca_tool_data_str = sca_results[ric][repository][tool]

# Parse the extracted JSON string
sca_tool_data = json.loads(sca_tool_data_str)

# Call the function with the parsed data
result = get_cves_cvss_dependencies(tool, sca_tool_data)
print(result)

{'VulnerabilityID': 'CVE-2021-43816', 'PkgID': 'github.com/containerd/containerd@v1.5.7', 'PkgName': 'github.com/containerd/containerd', 'PkgIdentifier': {'PURL': 'pkg:golang/github.com/containerd/containerd@1.5.7', 'UID': '968c48862f657096'}, 'InstalledVersion': '1.5.7', 'FixedVersion': '1.5.9', 'Status': 'fixed', 'Layer': {}, 'SeveritySource': 'ghsa', 'PrimaryURL': 'https://avd.aquasec.com/nvd/cve-2021-43816', 'DataSource': {'ID': 'ghsa', 'Name': 'GitHub Security Advisory Go', 'URL': 'https://github.com/advisories?query=type%3Areviewed+ecosystem%3Ago'}, 'Title': 'containerd: Unprivileged pod may bind mount any privileged regular file on disk', 'Description': 'containerd is an open source container runtime. On installations using SELinux, such as EL8 (CentOS, RHEL), Fedora, or SUSE MicroOS, with containerd since v1.5.0-beta.0 as the backing container runtime interface (CRI), an unprivileged pod scheduled to the node may bind mount, via hostPath volume, any privileged, regular file on 