Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'black', src/offat/tester/tester_utils.py wrong parameter #89

Merged
merged 7 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 55 additions & 55 deletions src/offat/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

def banner():
print(
r'''
r"""
_/| |\_
/ | | \
| \ / |
Expand All @@ -25,114 +25,114 @@ def banner():
/ | \
/ v \
OFFAT
'''
"""
)


def start():
'''Starts cli tool'''
"""Starts cli tool"""
banner()

parser = ArgumentParser(prog='offat')
parser = ArgumentParser(prog="offat")
parser.add_argument(
'-f',
'--file',
dest='fpath',
"-f",
"--file",
dest="fpath",
type=str,
help='path or url of openapi/swagger specification file',
help="path or url of openapi/swagger specification file",
required=True,
)
parser.add_argument(
'-v', '--version', action='version', version=f'%(prog)s {get_package_version()}'
"-v", "--version", action="version", version=f"%(prog)s {get_package_version()}"
)
parser.add_argument(
'-rl',
'--rate-limit',
dest='rate_limit',
help='API requests rate limit per second',
"-rl",
"--rate-limit",
dest="rate_limit",
help="API requests rate limit per second",
type=float,
default=60,
required=False,
)
parser.add_argument(
'-pr',
'--path-regex',
dest='path_regex_pattern',
"-pr",
"--path-regex",
dest="path_regex_pattern",
type=str,
help='run tests for paths matching given regex pattern',
help="run tests for paths matching given regex pattern",
required=False,
default=None,
)
parser.add_argument(
'-o',
'--output',
dest='output_file',
"-o",
"--output",
dest="output_file",
type=str,
help='path to store test results',
help="path to store test results",
required=False,
default=None,
)
parser.add_argument(
'-of',
'--format',
dest='output_format',
"-of",
"--format",
dest="output_format",
type=str,
choices=['json', 'yaml', 'html', 'table'],
help='Data format to save (json, yaml, html, table). Default: table',
choices=["json", "yaml", "html", "table"],
help="Data format to save (json, yaml, html, table). Default: table",
required=False,
default='table',
default="table",
)
parser.add_argument(
'-H',
'--headers',
dest='headers',
"-H",
"--headers",
dest="headers",
type=str,
help='HTTP requests headers that should be sent during testing eg: User-Agent: offat',
help="HTTP requests headers that should be sent during testing eg: User-Agent: offat",
required=False,
default=None,
action='append',
nargs='*',
action="append",
nargs="*",
)
parser.add_argument(
'-tdc',
'--test-data-config',
dest='test_data_config',
help='YAML file containing user test data for tests',
"-tdc",
"--test-data-config",
dest="test_data_config",
help="YAML file containing user test data for tests",
required=False,
type=str,
)
parser.add_argument(
'-p',
'--proxy',
dest='proxies_list',
"-p",
"--proxy",
dest="proxies_list",
help='Proxy server URL to route HTTP requests through (e.g. "http://proxyserver:port")',
action='append',
action="append",
required=False,
type=str,
default=None,
)
parser.add_argument(
'-s',
'--ssl',
dest='ssl',
"-s",
"--ssl",
dest="ssl",
required=False,
action='store_true',
help='Enable SSL Verification',
action="store_true",
help="Enable SSL Verification",
)
parser.add_argument(
'-cf',
'--capture-failed',
dest='capture_failed',
action='store_true',
help='Captures failed requests due to any exceptions into output file',
"-cf",
"--capture-failed",
dest="capture_failed",
action="store_true",
help="Captures failed requests due to any exceptions into output file",
)
parser.add_argument(
'--server',
dest='server_url',
"--server",
dest="server_url",
type=str,
default=None,
required=False,
help='server/host base url to overwrite from OAS/Swagger file',
help="server/host base url to overwrite from OAS/Swagger file",
)
args = parser.parse_args()

Expand Down Expand Up @@ -165,5 +165,5 @@ def start():
)


if __name__ == '__main__':
if __name__ == "__main__":
start()
8 changes: 4 additions & 4 deletions src/offat/api/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
def get_offat_installation_dir():
try:
# For non-editable installation
return importlib.resources.files('offat')
return importlib.resources.files("offat")
except ImportError:
# For editable installation (pip install -e .)
return importlib.resources.files('.')
return importlib.resources.files(".")


def start():
installation_dir = get_offat_installation_dir()
run(
app='offat.api.app:app',
app="offat.api.app:app",
host="0.0.0.0",
port=8000,
workers=2,
Expand All @@ -23,5 +23,5 @@ def start():
)


if __name__ == '__main__':
if __name__ == "__main__":
start()
50 changes: 26 additions & 24 deletions src/offat/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
from offat.api.jobs import scan_api
from offat.api.models import CreateScanModel
from offat.logger import logger

# from os import uname, environ


logger.info('Secret Key: %s', auth_secret_key)
logger.info("Secret Key: %s", auth_secret_key)


# if uname().sysname == 'Darwin' and environ.get('OBJC_DISABLE_INITIALIZE_FORK_SAFETY') != 'YES':
# logger.warning('Mac Users might need to configure OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in env\nVisit StackOverFlow link for more info: https://stackoverflow.com/questions/50168647/multiprocessing-causes-python-to-crash-and-gives-an-error-may-have-been-in-progr')


@app.get('/', status_code=status.HTTP_200_OK)
@app.get("/", status_code=status.HTTP_200_OK)
async def root():
return {
"name": "OFFAT API",
Expand All @@ -22,63 +23,64 @@ async def root():
}


@app.post('/api/v1/scan', status_code=status.HTTP_201_CREATED)
async def add_scan_task(scan_data: CreateScanModel, request: Request, response: Response):
# for auth
@app.post("/api/v1/scan", status_code=status.HTTP_201_CREATED)
async def add_scan_task(
scan_data: CreateScanModel, request: Request, response: Response
):
# for auth
client_ip = request.client.host
secret_key = request.headers.get('SECRET-KEY', None)
secret_key = request.headers.get("SECRET-KEY", None)
if secret_key != auth_secret_key:
# return 404 for better endpoint security
response.status_code = status.HTTP_401_UNAUTHORIZED
logger.warning('INTRUSION: %s tried to create a new scan job', client_ip)
logger.warning("INTRUSION: %s tried to create a new scan job", client_ip)
return {"message": "Unauthorized"}

msg = {
"msg": "Scan Task Created",
"job_id": None
}
msg = {"msg": "Scan Task Created", "job_id": None}

job = task_queue.enqueue(scan_api, scan_data, job_timeout=task_timeout)
msg['job_id'] = job.id
job = task_queue.enqueue(scan_api, scan_data, job_timeout=task_timeout)
msg["job_id"] = job.id

logger.info('SUCCESS: %s created new scan job - %s', client_ip, job.id)
logger.info("SUCCESS: %s created new scan job - %s", client_ip, job.id)

return msg


@app.get('/api/v1/scan/{job_id}/result')
@app.get("/api/v1/scan/{job_id}/result")
async def get_scan_task_result(job_id: str, request: Request, response: Response):
# for auth
client_ip = request.client.host
secret_key = request.headers.get('SECRET-KEY', None)
secret_key = request.headers.get("SECRET-KEY", None)
if secret_key != auth_secret_key:
# return 404 for better endpoint security
response.status_code = status.HTTP_401_UNAUTHORIZED
logger.warning('INTRUSION: %s tried to access %s job scan results', client_ip, job_id)
logger.warning(
"INTRUSION: %s tried to access %s job scan results", client_ip, job_id
)
return {"message": "Unauthorized"}

scan_results_job = task_queue.fetch_job(job_id=job_id)

logger.info('SUCCESS: %s accessed %s job scan results', client_ip, job_id)
logger.info("SUCCESS: %s accessed %s job scan results", client_ip, job_id)

msg = 'Task Remaining or Invalid Job Id'
msg = "Task Remaining or Invalid Job Id"
results = None
response.status_code = status.HTTP_202_ACCEPTED

if scan_results_job and scan_results_job.is_started:
msg = 'Job In Progress'
msg = "Job In Progress"

elif scan_results_job and scan_results_job.is_finished:
msg = 'Task Completed'
msg = "Task Completed"
results = scan_results_job.result
response.status_code = status.HTTP_200_OK

elif scan_results_job and scan_results_job.is_failed:
msg = 'Task Failed. Try Creating Task Again.'
msg = "Task Failed. Try Creating Task Again."
response.status_code = status.HTTP_200_OK

msg = {
'msg': msg,
'results': results,
"msg": msg,
"results": results,
}
return msg
2 changes: 1 addition & 1 deletion src/offat/api/auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ def generate_random_secret_key_string(length=128):
characters = string.ascii_letters + string.digits + "-_."

# Generate a random string of the specified length
random_string = ''.join(secrets.choice(characters) for _ in range(length))
random_string = "".join(secrets.choice(characters) for _ in range(length))

return random_string
21 changes: 12 additions & 9 deletions src/offat/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@
load_dotenv()

app = FastAPI(
title='OFFAT - API',
servers=[{
'url':'http://localhost:8000',
}],
title="OFFAT - API",
servers=[
{
"url": "http://localhost:8000",
}
],
)

auth_secret_key = environ.get(
'AUTH_SECRET_KEY', generate_random_secret_key_string())
redis_con = Redis(host=environ.get('REDIS_HOST', 'localhost'),
port=int(environ.get('REDIS_PORT', 6379)))
task_queue = Queue(name='offat_task_queue', connection=redis_con)
auth_secret_key = environ.get("AUTH_SECRET_KEY", generate_random_secret_key_string())
redis_con = Redis(
host=environ.get("REDIS_HOST", "localhost"),
port=int(environ.get("REDIS_PORT", 6379)),
)
task_queue = Queue(name="offat_task_queue", connection=redis_con)
task_timeout = 60 * 60 # 3600 s = 1 hour
4 changes: 2 additions & 2 deletions src/offat/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ def scan_api(body_data: CreateScanModel):
)
return results
except Exception as e:
logger.error('Error occurred while creating a job: %s', repr(e))
logger.error("Error occurred while creating a job: %s", repr(e))
logger.debug("Debug Data:", exc_info=exc_info())
return [{'error': str(e)}]
return [{"error": str(e)}]
Loading