-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'bridgecrewio:main' into ACRAnonymousPullDisabled
- Loading branch information
Showing
55 changed files
with
4,092 additions
and
3,314 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
...ov/cloudformation/checks/graph_checks/SageMakerIAMPolicyOverlyPermissiveToAllTraffic.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
metadata: | ||
id: "CKV2_AWS_68" | ||
name: "Ensure SageMaker notebook instance IAM policy is not overly permissive" | ||
category: "NETWORKING" | ||
|
||
definition: | ||
and: | ||
- cond_type: filter | ||
value: | ||
- AWS::SageMaker::NotebookInstance | ||
operator: within | ||
attribute: resource_type | ||
- cond_type: connection | ||
resource_types: | ||
- AWS::SageMaker::NotebookInstance | ||
connected_resource_types: | ||
- AWS::IAM::Role | ||
operator: exists | ||
- cond_type: attribute | ||
resource_types: | ||
- AWS::IAM::Role | ||
attribute: "AssumeRolePolicyDocument.Statement[?(@.Effect == Allow)].Action[*]" | ||
operator: "jsonpath_not_equals" | ||
value: "*" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
...kov/terraform/checks/graph_checks/aws/SageMakerIAMPolicyOverlyPermissiveToAllTraffic.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
metadata: | ||
id: "CKV2_AWS_68" | ||
name: "Ensure SageMaker notebook instance IAM policy is not overly permissive" | ||
category: "NETWORKING" | ||
|
||
definition: | ||
and: | ||
- cond_type: filter | ||
value: | ||
- aws_sagemaker_notebook_instance | ||
operator: within | ||
attribute: resource_type | ||
- cond_type: connection | ||
resource_types: | ||
- aws_sagemaker_notebook_instance | ||
connected_resource_types: | ||
- aws_iam_role | ||
operator: exists | ||
- cond_type: attribute | ||
resource_types: | ||
- aws_iam_role | ||
attribute: "policy.Statement[?(@.Effect == Allow)].Action[*]" | ||
operator: "jsonpath_not_equals" | ||
value: "*" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from __future__ import annotations | ||
|
||
from checkov.terraform.graph_builder.graph_components.block_types import BlockType | ||
from checkov.terraform.graph_builder.foreach.foreach_entity_handler import ForeachEntityHandler | ||
|
||
from typing import TYPE_CHECKING | ||
|
||
if TYPE_CHECKING: | ||
from checkov.terraform.graph_builder.local_graph import TerraformLocalGraph | ||
|
||
|
||
class ForeachDataHandler(ForeachEntityHandler): | ||
def __init__(self, local_graph: TerraformLocalGraph) -> None: | ||
super().__init__(local_graph, BlockType.DATA) |
108 changes: 108 additions & 0 deletions
108
checkov/terraform/graph_builder/foreach/foreach_entity_handler.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from typing import Any, Optional, TYPE_CHECKING | ||
|
||
from checkov.common.util.data_structures_utils import pickle_deepcopy | ||
from checkov.terraform.graph_builder.foreach.abstract_handler import ForeachAbstractHandler | ||
from checkov.terraform.graph_builder.foreach.consts import FOR_EACH_BLOCK_TYPE, FOREACH_STRING, COUNT_STRING | ||
from checkov.terraform.graph_builder.graph_components.blocks import TerraformBlock | ||
|
||
if TYPE_CHECKING: | ||
from checkov.terraform.graph_builder.local_graph import TerraformLocalGraph | ||
|
||
|
||
class ForeachEntityHandler(ForeachAbstractHandler): | ||
def __init__(self, local_graph: TerraformLocalGraph, block_type_to_handle: str) -> None: | ||
super().__init__(local_graph) | ||
self.block_type_to_handle = block_type_to_handle | ||
|
||
def handle(self, resources_blocks: list[int]) -> None: | ||
block_index_to_statement: FOR_EACH_BLOCK_TYPE = self._get_statements(resources_blocks) | ||
self._create_new_resources(block_index_to_statement) | ||
|
||
def _get_statements(self, resources_blocks: list[int]) -> FOR_EACH_BLOCK_TYPE: | ||
if not resources_blocks: | ||
return {} | ||
block_index_to_statement: FOR_EACH_BLOCK_TYPE = {} | ||
for block_index, block in enumerate(self.local_graph.vertices): | ||
if block.block_type != self.block_type_to_handle or not (FOREACH_STRING in block.attributes or COUNT_STRING in block.attributes): | ||
continue | ||
foreach_statement = self._get_static_foreach_statement(block_index) | ||
block_index_to_statement[block_index] = foreach_statement | ||
blocks_to_render = [block_idx for block_idx, statement in block_index_to_statement.items() if statement is None] | ||
if blocks_to_render: | ||
rendered_statements: FOR_EACH_BLOCK_TYPE = self._handle_dynamic_statement(blocks_to_render) | ||
block_index_to_statement.update(rendered_statements) | ||
return block_index_to_statement | ||
|
||
def _get_static_foreach_statement(self, block_index: int) -> Optional[list[str] | dict[str, Any] | int]: | ||
attributes = self.local_graph.vertices[block_index].attributes | ||
if not attributes.get(FOREACH_STRING) and not attributes.get(COUNT_STRING): | ||
return None | ||
try: | ||
if self._is_static_statement(block_index): | ||
return self._handle_static_statement(block_index) | ||
else: | ||
return None | ||
except Exception as e: | ||
logging.info( | ||
f"Cannot get foreach statement for block: {self.local_graph.vertices[block_index]}, error: {str(e)}") | ||
return None | ||
|
||
def _handle_dynamic_statement(self, blocks_to_render: list[int]) -> FOR_EACH_BLOCK_TYPE: | ||
rendered_statements_by_idx: FOR_EACH_BLOCK_TYPE = {} | ||
sub_graph = self._build_sub_graph(blocks_to_render) | ||
self._render_sub_graph(sub_graph, blocks_to_render) | ||
for block_idx in blocks_to_render: | ||
if not self._is_static_statement(block_idx, sub_graph): | ||
rendered_statements_by_idx[block_idx] = None | ||
else: | ||
rendered_statements_by_idx[block_idx] = self._handle_static_statement(block_idx, sub_graph) | ||
return rendered_statements_by_idx | ||
|
||
def _create_new_resources_count(self, statement: int, block_idx: int) -> None: | ||
main_resource = self.local_graph.vertices[block_idx] | ||
for i in range(statement): | ||
self._create_new_resource(main_resource, i, resource_idx=block_idx, foreach_idx=i) | ||
|
||
def _create_new_foreach_resource(self, block_idx: int, foreach_idx: int, main_resource: TerraformBlock, | ||
new_key: int | str, new_value: int | str) -> None: | ||
self._create_new_resource(main_resource, new_value, new_key=new_key, resource_idx=block_idx, foreach_idx=foreach_idx) | ||
|
||
def _create_new_resource( | ||
self, | ||
main_resource: TerraformBlock, | ||
new_value: int | str, | ||
resource_idx: int, | ||
foreach_idx: int, | ||
new_key: int | str | None = None, | ||
) -> None: | ||
new_resource = pickle_deepcopy(main_resource) | ||
block_type, block_name = new_resource.name.split('.') | ||
key_to_val_changes = self._build_key_to_val_changes(main_resource, new_value, new_key) | ||
config_attrs = new_resource.config.get(block_type, {}).get(block_name, {}) | ||
|
||
self._update_foreach_attrs(config_attrs, key_to_val_changes, new_resource) | ||
idx_to_change = new_key or new_value | ||
self._add_index_to_resource_block_properties(new_resource, idx_to_change) | ||
if foreach_idx == 0: | ||
self.local_graph.vertices[resource_idx] = new_resource | ||
else: | ||
self.local_graph.vertices.append(new_resource) | ||
|
||
@staticmethod | ||
def _add_index_to_resource_block_properties(block: TerraformBlock, idx: str | int) -> None: | ||
block_type, block_name = block.name.split('.') | ||
idx_with_separator = ForeachEntityHandler._update_block_name_and_id(block, idx) | ||
if block.config.get(block_type) and block.config.get(block_type, {}).get(block_name): | ||
block.config[block_type][f"{block_name}[{idx_with_separator}]"] = block.config[block_type].pop(block_name) | ||
|
||
def _create_new_resources(self, block_index_to_statement: FOR_EACH_BLOCK_TYPE) -> None: | ||
for block_idx, statement in block_index_to_statement.items(): | ||
if not statement: | ||
continue | ||
if isinstance(statement, int): | ||
self._create_new_resources_count(statement, block_idx) | ||
else: | ||
self._create_new_resources_foreach(statement, block_idx) |
109 changes: 7 additions & 102 deletions
109
checkov/terraform/graph_builder/foreach/resource_handler.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,110 +1,15 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import typing | ||
from typing import Any, Optional | ||
|
||
from checkov.common.util.data_structures_utils import pickle_deepcopy | ||
from checkov.terraform.graph_builder.graph_components.block_types import BlockType | ||
from checkov.terraform.graph_builder.foreach.abstract_handler import ForeachAbstractHandler | ||
from checkov.terraform.graph_builder.foreach.consts import FOREACH_STRING, COUNT_STRING, FOR_EACH_BLOCK_TYPE | ||
from checkov.terraform.graph_builder.graph_components.blocks import TerraformBlock | ||
|
||
if typing.TYPE_CHECKING: | ||
from checkov.terraform.graph_builder.local_graph import TerraformLocalGraph | ||
|
||
|
||
class ForeachResourceHandler(ForeachAbstractHandler): | ||
def __init__(self, local_graph: TerraformLocalGraph) -> None: | ||
super().__init__(local_graph) | ||
|
||
def handle(self, resources_blocks: list[int]) -> None: | ||
block_index_to_statement: FOR_EACH_BLOCK_TYPE = self._get_statements(resources_blocks) | ||
self._create_new_resources(block_index_to_statement) | ||
from checkov.terraform.graph_builder.foreach.foreach_entity_handler import ForeachEntityHandler | ||
|
||
def _get_statements(self, resources_blocks: list[int]) -> FOR_EACH_BLOCK_TYPE: | ||
if not resources_blocks: | ||
return {} | ||
block_index_to_statement: FOR_EACH_BLOCK_TYPE = {} | ||
for block_index, block in enumerate(self.local_graph.vertices): | ||
if block.block_type != BlockType.RESOURCE or not (FOREACH_STRING in block.attributes or COUNT_STRING in block.attributes): | ||
continue | ||
foreach_statement = self._get_static_foreach_statement(block_index) | ||
block_index_to_statement[block_index] = foreach_statement | ||
blocks_to_render = [block_idx for block_idx, statement in block_index_to_statement.items() if statement is None] | ||
if blocks_to_render: | ||
rendered_statements: FOR_EACH_BLOCK_TYPE = self._handle_dynamic_statement(blocks_to_render) | ||
block_index_to_statement.update(rendered_statements) | ||
return block_index_to_statement | ||
from typing import TYPE_CHECKING | ||
|
||
def _get_static_foreach_statement(self, block_index: int) -> Optional[list[str] | dict[str, Any] | int]: | ||
attributes = self.local_graph.vertices[block_index].attributes | ||
if not attributes.get(FOREACH_STRING) and not attributes.get(COUNT_STRING): | ||
return None | ||
try: | ||
if self._is_static_statement(block_index): | ||
return self._handle_static_statement(block_index) | ||
else: | ||
return None | ||
except Exception as e: | ||
logging.info( | ||
f"Cant get foreach statement for block: {self.local_graph.vertices[block_index]}, error: {str(e)}") | ||
return None | ||
|
||
def _handle_dynamic_statement(self, blocks_to_render: list[int]) -> FOR_EACH_BLOCK_TYPE: | ||
rendered_statements_by_idx: FOR_EACH_BLOCK_TYPE = {} | ||
sub_graph = self._build_sub_graph(blocks_to_render) | ||
self._render_sub_graph(sub_graph, blocks_to_render) | ||
for block_idx in blocks_to_render: | ||
if not self._is_static_statement(block_idx, sub_graph): | ||
rendered_statements_by_idx[block_idx] = None | ||
else: | ||
rendered_statements_by_idx[block_idx] = self._handle_static_statement(block_idx, sub_graph) | ||
return rendered_statements_by_idx | ||
|
||
def _create_new_resources_count(self, statement: int, block_idx: int) -> None: | ||
main_resource = self.local_graph.vertices[block_idx] | ||
for i in range(statement): | ||
self._create_new_resource(main_resource, i, resource_idx=block_idx, foreach_idx=i) | ||
|
||
def _create_new_resource( | ||
self, | ||
main_resource: TerraformBlock, | ||
new_value: int | str, | ||
resource_idx: int, | ||
foreach_idx: int, | ||
new_key: int | str | None = None, | ||
) -> None: | ||
new_resource = pickle_deepcopy(main_resource) | ||
block_type, block_name = new_resource.name.split('.') | ||
key_to_val_changes = self._build_key_to_val_changes(main_resource, new_value, new_key) | ||
config_attrs = new_resource.config.get(block_type, {}).get(block_name, {}) | ||
|
||
self._update_foreach_attrs(config_attrs, key_to_val_changes, new_resource) | ||
idx_to_change = new_key or new_value | ||
self._add_index_to_resource_block_properties(new_resource, idx_to_change) | ||
if foreach_idx == 0: | ||
self.local_graph.vertices[resource_idx] = new_resource | ||
else: | ||
self.local_graph.vertices.append(new_resource) | ||
if TYPE_CHECKING: | ||
from checkov.terraform.graph_builder.local_graph import TerraformLocalGraph | ||
|
||
def _create_new_foreach_resource(self, block_idx: int, foreach_idx: int, main_resource: TerraformBlock, | ||
new_key: int | str, new_value: int | str) -> None: | ||
self._create_new_resource(main_resource, new_value, new_key=new_key, resource_idx=block_idx, | ||
foreach_idx=foreach_idx) | ||
|
||
@staticmethod | ||
def _add_index_to_resource_block_properties(block: TerraformBlock, idx: str | int) -> None: | ||
block_type, block_name = block.name.split('.') | ||
idx_with_separator = ForeachResourceHandler._update_block_name_and_id(block, idx) | ||
if block.config.get(block_type) and block.config.get(block_type, {}).get(block_name): | ||
block.config[block_type][f"{block_name}[{idx_with_separator}]"] = block.config[block_type].pop(block_name) | ||
class ForeachResourceHandler(ForeachEntityHandler): | ||
|
||
def _create_new_resources(self, block_index_to_statement: FOR_EACH_BLOCK_TYPE) -> None: | ||
for block_idx, statement in block_index_to_statement.items(): | ||
if not statement: | ||
continue | ||
if isinstance(statement, int): | ||
self._create_new_resources_count(statement, block_idx) | ||
else: | ||
self._create_new_resources_foreach(statement, block_idx) | ||
def __init__(self, local_graph: TerraformLocalGraph) -> None: | ||
super().__init__(local_graph, BlockType.RESOURCE) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.