Skip to content

Commit

Permalink
fix: query CVEs without CPE match (#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
ocervell committed Apr 23, 2024
1 parent 6b55e99 commit d02e09c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
2 changes: 2 additions & 0 deletions secator/output_types/vulnerability.py
Expand Up @@ -85,6 +85,8 @@ def __repr__(self):
s += f' \[[cyan]{tags_str}[/]]'
if data:
s += f' \[[yellow]{str(data)}[/]]'
if self.confidence == 'low':
s = f'[dim]{s}[/]'
return rich_to_ansi(s)

# def __gt__(self, other):
Expand Down
51 changes: 38 additions & 13 deletions secator/tasks/_categories.py
Expand Up @@ -139,6 +139,25 @@ def lookup_local_cve(cve_id):
# return None
# return cve_info

@staticmethod
def match_cpes(fs1, fs2):
"""Check if two CPEs match. Partial matches consisting of <vendor>:<product>:<version> are considered a match.
Args:
fs1 (str): Format string 1.
fs2 (str): Format string 2.
Returns:
bool: True if the two CPEs match, False otherwise.
"""
if fs1 == fs2:
return True
split_fs1 = fs1.split(':')
split_fs2 = fs2.split(':')
tup1 = split_fs1[3], split_fs1[4], split_fs1[5]
tup2 = split_fs2[3], split_fs2[4], split_fs2[5]
return tup1 == tup2

@staticmethod
def lookup_cve(cve_id, cpes=[]):
"""Search for a CVE in local db or using cve.circl.lu and return vulnerability data.
Expand Down Expand Up @@ -181,14 +200,15 @@ def lookup_cve(cve_id, cpes=[]):
cpe_fs = cpe_obj.as_fs()
# cpe_version = cpe_obj.get_version()[0]
vulnerable_fs = cve_info['vulnerable_product']
debug(f'Matching CPE {cpe} against {len(vulnerable_fs)} vulnerable products for {cve_id}', sub='cve')
for fs in vulnerable_fs:
if fs == cpe_fs:
debug(f'Found matching CPE FS {cpe_fs} ! The CPE is vulnerable to CVE {cve_id}', sub='cve')
# debug(f'{cve_id}: Testing {cpe_fs} against {fs}', sub='cve') # for hardcore debugging
if Vuln.match_cpes(cpe_fs, fs):
debug(f'{cve_id}: CPE match found for {cpe}.', sub='cve')
cpe_match = True
tags.append('cpe-match')
if not cpe_match:
return None
break
if not cpe_match:
debug(f'{cve_id}: no CPE match found for {cpe}.', sub='cve')

# Parse CVE id and CVSS
name = id = cve_info['id']
Expand Down Expand Up @@ -227,14 +247,7 @@ def lookup_cve(cve_id, cpes=[]):
# Set vulnerability severity based on CVSS score
severity = None
if cvss:
if cvss < 4:
severity = 'low'
elif cvss < 7:
severity = 'medium'
elif cvss < 9:
severity = 'high'
else:
severity = 'critical'
severity = Vuln.cvss_to_severity(cvss)

# Set confidence
confidence = 'low' if not cpe_match else 'high'
Expand Down Expand Up @@ -276,6 +289,18 @@ def lookup_ghsa(ghsa_id):
return data
return None

@staticmethod
def cvss_to_severity(cvss):
if cvss < 4:
severity = 'low'
elif cvss < 7:
severity = 'medium'
elif cvss < 9:
severity = 'high'
else:
severity = 'critical'
return severity


class VulnHttp(Vuln):
input_type = HOST
Expand Down
18 changes: 11 additions & 7 deletions secator/tasks/nmap.py
Expand Up @@ -10,10 +10,11 @@
HEADER, HOST, ID, IP, MATCHED_AT, NAME,
OPT_NOT_SUPPORTED, OUTPUT_PATH, PORT, PORTS, PROVIDER,
PROXY, RATE_LIMIT, REFERENCE, REFERENCES,
RETRIES, SCRIPT, SERVICE_NAME, STATE, TAGS,
RETRIES, SCRIPT, SERVICE_NAME, SEVERITY, STATE, TAGS,
THREADS, TIMEOUT, USER_AGENT)
from secator.output_types import Exploit, Port, Vulnerability
from secator.tasks._categories import VulnMulti
from secator.utils import debug

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -160,7 +161,7 @@ def __iter__(self):
EXTRA_DATA: extra_data,
}
if not func:
# logger.debug(f'Script output parser for "{script_id}" is not supported YET.')
debug(f'Script output parser for "{script_id}" is not supported YET.', sub='cve')
continue
for vuln in func(output, cpes=cpes):
vuln.update(metadata)
Expand Down Expand Up @@ -281,7 +282,7 @@ def _parse_vulscan_output(self, out, cpes=[]):
vuln.update(vuln_data)
yield vuln
else:
# logger.debug(f'Vulscan provider {provider_name} is not supported YET.')
debug(f'Vulscan provider {provider_name} is not supported YET.', sub='cve')
continue

def _parse_vulners_output(self, out, **kwargs):
Expand All @@ -292,7 +293,7 @@ def _parse_vulners_output(self, out, **kwargs):
continue
line = line.strip()
if line.startswith('cpe:'):
cpes.append(line)
cpes.append(line.rstrip(':'))
continue
elems = tuple(line.split('\t'))
vuln = {}
Expand All @@ -319,23 +320,26 @@ def _parse_vulners_output(self, out, **kwargs):

elif len(elems) == 3: # vuln
vuln_id, vuln_cvss, reference_url = tuple(line.split('\t'))
vuln_cvss = float(vuln_cvss)
vuln_id = vuln_id.split(':')[-1]
vuln_type = vuln_id.split('-')[0]
vuln = {
ID: vuln_id,
NAME: vuln_id,
PROVIDER: provider_name,
CVSS_SCORE: vuln_cvss,
SEVERITY: VulnMulti.cvss_to_severity(vuln_cvss),
REFERENCES: [reference_url],
TAGS: [],
CONFIDENCE: 'low'
}
if vuln_type == 'CVE':
if vuln_type == 'CVE' or vuln_type == 'PRION:CVE':
vuln[TAGS].append('cve')
vuln_data = VulnMulti.lookup_cve(vuln_id, cpes=cpes)
if vuln_data:
vuln.update(vuln_data)
yield vuln
else:
logger.debug(f'Vulners parser for "{vuln_type}" is not implemented YET.')
debug(f'Vulners parser for "{vuln_type}" is not implemented YET.', sub='cve')
else:
logger.error(f'Unrecognized vulners output: {elems}')
debug(f'Unrecognized vulners output: {elems}', sub='cve')

0 comments on commit d02e09c

Please sign in to comment.