From 9a2658d7ea165666df445e4906893ea55c8bdbe8 Mon Sep 17 00:00:00 2001 From: Taylor Date: Fri, 23 Feb 2024 12:28:43 -0800 Subject: [PATCH 1/6] Add Condition exceptions --- .../resource/aws/S3AllowsAnyPrincipal.py | 20 ++ .../aws/example_S3AllowsAnyPrincipal/main.tf | 205 ++++++++++++++++++ .../resource/aws/test_S3AllowsAnyPrincipal.py | 14 +- 3 files changed, 235 insertions(+), 4 deletions(-) diff --git a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py index ae04de38b3a..b4909fd2a8b 100644 --- a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py +++ b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py @@ -1,4 +1,5 @@ from json import JSONDecodeError +import re from checkov.common.models.enums import CheckResult, CheckCategories from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck @@ -41,8 +42,27 @@ def scan_resource_conf(self, conf): # Can be a string or an array of strings aws = statement['Principal']['AWS'] if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws): + if 'Condition' not in statement: + return CheckResult.FAILED + elif 'ArnNotEquals' in statement['Condition'] or 'ArnNotLike' in statement['Condition']: + return CheckResult.PASSED + elif 'ArnEquals' in statement['Condition'] and 'aws:PrincipalArn' in \ + statement['Condition']['ArnEquals']: + pattern = r'^arn:aws:iam::\*.*$' + principal_arn = statement['Condition']['ArnEquals']['aws:PrincipalArn'] + if re.match(pattern, principal_arn): + return CheckResult.FAILED + return CheckResult.PASSED + elif 'ArnLike' in statement['Condition'] and 'aws:PrincipalArn' \ + in statement['Condition']['ArnLike']: + pattern = r'^arn:aws:iam::\*.*$' + principal_arn = statement['Condition']['ArnLike']['aws:PrincipalArn'] + if re.match(pattern, principal_arn): + return CheckResult.FAILED + return CheckResult.PASSED return CheckResult.FAILED + return CheckResult.PASSED def get_evaluated_keys(self) -> List[str]: diff --git a/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf b/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf index ed0013d67f4..44ea6b91739 100644 --- a/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf +++ b/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf @@ -183,3 +183,208 @@ data "aws_iam_policy_document" "test" { resources = ["${aws_s3_bucket.b.arn}/*"] } } + + +resource "aws_s3_bucket_policy" "pass_w_condition" { + bucket = "bucket" + + policy = < Date: Fri, 23 Feb 2024 13:50:49 -0800 Subject: [PATCH 2/6] Refactor --- .../resource/aws/S3AllowsAnyPrincipal.py | 59 +++++++++------ .../aws/example_S3AllowsAnyPrincipal/main.tf | 71 +++++++++++++++++++ .../resource/aws/test_S3AllowsAnyPrincipal.py | 3 + 3 files changed, 113 insertions(+), 20 deletions(-) diff --git a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py index b4909fd2a8b..eb1fc7e8984 100644 --- a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py +++ b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py @@ -8,6 +8,43 @@ from typing import List +def check_conditions(statement): + # Check if 'Condition' key exists + if 'Condition' not in statement: + return CheckResult.FAILED + + condition = statement['Condition'] + + # Direct pass conditions based on not keys + if any(key in condition for key in ['ArnNotEquals', 'ArnNotLike']): + return CheckResult.PASSED + + # Handling 'ArnEquals' and 'ArnLike' + for arn_key in ['ArnEquals', 'ArnLike']: + if arn_key in condition: + # Pass unless it is for all IAM ARNs + for principal_key in ['aws:PrincipalArn', 'aws:SourceArn']: + if principal_key in condition[arn_key]: + principal_arn = condition[arn_key][principal_key] + # Fail if the Condition is for all IAM ARNs + if re.match(r'^arn:aws:iam::\*.*$', principal_arn): + return CheckResult.FAILED + # Passed if 'aws:PrincipalArn' or 'aws:SourceArn' do not match because then they are specific + return CheckResult.PASSED + + # Handle VPC sources. Other sources not specific enough + string_conditions = ['StringEquals', 'StringNotEquals', 'StringEqualsIgnoreCase', + 'StringNotEqualsIgnoreCase', 'StringLike', 'StringNotLike'] + if any(condition_type in condition for condition_type in string_conditions): + for condition_type in string_conditions: + if condition_type in condition: + if any(source in condition[condition_type] for source in ['aws:sourceVpce', 'aws:SourceVpc']): + return CheckResult.PASSED + + # Default fail if none of the above conditions are met + return CheckResult.FAILED + + class S3AllowsAnyPrincipal(BaseResourceCheck): def __init__(self): @@ -37,30 +74,12 @@ def scan_resource_conf(self, conf): continue principal = statement['Principal'] if principal == '*': - return CheckResult.FAILED + return check_conditions(statement) if 'AWS' in statement['Principal']: # Can be a string or an array of strings aws = statement['Principal']['AWS'] if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws): - if 'Condition' not in statement: - return CheckResult.FAILED - elif 'ArnNotEquals' in statement['Condition'] or 'ArnNotLike' in statement['Condition']: - return CheckResult.PASSED - elif 'ArnEquals' in statement['Condition'] and 'aws:PrincipalArn' in \ - statement['Condition']['ArnEquals']: - pattern = r'^arn:aws:iam::\*.*$' - principal_arn = statement['Condition']['ArnEquals']['aws:PrincipalArn'] - if re.match(pattern, principal_arn): - return CheckResult.FAILED - return CheckResult.PASSED - elif 'ArnLike' in statement['Condition'] and 'aws:PrincipalArn' \ - in statement['Condition']['ArnLike']: - pattern = r'^arn:aws:iam::\*.*$' - principal_arn = statement['Condition']['ArnLike']['aws:PrincipalArn'] - if re.match(pattern, principal_arn): - return CheckResult.FAILED - return CheckResult.PASSED - return CheckResult.FAILED + return check_conditions(statement) return CheckResult.PASSED diff --git a/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf b/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf index 44ea6b91739..925323e004c 100644 --- a/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf +++ b/tests/terraform/checks/resource/aws/example_S3AllowsAnyPrincipal/main.tf @@ -388,3 +388,74 @@ resource "aws_s3_bucket_policy" "fail_w_condition" { } POLICY } + +resource "aws_s3_bucket" "pass_w_condition3" { + bucket = "bucket" + + policy = < Date: Fri, 23 Feb 2024 14:10:26 -0800 Subject: [PATCH 3/6] Update S3AllowsAnyPrincipal.py --- .../checks/resource/aws/S3AllowsAnyPrincipal.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py index eb1fc7e8984..33274fc6133 100644 --- a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py +++ b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py @@ -15,7 +15,7 @@ def check_conditions(statement): condition = statement['Condition'] - # Direct pass conditions based on not keys + # Pass if they define bad ARNs. Assumes they are not too narrow if any(key in condition for key in ['ArnNotEquals', 'ArnNotLike']): return CheckResult.PASSED @@ -26,15 +26,15 @@ def check_conditions(statement): for principal_key in ['aws:PrincipalArn', 'aws:SourceArn']: if principal_key in condition[arn_key]: principal_arn = condition[arn_key][principal_key] - # Fail if the Condition is for all IAM ARNs - if re.match(r'^arn:aws:iam::\*.*$', principal_arn): + # Fail if the Condition is for all ARNs of any resource + if re.match(r'^arn:aws:[a-z0-9-]+::\*.*$', principal_arn): return CheckResult.FAILED # Passed if 'aws:PrincipalArn' or 'aws:SourceArn' do not match because then they are specific return CheckResult.PASSED # Handle VPC sources. Other sources not specific enough - string_conditions = ['StringEquals', 'StringNotEquals', 'StringEqualsIgnoreCase', - 'StringNotEqualsIgnoreCase', 'StringLike', 'StringNotLike'] + # Leaves out the NOT conditions as too broad ('StringNotEquals', 'StringNotEqualsIgnoreCase', 'StringNotLike') + string_conditions = ['StringEquals', 'StringEqualsIgnoreCase', 'StringLike'] if any(condition_type in condition for condition_type in string_conditions): for condition_type in string_conditions: if condition_type in condition: From 5e5ca8e0ff1db1ef842985643135fb69f6b9d15d Mon Sep 17 00:00:00 2001 From: Taylor Date: Fri, 23 Feb 2024 14:18:27 -0800 Subject: [PATCH 4/6] Fix tests --- .../resource/aws/S3AllowsAnyPrincipal.py | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py index 33274fc6133..514ec7133d8 100644 --- a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py +++ b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py @@ -8,16 +8,16 @@ from typing import List -def check_conditions(statement): +def check_conditions(statement) -> bool: # Check if 'Condition' key exists if 'Condition' not in statement: - return CheckResult.FAILED + return False condition = statement['Condition'] # Pass if they define bad ARNs. Assumes they are not too narrow if any(key in condition for key in ['ArnNotEquals', 'ArnNotLike']): - return CheckResult.PASSED + return True # Handling 'ArnEquals' and 'ArnLike' for arn_key in ['ArnEquals', 'ArnLike']: @@ -28,9 +28,9 @@ def check_conditions(statement): principal_arn = condition[arn_key][principal_key] # Fail if the Condition is for all ARNs of any resource if re.match(r'^arn:aws:[a-z0-9-]+::\*.*$', principal_arn): - return CheckResult.FAILED + return False # Passed if 'aws:PrincipalArn' or 'aws:SourceArn' do not match because then they are specific - return CheckResult.PASSED + return True # Handle VPC sources. Other sources not specific enough # Leaves out the NOT conditions as too broad ('StringNotEquals', 'StringNotEqualsIgnoreCase', 'StringNotLike') @@ -39,15 +39,15 @@ def check_conditions(statement): for condition_type in string_conditions: if condition_type in condition: if any(source in condition[condition_type] for source in ['aws:sourceVpce', 'aws:SourceVpc']): - return CheckResult.PASSED + return True # Default fail if none of the above conditions are met - return CheckResult.FAILED + return False class S3AllowsAnyPrincipal(BaseResourceCheck): - def __init__(self): + def __init__(self) -> None: name = "Ensure S3 bucket does not allow an action with any Principal" id = "CKV_AWS_70" supported_resources = ['aws_s3_bucket', 'aws_s3_bucket_policy'] @@ -74,14 +74,17 @@ def scan_resource_conf(self, conf): continue principal = statement['Principal'] if principal == '*': - return check_conditions(statement) + if check_conditions(statement): + return CheckResult.PASSED + CheckResult.FAILED if 'AWS' in statement['Principal']: # Can be a string or an array of strings aws = statement['Principal']['AWS'] if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws): - return check_conditions(statement) - - + if check_conditions(statement): + return CheckResult.PASSED + CheckResult.FAILED + return CheckResult.PASSED def get_evaluated_keys(self) -> List[str]: From a55617c78fb3f61b1ecf3e131f48e1b689f584fa Mon Sep 17 00:00:00 2001 From: Taylor Date: Fri, 23 Feb 2024 14:31:17 -0800 Subject: [PATCH 5/6] Fix bug --- .../terraform/checks/resource/aws/S3AllowsAnyPrincipal.py | 5 ++--- .../checks/resource/aws/test_S3AllowsAnyPrincipal.py | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py index 514ec7133d8..42cb441b385 100644 --- a/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py +++ b/checkov/terraform/checks/resource/aws/S3AllowsAnyPrincipal.py @@ -76,15 +76,14 @@ def scan_resource_conf(self, conf): if principal == '*': if check_conditions(statement): return CheckResult.PASSED - CheckResult.FAILED + return CheckResult.FAILED if 'AWS' in statement['Principal']: # Can be a string or an array of strings aws = statement['Principal']['AWS'] if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws): if check_conditions(statement): return CheckResult.PASSED - CheckResult.FAILED - + return CheckResult.FAILED return CheckResult.PASSED def get_evaluated_keys(self) -> List[str]: diff --git a/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py b/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py index 1a72b0107cc..f41fc5fcadb 100644 --- a/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py +++ b/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py @@ -41,8 +41,8 @@ def test(self): #self.assertEqual(summary["passed"], len(passing_resources)) #self.assertEqual(summary["failed"], len(failing_resources)) - #self.assertEqual(summary["skipped"], 0) - #self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) From aaa881c60e6a13b06e600c75a10d0d995b3e62ed Mon Sep 17 00:00:00 2001 From: Taylor Date: Fri, 23 Feb 2024 14:31:44 -0800 Subject: [PATCH 6/6] reopen tests --- .../checks/resource/aws/test_S3AllowsAnyPrincipal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py b/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py index f41fc5fcadb..3c84fcd0b0c 100644 --- a/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py +++ b/tests/terraform/checks/resource/aws/test_S3AllowsAnyPrincipal.py @@ -39,8 +39,8 @@ def test(self): passed_check_resources = set([c.resource for c in report.passed_checks]) failed_check_resources = set([c.resource for c in report.failed_checks]) - #self.assertEqual(summary["passed"], len(passing_resources)) - #self.assertEqual(summary["failed"], len(failing_resources)) + self.assertEqual(summary["passed"], len(passing_resources)) + self.assertEqual(summary["failed"], len(failing_resources)) self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0)