Skip to content

Commit

Permalink
#36 related testing
Browse files Browse the repository at this point in the history
  • Loading branch information
KissPeter committed May 3, 2020
1 parent 3ee6f29 commit 8e203ee
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 13 deletions.
22 changes: 22 additions & 0 deletions apifuzzer/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import json
import logging
import os
Expand All @@ -7,6 +8,7 @@
from logging import Formatter
from logging.handlers import SysLogHandler
from random import randint
from typing import Optional

import pycurl
from bitstring import Bits
Expand Down Expand Up @@ -158,3 +160,23 @@ def get_api_definition_from_file(src_file):
except Exception:
print('Failed to parse input file, exit')
exit()


def json_data(arg_string: Optional[str]) -> dict:
"""
Transforms input string to JSON. Input must be dict or list of dicts like string
:type arg_string: str
:rtype dict
"""
if isinstance(arg_string, dict) or isinstance(arg_string, list): # support testing
arg_string = json.dumps(arg_string)
try:
_return = json.loads(arg_string)
if hasattr(_return, 'append') or hasattr(_return, 'keys'):
return _return
else:
raise TypeError('not list or dict')
except (TypeError, json.decoder.JSONDecodeError):
msg = '%s is not JSON', arg_string
print('Debugging: %s', arg_string.replace(' ', '_'))
raise argparse.ArgumentTypeError(msg)
10 changes: 1 addition & 9 deletions fuzzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import print_function

import argparse
import json
import signal
import sys
import tempfile
Expand All @@ -14,7 +13,7 @@
from apifuzzer.fuzzer_target.fuzz_request_sender import FuzzerTarget
from apifuzzer.server_fuzzer import OpenApiServerFuzzer
from apifuzzer.swagger_template_generator import SwaggerTemplateGenerator
from apifuzzer.utils import set_logger, get_api_definition_from_file, save_api_definition
from apifuzzer.utils import set_logger, get_api_definition_from_file, save_api_definition, json_data


class Fuzzer(object):
Expand Down Expand Up @@ -72,13 +71,6 @@ def signal_handler(sig, frame):
sys.exit(0)


def json_data(arg_string):
try:
return json.loads(arg_string)
except Exception as e:
raise argparse.ArgumentError('{} is not JSON'.format(arg_string))


parser = argparse.ArgumentParser(description='API fuzzer configuration',
formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=20))
parser.add_argument('-s', '--src_file',
Expand Down
1 change: 1 addition & 0 deletions test/requirements_for_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ sphinx==2.4.1
sphinx_rtd_theme
pytest-html==2.0.1
coverage==5.0.3
hypothesis==5.10.4
43 changes: 39 additions & 4 deletions test/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import os
import random
import string
import tempfile

import requests
Expand Down Expand Up @@ -28,6 +30,9 @@ def setup_class(cls):
with open(src_file, 'r') as f:
cls.swagger = json.loads(f.read())

def setup_method(self, method):
self.auth_headers = None

def teardown_method(self, method):
"""
Clears the report directory at the end of each test run
Expand All @@ -47,19 +52,48 @@ def query_last_call(self):
assert _resp.status_code == 200, 'Response headers: {}, response body: {}'.format(_resp.headers, _resp.content)
return json.loads(_resp.content.decode("utf-8"))

def fuzz(self, api_resources):
def random_string(self, length=8):
return ''.join(random.choice(string.ascii_lowercase) for i in range(length))

def generate_random_auth_headers(self):
if bool(random.getrandbits(1)):
self.auth_headers = [{self.random_string(): self.random_string()}]
else:
self.auth_headers = {self.random_string(): self.random_string()}

def check_auth_header_in_request(self, request_headers):
header_found = False
assert len(request_headers)
if isinstance(self.auth_headers, list):
req_header = self.auth_headers[0]
else:
req_header = self.auth_headers
auth_header_key = list(req_header.keys())[0]
auth_header_value = list(req_header.values())[0]
for header in json.loads(request_headers).items():
if header[0] == auth_header_key:
assert header[1] == auth_header_value
header_found = True
assert header_found

def fuzz(self, api_resources, headers):
"""
Call APIFuzzer with the given api definition
:type api_resources: dict
:param headers: headers to add fuzz request
"""
if headers is None:
self.generate_random_auth_headers()
else:
self.auth_headers = headers
prog = Fuzzer(api_resources=api_resources,
report_dir=self.report_dir,
test_level=1,
alternate_url=self.test_app_url,
test_result_dst=None,
log_level='Debug',
basic_output=False,
auth_headers={}
auth_headers=self.auth_headers
)
prog.prepare()
prog.run()
Expand All @@ -70,11 +104,11 @@ def get_last_report_file(self):
with open("{}/{}".format(self.report_dir, self.report_files[-1]), mode='r', encoding='utf-8') as f:
return json.load(f)

def fuzz_and_get_last_call(self, api_path, api_def):
def fuzz_and_get_last_call(self, api_path, api_def, headers=None):
self.swagger.pop('paths')
self.swagger['paths'] = {}
self.swagger['paths'][api_path] = api_def
self.fuzz(self.swagger)
self.fuzz(self.swagger, headers)
last_call = self.query_last_call()
assert last_call['resp_status'] == 500, '{} received, full response: {}'.format(last_call['resp_status'],
last_call)
Expand All @@ -90,3 +124,4 @@ def repot_basic_check(self):
assert field in last_report.keys(), assert_msg
if last_report.get('parsed_status_code') is not None:
assert last_report['parsed_status_code'] == 500, assert_msg
self.check_auth_header_in_request(last_report.get('request_headers'))
25 changes: 25 additions & 0 deletions test/unit_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import argparse

import pytest
from hypothesis import given, strategies as st

from apifuzzer.utils import json_data
from test.unit_test_composer import dict_str, list_of_dicts


@given(st.text(min_size=1))
def test_json_data_invalid(data):
with pytest.raises(argparse.ArgumentTypeError):
json_data(data)


@given(data=dict_str())
def test_json_data_dict_valid(data):
res = json_data(data)
assert isinstance(res, dict)


@given(data=list_of_dicts())
def test_json_data_list_valid(data):
res = json_data(data)
assert isinstance(res, list)
11 changes: 11 additions & 0 deletions test/unit_test_composer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from hypothesis import strategies as st


@st.composite
def dict_str(draw):
return draw(st.dictionaries(st.text(min_size=1), st.text(min_size=1), min_size=1))


@st.composite
def list_of_dicts(draw):
return draw(st.lists(dict_str()))

0 comments on commit 8e203ee

Please sign in to comment.