diff --git a/cli/auth/auth_command.py b/cli/auth/auth_command.py new file mode 100644 index 00000000..d6ad272b --- /dev/null +++ b/cli/auth/auth_command.py @@ -0,0 +1,34 @@ +import click +import traceback +from cli.auth.auth_manager import AuthManager +from cli.exceptions.custom_exceptions import AuthProcessError, CycodeError +from cyclient import logger + + +@click.command() +@click.pass_context +def authenticate(context: click.Context): + """ Initial command to authenticate your CLI - TODO better text """ + try: + logger.debug("starting authentication process") + auth_manager = AuthManager() + auth_manager.authenticate() + click.echo("success TODO TEXT") + except Exception as e: + _handle_exception(context, e) + + +def _handle_exception(context: click.Context, e: Exception): + verbose = context.obj["verbose"] + if verbose: + click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False) + if isinstance(e, AuthProcessError): + click.secho('Authentication process has failed. Please try again by executing the `cycode auth` command', + fg='red', nl=False) + elif isinstance(e, CycodeError): + click.secho('TBD message. Please try again by executing the `cycode auth` command', + fg='red', nl=False) + elif isinstance(e, click.ClickException): + raise e + else: + raise click.ClickException(str(e)) diff --git a/cli/auth/auth_manager.py b/cli/auth/auth_manager.py index c82b2417..b2a38952 100644 --- a/cli/auth/auth_manager.py +++ b/cli/auth/auth_manager.py @@ -1,17 +1,101 @@ +import time +import webbrowser +from requests import Request +from typing import Optional +from cli.exceptions.custom_exceptions import AuthProcessError from cli.utils.string_utils import generate_random_string, hash_string_to_sha256 from cli.user_settings.configuration_manager import ConfigurationManager +from cli.user_settings.credentials_manager import CredentialsManager +from cyclient.auth_client import AuthClient +from cyclient.models import ApiToken, ApiTokenGenerationPollingResponse +from cyclient import logger class AuthManager: - CODE_VERIFIER_LENGTH = 101 + POLLING_WAIT_INTERVAL_IN_SECONDS = 3 + POLLING_TIMEOUT_IN_SECONDS = 180 + FAILED_POLLING_STATUS = "Error" + COMPLETED_POLLING_STATUS = "Completed" configuration_manager: ConfigurationManager + credentials_manager: CredentialsManager + auth_client: AuthClient def __init__(self): self.configuration_manager = ConfigurationManager() + self.credentials_manager = CredentialsManager() + self.auth_client = AuthClient() + + def authenticate(self): + logger.debug('generating pkce code pair') + code_challenge, code_verifier = self._generate_pkce_code_pair() + + logger.debug('starting authentication session') + session_id = self.start_session(code_challenge) + logger.debug('authentication session created, %s', {'session_id': session_id}) + + logger.debug('opening browser and redirecting to cycode login page') + self.redirect_to_login_page(code_challenge, session_id) + + logger.debug('starting get api token process') + api_token = self.get_api_token(session_id, code_verifier) + + logger.debug('saving get api token') + self.save_api_token(api_token) + + def start_session(self, code_challenge: str): + auth_session = self.auth_client.start_session(code_challenge) + return auth_session.session_id + + def redirect_to_login_page(self, code_challenge: str, session_id: str): + login_url = self._build_login_url(code_challenge, session_id) + webbrowser.open(login_url) - def generate_pkce_code_pair(self) -> (str, str): + def get_api_token(self, session_id: str, code_verifier: str) -> Optional[ApiToken]: + api_token = self.get_api_token_polling(session_id, code_verifier) + if api_token is None: + raise AuthProcessError("getting api token is completed, but the token is missing") + return api_token + + def get_api_token_polling(self, session_id: str, code_verifier: str) -> Optional[ApiToken]: + end_polling_time = time.time() + self.POLLING_TIMEOUT_IN_SECONDS + while time.time() < end_polling_time: + logger.debug('trying to get api token...') + api_token_polling_response = self.auth_client.get_api_token(session_id, code_verifier) + if self._is_api_token_process_completed(api_token_polling_response): + logger.debug('get api token process completed') + return api_token_polling_response.api_token + if self._is_api_token_process_failed(api_token_polling_response): + logger.debug('get api token process failed') + raise AuthProcessError('error during getting api token') + time.sleep(self.POLLING_WAIT_INTERVAL_IN_SECONDS) + + raise AuthProcessError('session expired') + + def save_api_token(self, api_token: ApiToken): + self.credentials_manager.update_credentials_file(api_token.client_id, api_token.secret) + + def _build_login_url(self, code_challenge: str, session_id: str): + app_url = self.configuration_manager.get_cycode_app_url() + login_url = f'{app_url}/account/login' + query_params = { + 'source': 'cycode_cli', + 'code_challenge': code_challenge, + 'session_id': session_id + } + request = Request(url=login_url, params=query_params) + return request.prepare().url + + def _generate_pkce_code_pair(self) -> (str, str): code_verifier = generate_random_string(self.CODE_VERIFIER_LENGTH) code_challenge = hash_string_to_sha256(code_verifier) - return code_challenge, code_verifier \ No newline at end of file + return code_challenge, code_verifier + + def _is_api_token_process_completed(self, api_token_polling_response: ApiTokenGenerationPollingResponse) -> bool: + return api_token_polling_response is not None \ + and api_token_polling_response.status == self.COMPLETED_POLLING_STATUS + + def _is_api_token_process_failed(self, api_token_polling_response: ApiTokenGenerationPollingResponse) -> bool: + return api_token_polling_response is not None \ + and api_token_polling_response.status == self.FAILED_POLLING_STATUS diff --git a/cli/cycode.py b/cli/cycode.py index 35fc6ba9..5deac5e4 100644 --- a/cli/cycode.py +++ b/cli/cycode.py @@ -3,10 +3,12 @@ import sys from cli.config import config from cli import code_scanner, __version__ -from cyclient import ScanClient, K8SUpdaterClient, logger +from cyclient import ScanClient, logger from cli.user_settings.credentials_manager import CredentialsManager from cli.user_settings.user_settings_commands import set_credentials, add_exclusions +from cli.auth.auth_command import authenticate from cli.user_settings.configuration_manager import ConfigurationManager +from cli.auth.auth_manager import AuthManager CONTEXT = dict() ISSUE_DETECTED_STATUS_CODE = 1 @@ -72,7 +74,7 @@ def code_scan(context: click.Context, scan_type, client_id, secret, show_secret, context.obj["scan_type"] = scan_type context.obj["output"] = output - context.obj["client"] = get_cycode_client(client_id, secret, "code_scan") + context.obj["client"] = get_cycode_client(client_id, secret) return 1 @@ -90,7 +92,8 @@ def finalize(context: click.Context, *args, **kwargs): commands={ "scan": code_scan, "configure": set_credentials, - "ignore": add_exclusions + "ignore": add_exclusions, + "auth": authenticate }, context_settings=CONTEXT ) @@ -108,7 +111,7 @@ def main_cli(context: click.Context, verbose: bool): logger.setLevel(log_level) -def get_cycode_client(client_id, client_secret, execution_type): +def get_cycode_client(client_id, client_secret): if not client_id or not client_secret: client_id, client_secret = _get_configured_credentials() if not client_id: @@ -116,10 +119,7 @@ def get_cycode_client(client_id, client_secret, execution_type): if not client_secret: raise click.ClickException("Cycode client secret is needed.") - if execution_type == "code_scan": - return ScanClient(client_secret=client_secret, client_id=client_id) - - return K8SUpdaterClient(client_secret=client_secret, client_id=client_id) + return ScanClient(client_secret=client_secret, client_id=client_id) def _get_configured_credentials(): diff --git a/cli/exceptions/custom_exceptions.py b/cli/exceptions/custom_exceptions.py index 79e32699..b33d9367 100644 --- a/cli/exceptions/custom_exceptions.py +++ b/cli/exceptions/custom_exceptions.py @@ -6,7 +6,7 @@ def __init__(self, status_code: int, error_message: str): super().__init__(self.error_message) def __str__(self): - return f'error occurred during scan request. status code: {self.status_code}, error message: ' \ + return f'error occurred during the request. status code: {self.status_code}, error message: ' \ f'{self.error_message}' @@ -27,3 +27,12 @@ def __init__(self, size_limit: int): def __str__(self): return f'The size of zip to scan is too large, size limit: {self.size_limit}' + + +class AuthProcessError(Exception): + def __init__(self, error_message: str): + self.error_message = error_message + super().__init__() + + def __str__(self): + return f'Something went wrong during the authentication process, error message: {self.error_message}' diff --git a/cyclient/__init__.py b/cyclient/__init__.py index 6854133c..64c57310 100644 --- a/cyclient/__init__.py +++ b/cyclient/__init__.py @@ -1,7 +1,6 @@ """Cycode Client""" -from .client import CycodeClient +from .cycode_client import CycodeClient from .scan_client import ScanClient -from .k8s_updater_client import K8SUpdaterClient from .config import logger __version__ = "0.0.15" @@ -9,6 +8,5 @@ __all__ = [ "CycodeClient", "ScanClient", - "K8SUpdaterClient", "logger" ] diff --git a/cyclient/auth_client.py b/cyclient/auth_client.py new file mode 100644 index 00000000..61d3765f --- /dev/null +++ b/cyclient/auth_client.py @@ -0,0 +1,46 @@ +import requests.exceptions +from requests import Response +from typing import Optional +from .cycode_client import CycodeClient +from . import models +from cli.exceptions.custom_exceptions import CycodeError + + +class AuthClient: + AUTH_CONTROLLER_PATH = 'api/v1/device-auth' + + def __init__(self): + self.cycode_client = CycodeClient() + + def start_session(self, code_challenge: str): + path = f"{self.AUTH_CONTROLLER_PATH}/start" + body = {'code_challenge': code_challenge} + try: + response = self.cycode_client.post(url_path=path, body=body) + return self.parse_start_session_response(response) + except requests.exceptions.Timeout as e: + raise CycodeError(504, e.response.text) + except requests.exceptions.HTTPError as e: + raise CycodeError(e.response.status_code, e.response.text) + + def get_api_token(self, session_id: str, code_verifier: str) -> Optional[models.ApiTokenGenerationPollingResponse]: + path = f"{self.AUTH_CONTROLLER_PATH}/token" + body = {'session_id': session_id, 'code_verifier': code_verifier} + try: + response = self.cycode_client.post(url_path=path, body=body) + return self.parse_api_token_polling_response(response) + except requests.exceptions.HTTPError as e: + return self.parse_api_token_polling_response(e.response) + except Exception as e: + return None + + @staticmethod + def parse_start_session_response(response: Response) -> models.AuthenticationSession: + return models.AuthenticationSessionSchema().load(response.json()) + + @staticmethod + def parse_api_token_polling_response(response: Response) -> Optional[models.ApiTokenGenerationPollingResponse]: + try: + return models.ApiTokenGenerationPollingResponseSchema().load(response.json()) + except Exception as e: + return None diff --git a/cyclient/client.py b/cyclient/client.py deleted file mode 100644 index ed5fa9a4..00000000 --- a/cyclient/client.py +++ /dev/null @@ -1,120 +0,0 @@ -import arrow -from threading import Lock -import requests.exceptions -from requests import Response, Session -from cyclient import config -from cli.exceptions.custom_exceptions import CycodeError, HttpUnauthorizedError - - -class CycodeClient: - - session: Session - - def __init__(self, client_id: str, client_secret: str): - """ - :param client_secret: the api token to added to the requests - :param base_url: the api base url - """ - self.init_session() - - self.client_secret = client_secret - self.client_id = client_id - self.timeout = config.timeout - - self.base_url = config.cycode_api_url - - self._api_token = None - self._expires_in = None - - self.lock = Lock() - - @property - def api_token(self) -> str: - with self.lock: - self.refresh_api_token_if_needed() - return self._api_token - - def refresh_api_token_if_needed(self) -> None: - if self._api_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in: - self.refresh_api_token() - - def refresh_api_token(self) -> None: - try: - auth_response = self.session.post(f"{self.base_url}/api/v1/auth/api-token", - json={ - 'clientId': self.client_id, - 'secret': self.client_secret - }) - auth_response.raise_for_status() - except requests.exceptions.HTTPError as e: # 4xx/5xx status codes - self._handle_http_exception(e) - - auth_response_data = auth_response.json() - self._api_token = auth_response_data['token'] - self._expires_in = arrow.utcnow().shift( - seconds=auth_response_data['expires_in'] * 0.8) - - def init_session(self): - self.session = Session() - self.session.headers.update( - { - "User-Agent": "cycode-cli" - } - ) - - def execute( - self, - method: str, - endpoint: str, - **kwargs - ) -> Response: - - self.session.headers.update( - { - "Authorization": f"Bearer {self.api_token}", - } - ) - - url = f"{self.base_url}/{endpoint}" - - try: - response = self.session.request( - method=method, url=url, timeout=self.timeout, **kwargs - ) - response.raise_for_status() - return response - except requests.exceptions.Timeout: - raise CycodeError(504, "Timeout Error") - except requests.exceptions.HTTPError as e: # 4xx/5xx status codes - self._handle_http_exception(e) - - def post( - self, - url_path: str, - body: dict = None, - **kwargs - ) -> Response: - return self.execute( - method="post", endpoint=url_path, json=body, **kwargs) - - def put( - self, - url_path: str, - body: dict = None, - **kwargs - ) -> Response: - return self.execute( - method="put", endpoint=url_path, json=body, **kwargs) - - def get( - self, - url_path: str, - **kwargs - ) -> Response: - return self.execute(method="get", endpoint=url_path, **kwargs) - - def _handle_http_exception(self, e: requests.exceptions.HTTPError): - if e.response.status_code == 401: - raise HttpUnauthorizedError(e.response.text) - else: - raise CycodeError(e.response.status_code, e.response.text) \ No newline at end of file diff --git a/cyclient/cycode_client.py b/cyclient/cycode_client.py new file mode 100644 index 00000000..da3b73fe --- /dev/null +++ b/cyclient/cycode_client.py @@ -0,0 +1,63 @@ +from requests import Response, request +from cyclient import config + + +class CycodeClient: + + MANDATORY_HEADERS: dict = { + "User-Agent": "cycode-cli" + } + + def __init__(self): + self.timeout = config.timeout + self.api_url = config.cycode_api_url + + def post( + self, + url_path: str, + body: dict = None, + headers: dict = None, + **kwargs + ) -> Response: + return self._execute( + method="post", endpoint=url_path, json=body, headers=headers, **kwargs) + + def put( + self, + url_path: str, + body: dict = None, + headers: dict = None, + **kwargs + ) -> Response: + return self._execute( + method="put", endpoint=url_path, json=body, headers=headers, **kwargs) + + def get( + self, + url_path: str, + headers: dict = None, + **kwargs + ) -> Response: + return self._execute(method="get", endpoint=url_path, headers=headers, **kwargs) + + def _execute( + self, + method: str, + endpoint: str, + headers: dict = None, + **kwargs + ) -> Response: + + url = f"{self.api_url}/{endpoint}" + + response = request( + method=method, url=url, timeout=self.timeout, headers=self.get_request_headers(headers), **kwargs + ) + response.raise_for_status() + return response + + def get_request_headers(self, additional_headers: dict = None): + if additional_headers is None: + return self.MANDATORY_HEADERS + return {**self.MANDATORY_HEADERS, **additional_headers} + diff --git a/cyclient/cycode_token_based_client.py b/cyclient/cycode_token_based_client.py new file mode 100644 index 00000000..0a0da5d3 --- /dev/null +++ b/cyclient/cycode_token_based_client.py @@ -0,0 +1,51 @@ +import arrow +import requests +from threading import Lock +from cyclient.cycode_client import CycodeClient + +""" +Send requests with api token +""" +class CycodeTokenBasedClient(CycodeClient): + + def __init__(self, client_id: str, client_secret: str): + super().__init__() + self.client_secret = client_secret + self.client_id = client_id + + self._api_token = None + self._expires_in = None + + self.lock = Lock() + + @property + def api_token(self) -> str: + with self.lock: + self.refresh_api_token_if_needed() + return self._api_token + + def refresh_api_token_if_needed(self) -> None: + if self._api_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in: + self.refresh_api_token() + + def refresh_api_token(self) -> None: + auth_response = requests.post(f"{self.api_url}/api/v1/auth/api-token", + json={ + 'clientId': self.client_id, + 'secret': self.client_secret + }) + auth_response.raise_for_status() + + auth_response_data = auth_response.json() + self._api_token = auth_response_data['token'] + self._expires_in = arrow.utcnow().shift( + seconds=auth_response_data['expires_in'] * 0.8) + + def get_request_headers(self, additional_headers: dict = None): + headers = super().get_request_headers(additional_headers=additional_headers) + headers = self._add_auth_header(headers) + return headers + + def _add_auth_header(self, headers: dict): + headers['Authorization'] = f'Bearer {self.api_token}' + return headers diff --git a/cyclient/k8s_updater_client.py b/cyclient/k8s_updater_client.py deleted file mode 100644 index d700ec66..00000000 --- a/cyclient/k8s_updater_client.py +++ /dev/null @@ -1,30 +0,0 @@ -from typing import List - -from . import config -from .client import CycodeClient -from .config import get_logger -from .models import K8SResource, ResourcesCollection -from .utils import split_list - - -class K8SUpdaterClient: - - def __init__(self, client_id: str = None, client_secret: str = None): - self.cycode_client = CycodeClient(client_secret=client_secret, client_id=client_id) - self.base_path = 'api/v1' if config.dev_mode else 'kubernetes-connector/api/v1' - self.logger = get_logger(__name__) - - def publish_resources(self, resource_type: str, resources: List[K8SResource], namespace: str = None): - for batch in split_list(resources, config.batch_size): - self._send_batch(ResourcesCollection(resource_type, namespace, batch, len(resources))) - - def _send_batch(self, batch: ResourcesCollection): - path = f'{self.base_path}/resources' - try: - self.logger.debug(f'Publishing batch resources {batch.type}, {len(batch.resources)}/{batch.total_count}') - response = self.cycode_client.post(url_path=path, body=batch.to_json()) - response.raise_for_status() - except Exception as ex: - self.logger.exception( - f'Failed to publish resources. Type: {batch.type}, count: {len(batch.resources)}/{batch.total_count}', - ex) diff --git a/cyclient/models.py b/cyclient/models.py index a92860b4..ef228ef0 100644 --- a/cyclient/models.py +++ b/cyclient/models.py @@ -15,7 +15,8 @@ def __repr__(self) -> str: "type:{0}, " "message:{1}, " "detection_details: {2}" - "detection_rule_id:{3}".format(self.type, self.message, repr(self.detection_details), self.detection_rule_id) + "detection_rule_id:{3}".format(self.type, self.message, repr(self.detection_details), + self.detection_rule_id) ) @@ -53,7 +54,8 @@ def build_dto(self, data, **kwargs): class ZippedFileScanResult(Schema): - def __init__(self, did_detect: bool, detections_per_file: List[DetectionsPerFile], scan_id: str = None, err: str = None): + def __init__(self, did_detect: bool, detections_per_file: List[DetectionsPerFile], scan_id: str = None, + err: str = None): super().__init__() self.did_detect = did_detect self.detections_per_file = detections_per_file @@ -166,3 +168,60 @@ def __init__(self, name: str, kind: str): def __str__(self): return "Name: {0}, Kind: {1}".format(self.name, self.kind) + + +class AuthenticationSession(Schema): + def __init__(self, session_id: str): + super().__init__() + self.session_id = session_id + + +class AuthenticationSessionSchema(Schema): + class Meta: + unknown = EXCLUDE + + session_id = fields.String() + + @post_load + def build_dto(self, data, **kwargs): + return AuthenticationSession(**data) + + +class ApiToken(Schema): + def __init__(self, client_id: str, secret: str, description: str): + super().__init__() + self.client_id = client_id + self.secret = secret + self.description = description + + +class ApiTokenSchema(Schema): + class Meta: + unknown = EXCLUDE + + client_id = fields.String(data_key='clientId') + secret = fields.String() + description = fields.String() + + @post_load + def build_dto(self, data, **kwargs): + return ApiToken(**data) + + +class ApiTokenGenerationPollingResponse(Schema): + def __init__(self, status: str, api_token): + super().__init__() + self.status = status + self.api_token = api_token + + +class ApiTokenGenerationPollingResponseSchema(Schema): + class Meta: + unknown = EXCLUDE + + status = fields.String() + api_token = fields.Nested(ApiTokenSchema, allow_none=True) + + @post_load + def build_dto(self, data, **kwargs): + return ApiTokenGenerationPollingResponse(**data) \ No newline at end of file diff --git a/cyclient/scan_client.py b/cyclient/scan_client.py index 46cc4f20..e4b21272 100644 --- a/cyclient/scan_client.py +++ b/cyclient/scan_client.py @@ -1,54 +1,85 @@ -from . import models -from .client import CycodeClient +import requests.exceptions from requests import Response +from . import models +from .cycode_token_based_client import CycodeTokenBasedClient from cli.zip_file import InMemoryZip +from cli.exceptions.custom_exceptions import CycodeError, HttpUnauthorizedError class ScanClient: - def __init__(self, client_id: str = None, - client_secret: str = None): - self.cycode_client = CycodeClient(client_secret=client_secret, client_id=client_id) + SCAN_CONTROLLER_PATH = 'api/v1/scan' + + def __init__(self, client_id: str = None, client_secret: str = None): + self.cycode_client = CycodeTokenBasedClient(client_id, client_secret) def content_scan(self, scan_type: str, file_name: str, content: str, is_git_diff: bool = True): - path = f"{self.get_service_name(scan_type)}/api/v1/scan/content" + path = f"{self.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/content" body = {'name': file_name, 'content': content, 'is_git_diff': is_git_diff} - response = self.cycode_client.post(url_path=path, body=body) - return self.parse_scan_response(response) + try: + response = self.cycode_client.post(url_path=path, body=body) + return self.parse_scan_response(response) + except Exception as e: + self._handle_exception(e) def file_scan(self, scan_type: str, path: str) -> models.ScanResult: - url_path = f"{self.get_service_name(scan_type)}/api/v1/scan" + url_path = f"{self.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}" files = {'file': open(path, 'rb')} - response = self.cycode_client.post(url_path=url_path, files=files) - return self.parse_scan_response(response) + try: + response = self.cycode_client.post(url_path=url_path, files=files) + return self.parse_scan_response(response) + except Exception as e: + self._handle_exception(e) def zipped_file_scan(self, scan_type: str, zip_file: InMemoryZip, scan_id: str, is_git_diff: bool = False) -> models.ZippedFileScanResult: - url_path = f"{self.get_service_name(scan_type)}/api/v1/scan/zipped-file" + url_path = f"{self.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/zipped-file" files = {'file': ('multiple_files_scan.zip', zip_file.read())} - response = self.cycode_client.post(url_path=url_path, data={'scan_id': scan_id, 'is_git_diff': is_git_diff}, - files=files) - return self.parse_zipped_file_scan_response(response) + try: + response = self.cycode_client.post(url_path=url_path, data={'scan_id': scan_id, 'is_git_diff': is_git_diff}, + files=files) + return self.parse_zipped_file_scan_response(response) + except Exception as e: + self._handle_exception(e) def commit_range_zipped_file_scan(self, scan_type: str, zip_file: InMemoryZip, scan_id: str) -> models.ZippedFileScanResult: - url_path = f"{self.get_service_name(scan_type)}/api/v1/scan/commit-range-zipped-file" + url_path = f"{self.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/commit-range-zipped-file" files = {'file': ('multiple_files_scan.zip', zip_file.read())} - response = self.cycode_client.post(url_path=url_path, data={'scan_id': scan_id}, files=files) - return self.parse_zipped_file_scan_response(response) + try: + response = self.cycode_client.post(url_path=url_path, data={'scan_id': scan_id}, files=files) + return self.parse_zipped_file_scan_response(response) + except Exception as e: + self._handle_exception(e) def report_scan_status(self, scan_type: str, scan_id: str, scan_status: dict): - url_path = f"{self.get_service_name(scan_type)}/api/v1/scan/{scan_id}/status" - self.cycode_client.post(url_path=url_path, body=scan_status) + url_path = f"{self.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/{scan_id}/status" + try: + self.cycode_client.post(url_path=url_path, body=scan_status) + except Exception as e: + self._handle_exception(e) @staticmethod def parse_scan_response(response: Response) -> models.ScanResult: return models.ScanResultSchema().load(response.json()) @staticmethod - def parse_zipped_file_scan_response(response: Response): + def parse_zipped_file_scan_response(response: Response) -> models.ZippedFileScanResult: return models.ZippedFileScanResultSchema().load(response.json()) @staticmethod def get_service_name(scan_type): return 'secret' if scan_type == 'secret' else 'iac' + + def _handle_exception(self, e: Exception): + if isinstance(e, requests.exceptions.Timeout): + raise CycodeError(504, "Timeout Error") + elif isinstance(e, requests.exceptions.HTTPError): + self._handle_http_exception(e) + + @staticmethod + def _handle_http_exception(e: requests.exceptions.HTTPError): + if e.response.status_code == 401: + raise HttpUnauthorizedError(e.response.text) + else: + raise CycodeError(e.response.status_code, e.response.text) \ No newline at end of file