Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 55 additions & 31 deletions src/pybritive/britive_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import io
import os

from britive.britive import Britive
from .helpers.config import ConfigManager
from .helpers.credentials import FileCredentialManager, EncryptedFileCredentialManager
Expand All @@ -13,6 +11,7 @@
from .helpers.cache import Cache
from britive import exceptions
from pathlib import Path
from datetime import datetime


default_table_format = 'fancy_grid'
Expand Down Expand Up @@ -286,46 +285,71 @@ def checkin(self, profile):
application_name=app_name
)

def checkout(self, alias, blocktime, console, justification, mode, maxpolltime, profile):
self.login()
def checkout(self, alias, blocktime, console, justification, mode, maxpolltime, profile, passphrase):
# first check if this is a profile alias
profile_or_alias = alias or profile
profile = self.config.profile_aliases.get(profile, profile)
parts = profile.split('/')
if len(parts) != 3:
raise click.ClickException('Provided profile string does not have the required 3 parts.')
app_name = parts[0]
env_name = parts[1]
profile_name = parts[2]

try:
response = self.b.my_access.checkout_by_name(
profile_name=profile_name,
environment_name=env_name,
application_name=app_name,
programmatic=False if console else True,
include_credentials=True,
wait_time=blocktime,
max_wait_time=maxpolltime,
justification=justification
)
except exceptions.ApprovalRequiredButNoJustificationProvided:
raise click.ClickException('approval required and no justification provided.')
except ValueError as e:
raise click.BadParameter(str(e))
credentials = None
app_type = None

if mode == 'awscredentialprocess':
self.silent = True # the aws credential process CANNOT output anything other than the expected JSON
# we need to check the credential process cache for the credentials first
# then check to see if they are expired
# if not simply return those credentials
# if they are expired
app_type = 'AWS' # just hardcode as we know for sure this is for AWS
credentials = Cache(passphrase=passphrase).get_awscredentialprocess(profile_name=profile_or_alias)
if credentials:
expiration_timestamp_str = credentials['expirationTime'].replace('Z', '')
expires = datetime.fromisoformat(expiration_timestamp_str)
now = datetime.utcnow()
if now >= expires: # check to ensure the credentials are still valid, if not, set to None and get new
credentials = None

if not credentials: # nothing found via aws credential process or not aws credential process mode
self.login()
profile = self.config.profile_aliases.get(profile, profile)
parts = profile.split('/')
if len(parts) != 3:
raise click.ClickException('Provided profile string does not have the required 3 parts.')
app_name = parts[0]
env_name = parts[1]
profile_name = parts[2]

if alias: # do this down here so we know that the profile is valid and a checkout was successful
self.config.save_profile_alias(alias=alias, profile=profile)
try:
response = self.b.my_access.checkout_by_name(
profile_name=profile_name,
environment_name=env_name,
application_name=app_name,
programmatic=False if console else True,
include_credentials=True,
wait_time=blocktime,
max_wait_time=maxpolltime,
justification=justification
)
credentials = response['credentials']
app_type = self._get_app_type(response['appContainerId'])
except exceptions.ApprovalRequiredButNoJustificationProvided:
raise click.ClickException('approval required and no justification provided.')
except ValueError as e:
raise click.BadParameter(str(e))

if alias: # do this down here so we know that the profile is valid and a checkout was successful
self.config.save_profile_alias(alias=alias, profile=profile)
if mode == 'awscredentialprocess':
Cache(passphrase=passphrase).save_awscredentialprocess(
profile_name=profile_or_alias,
credentials=credentials
)

app_container_id = response['appContainerId']
app_type = self._get_app_type(app_container_id)
cc_printer = self.__get_cloud_credential_printer(
app_type,
console,
mode,
profile_or_alias,
self.silent,
response['credentials']
credentials
)
cc_printer.print()

Expand Down
2 changes: 1 addition & 1 deletion src/pybritive/choices/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
'env-nix', # environment variable output specifying "export"
'env-wincmd', # environment variable output specifying "set"
'env-winps', # environment variable output specifying "$env:"
'awscredentialprocess', # aws credential process output
'awscredentialprocess', # aws credential process output with additional caching to make the credential process more performant
'azlogin', # azure az login command with all fields populated (suitable for eval)
'azps', # azure powershell script
'browser' # when console access is checked out open the browser to the URL provided
Expand Down
3 changes: 2 additions & 1 deletion src/pybritive/commands/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ def checkout(ctx, alias, blocktime, console, justification, mode, maxpolltime, s
justification=justification,
mode=mode,
maxpolltime=maxpolltime,
profile=profile
profile=profile,
passphrase=passphrase
)
24 changes: 23 additions & 1 deletion src/pybritive/helpers/cache.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from pathlib import Path
import json
import os
from .encryption import StringEncryption, InvalidPassphraseException


class Cache:
def __init__(self):
def __init__(self, passphrase: str = None):
self.passphrase = passphrase
self.string_encryptor = StringEncryption(passphrase=self.passphrase)
home = os.getenv('PYBRITIVE_HOME_DIR', str(Path.home()))
self.path = str(Path(home) / '.britive' / 'pybritive.cache') # handle os specific separators properly
self.cache = {}
Expand All @@ -27,6 +30,8 @@ def load(self):

if 'profiles' not in self.cache.keys():
self.cache['profiles'] = []
if 'awscredentialprocess' not in self.cache.keys():
self.cache['awscredentialprocess'] = {}

def write(self):
# write the new cache file
Expand All @@ -38,8 +43,25 @@ def get_profiles(self):

def save_profiles(self, profiles: list):
self.cache['profiles'] += profiles
# dedup the list of profiles
self.cache['profiles'] = list(dict.fromkeys(self.cache['profiles']))
self.write()

def clear(self):
self.cache['profiles'] = []
self.cache['awscredentialprocess'] = {}
self.write()

def get_awscredentialprocess(self, profile_name: str):
try:
ciphertext = self.cache['awscredentialprocess'].get(profile_name)
if not ciphertext:
return None
return json.loads(self.string_encryptor.decrypt(ciphertext))
except InvalidPassphraseException: # if we cannot decrypt don't error - just make the API call to get the creds
return None

def save_awscredentialprocess(self, profile_name: str, credentials: dict):
ciphertext = self.string_encryptor.encrypt(json.dumps(credentials, default=str))
self.cache['awscredentialprocess'][profile_name] = ciphertext
self.write()
34 changes: 6 additions & 28 deletions src/pybritive/helpers/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@
import click
import configparser
import json
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .config import ConfigManager
import os
from .encryption import StringEncryption, InvalidPassphraseException


interactive_login_fields_to_pop = [
Expand Down Expand Up @@ -179,23 +176,9 @@ def __init__(self, tenant_name: str, tenant_alias: str, cli: ConfigManager, pass
self.path = str(Path(home) / '.britive' / 'pybritive.credentials.encrypted')
self.passphrase = passphrase
self.prompt()
self.string_encryptor = StringEncryption(passphrase=self.passphrase)
super().__init__(tenant_name, tenant_alias, cli)

@staticmethod
def salt():
return base64.b64encode(os.urandom(32)).decode('utf-8')

def key(self, salt: str):
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=base64.b64decode(salt.encode()),
iterations=100000,
backend=default_backend()
)
key = base64.urlsafe_b64encode(kdf.derive(self.passphrase.encode()))
return key

def prompt(self):
if not self.passphrase:
self.passphrase = click.prompt(
Expand All @@ -205,17 +188,12 @@ def prompt(self):

def decrypt(self, encrypted_access_token: str):
try:
encrypted_access_token, b64salt = encrypted_access_token.split(':')
key = self.key(b64salt)
return Fernet(key).decrypt(base64.b64decode(encrypted_access_token.encode())).decode('utf-8')
except InvalidToken:
raise click.ClickException('Invalid passphrase provided. Unable to decrypt credentials.')
return self.string_encryptor.decrypt(ciphertext=encrypted_access_token)
except InvalidPassphraseException:
click.ClickException('invalid passphrase provided - cannot decrypt credentials.')

def encrypt(self, decrypted_access_token: str):
salt = self.salt()
key = self.key(salt)
encrypted_access_token = Fernet(key).encrypt(decrypted_access_token.encode())
return f'{base64.b64encode(encrypted_access_token).decode("utf-8")}:{salt}'
return self.string_encryptor.encrypt(plaintext=decrypted_access_token)

def load(self, full=False):
path = Path(self.path)
Expand Down
45 changes: 45 additions & 0 deletions src/pybritive/helpers/encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import uuid
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
import base64
import click


class InvalidPassphraseException(Exception):
pass


class StringEncryption:
def __init__(self, passphrase: str = None):
self.passphrase = passphrase or str(uuid.getnode()) # TODO change?

@staticmethod
def _salt():
return base64.b64encode(os.urandom(32)).decode('utf-8')

def _key(self, salt: str):
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=base64.b64decode(salt.encode()),
iterations=100000,
backend=default_backend()
)
return base64.urlsafe_b64encode(kdf.derive(self.passphrase.encode()))

def encrypt(self, plaintext: str) -> str:
salt = self._salt()
key = self._key(salt)
ciphertext = Fernet(key).encrypt(plaintext.encode('utf-8'))
return f'{base64.b64encode(ciphertext).decode("utf-8")}:{salt}'

def decrypt(self, ciphertext: str):
try:
ciphertext, b64salt = ciphertext.split(':')
key = self._key(b64salt)
return Fernet(key).decrypt(base64.b64decode(ciphertext.encode())).decode('utf-8')
except InvalidToken:
raise InvalidPassphraseException()
2 changes: 1 addition & 1 deletion src/pybritive/options/passphrase.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

option = click.option(
'--passphrase', '-p',
help='The passphrase to use for the encrypted-file credential backend type.',
help='The passphrase to use for encrypting credentials.',
envvar='PYBRITIVE_ENCRYPTED_CREDENTIAL_PASSPHRASE',
show_envvar=True,
show_default=True
Expand Down