Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Permalink
Fix: pep8 errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Tuukka Mustonen committed May 18, 2016
1 parent 11e01ea commit 0a934a4
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 66 deletions.
1 change: 1 addition & 0 deletions dev-requirements.txt
@@ -1 +1,2 @@
setuptools-scm >= 1.11.0
pep8 >= 1.7.0
4 changes: 2 additions & 2 deletions features/authenticate.py.template
Expand Up @@ -15,7 +15,7 @@ def authenticate(context, auth_flow_id=None, acquire_new_authenticator=False):
flows, if the tests need different ones.
"""

####
####
# This is the place which you need to modify if your target
# requires authorisation. (If you don't, you can just copy this
# template as features/authenticate.py and you should be set.)
Expand Down Expand Up @@ -50,7 +50,7 @@ def authenticate(context, auth_flow_id=None, acquire_new_authenticator=False):
# implement it. If you use the auth methods Requests provides, do
# not return the CustomAuth object - instead, return one of the
# Requests library built-in authentication objects (see the doc
# link above).
# link above).
###

# Reuse an existing authenticator unless explicitly requested to
Expand Down
36 changes: 19 additions & 17 deletions mittn/headlessscanner/dbtools.py
Expand Up @@ -36,23 +36,25 @@ def open_database(context):
# be big.
db_metadata = MetaData()
db_metadata.bind = db_engine
context.headlessscanner_issues = Table('headlessscanner_issues',
db_metadata,
Column('new_issue', types.Boolean),
Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement
Column('timestamp', types.DateTime(timezone=True)),
Column('test_runner_host', types.Text),
Column('scenario_id', types.Text),
Column('url', types.Text),
Column('severity', types.Text),
Column('issuetype', types.Text),
Column('issuename', types.Text),
Column('issuedetail', types.Text),
Column('confidence', types.Text),
Column('host', types.Text),
Column('port', types.Text),
Column('protocol', types.Text),
Column('messages', types.LargeBinary))
context.headlessscanner_issues = Table(
'headlessscanner_issues',
db_metadata,
Column('new_issue', types.Boolean),
Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement
Column('timestamp', types.DateTime(timezone=True)),
Column('test_runner_host', types.Text),
Column('scenario_id', types.Text),
Column('url', types.Text),
Column('severity', types.Text),
Column('issuetype', types.Text),
Column('issuename', types.Text),
Column('issuedetail', types.Text),
Column('confidence', types.Text),
Column('host', types.Text),
Column('port', types.Text),
Column('protocol', types.Text),
Column('messages', types.LargeBinary)
)

# Create the table if it doesn't exist
# and otherwise no effect
Expand Down
6 changes: 4 additions & 2 deletions mittn/headlessscanner/proxy_comms.py
Expand Up @@ -53,12 +53,14 @@ def start_burp(context):
proxy_message = read_next_json(burpprocess)
if proxy_message is None:
kill_subprocess(burpprocess)
assert False, "Starting Burp Suite and extension failed or timed out. Is extension output set as stdout? Command line was: %s" % context.burp_cmdline
assert False, "Starting Burp Suite and extension failed or timed " \
"out. Is extension output set as stdout? Command line " \
"was: %s" % context.burp_cmdline
if proxy_message.get("running") != 1:
kill_subprocess(burpprocess)
assert False, "Burp Suite extension responded with an unrecognised JSON message"
# In some cases, it takes some time for the proxy listener to actually
# have an open port; I have been unable to pin down a specific time
# so we just wait a bit.
time.sleep(5)
return burpprocess
return burpprocess
26 changes: 20 additions & 6 deletions mittn/headlessscanner/steps.py
Expand Up @@ -25,14 +25,20 @@

@given(u'a baseline database for scanner findings')
def step_impl(context):
"""Test that we can connect to a database. As a side effect, open_database(9 also creates the necessary table(s) that are required."""
"""Test that we can connect to a database.
As a side effect, open_database(9 also creates the necessary table(s)
that are required.
"""
if hasattr(context, 'dburl') is False:
assert False, "Database URI not specified"
dbconn = scandb.open_database(context)
if dbconn is None:
assert False, "Cannot open database %s" % context.dburl
dbconn.close()


@given(u'a working Burp Suite installation')
def step_impl(context):
"""Test that we have a correctly installed Burp Suite and the scanner driver available"""
Expand All @@ -54,7 +60,9 @@ def step_impl(context):
proxy_message = read_next_json(burpprocess)
if proxy_message is None:
kill_subprocess(burpprocess)
assert False, "Timed out communicating to headless-scanner-driver extension over %s. Is something else running there?" % context.burp_proxy_address
assert False, "Timed out communicating to headless-scanner-driver " \
"extension over %s. Is something else running there?" \
% context.burp_proxy_address

# Shut down Burp Suite. Again, see the scanner driver plugin docs for further info.

Expand Down Expand Up @@ -117,18 +125,23 @@ def step_impl(context, timeout):
# Go through scan item statuses statuses
if proxy_message is None: # Extension did not respond
kill_subprocess(burpprocess)
assert False, "Timed out retrieving scan status information from Burp Suite over %s" % context.burp_proxy_address
assert False, "Timed out retrieving scan status information from " \
"Burp Suite over %s" % context.burp_proxy_address
finished = True
if proxy_message == []: # No scan items were started by extension
kill_subprocess(burpprocess)
assert False, "No scan items were started by Burp. Check web test case and suite scope."
for status in proxy_message:
if not re_finished.match(status):
finished = False
if hasattr(context, 'fail_on_abandoned_scans'): # In some test setups, abandoned scans are failures, and this has been set
# In some test setups, abandoned scans are failures, and this has been set
if hasattr(context, 'fail_on_abandoned_scans'):
if re_abandoned.match(status):
kill_subprocess(burpprocess)
assert False, "Burp Suite reports an abandoned scan, but you wanted all scans to succeed. DNS problem or non-Target Scope hosts targeted in a test scenario?"
assert False, "Burp Suite reports an abandoned scan, " \
"but you wanted all scans to succeed. DNS " \
"problem or non-Target Scope hosts " \
"targeted in a test scenario?"
if finished is True: # All scan statuses were in state "finished"
break
if (time.time() - scan_start_time) > (timeout * 60):
Expand Down Expand Up @@ -185,5 +198,6 @@ def step_impl(context):
unprocessed_items = scandb.number_of_new_in_database(context)

if unprocessed_items > 0:
assert False, "Unprocessed findings in database. %s new issue(s), total %s issue(s)." % (new_items, unprocessed_items)
assert False, "Unprocessed findings in database. %s new issue(s), " \
"total %s issue(s)." % (new_items, unprocessed_items)
assert True
37 changes: 19 additions & 18 deletions mittn/headlessscanner/test_dbtools.py
Expand Up @@ -57,23 +57,25 @@ def test_add_false_positive(self):
db_engine = sqlalchemy.create_engine(self.context.dburl)
dbconn = db_engine.connect()
db_metadata = sqlalchemy.MetaData()
headlessscanner_issues = Table('headlessscanner_issues',
db_metadata,
Column('new_issue', types.Boolean),
Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement
Column('timestamp', types.DateTime(timezone=True)),
Column('test_runner_host', types.Text),
Column('scenario_id', types.Text),
Column('url', types.Text),
Column('severity', types.Text),
Column('issuetype', types.Text),
Column('issuename', types.Text),
Column('issuedetail', types.Text),
Column('confidence', types.Text),
Column('host', types.Text),
Column('port', types.Text),
Column('protocol', types.Text),
Column('messages', types.LargeBinary))
headlessscanner_issues = Table(
'headlessscanner_issues',
db_metadata,
Column('new_issue', types.Boolean),
Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement
Column('timestamp', types.DateTime(timezone=True)),
Column('test_runner_host', types.Text),
Column('scenario_id', types.Text),
Column('url', types.Text),
Column('severity', types.Text),
Column('issuetype', types.Text),
Column('issuename', types.Text),
Column('issuedetail', types.Text),
Column('confidence', types.Text),
Column('host', types.Text),
Column('port', types.Text),
Column('protocol', types.Text),
Column('messages', types.LargeBinary)
)
db_select = sqlalchemy.sql.select([headlessscanner_issues])
db_result = dbconn.execute(db_select)
result = db_result.fetchone()
Expand Down Expand Up @@ -166,7 +168,6 @@ def test_false_positive_detection(self):
issue),
True, "A duplicate case not detected")


def tearDown(self):
try:
os.unlink(self.db_file)
Expand Down
4 changes: 2 additions & 2 deletions mittn/httpfuzzer/dbtools.py
Expand Up @@ -102,8 +102,8 @@ def known_false_positive(context, response):
context.httpfuzzer_issues.c.scenario_id == response['scenario_id'], # Text
context.httpfuzzer_issues.c.server_protocol_error == response['server_protocol_error'], # Text
context.httpfuzzer_issues.c.resp_statuscode == str(response['resp_statuscode']), # Text
context.httpfuzzer_issues.c.server_timeout == response['server_timeout'], # Boolean
context.httpfuzzer_issues.c.server_error_text_detected == response['server_error_text_detected'])) # Boolean
context.httpfuzzer_issues.c.server_timeout == response['server_timeout'], # Bool
context.httpfuzzer_issues.c.server_error_text_detected == response['server_error_text_detected'])) # Bool

db_result = dbconn.execute(db_select)

Expand Down
4 changes: 2 additions & 2 deletions mittn/httpfuzzer/dictwalker.py
Expand Up @@ -100,8 +100,8 @@ def dictwalk(branch, anomaly_dict, anomaly_key=None):
fuzzed_branch.append(fuzzdict)
return fuzzed_branch
# A leaf node; return just a list of anomalies for a value
if isinstance(branch, (int, str, unicode, float)) or branch in (
True, False, None):
if isinstance(branch, (int, str, unicode, float)) or \
branch in (True, False, None):
# Get the anomaly to be injected from the anomaly_dict.
anomaly = anomaly_dict.get(anomaly_key)
if anomaly is None:
Expand Down
18 changes: 9 additions & 9 deletions mittn/httpfuzzer/injector.py
Expand Up @@ -53,13 +53,13 @@ def inject(context, injection_list):
context.proxy_address = None

responses += send_http(context, form_string,
timeout=context.timeout,
proxy=context.proxy_address,
method=method,
content_type=context.content_type,
scenario_id=context.scenario_id,
auth=authenticate(context,
context.authentication_id))
timeout=context.timeout,
proxy=context.proxy_address,
method=method,
content_type=context.content_type,
scenario_id=context.scenario_id,
auth=authenticate(context,
context.authentication_id))

# Here, I'd really like to send out unencoded (invalid)
# JSON too, but the json library barfs too easily, so
Expand All @@ -84,9 +84,9 @@ def test_valid_submission(context, injected_submission=None):
:param injected_submission: Request body to be sent
"""

# to avoid errors re/ uninitialized object
# to avoid errors re/ uninitialized object
proxydict = {}

if injected_submission is None:
injected_submission = "(None)" # For user readability only

Expand Down
6 changes: 3 additions & 3 deletions mittn/httpfuzzer/number_ranges.py
Expand Up @@ -22,8 +22,8 @@ def unpack_integer_range(integerrange):
assert False, "Number range %s in the feature file is invalid. Must " \
"contain just numbers, commas, and hyphens" % integerrange
integerrange.replace(" ", "")
rangeparts = integerrange.split(',') # One or more integer ranges
# separated by commas
rangeparts = integerrange.split(',') # One+ comma-separated int ranges

for rangepart in rangeparts:
rangemaxmin = rangepart.split('-') # Range is defined with a hyphen
if len(rangemaxmin) == 1: # This was a single value
Expand All @@ -50,4 +50,4 @@ def unpack_integer_range(integerrange):
assert False, "Number range %s in the feature file is invalid. " \
"Incorrect range specifier" % \
integerrange
return sorted(integers)
return sorted(integers)
1 change: 1 addition & 0 deletions mittn/httpfuzzer/static_anomalies.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
# pep8: disable=E501
# pylint: disable=line-too-long
"""List of static anomalies that can be injected. Before using,
replace mittn.org domain references with something you have control over.
Expand Down
12 changes: 9 additions & 3 deletions mittn/httpfuzzer/steps.py
Expand Up @@ -20,14 +20,20 @@

@given(u'a baseline database for injection findings')
def step_impl(context):
"""Test that we can connect to a database. As a side effect, open_database(9 also creates the necessary table(s) that are required."""
"""Test that we can connect to a database.
As a side effect, open_database(9 also creates the necessary table(s) that
are required.
"""
if hasattr(context, 'dburl') is False:
assert False, "Database URI not specified"
dbconn = fuzzdb.open_database(context)
if dbconn is None:
assert False, "Cannot open database %s" % context.dburl
dbconn.close()


@given(u'an authentication flow id "{auth_id}"')
def step_impl(context, auth_id):
"""Store the authentication flow identifier. Tests in the feature file
Expand Down Expand Up @@ -348,6 +354,6 @@ def step_impl(context):
assert False, "%s new findings were found." % context.new_findings
old_findings = fuzzdb.number_of_new_in_database(context)
if old_findings > 0:
assert False, "No new findings found, but %s unprocessed findings from past runs found in database." % old_findings
assert False, "No new findings found, but %s unprocessed findings " \
"from past runs found in database." % old_findings
assert True

1 change: 0 additions & 1 deletion mittn/httpfuzzer/test_dbtools.py
Expand Up @@ -180,7 +180,6 @@ def test_false_positive_detection(self):
response),
True, "A duplicate case not detected")


def tearDown(self):
try:
os.unlink(self.db_file)
Expand Down
4 changes: 3 additions & 1 deletion mittn/tlschecker/steps.py
Expand Up @@ -102,7 +102,7 @@ def step_impl(context, groupsize):
assert False, "No stored TLS connection result set was found."
keyexchange = root.find(".//keyExchange")
if keyexchange is None:
# Kudos bro!
# Kudos bro!
return
keytype = keyexchange.get('Type')
realgroupsize = keyexchange.get('GroupSize')
Expand Down Expand Up @@ -302,6 +302,7 @@ def step_impl(context):
assert hsts.get('isSupported') == 'True', \
"HTTP Strict Transport Security header not observed"


@step(u'server has no Heartbleed vulnerability')
def step_impl(context):
try:
Expand All @@ -312,6 +313,7 @@ def step_impl(context):
assert heartbleed.get('isVulnerable') == 'False', \
"Server is vulnerable for Heartbleed"


@step(u'certificate does not use SHA-1')
def step_impl(context):
try:
Expand Down

0 comments on commit 0a934a4

Please sign in to comment.