Skip to content

Commit

Permalink
🎇 refactor sonarqube and add JSON parsing for api export (#9734)
Browse files Browse the repository at this point in the history
* 🎇 refactor sonarqube and add JSON parsing for api export

* 🚧 start with api json

* continue work

* update

* update

* fix

* 🎉 also advance to support multiple files at once via zip due to pagination

* advance unittests

* advance documentation

* update documentation

* update documentation

* add tags to distinguish between findings

* :pencile: docs

* add cve

* add cwe

* add cvssscore

* 💄

* 🎉 add components

* add ghsa

* 🐛 fix for empty zip file

* empty json file

* fix documentation

* 🐛 fix for different message structure

* parse hotspots

* fix according to review

* ruff
  • Loading branch information
manuel-sommer committed Apr 3, 2024
1 parent 0dc0b22 commit f005661
Show file tree
Hide file tree
Showing 13 changed files with 984 additions and 297 deletions.
26 changes: 22 additions & 4 deletions docs/content/en/integrations/parsers/file/sonarqube.md
Expand Up @@ -2,7 +2,26 @@
title: "SonarQube"
toc_hide: true
---
## SonarQube Scan (Aggregates findings per cwe, title, description, file\_path.)
# SonarQube Scan
There are two ways to retrieve findings from SonarQube. You can either use the [soprasteria package](https://github.com/soprasteria/sonar-report) or the SonarQube REST API directly.
Both ways (**SonarQube REST API** and **Soprasteria**) are depicted below.

### Sample Scan Data
Sample SonarQube scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/sonarqube).

## SonarQube REST API
You can retrieve the JSON directly from SonarQube if you use one of the following REST API endpoints:
- `<sonarqubeurl>/api/issues/search?projects=<projectkey>`
- `<sonarqubeurl>/api/hotspots/search?projectKey=<projectkey>`

### JSON
The REST API JSON output can be uploaded to DefectDojo with "SonarQube Scan".

### ZIP
If you have too many findings in one project, you can implement a small script to handle pagination and put all JSON files in a .zip file. This zip file can also be parsed by DefectDojo with "SonarQube Scan".

## Soprasteria
### Soprasteria SonarQube Scan (Aggregates findings per cwe, title, description, file\_path.)

SonarQube output file can be imported in HTML format or JSON format. The JSON format is generated with the `--save-report-json` option and behaves the same as the HTML format.

Expand All @@ -12,7 +31,7 @@ To generate the report, see
Version: \>= 1.1.0
Recommend version for both format \>= 3.1.2

## SonarQube Scan Detailed (Import all findings from SonarQube html report.)
### Soprasteria SonarQube Scan Detailed (Import all findings from SonarQube html report.)

SonarQube output file can be imported in HTML format or JSON format. The JSON format is generated with the `--save-report-json` option and behaves the same as the HTML format.

Expand All @@ -23,5 +42,4 @@ Version: \>= 1.1.0.
Recommend version for both format \>= 3.1.2


### Sample Scan Data
Sample SonarQube scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/sonarqube).

317 changes: 24 additions & 293 deletions dojo/tools/sonarqube/parser.py
@@ -1,12 +1,11 @@
import logging
import re

from django.utils.html import strip_tags
from dojo.tools.sonarqube.soprasteria_json import SonarQubeSoprasteriaJSON
from dojo.tools.sonarqube.soprasteria_html import SonarQubeSoprasteriaHTML
from dojo.tools.sonarqube.sonarqube_restapi_json import SonarQubeRESTAPIJSON
from dojo.tools.sonarqube.sonarqube_restapi_zip import SonarQubeRESTAPIZIP
from lxml import etree
import zipfile
import json

from dojo.models import Finding

logger = logging.getLogger(__name__)


Expand All @@ -24,301 +23,33 @@ def get_label_for_scan_types(self, scan_type):

def get_description_for_scan_types(self, scan_type):
    """Return the human-readable description shown for this scan type."""
    # NOTE(review): this is a rendered diff — the next two `return` lines are
    # the removed (old) and added (new) versions of the same statement; only
    # the second one exists in the committed file.
    if scan_type == "SonarQube Scan":
        return "Aggregates findings per cwe, title, description, file_path. SonarQube output file can be imported in HTML format or JSON format. Generate with https://github.com/soprasteria/sonar-report version >= 1.1.0, recommend version >= 3.1.2"
        return "Aggregates findings per cwe, title, description, file_path. SonarQube output file can be imported in HTML format or JSON format. You can get the JSON output directly if you use the SonarQube API or generate with https://github.com/soprasteria/sonar-report version >= 1.1.0, recommend version >= 3.1.2"
    else:
        return "Import all findings from sonarqube html report or JSON format. SonarQube output file can be imported in HTML format or JSON format. Generate with https://github.com/soprasteria/sonar-report version >= 1.1.0, recommend version >= 3.1.2"

def get_findings(self, filename, test):
    # NOTE(review): rendered diff — this is the OLD signature and the first
    # lines of the OLD body; the NEW implementation starts at the next
    # `def get_findings` below.
    if filename.name.strip().lower().endswith(".json"):
        json_content = json.load(filename)
        return self.get_json_items(json_content, test, self.mode)
def get_findings(self, file, test):
    """Dispatch the uploaded report to the matching sub-parser.

    JSON payloads are routed to the Soprasteria or REST-API JSON parser
    based on their top-level keys, .zip archives to the REST-API ZIP
    parser, and anything else is parsed as a sonar-report HTML file.
    """
    if file.name.endswith(".json"):
        # NOTE(review): the old version matched with .strip().lower(); this
        # check is case-sensitive, so a ".JSON" upload now falls through to
        # the HTML branch — confirm that is intended.
        json_content = json.load(file)
        # Soprasteria sonar-report JSON carries date/projectName/hotspotKeys.
        if json_content.get("date") and json_content.get("projectName") and json_content.get("hotspotKeys"):
            return SonarQubeSoprasteriaJSON().get_json_items(json_content, test, self.mode)
        # SonarQube REST API payloads carry paging/components.
        elif json_content.get("paging") and json_content.get("components"):
            return SonarQubeRESTAPIJSON().get_json_items(json_content, test, self.mode)
        else:
            # Unknown JSON shape: report nothing rather than fail the import.
            return []
    if file.name.endswith(".zip"):
        # NOTE(review): comparing str(file.__class__) to a literal is fragile;
        # isinstance(file, io.TextIOWrapper) would be safer — confirm why this
        # form was chosen (possibly to avoid reading a text-mode handle as binary).
        if str(file.__class__) == "<class '_io.TextIOWrapper'>":
            input_zip = zipfile.ZipFile(file.name, 'r')
        else:
            input_zip = zipfile.ZipFile(file, 'r')
        # NOTE(review): the ZipFile is never closed; a `with` block would
        # avoid the handle leak.
        zipdata = {name: input_zip.read(name) for name in input_zip.namelist()}
        return SonarQubeRESTAPIZIP().get_items(zipdata, test, self.mode)
    else:
        parser = etree.HTMLParser()
        # NOTE(review): rendered diff — the next two parse lines are the old
        # and new versions of the same statement; only the second exists in
        # the committed file.
        tree = etree.parse(filename, parser)
        tree = etree.parse(file, parser)
        if self.mode not in [None, "detailed"]:
            raise ValueError(
                "Internal error: Invalid mode "
                + self.mode
                + ". Expected: one of None, 'detailed'"
            )

        return self.get_items(tree, test, self.mode)

def get_json_items(self, json_content, test, mode):
    """Convert a soprasteria sonar-report JSON payload into findings.

    Findings are accumulated in a dict keyed by an aggregation key (so
    duplicates collapse) and returned as a list. In aggregated mode
    (mode is None) occurrences of the same rule/file merge into one
    finding; in "detailed" mode each issue becomes its own finding.

    Raises ValueError when an issue has no title or no message.
    """
    findings = dict()
    rule_table = json_content["rules"]
    for raw_issue in json_content["issues"]:
        issue_key = raw_issue["key"]
        issue_line = str(raw_issue["line"])
        issue_mitigation = raw_issue["message"]
        issue_title = raw_issue["description"]
        issue_file_path = raw_issue["component"]
        issue_severity = self.convert_sonar_severity(raw_issue["severity"])
        rule_id = raw_issue["rule"]

        if issue_title is None or issue_mitigation is None:
            raise ValueError(
                "Parser ValueError: can't find a title or a mitigation for vulnerability of name "
                + rule_id
            )

        try:
            # Rule details carry an HTML description; parse it to derive
            # the description text, the references list and the CWE.
            rule_detail = rule_table[rule_id]
            html_tree = etree.fromstring(
                rule_detail["htmlDesc"], etree.HTMLParser()
            )
            issue_description = self.get_description(html_tree)
            logger.debug(issue_description)
            issue_references = self.get_references(rule_id, html_tree)
            issue_cwe = self.get_cwe(issue_references)
        except KeyError:
            # The rule is not listed in the report: fall back to placeholders.
            issue_description = "No description provided"
            issue_references = ""
            issue_cwe = 0

        if mode is None:
            self.process_result_file_name_aggregated(
                test,
                findings,
                issue_title,
                issue_cwe,
                issue_description,
                issue_file_path,
                issue_line,
                issue_severity,
                issue_mitigation,
                issue_references,
            )
        else:
            self.process_result_detailed(
                test,
                findings,
                issue_title,
                issue_cwe,
                issue_description,
                issue_file_path,
                issue_line,
                issue_severity,
                issue_mitigation,
                issue_references,
                issue_key,
            )
    return list(findings.values())

def get_items(self, tree, test, mode):
    """Parse a soprasteria sonar-report HTML tree into findings.

    The report has two tables under the "detail" div: the detected
    vulnerabilities and the known security rules. When nothing was
    detected the first table is absent and an empty list is returned.
    mode is None for aggregated findings or "detailed" for one finding
    per row; raises ValueError on a row without title or mitigation.
    """
    # Check that there is at least one vulnerability (the vulnerabilities
    # table is absent when no vuln are found)
    detailTbody = tree.xpath(
        "/html/body/div[contains(@class,'detail')]/table/tbody"
    )
    dupes = dict()
    if len(detailTbody) == 2:
        # First is "Detail of the Detected Vulnerabilities" (not present if no vuln)
        # Second is "Known Security Rules"
        vulnerabilities_table = list(detailTbody[0].iter("tr"))
        rules_table = list(detailTbody[1].xpath("tr"))

        # iterate over the rules once to get the information we need
        rulesDic = dict()
        for rule in rules_table:
            rule_properties = list(rule.iter("td"))
            rule_name = list(rule_properties[0].iter("a"))[0].text.strip()
            rule_details = list(rule_properties[1].iter("details"))[0]
            rulesDic[rule_name] = rule_details

        for vuln in vulnerabilities_table:
            # NOTE(review): the column order rule / severity / file / line /
            # title / mitigation / key is assumed from the sonar-report
            # layout — confirm against the generator's HTML template.
            vuln_properties = list(vuln.iter("td"))
            rule_key = list(vuln_properties[0].iter("a"))[0].text
            vuln_rule_name = rule_key and rule_key.strip()
            vuln_severity = self.convert_sonar_severity(
                vuln_properties[1].text and vuln_properties[1].text.strip()
            )
            vuln_file_path = vuln_properties[2].text and vuln_properties[2].text.strip()
            vuln_line = vuln_properties[3].text and vuln_properties[3].text.strip()
            vuln_title = vuln_properties[4].text and vuln_properties[4].text.strip()
            vuln_mitigation = vuln_properties[5].text and vuln_properties[5].text.strip()
            vuln_key = vuln_properties[6].text and vuln_properties[6].text.strip()
            if vuln_title is None or vuln_mitigation is None:
                raise ValueError(
                    "Parser ValueError: can't find a title or a mitigation for vulnerability of name "
                    + vuln_rule_name
                )
            try:
                # Enrich the row from the rules table; an unknown rule name
                # falls back to placeholder values below.
                vuln_details = rulesDic[vuln_rule_name]
                vuln_description = self.get_description(vuln_details)
                vuln_references = self.get_references(
                    vuln_rule_name, vuln_details
                )
                vuln_cwe = self.get_cwe(vuln_references)
            except KeyError:
                vuln_description = "No description provided"
                vuln_references = ""
                vuln_cwe = 0
            if mode is None:
                self.process_result_file_name_aggregated(
                    test,
                    dupes,
                    vuln_title,
                    vuln_cwe,
                    vuln_description,
                    vuln_file_path,
                    vuln_line,
                    vuln_severity,
                    vuln_mitigation,
                    vuln_references,
                )
            else:
                self.process_result_detailed(
                    test,
                    dupes,
                    vuln_title,
                    vuln_cwe,
                    vuln_description,
                    vuln_file_path,
                    vuln_line,
                    vuln_severity,
                    vuln_mitigation,
                    vuln_references,
                    vuln_key,
                )
    return list(dupes.values())

# Process one vuln from the report for "SonarQube Scan detailed"
# Create the finding and add it into the dupes list
# Process one vuln from the report for "SonarQube Scan detailed".
# The tool's unique key is part of the aggregation key, so only true
# duplicates collapse; the new Finding is stored into `dupes`.
def process_result_detailed(
    self,
    test,
    dupes,
    vuln_title,
    vuln_cwe,
    vuln_description,
    vuln_file_path,
    vuln_line,
    vuln_severity,
    vuln_mitigation,
    vuln_references,
    vuln_key,
):
    dedup_key = f"{vuln_cwe}{vuln_title}{vuln_description}{vuln_file_path}{vuln_key}"
    dupes[dedup_key] = Finding(
        title=vuln_title,
        cwe=int(vuln_cwe),
        description=vuln_description,
        file_path=vuln_file_path,
        line=vuln_line,
        test=test,
        severity=vuln_severity,
        mitigation=vuln_mitigation,
        references=vuln_references,
        false_p=False,
        duplicate=False,
        out_of_scope=False,
        mitigated=None,
        impact="No impact provided",
        static_finding=True,
        dynamic_finding=False,
        unique_id_from_tool=vuln_key,
    )

# Process one vuln from the report for "SonarQube Scan"
# Create the finding and add it into the dupes list
# For aggregated findings:
# - the description is enriched with each finding line number
# - the mitigation (message) is concatenated with each finding's mitigation value
# Process one vuln from the report for "SonarQube Scan" (aggregated mode).
# Findings sharing cwe/title/description/file_path merge into one entry:
# the description collects each occurrence's line number, the mitigation
# concatenates each occurrence's message, and nb_occurences is counted.
def process_result_file_name_aggregated(
    self,
    test,
    dupes,
    vuln_title,
    vuln_cwe,
    vuln_description,
    vuln_file_path,
    vuln_line,
    vuln_severity,
    vuln_mitigation,
    vuln_references,
):
    dedup_key = f"{vuln_cwe}{vuln_title}{vuln_description}{vuln_file_path}"
    occurrence_note = f"Line: {vuln_line}"
    existing = dupes.get(dedup_key)
    if existing is None:
        # First time we see this aggregate: create the finding.
        dupes[dedup_key] = Finding(
            title=vuln_title,
            cwe=int(vuln_cwe),
            description=vuln_description
            + "\n\n-----\nOccurences:\n"
            + occurrence_note,
            file_path=vuln_file_path,
            # No line number because we have aggregated different
            # vulnerabilities that may have different line numbers
            test=test,
            severity=vuln_severity,
            mitigation=vuln_mitigation,
            references=vuln_references,
            false_p=False,
            duplicate=False,
            out_of_scope=False,
            mitigated=None,
            impact="No impact provided",
            static_finding=True,
            dynamic_finding=False,
            nb_occurences=1,
        )
    else:
        # Already created for this aggregate: append the occurrence's line
        # number and mitigation message, and bump the occurrence counter.
        existing.description = f"{existing.description}\n{occurrence_note}"
        existing.mitigation = f"{existing.mitigation}\n______\n{vuln_mitigation}"
        existing.nb_occurences = existing.nb_occurences + 1

def convert_sonar_severity(self, sonar_severity):
    """Map a SonarQube severity label to a DefectDojo severity.

    The comparison is case-insensitive; unrecognized labels fall
    back to "Info".
    """
    severity_map = {
        "blocker": "Critical",
        "critical": "High",
        "major": "Medium",
        "minor": "Low",
    }
    return severity_map.get(sonar_severity.lower(), "Info")

def get_description(self, vuln_details):
    """Render a rule-details HTML element to stripped plain text.

    Everything from the "<h2>See" section onward is dropped, remaining
    <h2> headings become **bold** markers, and all other tags are
    removed.
    """
    html_text = etree.tostring(
        vuln_details, pretty_print=True
    ).decode("utf-8", errors="replace")
    html_text = html_text.split("<h2>See", 1)[0]
    html_text = html_text.replace("<h2>", "**").replace("</h2>", "**")
    return strip_tags(html_text).strip()

def get_references(self, rule_name, vuln_details):
    """Build the references text for a rule.

    Returns the rule name followed by the text of every <a> element
    found in the rule's HTML details, one entry per line.
    """
    link_texts = [str(anchor.text) for anchor in vuln_details.iter("a")]
    return "\n".join([rule_name] + link_texts)

def get_cwe(self, vuln_references):
    """Extract the first CWE number from a references string.

    Returns the numeric part as a string (e.g. "79"), or 0 when no
    CWE reference is present. Matching is case-insensitive.
    """
    # Match only the first CWE! Use [0-9]+ (not *) so a bare "CWE-" with no
    # digits counts as no match instead of yielding an empty string that
    # would crash int(vuln_cwe) in the callers.
    cweSearch = re.search(r"CWE-([0-9]+)", vuln_references, re.IGNORECASE)
    if cweSearch:
        return cweSearch.group(1)
    return 0
return SonarQubeSoprasteriaHTML().get_items(tree, test, self.mode)

0 comments on commit f005661

Please sign in to comment.