Skip to content

Commit

Permalink
Merge pull request #69 from OWASP/dev
Browse files Browse the repository at this point in the history
Dev RELEASE: v0.16.0
  • Loading branch information
dmdhrumilmistry committed Mar 26, 2024
2 parents 1eaadfc + 182313c commit 708deca
Show file tree
Hide file tree
Showing 9 changed files with 385 additions and 143 deletions.
118 changes: 94 additions & 24 deletions src/offat/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@


def banner():
print(r'''
print(
r'''
_/| |\_
/ | | \
| \ / |
Expand All @@ -19,37 +20,105 @@ def banner():
\_ \ / _/
\__ | __/
\ _ /
_/ \_
/ _/|\_ \
/ | \
_/ \_
/ _/|\_ \
/ | \
/ v \
OFFAT
''')
'''
)


def start():
'''Starts cli tool'''
banner()

parser = ArgumentParser(prog='offat')
parser.add_argument('-f', '--file', dest='fpath', type=str,
help='path or url of openapi/swagger specification file', required=True)
parser.add_argument('-v', '--version', action='version',
version=f'%(prog)s {get_package_version()}')
parser.add_argument('-rl', '--rate-limit', dest='rate_limit',
help='API requests rate limit per second', type=float, default=60, required=False)
parser.add_argument('-pr', '--path-regex', dest='path_regex_pattern', type=str,
help='run tests for paths matching given regex pattern', required=False, default=None)
parser.add_argument('-o', '--output', dest='output_file', type=str,
help='path to store test results in specified format. Default format is html', required=False, default=None)
parser.add_argument('-of', '--format', dest='output_format', type=str, choices=[
'json', 'yaml', 'html', 'table'], help='Data format to save (json, yaml, html, table). Default: table', required=False, default='table')
parser.add_argument('-H', '--headers', dest='headers', type=str,
help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', required=False, default=None, action='append', nargs='*')
parser.add_argument('-tdc', '--test-data-config', dest='test_data_config',
help='YAML file containing user test data for tests', required=False, type=str)
parser.add_argument('-p', '--proxy', dest='proxy',
help='Proxy server URL to route HTTP requests through (e.g., "http://proxyserver:port")', required=False, type=str)
parser.add_argument(
'-f',
'--file',
dest='fpath',
type=str,
help='path or url of openapi/swagger specification file',
required=True,
)
parser.add_argument(
'-v', '--version', action='version', version=f'%(prog)s {get_package_version()}'
)
parser.add_argument(
'-rl',
'--rate-limit',
dest='rate_limit',
help='API requests rate limit per second',
type=float,
default=60,
required=False,
)
parser.add_argument(
'-pr',
'--path-regex',
dest='path_regex_pattern',
type=str,
help='run tests for paths matching given regex pattern',
required=False,
default=None,
)
parser.add_argument(
'-o',
'--output',
dest='output_file',
type=str,
help='path to store test results in specified format. Default format is html',
required=False,
default=None,
)
parser.add_argument(
'-of',
'--format',
dest='output_format',
type=str,
choices=['json', 'yaml', 'html', 'table'],
help='Data format to save (json, yaml, html, table). Default: table',
required=False,
default='table',
)
parser.add_argument(
'-H',
'--headers',
dest='headers',
type=str,
help='HTTP requests headers that should be sent during testing eg: User-Agent: offat',
required=False,
default=None,
action='append',
nargs='*',
)
parser.add_argument(
'-tdc',
'--test-data-config',
dest='test_data_config',
help='YAML file containing user test data for tests',
required=False,
type=str,
)
parser.add_argument(
'-p',
'--proxy',
dest='proxies_list',
help='Proxy server URL to route HTTP requests through (e.g. "http://proxyserver:port")',
action='append',
required=False,
type=str,
default=None,
)
parser.add_argument(
'-s',
'--ssl',
dest='ssl',
required=False,
action='store_true',
help='Enable SSL Verification',
)
args = parser.parse_args()

# convert req headers str to dict
Expand All @@ -75,7 +144,8 @@ def start():
req_headers=headers_dict,
rate_limit=rate_limit,
test_data_config=test_data_config,
proxy=args.proxy,
proxies=args.proxies_list,
ssl=args.ssl,
)


Expand Down
51 changes: 45 additions & 6 deletions src/offat/config_data_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,37 @@
from .logger import logger


def overwrite_user_params(list1: list[dict], list2: list[dict]) -> list[dict]:
"""
Update values in list1 based on the corresponding "name" values in list2.
Args:
list1 (list of dict): The list of dictionaries to be updated.
list2 (list of dict): The list of dictionaries containing values to update from.
Returns:
list of dict: The updated list1 with values from list2.
Example:
```python
list1 = [{'name': 'id', 'value': 67}, {'name': 'email', 'value': 'old@example.com'}]
list2 = [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}]
updated_list = update_values(list1, list2)
print(updated_list)
# Output: [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}]
```
"""
# Create a dictionary for faster lookup
lookup_dict = {item['name']: item['value'] for item in list2}

# Update values in list1 using index lookup
for item in list1:
if item['name'] in lookup_dict:
item['value'] = lookup_dict[item['name']]

return list1


def validate_config_file_data(test_config_data: dict):
if not isinstance(test_config_data, dict):
logger.warning('Invalid data format')
Expand All @@ -11,7 +42,9 @@ def validate_config_file_data(test_config_data: dict):
logger.warning('Error Occurred While reading file: %s', test_config_data)
return False

if not test_config_data.get('actors', ):
if not test_config_data.get(
'actors',
):
logger.warning('actors are required')
return False

Expand All @@ -36,15 +69,21 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
request_headers[header.get('name')] = header.get('value')

for test in tests:
# replace key and value instead of appending
test['body_params'] += body_params
test['query_params'] += query_params
test['path_params'] += path_params
test['body_params'] = overwrite_user_params(
deepcopy(test['body_params']), body_params
)
test['query_params'] = overwrite_user_params(
deepcopy(test['query_params']), query_params
)
test['path_params'] += overwrite_user_params(
deepcopy(test['path_params']), path_params
)
# for post test processing tests such as broken authentication
test['test_actor_name'] = actor_name
if test.get('kwargs', {}).get('headers', {}).items():
test['kwargs']['headers'] = dict(
test['kwargs']['headers'], **request_headers)
test['kwargs']['headers'], **request_headers
)
else:
test['kwargs']['headers'] = request_headers

Expand Down
94 changes: 79 additions & 15 deletions src/offat/http.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
'''
module for interacting with HTTP layer
'''
from random import choice
from os import name as os_name
from urllib.parse import urlparse

from aiohttp import ClientSession, ClientTimeout
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, retry_if_not_exception_type
Expand All @@ -11,12 +17,54 @@
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


class Proxies:
'''
class for handling proxies
'''

def __init__(self, proxies: list[str] | None) -> None:
self.p_list = proxies

def validate_proxy(self, proxy_url: str | None):
"""
Validates a proxy URL based on format and attempts a basic connection.
Args:
proxy_url: The URL of the proxy server.
Returns:
True if the proxy URL seems valid and a basic connection can be established, False otherwise.
"""
# Check for valid URL format
parsed_url = urlparse(proxy_url)
if all([parsed_url.scheme, parsed_url.netloc]):
return True

return False

def get_random_proxy(self) -> str | None:
'''
Returns random proxy from the list
'''
if not self.p_list:
return None
return choice(self.p_list)


class AsyncRequests:
'''
AsyncRequests class helps to send HTTP requests with rate limiting options.
'''

def __init__(self, rate_limit: float = 50, headers: dict | None = None, proxy: str | None = None, allow_redirects: bool = True, timeout: float = 60) -> None:
def __init__(
self,
rate_limit: float = 50,
headers: dict | None = None,
proxies: list[str] | None = [],
allow_redirects: bool = True,
timeout: float = 60,
ssl: bool = False,
) -> None:
'''AsyncRequests class constructor
Args:
Expand All @@ -25,30 +73,39 @@ def __init__(self, rate_limit: float = 50, headers: dict | None = None, proxy: s
headers (dict): overrides default headers while sending HTTP requests
proxy (str): proxy URL to be used while sending requests
timeout (float): total timeout parameter of aiohttp.ClientTimeout
ssl (bool): enforces tls/ssl verification if True
Returns:
None
'''
self._headers = headers
self._proxy = proxy if proxy else None
self._proxy = Proxies(proxies=proxies)
self._allow_redirects = allow_redirects
self._limiter = AsyncLimiter(max_rate=rate_limit, time_period=1)
self._timeout = ClientTimeout(total=timeout)
self._ssl = ssl

@retry(stop=stop_after_attempt(3), retry=retry_if_not_exception_type(KeyboardInterrupt or asyncio.exceptions.CancelledError))
async def request(self, url: str, method: str = 'GET', *args, **kwargs) -> dict:
@retry(
stop=stop_after_attempt(3),
retry=retry_if_not_exception_type(
KeyboardInterrupt or asyncio.exceptions.CancelledError
),
)
async def request(self, url: str, *args, method: str = 'GET', **kwargs) -> dict:
'''Send HTTP requests asynchronously
Args:
url (str): URL of the webpage/endpoint
method (str): HTTP methods (default: GET) supports GET, POST,
method (str): HTTP methods (default: GET) supports GET, POST,
PUT, HEAD, OPTIONS, DELETE
Returns:
dict: returns request and response data as dict
'''
async with self._limiter:
async with ClientSession(headers=self._headers, timeout=self._timeout) as session:
async with ClientSession(
headers=self._headers, timeout=self._timeout
) as session:
method = str(method).upper()
match method:
case 'GET':
Expand All @@ -68,16 +125,23 @@ async def request(self, url: str, method: str = 'GET', *args, **kwargs) -> dict:
case _:
req_method = session.get

async with req_method(url, proxy=self._proxy, allow_redirects=self._allow_redirects, *args, **kwargs) as response:
async with req_method(
url,
allow_redirects=self._allow_redirects,
proxy=self._proxy.get_random_proxy(),
ssl=self._ssl,
*args,
**kwargs,
) as response:
resp_data = {
"status": response.status,
"req_url": str(response.request_info.real_url),
"query_url": str(response.url),
"req_method": response.request_info.method,
"req_headers": dict(**response.request_info.headers),
"res_redirection": str(response.history),
"res_headers": dict(response.headers),
"res_body": await response.text(),
'status': response.status,
'req_url': str(response.request_info.real_url),
'query_url': str(response.url),
'req_method': response.request_info.method,
'req_headers': dict(**response.request_info.headers),
'res_redirection': str(response.history),
'res_headers': dict(response.headers),
'res_body': await response.text(),
}

return resp_data

0 comments on commit 708deca

Please sign in to comment.