Skip to content

Commit

Permalink
feat: support initialization of configuration
Browse files Browse the repository at this point in the history
By using the `get_caller_identity` API call. We can automatically configure the `username` and `mfa_serial` for the given access keys. This makes the tool it easier to use!
  • Loading branch information
Joris Conijn committed Jul 21, 2022
1 parent 4690f63 commit f70b033
Show file tree
Hide file tree
Showing 20 changed files with 853 additions and 484 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@ stored in the `~/.aws/credentials` file for re-use.
## Configuration

You will need to configure your roles and IAM User credentials in the same places as you are used to. So in your
`~/.aws/credentials` file you will need to have the following:
`~/.aws/credentials` file. To make this process as easy as possible you could use the following command:

```bash
aws-iam-login my-profile init
```

This command will fetch the ARN of the caller identity. Based on this identity we will determin the `username` and
`mfa_serial` of the IAM User. These will then be stored in the `~/.aws/credentials` file. For example:

```ini
[my-profile]
aws_access_key_id = XXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
mfa_serial = arn:aws:iam::111122223333:mfa/my-iam-user
username = my-iam-user
```

The only addition is the `mfa_serial` field.
The only addition is the `username` and `mfa_serial` fields.

### AWS Least privileged

Expand Down
31 changes: 24 additions & 7 deletions aws_iam_login/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from botocore.exceptions import ParamValidationError, ClientError
from click import Context

from aws_iam_login.actions.initialize_configuration import InitializeConfiguration
from aws_iam_login.observer import Observer
from .aws_config import AWSConfig
from .rotate_access_keys import RotateAccessKeys
from aws_iam_login.actions.rotate_access_keys import RotateAccessKeys
from .credentials import Credentials
from .application_context import ApplicationContext, ApplicationMessages

Expand Down Expand Up @@ -46,19 +47,35 @@ def credentials(ctx: ApplicationContext) -> None:


@main.command()
@click.argument("username")
@click.pass_obj
def rotate(ctx: ApplicationContext, username: str) -> None:
def init(ctx: ApplicationContext) -> None:
"""
Initialize your `.aws/configuration`
"""
click.echo(f"Looking up additional required data...")

action = InitializeConfiguration(profile=ctx.profile)
action.subscribe_subject(ctx.subject)

if not action.execute():
click.echo(f"Failed to initialize the {ctx.profile} profile")
exit(1)

click.echo(f"The {ctx.profile} profile has been successfully initialized!")


@main.command()
@click.pass_obj
def rotate(ctx: ApplicationContext) -> None:
"""
Rotate your IAM User credentials
"""
click.echo(f"Key rotation process in progress")

action = RotateAccessKeys(
profile=ctx.profile, username=username, subject=ctx.subject
)
action = RotateAccessKeys(profile=ctx.profile)
action.subscribe_subject(ctx.subject)

if not action.rotate():
if not action.execute():
click.echo(f"Failed to rotate the access keys for the {ctx.profile} profile")
exit(1)

Expand Down
8 changes: 0 additions & 8 deletions aws_iam_login/access_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ def __iter__(self):
yield "aws_access_key_id", self.access_key
yield "aws_secret_access_key", self.secret_access_key

@property
def profile(self) -> str:
return str(self.__raw_data.get("Profile", ""))

@profile.setter
def profile(self, value: str) -> None:
self.__raw_data["Profile"] = value

@property
def username(self) -> str:
return str(self.__raw_data.get("UserName", ""))
Expand Down
Empty file.
36 changes: 36 additions & 0 deletions aws_iam_login/actions/action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Optional
from abc import ABC, abstractmethod
from aws_iam_login.application_context import ApplicationMessages


class Action(ABC):
"""
Understands generic actions that are executed
"""

__subject: Optional[ApplicationMessages] = None

def subscribe_subject(self, subject: ApplicationMessages):
self.__subject = subject

def info(self, message: str) -> None:
if self.__subject:
self.__subject.send(message_type="INFO", message=message)

def warning(self, message: str) -> None:
if self.__subject:
self.__subject.send(message_type="WARN", message=message)

def error(self, message: str) -> None:
if self.__subject:
self.__subject.send(message_type="ERROR", message=message)

def exception(self, message: str, exception: Exception) -> None:
if self.__subject:
self.__subject.send(
message_type="ERROR", message=message, exception=exception
)

@abstractmethod
def execute(self) -> bool:
pass
41 changes: 41 additions & 0 deletions aws_iam_login/actions/initialize_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Optional
import boto3

from aws_iam_login.actions.action import Action
from aws_iam_login.aws_config import AWSConfig


class InitializeConfiguration(Action):
"""
Understands how to configure the AWS profiles to be used by aws-iam-login.
"""

__cached_client: Optional[boto3.Session.client] = None

def __init__(self, profile: str) -> None:
self.__profile = profile

self.__config = AWSConfig(profile)
self.__client = boto3.Session(profile_name=profile).client("sts")

def execute(self) -> bool:
if self.__config.mfa_serial and self.__config.username:
self.info(f"The {self.__profile} profile is already initialized.")
return True

try:
data = self.__client.get_caller_identity()
mfa_serial = data["Arn"].replace(":user/", ":mfa/")
username = mfa_serial.split("/")[-1]
self.info(f"Determined MFA ARN: {mfa_serial}")
self.info(f"Determined IAM Username: {username}")
self.__config.initialize(username=username, mfa_serial=mfa_serial)

return True
except Exception as exc:
message = (
f"Failed to get the caller identity for the {self.__profile} profile."
)
self.exception(message=message, exception=exc)

return False
102 changes: 102 additions & 0 deletions aws_iam_login/actions/rotate_access_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from typing import Optional, List
import time
import boto3
from aws_iam_login import AWSConfig
from aws_iam_login.actions.action import Action
from aws_iam_login.access_key import AccessKey
from functools import lru_cache


class RotateAccessKeys(Action):
"""
Understands how to rotate access keys
"""

__cached_client: Optional[boto3.Session.client] = None

def __init__(self, profile: str) -> None:
self.__profile = profile
self.__config = AWSConfig(profile)

@lru_cache()
def new_key(self) -> Optional[AccessKey]:
try:
response = self.__client.create_access_key(UserName=self.__config.username)
self.info(
message="Sleep for 10 seconds to make sure the credentials are active."
)
time.sleep(10)
return AccessKey(response["AccessKey"])
except Exception as exc:
message = f"Failed to create a new access key for the user: {self.__config.username}"
self.exception(message=message, exception=exc)

return None

def execute(self) -> bool:
if not self.__rotation_possible():
return False

new_key = self.new_key()

if self.__config.key and new_key:
try:
self.__flush_client()
self.__disable_key(self.__config.key)
self.__config.write(f"{self.__profile}", new_key)
self.__delete_key(self.__config.key)

return True
except Exception as exc:
message = f"Failed to rotate the credentials for the user: {self.__config.username}"
self.exception(message=message, exception=exc)

return False

def __flush_client(self) -> None:
self.__cached_client = None

@property
def __client(self) -> boto3.Session.client:
if not self.__cached_client:
session = boto3.Session(profile_name=self.__profile)
self.__cached_client = session.client("iam")

return self.__cached_client

def __get_current_keys(self) -> List[dict]:
try:
response = self.__client.list_access_keys(UserName=self.__config.username)
return response["AccessKeyMetadata"]
except Exception as exc:
message = (
f"Failed to list access keys for the user: {self.__config.username}"
)
self.exception(message=message, exception=exc)

return []

def __rotation_possible(self) -> bool:
if not self.__config.valid:
self.warning(
f"The configuration for the {self.__profile} is invalid, please try `aws-iam-login {self.__profile} init` first!"
)
return False

if len(self.__get_current_keys()) != 1:
self.error(f"There needs to be only 1 AccessKey present!")
return False

return True

def __disable_key(self, key: AccessKey) -> None:
self.__client.update_access_key(
UserName=self.__config.username,
AccessKeyId=key.access_key,
Status="Inactive",
)

def __delete_key(self, key: AccessKey) -> None:
self.__client.delete_access_key(
UserName=self.__config.username, AccessKeyId=key.access_key
)
33 changes: 22 additions & 11 deletions aws_iam_login/aws_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ def __init__(self, profile: str) -> None:

@property
def valid(self) -> bool:
valid = bool(
return bool(
self.__profile_name
and self.__profile_name in self.__config
and self.mfa_serial
and self.username
)
return valid

@property
def __config(self) -> configparser.ConfigParser:
Expand All @@ -41,8 +41,12 @@ def __profile(self) -> configparser.SectionProxy:
return self.__config[self.__profile_name]

@property
def mfa_serial(self) -> Optional[str]:
return self.__profile.get("mfa_serial", None)
def mfa_serial(self) -> str:
return self.__profile.get("mfa_serial", "")

@property
def username(self) -> str:
return self.__profile.get("username", "")

@property
def key(self) -> AccessKey:
Expand All @@ -58,18 +62,25 @@ def key(self) -> AccessKey:

return self.__cached_key

def initialize(self, username: str, mfa_serial: str) -> None:
self.__profile["username"] = username
self.__profile["mfa_serial"] = mfa_serial
self.__save_configuration()

def __save_configuration(self):
with open(self.__credential_file, "w") as fh:
self.__config.write(fh)

def write(
self, profile: str, credentials: Union[None, Credentials, AccessKey]
) -> None:
if not credentials:
return

data = dict(credentials)

if self.mfa_serial:
data["mfa_serial"] = self.mfa_serial
for key, value in dict(credentials).items():
if profile not in self.__config:
self.__config[profile] = {}

self.__config[profile] = data
self.__config[profile][key] = value

with open(self.__credential_file, "w") as fh:
self.__config.write(fh)
self.__save_configuration()
Loading

0 comments on commit f70b033

Please sign in to comment.