Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first cut cve pipeline and refactor fetcher #2

Open
wants to merge 18 commits into
base: add_structure
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions cve_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from cvereporter import fetch_vulnerabilities
from cvereporter import report
from cvereporter import nist_enhance
"""
This was the first-cut downloader: it retrieves vulnerabilities and creates a VDR based on a single OJVG report (for a given date).
"""
# Start from the base BOM that report.get_base_bom() builds.
bom = report.get_base_bom()

#todo: take date as arg or figure out other way to seed
# Fetch the CVEs listed in the OJVG advisory for this (hard-coded) date.
vulns = fetch_vulnerabilities.fetch_cves('2023-01-17')

#todo: decorate vulnerabilities with info from NIST and others here
# enhance() updates the Vulnerability objects in place.
nist_enhance.enhance(vulns)

for vuln in vulns:
    bom.vulnerabilities.add(vuln)

# Write the serialized BOM to stdout.
print(report.serialize_to_json(bom))
146 changes: 146 additions & 0 deletions cvereporter/fetch_vulnerabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#!/usr/bin/env python3

import argparse
import json
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from cyclonedx.model.impact_analysis import ImpactAnalysisAffectedStatus
from cyclonedx.model.vulnerability import Vulnerability, VulnerabilitySource,VulnerabilityScoreSource, VulnerabilityRating, VulnerabilitySeverity, BomTarget, BomTargetVersionRange
"""
Utilities to fetch data from OJVG and convert it to intermediate representations/CycloneDX structure
"""
def fetch_cves(date: str)->list[Vulnerability]:
    """Fetch the OJVG advisory for ``date`` and return its CVEs as CycloneDX vulnerabilities.

    ``date`` is appended to the advisory URL as-is (e.g. ``'2023-01-17'``).
    """
    advisory_dicts = fetch_dicts(date)
    return dict_to_vulns(advisory_dicts)

def fetch_dicts(date:str):
    """Download the OJVG advisory for ``date`` and parse it into plain dicts.

    Returns whatever ``parse_to_dict`` returns for the downloaded page.
    """
    return parse_to_dict(retrieve_cves_from_internet(date), date)

def retrieve_cves_from_internet(date: str) -> str:
    """Download the OJVG advisory page for ``date`` and return its HTML.

    As a side effect the page is written to ``data/open_jvg_dump_<date>.html``
    (the ``data`` directory is created when missing).

    Args:
        date: advisory date exactly as it appears in the advisory URL,
            e.g. ``'2023-01-17'``.

    Returns:
        The response body as text, or ``None`` when the request times out or
        the server answers with an error status (4xx/5xx).
    """
    import os  # local import: only needed to create the dump directory

    # fetch the CVEs for the given date
    url = 'https://openjdk.org/groups/vulnerability/advisories/' + date
    print(url)
    # NOTE(review): "br" is advertised in Accept-Encoding; confirm the
    # environment can decode brotli responses.
    headers = {
        "User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/109.0",
        "Accept-Language":"en-US,en;q=0.5",
        "Accept-Encoding":"gzip, deflate, br",
        "Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Referer":"http://www.google.com/",
    }
    try:
        r = requests.get(url, timeout=5, headers=headers)
    except requests.exceptions.Timeout:
        # Timeout is the common base of ConnectTimeout and ReadTimeout, so a
        # slow connect is handled the same way as a slow read.
        return None
    print(r)
    if not r.ok:
        # Any error status (not only 404) means there is no advisory to parse;
        # dumping and parsing an error page would only fail later.
        return None
    resp_text = r.text
    # todo: make this configurable
    os.makedirs("data", exist_ok=True)
    with open("data/open_jvg_dump_"+date+".html", "w", encoding="utf-8") as dump:
        dump.write(resp_text)
    return resp_text

def parse_to_cyclone(resp_text: str, date: str) -> list[Vulnerability]:
    """Parse advisory HTML and convert the result straight to CycloneDX vulnerabilities.

    Chains ``parse_to_dict`` and ``dict_to_vulns`` for callers that already
    hold the page text.
    """
    return dict_to_vulns(parse_to_dict(resp_text, date))

def parse_to_dict(resp_text: str, date: str) -> list[dict]:
    """Parse an OJVG advisory page into one plain dict per CVE row.

    The first ``<p>`` of the page is scanned for the report-wide affected
    versions; the ``risk-matrix`` table is then walked row by row. In a CVE
    row the first ``<td>`` holds the id and link, the next one the component,
    the next one the score, and the following cells mark (with ``•``) which
    version columns are affected.

    Args:
        resp_text: HTML of the advisory page, or ``None``.
        date: advisory date; copied unchanged into every result dict.

    Returns:
        ``None`` when ``resp_text`` is ``None``. Otherwise a list of dicts with
        the keys ``id``, ``url``, ``date``, ``component`` and ``affected``; the
        list is empty when the page has no ``risk-matrix`` table.
    """
    if resp_text is None:
        return None
    soup = BeautifulSoup(resp_text, 'html.parser')

    #find the versions affected
    header_string = soup.find(name="p")
    extracted_affected = extract_affected(header_string.text) if header_string is not None else []

    # find the table with the CVEs
    table = soup.find('table', attrs={'class': 'risk-matrix'})
    if table is None:
        return []

    dicts = []
    # Version column headings, in column order. They are collected from the
    # header row and must survive into the CVE rows that follow, so the list
    # lives outside the row loop.
    versions = []
    for row in table.find_all('tr'):

        # find the versions in the header row
        header = row.find('th')
        if header is not None:
            component = header.find_next_sibling('th')
            if component is not None and component.text == 'Component':
                score = component.find_next_sibling('th')
                versions = []
                column = score.find_next_sibling('th') if score is not None else None
                while column is not None:
                    versions.append(column.text)
                    column = column.find_next_sibling('th')

        # fetch CVE data from the first td in the row
        cve = row.find('td')
        if cve is None:
            continue
        cve_id = cve.text
        if cve_id == "None":
            continue
        anchor = cve.find('a')
        link = anchor.get('href', "") if anchor is not None else ""
        components_td = cve.find_next_sibling('td')
        if components_td is None:
            # nothing follows the id cell, so this is not a complete CVE row
            continue
        component = components_td.text.replace('\n', '')
        score_td = components_td.find_next_sibling('td')

        affected_versions = list(extracted_affected) #todo - maybe just the extracted ones
        cell = score_td
        for version in versions:
            if cell is None:
                break
            cell = cell.find_next_sibling('td')
            # Only numeric headings are version columns; anything else is skipped.
            if cell is not None and cell.text == '•' and version.strip().isdigit():
                affected_versions.append(int(version))

        parsed_data = {
            "id": cve_id,
            "url": link,
            "date": date,
            "component": component,
            "affected": affected_versions,
        }
        print(json.dumps(parsed_data))
        dicts.append(parsed_data)

    return dicts
def dict_to_vulns(dicts: list[dict]) -> list[Vulnerability]:
    """Convert parsed advisory dicts into CycloneDX ``Vulnerability`` objects.

    Args:
        dicts: dicts with the keys ``id``, ``url``, ``date`` and ``component``
            (the shape produced by ``parse_to_dict``), or ``None``.

    Returns:
        One ``Vulnerability`` per dict, each affecting a single ``BomTarget``
        whose ref is the dict's ``component``. Empty when ``dicts`` is ``None``
        or empty.
    """
    if dicts is None:
        # parse_to_dict returns None when it was given no page text; treat
        # that as "no vulnerabilities" instead of failing on iteration.
        return []
    vulnerabilities = []
    for parsed_data in dicts:
        #todo: dummy date - the advisory date stands in for both timestamps
        advisory_date = datetime.fromisoformat(parsed_data["date"])
        vuln = Vulnerability(
            id=parsed_data["id"],
            source=VulnerabilitySource(name="National Vulnerability Database", url=parsed_data["url"]),
            published=advisory_date,
            updated=advisory_date,
            description="",
            recommendation=""
        )
        #todo: parsed_data["affected"] is deliberately not attached here - the
        # affected versions are just for the whole report; we need to extract
        # affected versions on a per cve basis, not a per ojvg report basis
        vuln.affects.add(BomTarget(ref=parsed_data["component"]))
        vulnerabilities.append(vuln)
    return vulnerabilities

def extract_affected(header_string: str) -> list[str]:
    """Extract the affected-version list from an advisory's introductory text.

    The versions are the comma-separated tokens between the phrases
    ``"The affected versions are "`` and
    ``"Please note that defense-in-depth issues"``. Tokens containing the word
    ``earlier`` (e.g. ``"7u361 and earlier."``) are dropped, as are empty
    tokens.

    Args:
        header_string: text of the advisory's first paragraph; line breaks are
            tolerated.

    Returns:
        The version strings in document order, or ``[]`` when the start phrase
        is missing or the end phrase does not follow it.
    """
    header_string = header_string.replace("\r", "").replace("\n"," ")
    start_vulns = "The affected versions are "
    end_vulns = "Please note that defense-in-depth issues"

    start = header_string.find(start_vulns)
    if start == -1:
        return []
    start += len(start_vulns)
    # Look for the end phrase only after the start phrase, so an earlier
    # occurrence cannot produce an empty or inverted slice.
    end = header_string.find(end_vulns, start)
    if end == -1:
        return []

    tokens = (token.strip() for token in header_string[start:end].split(","))
    return [ver for ver in tokens if ver and "earlier" not in ver]

# fetch_cves('2023-01-17')
88 changes: 88 additions & 0 deletions cvereporter/nist_enhance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from cyclonedx.model.impact_analysis import ImpactAnalysisAffectedStatus
from cyclonedx.model.vulnerability import Vulnerability, VulnerabilitySource,VulnerabilityScoreSource, VulnerabilityRating, VulnerabilitySeverity, BomTarget, BomTargetVersionRange
import requests
import json

"""
This file has the utilities for downloading data about CVEs from NIST and updating Vulnerability objects with that data.
"""
def fetch_nist(url: str, id: str) -> dict:
    """Download the NIST record at ``url`` and cache it under ``data/``.

    A successful response is written to ``data/nist_<id>.json`` together with
    the URL it came from (the ``data`` directory is created when missing).

    Args:
        url: full request URL for the CVE.
        id: CVE id; used in log output and in the cache file name.

    Returns:
        The decoded JSON body, or ``None`` when the request times out or the
        status code is not 200.
    """
    import os  # local import: only needed to create the cache directory

    try:
        # Without a timeout a stalled connection would block the run forever.
        nist_resp = requests.get(url, timeout=30)
    except requests.exceptions.Timeout:
        print("error fetching {}; request timed out".format(id))
        return None
    if nist_resp.status_code != 200:
        # the most frequently seen error response is:
        # error fetching CVE-2020-2805; status code: 403; text: <html><body><h1>403 Forbidden</h1> Request forbidden by administrative rules.
        print("error fetching {}; status code: {}; text: {}".format(id, nist_resp.status_code, nist_resp.text))
        return None
    data = nist_resp.json()
    os.makedirs("data", exist_ok=True)
    with open("data/nist_"+id+".json", "w") as dest:
        json.dump({"url":url, "data": data}, dest, indent=True)
    return data
def extract_relevant_parts(nist_resp: dict) -> dict:
    """Reduce a NIST CVE response to the fields used to enrich a vulnerability.

    Args:
        nist_resp: decoded JSON response; its first ``vulnerabilities`` entry
            is the one that is read.

    Returns:
        A dict with three keys: ``ratings`` (one dict per ``cvssMetricV31``
        entry with ``source``, ``score``, ``severity``, ``method`` and
        ``vector``), ``description`` (the last English description, ``""``
        when there is none) and ``versions`` (see ``extract_versions``).
        ``ratings`` and ``versions`` are empty when the response carries no
        v3.1 metrics or no configurations.

    Raises:
        KeyError: when ``vulnerabilities`` is missing or empty, or a required
            field of the CVE entry is absent.
    """
    # todo: this can use a unit test at some point
    # NOTE(review): open question from the PR discussion - is this plain-dict
    # shape the intended intermediate representation before the data goes into
    # the BOM? The OJVG side could move to the same shape later.
    vulnerabilities = nist_resp["vulnerabilities"]
    if not vulnerabilities:
        # Report "no CVE in the response" as a KeyError like any other missing
        # field, instead of letting an IndexError escape.
        raise KeyError("vulnerabilities")
    #todo: do we have more than 1 cve in a resp?
    cve = vulnerabilities[0]["cve"]

    description = ""
    for desc in cve["descriptions"]:
        if desc["lang"] == "en":
            description = desc["value"]

    ratings = []
    # A CVE without v3.1 metrics still has a usable description and versions,
    # so a missing metrics list yields no ratings rather than an error.
    for metrics in cve.get("metrics", {}).get("cvssMetricV31", []):
        #todo: do we need recommendations from NIST as well?
        cvss = metrics["cvssData"]
        ratings.append({
            "source": metrics["source"],
            "score": cvss["baseScore"],
            "severity": cvss["baseSeverity"],
            "method": "CVSSv3",  #is this always true?
            "vector": cvss["vectorString"],
        })

    configurations = cve.get("configurations") or []
    return {
        "ratings": ratings,
        "description": description,
        "versions": extract_versions(configurations) if configurations else [],
    }
def extract_versions(cve_configs):
    """Collect Oracle JDK versions from a CVE's CPE configurations.

    Every ``criteria`` string containing ``oracle:jdk:`` contributes the field
    that directly follows that marker (up to the next ``:``). Duplicates are
    kept, in document order.

    NOTE(review): flagged as dubious in the PR discussion - the version is
    parsed by hand out of the Oracle JDK entry (minus the update part) and is
    assumed to map 1:1 to OpenJDK. The OJVG page also publishes affected
    versions, but at the top of a page that can contain multiple CVEs, so it
    may not be a better source.

    Args:
        cve_configs: the ``configurations`` list of a CVE entry; each item may
            carry ``nodes``, each node may carry ``cpeMatch``.

    Returns:
        List of version strings, e.g. ``["1.8.0", "17.0.5"]``.
    """
    oracle_jdk_start = "oracle:jdk:" #todo: do we care about non oracle
    vers = []
    for config in cve_configs:
        for node in config.get("nodes", []):
            for match in node.get("cpeMatch", []):
                _, marker, tail = match["criteria"].partition(oracle_jdk_start)
                if marker:
                    #todo: this truncates update version
                    # partition() also copes with a string that ends right
                    # after the version (no trailing ":").
                    vers.append(tail.partition(":")[0])
    return vers
def enhance(vulns: list[Vulnerability]):
    """Enrich vulnerabilities in place with ratings, description and versions from NIST.

    For each vulnerability the NIST record is fetched by CVE id. When it is
    available and parseable, its CVSS v3.1 ratings are added, the description
    is replaced, and every extracted version is recorded as affected on each
    of the vulnerability's ``affects`` targets. Vulnerabilities whose record
    cannot be fetched or parsed are left untouched.

    Args:
        vulns: vulnerabilities to update; mutated in place.
    """
    for count, vuln in enumerate(vulns, start=1):
        cve_id = vuln.id
        url = 'https://services.nvd.nist.gov/rest/json/cves/2.0?cveId=' + cve_id
        nist_resp = fetch_nist(url, cve_id)
        if nist_resp is None:
            continue
        try:
            relevant = extract_relevant_parts(nist_resp)
        except (KeyError, IndexError):
            # Missing fields or an empty result list: nothing to enrich with.
            continue
        print("\n\n\n\n\n\nvuln: {} index {} ".format(cve_id, count))
        for rating in relevant["ratings"]:
            #todo: convert the ratings into the cyclonedx enums?
            # NOTE(review): rating["source"] is passed as the source URL;
            # confirm the NIST "source" field really is a URL and not an
            # identifier that belongs in ``name``.
            vr = VulnerabilityRating(
                source=VulnerabilitySource(url=rating["source"]),
                score=rating["score"],
                vector=rating["vector"],
                method=VulnerabilityScoreSource.CVSS_V3_1
            )
            vuln.ratings.add(vr)
        vuln.description = relevant["description"]
        for affects in vuln.affects:
            for ver in relevant["versions"]:
                # BomTarget.versions holds BomTargetVersionRange objects, not
                # bare strings, so wrap each version before adding it.
                affects.versions.add(
                    BomTargetVersionRange(version=ver, status=ImpactAnalysisAffectedStatus.AFFECTED)
                )
95 changes: 59 additions & 36 deletions cvereporter/report.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,73 @@
from cyclonedx.exception import MissingOptionalDependencyException
from cyclonedx.factory.license import LicenseFactory
from cyclonedx.model import OrganizationalEntity, XsUri
from cyclonedx.model import OrganizationalEntity, XsUri, ExternalReferenceType
from cyclonedx.model.bom import Bom
from cyclonedx.model.component import Component, ComponentType
from cyclonedx.model.component import Component, ComponentType, ExternalReference
from cyclonedx.model.impact_analysis import ImpactAnalysisAffectedStatus
from cyclonedx.model.vulnerability import Vulnerability, VulnerabilitySource,VulnerabilityScoreSource, VulnerabilityRating, VulnerabilitySeverity, BomTarget, BomTargetVersionRange
from cyclonedx.output import make_outputter, LATEST_SUPPORTED_SCHEMA_VERSION
from cyclonedx.output.json import JsonV1Dot4
from cyclonedx.schema import SchemaVersion, OutputFormat
from cyclonedx.validation.json import JsonStrictValidator
from cyclonedx.validation import make_schemabased_validator
from datetime import datetime
"""
utilities to create the CycloneDX BOM objects and serialize it to JSON
"""
def get_base_bom() -> Bom:
    """Create a BOM whose metadata component describes Eclipse Temurin.

    Returns:
        A new ``Bom`` with the root component (name, type, license, bom-ref,
        supplier and distribution reference) set and nothing else added here.
    """
    lc_factory = LicenseFactory()
    bom = Bom()
    bom.metadata.component = Component(
        name='Eclipse Temurin',
        type=ComponentType.APPLICATION,
        licenses=[lc_factory.make_from_string('GPL-2.0 WITH Classpath-exception-2.0')],
        bom_ref='temurin-vdr',
        # The supplier is modelled as an OrganizationalEntity, not a bare string.
        supplier=OrganizationalEntity(name="Eclipse foundation"),
        external_references=[ExternalReference(type=ExternalReferenceType.DISTRIBUTION, url=XsUri("http://www.adoptium.net"))]
    )
    return bom

# NOTE(review): the statements below repeat the opening of sbom_creation_test()
# at module level; they look like the removed (pre-change) side of this diff
# rather than live code - confirm before relying on them.
# based on sample code from https://cyclonedx-python-library.readthedocs.io/en/latest/examples.html
lc_factory = LicenseFactory()
bom = Bom()
bom.metadata.component = root_component = Component(
name='Eclipse Temurin',
type=ComponentType.APPLICATION,
licenses=[lc_factory.make_from_string('GPL v2')],
bom_ref='temurin-vdr',
)
def serialize_to_json(bom: Bom) -> str:
    """Serialize ``bom`` with the ``JsonV1Dot4`` outputter, print it and return it.

    The JSON (indented by 2) is echoed to stdout after three blank lines, so
    callers that print the return value will see it twice.
    """
    outputter: 'JsonOutputter' = JsonV1Dot4(bom)
    as_json = outputter.output_as_string(indent=2)
    print("\n\n\n")
    print(as_json)
    return as_json


def sbom_creation_test():
    """Build a sample VDR with one hard-coded vulnerability and print it as JSON.

    Smoke test for the CycloneDX model: creates the Temurin root component,
    one vulnerability (CVE-2023-25193) with a single rating and an affected
    version range, then serializes the BOM with the ``JsonV1Dot4`` outputter.
    """
    # based on sample code from https://cyclonedx-python-library.readthedocs.io/en/latest/examples.html
    lc_factory = LicenseFactory()
    bom = Bom()
    bom.metadata.component = Component(
        name='Eclipse Temurin',
        type=ComponentType.APPLICATION,
        licenses=[lc_factory.make_from_string('GPL v2')],
        bom_ref='temurin-vdr',
    )

    vuln1 = Vulnerability(
        # id matches the CVE named in the source URL below
        id="CVE-2023-25193",
        source=VulnerabilitySource(name="NVD", url="https://nvd.nist.gov/vuln/detail/CVE-2023-25193"),
        published=datetime.strptime("2023-02-04T20:15:08.027", "%Y-%m-%dT%H:%M:%S.%f"),
        updated=datetime.strptime("2023-07-25T15:15:13.163", "%Y-%m-%dT%H:%M:%S.%f"),
        description="hb-ot-layout-gsubgpos.hh in HarfBuzz through 6.0.0 allows attackers to trigger O(n^2) growth via consecutive marks during the process of looking back for base glyphs when attaching marks.",
        recommendation="Upgrade to the latest version of Eclipse Temurin."
    )

    rating1 = VulnerabilityRating(
        source=VulnerabilitySource(url="https://openjdk.org/groups/vulnerability/advisories", name="OJVG"),
        score=3.7,
        severity=VulnerabilitySeverity.LOW,
        method=VulnerabilityScoreSource.CVSS_V3_1,
        vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H"
    )
    vuln1.ratings.add(rating1)

    affects1_range = [BomTargetVersionRange(range="vers:semver/<=1.8.0.update_382|<=11.0.20|<=17.0.8|<=20.0.2", status=ImpactAnalysisAffectedStatus.AFFECTED)]
    # Attach the range to the target; without it the range is built but unused.
    affects1 = BomTarget(ref="temurin-vdr", versions=affects1_range)
    vuln1.affects.add(affects1)

    # Add the vulnerability only once it is fully populated, so the BOM's
    # collection never holds an element that is mutated afterwards.
    bom.vulnerabilities.add(vuln1)

    my_json_outputter: 'JsonOutputter' = JsonV1Dot4(bom)
    serialized_json = my_json_outputter.output_as_string(indent=2)
    print("\n\n\n")
    print(serialized_json)