Skip to content

Commit

Permalink
✨ merge acunetix and acunetix360 (#9522)
Browse files Browse the repository at this point in the history
* ✨ merge acunetix and acunetix360

* move unittests together

* merge unittests

* fix unittests

* remove acunetix360

* update docs

* update get_description_for_scan_types

* flake8

* 🐛 fix unittests

* add db migrations

* flake8,rufflinter

* resolve db migrations problem in advance

* fix db migrations according to latest dev

* 🐛 fix, see PR 9606

* basic structure update

* update acunetix xml

* update acunetix360 json

* 🐛 fix

* flake8

* update

* 🚧 db migration revert option

* revert last commit

* remove deduplication setting

* update db migrations

* adapt db migrations

* fix db migrations according to latest dev
  • Loading branch information
manuel-sommer committed Mar 27, 2024
1 parent aafdc41 commit 150b4b4
Show file tree
Hide file tree
Showing 16 changed files with 368 additions and 351 deletions.
2 changes: 1 addition & 1 deletion docs/content/en/integrations/parsers/file/acunetix.md
Expand Up @@ -2,7 +2,7 @@
title: "Acunetix Scanner"
toc_hide: true
---
XML format
This parser imports reports from the Acunetix Scanner (XML output) or the Acunetix 360 Scanner (JSON output).

### Sample Scan Data
Sample Acunetix Scanner scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/acunetix).
8 changes: 0 additions & 8 deletions docs/content/en/integrations/parsers/file/acunetix360.md

This file was deleted.

55 changes: 55 additions & 0 deletions dojo/db_migrations/0208_merge_acunetix.py
@@ -0,0 +1,55 @@
from django.db import migrations
import logging


logger = logging.getLogger(__name__)


PARSER_REFERENCES = ['Acunetix360 Scan']


def update_parser_test(test, parser_test_type) -> None:
    """Repoint a test at the merged parser's test type.

    If the test currently references one of the legacy parser names in
    PARSER_REFERENCES (via its test_type or scan_type), rewrite both
    fields to the new parser test type and persist the change.
    """
    references_legacy_parser = (
        test.test_type.name in PARSER_REFERENCES
        or test.scan_type in PARSER_REFERENCES
    )
    if references_legacy_parser:
        test.test_type = parser_test_type
        test.scan_type = parser_test_type.name
        test.save()


# Update the found_by field to remove Acunetix360 and add Acunetix
# Update the found_by field to remove Acunetix360 and add Acunetix
def update_parser_finding(finding, newparser_test_type, parser_test_type) -> None:
    # Remove the legacy Acunetix360 test type from found_by, if present
    # (parser_test_type may be None when no such test type exists; the
    # membership check is then simply False and nothing is removed)
    if parser_test_type in finding.found_by.all():
        finding.found_by.remove(parser_test_type.id)
    # Add the merged Acunetix test type unless it is already present
    if newparser_test_type not in finding.found_by.all():
        finding.found_by.add(newparser_test_type.id)
    finding.save()


# Update all finding objects that came from Acunetix360 reports
# Update all finding objects that came from Acunetix360 reports
def forward_merge_parser(apps, schema_editor):
    """Migrate Acunetix360 findings/tests to the merged Acunetix parser.

    For every finding produced by an 'Acunetix360 Scan' test, swap the
    found_by reference to the (possibly newly created) 'Acunetix Scan'
    test type and relabel the owning test accordingly.
    """
    finding_model = apps.get_model('dojo', 'Finding')
    test_type_model = apps.get_model('dojo', 'Test_Type')
    # Get or create Acunetix Scan Test Type and fetch the Acunetix360 Scan test type
    newparser_test_type, _ = test_type_model.objects.get_or_create(name="Acunetix Scan", defaults={"active": True})
    parser_test_type = test_type_model.objects.filter(name="Acunetix360 Scan").first()
    # Get all the findings found by Acunetix360 Scan
    findings = finding_model.objects.filter(test__scan_type__in=PARSER_REFERENCES)
    # Lazy %-style args so the message is only formatted if the record is emitted
    logger.warning(
        'We identified %d Acunetix360 Scan findings to migrate to Acunetix Scan findings',
        findings.count(),
    )
    # Iterate over all findings and change
    for finding in findings:
        # Update the found by field
        update_parser_finding(finding, newparser_test_type, parser_test_type)
        # Update the test object
        update_parser_test(finding.test, newparser_test_type)


class Migration(migrations.Migration):
    # Data migration: relabels all Acunetix360 findings and tests as
    # Acunetix after the two parsers were merged.
    # NOTE(review): no reverse callable is passed to RunPython, so this
    # migration cannot be unapplied.

    dependencies = [
        ('dojo', '0207_alter_sonarqube_issue_key'),
    ]

    operations = [
        migrations.RunPython(forward_merge_parser),
    ]
3 changes: 0 additions & 3 deletions dojo/settings/settings.dist.py
Expand Up @@ -1201,7 +1201,6 @@ def saml2_attrib_map_format(dict):
'Symfony Security Check': ['title', 'vulnerability_ids'],
'DSOP Scan': ['vulnerability_ids'],
'Acunetix Scan': ['title', 'description'],
'Acunetix360 Scan': ['title', 'description'],
'Terrascan Scan': ['vuln_id_from_tool', 'title', 'severity', 'file_path', 'line', 'component_name'],
'Trivy Operator Scan': ['title', 'severity', 'vulnerability_ids'],
'Trivy Scan': ['title', 'severity', 'vulnerability_ids', 'cwe', 'description'],
Expand Down Expand Up @@ -1289,7 +1288,6 @@ def saml2_attrib_map_format(dict):
'Qualys Scan': True,
'DSOP Scan': True,
'Acunetix Scan': True,
'Acunetix360 Scan': True,
'Trivy Operator Scan': True,
'Trivy Scan': True,
'SpotBugs Scan': False,
Expand Down Expand Up @@ -1389,7 +1387,6 @@ def saml2_attrib_map_format(dict):
'Qualys Scan': DEDUPE_ALGO_HASH_CODE,
'PHP Symfony Security Check': DEDUPE_ALGO_HASH_CODE,
'Acunetix Scan': DEDUPE_ALGO_HASH_CODE,
'Acunetix360 Scan': DEDUPE_ALGO_HASH_CODE,
'Clair Scan': DEDUPE_ALGO_HASH_CODE,
# 'Qualys Webapp Scan': DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, # Must also uncomment qualys webapp line in hashcode fields per scanner
'Veracode Scan': DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
Expand Down
@@ -1,28 +1,19 @@
import json
from dateutil import parser
import html2text

from cvss import parser as cvss_parser
from dateutil import parser
from dojo.models import Finding, Endpoint

from dojo.models import Endpoint, Finding

class Acunetix360Parser(object):
def get_scan_types(self):
return ["Acunetix360 Scan"]

def get_label_for_scan_types(self, scan_type):
return "Acunetix360 Scan"

def get_description_for_scan_types(self, scan_type):
return "Acunetix360 JSON format."

class AcunetixJSONParser(object):
"""This parser is written for Acunetix JSON Findings."""
def get_findings(self, filename, test):
dupes = dict()
data = json.load(filename)
dupes = dict()
scan_date = parser.parse(data["Generated"])
text_maker = html2text.HTML2Text()
text_maker.body_width = 0

for item in data["Vulnerabilities"]:
title = item["Name"]
findingdetail = text_maker.handle(item.get("Description", ""))
Expand Down Expand Up @@ -53,7 +44,6 @@ def get_findings(self, filename, test):
response = item["HttpResponse"]["Content"]
if response is None or len(response) <= 0:
response = "Response Not Found"

finding = Finding(
title=title,
test=test,
Expand All @@ -66,7 +56,6 @@ def get_findings(self, filename, test):
cwe=cwe,
static_finding=True,
)

if (
(item["Classification"] is not None)
and (item["Classification"]["Cvss"] is not None)
Expand All @@ -86,19 +75,15 @@ def get_findings(self, filename, test):
elif "FalsePositive" in state:
finding.false_p = True
finding.active = False

finding.unsaved_req_resp = [{"req": request, "resp": response}]
finding.unsaved_endpoints = [Endpoint.from_uri(url)]

if item.get("FirstSeenDate"):
parseddate = parser.parse(item["FirstSeenDate"])
finding.date = parseddate

if dupe_key in dupes:
find = dupes[dupe_key]
find.unsaved_req_resp.extend(finding.unsaved_req_resp)
find.unsaved_endpoints.extend(finding.unsaved_endpoints)
else:
dupes[dupe_key] = finding

return list(dupes.values())
176 changes: 176 additions & 0 deletions dojo/tools/acunetix/parse_acunetix_xml.py
@@ -0,0 +1,176 @@
import hashlib
import dateutil
import html2text
import logging
import hyperlink
from cvss import parser as cvss_parser
from defusedxml.ElementTree import parse
from dojo.models import Endpoint, Finding
logger = logging.getLogger(__name__)


class AcunetixXMLParser(object):
    """This parser is written for Acunetix XML reports"""

    def get_findings(self, filename, test):
        """Parse an Acunetix XML report and return de-duplicated findings.

        :param filename: file-like object containing the XML report
        :param test: the Test the findings belong to
        :return: list of Finding objects (duplicates merged by
                 title/impact/mitigation hash)
        """
        dupes = dict()
        root = parse(filename).getroot()
        for scan in root.findall("Scan"):
            start_url = scan.findtext("StartURL")
            # hyperlink.parse needs a scheme or a leading "//" to
            # recognize the host part
            if ":" not in start_url:
                start_url = "//" + start_url
            # get report date; default to None so a Scan without a
            # StartTime does not raise NameError (or silently reuse the
            # previous Scan's date) at the `if report_date:` check below
            report_date = None
            if scan.findtext("StartTime") and "" != scan.findtext("StartTime"):
                report_date = dateutil.parser.parse(
                    scan.findtext("StartTime")
                ).date()
            for item in scan.findall("ReportItems/ReportItem"):
                finding = Finding(
                    test=test,
                    title=item.findtext("Name"),
                    severity=self.get_severity(item.findtext("Severity")),
                    description=html2text.html2text(
                        item.findtext("Description")
                    ).strip(),
                    false_p=self.get_false_positive(
                        item.findtext("IsFalsePositive")
                    ),
                    static_finding=True,
                    dynamic_finding=False,
                    nb_occurences=1,
                )
                if item.findtext("Impact") and "" != item.findtext("Impact"):
                    finding.impact = item.findtext("Impact")
                if item.findtext("Recommendation") and "" != item.findtext(
                    "Recommendation"
                ):
                    finding.mitigation = item.findtext("Recommendation")
                if report_date:
                    finding.date = report_date
                if item.findtext("CWEList/CWE"):
                    finding.cwe = self.get_cwe_number(
                        item.findtext("CWEList/CWE")
                    )
                # collect references as a markdown bullet list
                references = []
                for reference in item.findall("References/Reference"):
                    url = reference.findtext("URL")
                    db = reference.findtext("Database") or url
                    references.append(" * [{}]({})".format(db, url))
                if len(references) > 0:
                    finding.references = "\n".join(references)
                if item.findtext("CVSS3/Descriptor"):
                    cvss_objects = cvss_parser.parse_cvss_from_text(
                        item.findtext("CVSS3/Descriptor")
                    )
                    if len(cvss_objects) > 0:
                        finding.cvssv3 = cvss_objects[0].clean_vector()
                # more description are in "Details"
                if (
                    item.findtext("Details")
                    and len(item.findtext("Details").strip()) > 0
                ):
                    finding.description += "\n\n**Details:**\n{}".format(
                        html2text.html2text(item.findtext("Details"))
                    )
                if (
                    item.findtext("TechnicalDetails")
                    and len(item.findtext("TechnicalDetails").strip()) > 0
                ):
                    finding.description += (
                        "\n\n**TechnicalDetails:**\n\n{}".format(
                            item.findtext("TechnicalDetails")
                        )
                    )
                # add requests
                finding.unsaved_req_resp = list()
                if len(item.findall("TechnicalDetails/Request")):
                    finding.dynamic_finding = (
                        True  # if there is some requests it's dynamic
                    )
                    finding.static_finding = (
                        False  # if there is some requests it's dynamic
                    )
                    for request in item.findall("TechnicalDetails/Request"):
                        finding.unsaved_req_resp.append(
                            {"req": (request.text or ""), "resp": ""}
                        )
                # manage the endpoint: host/port from the scan StartURL,
                # path from the per-item "Affects" element
                url = hyperlink.parse(start_url)
                endpoint = Endpoint(
                    host=url.host,
                    port=url.port,
                    path=item.findtext("Affects"),
                )
                if url.scheme is not None and "" != url.scheme:
                    endpoint.protocol = url.scheme
                finding.unsaved_endpoints = [endpoint]
                dupe_key = hashlib.sha256(
                    "|".join(
                        [
                            finding.title,
                            str(finding.impact),
                            str(finding.mitigation),
                        ]
                    ).encode("utf-8")
                ).hexdigest()
                if dupe_key in dupes:
                    find = dupes[dupe_key]
                    # add details for the duplicate finding
                    if (
                        item.findtext("Details")
                        and len(item.findtext("Details").strip()) > 0
                    ):
                        find.description += (
                            "\n-----\n\n**Details:**\n{}".format(
                                html2text.html2text(item.findtext("Details"))
                            )
                        )
                    find.unsaved_endpoints.extend(finding.unsaved_endpoints)
                    find.unsaved_req_resp.extend(finding.unsaved_req_resp)
                    find.nb_occurences += finding.nb_occurences
                    logger.debug(
                        "Duplicate finding : {defectdojo_title}".format(
                            defectdojo_title=finding.title
                        )
                    )
                else:
                    dupes[dupe_key] = finding
        return list(dupes.values())

    def get_cwe_number(self, cwe):
        """
        Returns cwe number.
        :param cwe: CWE identifier string like "CWE-79", or None
        :return: cwe number as int, or None if no CWE was given
        """
        if cwe is None:
            return None
        else:
            return int(cwe.split("-")[1])

    def get_severity(self, severity):
        """
        Returns Severity as per DefectDojo standards.
        :param severity: lowercase Acunetix severity string
        :return: DefectDojo severity; anything unrecognized maps to "Critical"
        """
        if severity == "high":
            return "High"
        elif severity == "medium":
            return "Medium"
        elif severity == "low":
            return "Low"
        elif severity == "informational":
            return "Info"
        else:
            return "Critical"

    def get_false_positive(self, false_p):
        """
        Returns True, False for false positive as per DefectDojo standards.
        NOTE(review): any non-empty string (including "False") is truthy —
        presumably the report only emits IsFalsePositive content when it is
        actually a false positive; confirm against sample reports.
        :param false_p: text of the IsFalsePositive element, or None
        :return: bool
        """
        if false_p:
            return True
        else:
            return False

0 comments on commit 150b4b4

Please sign in to comment.