Skip to content

Commit

Permalink
Merge 0a20bce into fee0363
Browse files Browse the repository at this point in the history
  • Loading branch information
dmuhs committed Jan 28, 2020
2 parents fee0363 + 0a20bce commit 4e148e5
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 116 deletions.
2 changes: 1 addition & 1 deletion docs/usage.rst
Expand Up @@ -16,7 +16,7 @@ Using API Tokens
~~~~~~~~~~~~~~~~

This is the recommended way of authenticating with the MythX smart contract
analysis API. In the *Profile* section there is an elements labeled "MythX API Key".
analysis API. In the *Tools* section there is an elements labeled "MythX API Key".
To generate a new API key, the account password must be entered:

.. image:: _static/img/api-key-password.png
Expand Down
38 changes: 28 additions & 10 deletions mythx_cli/cli.py
Expand Up @@ -5,16 +5,24 @@
from glob import glob
from os.path import abspath, commonpath
from pathlib import Path
from typing import List, Optional, Tuple

import click
from mythx_models.response import (
AnalysisInputResponse,
AnalysisListResponse,
DetectedIssuesResponse,
GroupCreationResponse,
GroupListResponse,
)
from pythx import Client, MythXAPIError
from pythx.middleware.group_data import GroupDataMiddleware
from pythx.middleware.toolname import ClientToolNameMiddleware

from mythx_cli import __version__
from mythx_cli.formatter import FORMAT_RESOLVER, util
from mythx_cli.formatter.base import BaseFormatter
from mythx_cli.payload import generate_bytecode_payload, generate_solidity_payload, generate_truffle_payload
from mythx_models.response import AnalysisListResponse, DetectedIssuesResponse, GroupCreationResponse, GroupListResponse
from pythx import Client, MythXAPIError
from pythx.middleware.group_data import GroupDataMiddleware
from pythx.middleware.toolname import ClientToolNameMiddleware

LOGGER = logging.getLogger("mythx-cli")
logging.basicConfig(level=logging.WARNING)
Expand Down Expand Up @@ -321,17 +329,23 @@ def analyze(
write_or_print("\n".join(uuids))
return

issues_list: List[Tuple[DetectedIssuesResponse, Optional[AnalysisInputResponse]]] = []
formatter: BaseFormatter = FORMAT_RESOLVER[ctx["fmt"]]
for uuid in uuids:
while not ctx["client"].analysis_ready(uuid):
# TODO: Add poll interval option
time.sleep(3)
resp: DetectedIssuesResponse = ctx["client"].report(uuid)
inp = ctx["client"].request_by_uuid(uuid)
inp: Optional[AnalysisInputResponse] = ctx["client"].request_by_uuid(
uuid
) if formatter.report_requires_input else None

util.filter_report(resp, min_severity=min_severity, swc_blacklist=swc_blacklist, swc_whitelist=swc_whitelist)
ctx["uuid"] = uuid
write_or_print(FORMAT_RESOLVER[ctx["fmt"]].format_detected_issues(resp, inp))
# extend response with job UUID to keep formatter logic isolated
resp.uuid = uuid
issues_list.append((resp, inp))

write_or_print(formatter.format_detected_issues(issues_list))
sys.exit(ctx["retval"])


Expand Down Expand Up @@ -491,13 +505,17 @@ def analysis_report(ctx, uuids, min_severity, swc_blacklist, swc_whitelist):
:return:
"""

issues_list: List[Tuple[DetectedIssuesResponse, Optional[AnalysisInputResponse]]] = []
formatter: BaseFormatter = FORMAT_RESOLVER[ctx["fmt"]]
for uuid in uuids:
resp = ctx["client"].report(uuid)
inp = ctx["client"].request_by_uuid(uuid)
ctx["uuid"] = uuid
inp = ctx["client"].request_by_uuid(uuid) if formatter.report_requires_input else None

util.filter_report(resp, min_severity=min_severity, swc_blacklist=swc_blacklist, swc_whitelist=swc_whitelist)
write_or_print(FORMAT_RESOLVER[ctx["fmt"]].format_detected_issues(resp, inp))
resp.uuid = uuid
issues_list.append((resp, inp))

write_or_print(formatter.format_detected_issues(issues_list))
sys.exit(ctx["retval"])


Expand Down
5 changes: 4 additions & 1 deletion mythx_cli/formatter/base.py
@@ -1,6 +1,7 @@
"""This module contains the base formatter interface."""

import abc
from typing import List, Optional, Tuple

from mythx_models.response import (
AnalysisInputResponse,
Expand All @@ -14,6 +15,8 @@
class BaseFormatter(abc.ABC):
"""The base formatter interface for printing various response types."""

report_requires_input = False

@staticmethod
@abc.abstractmethod
def format_analysis_list(obj: AnalysisListResponse):
Expand All @@ -29,7 +32,7 @@ def format_analysis_status(resp: AnalysisStatusResponse) -> str:

@staticmethod
@abc.abstractmethod
def format_detected_issues(obj: DetectedIssuesResponse, inp: AnalysisInputResponse):
def format_detected_issues(issues_list: List[Tuple[DetectedIssuesResponse, Optional[AnalysisInputResponse]]]):
"""Format an issue report response."""

pass # pragma: no cover
Expand Down
25 changes: 17 additions & 8 deletions mythx_cli/formatter/json.py
@@ -1,8 +1,8 @@
"""This module contains the compressed and pretty-printing JSON formatters."""

import json
from typing import List, Optional, Tuple

from mythx_cli.formatter.base import BaseFormatter
from mythx_models.response import (
AnalysisInputResponse,
AnalysisListResponse,
Expand All @@ -13,8 +13,12 @@
VersionResponse,
)

from mythx_cli.formatter.base import BaseFormatter


class JSONFormatter(BaseFormatter):
report_requires_input = False

@staticmethod
def format_group_status(resp: GroupStatusResponse):
"""Format a group status response as compressed JSON."""
Expand All @@ -40,10 +44,13 @@ def format_analysis_status(resp: AnalysisStatusResponse) -> str:
return resp.to_json()

@staticmethod
def format_detected_issues(resp: DetectedIssuesResponse, inp: AnalysisInputResponse) -> str:
def format_detected_issues(
issues_list: List[Tuple[DetectedIssuesResponse, Optional[AnalysisInputResponse]]]
) -> str:
"""Format an issue report response as compressed JSON."""

return resp.to_json()
output = [resp.to_dict(as_list=True) for resp, _ in issues_list]
return json.dumps(output)

@staticmethod
def format_version(resp: VersionResponse) -> str:
Expand All @@ -53,13 +60,15 @@ def format_version(resp: VersionResponse) -> str:


class PrettyJSONFormatter(BaseFormatter):
report_requires_input = False

@staticmethod
def _print_as_json(obj):
def _print_as_json(obj, report_mode=False):
"""Pretty-print the given object's JSON representation."""

json_args = {"indent": 2, "sort_keys": True}
if type(obj) == DetectedIssuesResponse:
return json.dumps(obj.to_dict(as_list=True), **json_args)
if report_mode:
return json.dumps([resp.to_dict(as_list=True) for resp, _ in obj], **json_args)
return json.dumps(obj.to_dict(), **json_args)

@staticmethod
Expand Down Expand Up @@ -87,10 +96,10 @@ def format_analysis_status(obj: AnalysisStatusResponse) -> str:
return PrettyJSONFormatter._print_as_json(obj)

@staticmethod
def format_detected_issues(obj: DetectedIssuesResponse, inp: AnalysisInputResponse):
def format_detected_issues(issues_list: List[Tuple[DetectedIssuesResponse, Optional[AnalysisInputResponse]]]):
"""Format an issue report response as pretty-printed JSON."""

return PrettyJSONFormatter._print_as_json(obj)
return PrettyJSONFormatter._print_as_json(issues_list, report_mode=True)

@staticmethod
def format_version(obj: VersionResponse):
Expand Down
56 changes: 30 additions & 26 deletions mythx_cli/formatter/simple_stdout.py
@@ -1,6 +1,6 @@
"""This module contains a simple text formatter class printing a subset of the response data."""

import click
from typing import List, Optional, Tuple

from mythx_models.response import (
AnalysisInputResponse,
Expand All @@ -17,6 +17,8 @@


class SimpleFormatter(BaseFormatter):
report_requires_input = True

@staticmethod
def format_analysis_list(resp: AnalysisListResponse) -> str:
"""Format an analysis list response to a simple text representation."""
Expand Down Expand Up @@ -70,34 +72,36 @@ def format_analysis_status(resp: AnalysisStatusResponse) -> str:
return "\n".join(res)

@staticmethod
def format_detected_issues(resp: DetectedIssuesResponse, inp: AnalysisInputResponse) -> str:
def format_detected_issues(
issues_list: List[Tuple[DetectedIssuesResponse, Optional[AnalysisInputResponse]]]
) -> str:
"""Format an issue report to a simple text representation."""

res = []
ctx = click.get_current_context()
# TODO: Sort by file
for report in resp.issue_reports:
for issue in report.issues:
res.append(generate_dashboard_link(ctx.obj["uuid"]))
res.append("Title: {} ({})".format(issue.swc_title or "-", issue.severity))
res.append("Description: {}".format(issue.description_long.strip()))

for loc in issue.locations:
comp = loc.source_map.components[0]
source_list = loc.source_list or report.source_list
if source_list and 0 >= comp.file_id < len(source_list):
filename = source_list[comp.file_id]
if not inp.sources or filename not in inp.sources:
# Skip files we don't have source for
# (e.g. with unresolved bytecode hashes)
res.append("")
continue
line = get_source_location_by_offset(inp.sources[filename]["source"], comp.offset)
snippet = inp.sources[filename]["source"].split("\n")[line - 1]
res.append("{}:{}".format(filename, line))
res.append("\t" + snippet.strip())

res.append("")
for resp, inp in issues_list:
res = []
for report in resp.issue_reports:
for issue in report.issues:
res.append(generate_dashboard_link(resp.uuid))
res.append("Title: {} ({})".format(issue.swc_title or "-", issue.severity))
res.append("Description: {}".format(issue.description_long.strip()))

for loc in issue.locations:
comp = loc.source_map.components[0]
source_list = loc.source_list or report.source_list
if source_list and 0 >= comp.file_id < len(source_list):
filename = source_list[comp.file_id]
if not inp.sources or filename not in inp.sources:
# Skip files we don't have source for
# (e.g. with unresolved bytecode hashes)
res.append("")
continue
line = get_source_location_by_offset(inp.sources[filename]["source"], comp.offset)
snippet = inp.sources[filename]["source"].split("\n")[line - 1]
res.append("{}:{}".format(filename, line))
res.append("\t" + snippet.strip())

res.append("")

return "\n".join(res)

Expand Down
51 changes: 29 additions & 22 deletions mythx_cli/formatter/sonarqube.py
@@ -1,34 +1,41 @@
import json
from typing import List, Optional, Tuple

from mythx_cli.formatter.json import JSONFormatter
from mythx_models.response import AnalysisInputResponse, DetectedIssuesResponse
from mythx_models.response.issue import Severity, SourceType

from mythx_cli.formatter.json import JSONFormatter


class SonarQubeFormatter(JSONFormatter):
report_requires_input = False

@staticmethod
def format_detected_issues(resp: DetectedIssuesResponse, inp: AnalysisInputResponse) -> str:
def format_detected_issues(
issues_list: List[Tuple[DetectedIssuesResponse, Optional[AnalysisInputResponse]]]
) -> str:
new_reports = []
for report in resp.issue_reports:
for issue in report:
new_issue = {}
for loc in issue.decoded_locations:
for raw_loc in issue.locations:
if raw_loc.source_type != SourceType.SOLIDITY_FILE:
continue
new_issue["onInputFile"] = raw_loc.source_list[raw_loc.source_map.components[0].file_id]
new_issue["atLineNr"] = loc.start_line
for resp, _ in issues_list:
for report in resp.issue_reports:
for issue in report:
new_issue = {}
for loc in issue.decoded_locations:
for raw_loc in issue.locations:
if raw_loc.source_type != SourceType.SOLIDITY_FILE:
continue
new_issue["onInputFile"] = raw_loc.source_list[raw_loc.source_map.components[0].file_id]
new_issue["atLineNr"] = loc.start_line

new_issue.update(
{
"linterName": "mythx",
"forRule": issue.swc_id,
"ruleType": issue.severity.name,
"remediationEffortMinutes": 0,
"severity": "vulnerability" if issue.severity == Severity.HIGH else issue.severity.name,
"message": issue.description_long,
}
)
new_reports.append(new_issue)
new_issue.update(
{
"linterName": "mythx",
"forRule": issue.swc_id,
"ruleType": issue.severity.name,
"remediationEffortMinutes": 0,
"severity": "vulnerability" if issue.severity == Severity.HIGH else issue.severity.name,
"message": issue.description_long,
}
)
new_reports.append(new_issue)

return json.dumps(new_reports)

0 comments on commit 4e148e5

Please sign in to comment.