Skip to content

Commit

Permalink
black formatter + handle vulture findings
Browse files Browse the repository at this point in the history
  • Loading branch information
KissPeter committed Feb 17, 2022
1 parent 814539c commit 59898b0
Show file tree
Hide file tree
Showing 16 changed files with 562 additions and 347 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ jobs:
./APIFuzzer -h
- name: Lint
run: |
black apifuzzer/ || true
vulture --min-confidence 100 apifuzzer/ || true
black apifuzzer/
vulture --min-confidence 100 apifuzzer/
python3 test/test_application.py&
vulture --min-confidence 100 apifuzzer/ || true
- name: Test
run: |
pytest -x --hypothesis-show-statistics --cov-report html --cov=apifuzzer --html=pytest.html --self-contained-html --durations=10 --show-capture=stdout -vv -rP test
Expand Down
2 changes: 1 addition & 1 deletion apifuzzer/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.9.12'
__version__ = "0.9.12"
5 changes: 2 additions & 3 deletions apifuzzer/apifuzzerreport.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@


class ApifuzzerReport(Report):

def __init__(self, name):
super().__init__(name)

Expand All @@ -14,9 +13,9 @@ def is_failed(self):
.. deprecated:: 0.6.7
use :func:`~kitty.data.report.Report.get_status`
"""
raise NotImplementedError('API was changed, use get_status instead')
raise NotImplementedError("API was changed, use get_status instead")

def to_dict(self, encoding='base64'):
def to_dict(self, encoding="base64"):
"""
Return a dictionary version of the report
Expand Down
39 changes: 21 additions & 18 deletions apifuzzer/base_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@


class BaseTemplate(object):

def __init__(self, name):
self.logger = get_logger(self.__class__.__name__)
self.name = name
self.content_type = ''
self.content_type = ""
self.method = None
self.url = None
self.params = set()
Expand All @@ -19,20 +18,20 @@ def __init__(self, name):
self.query = set()
self.cookies = set()
self.field_to_param = {
'params': self.params,
'headers': self.headers,
'data': self.data,
'path_variables': self.path_variables,
'cookies': self.cookies,
'query': self.query,
'content_type': self.content_type
"params": self.params,
"headers": self.headers,
"data": self.data,
"path_variables": self.path_variables,
"cookies": self.cookies,
"query": self.query,
"content_type": self.content_type,
}
self.place_to_field = {
'path': self.path_variables,
'query': self.query,
'header': self.headers,
'cookie': self.query,
'body': self.data
"path": self.path_variables,
"query": self.query,
"header": self.headers,
"cookie": self.query,
"body": self.data,
}
"""
Possible paramters from request docs:
Expand All @@ -49,19 +48,23 @@ def get_stat(self):
total = 0
for field in self.field_to_param.values():
total += len(field)
self.logger.info(f'Template size: {total}, content: {self.field_to_param}')
self.logger.info(f"Template size: {total}, content: {self.field_to_param}")
return total

def compile_template(self):
_url = Static(name='url', value=self.url)
_method = Static(name='method', value=self.method)
_url = Static(name="url", value=self.url)
_method = Static(name="method", value=self.method)
template = Template(name=self.name, fields=[_url, _method])
for name, field in self.field_to_param.items():
if list(field):
try:
template.append_fields([Container(name=name, fields=field)])
except KittyException as e:
self.logger.warning('Failed to add {} because {}, continue processing...'.format(name, e))
self.logger.warning(
"Failed to add {} because {}, continue processing...".format(
name, e
)
)
return template

def get_content_type(self):
Expand Down
54 changes: 38 additions & 16 deletions apifuzzer/custom_fuzzers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@


class APIFuzzerGroup(Group):

def __init__(self, name, value):
super().__init__(values=value, name=name, fuzzable=True)

Expand All @@ -18,7 +17,7 @@ def accept_list_as_value():


class Utf8Chars(BaseField):
'''
"""
This custom fuzzer iterates through the UTF8 chars and gives back random section between min and max length
Highly relies on random numbers so most probably will give you different values each time to run it.
Expand All @@ -30,11 +29,19 @@ class Utf8Chars(BaseField):
>>> except (UnicodeEncodeError, ValueError):
>>> pass
Above 1114111 chars started to getting unprocessable so this is the upper limit for now.
'''
"""

MAX = 1114111

def __init__(self, value, name, fuzzable=True, min_length=20, max_length=None, num_mutations=80):
def __init__(
self,
value,
name,
fuzzable=True,
min_length=20,
max_length=None,
num_mutations=80,
):
super(BaseField, self).__init__(name=name) # pylint: disable=E1003
self.logger = self.logger = get_logger(self.__class__.__name__)
self.name = name
Expand Down Expand Up @@ -63,7 +70,7 @@ def str_to_bytes(value):
"""
kassert.is_of_types(value, (bytes, bytearray, six.string_types))
if isinstance(value, six.string_types):
return value.encode(encoding='utf-8')
return value.encode(encoding="utf-8")
if isinstance(value, bytearray):
return bytes(value)
return value
Expand All @@ -82,10 +89,10 @@ def _mutate(self):
self.position = self.init_position()

def __str__(self):
return f'{self.name}->{self.value}'
return f"{self.name}->{self.value}"

def __repr__(self):
return f'{self.name}->{self.value}'
return f"{self.name}->{self.value}"


class RandomBitsField(RandomBits):
Expand All @@ -96,34 +103,48 @@ class RandomBitsField(RandomBits):
"""

def not_implemented(self, func_name):
_ = func_name
pass

def __init__(self, value, name, fuzzable=True):
self.name = name
self.value = value
super(RandomBitsField, self).__init__(name=name, value=value, min_length=0, max_length=len(value) * 2,
fuzzable=fuzzable, num_mutations=80)
super(RandomBitsField, self).__init__(
name=name,
value=value,
min_length=0,
max_length=len(value) * 2,
fuzzable=fuzzable,
num_mutations=80,
)

def _mutate(self):
if self._step:
length = self._min_length + self._step * self._current_index
else:
length = self._random.randint(self._min_length, self._max_length)
current_bytes = ''
current_bytes = ""
for _ in range(length // 8 + 1):
current_bytes += chr(self._random.randint(0, 255))
self._current_value = Bits(bytes=strToBytes(current_bytes))[:length]

def __str__(self):
return f'{self.name}->{self.value}'
return f"{self.name}->{self.value}"

def __repr__(self):
return f'{self.name}->{self.value}'
return f"{self.name}->{self.value}"


class UnicodeStrings(String):

def __init__(self, value, name, min_length=0, max_length=None, num_mutations=80, fuzzable=True):
def __init__(
self,
value,
name,
min_length=0,
max_length=None,
num_mutations=80,
fuzzable=True,
):
self.min_length = min_length
self.max_length = max_length if max_length else len(value) * 2
self._num_mutations = num_mutations
Expand All @@ -132,10 +153,11 @@ def __init__(self, value, name, min_length=0, max_length=None, num_mutations=80,
super(UnicodeStrings, self).__init__(name=name, value=value, fuzzable=fuzzable)

def not_implemented(self, func_name):
_ = func_name
pass

def __str__(self):
return f'{self.name}->{self.value}'
return f"{self.name}->{self.value}"

def __repr__(self):
return f'{self.name}->{self.value}'
return f"{self.name}->{self.value}"
3 changes: 1 addition & 2 deletions apifuzzer/fuzz_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@


class APIFuzzerModel(GraphModel):

def __init__(self, name='GraphModel', content_type=None):
def __init__(self, name="GraphModel", content_type=None):
super().__init__(name)
self.content_type = content_type
85 changes: 46 additions & 39 deletions apifuzzer/fuzz_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,35 @@
from ruamel.yaml import YAML
from ruamel.yaml.scanner import ScannerError

from apifuzzer.custom_fuzzers import RandomBitsField, Utf8Chars, UnicodeStrings, APIFuzzerGroup
from apifuzzer.custom_fuzzers import (
RandomBitsField,
Utf8Chars,
UnicodeStrings,
APIFuzzerGroup,
)
from apifuzzer.exceptions import FailedToParseFileException
from apifuzzer.utils import download_file, secure_randint


def get_sample_data_by_type(param_type):
types = {
u'name': '012',
u'string': 'asd',
u'integer': 1,
u'number': 667.5,
u'boolean': False,
u'array': [1, 2, 3] # transform_data_to_bytes complains when this array contains strings.
"name": "012",
"string": "asd",
"integer": 1,
"number": 667.5,
"boolean": False,
"array": [
1,
2,
3,
], # transform_data_to_bytes complains when this array contains strings.
}
return types.get(param_type, b'\x00')
return types.get(param_type, b"\x00")


def get_field_type_by_method(http_method):
fields = {
'GET': 'params',
'POST': 'data',
'PUT': 'data'
}
return fields.get(http_method, 'data')
fields = {"GET": "params", "POST": "data", "PUT": "data"}
return fields.get(http_method, "data")


def get_fuzz_type_by_param_type(fuzz_type):
Expand All @@ -36,28 +41,28 @@ def get_fuzz_type_by_param_type(fuzz_type):
string_types = [UnicodeStrings, RandomBitsField, Utf8Chars]
number_types = [UnicodeStrings, RandomBitsField]
types = {
'integer': number_types,
'float': number_types,
'double': number_types,
'int32': number_types,
'int64': number_types,
'number': number_types,
'string': string_types,
'email': string_types,
'uuid': string_types,
'uri': string_types,
'hostname': string_types,
'ipv4': string_types,
'ipv6': string_types,
'boolean': string_types,
'enum': [APIFuzzerGroup]
"integer": number_types,
"float": number_types,
"double": number_types,
"int32": number_types,
"int64": number_types,
"number": number_types,
"string": string_types,
"email": string_types,
"uuid": string_types,
"uri": string_types,
"hostname": string_types,
"ipv4": string_types,
"ipv6": string_types,
"boolean": string_types,
"enum": [APIFuzzerGroup],
}
fuzzer_list = types.get(fuzz_type, string_types)
return fuzzer_list[secure_randint(0, max(len(fuzzer_list) - 1, 1))]


def container_name_to_param(container_name):
return container_name.split('|')[-1]
return container_name.split("|")[-1]


def get_api_definition_from_file(src_file, logger=None):
Expand All @@ -66,21 +71,23 @@ def get_api_definition_from_file(src_file, logger=None):
else:
print_func = print
try:
with open(src_file, mode='rb') as f:
with open(src_file, mode="rb") as f:
api_definition = f.read()
# try loading as JSON first, then YAML
try:
return json.loads(api_definition.decode('utf-8'))
return json.loads(api_definition.decode("utf-8"))
except ValueError as e:
print_func(f'Failed to load input ({src_file}) as JSON because ({e}), maybe YAML?')
print_func(
f"Failed to load input ({src_file}) as JSON because ({e}), maybe YAML?"
)
try:
yaml = YAML(typ='safe')
return yaml.load(api_definition.decode('utf-8'))
yaml = YAML(typ="safe")
return yaml.load(api_definition.decode("utf-8"))
except (TypeError, ScannerError) as e:
print_func(f'Failed to load input ({src_file}) as YAML:{e}')
print_func(f"Failed to load input ({src_file}) as YAML:{e}")
raise e
except (Exception, FileNotFoundError) as e:
print_func(f'Failed to parse input file ({src_file}), because: ({e}) exit')
print_func(f"Failed to parse input file ({src_file}), because: ({e}) exit")
raise FailedToParseFileException


Expand All @@ -97,5 +104,5 @@ def get_base_url_form_api_src(url):
:param url: url like https://example.com/api/v1/api.json
:return: url like https://example.com/api/v1
"""
splited_url = url.split('/')
return "/".join(splited_url[:len(splited_url) - 1])
splited_url = url.split("/")
return "/".join(splited_url[: len(splited_url) - 1])

0 comments on commit 59898b0

Please sign in to comment.