Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cli/auth/auth_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import click
import traceback
from cli.auth.auth_manager import AuthManager
from cli.exceptions.custom_exceptions import AuthProcessError, CycodeError
from cyclient import logger


@click.command()
@click.pass_context
def authenticate(context: click.Context):
""" Initial command to authenticate your CLI - TODO better text """
try:
logger.debug("starting authentication process")
auth_manager = AuthManager()
auth_manager.authenticate()
click.echo("success TODO TEXT")
except Exception as e:
_handle_exception(context, e)


def _handle_exception(context: click.Context, e: Exception):
verbose = context.obj["verbose"]
if verbose:
click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False)
if isinstance(e, AuthProcessError):
click.secho('Authentication process has failed. Please try again by executing the `cycode auth` command',
fg='red', nl=False)
elif isinstance(e, CycodeError):
click.secho('TBD message. Please try again by executing the `cycode auth` command',
fg='red', nl=False)
elif isinstance(e, click.ClickException):
raise e
else:
raise click.ClickException(str(e))
90 changes: 87 additions & 3 deletions cli/auth/auth_manager.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,101 @@
import time
import webbrowser
from requests import Request
from typing import Optional
from cli.exceptions.custom_exceptions import AuthProcessError
from cli.utils.string_utils import generate_random_string, hash_string_to_sha256
from cli.user_settings.configuration_manager import ConfigurationManager
from cli.user_settings.credentials_manager import CredentialsManager
from cyclient.auth_client import AuthClient
from cyclient.models import ApiToken, ApiTokenGenerationPollingResponse
from cyclient import logger


class AuthManager:

CODE_VERIFIER_LENGTH = 101
POLLING_WAIT_INTERVAL_IN_SECONDS = 3
POLLING_TIMEOUT_IN_SECONDS = 180
FAILED_POLLING_STATUS = "Error"
COMPLETED_POLLING_STATUS = "Completed"

configuration_manager: ConfigurationManager
credentials_manager: CredentialsManager
auth_client: AuthClient

def __init__(self):
self.configuration_manager = ConfigurationManager()
self.credentials_manager = CredentialsManager()
self.auth_client = AuthClient()

def authenticate(self):
logger.debug('generating pkce code pair')
code_challenge, code_verifier = self._generate_pkce_code_pair()

logger.debug('starting authentication session')
session_id = self.start_session(code_challenge)
logger.debug('authentication session created, %s', {'session_id': session_id})

logger.debug('opening browser and redirecting to cycode login page')
self.redirect_to_login_page(code_challenge, session_id)

logger.debug('starting get api token process')
api_token = self.get_api_token(session_id, code_verifier)

logger.debug('saving get api token')
self.save_api_token(api_token)

def start_session(self, code_challenge: str):
auth_session = self.auth_client.start_session(code_challenge)
return auth_session.session_id

def redirect_to_login_page(self, code_challenge: str, session_id: str):
login_url = self._build_login_url(code_challenge, session_id)
webbrowser.open(login_url)

def generate_pkce_code_pair(self) -> (str, str):
def get_api_token(self, session_id: str, code_verifier: str) -> Optional[ApiToken]:
api_token = self.get_api_token_polling(session_id, code_verifier)
if api_token is None:
raise AuthProcessError("getting api token is completed, but the token is missing")
return api_token

def get_api_token_polling(self, session_id: str, code_verifier: str) -> Optional[ApiToken]:
end_polling_time = time.time() + self.POLLING_TIMEOUT_IN_SECONDS
while time.time() < end_polling_time:
logger.debug('trying to get api token...')
api_token_polling_response = self.auth_client.get_api_token(session_id, code_verifier)
if self._is_api_token_process_completed(api_token_polling_response):
logger.debug('get api token process completed')
return api_token_polling_response.api_token
if self._is_api_token_process_failed(api_token_polling_response):
logger.debug('get api token process failed')
raise AuthProcessError('error during getting api token')
time.sleep(self.POLLING_WAIT_INTERVAL_IN_SECONDS)

raise AuthProcessError('session expired')

def save_api_token(self, api_token: ApiToken):
self.credentials_manager.update_credentials_file(api_token.client_id, api_token.secret)

def _build_login_url(self, code_challenge: str, session_id: str):
app_url = self.configuration_manager.get_cycode_app_url()
login_url = f'{app_url}/account/login'
query_params = {
'source': 'cycode_cli',
'code_challenge': code_challenge,
'session_id': session_id
}
request = Request(url=login_url, params=query_params)
return request.prepare().url

def _generate_pkce_code_pair(self) -> (str, str):
code_verifier = generate_random_string(self.CODE_VERIFIER_LENGTH)
code_challenge = hash_string_to_sha256(code_verifier)
return code_challenge, code_verifier
return code_challenge, code_verifier

def _is_api_token_process_completed(self, api_token_polling_response: ApiTokenGenerationPollingResponse) -> bool:
return api_token_polling_response is not None \
and api_token_polling_response.status == self.COMPLETED_POLLING_STATUS

def _is_api_token_process_failed(self, api_token_polling_response: ApiTokenGenerationPollingResponse) -> bool:
return api_token_polling_response is not None \
and api_token_polling_response.status == self.FAILED_POLLING_STATUS
16 changes: 8 additions & 8 deletions cli/cycode.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import sys
from cli.config import config
from cli import code_scanner, __version__
from cyclient import ScanClient, K8SUpdaterClient, logger
from cyclient import ScanClient, logger
from cli.user_settings.credentials_manager import CredentialsManager
from cli.user_settings.user_settings_commands import set_credentials, add_exclusions
from cli.auth.auth_command import authenticate
from cli.user_settings.configuration_manager import ConfigurationManager
from cli.auth.auth_manager import AuthManager

CONTEXT = dict()
ISSUE_DETECTED_STATUS_CODE = 1
Expand Down Expand Up @@ -72,7 +74,7 @@ def code_scan(context: click.Context, scan_type, client_id, secret, show_secret,

context.obj["scan_type"] = scan_type
context.obj["output"] = output
context.obj["client"] = get_cycode_client(client_id, secret, "code_scan")
context.obj["client"] = get_cycode_client(client_id, secret)

return 1

Expand All @@ -90,7 +92,8 @@ def finalize(context: click.Context, *args, **kwargs):
commands={
"scan": code_scan,
"configure": set_credentials,
"ignore": add_exclusions
"ignore": add_exclusions,
"auth": authenticate
},
context_settings=CONTEXT
)
Expand All @@ -108,18 +111,15 @@ def main_cli(context: click.Context, verbose: bool):
logger.setLevel(log_level)


def get_cycode_client(client_id, client_secret, execution_type):
def get_cycode_client(client_id, client_secret):
if not client_id or not client_secret:
client_id, client_secret = _get_configured_credentials()
if not client_id:
raise click.ClickException("Cycode client id needed.")
if not client_secret:
raise click.ClickException("Cycode client secret is needed.")

if execution_type == "code_scan":
return ScanClient(client_secret=client_secret, client_id=client_id)

return K8SUpdaterClient(client_secret=client_secret, client_id=client_id)
return ScanClient(client_secret=client_secret, client_id=client_id)


def _get_configured_credentials():
Expand Down
11 changes: 10 additions & 1 deletion cli/exceptions/custom_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ def __init__(self, status_code: int, error_message: str):
super().__init__(self.error_message)

def __str__(self):
return f'error occurred during scan request. status code: {self.status_code}, error message: ' \
return f'error occurred during the request. status code: {self.status_code}, error message: ' \
f'{self.error_message}'


Expand All @@ -27,3 +27,12 @@ def __init__(self, size_limit: int):

def __str__(self):
return f'The size of zip to scan is too large, size limit: {self.size_limit}'


class AuthProcessError(Exception):
def __init__(self, error_message: str):
self.error_message = error_message
super().__init__()

def __str__(self):
return f'Something went wrong during the authentication process, error message: {self.error_message}'
4 changes: 1 addition & 3 deletions cyclient/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
"""Cycode Client"""
from .client import CycodeClient
from .cycode_client import CycodeClient
from .scan_client import ScanClient
from .k8s_updater_client import K8SUpdaterClient
from .config import logger

__version__ = "0.0.15"

__all__ = [
"CycodeClient",
"ScanClient",
"K8SUpdaterClient",
"logger"
]
46 changes: 46 additions & 0 deletions cyclient/auth_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import requests.exceptions
from requests import Response
from typing import Optional
from .cycode_client import CycodeClient
from . import models
from cli.exceptions.custom_exceptions import CycodeError


class AuthClient:
AUTH_CONTROLLER_PATH = 'api/v1/device-auth'

def __init__(self):
self.cycode_client = CycodeClient()

def start_session(self, code_challenge: str):
path = f"{self.AUTH_CONTROLLER_PATH}/start"
body = {'code_challenge': code_challenge}
try:
response = self.cycode_client.post(url_path=path, body=body)
return self.parse_start_session_response(response)
except requests.exceptions.Timeout as e:
raise CycodeError(504, e.response.text)
except requests.exceptions.HTTPError as e:
raise CycodeError(e.response.status_code, e.response.text)

def get_api_token(self, session_id: str, code_verifier: str) -> Optional[models.ApiTokenGenerationPollingResponse]:
path = f"{self.AUTH_CONTROLLER_PATH}/token"
body = {'session_id': session_id, 'code_verifier': code_verifier}
try:
response = self.cycode_client.post(url_path=path, body=body)
return self.parse_api_token_polling_response(response)
except requests.exceptions.HTTPError as e:
return self.parse_api_token_polling_response(e.response)
except Exception as e:
return None

@staticmethod
def parse_start_session_response(response: Response) -> models.AuthenticationSession:
return models.AuthenticationSessionSchema().load(response.json())

@staticmethod
def parse_api_token_polling_response(response: Response) -> Optional[models.ApiTokenGenerationPollingResponse]:
try:
return models.ApiTokenGenerationPollingResponseSchema().load(response.json())
except Exception as e:
return None
Loading