diff --git a/README.md b/README.md index 1b40ae7..789c5c2 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ installed via the published tar balls in the GitHub repo. ~~~bash -pip install https://github.com/britive/python-cli/releases/download/v0.1.2/pybritive-0.1.2.tar.gz +pip install https://github.com/britive/python-cli/releases/download/v0.1.3/pybritive-0.1.3.tar.gz ~~~ The end user is free to install the CLI into a virtual environment or in the global scope, so it is available diff --git a/setup.cfg b/setup.cfg index d58a98f..0fc10e7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = pybritive -version = 0.1.2 +version = 0.1.3 author = Britive Inc. author_email = support@britive.com description = A pure Python CLI for Britive diff --git a/src/pybritive/britive_cli.py b/src/pybritive/britive_cli.py index 9d5d4d2..29c21c5 100644 --- a/src/pybritive/britive_cli.py +++ b/src/pybritive/britive_cli.py @@ -1,6 +1,4 @@ import io -import os - from britive.britive import Britive from .helpers.config import ConfigManager from .helpers.credentials import FileCredentialManager, EncryptedFileCredentialManager @@ -13,6 +11,7 @@ from .helpers.cache import Cache from britive import exceptions from pathlib import Path +from datetime import datetime default_table_format = 'fancy_grid' @@ -286,46 +285,71 @@ def checkin(self, profile): application_name=app_name ) - def checkout(self, alias, blocktime, console, justification, mode, maxpolltime, profile): - self.login() + def checkout(self, alias, blocktime, console, justification, mode, maxpolltime, profile, passphrase): # first check if this is a profile alias profile_or_alias = alias or profile - profile = self.config.profile_aliases.get(profile, profile) - parts = profile.split('/') - if len(parts) != 3: - raise click.ClickException('Provided profile string does not have the required 3 parts.') - app_name = parts[0] - env_name = parts[1] - profile_name = parts[2] - try: - response = self.b.my_access.checkout_by_name( - profile_name=profile_name, - environment_name=env_name, - application_name=app_name, - programmatic=False if console else True, - include_credentials=True, - wait_time=blocktime, - max_wait_time=maxpolltime, - justification=justification - ) - except exceptions.ApprovalRequiredButNoJustificationProvided: - raise click.ClickException('approval required and no justification provided.') - except ValueError as e: - raise click.BadParameter(str(e)) + credentials = None + app_type = None + + if mode == 'awscredentialprocess': + self.silent = True # the aws credential process CANNOT output anything other than the expected JSON + # we need to check the credential process cache for the credentials first + # then check to see if they are expired + # if not simply return those credentials + # if they are expired + app_type = 'AWS' # just hardcode as we know for sure this is for AWS + credentials = Cache(passphrase=passphrase).get_awscredentialprocess(profile_name=profile_or_alias) + if credentials: + expiration_timestamp_str = credentials['expirationTime'].replace('Z', '') + expires = datetime.fromisoformat(expiration_timestamp_str) + now = datetime.utcnow() + if now >= expires: # check to ensure the credentials are still valid, if not, set to None and get new + credentials = None + + if not credentials: # nothing found via aws credential process or not aws credential process mode + self.login() + profile = self.config.profile_aliases.get(profile, profile) + parts = profile.split('/') + if len(parts) != 3: + raise click.ClickException('Provided profile string does not have the required 3 parts.') + app_name = parts[0] + env_name = parts[1] + profile_name = parts[2] - if alias: # do this down here so we know that the profile is valid and a checkout was successful - self.config.save_profile_alias(alias=alias, profile=profile) + try: + response = self.b.my_access.checkout_by_name( + profile_name=profile_name, + environment_name=env_name, + application_name=app_name, + programmatic=False if console else True, + include_credentials=True, + wait_time=blocktime, + max_wait_time=maxpolltime, + justification=justification + ) + credentials = response['credentials'] + app_type = self._get_app_type(response['appContainerId']) + except exceptions.ApprovalRequiredButNoJustificationProvided: + raise click.ClickException('approval required and no justification provided.') + except ValueError as e: + raise click.BadParameter(str(e)) + + if alias: # do this down here so we know that the profile is valid and a checkout was successful + self.config.save_profile_alias(alias=alias, profile=profile) + if mode == 'awscredentialprocess': + Cache(passphrase=passphrase).save_awscredentialprocess( + profile_name=profile_or_alias, + credentials=credentials + ) - app_container_id = response['appContainerId'] - app_type = self._get_app_type(app_container_id) cc_printer = self.__get_cloud_credential_printer( app_type, console, mode, profile_or_alias, self.silent, - response['credentials'] + credentials ) cc_printer.print() diff --git a/src/pybritive/choices/mode.py b/src/pybritive/choices/mode.py index d9e9b54..c7c5163 100644 --- a/src/pybritive/choices/mode.py +++ b/src/pybritive/choices/mode.py @@ -11,7 +11,7 @@ 'env-nix', # environment variable output specifying "export" 'env-wincmd', # environment variable output specifying "set" 'env-winps', # environment variable output specifying "$env:" - 'awscredentialprocess', # aws credential process output + 'awscredentialprocess', # aws credential process output with additional caching to make the credential process more performant 'azlogin', # azure az login command with all fields populated (suitable for eval) 'azps', # azure powershell script 'browser' # when console access is checked out open the browser to the URL provided diff --git a/src/pybritive/commands/checkout.py b/src/pybritive/commands/checkout.py index 1472e12..7d33304 100644 --- a/src/pybritive/commands/checkout.py +++ b/src/pybritive/commands/checkout.py @@ -25,5 +25,6 @@ def checkout(ctx, alias, blocktime, console, justification, mode, maxpolltime, s justification=justification, mode=mode, maxpolltime=maxpolltime, - profile=profile + profile=profile, + passphrase=passphrase ) diff --git a/src/pybritive/helpers/cache.py b/src/pybritive/helpers/cache.py index 4b6f9d3..bfda76e 100644 --- a/src/pybritive/helpers/cache.py +++ b/src/pybritive/helpers/cache.py @@ -1,10 +1,13 @@ from pathlib import Path import json import os +from .encryption import StringEncryption, InvalidPassphraseException class Cache: - def __init__(self): + def __init__(self, passphrase: str = None): + self.passphrase = passphrase + self.string_encryptor = StringEncryption(passphrase=self.passphrase) home = os.getenv('PYBRITIVE_HOME_DIR', str(Path.home())) self.path = str(Path(home) / '.britive' / 'pybritive.cache') # handle os specific separators properly self.cache = {} @@ -27,6 +30,8 @@ def load(self): if 'profiles' not in self.cache.keys(): self.cache['profiles'] = [] + if 'awscredentialprocess' not in self.cache.keys(): + self.cache['awscredentialprocess'] = {} def write(self): # write the new cache file @@ -38,8 +43,25 @@ def get_profiles(self): def save_profiles(self, profiles: list): self.cache['profiles'] += profiles + # dedup the list of profiles + self.cache['profiles'] = list(dict.fromkeys(self.cache['profiles'])) self.write() def clear(self): self.cache['profiles'] = [] + self.cache['awscredentialprocess'] = {} + self.write() + + def get_awscredentialprocess(self, profile_name: str): + try: + ciphertext = self.cache['awscredentialprocess'].get(profile_name) + if not ciphertext: + return None + return json.loads(self.string_encryptor.decrypt(ciphertext)) + except InvalidPassphraseException: # if we cannot decrypt don't error - just make the API call to get the creds + return None + + def save_awscredentialprocess(self, profile_name: str, credentials: dict): + ciphertext = self.string_encryptor.encrypt(json.dumps(credentials, default=str)) + self.cache['awscredentialprocess'][profile_name] = ciphertext self.write() diff --git a/src/pybritive/helpers/credentials.py b/src/pybritive/helpers/credentials.py index 29fad99..947a42e 100644 --- a/src/pybritive/helpers/credentials.py +++ b/src/pybritive/helpers/credentials.py @@ -7,12 +7,9 @@ import click import configparser import json -from cryptography.fernet import Fernet, InvalidToken -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from .config import ConfigManager import os +from .encryption import StringEncryption, InvalidPassphraseException interactive_login_fields_to_pop = [ @@ -179,23 +176,9 @@ def __init__(self, tenant_name: str, tenant_alias: str, cli: ConfigManager, pass self.path = str(Path(home) / '.britive' / 'pybritive.credentials.encrypted') self.passphrase = passphrase self.prompt() + self.string_encryptor = StringEncryption(passphrase=self.passphrase) super().__init__(tenant_name, tenant_alias, cli) - @staticmethod - def salt(): - return base64.b64encode(os.urandom(32)).decode('utf-8') - - def key(self, salt: str): - kdf = PBKDF2HMAC( - algorithm=hashes.SHA256(), - length=32, - salt=base64.b64decode(salt.encode()), - iterations=100000, - backend=default_backend() - ) - key = base64.urlsafe_b64encode(kdf.derive(self.passphrase.encode())) - return key - def prompt(self): if not self.passphrase: self.passphrase = click.prompt( @@ -205,17 +188,12 @@ def prompt(self): def decrypt(self, encrypted_access_token: str): try: - encrypted_access_token, b64salt = encrypted_access_token.split(':') - key = self.key(b64salt) - return Fernet(key).decrypt(base64.b64decode(encrypted_access_token.encode())).decode('utf-8') - except InvalidToken: - raise click.ClickException('Invalid passphrase provided. Unable to decrypt credentials.') + return self.string_encryptor.decrypt(ciphertext=encrypted_access_token) + except InvalidPassphraseException: + click.ClickException('invalid passphrase provided - cannot decrypt credentials.') def encrypt(self, decrypted_access_token: str): - salt = self.salt() - key = self.key(salt) - encrypted_access_token = Fernet(key).encrypt(decrypted_access_token.encode()) - return f'{base64.b64encode(encrypted_access_token).decode("utf-8")}:{salt}' + return self.string_encryptor.encrypt(plaintext=decrypted_access_token) def load(self, full=False): path = Path(self.path) diff --git a/src/pybritive/helpers/encryption.py b/src/pybritive/helpers/encryption.py new file mode 100644 index 0000000..e0c7ff3 --- /dev/null +++ b/src/pybritive/helpers/encryption.py @@ -0,0 +1,45 @@ +import uuid +from cryptography.fernet import Fernet, InvalidToken +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +import os +import base64 +import click + + +class InvalidPassphraseException(Exception): + pass + + +class StringEncryption: + def __init__(self, passphrase: str = None): + self.passphrase = passphrase or str(uuid.getnode()) # TODO change? + + @staticmethod + def _salt(): + return base64.b64encode(os.urandom(32)).decode('utf-8') + + def _key(self, salt: str): + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=base64.b64decode(salt.encode()), + iterations=100000, + backend=default_backend() + ) + return base64.urlsafe_b64encode(kdf.derive(self.passphrase.encode())) + + def encrypt(self, plaintext: str) -> str: + salt = self._salt() + key = self._key(salt) + ciphertext = Fernet(key).encrypt(plaintext.encode('utf-8')) + return f'{base64.b64encode(ciphertext).decode("utf-8")}:{salt}' + + def decrypt(self, ciphertext: str): + try: + ciphertext, b64salt = ciphertext.split(':') + key = self._key(b64salt) + return Fernet(key).decrypt(base64.b64decode(ciphertext.encode())).decode('utf-8') + except InvalidToken: + raise InvalidPassphraseException() diff --git a/src/pybritive/options/passphrase.py b/src/pybritive/options/passphrase.py index 1a01c7f..cc0d238 100644 --- a/src/pybritive/options/passphrase.py +++ b/src/pybritive/options/passphrase.py @@ -3,7 +3,7 @@ option = click.option( '--passphrase', '-p', - help='The passphrase to use for the encrypted-file credential backend type.', + help='The passphrase to use for encrypting credentials.', envvar='PYBRITIVE_ENCRYPTED_CREDENTIAL_PASSPHRASE', show_envvar=True, show_default=True