-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #8 from Nr18/develop
release: 0.2.0
- Loading branch information
Showing
30 changed files
with
1,532 additions
and
225 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,89 @@ | ||
import click | ||
import boto3 | ||
from .aws_config import AWSConfig | ||
from .credentials import Credentials | ||
from botocore.exceptions import ParamValidationError, ClientError | ||
from click import Context | ||
|
||
from aws_iam_login.actions.initialize_configuration import InitializeConfiguration | ||
from aws_iam_login.observer import Observer | ||
from aws_iam_login.temp_credentials import TempCredentials | ||
from aws_iam_login.aws_config import AWSConfig | ||
from aws_iam_login.actions.rotate_access_keys import RotateAccessKeys | ||
from aws_iam_login.credentials import Credentials | ||
from aws_iam_login.application_context import ApplicationContext, ApplicationMessages | ||
|
||
@click.command() | ||
|
||
@click.group(invoke_without_command=True) | ||
@click.option("--debug/--no-debug") | ||
@click.argument("profile") | ||
def main(profile: str) -> None: | ||
@click.pass_context | ||
def main(ctx: Context, debug: bool, profile: str) -> None: | ||
""" | ||
aws-iam-login | ||
""" | ||
config = AWSConfig(profile) | ||
session = boto3.Session(profile_name=profile) | ||
ctx.obj = ApplicationContext(profile=profile, debug=debug) | ||
|
||
if ctx.invoked_subcommand is None: | ||
credentials(ctx=ctx.obj) | ||
|
||
|
||
def credentials(ctx: ApplicationContext) -> None: | ||
config = AWSConfig(ctx.profile) | ||
session = boto3.Session(profile_name=ctx.profile) | ||
client = session.client("sts") | ||
|
||
response = client.get_session_token( | ||
SerialNumber=config.mfa_serial, | ||
TokenCode=input(f"Enter MFA code for {config.mfa_serial}: "), | ||
) | ||
try: | ||
response = client.get_session_token( | ||
SerialNumber=config.mfa_serial, | ||
TokenCode=input(f"Enter MFA code for {config.mfa_serial}: "), | ||
) | ||
|
||
config.write( | ||
f"{ctx.profile}-sts", TempCredentials(response.get("Credentials", {})) | ||
) | ||
click.echo(f"Login credentials stored in the {ctx.profile}-sts profile!") | ||
except ClientError as exc: | ||
click.echo(f"ClientError: {exc}") | ||
exit(1) | ||
except ParamValidationError as exc: | ||
click.echo(f"ValidationError: {exc}") | ||
exit(1) | ||
|
||
|
||
@main.command() | ||
@click.pass_obj | ||
def init(ctx: ApplicationContext) -> None: | ||
""" | ||
Initialize your `.aws/configuration` | ||
""" | ||
click.echo(f"Looking up additional required data...") | ||
|
||
action = InitializeConfiguration(profile=ctx.profile) | ||
action.subscribe_subject(ctx.subject) | ||
|
||
if not action.execute(): | ||
click.echo(f"Failed to initialize the {ctx.profile} profile") | ||
exit(1) | ||
|
||
click.echo(f"The {ctx.profile} profile has been successfully initialized!") | ||
|
||
|
||
@main.command() | ||
@click.pass_obj | ||
def rotate(ctx: ApplicationContext) -> None: | ||
""" | ||
Rotate your IAM User credentials | ||
""" | ||
click.echo(f"Key rotation process in progress") | ||
|
||
action = RotateAccessKeys(profile=ctx.profile) | ||
action.subscribe_subject(ctx.subject) | ||
|
||
if not action.execute(): | ||
click.echo(f"Failed to rotate the access keys for the {ctx.profile} profile") | ||
exit(1) | ||
|
||
config.write(f"{profile}-sts", Credentials(response.get("Credentials", {}))) | ||
click.echo(f"Login credentials stored in the {profile}-sts profile!") | ||
click.echo(f"Keys successfully rotated for the {ctx.profile} profile") | ||
|
||
|
||
if __name__ == "__main__": | ||
main() | ||
main(obj={}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
from datetime import datetime | ||
from typing import Dict, Union, Optional | ||
|
||
from aws_iam_login.credentials import Credentials | ||
|
||
|
||
class AccessKey: | ||
""" | ||
Understands the AccessKey format of AWS | ||
""" | ||
|
||
def __init__(self, data: Optional[dict]): | ||
sanitized_data = self.__sanitize_input(data) | ||
self.__data = sanitized_data if sanitized_data else {} | ||
self.__credentials = Credentials(data) | ||
|
||
@staticmethod | ||
def __sanitize_input(data: Optional[dict]) -> Optional[dict]: | ||
if not isinstance(data, dict): | ||
return None | ||
|
||
sanitized = {} | ||
|
||
for key in ["UserName", "Status", "CreateDate"]: | ||
if key in data: | ||
sanitized[key] = data[key] | ||
|
||
return sanitized | ||
|
||
@property | ||
def username(self) -> str: | ||
return str(self.__data.get("UserName", "")) | ||
|
||
@property | ||
def credentials(self) -> Credentials: | ||
return self.__credentials | ||
|
||
@property | ||
def status(self) -> str: | ||
return str(self.__data.get("Status", "")) | ||
|
||
@property | ||
def created(self) -> datetime: | ||
date = self.__data.get("CreateDate") | ||
return date if isinstance(date, datetime) else datetime.now() | ||
|
||
def __str__(self) -> str: | ||
postfix = "" | ||
if self.credentials.aws_secret_access_key: | ||
postfix = " (contains secret)" | ||
return f"{self.credentials.aws_access_key_id}, {self.status} {self.username} created at {self.created}{postfix}" | ||
|
||
def __repr__(self) -> str: | ||
return str(self) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from typing import Optional | ||
from abc import ABC, abstractmethod | ||
from aws_iam_login.application_context import ApplicationMessages | ||
|
||
|
||
class Action(ABC): | ||
""" | ||
Understands generic actions that are executed | ||
""" | ||
|
||
__subject: Optional[ApplicationMessages] = None | ||
|
||
def subscribe_subject(self, subject: ApplicationMessages): | ||
self.__subject = subject | ||
|
||
def info(self, message: str) -> None: | ||
if self.__subject: | ||
self.__subject.send(message_type="INFO", message=message) | ||
|
||
def warning(self, message: str) -> None: | ||
if self.__subject: | ||
self.__subject.send(message_type="WARN", message=message) | ||
|
||
def error(self, message: str) -> None: | ||
if self.__subject: | ||
self.__subject.send(message_type="ERROR", message=message) | ||
|
||
def exception(self, message: str, exception: Exception) -> None: | ||
if self.__subject: | ||
self.__subject.send( | ||
message_type="ERROR", message=message, exception=exception | ||
) | ||
|
||
@abstractmethod | ||
def execute(self) -> bool: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from typing import Optional | ||
import boto3 | ||
|
||
from aws_iam_login.actions.action import Action | ||
from aws_iam_login.aws_config import AWSConfig | ||
|
||
|
||
class InitializeConfiguration(Action): | ||
""" | ||
Understands how to configure the AWS profiles to be used by aws-iam-login. | ||
""" | ||
|
||
__cached_client: Optional[boto3.Session.client] = None | ||
|
||
def __init__(self, profile: str) -> None: | ||
self.__profile = profile | ||
|
||
self.__config = AWSConfig(profile) | ||
self.__client = boto3.Session(profile_name=profile).client("sts") | ||
|
||
def execute(self) -> bool: | ||
if self.__config.mfa_serial and self.__config.username: | ||
self.info(f"The {self.__profile} profile is already initialized.") | ||
return True | ||
|
||
try: | ||
data = self.__client.get_caller_identity() | ||
mfa_serial = data["Arn"].replace(":user/", ":mfa/") | ||
username = mfa_serial.split("/")[-1] | ||
self.info(f"Determined MFA ARN: {mfa_serial}") | ||
self.info(f"Determined IAM Username: {username}") | ||
self.__config.initialize(username=username, mfa_serial=mfa_serial) | ||
|
||
return True | ||
except Exception as exc: | ||
message = ( | ||
f"Failed to get the caller identity for the {self.__profile} profile." | ||
) | ||
self.exception(message=message, exception=exc) | ||
|
||
return False |
Oops, something went wrong.