Skip to content

Commit

Permalink
some code quality improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
KissPeter committed Feb 9, 2020
1 parent ead9cfd commit 6d2023a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
56 changes: 32 additions & 24 deletions apifuzzer/fuzzer_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kitty.targets.server import ServerTarget

from apifuzzer.apifuzzer_report import Apifuzzer_Report as Report
from apifuzzer.utils import set_class_logger, try_b64encode
from apifuzzer.utils import set_class_logger, try_b64encode, container_name_to_param


class Return():
Expand All @@ -33,6 +33,8 @@ def __init__(self, name, base_url, report_dir, auth_headers, logger):
self.logger = logger
self.logger.info('Logger initialized')
self.resp_headers = dict()
self.chop_left = True
self.chop_right = True

def pre_test(self, test_num):
"""
Expand Down Expand Up @@ -61,7 +63,7 @@ def compile_headers(self, fuzz_header=None):
)
if isinstance(fuzz_header, dict):
for k, v in fuzz_header.items():
fuzz_header_name = k.split('|')[-1]
fuzz_header_name = container_name_to_param(k)
self.logger.debug('Adding fuzz header: {}->{}'.format(fuzz_header_name, v))
_header[fuzz_header_name] = v
if isinstance(self.auth_headers, list):
Expand Down Expand Up @@ -109,22 +111,23 @@ def format_pycurl_query_param(self, url, query_params):
_dummy_curl = pycurl.Curl()
_tmp_query_params = dict()
for k, v in query_params.items():
original_value = v
iteration = 0
self.chop_left = True
self.chop_right = True
while True:
iteration = iteration + 1
_test_query_params = _tmp_query_params.copy()
_query_param_name = k.split('|')[-1]
_query_param_name = container_name_to_param(k)
_test_query_params[_query_param_name] = v
try:
_dummy_curl.setopt(pycurl.URL, '{}{}'.format(url, self.dict_to_query_string(_test_query_params)))
_tmp_query_params[_query_param_name] = v
break
except (UnicodeEncodeError, ValueError) as e:
self.logger.exception(e)
self.logger.debug('{} Problem adding ({}) as query param. Issue was:{}'.format(iteration, k, e))
if len(v):
self.logger.debug('Removing first character from query param, current length: %s', len(v))
v = v[1:]
v = self.chop_fuzz_value(original_fuzz_value=original_value, fuzz_value=v)
else:
self.logger.info('The whole query param was removed, using empty string instead')
_tmp_query_params[_query_param_name] = ""
Expand All @@ -145,7 +148,10 @@ def format_pycurl_url(self, url):
_tmp_url_list = list()
for part in url_fields:
self.logger.debug('Processing URL part: {}'.format(part))
original_value = part
iteration = 0
self.chop_left = True
self.chop_right = True
while True:
iteration = iteration + 1
try:
Expand All @@ -159,8 +165,7 @@ def format_pycurl_url(self, url):
except (UnicodeEncodeError, ValueError) as e:
self.logger.debug('{} Problem adding ({}) to the url. Issue was:{}'.format(iteration, part, e))
if len(part):
self.logger.debug('Removing first character from part, current length: %s', len(part))
part = part[1:]
part = self.chop_fuzz_value(original_fuzz_value=original_value, fuzz_value=part)
else:
self.logger.info('The whole url part was removed, using empty string instead')
_tmp_url_list.append("-")
Expand All @@ -171,6 +176,20 @@ def format_pycurl_url(self, url):
self.logger.info('URL to be used: %s', _return)
return _return

def chop_fuzz_value(self, original_fuzz_value, fuzz_value):
if self.chop_left:
self.logger.debug('Remoe first character from value, current length: %s', len(fuzz_value))
fuzz_value = fuzz_value[1:]
if len(fuzz_value) == 0:
self.chop_left = False
fuzz_value = original_fuzz_value
elif self.chop_right:
self.logger.debug('Remove last character from value, current length: %s', len(fuzz_value))
fuzz_value = fuzz_value[:-1]
if len(fuzz_value) == 1:
self.chop_left = False
return fuzz_value

def format_pycurl_header(self, headers):
"""
Pycurl and other http clients are picky, so this function tries to put everyting into the field as it can.
Expand All @@ -184,8 +203,8 @@ def format_pycurl_header(self, headers):
for k, v in headers.items():
original_value = v
iteration = 0
chop_left = True
chop_right = True
self.chop_left = True
self.chop_right = True
while True:

iteration = iteration + 1
Expand All @@ -196,17 +215,7 @@ def format_pycurl_header(self, headers):
except ValueError as e:
self.logger.debug('{} Problem at adding {} to the header. Issue was:{}'.format(iteration, k, e))
if len(v):
if chop_left:
self.logger.debug('Removing first character from value, current length: %s', len(v))
v = v[1:]
if len(v) == 0:
chop_left = False
v = original_value
elif chop_right:
self.logger.debug('Removing last character from value, current length: %s', len(v))
v = v[:-1]
if len(v) == 1:
chop_left = False
v = self.chop_fuzz_value(original_fuzz_value=original_value, fuzz_value=v)
else:
self.logger.info('The whole header value was removed, using empty string instead')
_tmp[k] = ""
Expand Down Expand Up @@ -324,8 +333,7 @@ def transmit(self, **kwargs):
def fix_data(data):
new_data = {}
for data_key, data_value in data.items():
new_key = data_key.split('|')[-1]
new_data[new_key] = data_value
new_data[container_name_to_param(data_key)] = data_value
return new_data

def post_test(self, test_num):
Expand Down Expand Up @@ -358,7 +366,7 @@ def expand_path_variables(self, url, path_parameters):
_temporally_url_list = list()
for path_key, path_value in path_parameters.items():
self.logger.debug('Processing: path_key: {} , path_variable: {}'.format(path_key, path_value))
path_parameter = path_key.split('|')[-1]
path_parameter = container_name_to_param(path_key)
url_path_paramter = '{%PATH_PARAM%}'.replace('%PATH_PARAM%', path_parameter)
splitter = '(%PATH_PARAM%)'.replace('%PATH_PARAM%', url_path_paramter)
url_list = re.split(splitter, url)
Expand Down
11 changes: 10 additions & 1 deletion apifuzzer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_sample_data_by_type(param_type):
u'integer': 1,
u'number': 667.5,
u'boolean': False,
u'array': [1, 2, 3] # transform_data_to_bytes complains when this array contains strings.
u'array': [1, 2, 3] # transform_data_to_bytes complains when this array contains strings.
}
return types.get(param_type, b'\x00')

Expand All @@ -72,6 +72,7 @@ def set_logger(level='warning', basic_output=False):
logger.setLevel(level=level.upper())
return logger


def transform_data_to_bytes(data_in):
if isinstance(data_in, float):
return bytes(int(data_in))
Expand All @@ -93,3 +94,11 @@ def try_b64encode(data_in):
return b64encode(data_in)
except (TypeError, Error):
return data_in


def param_to_container_name(normalized_url, method, param):
return '{}|{}|{}'.format(normalized_url, method, param)


def container_name_to_param(container_name):
return container_name.split('|')[-1]

0 comments on commit 6d2023a

Please sign in to comment.