Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't force fuzzing auth headers for API endpoints #17358

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
27 changes: 27 additions & 0 deletions w3af/core/data/fuzzer/mutants/headers_mutant.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@

"""
from w3af.core.data.fuzzer.mutants.mutant import Mutant
import w3af.core.data.kb.knowledge_base as kb


class HeadersMutant(Mutant):

# TODO describe
fuzzed_auth_headers = set()

"""
This class is a headers mutant.
"""
Expand Down Expand Up @@ -55,6 +60,8 @@ def create_mutants(cls, freq, mutant_str_list, fuzzable_param_list,
"""
fuzzable_headers = fuzzer_config['fuzzable_headers'] + freq.get_force_fuzzing_headers()

fuzzable_headers = HeadersMutant._exclude_fuzzed_auth_headers(fuzzable_headers)

if not fuzzable_headers:
return []

Expand All @@ -64,3 +71,23 @@ def create_mutants(cls, freq, mutant_str_list, fuzzable_param_list,
return cls._create_mutants_worker(freq, cls, mutant_str_list,
fuzzable_param_list,
append, fuzzer_config)

@classmethod
def _exclude_fuzzed_auth_headers(cls, fuzzable_headers):
"""
TODO
:param fuzzable_headers:
:return:
"""
auth_headers = kb.kb.raw_read('http_data', 'auth_headers')
updated_fuzzable_headers = []
for fuzzable_header in fuzzable_headers:
if fuzzable_header in auth_headers:
if fuzzable_header in cls.fuzzed_auth_headers:
continue

cls.fuzzed_auth_headers.add(fuzzable_header)

updated_fuzzable_headers.append(fuzzable_header)

return updated_fuzzable_headers
59 changes: 57 additions & 2 deletions w3af/core/data/parsers/doc/open_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

from yaml import load

import w3af.core.data.kb.knowledge_base as kb

try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
Expand Down Expand Up @@ -149,7 +151,7 @@ def parse(self):
for data in specification_handler.get_api_information():
try:
request_factory = RequestFactory(*data)
fuzzable_request = request_factory.get_fuzzable_request(self.discover_fuzzable_headers)
fuzzable_request = request_factory.get_fuzzable_request()
except Exception, e:
#
# This is a strange situation because parsing of the OpenAPI
Expand Down Expand Up @@ -178,9 +180,62 @@ def parse(self):
if not self._should_audit(fuzzable_request):
continue

operation = data[4]
headers = self._get_parameter_headers(operation)
if self.discover_fuzzable_headers:
fuzzable_request.set_force_fuzzing_headers(headers)

# TODO move it to a separate method
# TODO should it expect multiple specs?
auth_headers = kb.kb.raw_read('http_data', 'auth_headers')
if not isinstance(auth_headers, set):
auth_headers = set()

for header in headers:
if OpenAPI._is_auth_header(header, operation.swagger_spec):
auth_headers.add(header)

kb.kb.raw_write('http_data', 'auth_headers', auth_headers)

self.api_calls.append(fuzzable_request)

def _should_audit(self, fuzzable_request):
@staticmethod
def _get_parameter_headers(operation):
"""
Looks for all parameters which are passed to the endpoint via headers.

:param operation: An instance of Operation class
which represents the API endpoint.
:return: A list of unique header names.
"""
parameter_headers = set()
for parameter_name, parameter in operation.params.iteritems():
if parameter.location == 'header':
parameter_headers.add(parameter.name)
om.out.debug('Found a parameter header for %s endpoint: %s'
% (operation.path_name, parameter.name))

return list(parameter_headers)

@staticmethod
def _is_auth_header(name, spec):
"""
TODO
:param name: Header name.
:param spec: API specification.
:return: True if this is an auth header, False otherwise.
"""
for key, auth in spec.security_definitions.iteritems():
if not hasattr(auth, 'location') or not hasattr(auth, 'name'):
continue

if auth.location == 'header' and auth.name == name:
return True

return False

@staticmethod
def _should_audit(fuzzable_request):
"""
We want to make sure that w3af doesn't delete all the items from the
REST API, so we ignore DELETE calls.
Expand Down
34 changes: 5 additions & 29 deletions w3af/core/data/parsers/doc/open_api/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,45 +56,21 @@ def __init__(self, spec, api_resource_name, resource, operation_name,
self.operation = operation
self.parameters = parameters

def get_fuzzable_request(self, discover_fuzzable_headers=False):
def get_fuzzable_request(self):
"""
Creates a fuzzable request by querying different parts of the spec
parameters, operation, etc.

:param discover_fuzzable_headers: If it's set to true,
then all fuzzable headers will be added to the fuzzable request.
:return: A fuzzable request.
"""
method = self.get_method()
uri = self.get_uri()
headers = self.get_headers()
data_container = self.get_data_container(headers)

fuzzable_request = FuzzableRequest(uri,
headers=headers,
post_data=data_container,
method=method)

if discover_fuzzable_headers:
fuzzable_request.set_force_fuzzing_headers(self._get_parameter_headers())

return fuzzable_request

def _get_parameter_headers(self):
"""
Looks for all parameters which are passed to the endpoint via headers.

:return: A list of unique header names.
"""
parameter_headers = set()
for parameter_name in self.parameters:
parameter = self.parameters[parameter_name]
if parameter.location == 'header':
parameter_headers.add(parameter.name)
om.out.debug('Found a parameter header for %s endpoint: %s'
% (self.operation.path_name, parameter.name))

return list(parameter_headers)
return FuzzableRequest(uri,
headers=headers,
post_data=data_container,
method=method)

def _bravado_construct_request(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ def get_specification(self):
return file('%s/data/multiple_paths_and_headers.json' % CURRENT_PATH).read()


class PetstoreModel(object):

@staticmethod
def get_specification():
return file('%s/data/swagger.json' % CURRENT_PATH).read()


class PetstoreSimpleModel(object):

@staticmethod
Expand Down
5 changes: 4 additions & 1 deletion w3af/core/data/parsers/doc/open_api/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from w3af.core.data.parsers.doc.open_api import OpenAPI
from w3af.core.data.url.HTTPResponse import HTTPResponse

from w3af.core.data.parsers.doc.open_api.tests.example_specifications import PetstoreModel


# Order them to be able to easily assert things
def by_path(fra, frb):
Expand Down Expand Up @@ -434,7 +436,8 @@ def test_is_valid_json_or_yaml_false(self):
http_resp = self.generate_response('"', 'image/jpeg')
self.assertFalse(OpenAPI.is_valid_json_or_yaml(http_resp))

def generate_response(self, specification_as_string, content_type='application/json'):
@staticmethod
def generate_response(specification_as_string, content_type='application/json'):
url = URL('http://www.w3af.com/swagger.json')
headers = Headers([('content-type', content_type)])
return HTTPResponse(200, specification_as_string, headers,
Expand Down
5 changes: 4 additions & 1 deletion w3af/core/data/parsers/doc/open_api/tests/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@


class TestRequests(unittest.TestCase):
def generate_response(self, specification_as_string):

@staticmethod
def generate_response(specification_as_string):
url = URL('http://www.w3af.com/swagger.json')
headers = Headers([('content-type', 'application/json')])
return HTTPResponse(200, specification_as_string, headers,
Expand Down Expand Up @@ -380,3 +382,4 @@ def test_dereferenced_pet_store(self):
self.assertEqual(fuzzable_request.get_uri().url_string, e_url)
self.assertEqual(fuzzable_request.get_headers(), e_headers)
self.assertEqual(fuzzable_request.get_data(), e_data)

85 changes: 79 additions & 6 deletions w3af/plugins/tests/crawl/test_open_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
from w3af.core.data.dc.headers import Headers
from w3af.core.data.parsers.doc.open_api.tests.example_specifications import (IntParamQueryString,
NestedModel,
PetstoreModel,
PetstoreSimpleModel)


def by_path(fra, frb):
return cmp(fra.get_url().url_string, frb.get_url().url_string)


class TestOpenAPIFindAllEndpointsWithAuth(PluginTest):

target_url = 'http://w3af.org/'
Expand Down Expand Up @@ -74,9 +79,6 @@ def test_find_all_endpoints_with_auth(self):
fuzzable_requests = [f for f in fuzzable_requests if f.get_url().get_path() not in ('/swagger.json', '/')]

# Order them to be able to easily assert things
def by_path(fra, frb):
return cmp(fra.get_url().url_string, frb.get_url().url_string)

fuzzable_requests.sort(by_path)

#
Expand Down Expand Up @@ -127,6 +129,7 @@ class TestOpenAPINestedModelSpec(PluginTest):
}

class SQLIMockResponse(MockResponse):

def get_response(self, http_request, uri, response_headers):
basic = http_request.headers.get('Basic', '')
if basic != TestOpenAPINestedModelSpec.BEARER:
Expand Down Expand Up @@ -178,9 +181,6 @@ def test_find_all_endpoints_with_auth(self):
fuzzable_requests = [f for f in fuzzable_requests if f.get_url().get_path() not in ('/openapi.json', '/')]

# Order them to be able to easily assert things
def by_path(fra, frb):
return cmp(fra.get_url().url_string, frb.get_url().url_string)

fuzzable_requests.sort(by_path)

self.assertEqual(len(fuzzable_requests), 1)
Expand All @@ -204,6 +204,79 @@ def by_path(fra, frb):
self.assertEqual(len(vulns), 2)


class TestOpenAPIFuzzAuthHeaders(PluginTest):

api_key = 'zzz'
target_url = 'http://petstore.swagger.io/'

_run_configs = {
'cfg': {
'target': target_url,
'plugins': {'crawl': (PluginConfig('open_api',

('header_auth',
'api_key: %s' % api_key,
PluginConfig.HEADER),

),),
'audit': (PluginConfig('sqli'),)}
}
}

class SQLIMockResponse(MockResponse):

def get_response(self, http_request, uri, response_headers):
api_key = http_request.headers.get('api_key', '')

for payload in sqli.SQLI_STRINGS:
if payload in api_key:
return self.status, response_headers, 'PostgreSQL query failed:'

if api_key != TestOpenAPIFuzzAuthHeaders.api_key:
return 401, response_headers, ''

return self.status, response_headers, 'Sunny outside'

MOCK_RESPONSES = [MockResponse('http://petstore.swagger.io/openapi.json',
PetstoreModel().get_specification(),
content_type='application/json'),

SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'),
body=None,
method='GET',
status=200),

SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'),
body=None,
method='POST',
status=200),

SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'),
body=None,
method='PUT',
status=200)
]

def test_fuzz_auth_header_only_once(self):
cfg = self._run_configs['cfg']
self._scan(cfg['target'], cfg['plugins'])

#
# Since we configured authentication we should only get one of the Info
#
infos = self.kb.get('open_api', 'open_api')
self.assertEqual(len(infos), 1, infos)

info_i = infos[0]
self.assertEqual(info_i.get_name(), 'Open API specification found')

vulns = self.kb.get('sqli', 'sqli')
self.assertEqual(len(vulns), 1)

vuln = vulns[0]
self.assertEquals('SQL injection', vuln.get_name())


class TestOpenAPIRaisesWarningIfNoAuth(PluginTest):
target_url = 'http://w3af.org/'

Expand Down