diff --git a/src/offat/__main__.py b/src/offat/__main__.py index f892e36..f36702a 100644 --- a/src/offat/__main__.py +++ b/src/offat/__main__.py @@ -9,7 +9,7 @@ def banner(): print( - r''' + r""" _/| |\_ / | | \ | \ / | @@ -25,114 +25,114 @@ def banner(): / | \ / v \ OFFAT - ''' + """ ) def start(): - '''Starts cli tool''' + """Starts cli tool""" banner() - parser = ArgumentParser(prog='offat') + parser = ArgumentParser(prog="offat") parser.add_argument( - '-f', - '--file', - dest='fpath', + "-f", + "--file", + dest="fpath", type=str, - help='path or url of openapi/swagger specification file', + help="path or url of openapi/swagger specification file", required=True, ) parser.add_argument( - '-v', '--version', action='version', version=f'%(prog)s {get_package_version()}' + "-v", "--version", action="version", version=f"%(prog)s {get_package_version()}" ) parser.add_argument( - '-rl', - '--rate-limit', - dest='rate_limit', - help='API requests rate limit per second', + "-rl", + "--rate-limit", + dest="rate_limit", + help="API requests rate limit per second", type=float, default=60, required=False, ) parser.add_argument( - '-pr', - '--path-regex', - dest='path_regex_pattern', + "-pr", + "--path-regex", + dest="path_regex_pattern", type=str, - help='run tests for paths matching given regex pattern', + help="run tests for paths matching given regex pattern", required=False, default=None, ) parser.add_argument( - '-o', - '--output', - dest='output_file', + "-o", + "--output", + dest="output_file", type=str, - help='path to store test results', + help="path to store test results", required=False, default=None, ) parser.add_argument( - '-of', - '--format', - dest='output_format', + "-of", + "--format", + dest="output_format", type=str, - choices=['json', 'yaml', 'html', 'table'], - help='Data format to save (json, yaml, html, table). Default: table', + choices=["json", "yaml", "html", "table"], + help="Data format to save (json, yaml, html, table). Default: table", required=False, - default='table', + default="table", ) parser.add_argument( - '-H', - '--headers', - dest='headers', + "-H", + "--headers", + dest="headers", type=str, - help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', + help="HTTP requests headers that should be sent during testing eg: User-Agent: offat", required=False, default=None, - action='append', - nargs='*', + action="append", + nargs="*", ) parser.add_argument( - '-tdc', - '--test-data-config', - dest='test_data_config', - help='YAML file containing user test data for tests', + "-tdc", + "--test-data-config", + dest="test_data_config", + help="YAML file containing user test data for tests", required=False, type=str, ) parser.add_argument( - '-p', - '--proxy', - dest='proxies_list', + "-p", + "--proxy", + dest="proxies_list", help='Proxy server URL to route HTTP requests through (e.g. "http://proxyserver:port")', - action='append', + action="append", required=False, type=str, default=None, ) parser.add_argument( - '-s', - '--ssl', - dest='ssl', + "-s", + "--ssl", + dest="ssl", required=False, - action='store_true', - help='Enable SSL Verification', + action="store_true", + help="Enable SSL Verification", ) parser.add_argument( - '-cf', - '--capture-failed', - dest='capture_failed', - action='store_true', - help='Captures failed requests due to any exceptions into output file', + "-cf", + "--capture-failed", + dest="capture_failed", + action="store_true", + help="Captures failed requests due to any exceptions into output file", ) parser.add_argument( - '--server', - dest='server_url', + "--server", + dest="server_url", type=str, default=None, required=False, - help='server/host base url to overwrite from OAS/Swagger file', + help="server/host base url to overwrite from OAS/Swagger file", ) args = parser.parse_args() @@ -165,5 +165,5 @@ def start(): ) -if __name__ == '__main__': +if __name__ == "__main__": start() diff --git a/src/offat/api/__main__.py b/src/offat/api/__main__.py index 1926578..f222bb6 100644 --- a/src/offat/api/__main__.py +++ b/src/offat/api/__main__.py @@ -5,16 +5,16 @@ def get_offat_installation_dir(): try: # For non-editable installation - return importlib.resources.files('offat') + return importlib.resources.files("offat") except ImportError: # For editable installation (pip install -e .) - return importlib.resources.files('.') + return importlib.resources.files(".") def start(): installation_dir = get_offat_installation_dir() run( - app='offat.api.app:app', + app="offat.api.app:app", host="0.0.0.0", port=8000, workers=2, @@ -23,5 +23,5 @@ def start(): ) -if __name__ == '__main__': +if __name__ == "__main__": start() diff --git a/src/offat/api/app.py b/src/offat/api/app.py index 0880b42..bcac2ca 100644 --- a/src/offat/api/app.py +++ b/src/offat/api/app.py @@ -3,17 +3,18 @@ from offat.api.jobs import scan_api from offat.api.models import CreateScanModel from offat.logger import logger + # from os import uname, environ -logger.info('Secret Key: %s', auth_secret_key) +logger.info("Secret Key: %s", auth_secret_key) # if uname().sysname == 'Darwin' and environ.get('OBJC_DISABLE_INITIALIZE_FORK_SAFETY') != 'YES': # logger.warning('Mac Users might need to configure OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in env\nVisit StackOverFlow link for more info: https://stackoverflow.com/questions/50168647/multiprocessing-causes-python-to-crash-and-gives-an-error-may-have-been-in-progr') -@app.get('/', status_code=status.HTTP_200_OK) +@app.get("/", status_code=status.HTTP_200_OK) async def root(): return { "name": "OFFAT API", @@ -22,63 +23,64 @@ async def root(): } -@app.post('/api/v1/scan', status_code=status.HTTP_201_CREATED) -async def add_scan_task(scan_data: CreateScanModel, request: Request, response: Response): - # for auth +@app.post("/api/v1/scan", status_code=status.HTTP_201_CREATED) +async def add_scan_task( + scan_data: CreateScanModel, request: Request, response: Response +): + # for auth client_ip = request.client.host - secret_key = request.headers.get('SECRET-KEY', None) + secret_key = request.headers.get("SECRET-KEY", None) if secret_key != auth_secret_key: # return 404 for better endpoint security response.status_code = status.HTTP_401_UNAUTHORIZED - logger.warning('INTRUSION: %s tried to create a new scan job', client_ip) + logger.warning("INTRUSION: %s tried to create a new scan job", client_ip) return {"message": "Unauthorized"} - msg = { - "msg": "Scan Task Created", - "job_id": None - } + msg = {"msg": "Scan Task Created", "job_id": None} - job = task_queue.enqueue(scan_api, scan_data, job_timeout=task_timeout) - msg['job_id'] = job.id + job = task_queue.enqueue(scan_api, scan_data, job_timeout=task_timeout) + msg["job_id"] = job.id - logger.info('SUCCESS: %s created new scan job - %s', client_ip, job.id) + logger.info("SUCCESS: %s created new scan job - %s", client_ip, job.id) return msg -@app.get('/api/v1/scan/{job_id}/result') +@app.get("/api/v1/scan/{job_id}/result") async def get_scan_task_result(job_id: str, request: Request, response: Response): # for auth client_ip = request.client.host - secret_key = request.headers.get('SECRET-KEY', None) + secret_key = request.headers.get("SECRET-KEY", None) if secret_key != auth_secret_key: # return 404 for better endpoint security response.status_code = status.HTTP_401_UNAUTHORIZED - logger.warning('INTRUSION: %s tried to access %s job scan results', client_ip, job_id) + logger.warning( + "INTRUSION: %s tried to access %s job scan results", client_ip, job_id + ) return {"message": "Unauthorized"} scan_results_job = task_queue.fetch_job(job_id=job_id) - logger.info('SUCCESS: %s accessed %s job scan results', client_ip, job_id) + logger.info("SUCCESS: %s accessed %s job scan results", client_ip, job_id) - msg = 'Task Remaining or Invalid Job Id' + msg = "Task Remaining or Invalid Job Id" results = None response.status_code = status.HTTP_202_ACCEPTED if scan_results_job and scan_results_job.is_started: - msg = 'Job In Progress' + msg = "Job In Progress" elif scan_results_job and scan_results_job.is_finished: - msg = 'Task Completed' + msg = "Task Completed" results = scan_results_job.result response.status_code = status.HTTP_200_OK elif scan_results_job and scan_results_job.is_failed: - msg = 'Task Failed. Try Creating Task Again.' + msg = "Task Failed. Try Creating Task Again." response.status_code = status.HTTP_200_OK msg = { - 'msg': msg, - 'results': results, + "msg": msg, + "results": results, } return msg diff --git a/src/offat/api/auth_utils.py b/src/offat/api/auth_utils.py index 2f2c534..ced8a03 100644 --- a/src/offat/api/auth_utils.py +++ b/src/offat/api/auth_utils.py @@ -7,6 +7,6 @@ def generate_random_secret_key_string(length=128): characters = string.ascii_letters + string.digits + "-_." # Generate a random string of the specified length - random_string = ''.join(secrets.choice(characters) for _ in range(length)) + random_string = "".join(secrets.choice(characters) for _ in range(length)) return random_string diff --git a/src/offat/api/config.py b/src/offat/api/config.py index 56bc01b..c4ebf09 100644 --- a/src/offat/api/config.py +++ b/src/offat/api/config.py @@ -10,15 +10,18 @@ load_dotenv() app = FastAPI( - title='OFFAT - API', - servers=[{ - 'url':'http://localhost:8000', - }], + title="OFFAT - API", + servers=[ + { + "url": "http://localhost:8000", + } + ], ) -auth_secret_key = environ.get( - 'AUTH_SECRET_KEY', generate_random_secret_key_string()) -redis_con = Redis(host=environ.get('REDIS_HOST', 'localhost'), - port=int(environ.get('REDIS_PORT', 6379))) -task_queue = Queue(name='offat_task_queue', connection=redis_con) +auth_secret_key = environ.get("AUTH_SECRET_KEY", generate_random_secret_key_string()) +redis_con = Redis( + host=environ.get("REDIS_HOST", "localhost"), + port=int(environ.get("REDIS_PORT", 6379)), +) +task_queue = Queue(name="offat_task_queue", connection=redis_con) task_timeout = 60 * 60 # 3600 s = 1 hour diff --git a/src/offat/api/jobs.py b/src/offat/api/jobs.py index 1dd69ba..08e6c17 100644 --- a/src/offat/api/jobs.py +++ b/src/offat/api/jobs.py @@ -18,6 +18,6 @@ def scan_api(body_data: CreateScanModel): ) return results except Exception as e: - logger.error('Error occurred while creating a job: %s', repr(e)) + logger.error("Error occurred while creating a job: %s", repr(e)) logger.debug("Debug Data:", exc_info=exc_info()) - return [{'error': str(e)}] + return [{"error": str(e)}] diff --git a/src/offat/http.py b/src/offat/http.py index 9e6620c..c7ee132 100644 --- a/src/offat/http.py +++ b/src/offat/http.py @@ -1,6 +1,6 @@ -''' +""" module for interacting with HTTP layer -''' +""" from random import choice from os import name as os_name from urllib.parse import urlparse @@ -13,14 +13,14 @@ import aiohttp.resolver aiohttp.resolver.DefaultResolver = aiohttp.resolver.AsyncResolver -if os_name == 'nt': +if os_name == "nt": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) class Proxies: - ''' + """ class for handling proxies - ''' + """ def __init__(self, proxies: list[str] | None) -> None: self.p_list = proxies @@ -44,18 +44,18 @@ def validate_proxy(self, proxy_url: str | None): return False def get_random_proxy(self) -> str | None: - ''' + """ Returns random proxy from the list - ''' + """ if not self.p_list: return None return choice(self.p_list) class AsyncRequests: - ''' + """ AsyncRequests class helps to send HTTP requests with rate limiting options. - ''' + """ def __init__( self, @@ -66,7 +66,7 @@ def __init__( timeout: float = 60, ssl: bool = False, ) -> None: - '''AsyncRequests class constructor + """AsyncRequests class constructor Args: rate_limit (int): number of requests per seconds @@ -78,7 +78,7 @@ def __init__( Returns: None - ''' + """ self._headers = headers self._proxy = Proxies(proxies=proxies) self._allow_redirects = allow_redirects @@ -92,8 +92,8 @@ def __init__( KeyboardInterrupt or asyncio.exceptions.CancelledError ), ) - async def request(self, url: str, *args, method: str = 'GET', **kwargs) -> dict: - '''Send HTTP requests asynchronously + async def request(self, url: str, *args, method: str = "GET", **kwargs) -> dict: + """Send HTTP requests asynchronously Args: url (str): URL of the webpage/endpoint @@ -102,26 +102,26 @@ async def request(self, url: str, *args, method: str = 'GET', **kwargs) -> dict: Returns: dict: returns request and response data as dict - ''' + """ async with self._limiter: async with ClientSession( headers=self._headers, timeout=self._timeout ) as session: method = str(method).upper() match method: - case 'GET': + case "GET": req_method = session.get - case 'POST': + case "POST": req_method = session.post - case 'PUT': + case "PUT": req_method = session.put - case 'PATCH': + case "PATCH": req_method = session.patch - case 'HEAD': + case "HEAD": req_method = session.head - case 'OPTIONS': + case "OPTIONS": req_method = session.options - case 'DELETE': + case "DELETE": req_method = session.delete case _: req_method = session.get @@ -135,14 +135,14 @@ async def request(self, url: str, *args, method: str = 'GET', **kwargs) -> dict: **kwargs, ) as response: resp_data = { - 'status': response.status, - 'req_url': str(response.request_info.real_url), - 'query_url': str(response.url), - 'req_method': response.request_info.method, - 'req_headers': dict(**response.request_info.headers), - 'res_redirection': str(response.history), - 'res_headers': dict(response.headers), - 'res_body': await response.text(), + "status": response.status, + "req_url": str(response.request_info.real_url), + "query_url": str(response.url), + "req_method": response.request_info.method, + "req_headers": dict(**response.request_info.headers), + "res_redirection": str(response.history), + "res_headers": dict(response.headers), + "res_body": await response.text(), } return resp_data diff --git a/src/offat/logger.py b/src/offat/logger.py index 40c588c..f96c7ed 100644 --- a/src/offat/logger.py +++ b/src/offat/logger.py @@ -10,8 +10,9 @@ logging.basicConfig( format="%(message)s", datefmt="[%X]", - handlers=[RichHandler( - console=console, rich_tracebacks=True, tracebacks_show_locals=True)], + handlers=[ + RichHandler(console=console, rich_tracebacks=True, tracebacks_show_locals=True) + ], ) logger = logging.getLogger("OWASP-OFFAT") logger.setLevel(logging.INFO) diff --git a/src/offat/parsers/__init__.py b/src/offat/parsers/__init__.py index 39dc176..aa5261e 100644 --- a/src/offat/parsers/__init__.py +++ b/src/offat/parsers/__init__.py @@ -12,12 +12,12 @@ def create_parser( spec: dict | None = None, server_url: str | None = None, ) -> SwaggerParser | OpenAPIv3Parser | None: - '''returns parser based on doc file''' + """returns parser based on doc file""" if fpath_or_url and is_valid_url(fpath_or_url): res = http_get(fpath_or_url, timeout=3) if res.status_code != 200: logger.error( - 'server returned status code %d offat expects 200 status code', + "server returned status code %d offat expects 200 status code", res.status_code, ) exit(-1) @@ -26,13 +26,13 @@ def create_parser( spec = json_load(res.text) fpath_or_url = None except JSONDecodeError: - logger.error('Invalid json data spec file url') + logger.error("Invalid json data spec file url") exit(-1) try: parser = BaseParser(file_or_url=fpath_or_url, spec=spec, server_url=server_url) except OSError: - logger.error('File Not Found') + logger.error("File Not Found") exit(-1) if parser.is_v3: diff --git a/src/offat/parsers/openapi.py b/src/offat/parsers/openapi.py index 16c3de4..d762100 100644 --- a/src/offat/parsers/openapi.py +++ b/src/offat/parsers/openapi.py @@ -1,17 +1,17 @@ -''' +""" module to parse OAS v3 documentation JSON/YAML files. -''' +""" from .parser import BaseParser from ..utils import parse_server_url from ..logger import logger class InvalidOpenAPIv3File(Exception): - '''Exception to be raised when openAPI/OAS spec validation fails''' + """Exception to be raised when openAPI/OAS spec validation fails""" class OpenAPIv3Parser(BaseParser): - '''OpenAPI v3 Spec File Parser''' + """OpenAPI v3 Spec File Parser""" # while adding new method to this class, make sure same method is present in SwaggerParser class @@ -20,7 +20,7 @@ def __init__( ) -> None: super().__init__(file_or_url=file_or_url, spec=spec, *args, **kwargs) # noqa if not self.is_v3: - raise InvalidOpenAPIv3File('Invalid OAS v3 file') + raise InvalidOpenAPIv3File("Invalid OAS v3 file") self.http_scheme = self._get_scheme() @@ -29,23 +29,23 @@ def __init__( # raise error if host data not found if not (self.hosts and self.hosts[0]): - raise ValueError('Host is invalid or not found') + raise ValueError("Host is invalid or not found") # parse and set host data host_dict = self.hosts[0] - self.http_scheme = host_dict['scheme'] + self.http_scheme = host_dict["scheme"] self.host = f'{host_dict["host"]}:{host_dict["port"]}' - self.api_base_path = host_dict['basepath'] + self.api_base_path = host_dict["basepath"] self.base_url = f"{self.http_scheme}://{self.host}" self.request_response_params = self._get_request_response_params() def _populate_hosts(self): - servers = self.specification.get('servers', []) + servers = self.specification.get("servers", []) hosts = [] if not servers: - logger.error('Invalid Server Url: Server URLs are missing in spec file') - raise InvalidOpenAPIv3File('Server URLs Not Found in spec file') + logger.error("Invalid Server Url: Server URLs are missing in spec file") + raise InvalidOpenAPIv3File("Server URLs Not Found in spec file") for server in servers: # host = ( @@ -55,34 +55,34 @@ def _populate_hosts(self): # .removesuffix('/') # ) # host = None if host == '' else host - scheme, host, port, basepath = parse_server_url(url=server.get('url')) + scheme, host, port, basepath = parse_server_url(url=server.get("url")) hosts.append( { - 'scheme': scheme, - 'host': host, - 'port': port, - 'basepath': basepath, + "scheme": scheme, + "host": host, + "port": port, + "basepath": basepath, } ) self.hosts = hosts def _get_scheme(self): - servers = self.specification.get('servers', []) + servers = self.specification.get("servers", []) schemes = [] for server in servers: - schemes.append('https' if 'https://' in server.get('url', '') else 'http') + schemes.append("https" if "https://" in server.get("url", "") else "http") - scheme = 'https' if 'https' in schemes else 'http' + scheme = "https" if "https" in schemes else "http" return scheme def _fetch_schema_from_spec(self, param_schema_ref: str) -> dict: - schema_spec_path = param_schema_ref.split('/')[1:] + schema_spec_path = param_schema_ref.split("/")[1:] if len(schema_spec_path) > 3: logger.error( - 'Schema spec $ref path should not be greater than 3 (excluding #)' + "Schema spec $ref path should not be greater than 3 (excluding #)" ) return {} @@ -93,121 +93,121 @@ def _fetch_schema_from_spec(self, param_schema_ref: str) -> dict: return schema_data def _get_param_definition_schema(self, param: dict): - '''Returns Model defined schema for the passed param''' - param_schema = param.get('schema') + """Returns Model defined schema for the passed param""" + param_schema = param.get("schema") # replace schema $ref with model params if param_schema: - param_schema_ref = param_schema.get('$ref') + param_schema_ref = param_schema.get("$ref") if param_schema_ref: param_schema = self._fetch_schema_from_spec(param_schema_ref) return param_schema def _get_response_definition_schema(self, responses: dict): - '''returns schema of API response + """returns schema of API response Args: responses (dict): responses from path http method json data Returns: dict: - ''' + """ for status_code in responses.keys(): # below line could return: ["application/json", "application/xml"] - content = responses[status_code].get('content', None) + content = responses[status_code].get("content", None) if content: status_code_content_type_responses = content.keys() for status_code_content_type in status_code_content_type_responses: - status_code_content = responses[status_code]['content'][ + status_code_content = responses[status_code]["content"][ status_code_content_type ].keys() - if 'parameters' in status_code_content: - responses[status_code]['schema'] = responses[status_code][ - 'content' - ][status_code_content_type]['parameters'] - elif 'schema' in status_code_content: + if "parameters" in status_code_content: + responses[status_code]["schema"] = responses[status_code][ + "content" + ][status_code_content_type]["parameters"] + elif "schema" in status_code_content: responses[status_code][ - 'schema' + "schema" ] = self._get_param_definition_schema( - responses[status_code]['content'][status_code_content_type] + responses[status_code]["content"][status_code_content_type] ) else: # Fetch $ref schema directly - ref = responses[status_code].get('$ref', None) + ref = responses[status_code].get("$ref", None) if ref: - responses[status_code]['schema'] = self._fetch_schema_from_spec(ref) + responses[status_code]["schema"] = self._fetch_schema_from_spec(ref) return responses def _get_request_response_params(self): - '''Returns Schema of requests and response params + """Returns Schema of requests and response params Args: None Returns: list: - ''' + """ requests = [] - paths = self.specification.get('paths', {}) + paths = self.specification.get("paths", {}) # extract endpoints and supported params for path in paths.keys(): - path_params = paths[path].get('parameters', []) + path_params = paths[path].get("parameters", []) for http_method in paths.get(path, {}).keys(): # consider only http methods - if http_method not in ['get', 'put', 'post', 'delete', 'options']: + if http_method not in ["get", "put", "post", "delete", "options"]: continue - request_parameters = paths[path][http_method].get('parameters', []) + request_parameters = paths[path][http_method].get("parameters", []) # create list of parameters: Fetch object schema from OAS file body_params = [] body_parameter_keys = ( - paths[path][http_method].get('requestBody', {}).get('content', {}) + paths[path][http_method].get("requestBody", {}).get("content", {}) ) for body_parameter_key in body_parameter_keys: - body_parameters_dict = paths[path][http_method]['requestBody'][ - 'content' + body_parameters_dict = paths[path][http_method]["requestBody"][ + "content" ][body_parameter_key] - required = paths[path][http_method]['requestBody'].get('required') - description = paths[path][http_method]['requestBody'].get( - 'description' + required = paths[path][http_method]["requestBody"].get("required") + description = paths[path][http_method]["requestBody"].get( + "description" ) body_param = self._get_param_definition_schema(body_parameters_dict) body_params.append( { - 'in': 'body', - 'name': body_parameter_key, - 'description': description, - 'required': required, - 'schema': body_param, + "in": "body", + "name": body_parameter_key, + "description": description, + "required": required, + "schema": body_param, } ) response_params = [] response_params = self._get_response_definition_schema( - paths[path][http_method].get('responses', {}) + paths[path][http_method].get("responses", {}) ) # add body param to request param request_parameters += body_params requests.append( { - 'http_method': http_method, - 'path': path, - 'request_params': request_parameters, - 'response_params': response_params, - 'path_params': path_params, - 'body_params': body_params, + "http_method": http_method, + "path": path, + "request_params": request_parameters, + "response_params": response_params, + "path_params": path_params, + "body_params": body_params, } ) diff --git a/src/offat/parsers/parser.py b/src/offat/parsers/parser.py index a438448..9576770 100644 --- a/src/offat/parsers/parser.py +++ b/src/offat/parsers/parser.py @@ -5,7 +5,7 @@ class InvalidSpecVersion(Exception): - '''Exception to be raised''' + """Exception to be raised""" pass @@ -16,7 +16,7 @@ def __init__( ) -> None: if spec: self.specification: dict = spec - base_uri = '' + base_uri = "" else: self.specification, base_uri = read_from_filename(file_or_url) @@ -24,44 +24,44 @@ def __init__( # overwrite server if present according to OAS version if self.is_v3 and server_url: - self.specification['servers'] = [{'url': server_url}] + self.specification["servers"] = [{"url": server_url}] elif server_url: scheme, host, port, basepath = parse_server_url(url=server_url) - basepath = '/' if basepath == '' else basepath - self.specification['host'] = f'{host}:{port}' - self.specification['schemes'] = [scheme] - self.specification['basePath'] = basepath + basepath = "/" if basepath == "" else basepath + self.specification["host"] = f"{host}:{port}" + self.specification["schemes"] = [scheme] + self.specification["basePath"] = basepath try: validate(spec=self.specification, base_uri=base_uri) self.valid = True except Exception as e: - logger.warning('OAS/Swagger file is invalid!') + logger.warning("OAS/Swagger file is invalid!") logger.error( - 'Failed to validate spec %s due to err: %s', file_or_url, repr(e) + "Failed to validate spec %s due to err: %s", file_or_url, repr(e) ) self.valid = False self.hosts = [] def _get_oas_version(self): - if self.specification.get('openapi'): + if self.specification.get("openapi"): return 3 - elif self.specification.get('swagger'): + elif self.specification.get("swagger"): return 2 - raise InvalidSpecVersion('only openapi and swagger specs are supported for now') + raise InvalidSpecVersion("only openapi and swagger specs are supported for now") def _get_endpoints(self): - '''Returns list of endpoint paths along with HTTP methods allowed''' + """Returns list of endpoint paths along with HTTP methods allowed""" endpoints = [] - for endpoint in self.specification.get('paths', {}).keys(): - methods = list(self.specification['paths'][endpoint].keys()) - if 'parameters' in methods: - methods.remove('parameters') + for endpoint in self.specification.get("paths", {}).keys(): + methods = list(self.specification["paths"][endpoint].keys()) + if "parameters" in methods: + methods.remove("parameters") endpoints.append((endpoint, methods)) return endpoints def _get_endpoint_details_for_fuzz_test(self): - return self.specification.get('paths') + return self.specification.get("paths") diff --git a/src/offat/parsers/swagger.py b/src/offat/parsers/swagger.py index 98ec5ad..0078df7 100644 --- a/src/offat/parsers/swagger.py +++ b/src/offat/parsers/swagger.py @@ -1,16 +1,16 @@ -''' +""" module to parse Swagger v2 documentation JSON/YAML files. -''' +""" from .parser import BaseParser from ..logger import logger class InvalidSwaggerFile(Exception): - '''Exception to be raised when swagger spec validation fails''' + """Exception to be raised when swagger spec validation fails""" class SwaggerParser(BaseParser): - '''Swagger Spec file Parser''' + """Swagger Spec file Parser""" # while adding new method to this class, make sure same method is present in OpenAPIv3Parser class @@ -19,56 +19,56 @@ def __init__( ) -> None: super().__init__(file_or_url=fpath_or_url, spec=spec, *args, **kwargs) # noqa if self.is_v3: - raise InvalidSwaggerFile('Invalid OAS v3 file') + raise InvalidSwaggerFile("Invalid OAS v3 file") self._populate_hosts() self.http_scheme = self._get_scheme() - self.api_base_path = self.specification.get('basePath', '') + self.api_base_path = self.specification.get("basePath", "") self.base_url = f"{self.http_scheme}://{self.host}" self.request_response_params = self._get_request_response_params() def _populate_hosts(self): - host = self.specification.get('host') + host = self.specification.get("host") if not host: - logger.error('Invalid Host: Host is missing') - raise InvalidSwaggerFile('Host Not Found in spec file') + logger.error("Invalid Host: Host is missing") + raise InvalidSwaggerFile("Host Not Found in spec file") hosts = [host] self.hosts = hosts self.host = self.hosts[0] def _get_scheme(self): - scheme = 'https' if 'https' in self.specification.get('schemes', []) else 'http' + scheme = "https" if "https" in self.specification.get("schemes", []) else "http" return scheme def _get_param_definition_schema(self, param: dict): - '''Returns Model defined schema for the passed param''' - param_schema = param.get('schema') + """Returns Model defined schema for the passed param""" + param_schema = param.get("schema") # replace schema $ref with model params if param_schema: - param_schema_ref = param_schema.get('$ref') + param_schema_ref = param_schema.get("$ref") if param_schema_ref: - model_slug = param_schema_ref.split('/')[-1] - param_schema = self.specification.get('definitions', {}).get(model_slug) + model_slug = param_schema_ref.split("/")[-1] + param_schema = self.specification.get("definitions", {}).get(model_slug) return param_schema def _get_response_definition_schema(self, responses: dict): - '''returns schema of API response + """returns schema of API response Args: responses (dict): responses from path http method json data Returns: dict: - ''' + """ for status_code in responses.keys(): status_code_response = responses[status_code].keys() - if 'parameters' in status_code_response: - responses[status_code]['schema'] = responses[status_code]['parameters'] - elif 'schema' in status_code_response: - responses[status_code]['schema'] = self._get_param_definition_schema( + if "parameters" in status_code_response: + responses[status_code]["schema"] = responses[status_code]["parameters"] + elif "schema" in status_code_response: + responses[status_code]["schema"] = self._get_param_definition_schema( responses[status_code] ) else: @@ -77,43 +77,43 @@ def _get_response_definition_schema(self, responses: dict): return responses def _get_request_response_params(self): - '''Returns Schema of requests and response params + """Returns Schema of requests and response params Args: None Returns: list: - ''' + """ requests = [] - paths = self.specification.get('paths', {}) + paths = self.specification.get("paths", {}) # extract endpoints and supported params for path in paths.keys(): - path_params = paths[path].get('parameters', []) + path_params = paths[path].get("parameters", []) for http_method in paths.get(path, {}).keys(): # consider only http methods - if http_method not in ['get', 'put', 'post', 'delete', 'options']: + if http_method not in ["get", "put", "post", "delete", "options"]: continue # below var contains overall params - request_parameters = paths[path][http_method].get('parameters', []) + request_parameters = paths[path][http_method].get("parameters", []) response_params = self._get_response_definition_schema( - paths[path][http_method].get('responses', {}) + paths[path][http_method].get("responses", {}) ) # create list of parameters: Fetch object schema from OAS file for param in request_parameters: - param['schema'] = self._get_param_definition_schema(param) + param["schema"] = self._get_param_definition_schema(param) requests.append( { - 'http_method': http_method, - 'path': path, - 'request_params': request_parameters, - 'response_params': response_params, - 'path_params': path_params, + "http_method": http_method, + "path": path, + "request_params": request_parameters, + "response_params": response_params, + "path_params": path_params, } ) diff --git a/src/offat/report/generator.py b/src/offat/report/generator.py index a94b0b7..af04ea2 100644 --- a/src/offat/report/generator.py +++ b/src/offat/report/generator.py @@ -20,21 +20,21 @@ class ReportGenerator: @staticmethod def generate_html_report(results: list[dict]): """generates html report from OFFAT results""" - html_report_template_file_name = 'report.html' + html_report_template_file_name = "report.html" html_report_file_path = path_join( dirname(templates.__file__), html_report_template_file_name ) - with open(html_report_file_path, 'r', encoding='utf-8') as f: + with open(html_report_file_path, "r", encoding="utf-8") as f: report_file_content = f.read() # TODO: validate report data to avoid HTML injection attacks. if not isinstance(results, list): - raise ValueError('results arg expects a list[dict].') + raise ValueError("results arg expects a list[dict].") # HTML escape data escaped_results = [] - escape_keys = ['response_body'] + escape_keys = ["response_body"] for result_dict in results: escaped_result_dict = {} for key, value in result_dict.items(): @@ -47,7 +47,7 @@ def generate_html_report(results: list[dict]): escaped_results.append(escaped_result_dict) report_file_content = report_file_content.replace( - '{ results }', json_dumps(escaped_results) + "{ results }", json_dumps(escaped_results) ) return report_file_content @@ -60,42 +60,42 @@ def handle_report_format( result = None match report_format: - case 'html': - logger.warning('HTML output format displays only basic data.') + case "html": + logger.warning("HTML output format displays only basic data.") result = ReportGenerator.generate_html_report(results=results) - case 'yaml': + case "yaml": logger.warning( - 'YAML output format needs to be sanitized before using it further.' + "YAML output format needs to be sanitized before using it further." ) result = yaml_dump( { - 'results': results, + "results": results, } ) - case 'json': - report_format = 'json' + case "json": + report_format = "json" result = json_dumps( { - 'results': results, + "results": results, } ) case _: # default: CLI table # TODO: filter failed requests first and then create new table for failed requests - report_format = 'table' + report_format = "table" results_table = TestResultTable().generate_result_table( deepcopy(results) ) result = results_table - logger.info('Generated %s format report.', report_format.upper()) + logger.info("Generated %s format report.", report_format.upper()) return result @staticmethod def save_report(report_path: str | None, report_file_content: str | Table | None): """saves/prints report to console""" - if report_path != '/' and report_path: + if report_path != "/" and report_path: dir_name = dirname(report_path) - if dir_name != '' and report_path: + if dir_name != "" and report_path: makedirs(dir_name, exist_ok=True) # print to cli if report path and file content as absent else write to file location. @@ -104,8 +104,8 @@ def save_report(report_path: str | None, report_file_content: str | Table | None and report_file_content and not isinstance(report_file_content, Table) ): - with open(report_path, 'w', encoding='utf-8') as f: - logger.info('Writing report to file: %s', report_path) + with open(report_path, "w", encoding="utf-8") as f: + logger.info("Writing report to file: %s", report_path) f.write(report_file_content) else: if isinstance(report_file_content, Table) and report_file_content.columns: @@ -114,7 +114,7 @@ def save_report(report_path: str | None, report_file_content: str | Table | None isinstance(report_file_content, Table) and not report_file_content.columns ): - logger.warning('No Columns found in Table.') + logger.warning("No Columns found in Table.") else: console.print(report_file_content) @@ -127,12 +127,12 @@ def generate_report( ): """main function used to generate report""" if report_path: - report_format = report_path.split('.')[-1] + report_format = report_path.split(".")[-1] # do not store errored results if `capture_failed` is False if not capture_failed: results = list( - filter(lambda result: result.get('error', True) == False, results) + filter(lambda result: result.get("error", True) == False, results) ) formatted_results = ReportGenerator.handle_report_format( diff --git a/src/offat/report/summary.py b/src/offat/report/summary.py index 557aa58..f101711 100644 --- a/src/offat/report/summary.py +++ b/src/offat/report/summary.py @@ -22,26 +22,26 @@ def get_counts(results: list[dict], filter_errors: bool = False) -> dict[str, in dict: name (str) as key and its associated count (int) """ if filter_errors: - results = list(filter(lambda result: result.get('error', False), results)) + results = list(filter(lambda result: result.get("error", False), results)) error_count = 0 data_leak_count = 0 failed_count = 0 success_count = 0 for result in results: - error_count += 1 if result.get('error', False) else 0 - data_leak_count += 1 if result.get('data_leak', False) else 0 + error_count += 1 if result.get("error", False) else 0 + data_leak_count += 1 if result.get("data_leak", False) else 0 - if result.get('result'): + if result.get("result"): success_count += 1 else: failed_count += 1 count_dict = { - 'errors': error_count, - 'data_leaks': data_leak_count, - 'failed': failed_count, - 'success': success_count, + "errors": error_count, + "data_leaks": data_leak_count, + "failed": failed_count, + "success": success_count, } return count_dict @@ -50,7 +50,7 @@ def get_counts(results: list[dict], filter_errors: bool = False) -> dict[str, in def generate_count_summary( results: list[dict], filter_errors: bool = False, - output_format: str = 'table', + output_format: str = "table", table_title: str | None = None, ) -> Table | str: """ @@ -70,18 +70,18 @@ def generate_count_summary( results=results, filter_errors=filter_errors ) match output_format: - case 'markdown': - output = '' + case "markdown": + output = "" if table_title: - output += f'**{table_title}**\n' + output += f"**{table_title}**\n" for key, count in count_summary.items(): - output += f'{key:<15}:\t{count}\n' + output += f"{key:<15}:\t{count}\n" case _: # table format output = Table( - Column(header='⚔️', overflow='fold', justify='center'), - Column(header='Endpoints Count', overflow='fold'), + Column(header="⚔️", overflow="fold", justify="center"), + Column(header="Endpoints Count", overflow="fold"), title=table_title, ) diff --git a/src/offat/tester/fuzzer.py b/src/offat/tester/fuzzer.py index 8c6119b..d9a09bb 100644 --- a/src/offat/tester/fuzzer.py +++ b/src/offat/tester/fuzzer.py @@ -3,45 +3,45 @@ def generate_random_int(max_value: int = 100): - '''Generate Random Integer value between specified maximum value - note: maximum_value is not consider in range''' + """Generate Random Integer value between specified maximum value + note: maximum_value is not consider in range""" return random.randint(0, max_value) def generate_phone_number(): - '''Generate Random 10 digit phone number starting with 72''' - return '72'+''.join(random.choice(string.digits) for _ in range(8)) + """Generate Random 10 digit phone number starting with 72""" + return "72" + "".join(random.choice(string.digits) for _ in range(8)) def generate_random_chars(length): """Generate a random string of given length containing characters only.""" characters = string.ascii_letters - return ''.join(random.choice(characters) for _ in range(length)) + return "".join(random.choice(characters) for _ in range(length)) def generate_random_char_digits(length): """Generate a random string of given length containing characters and digits only.""" characters = string.ascii_letters + string.digits - return ''.join(random.choice(characters) for _ in range(length)) + return "".join(random.choice(characters) for _ in range(length)) def generate_random_string(length): """Generate a random string of given length.""" characters = string.ascii_letters + string.digits + string.punctuation - return ''.join(random.choice(characters) for _ in range(length)) + return "".join(random.choice(characters) for _ in range(length)) def fuzz_string_type(var_name: str): var_name_lower = str(var_name).lower() - if 'email' in var_name_lower: - var_value = generate_random_char_digits(6).lower() + '@example.com' - elif 'password' in var_name_lower: + if "email" in var_name_lower: + var_value = generate_random_char_digits(6).lower() + "@example.com" + elif "password" in var_name_lower: var_value = generate_random_string(15) - elif 'phone' in var_name_lower: + elif "phone" in var_name_lower: var_value = generate_phone_number() - elif 'name' in var_name_lower: + elif "name" in var_name_lower: var_value = generate_random_chars(7) - elif 'username' in var_name_lower: + elif "username" in var_name_lower: var_value = generate_random_char_digits(6) else: var_value = generate_random_string(10) @@ -49,29 +49,31 @@ def fuzz_string_type(var_name: str): return var_value -def fill_schema_params(params: dict[dict], param_in: str = None, is_required: bool = None): +def fill_schema_params( + params: dict[dict], param_in: str = None, is_required: bool = None +): schema_params = [] for var_name, var_data in params.items(): - var_type = var_data.get('type') + var_type = var_data.get("type") match var_type: - case 'string': + case "string": var_value = fuzz_string_type(var_name) - case 'integer': + case "integer": var_value = generate_random_int() case _: var_value = generate_random_string(10) - var_data['value'] = var_value - var_data['name'] = var_name + var_data["value"] = var_value + var_data["name"] = var_name if is_required: - var_data['required'] = is_required + var_data["required"] = is_required if param_in: - var_data['in'] = param_in + var_data["in"] = param_in schema_params.append(var_data) @@ -80,10 +82,10 @@ def fill_schema_params(params: dict[dict], param_in: str = None, is_required: bo def fuzz_type_value(param_type: str, param_name: str): match param_type: - case 'string': + case "string": param_value = fuzz_string_type(param_name) - case 'integer': + case "integer": param_value = generate_random_int() # TODO: handle file and array type @@ -95,38 +97,44 @@ def fuzz_type_value(param_type: str, param_name: str): def fill_params(params: list[dict], is_v3: bool) -> list[dict]: - '''fills params for OAS/swagger specs''' + """fills params for OAS/swagger specs""" schema_params = [] for index in range(len(params)): - param_type = params[index].get('schema', {}).get( - 'type') if is_v3 else params[index].get('type') - param_is_required = params[index].get('required') - param_in = params[index].get('in') - param_name = params[index].get('name', '') + param_type = ( + params[index].get("schema", {}).get("type") + if is_v3 + else params[index].get("type") + ) + param_is_required = params[index].get("required") + param_in = params[index].get("in") + param_name = params[index].get("name", "") param_value = fuzz_type_value(param_type=param_type, param_name=param_name) - if params[index].get('schema'): - schema_type = params[index].get('schema', {}).get('type') + if params[index].get("schema"): + schema_type = params[index].get("schema", {}).get("type") if schema_type == "object": - schema_obj = params[index].get('schema', {}).get('properties', {}) + schema_obj = params[index].get("schema", {}).get("properties", {}) filled_schema_params = fill_schema_params( - schema_obj, param_in, param_is_required) + schema_obj, param_in, param_is_required + ) else: - filled_schema_params = [{ - 'in': param_in, - 'name': param_name, - 'required': param_is_required, - 'value': param_value - }] + filled_schema_params = [ + { + "in": param_in, + "name": param_name, + "required": param_is_required, + "value": param_value, + } + ] schema_params.append(filled_schema_params) else: - params[index]['value'] = param_value + params[index]["value"] = param_value # delete schema params for param in params: - if param.get('schema'): + if param.get("schema"): params.remove(param) for schema_param in schema_params: diff --git a/src/offat/tester/post_test_processor.py b/src/offat/tester/post_test_processor.py index d793ba8..f7b6ceb 100644 --- a/src/offat/tester/post_test_processor.py +++ b/src/offat/tester/post_test_processor.py @@ -12,13 +12,13 @@ class PostTestFiltersEnum(Enum): class PostRunTests: - '''class Includes tests that should be ran after running all the active test''' + """class Includes tests that should be ran after running all the active test""" @staticmethod def run_broken_access_control_tests( results: list[dict], test_data_config: dict ) -> list[dict]: - ''' + """ Runs tests for broken access control Args: @@ -30,10 +30,10 @@ def run_broken_access_control_tests( Raises: Any Exception occurred during the test. - ''' + """ def re_match(patterns: list[str], endpoint: str) -> bool: - '''Matches endpoint for specified patterns + """Matches endpoint for specified patterns Args: patterns (list[str]): endpoint regex pattern for matching endpoints @@ -44,7 +44,7 @@ def re_match(patterns: list[str], endpoint: str) -> bool: Exception: Any Exception occurred during test procedure. - ''' + """ for pattern in patterns: if re_search(pattern, endpoint): return True @@ -52,27 +52,27 @@ def re_match(patterns: list[str], endpoint: str) -> bool: return False actor_based_tests = [] - actors = test_data_config.get('actors', [{}]) + actors = test_data_config.get("actors", [{}]) actor_names = [] for actor in actors: actor_name = list(actor.keys())[-1] - unauth_endpoint_regex = actor[actor_name].get('unauthorized_endpoints', []) + unauth_endpoint_regex = actor[actor_name].get("unauthorized_endpoints", []) for result in results: - if result.get('test_actor_name') != actor_name: + if result.get("test_actor_name") != actor_name: continue - endpoint = result.get('endpoint', 'endpoint path not found') + endpoint = result.get("endpoint", "endpoint path not found") if not re_match(unauth_endpoint_regex, endpoint): continue actor_names.append(actor_name) actor_test_result = deepcopy(result) - actor_test_result['test_name'] = 'Broken Access Control' - actor_test_result['result_details'] = { - True: 'Endpoint might not vulnerable to BAC', # passed - False: f'BAC: Endpoint is accessible to {actor_name}', # failed + actor_test_result["test_name"] = "Broken Access Control" + actor_test_result["result_details"] = { + True: "Endpoint might not vulnerable to BAC", # passed + False: f"BAC: Endpoint is accessible to {actor_name}", # failed } actor_based_tests.append(actor_test_result) @@ -80,7 +80,7 @@ def re_match(patterns: list[str], endpoint: str) -> bool: @staticmethod def detect_data_exposure(results: list[dict]) -> list[dict]: - '''Detects data exposure against sensitive data regex + """Detects data exposure against sensitive data regex patterns and returns dict of matched results Args: @@ -88,7 +88,7 @@ def detect_data_exposure(results: list[dict]) -> list[dict]: Returns: dict: dictionary with tag as dict key and matched pattern as dict value - ''' + """ def detect_exposure(data: str) -> dict: # Dictionary to store detected data exposures @@ -103,9 +103,9 @@ def detect_exposure(data: str) -> dict: new_results = [] for result in results: - res_body = result.get('response_body') + res_body = result.get("response_body") data_exposures_dict = detect_exposure(str(res_body)) - result['data_leak'] = data_exposures_dict + result["data_leak"] = data_exposures_dict new_results.append(result) return new_results @@ -117,8 +117,8 @@ def filter_status_code_based_results(results: list[dict]) -> list[dict]: for result in results: new_result = deepcopy(result) - response_status_code = result.get('response_status_code') - success_codes = result.get('success_codes') + response_status_code = result.get("response_status_code") + success_codes = result.get("success_codes") # if response status code or success code is not # found then continue updating status of remaining @@ -131,7 +131,7 @@ def filter_status_code_based_results(results: list[dict]) -> list[dict]: else: res_status = True # test passed - new_result['result'] = res_status + new_result["result"] = res_status # new_result['result_details'] = result['result_details'].get(res_status) @@ -144,8 +144,8 @@ def update_result_details(results: list[dict]): new_results = [] for result in results: new_result = deepcopy(result) - new_result['result_details'] = result['result_details'].get( - result['result'] + new_result["result_details"] = result["result_details"].get( + result["result"] ) new_results.append(new_result) @@ -154,7 +154,7 @@ def update_result_details(results: list[dict]): @staticmethod def matcher(results: list[dict]): - ''' + """ Args: results (list[dict]): list of dict for tests results ran @@ -168,12 +168,12 @@ def matcher(results: list[dict]): Raises: Any Exception occurred during the test. - ''' + """ new_results = [] for result in results: - match_location = result.get('response_filter') - match_regex = result.get('response_match_regex') + match_location = result.get("response_filter") + match_regex = result.get("response_match_regex") # skip test if match regex not found if not match_regex or not match_location: @@ -181,17 +181,17 @@ def matcher(results: list[dict]): match match_location: case PostTestFiltersEnum.STATUS_CODE_FILTER: - target_data = result.get('response_status_code') + target_data = result.get("response_status_code") case PostTestFiltersEnum.HEADER_REGEX_FILTER: - target_data = result.get('response_body') + target_data = result.get("response_body") case _: # PostTestFiltersEnum.BODY_REGEX_FILTER.name: - target_data = result.get('response_body') + target_data = result.get("response_body") match_response = re_search(match_regex, target_data) new_result = deepcopy(result) - new_result['regex_match_result'] = str(match_response) + new_result["regex_match_result"] = str(match_response) # None (no match) -> False (Vulnerable) -> Not False (not Vulnerable) - new_result['result'] = not bool(match_response) + new_result["result"] = not bool(match_response) new_results.append(new_result) return new_results diff --git a/src/offat/tester/regexs.py b/src/offat/tester/regexs.py index 4559749..e083489 100644 --- a/src/offat/tester/regexs.py +++ b/src/offat/tester/regexs.py @@ -1,32 +1,28 @@ sensitive_data_regex_patterns = { # General Data - 'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', + "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", # 'passwordOrToken': r'(^|\s|")(?=.*[A-Za-z])(?=.*\d)(?=.*[@$!%*#?&_])[A-Za-z\d@$!%*#?&_]{10,}($|\s|")', # Assuming the password contains at least 1 uppercase letter, 1 lowercase letter, 1 digit, 1 special character, and is at least 8 characters long. - 'date': r'\b\d{2}/\d{2}/\d{4}\b', - 'ip': r'(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b', - 'ccn': r'\b\d{4}-\d{4}-\d{4}-\d{4}\b', - 'jwtToken': r'(^|\s|")[A-Za-z0-9_-]{2,}(?:\.[A-Za-z0-9_-]{2,}){2}($|\s|")', - 'ato_data': r'\b(auth_code|otp|password|password_hash|auth_token|access_token|refresh_token|secret|session_id|key|pin|accessToken|refreshToken|authenticationCode|authentication_code|jwt|api_secret|apiSecret)\b', - + "date": r"\b\d{2}/\d{2}/\d{4}\b", + "ip": r"(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b", + "ccn": r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", + "jwtToken": r'(^|\s|")[A-Za-z0-9_-]{2,}(?:\.[A-Za-z0-9_-]{2,}){2}($|\s|")', + "ato_data": r"\b(auth_code|otp|password|password_hash|auth_token|access_token|refresh_token|secret|session_id|key|pin|accessToken|refreshToken|authenticationCode|authentication_code|jwt|api_secret|apiSecret)\b", # BRAZIL - 'BrazilCPF': r'\b(\d{3}\.){2}\d{3}\-\d{2}\b', - + "BrazilCPF": r"\b(\d{3}\.){2}\d{3}\-\d{2}\b", # INDIA # Assuming the format: AAAAB1234C (5 uppercase letters, 4 digits, 1 uppercase letter) - 'pan': r'\b[A-Z]{5}\d{4}[A-Z]{1}\b', + "pan": r"\b[A-Z]{5}\d{4}[A-Z]{1}\b", # Assuming the format XXXX XXXX XXXX (4 digits, space, 4 digits, space, 4 digits) - 'aadhaarCard': r'\b\d{4}\s\d{4}\s\d{4}\b', - 'PhoneNumberIN': r'((\+*)((0[ -]*)*|((91 )*))((\d{12})+|(\d{10})+))|\d{5}([- ]*)\d{6}', - + "aadhaarCard": r"\b\d{4}\s\d{4}\s\d{4}\b", + "PhoneNumberIN": r"((\+*)((0[ -]*)*|((91 )*))((\d{12})+|(\d{10})+))|\d{5}([- ]*)\d{6}", # US - 'ssn': r'\b\d{3}-\d{2}-\d{4}\b', - 'PhoneNumberUS': r'(^|\s|")(1\s?)?(\d{3}|\(\d{3}\))[\s\-]?\d{3}[\s\-]?\d{4}(?:$|\s|")', - + "ssn": r"\b\d{3}-\d{2}-\d{4}\b", + "PhoneNumberUS": r'(^|\s|")(1\s?)?(\d{3}|\(\d{3}\))[\s\-]?\d{3}[\s\-]?\d{4}(?:$|\s|")', # AWS # Assuming the format: AKIA followed by 16 uppercase alphanumeric characters - 'AWSAccessKey': r'\bAKIA[0-9A-Z]{16}\b', + "AWSAccessKey": r"\bAKIA[0-9A-Z]{16}\b", # Assuming the format: 40 alphanumeric characters, including + and / - 'AWSSecretKey': r'\b[0-9a-zA-Z/+]{40}\b', - 'AWSResourceURL': r'\b([A-Za-z0-9-_]*\.[A-Za-z0-9-_]*\.amazonaws.com*)\b', - 'AWSArnId': r'\barn:aws:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-/_]*\b', + "AWSSecretKey": r"\b[0-9a-zA-Z/+]{40}\b", + "AWSResourceURL": r"\b([A-Za-z0-9-_]*\.[A-Za-z0-9-_]*\.amazonaws.com*)\b", + "AWSArnId": r"\barn:aws:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-/_]*\b", } diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index d7fbde4..f96470d 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -232,12 +232,12 @@ def generate_and_run_tests( # XSS/HTML Injection Fuzz Test test_name = 'Checking for XSS/HTML Injection Vulnerability with fuzzed params and checking response body' # noqa: E501 logger.info(test_name) - os_command_injection_tests = test_generator.xss_html_injection_fuzz_params_test( + xss_injection_tests = test_generator.xss_html_injection_fuzz_params_test( api_parser ) results += run_test( test_runner=test_runner, - tests=os_command_injection_tests, + tests=xss_injection_tests, regex_pattern=regex_pattern, post_run_matcher_test=True, description='(FUZZED) Checking for XSS/HTML Injection', diff --git a/src/offat/tests/utils/test_parse_server_url.py b/src/offat/tests/utils/test_parse_server_url.py index fe9c81c..dfea3e6 100644 --- a/src/offat/tests/utils/test_parse_server_url.py +++ b/src/offat/tests/utils/test_parse_server_url.py @@ -6,30 +6,30 @@ class TestParseUrls(unittest.TestCase): def test_valid_urls(self): urls = [ - 'https://example.com', - 'https://owasp.org/OFFAT/', - 'http://localhost:8000/test', - 'http://127.0.0.1:8001/url/1', + "https://example.com", + "https://owasp.org/OFFAT/", + "http://localhost:8000/test", + "http://127.0.0.1:8001/url/1", ] for url in urls: scheme, host, port, basepath = parse_server_url(url=url) self.assertIn( - scheme, ['http', 'https'], f'Failed to validate url scheme: {url}' + scheme, ["http", "https"], f"Failed to validate url scheme: {url}" ) self.assertIn( host, - ['example.com', 'owasp.org', 'localhost', '127.0.0.1'], - 'Host does not match expected test cases', + ["example.com", "owasp.org", "localhost", "127.0.0.1"], + "Host does not match expected test cases", ) self.assertIn( port, [80, 443, 8000, 8001], - 'Port does not match according to test case', + "Port does not match according to test case", ) - self.assertIn(basepath, ['', '/OFFAT/', '/test', '/url/1']) + self.assertIn(basepath, ["", "/OFFAT/", "/test", "/url/1"]) def test_invalid_urls(self): - urls = ['owasp', 'ftp://example/', '\0\0alkdsjlatest', '" OR 1==1 -- -'] + urls = ["owasp", "ftp://example/", "\0\0alkdsjlatest", '" OR 1==1 -- -'] for url in urls: with raises(ValueError): parse_server_url(url=url) diff --git a/src/offat/tests/utils/test_url_validations.py b/src/offat/tests/utils/test_url_validations.py index fb7209d..7f44fca 100644 --- a/src/offat/tests/utils/test_url_validations.py +++ b/src/offat/tests/utils/test_url_validations.py @@ -5,15 +5,15 @@ class TestUrls(unittest.TestCase): def test_valid_urls(self): urls = [ - 'https://example.com', - 'https://owasp.org/OFFAT/', - 'http://localhost:8000/test', - 'http://127.0.0.1:8001/url', + "https://example.com", + "https://owasp.org/OFFAT/", + "http://localhost:8000/test", + "http://127.0.0.1:8001/url", ] for url in urls: - self.assertTrue(is_valid_url(url=url), f'Failed to validate url: {url}') + self.assertTrue(is_valid_url(url=url), f"Failed to validate url: {url}") def test_invalid_urls(self): - urls = ['owasp', 'ftp://example/', '\0\0alkdsjlatest', '" OR 1==1 -- -'] + urls = ["owasp", "ftp://example/", "\0\0alkdsjlatest", '" OR 1==1 -- -'] for url in urls: assert is_valid_url(url=url) is False