diff --git a/src/offat/__main__.py b/src/offat/__main__.py index 6d64219..9836e69 100644 --- a/src/offat/__main__.py +++ b/src/offat/__main__.py @@ -8,7 +8,8 @@ def banner(): - print(r''' + print( + r''' _/| |\_ / | | \ | \ / | @@ -19,12 +20,13 @@ def banner(): \_ \ / _/ \__ | __/ \ _ / - _/ \_ - / _/|\_ \ - / | \ + _/ \_ + / _/|\_ \ + / | \ / v \ OFFAT - ''') + ''' + ) def start(): @@ -32,24 +34,91 @@ def start(): banner() parser = ArgumentParser(prog='offat') - parser.add_argument('-f', '--file', dest='fpath', type=str, - help='path or url of openapi/swagger specification file', required=True) - parser.add_argument('-v', '--version', action='version', - version=f'%(prog)s {get_package_version()}') - parser.add_argument('-rl', '--rate-limit', dest='rate_limit', - help='API requests rate limit per second', type=float, default=60, required=False) - parser.add_argument('-pr', '--path-regex', dest='path_regex_pattern', type=str, - help='run tests for paths matching given regex pattern', required=False, default=None) - parser.add_argument('-o', '--output', dest='output_file', type=str, - help='path to store test results in specified format. Default format is html', required=False, default=None) - parser.add_argument('-of', '--format', dest='output_format', type=str, choices=[ - 'json', 'yaml', 'html', 'table'], help='Data format to save (json, yaml, html, table). Default: table', required=False, default='table') - parser.add_argument('-H', '--headers', dest='headers', type=str, - help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', required=False, default=None, action='append', nargs='*') - parser.add_argument('-tdc', '--test-data-config', dest='test_data_config', - help='YAML file containing user test data for tests', required=False, type=str) - parser.add_argument('-p', '--proxy', dest='proxy', - help='Proxy server URL to route HTTP requests through (e.g., "http://proxyserver:port")', required=False, type=str) + parser.add_argument( + '-f', + '--file', + dest='fpath', + type=str, + help='path or url of openapi/swagger specification file', + required=True, + ) + parser.add_argument( + '-v', '--version', action='version', version=f'%(prog)s {get_package_version()}' + ) + parser.add_argument( + '-rl', + '--rate-limit', + dest='rate_limit', + help='API requests rate limit per second', + type=float, + default=60, + required=False, + ) + parser.add_argument( + '-pr', + '--path-regex', + dest='path_regex_pattern', + type=str, + help='run tests for paths matching given regex pattern', + required=False, + default=None, + ) + parser.add_argument( + '-o', + '--output', + dest='output_file', + type=str, + help='path to store test results in specified format. Default format is html', + required=False, + default=None, + ) + parser.add_argument( + '-of', + '--format', + dest='output_format', + type=str, + choices=['json', 'yaml', 'html', 'table'], + help='Data format to save (json, yaml, html, table). Default: table', + required=False, + default='table', + ) + parser.add_argument( + '-H', + '--headers', + dest='headers', + type=str, + help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', + required=False, + default=None, + action='append', + nargs='*', + ) + parser.add_argument( + '-tdc', + '--test-data-config', + dest='test_data_config', + help='YAML file containing user test data for tests', + required=False, + type=str, + ) + parser.add_argument( + '-p', + '--proxy', + dest='proxies_list', + help='Proxy server URL to route HTTP requests through (e.g. "http://proxyserver:port")', + action='append', + required=False, + type=str, + default=None, + ) + parser.add_argument( + '-s', + '--ssl', + dest='ssl', + required=False, + action='store_true', + help='Enable SSL Verification', + ) args = parser.parse_args() # convert req headers str to dict @@ -75,7 +144,8 @@ def start(): req_headers=headers_dict, rate_limit=rate_limit, test_data_config=test_data_config, - proxy=args.proxy, + proxies=args.proxies_list, + ssl=args.ssl, ) diff --git a/src/offat/config_data_handler.py b/src/offat/config_data_handler.py index 1cabcba..8fec71a 100644 --- a/src/offat/config_data_handler.py +++ b/src/offat/config_data_handler.py @@ -2,6 +2,37 @@ from .logger import logger +def overwrite_user_params(list1: list[dict], list2: list[dict]) -> list[dict]: + """ + Update values in list1 based on the corresponding "name" values in list2. + + Args: + list1 (list of dict): The list of dictionaries to be updated. + list2 (list of dict): The list of dictionaries containing values to update from. + + Returns: + list of dict: The updated list1 with values from list2. + + Example: + ```python + list1 = [{'name': 'id', 'value': 67}, {'name': 'email', 'value': 'old@example.com'}] + list2 = [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}] + updated_list = update_values(list1, list2) + print(updated_list) + # Output: [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}] + ``` + """ + # Create a dictionary for faster lookup + lookup_dict = {item['name']: item['value'] for item in list2} + + # Update values in list1 using index lookup + for item in list1: + if item['name'] in lookup_dict: + item['value'] = lookup_dict[item['name']] + + return list1 + + def validate_config_file_data(test_config_data: dict): if not isinstance(test_config_data, dict): logger.warning('Invalid data format') @@ -11,7 +42,9 @@ def validate_config_file_data(test_config_data: dict): logger.warning('Error Occurred While reading file: %s', test_config_data) return False - if not test_config_data.get('actors', ): + if not test_config_data.get( + 'actors', + ): logger.warning('actors are required') return False @@ -36,15 +69,21 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]): request_headers[header.get('name')] = header.get('value') for test in tests: - # replace key and value instead of appending - test['body_params'] += body_params - test['query_params'] += query_params - test['path_params'] += path_params + test['body_params'] = overwrite_user_params( + deepcopy(test['body_params']), body_params + ) + test['query_params'] = overwrite_user_params( + deepcopy(test['query_params']), query_params + ) + test['path_params'] += overwrite_user_params( + deepcopy(test['path_params']), path_params + ) # for post test processing tests such as broken authentication test['test_actor_name'] = actor_name if test.get('kwargs', {}).get('headers', {}).items(): test['kwargs']['headers'] = dict( - test['kwargs']['headers'], **request_headers) + test['kwargs']['headers'], **request_headers + ) else: test['kwargs']['headers'] = request_headers diff --git a/src/offat/http.py b/src/offat/http.py index 3fe29e7..39a6277 100644 --- a/src/offat/http.py +++ b/src/offat/http.py @@ -1,4 +1,10 @@ +''' +module for interacting with HTTP layer +''' +from random import choice from os import name as os_name +from urllib.parse import urlparse + from aiohttp import ClientSession, ClientTimeout from aiolimiter import AsyncLimiter from tenacity import retry, stop_after_attempt, retry_if_not_exception_type @@ -11,12 +17,54 @@ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) +class Proxies: + ''' + class for handling proxies + ''' + + def __init__(self, proxies: list[str] | None) -> None: + self.p_list = proxies + + def validate_proxy(self, proxy_url: str | None): + """ + Validates a proxy URL based on format and attempts a basic connection. + + Args: + proxy_url: The URL of the proxy server. + + Returns: + True if the proxy URL seems valid and a basic connection can be established, False otherwise. + """ + # Check for valid URL format + parsed_url = urlparse(proxy_url) + if all([parsed_url.scheme, parsed_url.netloc]): + return True + + return False + + def get_random_proxy(self) -> str | None: + ''' + Returns random proxy from the list + ''' + if not self.p_list: + return None + return choice(self.p_list) + + class AsyncRequests: ''' AsyncRequests class helps to send HTTP requests with rate limiting options. ''' - def __init__(self, rate_limit: float = 50, headers: dict | None = None, proxy: str | None = None, allow_redirects: bool = True, timeout: float = 60) -> None: + def __init__( + self, + rate_limit: float = 50, + headers: dict | None = None, + proxies: list[str] | None = [], + allow_redirects: bool = True, + timeout: float = 60, + ssl: bool = False, + ) -> None: '''AsyncRequests class constructor Args: @@ -25,30 +73,39 @@ def __init__(self, rate_limit: float = 50, headers: dict | None = None, proxy: s headers (dict): overrides default headers while sending HTTP requests proxy (str): proxy URL to be used while sending requests timeout (float): total timeout parameter of aiohttp.ClientTimeout + ssl (bool): enforces tls/ssl verification if True Returns: None ''' self._headers = headers - self._proxy = proxy if proxy else None + self._proxy = Proxies(proxies=proxies) self._allow_redirects = allow_redirects self._limiter = AsyncLimiter(max_rate=rate_limit, time_period=1) self._timeout = ClientTimeout(total=timeout) + self._ssl = ssl - @retry(stop=stop_after_attempt(3), retry=retry_if_not_exception_type(KeyboardInterrupt or asyncio.exceptions.CancelledError)) - async def request(self, url: str, method: str = 'GET', *args, **kwargs) -> dict: + @retry( + stop=stop_after_attempt(3), + retry=retry_if_not_exception_type( + KeyboardInterrupt or asyncio.exceptions.CancelledError + ), + ) + async def request(self, url: str, *args, method: str = 'GET', **kwargs) -> dict: '''Send HTTP requests asynchronously Args: url (str): URL of the webpage/endpoint - method (str): HTTP methods (default: GET) supports GET, POST, + method (str): HTTP methods (default: GET) supports GET, POST, PUT, HEAD, OPTIONS, DELETE Returns: dict: returns request and response data as dict ''' async with self._limiter: - async with ClientSession(headers=self._headers, timeout=self._timeout) as session: + async with ClientSession( + headers=self._headers, timeout=self._timeout + ) as session: method = str(method).upper() match method: case 'GET': @@ -68,16 +125,23 @@ async def request(self, url: str, method: str = 'GET', *args, **kwargs) -> dict: case _: req_method = session.get - async with req_method(url, proxy=self._proxy, allow_redirects=self._allow_redirects, *args, **kwargs) as response: + async with req_method( + url, + allow_redirects=self._allow_redirects, + proxy=self._proxy.get_random_proxy(), + ssl=self._ssl, + *args, + **kwargs, + ) as response: resp_data = { - "status": response.status, - "req_url": str(response.request_info.real_url), - "query_url": str(response.url), - "req_method": response.request_info.method, - "req_headers": dict(**response.request_info.headers), - "res_redirection": str(response.history), - "res_headers": dict(response.headers), - "res_body": await response.text(), + 'status': response.status, + 'req_url': str(response.request_info.real_url), + 'query_url': str(response.url), + 'req_method': response.request_info.method, + 'req_headers': dict(**response.request_info.headers), + 'res_redirection': str(response.history), + 'res_headers': dict(response.headers), + 'res_body': await response.text(), } return resp_data diff --git a/src/offat/parsers/__init__.py b/src/offat/parsers/__init__.py index 8ca0ef3..c9c3974 100644 --- a/src/offat/parsers/__init__.py +++ b/src/offat/parsers/__init__.py @@ -8,23 +8,32 @@ from ..logger import logger -def create_parser(fpath_or_url: str, spec: dict = None) -> SwaggerParser | OpenAPIv3Parser: +def create_parser( + fpath_or_url: str, spec: dict | None = None +) -> SwaggerParser | OpenAPIv3Parser | None: '''returns parser based on doc file''' if fpath_or_url and is_valid_url(fpath_or_url): - res = http_get(fpath_or_url) + res = http_get(fpath_or_url, timeout=3) if res.status_code != 200: logger.error( - "server returned status code %d offat expects 200 status code", res.status_code) + 'server returned status code %d offat expects 200 status code', + res.status_code, + ) exit(-1) try: spec = json_load(res.text) fpath_or_url = None except JSONDecodeError: - logger.error("Invalid json data spec file url") + logger.error('Invalid json data spec file url') exit(-1) - parser = BaseParser(file_or_url=fpath_or_url, spec=spec) + try: + parser = BaseParser(file_or_url=fpath_or_url, spec=spec) + except OSError: + logger.error('File Not Found') + exit(-1) + if parser.is_v3: return OpenAPIv3Parser(file_or_url=fpath_or_url, spec=spec) diff --git a/src/offat/tester/post_test_processor.py b/src/offat/tester/post_test_processor.py index 611d300..d793ba8 100644 --- a/src/offat/tester/post_test_processor.py +++ b/src/offat/tester/post_test_processor.py @@ -13,21 +13,25 @@ class PostTestFiltersEnum(Enum): class PostRunTests: '''class Includes tests that should be ran after running all the active test''' + @staticmethod - def run_broken_access_control_tests(results: list[dict], test_data_config: dict) -> list[dict]: + def run_broken_access_control_tests( + results: list[dict], test_data_config: dict + ) -> list[dict]: ''' Runs tests for broken access control Args: results (list[dict]): list of dict for tests results ran - test_data_config (dict): user based config for running tests + test_data_config (dict): user based config for running tests Returns: - list[dict]: list of results + list[dict]: list of results Raises: Any Exception occurred during the test. ''' + def re_match(patterns: list[str], endpoint: str) -> bool: '''Matches endpoint for specified patterns @@ -52,8 +56,7 @@ def re_match(patterns: list[str], endpoint: str) -> bool: actor_names = [] for actor in actors: actor_name = list(actor.keys())[-1] - unauth_endpoint_regex = actor[actor_name].get( - 'unauthorized_endpoints', []) + unauth_endpoint_regex = actor[actor_name].get('unauthorized_endpoints', []) for result in results: if result.get('test_actor_name') != actor_name: @@ -69,18 +72,16 @@ def re_match(patterns: list[str], endpoint: str) -> bool: actor_test_result['test_name'] = 'Broken Access Control' actor_test_result['result_details'] = { True: 'Endpoint might not vulnerable to BAC', # passed - # failed - False: f'BAC: Endpoint is accessible to {actor_name}', + False: f'BAC: Endpoint is accessible to {actor_name}', # failed } - actor_based_tests.append( - PostRunTests.filter_status_code_based_results(actor_test_result)) + actor_based_tests.append(actor_test_result) - return actor_based_tests + return PostRunTests.filter_status_code_based_results(actor_based_tests) @staticmethod def detect_data_exposure(results: list[dict]) -> list[dict]: - '''Detects data exposure against sensitive data regex - patterns and returns dict of matched results + '''Detects data exposure against sensitive data regex + patterns and returns dict of matched results Args: data (str): data to be analyzed for exposure @@ -88,6 +89,7 @@ def detect_data_exposure(results: list[dict]) -> list[dict]: Returns: dict: dictionary with tag as dict key and matched pattern as dict value ''' + def detect_exposure(data: str) -> dict: # Dictionary to store detected data exposures detected_exposures = {} @@ -112,6 +114,7 @@ def detect_exposure(data: str) -> dict: # take a list and filter all at once def filter_status_code_based_results(results: list[dict]) -> list[dict]: new_results = [] + for result in results: new_result = deepcopy(result) response_status_code = result.get('response_status_code') @@ -142,7 +145,8 @@ def update_result_details(results: list[dict]): for result in results: new_result = deepcopy(result) new_result['result_details'] = result['result_details'].get( - result['result']) + result['result'] + ) new_results.append(new_result) @@ -154,13 +158,13 @@ def matcher(results: list[dict]): Args: results (list[dict]): list of dict for tests results ran - match_location (ResponseMatchLocation): Search for match at - specified location (`ResponseMatchLocation.BODY`, + match_location (ResponseMatchLocation): Search for match at + specified location (`ResponseMatchLocation.BODY`, `ResponseMatchLocation.HEADER`,`ResponseMatchLocation.STATUS_CODE`). match_regex (str): regex to match as string Returns: - list[dict]: list of results + list[dict]: list of results Raises: Any Exception occurred during the test. diff --git a/src/offat/tester/test_runner.py b/src/offat/tester/test_runner.py index 57532e4..3dca968 100644 --- a/src/offat/tester/test_runner.py +++ b/src/offat/tester/test_runner.py @@ -15,18 +15,27 @@ class PayloadFor(Enum): class TestRunner: - def __init__(self, rate_limit: float = 60, headers: dict | None = None, proxy: str | None = None) -> None: + def __init__( + self, + rate_limit: float = 60, + headers: dict | None = None, + proxies: list[str] | None = None, + ssl: bool = False, + ) -> None: self._client = AsyncRequests( - rate_limit=rate_limit, headers=headers, proxy=proxy) + rate_limit=rate_limit, headers=headers, proxies=proxies, ssl=ssl + ) self.progress = Progress(console=console) self.progress_task_id: TaskID | None = None - def _generate_payloads(self, params: list[dict], payload_for: PayloadFor = PayloadFor.BODY): + def _generate_payloads( + self, params: list[dict], payload_for: PayloadFor = PayloadFor.BODY + ): '''Generate body payload from passed data for HTTP body and query. Args: params (list[dict]): list of containing payload parameters - payload_for (PayloadFor): PayloadFor constant indicating + payload_for (PayloadFor): PayloadFor constant indicating for which payload is be generated, default: `PayloadFor.BODY` Returns: @@ -37,13 +46,13 @@ def _generate_payloads(self, params: list[dict], payload_for: PayloadFor = Paylo ''' if payload_for not in [PayloadFor.BODY, PayloadFor.QUERY]: raise ValueError( - '`payload_for` arg only supports `PayloadFor.BODY, PayloadFor.QUERY` value') + '`payload_for` arg only supports `PayloadFor.BODY, PayloadFor.QUERY` value' + ) body_payload = {} query_payload = {} for param in params: - param_in = param.get('in') param_name = param.get('name') param_value = param.get('value') @@ -77,15 +86,19 @@ async def send_request(self, test_task): if body_params and str(http_method).upper() not in ['GET', 'OPTIONS']: kwargs['json'] = self._generate_payloads( - body_params, payload_for=PayloadFor.BODY) + body_params, payload_for=PayloadFor.BODY + ) if query_params: kwargs['params'] = self._generate_payloads( - query_params, payload_for=PayloadFor.QUERY) + query_params, payload_for=PayloadFor.QUERY + ) test_result = test_task try: - response = await self._client.request(url=url, method=http_method, *args, **kwargs) + response = await self._client.request( + url=url, method=http_method, *args, **kwargs + ) # add request headers to result test_result['request_headers'] = response.get('req_headers', []) # append response headers and body for analyzing data leak @@ -122,7 +135,8 @@ async def run_tests(self, test_tasks: list, description: str | None): '''run tests generated from test generator module''' self.progress.start() self.progress_task_id = self.progress.add_task( - f'[orange] {description}', total=len(test_tasks)) + f'[orange] {description}', total=len(test_tasks) + ) tasks = [] for test_task in test_tasks: @@ -132,11 +146,17 @@ async def run_tests(self, test_tasks: list, description: str | None): results = await gather(*tasks) return results - except (KeyboardInterrupt, CancelledError,): - logger.error("[!] User Interruption Detected!") + except ( + KeyboardInterrupt, + CancelledError, + ): + logger.error('[!] User Interruption Detected!') exit(-1) except Exception as e: - logger.error("[*] Exception occurred while gathering results: %s", - e, exc_info=exc_info()) + logger.error( + '[*] Exception occurred while gathering results: %s', + e, + exc_info=exc_info(), + ) return [] diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index 0c10317..cd0ff98 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -2,8 +2,8 @@ OWASP OFFAT Tester Utils Module """ from http import client as http_client +from sys import exc_info from typing import Optional -from sys import exc_info, exit from asyncio import run from asyncio.exceptions import CancelledError from re import search as regex_search @@ -21,18 +21,18 @@ def is_host_up(openapi_parser: SwaggerParser | OpenAPIv3Parser) -> bool: - '''checks whether the host from openapi doc is available or not. + '''checks whether the host from openapi doc is available or not. Returns True is host is available else returns False''' - tokens = openapi_parser.host.split(":") + tokens = openapi_parser.host.split(':') match len(tokens): case 1: host = tokens[0] - port = 443 if openapi_parser.http_scheme == "https" else 80 + port = 443 if openapi_parser.http_scheme == 'https' else 80 case 2: host = tokens[0] port = tokens[1] case _: - logger.warning("Invalid host: %s", openapi_parser.host) + logger.warning('Invalid host: %s', openapi_parser.host) return False host = host.split('/')[0] @@ -43,29 +43,35 @@ def is_host_up(openapi_parser: SwaggerParser | OpenAPIv3Parser) -> bool: case _: proto = http_client.HTTPConnection - logger.info("Checking whether host %s:%s is available", host, port) + logger.info('Checking whether host %s:%s is available', host, port) try: conn = proto(host=host, port=port, timeout=5) - conn.request("GET", "/") + conn.request('GET', '/') res = conn.getresponse() - logger.info("Host returned status code: %d", res.status) + logger.info('Host returned status code: %d', res.status) return res.status in range(200, 499) except Exception as e: - logger.error("Unable to connect to host %s:%d due to error: %s", host, port, repr(e)) + logger.error( + 'Unable to connect to host %s:%d due to error: %s', host, port, repr(e) + ) return False -def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional[str] = None, skip_test_run: Optional[bool] = False, post_run_matcher_test: Optional[bool] = False, description: Optional[str] = None) -> list: +def run_test( + test_runner: TestRunner, + tests: list[dict], + regex_pattern: Optional[str] = None, + skip_test_run: Optional[bool] = False, + post_run_matcher_test: Optional[bool] = False, + description: Optional[str] = None, +) -> list: '''Run tests and print result on console''' logger.info('Tests Generated: %d', len(tests)) # filter data if regex is passed if regex_pattern: tests = list( - filter( - lambda x: regex_search(regex_pattern, x.get('endpoint', '')), - tests - ) + filter(lambda x: regex_search(regex_pattern, x.get('endpoint', '')), tests) ) try: @@ -74,12 +80,17 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional else: test_results = run(test_runner.run_tests(tests, description)) - except (KeyboardInterrupt, CancelledError,): - logger.error("[!] User Interruption Detected!") + except ( + KeyboardInterrupt, + CancelledError, + ): + logger.error('[!] User Interruption Detected!') exit(-1) except Exception as e: - logger.error("[*] Exception occurred while running tests: %s", e, exc_info=exc_info()) + logger.error( + '[*] Exception occurred while running tests: %s', e, exc_info=exc_info() + ) return [] if post_run_matcher_test: @@ -98,19 +109,33 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional # Note: redirects are allowed by default making it easier for pentesters/researchers -def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pattern: Optional[str] = None, output_file: Optional[str] = None, output_file_format: Optional[str] = None, rate_limit: Optional[int] = None, req_headers: Optional[dict] = None, proxy: Optional[str] = None, test_data_config: Optional[dict] = None): - global test_table_generator, logger - +def generate_and_run_tests( + api_parser: SwaggerParser | OpenAPIv3Parser, + regex_pattern: str | None = None, + output_file: str | None = None, + output_file_format: str | None = None, + rate_limit: int | None = None, + req_headers: dict | None = None, + proxies: list[str] | None = None, + test_data_config: dict | None = None, + ssl: bool = False, +): + ''' + Generates and runs tests for provied OAS/Swagger file. + ''' if not is_host_up(openapi_parser=api_parser): - logger.error("Stopping tests due to unavailibility of host: %s", api_parser.host) + logger.error( + 'Stopping tests due to unavailibility of host: %s', api_parser.host + ) return - logger.info("Host %s is up", api_parser.host) + logger.info('Host %s is up', api_parser.host) test_runner = TestRunner( rate_limit=rate_limit, headers=req_headers, - proxy=proxy, + proxies=proxies, + ssl=ssl, ) results: list = [] @@ -119,13 +144,14 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for Unsupported HTTP Methods/Verbs:' logger.info(test_name) unsupported_http_endpoint_tests = test_generator.check_unsupported_http_methods( - api_parser) + api_parser + ) results += run_test( test_runner=test_runner, tests=unsupported_http_endpoint_tests, regex_pattern=regex_pattern, - description='(FUZZED) ' + test_name + description='(FUZZED) ' + test_name, ) # sqli fuzz test @@ -153,7 +179,8 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for OS Command Injection Vulnerability with fuzzed params and checking response body:' logger.info(test_name) os_command_injection_tests = test_generator.os_command_injection_fuzz_params_test( - api_parser) + api_parser + ) results += run_test( test_runner=test_runner, tests=os_command_injection_tests, @@ -166,7 +193,8 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for XSS/HTML Injection Vulnerability with fuzzed params and checking response body:' logger.info(test_name) os_command_injection_tests = test_generator.xss_html_injection_fuzz_params_test( - api_parser) + api_parser + ) results += run_test( test_runner=test_runner, tests=os_command_injection_tests, @@ -179,31 +207,36 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for BOLA in PATH using fuzzed params:' logger.info(test_name) bola_fuzzed_path_tests = test_generator.bola_fuzz_path_test( - api_parser, success_codes=[200, 201, 301]) + api_parser, success_codes=[200, 201, 301] + ) results += run_test( test_runner=test_runner, tests=bola_fuzzed_path_tests, regex_pattern=regex_pattern, - description='(FUZZED) Checking for BOLA in PATH:' + description='(FUZZED) Checking for BOLA in PATH:', ) # BOLA path test with fuzzed data + trailing slash - test_name = 'Checking for BOLA in PATH with trailing slash and id using fuzzed params:' + test_name = ( + 'Checking for BOLA in PATH with trailing slash and id using fuzzed params:' + ) logger.info(test_name) bola_trailing_slash_path_tests = test_generator.bola_fuzz_trailing_slash_path_test( - api_parser, success_codes=[200, 201, 301]) + api_parser, success_codes=[200, 201, 301] + ) results += run_test( test_runner=test_runner, tests=bola_trailing_slash_path_tests, regex_pattern=regex_pattern, - description='(FUZZED) Checking for BOLA in PATH with trailing slash:' + description='(FUZZED) Checking for BOLA in PATH with trailing slash:', ) # Mass Assignment / BOPLA test_name = 'Checking for Mass Assignment Vulnerability with fuzzed params and checking response status codes:' logger.info(test_name) bopla_tests = test_generator.bopla_fuzz_test( - api_parser, success_codes=[200, 201, 301]) + api_parser, success_codes=[200, 201, 301] + ) results += run_test( test_runner=test_runner, tests=bopla_tests, @@ -213,10 +246,12 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa # Tests with User provided Data if bool(test_data_config): - logger.info('[bold]Testing with user provided data[/bold]') + logger.info('[bold] Testing with user provided data [/bold]') - # BOLA path tests with fuzzed + user provided data - test_name = 'Checking for BOLA in PATH using fuzzed and user provided params:', + # # BOLA path tests with fuzzed + user provided data + test_name = ( + 'Checking for BOLA in PATH using fuzzed and user provided params:', + ) logger.info(test_name) bola_fuzzed_user_data_tests = test_generator.test_with_user_data( test_data_config, @@ -283,7 +318,8 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for Broken Access Control:' logger.info(test_name) bac_results = PostRunTests.run_broken_access_control_tests( - results, test_data_config) + results, test_data_config + ) results += run_test( test_runner=test_runner, tests=bac_results, diff --git a/src/offat/utils.py b/src/offat/utils.py index 823ac69..e9fdac1 100644 --- a/src/offat/utils.py +++ b/src/offat/utils.py @@ -22,46 +22,46 @@ def get_package_version(): def read_yaml(file_path: str) -> dict: - '''Reads YAML file and returns as python dict. + '''Reads YAML file and returns as python dict. returns file not found or yaml errors as dict. Args: file_path (str): path of yaml file Returns: - dict: YAML contents as dict else returns error + dict: YAML contents as dict else returns error ''' if not file_path: - return {"error": "ValueError, path cannot be of None type"} + return {'error': 'ValueError, path cannot be of None type'} if not isfile(file_path): - return {"error": "File Not Found"} + return {'error': 'File Not Found'} - with open(file_path, "r", encoding="utf-8") as f: + with open(file_path, 'r', encoding='utf-8') as f: try: return safe_load(f.read()) except YAMLError: - return {"error": "YAML error"} + return {'error': 'YAML error'} def read_json(file_path: str) -> dict: - '''Reads JSON file and returns as python dict. + '''Reads JSON file and returns as python dict. returns file not found or JSON errors as dict. Args: file_path (str): path of yaml file Returns: - dict: YAML contents as dict else returns error + dict: YAML contents as dict else returns error ''' if not isfile(file_path): - return {"error": "File Not Found"} + return {'error': 'File Not Found'} - with open(file_path, "r", encoding="utf-8") as f: + with open(file_path, 'r', encoding='utf-8') as f: try: return json_load(f.read()) except JSONDecodeError: - return {"error": "JSON error"} + return {'error': 'JSON error'} def read_openapi_file(file_path: str) -> dict: @@ -72,10 +72,10 @@ def read_openapi_file(file_path: str) -> dict: file_path (str): path of openapi file Returns: - dict: YAML contents as dict else returns error + dict: YAML contents as dict else returns error ''' if not isfile(file_path): - return {"error": "File Not Found"} + return {'error': 'File Not Found'} file_ext = file_path.split('.')[-1] match file_ext: @@ -84,7 +84,7 @@ def read_openapi_file(file_path: str) -> dict: case 'yaml': return read_yaml(file_path) case _: - return {"error": "Invalid file extension"} + return {'error': 'Invalid file extension'} def write_json_to_file(json_data: dict, file_path: str): @@ -96,7 +96,7 @@ def write_json_to_file(json_data: dict, file_path: str): Returns: bool: True is `json_data` is written into `file_path` else - returns False (in case of any exception) + returns False (in case of any exception) Raises: Any exception occurred during operation @@ -115,13 +115,15 @@ def write_json_to_file(json_data: dict, file_path: str): logger.error('Invalid JSON data, error while writing to %s file.', file_path) except Exception as e: - logger.error('Unable to write JSON data to file due to below exception:\n%s', repr(e)) + logger.error( + 'Unable to write JSON data to file due to below exception:\n%s', repr(e) + ) return False def str_to_dict(key_values: str) -> dict: - '''Takes string object and converts to dict + '''Takes string object and converts to dict String should in `Key1:Value1,Key2:Value2,Key3:Value3` format Args: @@ -146,12 +148,12 @@ def str_to_dict(key_values: str) -> dict: return new_dict -def headers_list_to_dict(headers_list_list: list[list[str]]) -> dict | None: - '''Takes list object and converts to dict +def headers_list_to_dict(headers_list_list: list[list[str]] | None) -> dict: + '''Takes list object and converts to dict String should in `[['Key1:Value1'],['Key2:Value2'],['Key3:Value3']]` format Args: - headers_list_list (list): headers value as list[list[str]], where str + headers_list_list (list): headers value as list[list[str]], where str is in `key:value` format Returns: @@ -161,7 +163,7 @@ def headers_list_to_dict(headers_list_list: list[list[str]]) -> dict | None: Any exception occurred during operation ''' if not headers_list_list: - return None + return {} response_headers_dict: dict = dict() @@ -176,7 +178,7 @@ def headers_list_to_dict(headers_list_list: list[list[str]]) -> dict | None: def is_valid_url(url: str) -> bool: - '''Accepts string as an parameter and returns bool + '''Accepts string as an parameter and returns bool whether str is url or not Args: @@ -188,7 +190,5 @@ def is_valid_url(url: str) -> bool: Raises: Any exception occurred during operation ''' - url_regex = re_compile( - r'https?:\/\/[a-z.-]+(:\d+)?.*' - ) + url_regex = re_compile(r'https?:\/\/[a-z.-]+(:\d+)?.*') return bool(match(url_regex, url)) diff --git a/src/pyproject.toml b/src/pyproject.toml index a5b01cb..46e802b 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "offat" -version = "0.15.5" +version = "0.16.0" description = "Offensive API tester tool automates checks for common API vulnerabilities" authors = ["Dhrumil Mistry "] license = "MIT"