Skip to content

Commit

Permalink
fix: better vuln detection (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
ocervell committed Apr 25, 2024
1 parent 98c986c commit 150b603
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 46 deletions.
1 change: 1 addition & 0 deletions secator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Runners(StrictModel):
input_chunk_size: int = 1000
progress_update_frequency: int = 60
skip_cve_search: bool = False
skip_cve_low_confidence: bool = True


class HTTP(StrictModel):
Expand Down
2 changes: 1 addition & 1 deletion secator/output_types/vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Vulnerability(OutputType):
id: str = ''
matched_at: str = ''
ip: str = field(default='', compare=False)
confidence: int = 'low'
confidence: str = 'low'
severity: str = 'unknown'
cvss_score: float = 0
tags: List[str] = field(default_factory=list)
Expand Down
32 changes: 25 additions & 7 deletions secator/tasks/_categories.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from bs4 import BeautifulSoup
from cpe import CPE

from secator.definitions import (CIDR_RANGE, CONFIDENCE, CVSS_SCORE, DELAY, DEPTH, DESCRIPTION, FILTER_CODES,
from secator.definitions import (CIDR_RANGE, CVSS_SCORE, DELAY, DEPTH, DESCRIPTION, FILTER_CODES,
FILTER_REGEX, FILTER_SIZE, FILTER_WORDS, FOLLOW_REDIRECT, HEADER, HOST, ID,
MATCH_CODES, MATCH_REGEX, MATCH_SIZE, MATCH_WORDS, METHOD, NAME, PATH, PROVIDER, PROXY,
RATE_LIMIT, REFERENCES, RETRIES, SEVERITY, TAGS, THREADS, TIMEOUT, URL, USER_AGENT,
Expand Down Expand Up @@ -139,6 +139,26 @@ def lookup_local_cve(cve_id):
# return None
# return cve_info

@staticmethod
def create_cpe_string(product_name, version):
"""
Generate a CPE string for a given product and version.
Args:
product_name (str): The name of the product.
version (str): The version of the product.
Returns:
str: A CPE string formatted according to the CPE 2.3 specification.
"""
cpe_version = "2.3" # CPE Specification version
part = "a" # 'a' for application
vendor = product_name.lower() # Vendor name, using product name
product = product_name.lower() # Product name
version = version # Product version
cpe_string = f"cpe:{cpe_version}:{part}:{vendor}:{product}:{version}:*:*:*:*:*:*:*"
return cpe_string

@staticmethod
def match_cpes(fs1, fs2):
"""Check if two CPEs match. Partial matches consisting of <vendor>:<product>:<version> are considered a match.
Expand Down Expand Up @@ -250,7 +270,6 @@ def lookup_cve(cve_id, cpes=[]):
severity = Vuln.cvss_to_severity(cvss)

# Set confidence
confidence = 'low' if not cpe_match else 'high'
vuln = {
ID: id,
NAME: name,
Expand All @@ -260,7 +279,6 @@ def lookup_cve(cve_id, cpes=[]):
TAGS: tags,
REFERENCES: [f'https://cve.circl.lu/cve/{id}'] + references,
DESCRIPTION: description,
CONFIDENCE: confidence
}
return vuln

Expand All @@ -283,10 +301,10 @@ def lookup_ghsa(ghsa_id):
soup = BeautifulSoup(resp.text, 'lxml')
sidebar_items = soup.find_all('div', {'class': 'discussion-sidebar-item'})
cve_id = sidebar_items[2].find('div').text.strip()
data = Vuln.lookup_cve(cve_id)
if data:
data[TAGS].append('ghsa')
return data
vuln = Vuln.lookup_cve(cve_id)
if vuln:
vuln[TAGS].append('ghsa')
return vuln
return None

@staticmethod
Expand Down
118 changes: 80 additions & 38 deletions secator/tasks/nmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import xmltodict

from secator.config import CONFIG
from secator.decorators import task
from secator.definitions import (CONFIDENCE, CVSS_SCORE, DELAY,
DESCRIPTION, EXTRA_DATA, FOLLOW_REDIRECT,
Expand All @@ -13,6 +14,7 @@
RETRIES, SCRIPT, SERVICE_NAME, SEVERITY, STATE, TAGS,
THREADS, TIMEOUT, TOP_PORTS, USER_AGENT)
from secator.output_types import Exploit, Port, Vulnerability
from secator.rich import console
from secator.tasks._categories import VulnMulti
from secator.utils import debug

Expand Down Expand Up @@ -116,20 +118,18 @@ def __iter__(self):

# Get extra data
extra_data = self._get_extra_data(port)
service_name = extra_data['service_name']
version_exact = extra_data.get('version_exact', False)
conf = extra_data.get('confidence')
if not version_exact:
console.print(
f'[bold orange1]nmap could not identify an exact version for {service_name} '
f'(detection confidence is {conf}): do not blindy trust the results ![/]'
)

# Grab CPEs
cpes = extra_data.get('cpe', [])

# Grab service name
service_name = ''
if 'product' in extra_data:
service_name = extra_data['product']
elif 'name' in extra_data:
service_name = extra_data['name']
if 'version' in extra_data:
version = extra_data['version']
service_name += f'/{version}'

# Get script output
scripts = self._get_scripts(port)

Expand Down Expand Up @@ -166,6 +166,13 @@ def __iter__(self):
continue
for vuln in func(output, cpes=cpes):
vuln.update(metadata)
confidence = 'low'
if 'cpe-match' in vuln[TAGS]:
confidence = 'high' if version_exact else 'medium'
vuln[CONFIDENCE] = confidence
if (CONFIG.runners.skip_cve_low_confidence and vuln[CONFIDENCE] == 'low'):
debug(f'{vuln[ID]}: ignored (low confidence).', sub='cve')
continue
yield vuln

#---------------------#
Expand Down Expand Up @@ -200,40 +207,74 @@ def _get_ip(self, host_cfg):
return host_cfg.get('address', {}).get('@addr', None)

def _get_extra_data(self, port_cfg):
extra_datas = {
extra_data = {
k.lstrip('@'): v
for k, v in port_cfg.get('service', {}).items()
}

# Strip product / version strings
if 'product' in extra_datas:
extra_datas['product'] = extra_datas['product'].lower()

if 'version' in extra_datas:
version_split = extra_datas['version'].split(' ')
version = None
if 'product' in extra_data:
extra_data['product'] = extra_data['product'].lower()

# Get version and post-process it
version = None
if 'version' in extra_data:
vsplit = extra_data['version'].split(' ')
version_exact = True
os = None
if len(version_split) == 3:
version, os, extra_version = tuple(version_split)
if len(vsplit) == 3:
version, os, extra_version = tuple(vsplit)
if os == 'or' and extra_version == 'later':
version_exact = False
os = None
version = f'{version}-{extra_version}'
elif len(version_split) == 2:
version, os = tuple(version_split)
elif len(version_split) == 1:
version = version_split[0]
elif len(vsplit) == 2:
version, os = tuple(vsplit)
elif len(vsplit) == 1:
version = vsplit[0]
else:
version = extra_datas['version']
version = extra_data['version']
if os:
extra_datas['os'] = os
extra_data['os'] = os
if version:
extra_data['version'] = version
extra_data['version_exact'] = version_exact

# Grap service name
product = extra_data.get('name', None) or extra_data.get('product', None)
if product:
service_name = product
if version:
extra_datas['version'] = version
service_name += f'/{version}'
extra_data['service_name'] = service_name

# Grab CPEs
cpes = extra_datas.get('cpe', [])
cpes = extra_data.get('cpe', [])
if not isinstance(cpes, list):
cpes = [cpes]
extra_datas['cpe'] = cpes
extra_data['cpe'] = cpes
debug(f'Found CPEs: {",".join(cpes)}', sub='cve')

# Grab confidence
conf = int(extra_data.get('conf', 0))
if conf > 7:
confidence = 'high'
elif conf > 4:
confidence = 'medium'
else:
confidence = 'low'
extra_data['confidence'] = confidence

# Build custom CPE
if product and version:
vsplit = version.split('-')
version_cpe = vsplit[0] if not version_exact else version
cpe = VulnMulti.create_cpe_string(product, version_cpe)
if cpe not in cpes:
cpes.append(cpe)
debug(f'Added new CPE from identified product and version: {cpe}', sub='cve')

return extra_datas
return extra_data

def _get_scripts(self, port_cfg):
scripts = port_cfg.get('script', [])
Expand Down Expand Up @@ -278,16 +319,16 @@ def _parse_vulscan_output(self, out, cpes=[]):
TAGS: [vuln_id, provider_name]
}
if provider_name == 'MITRE CVE':
vuln_data = VulnMulti.lookup_cve(vuln['id'], cpes=cpes)
if vuln_data:
vuln.update(vuln_data)
data = VulnMulti.lookup_cve(vuln['id'], cpes=cpes)
if data:
vuln.update(data)
yield vuln
else:
debug(f'Vulscan provider {provider_name} is not supported YET.', sub='cve')
continue

def _parse_vulners_output(self, out, **kwargs):
cpes = []
cpes = kwargs.get('cpes', [])
provider_name = 'vulners'
for line in out.splitlines():
if not line:
Expand All @@ -309,7 +350,8 @@ def _parse_vulners_output(self, out, **kwargs):
NAME: name,
PROVIDER: provider_name,
REFERENCE: reference_url,
'_type': 'exploit'
'_type': 'exploit',
TAGS: [exploit_id, provider_name]
# CVSS_SCORE: cvss_score,
# CONFIDENCE: 'low'
}
Expand All @@ -331,14 +373,14 @@ def _parse_vulners_output(self, out, **kwargs):
CVSS_SCORE: vuln_cvss,
SEVERITY: VulnMulti.cvss_to_severity(vuln_cvss),
REFERENCES: [reference_url],
TAGS: [],
TAGS: [vuln_id, provider_name],
CONFIDENCE: 'low'
}
if vuln_type == 'CVE' or vuln_type == 'PRION:CVE':
vuln[TAGS].append('cve')
vuln_data = VulnMulti.lookup_cve(vuln_id, cpes=cpes)
if vuln_data:
vuln.update(vuln_data)
data = VulnMulti.lookup_cve(vuln_id, cpes=cpes)
if data:
vuln.update(data)
yield vuln
else:
debug(f'Vulners parser for "{vuln_type}" is not implemented YET.', sub='cve')
Expand Down

0 comments on commit 150b603

Please sign in to comment.