Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): support s3 bucket name for references in graph #6134

Merged
merged 12 commits into from
Apr 3, 2024
29 changes: 29 additions & 0 deletions checkov/terraform/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

MODULE_RESERVED_ATTRIBUTES = ("source", "version")
CROSS_VARIABLE_EDGE_PREFIX = '[cross-variable] '
S3_BUCKET_RESOURCE_NAME = "aws_s3_bucket"
S3_BUCKET_REFERENCE_ATTRIBUTE = "bucket"


class Undetermined(TypedDict):
Expand Down Expand Up @@ -94,6 +96,12 @@ def build_graph(self, render_variables: bool) -> None:
edges_count = len(self.edges)
self._build_cross_variable_edges()
logging.info(f"Found {len(self.edges) - edges_count} cross variable edges")
if strtobool(os.getenv("CHECKOV_EXPERIMENTAL_AWS_S3_NAME_REFERENCES", "True")):
# experimental flag on building S3 edges by name for terraform graph
logging.info("Building S3 edges name references")
edges_count = len(self.edges)
self._build_s3_name_reference_edges()
logging.info(f"Found {len(self.edges) - edges_count} S3 name references edges")
else:
self.update_vertices_fields()

Expand Down Expand Up @@ -323,6 +331,27 @@ def _build_cross_variable_edges(self) -> None:
modules = vertex.breadcrumbs.get(CustomAttributes.SOURCE_MODULE, [])
self._build_edges_for_vertex(origin_node_index, vertex, aliases, resources_types, True, modules)

def _build_s3_name_reference_edges(self) -> None:
resources_types = self.get_resources_types_in_graph()
if S3_BUCKET_RESOURCE_NAME not in resources_types:
return
bucket_mapping = {}
for origin_node_index, referenced_vertices in self.out_edges.items():
vertex = self.vertices[origin_node_index]
for referenced_vertice in referenced_vertices:
if referenced_vertice.label == S3_BUCKET_REFERENCE_ATTRIBUTE:
current = bucket_mapping.get(referenced_vertice.dest, {"bucket_resource_index": None, "referenced_vertices": list()})
if vertex.id.startswith(f"{S3_BUCKET_RESOURCE_NAME}."):
current["bucket_resource_index"] = origin_node_index
else:
current["referenced_vertices"] = current["referenced_vertices"] + [referenced_vertice]
SteveVaknin marked this conversation as resolved.
Show resolved Hide resolved
SteveVaknin marked this conversation as resolved.
Show resolved Hide resolved
bucket_mapping[referenced_vertice.dest] = current

for destination, mapping in bucket_mapping.items():
if self.vertices[destination].block_type == BlockType.VARIABLE:
for reference_vertex in mapping["referenced_vertices"]:
self.create_edge(mapping["bucket_resource_index"], reference_vertex.origin, S3_BUCKET_REFERENCE_ATTRIBUTE, True)
SteveVaknin marked this conversation as resolved.
Show resolved Hide resolved

def create_edge(self, origin_vertex_index: int, dest_vertex_index: int, label: str,
cross_variable_edges: bool = False) -> bool:
if origin_vertex_index == dest_vertex_index:
Expand Down