Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): support s3 bucket name for references in graph #6134

Merged
merged 12 commits into from
Apr 3, 2024
32 changes: 32 additions & 0 deletions checkov/terraform/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

MODULE_RESERVED_ATTRIBUTES = ("source", "version")
CROSS_VARIABLE_EDGE_PREFIX = '[cross-variable] '
S3_BUCKET_RESOURCE_NAME = "aws_s3_bucket"
S3_BUCKET_REFERENCE_ATTRIBUTE = "bucket"


class Undetermined(TypedDict):
Expand Down Expand Up @@ -94,6 +96,12 @@ def build_graph(self, render_variables: bool) -> None:
edges_count = len(self.edges)
self._build_cross_variable_edges()
logging.info(f"Found {len(self.edges) - edges_count} cross variable edges")
if strtobool(os.getenv("CHECKOV_EXPERIMENTAL_AWS_S3_NAME_REFERENCES", "True")):
# experimental flag on building S3 edges by name for terraform graph
logging.info("Building S3 edges name references")
edges_count = len(self.edges)
self._build_s3_name_reference_edges()
logging.info(f"Found {len(self.edges) - edges_count} S3 name references edges")
else:
self.update_vertices_fields()

Expand Down Expand Up @@ -323,6 +331,30 @@ def _build_cross_variable_edges(self) -> None:
modules = vertex.breadcrumbs.get(CustomAttributes.SOURCE_MODULE, [])
self._build_edges_for_vertex(origin_node_index, vertex, aliases, resources_types, True, modules)

def _build_s3_name_reference_edges(self) -> None:
# Supporting reference by name of S3 bucket
resources_types = self.get_resources_types_in_graph()
if S3_BUCKET_RESOURCE_NAME not in resources_types:
return
# Find all the edges leading to S3 bucket and their references
s3_buckets_mapping = {}
for origin_node_index, referenced_vertices in self.out_edges.items():
vertex = self.vertices[origin_node_index]
for referenced_vertice in referenced_vertices:
if referenced_vertice.label == S3_BUCKET_REFERENCE_ATTRIBUTE:
current = s3_buckets_mapping.get(referenced_vertice.dest, {"bucket_resource_index": None, "referenced_vertices": list()})
if vertex.id.startswith(f"{S3_BUCKET_RESOURCE_NAME}."):
current["bucket_resource_index"] = origin_node_index
else:
current["referenced_vertices"] = current["referenced_vertices"] + [referenced_vertice]
SteveVaknin marked this conversation as resolved.
Show resolved Hide resolved
SteveVaknin marked this conversation as resolved.
Show resolved Hide resolved
s3_buckets_mapping[referenced_vertice.dest] = current

# Create new edges of the found connections
for destination, mapping in s3_buckets_mapping.items():
if self.vertices[destination].block_type == BlockType.VARIABLE:
for reference_vertex in mapping["referenced_vertices"]:
self.create_edge(mapping["bucket_resource_index"], reference_vertex.origin, S3_BUCKET_REFERENCE_ATTRIBUTE, True)
SteveVaknin marked this conversation as resolved.
Show resolved Hide resolved

def create_edge(self, origin_vertex_index: int, dest_vertex_index: int, label: str,
cross_variable_edges: bool = False) -> bool:
if origin_vertex_index == dest_vertex_index:
Expand Down