Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account Context Module Improved and Enabled by Default #77

Merged
merged 4 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ In **MetaHub**, context refers to information about the affected resources like

MetaHub doesn't stop at the affected resource but analyzes any associated or attached resources. For instance, if there is a security finding on an EC2 instance, MetaHub will not only analyze the instance but also the security groups attached to it, including their rules. MetaHub will examine the IAM roles that the affected resource is using and the policies attached to those roles for any issues. It will analyze the EBS attached to the instance and determine if they are encrypted. It will also analyze the Auto Scaling Groups that the instance is associated with and how. MetaHub will also analyze the VPC, Subnets, and other resources associated with the instance.

The **Context** module has the capability to retrieve information from the affected resources, affected accounts, and every associated resources. The context module has five main parts: `config` (which includes `associations` as well), `tags`, `cloudtrail`, and `account`. By default `config` and `tags` are enabled, but you can change this behavior using the option `--context` (for enabling all the context modules you can use `--context config tags cloudtrail account`). The output of each enabled key will be added under the affected resource.
The **Context** module has the capability to retrieve information from the affected resources, affected accounts, and every associated resources. The context module has five main parts: `config` (which includes `associations` as well), `tags`, `cloudtrail`, and `account`. By default `config`, `tags` and `account` are enabled, but you can change this behavior using the option `--context` (for enabling all the context modules you can use `--context config tags cloudtrail account`). The output of each enabled key will be added under the affected resource.

- [Config](#config)
- [Associations](#associations)
Expand Down
138 changes: 116 additions & 22 deletions lib/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,47 +250,141 @@ def get_account_organizations(self):
self.resource_arn,
)
# Organizations
organizations = False
organizations_client = get_boto3_client(
self.logger, "organizations", self.resource_region, self.sess
)
# Describe Organization
try:
response = organizations_client.describe_account(
AccountId=self.resource_account_id
response_describe_organization = (
organizations_client.describe_organization().get("Organization")
)
organizations_name = response["Account"]["Name"]
response = organizations_client.list_parents(
ChildId=self.resource_account_id
organization_arn = response_describe_organization.get("Arn")
organization_id = response_describe_organization.get("Id")
organization_master_id = response_describe_organization.get(
"MasterAccountId"
)
organization_master_email = response_describe_organization.get(
"MasterAccountEmail"
)
ou_id = response["Parents"][0]["Id"]
if ou_id and response["Parents"][0]["Type"] == "ORGANIZATIONAL_UNIT":
response = organizations_client.describe_organizational_unit(
OrganizationalUnitId=ou_id
organization_feature_set = response_describe_organization.get("FeatureSet")
# Describe Delegations
# This only works if we are the master account or a delegated administrator, but we try anyways to get the info
organization_delegated_administrators = {}
try:
response_list_delegated_administrators = (
organizations_client.list_delegated_administrators().get(
"DelegatedAdministrators"
)
)
organizations_ou = response["OrganizationalUnit"]["Name"]
elif ou_id:
organizations_ou = "ROOT"
if response_list_delegated_administrators:
for (
delegated_administrator
) in response_list_delegated_administrators:
organization_delegated_administrators[
delegated_administrator.get("Id")
] = {}
except ClientError as err:
if not err.response["Error"]["Code"] == "AccessDeniedException":
self.logger.warning(
"Failed to list_delegated_administrators: %s, for resource: %s - %s",
self.resource_account_id,
self.resource_arn,
err,
)
# Organizations Details
organizations = {
"Name": organizations_name,
"OU": organizations_ou,
"Arn": organization_arn,
"Id": organization_id,
"MasterAccountId": organization_master_id,
"MasterAccountEmail": organization_master_email,
"FeatureSet": organization_feature_set,
"DelegatedAdministrators": organization_delegated_administrators,
}
except ClientError as err:
if err.response["Error"]["Code"] == "AWSOrganizationsNotInUseException":
self.logger.info(
"Failed to describe_account: %s, for resource: %s - %s",
organizations = False
if not err.response["Error"]["Code"] == "AWSOrganizationsNotInUseException":
self.logger.warning(
"Failed to describe_organization: %s, for resource: %s - %s",
self.resource_account_id,
self.resource_arn,
err,
)
else:
return organizations

def get_account_organizations_details(self):
# The following operations can be called only from the organization’s management account or by a member account that is a delegated administrator.
organizations_details = {}
self.logger.info(
"get_account_organizations_details for account: %s (%s)",
self.resource_account_id,
self.resource_arn,
)
# Organizations
organizations_client = get_boto3_client(
self.logger, "organizations", self.resource_region, self.sess
)
# Get parent ID and OU
try:
response_list_parents = organizations_client.list_parents(
ChildId=self.resource_account_id
)
parent_id = response_list_parents["Parents"][0]["Id"]
parent_type = response_list_parents["Parents"][0]["Type"]
if parent_id and parent_type == "ORGANIZATIONAL_UNIT":
response_describe_organizational_unit = (
organizations_client.describe_organizational_unit(
OrganizationalUnitId=parent_id
)
)
organizations_ou = response_describe_organizational_unit[
"OrganizationalUnit"
]["Name"]
elif parent_id:
organizations_ou = "ROOT"
organizations_details["ParentId"] = parent_id
organizations_details["ParentType"] = parent_type
organizations_details["OU"] = organizations_ou
except ClientError as err:
if not err.response["Error"]["Code"] == "AccessDeniedException":
self.logger.warning(
"Failed to describe_account: %s, for resource: %s - %s",
"Failed to list_parents: %s, for resource: %s - %s",
self.resource_account_id,
self.resource_arn,
err,
)

return organizations
# Policies
available_organizations_policies = [
"SERVICE_CONTROL_POLICY",
"TAG_POLICY",
"BACKUP_POLICY",
"AISERVICES_OPT_OUT_POLICY",
]
organizations_details["Policies"] = {}
for policy_type in available_organizations_policies:
response_list_policies = organizations_client.list_policies(
Filter=policy_type
)
if response_list_policies.get("Policies"):
for policy in response_list_policies.get("Policies"):
policy_id = policy.get("Id")
policy_name = policy.get("Name")
policy_arn = policy.get("Arn")
policy_type = policy.get("Type")
policy_description = policy.get("Description")
policy_awsmanaged = policy.get("AwsManaged")
targes = organizations_client.list_targets_for_policy(
PolicyId=policy_id
)["Targets"]
organizations_details["Policies"][policy_id] = {
"Name": policy_name,
"Arn": policy_arn,
"Type": policy_type,
"Description": policy_description,
"AwsManaged": policy_awsmanaged,
"Targets": targes,
}

return organizations_details

def get_account_alternate_contact(self, alternate_contact_type="SECURITY"):
# https://docs.aws.amazon.com/accounts/latest/reference/using-orgs-trusted-access.html
Expand Down
2 changes: 1 addition & 1 deletion lib/context/resources/AwsAutoScalingLaunchConfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _describe_launch_configuration_iam_roles(self):
if self.launch_configuration:
instance_profile = self.launch_configuration.get("IamInstanceProfile")
arn = IamHelper(
self.logger, self.finding, False, self.sess, instance_profile
self.logger, self.finding, self.sess
).get_role_from_instance_profile(instance_profile)
iam_roles[arn] = {}

Expand Down
14 changes: 4 additions & 10 deletions lib/context/resources/AwsEc2Instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,13 @@ def describe_instance(self):
}
],
)
if response["Reservations"]:
return response["Reservations"][0]["Instances"][0]
except ClientError as err:
if err.response["Error"]["Code"] == "InvalidInstanceID.NotFound":
self.logger.info(
"Failed to describe_instance: {}, {}".format(self.resource_id, err)
)
return False
else:
if not err.response["Error"]["Code"] == "InvalidInstanceID.NotFound":
self.logger.error(
"Failed to describe_instance: {}, {}".format(self.resource_id, err)
)
return False
if response["Reservations"]:
return response["Reservations"][0]["Instances"][0]
return False

def _describe_instance_iam_roles(self):
Expand All @@ -91,7 +85,7 @@ def _describe_instance_iam_roles(self):
if profile:
profile_arn = profile.get("Arn")
arn = IamHelper(
self.logger, self.finding, False, self.sess, profile_arn
self.logger, self.finding, self.sess
).get_role_from_instance_profile(profile_arn)
if arn:
iam_roles[arn] = {}
Expand Down
2 changes: 1 addition & 1 deletion lib/context/resources/AwsEc2LaunchTemplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _describe_launch_template_versions_iam_roles(self):
elif "Name" in instance_profile:
instance_profile = instance_profile.get("Name")
arn = IamHelper(
self.logger, self.finding, False, self.sess, instance_profile
self.logger, self.finding, self.sess
).get_role_from_instance_profile(instance_profile)
iam_roles[arn] = {}

Expand Down
8 changes: 3 additions & 5 deletions lib/context/resources/ContextHelpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@


class IamHelper:
def __init__(self, logger, finding, role, sess, instance_profile=False):
def __init__(self, logger, finding, sess):
self.logger = logger
self.finding = finding
self.region = finding["Region"]
self.account = finding["AwsAccountId"]
self.partition = finding["Resources"][0]["Id"].split(":")[1]
self.finding = finding
self.sess = sess
self.resource_arn = finding["Resources"][0]["Id"]
self.resource_id = finding["Resources"][0]["Id"].split("/")[1]
self.iam_client = get_boto3_client(self.logger, "iam", self.region, self.sess)
self.iam_client = get_boto3_client(self.logger, "iam", self.region, sess)

def get_role_from_instance_profile(self, instance_profile):
if "/" in instance_profile:
Expand Down
29 changes: 29 additions & 0 deletions lib/findings.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,35 @@ def evaluate_finding(
AwsAccountData[finding["AwsAccountId"]] = mh_account
else:
mh_account = AwsAccountData[finding["AwsAccountId"]]
# For fetching Organizations details, we need to be in the master account or a delegated administrator
if (
finding["AwsAccountId"] in AwsAccountData
and AwsAccountData[finding["AwsAccountId"]].get("Organizations")
and not AwsAccountData[finding["AwsAccountId"]]
.get("Organizations")
.get("Details")
):
# It is the master account or a delegated administrator
if finding["AwsAccountId"] == AwsAccountData[
finding["AwsAccountId"]
].get("Organizations").get("MasterAccountId") or finding[
"AwsAccountId"
] in AwsAccountData[
finding["AwsAccountId"]
].get(
"Organizations"
).get(
"DelegatedAdministrators"
):
organizations_details = context.get_account_organizations_details()
AwsAccountData[finding["AwsAccountId"]]["Organizations"][
"Details"
] = organizations_details
else:
AwsAccountData[finding["AwsAccountId"]]["Organizations"][
"Details"
] = "n/a"
mh_account = AwsAccountData[finding["AwsAccountId"]]
else:
mh_account = False
# If both Tags and Config matchs are True we show the resource
Expand Down
1 change: 1 addition & 0 deletions lib/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def get_parser():
default=[
"config",
"tags",
"account",
],
help="This option defines which actions MetaHub will execute to get the context of the affected resources. By default, MetaHub will execute config and tags actions. CloudTrail and Account are disabled by default as could be expensive to execute and requires non-standard iam actions policies. Check that before enabling them",
choices=[
Expand Down