diff --git a/dev-requirements.txt b/dev-requirements.txt index 44c477d..e929af6 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1 +1,2 @@ setuptools-scm >= 1.11.0 +pep8 >= 1.7.0 diff --git a/features/authenticate.py.template b/features/authenticate.py.template index 320cd18..d021269 100644 --- a/features/authenticate.py.template +++ b/features/authenticate.py.template @@ -15,7 +15,7 @@ def authenticate(context, auth_flow_id=None, acquire_new_authenticator=False): flows, if the tests need different ones. """ - #### + #### # This is the place which you need to modify if your target # requires authorisation. (If you don't, you can just copy this # template as features/authenticate.py and you should be set.) @@ -50,7 +50,7 @@ def authenticate(context, auth_flow_id=None, acquire_new_authenticator=False): # implement it. If you use the auth methods Requests provides, do # not return the CustomAuth object - instead, return one of the # Requests library built-in authentication objects (see the doc - # link above). + # link above). ### # Reuse an existing authenticator unless explicitly requested to diff --git a/mittn/headlessscanner/dbtools.py b/mittn/headlessscanner/dbtools.py index 1207683..1cc8895 100644 --- a/mittn/headlessscanner/dbtools.py +++ b/mittn/headlessscanner/dbtools.py @@ -36,23 +36,25 @@ def open_database(context): # be big. db_metadata = MetaData() db_metadata.bind = db_engine - context.headlessscanner_issues = Table('headlessscanner_issues', - db_metadata, - Column('new_issue', types.Boolean), - Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement - Column('timestamp', types.DateTime(timezone=True)), - Column('test_runner_host', types.Text), - Column('scenario_id', types.Text), - Column('url', types.Text), - Column('severity', types.Text), - Column('issuetype', types.Text), - Column('issuename', types.Text), - Column('issuedetail', types.Text), - Column('confidence', types.Text), - Column('host', types.Text), - Column('port', types.Text), - Column('protocol', types.Text), - Column('messages', types.LargeBinary)) + context.headlessscanner_issues = Table( + 'headlessscanner_issues', + db_metadata, + Column('new_issue', types.Boolean), + Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement + Column('timestamp', types.DateTime(timezone=True)), + Column('test_runner_host', types.Text), + Column('scenario_id', types.Text), + Column('url', types.Text), + Column('severity', types.Text), + Column('issuetype', types.Text), + Column('issuename', types.Text), + Column('issuedetail', types.Text), + Column('confidence', types.Text), + Column('host', types.Text), + Column('port', types.Text), + Column('protocol', types.Text), + Column('messages', types.LargeBinary) + ) # Create the table if it doesn't exist # and otherwise no effect diff --git a/mittn/headlessscanner/proxy_comms.py b/mittn/headlessscanner/proxy_comms.py index 9b3c2a5..3ac2395 100644 --- a/mittn/headlessscanner/proxy_comms.py +++ b/mittn/headlessscanner/proxy_comms.py @@ -53,7 +53,9 @@ def start_burp(context): proxy_message = read_next_json(burpprocess) if proxy_message is None: kill_subprocess(burpprocess) - assert False, "Starting Burp Suite and extension failed or timed out. Is extension output set as stdout? Command line was: %s" % context.burp_cmdline + assert False, "Starting Burp Suite and extension failed or timed " \ + "out. Is extension output set as stdout? Command line " \ + "was: %s" % context.burp_cmdline if proxy_message.get("running") != 1: kill_subprocess(burpprocess) assert False, "Burp Suite extension responded with an unrecognised JSON message" @@ -61,4 +63,4 @@ def start_burp(context): # have an open port; I have been unable to pin down a specific time # so we just wait a bit. time.sleep(5) - return burpprocess \ No newline at end of file + return burpprocess diff --git a/mittn/headlessscanner/steps.py b/mittn/headlessscanner/steps.py index db9d37b..f7e964c 100644 --- a/mittn/headlessscanner/steps.py +++ b/mittn/headlessscanner/steps.py @@ -25,7 +25,12 @@ @given(u'a baseline database for scanner findings') def step_impl(context): - """Test that we can connect to a database. As a side effect, open_database(9 also creates the necessary table(s) that are required.""" + """Test that we can connect to a database. + + As a side effect, open_database(9 also creates the necessary table(s) + that are required. + + """ if hasattr(context, 'dburl') is False: assert False, "Database URI not specified" dbconn = scandb.open_database(context) @@ -33,6 +38,7 @@ def step_impl(context): assert False, "Cannot open database %s" % context.dburl dbconn.close() + @given(u'a working Burp Suite installation') def step_impl(context): """Test that we have a correctly installed Burp Suite and the scanner driver available""" @@ -54,7 +60,9 @@ def step_impl(context): proxy_message = read_next_json(burpprocess) if proxy_message is None: kill_subprocess(burpprocess) - assert False, "Timed out communicating to headless-scanner-driver extension over %s. Is something else running there?" % context.burp_proxy_address + assert False, "Timed out communicating to headless-scanner-driver " \ + "extension over %s. Is something else running there?" \ + % context.burp_proxy_address # Shut down Burp Suite. Again, see the scanner driver plugin docs for further info. @@ -117,7 +125,8 @@ def step_impl(context, timeout): # Go through scan item statuses statuses if proxy_message is None: # Extension did not respond kill_subprocess(burpprocess) - assert False, "Timed out retrieving scan status information from Burp Suite over %s" % context.burp_proxy_address + assert False, "Timed out retrieving scan status information from " \ + "Burp Suite over %s" % context.burp_proxy_address finished = True if proxy_message == []: # No scan items were started by extension kill_subprocess(burpprocess) @@ -125,10 +134,14 @@ def step_impl(context, timeout): for status in proxy_message: if not re_finished.match(status): finished = False - if hasattr(context, 'fail_on_abandoned_scans'): # In some test setups, abandoned scans are failures, and this has been set + # In some test setups, abandoned scans are failures, and this has been set + if hasattr(context, 'fail_on_abandoned_scans'): if re_abandoned.match(status): kill_subprocess(burpprocess) - assert False, "Burp Suite reports an abandoned scan, but you wanted all scans to succeed. DNS problem or non-Target Scope hosts targeted in a test scenario?" + assert False, "Burp Suite reports an abandoned scan, " \ + "but you wanted all scans to succeed. DNS " \ + "problem or non-Target Scope hosts " \ + "targeted in a test scenario?" if finished is True: # All scan statuses were in state "finished" break if (time.time() - scan_start_time) > (timeout * 60): @@ -185,5 +198,6 @@ def step_impl(context): unprocessed_items = scandb.number_of_new_in_database(context) if unprocessed_items > 0: - assert False, "Unprocessed findings in database. %s new issue(s), total %s issue(s)." % (new_items, unprocessed_items) + assert False, "Unprocessed findings in database. %s new issue(s), " \ + "total %s issue(s)." % (new_items, unprocessed_items) assert True diff --git a/mittn/headlessscanner/test_dbtools.py b/mittn/headlessscanner/test_dbtools.py index f8e9a18..d5a3f6b 100644 --- a/mittn/headlessscanner/test_dbtools.py +++ b/mittn/headlessscanner/test_dbtools.py @@ -57,23 +57,25 @@ def test_add_false_positive(self): db_engine = sqlalchemy.create_engine(self.context.dburl) dbconn = db_engine.connect() db_metadata = sqlalchemy.MetaData() - headlessscanner_issues = Table('headlessscanner_issues', - db_metadata, - Column('new_issue', types.Boolean), - Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement - Column('timestamp', types.DateTime(timezone=True)), - Column('test_runner_host', types.Text), - Column('scenario_id', types.Text), - Column('url', types.Text), - Column('severity', types.Text), - Column('issuetype', types.Text), - Column('issuename', types.Text), - Column('issuedetail', types.Text), - Column('confidence', types.Text), - Column('host', types.Text), - Column('port', types.Text), - Column('protocol', types.Text), - Column('messages', types.LargeBinary)) + headlessscanner_issues = Table( + 'headlessscanner_issues', + db_metadata, + Column('new_issue', types.Boolean), + Column('issue_no', types.Integer, primary_key=True, nullable=False), # Implicit autoincrement + Column('timestamp', types.DateTime(timezone=True)), + Column('test_runner_host', types.Text), + Column('scenario_id', types.Text), + Column('url', types.Text), + Column('severity', types.Text), + Column('issuetype', types.Text), + Column('issuename', types.Text), + Column('issuedetail', types.Text), + Column('confidence', types.Text), + Column('host', types.Text), + Column('port', types.Text), + Column('protocol', types.Text), + Column('messages', types.LargeBinary) + ) db_select = sqlalchemy.sql.select([headlessscanner_issues]) db_result = dbconn.execute(db_select) result = db_result.fetchone() @@ -166,7 +168,6 @@ def test_false_positive_detection(self): issue), True, "A duplicate case not detected") - def tearDown(self): try: os.unlink(self.db_file) diff --git a/mittn/httpfuzzer/dbtools.py b/mittn/httpfuzzer/dbtools.py index 846f9d3..d4972f1 100644 --- a/mittn/httpfuzzer/dbtools.py +++ b/mittn/httpfuzzer/dbtools.py @@ -102,8 +102,8 @@ def known_false_positive(context, response): context.httpfuzzer_issues.c.scenario_id == response['scenario_id'], # Text context.httpfuzzer_issues.c.server_protocol_error == response['server_protocol_error'], # Text context.httpfuzzer_issues.c.resp_statuscode == str(response['resp_statuscode']), # Text - context.httpfuzzer_issues.c.server_timeout == response['server_timeout'], # Boolean - context.httpfuzzer_issues.c.server_error_text_detected == response['server_error_text_detected'])) # Boolean + context.httpfuzzer_issues.c.server_timeout == response['server_timeout'], # Bool + context.httpfuzzer_issues.c.server_error_text_detected == response['server_error_text_detected'])) # Bool db_result = dbconn.execute(db_select) diff --git a/mittn/httpfuzzer/dictwalker.py b/mittn/httpfuzzer/dictwalker.py index f0cb106..0a36b34 100644 --- a/mittn/httpfuzzer/dictwalker.py +++ b/mittn/httpfuzzer/dictwalker.py @@ -100,8 +100,8 @@ def dictwalk(branch, anomaly_dict, anomaly_key=None): fuzzed_branch.append(fuzzdict) return fuzzed_branch # A leaf node; return just a list of anomalies for a value - if isinstance(branch, (int, str, unicode, float)) or branch in ( - True, False, None): + if isinstance(branch, (int, str, unicode, float)) or \ + branch in (True, False, None): # Get the anomaly to be injected from the anomaly_dict. anomaly = anomaly_dict.get(anomaly_key) if anomaly is None: diff --git a/mittn/httpfuzzer/injector.py b/mittn/httpfuzzer/injector.py index ebad986..c9b047f 100644 --- a/mittn/httpfuzzer/injector.py +++ b/mittn/httpfuzzer/injector.py @@ -53,13 +53,13 @@ def inject(context, injection_list): context.proxy_address = None responses += send_http(context, form_string, - timeout=context.timeout, - proxy=context.proxy_address, - method=method, - content_type=context.content_type, - scenario_id=context.scenario_id, - auth=authenticate(context, - context.authentication_id)) + timeout=context.timeout, + proxy=context.proxy_address, + method=method, + content_type=context.content_type, + scenario_id=context.scenario_id, + auth=authenticate(context, + context.authentication_id)) # Here, I'd really like to send out unencoded (invalid) # JSON too, but the json library barfs too easily, so @@ -84,9 +84,9 @@ def test_valid_submission(context, injected_submission=None): :param injected_submission: Request body to be sent """ - # to avoid errors re/ uninitialized object + # to avoid errors re/ uninitialized object proxydict = {} - + if injected_submission is None: injected_submission = "(None)" # For user readability only diff --git a/mittn/httpfuzzer/number_ranges.py b/mittn/httpfuzzer/number_ranges.py index d905db1..f27bd4f 100644 --- a/mittn/httpfuzzer/number_ranges.py +++ b/mittn/httpfuzzer/number_ranges.py @@ -22,8 +22,8 @@ def unpack_integer_range(integerrange): assert False, "Number range %s in the feature file is invalid. Must " \ "contain just numbers, commas, and hyphens" % integerrange integerrange.replace(" ", "") - rangeparts = integerrange.split(',') # One or more integer ranges - # separated by commas + rangeparts = integerrange.split(',') # One+ comma-separated int ranges + for rangepart in rangeparts: rangemaxmin = rangepart.split('-') # Range is defined with a hyphen if len(rangemaxmin) == 1: # This was a single value @@ -50,4 +50,4 @@ def unpack_integer_range(integerrange): assert False, "Number range %s in the feature file is invalid. " \ "Incorrect range specifier" % \ integerrange - return sorted(integers) \ No newline at end of file + return sorted(integers) diff --git a/mittn/httpfuzzer/static_anomalies.py b/mittn/httpfuzzer/static_anomalies.py index 90d679b..99db598 100644 --- a/mittn/httpfuzzer/static_anomalies.py +++ b/mittn/httpfuzzer/static_anomalies.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# pep8: disable=E501 # pylint: disable=line-too-long """List of static anomalies that can be injected. Before using, replace mittn.org domain references with something you have control over. diff --git a/mittn/httpfuzzer/steps.py b/mittn/httpfuzzer/steps.py index 2a78ef1..7442067 100644 --- a/mittn/httpfuzzer/steps.py +++ b/mittn/httpfuzzer/steps.py @@ -20,7 +20,12 @@ @given(u'a baseline database for injection findings') def step_impl(context): - """Test that we can connect to a database. As a side effect, open_database(9 also creates the necessary table(s) that are required.""" + """Test that we can connect to a database. + + As a side effect, open_database(9 also creates the necessary table(s) that + are required. + + """ if hasattr(context, 'dburl') is False: assert False, "Database URI not specified" dbconn = fuzzdb.open_database(context) @@ -28,6 +33,7 @@ def step_impl(context): assert False, "Cannot open database %s" % context.dburl dbconn.close() + @given(u'an authentication flow id "{auth_id}"') def step_impl(context, auth_id): """Store the authentication flow identifier. Tests in the feature file @@ -348,6 +354,6 @@ def step_impl(context): assert False, "%s new findings were found." % context.new_findings old_findings = fuzzdb.number_of_new_in_database(context) if old_findings > 0: - assert False, "No new findings found, but %s unprocessed findings from past runs found in database." % old_findings + assert False, "No new findings found, but %s unprocessed findings " \ + "from past runs found in database." % old_findings assert True - diff --git a/mittn/httpfuzzer/test_dbtools.py b/mittn/httpfuzzer/test_dbtools.py index 33399f1..e75091c 100644 --- a/mittn/httpfuzzer/test_dbtools.py +++ b/mittn/httpfuzzer/test_dbtools.py @@ -180,7 +180,6 @@ def test_false_positive_detection(self): response), True, "A duplicate case not detected") - def tearDown(self): try: os.unlink(self.db_file) diff --git a/mittn/tlschecker/steps.py b/mittn/tlschecker/steps.py index a30f4af..9fec04e 100644 --- a/mittn/tlschecker/steps.py +++ b/mittn/tlschecker/steps.py @@ -102,7 +102,7 @@ def step_impl(context, groupsize): assert False, "No stored TLS connection result set was found." keyexchange = root.find(".//keyExchange") if keyexchange is None: - # Kudos bro! + # Kudos bro! return keytype = keyexchange.get('Type') realgroupsize = keyexchange.get('GroupSize') @@ -302,6 +302,7 @@ def step_impl(context): assert hsts.get('isSupported') == 'True', \ "HTTP Strict Transport Security header not observed" + @step(u'server has no Heartbleed vulnerability') def step_impl(context): try: @@ -312,6 +313,7 @@ def step_impl(context): assert heartbleed.get('isVulnerable') == 'False', \ "Server is vulnerable for Heartbleed" + @step(u'certificate does not use SHA-1') def step_impl(context): try: