Skip to content

Commit

Permalink
Implement filters in CrossAccount Rules + Refactor of these inherit c…
Browse files Browse the repository at this point in the history
…lasses (#111)

* implement_filters_in_crossaccount: refactor + add support for filters

* implement_filters_in_crossaccount: update docs

* implement_filters_in_crossaccount: add tests

* implement_filters_in_crossaccount: update changelog and version
  • Loading branch information
oscarbc96 committed Mar 27, 2020
1 parent da12775 commit 20c7c4b
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 32 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog
All notable changes to this project will be documented in this file.

## [0.17.0] - 2020-03-27
### Improvements
- `CrossAccountCheckingRule`, `CrossAccountTrustRule`, `S3CrossAccountTrustRule` and `KMSKeyCrossAccountTrustRule` include support for filters.
### Breaking changes
- `CrossAccountCheckingRule` now includes the invoke method. Statements of PolicyDocument are now analysed using `RESOURCE_TYPE` and `PROPERTY_WITH_POLICYDOCUMENT` class variables.

## [0.16.0] - 2020-03-27
### Improvements
- Add new `RuleConfig`, allows to overwrite the default behaviour of the rule changing rule mode and risk value.
Expand Down
2 changes: 1 addition & 1 deletion cfripper/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
VERSION = (0, 16, 0)
VERSION = (0, 17, 0)

__version__ = ".".join(map(str, VERSION))
96 changes: 67 additions & 29 deletions cfripper/rules/cross_account_trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
]

import logging
import re
from typing import Dict, Optional, Set

from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.iam_role import IAMRole
from pycfmodel.model.resources.kms_key import KMSKey
from pycfmodel.model.resources.properties.statement import Statement
from pycfmodel.model.resources.resource import Resource
from pycfmodel.model.resources.s3_bucket_policy import S3BucketPolicy

from cfripper.config.regex import REGEX_CROSS_ACCOUNT_ROOT
from cfripper.model.enums import RuleGranularity, RuleMode
from cfripper.model.result import Result
from cfripper.model.utils import get_account_id_from_principal
Expand All @@ -45,6 +44,8 @@ class CrossAccountCheckingRule(PrincipalCheckingRule):
"""

GRANULARITY = RuleGranularity.RESOURCE
RESOURCE_TYPE: Resource
PROPERTY_WITH_POLICYDOCUMENT: str

@property
def valid_principals(self) -> Set[str]:
Expand All @@ -54,11 +55,31 @@ def valid_principals(self) -> Set[str]:
self._valid_principals.add(self._config.aws_account_id)
return self._valid_principals

def _do_statement_check(self, result: Result, logical_id: str, statement: Statement):
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, self.RESOURCE_TYPE):
properties = resource.Properties
policy_document = getattr(properties, self.PROPERTY_WITH_POLICYDOCUMENT)
for statement in policy_document._statement_as_list():
filters_available_context = {
"config": self._config,
"extras": extras,
"logical_id": logical_id,
"resource": resource,
"statement": statement,
}
self._do_statement_check(result, logical_id, statement, filters_available_context)
return result

def _do_statement_check(
self, result: Result, logical_id: str, statement: Statement, filters_available_context: Dict
):
if statement.Effect == "Allow":
for principal in statement.get_principal_list():
account_id = get_account_id_from_principal(principal)
filters_available_context["principal"] = principal
filters_available_context["account_id"] = account_id
if (
# checks if principal is a canonical id and is whitelisted
principal not in self.valid_principals
Expand Down Expand Up @@ -86,7 +107,10 @@ def _do_statement_check(self, result: Result, logical_id: str, statement: Statem
)
else:
self.add_failure_to_result(
result, self.REASON.format(logical_id, principal), resource_ids={logical_id},
result,
self.REASON.format(logical_id, principal),
resource_ids={logical_id},
context=filters_available_context,
)


Expand All @@ -101,18 +125,22 @@ class CrossAccountTrustRule(CrossAccountCheckingRule):
Fix:
If cross account permissions are required, the stack should be added to the whitelist for this rule.
Otherwise, the access should be removed from the CloudFormation definition.
Filters context:
| Parameter | Type | Description |
|:-----------:|:-----------:|:--------------------------------------------------------------:|
|`config` | str | `config` variable available inside the rule |
|`extras` | str | `extras` variable available inside the rule |
|`logical_id` | str | ID used in Cloudformation to refer the resource being analysed |
|`resource` | `IAMRole` | Resource that is being addressed |
|`statement` | `Statement` | Statement being checked found in the Resource |
|`principal` | `str` | AWS Principal being checked found in the statement |
|`account_id` | `str` | Account ID found in the principal |
"""

REASON = "{} has forbidden cross-account trust relationship with {}"
ROOT_PATTERN = re.compile(REGEX_CROSS_ACCOUNT_ROOT)

def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
for statement in resource.Properties.AssumeRolePolicyDocument._statement_as_list():
self._do_statement_check(result, logical_id, statement)
return result
RESOURCE_TYPE = IAMRole
PROPERTY_WITH_POLICYDOCUMENT = "AssumeRolePolicyDocument"


class S3CrossAccountTrustRule(CrossAccountCheckingRule):
Expand All @@ -125,17 +153,22 @@ class S3CrossAccountTrustRule(CrossAccountCheckingRule):
Fix:
If cross account permissions are required for S3 access, the stack should be added to the whitelist for this rule.
Otherwise, the access should be removed from the CloudFormation definition.
Filters context:
| Parameter | Type | Description |
|:-----------:|:----------------:|:--------------------------------------------------------------:|
|`config` | str | `config` variable available inside the rule |
|`extras` | str | `extras` variable available inside the rule |
|`logical_id` | str | ID used in Cloudformation to refer the resource being analysed |
|`resource` | `S3BucketPolicy` | Resource that is being addressed |
|`statement` | `Statement` | Statement being checked found in the Resource |
|`principal` | `str` | AWS Principal being checked found in the statement |
|`account_id` | `str` | Account ID found in the principal |
"""

REASON = "{} has forbidden cross-account policy allow with {} for an S3 bucket."

def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy):
for statement in resource.Properties.PolicyDocument._statement_as_list():
self._do_statement_check(result, logical_id, statement)
return result
RESOURCE_TYPE = S3BucketPolicy
PROPERTY_WITH_POLICYDOCUMENT = "PolicyDocument"


class KMSKeyCrossAccountTrustRule(CrossAccountCheckingRule):
Expand All @@ -148,14 +181,19 @@ class KMSKeyCrossAccountTrustRule(CrossAccountCheckingRule):
Fix:
If cross account permissions are required for KMS access, the stack should be added to the whitelist for this rule.
Otherwise, the access should be removed from the CloudFormation definition.
Filters context:
| Parameter | Type | Description |
|:-----------:|:-----------:|:--------------------------------------------------------------:|
|`config` | str | `config` variable available inside the rule |
|`extras` | str | `extras` variable available inside the rule |
|`logical_id` | str | ID used in Cloudformation to refer the resource being analysed |
|`resource` | `KMSKey` | Resource that is being addressed |
|`statement` | `Statement` | Statement being checked found in the Resource |
|`principal` | `str` | AWS Principal being checked found in the statement |
|`account_id` | `str` | Account ID found in the principal |
"""

REASON = "{} has forbidden cross-account policy allow with {} for an KMS Key Policy"

def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, KMSKey):
for statement in resource.Properties.KeyPolicy._statement_as_list():
self._do_statement_check(result, logical_id, statement)
return result
RESOURCE_TYPE = KMSKey
PROPERTY_WITH_POLICYDOCUMENT = "KeyPolicy"
2 changes: 1 addition & 1 deletion docs/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def cfripper_rules():

else:
content += f"\n#### {paragraph_title}\n"
content += paragraph_text
content += f"{paragraph_text}\n"

results.append((klass.__name__, content))

Expand Down
5 changes: 4 additions & 1 deletion docs/rule_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ The object accepts a reason parameter to say why that filter exists.
!!! warning
Only available for the following rules:

- Not supported yet
- CrossAccountCheckingRule
- CrossAccountTrustRule
- S3CrossAccountTrustRule
- KMSKeyCrossAccountTrustRule

### Filter preference

Expand Down
71 changes: 71 additions & 0 deletions tests/rules/test_CrossAccountTrustRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import pytest

from cfripper.config.config import Config
from cfripper.config.filter import Filter
from cfripper.config.rule_config import RuleConfig
from cfripper.model.enums import RuleGranularity, RuleMode, RuleRisk
from cfripper.model.result import Failure
from cfripper.rule_processor import RuleProcessor
Expand Down Expand Up @@ -121,6 +123,35 @@ def test_resource_whitelisting_works_as_expected(template_two_roles_dict, expect
assert result.failed_rules[0] == expected_result_two_roles[-1]


def test_filter_works_as_expected(template_two_roles_dict, expected_result_two_roles):
config = Config(
rules=["CrossAccountTrustRule"],
aws_account_id="123456789",
stack_name="mockstack",
rules_config={
"CrossAccountTrustRule": RuleConfig(
filters=[
Filter(
rule_mode=RuleMode.WHITELISTED,
eval={
"and": [
{"eq": [{"ref": "config.stack_name"}, "mockstack"]},
{"eq": [{"ref": "logical_id"}, "RootRoleOne"]},
]
},
)
],
)
},
)
rules = [DEFAULT_RULES.get(rule)(config) for rule in config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_two_roles_dict, config)

assert not result.valid
assert result.failed_rules[0] == expected_result_two_roles[-1]


def test_whitelisted_stacks_do_not_report_anything(template_two_roles_dict):
mock_stack_whitelist = {"mockstack": ["CrossAccountTrustRule"]}
mock_config = Config(
Expand All @@ -136,6 +167,26 @@ def test_whitelisted_stacks_do_not_report_anything(template_two_roles_dict):
assert result.valid


def test_filter_do_not_report_anything(template_two_roles_dict):
mock_config = Config(
rules=["CrossAccountTrustRule"],
aws_account_id="123456789",
stack_name="mockstack",
rules_config={
"CrossAccountTrustRule": RuleConfig(
filters=[
Filter(rule_mode=RuleMode.WHITELISTED, eval={"eq": [{"ref": "config.stack_name"}, "mockstack"]})
],
)
},
)
rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_two_roles_dict, mock_config)

assert result.valid


def test_non_whitelisted_stacks_are_reported_normally(template_two_roles_dict, expected_result_two_roles):
mock_stack_whitelist = {"mockstack": ["CrossAccountTrustRule"]}
mock_config = Config(
Expand All @@ -151,6 +202,26 @@ def test_non_whitelisted_stacks_are_reported_normally(template_two_roles_dict, e
assert result.failed_rules == expected_result_two_roles


def test_non_matching_filters_are_reported_normally(template_two_roles_dict, expected_result_two_roles):
mock_config = Config(
rules=["CrossAccountTrustRule"],
aws_account_id="123456789",
stack_name="mockstack",
rules_config={
"CrossAccountTrustRule": RuleConfig(
filters=[
Filter(rule_mode=RuleMode.WHITELISTED, eval={"eq": [{"ref": "config.stack_name"}, "anotherstack"]})
],
)
},
)
rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_two_roles_dict, mock_config)
assert not result.valid
assert result.failed_rules == expected_result_two_roles


def test_service_is_not_blocked(template_valid_with_service):
rule = CrossAccountTrustRule(Config())
result = rule.invoke(template_valid_with_service)
Expand Down

0 comments on commit 20c7c4b

Please sign in to comment.