Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): Definitions serialization with new definitions key/module objects #4655

Merged
merged 12 commits into from
Mar 13, 2023
1 change: 1 addition & 0 deletions checkov/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from checkov.common.models import * # noqa
57 changes: 57 additions & 0 deletions checkov/common/util/json_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import datetime
import json
from copy import deepcopy
from json import JSONDecodeError
from typing import Any

from lark import Tree
Expand All @@ -8,6 +10,8 @@
from checkov.common.bridgecrew.severities import Severity
from checkov.common.output.common import ImageDetails
from checkov.common.packaging.version import LegacyVersion, Version
from checkov.terraform.modules.module_objects import TFModule, TFDefinitionKey
from checkov.common.util.consts import RESOLVED_MODULE_ENTRY_NAME

type_of_function = type(lambda x: x)

Expand All @@ -30,9 +34,62 @@ def default(self, o: Any) -> Any:
return o.__dict__
elif isinstance(o, type_of_function):
return str(o)
elif isinstance(o, TFDefinitionKey):
return str(o)
elif isinstance(o, TFModule):
return str(o)
else:
return json.JSONEncoder.default(self, o)

def encode(self, obj):
return super().encode(self._encode(obj))

def _encode(self, obj):
if isinstance(obj, dict):
return {self.encode_key(k): v for k, v in obj.items()}
else:
return obj

@staticmethod
def encode_key(key):
if isinstance(key, TFDefinitionKey):
return str(key)
if isinstance(key, TFModule):
return str(key)
else:
return key


def object_hook(dct):
tronxd marked this conversation as resolved.
Show resolved Hide resolved
tronxd marked this conversation as resolved.
Show resolved Hide resolved
try:
if dct is None:
return None
if isinstance(dct, dict):
dct_obj = deepcopy(dct)
if 'tf_source_modules' in dct and 'file_path' in dct:
return TFDefinitionKey(file_path=dct["file_path"],
tf_source_modules=object_hook(dct["tf_source_modules"]))
if 'path' in dct and 'name' in dct and 'foreach_idx' in dct and 'nested_tf_module' in dct:
tronxd marked this conversation as resolved.
Show resolved Hide resolved
return TFModule(path=dct['path'], name=dct['name'], foreach_idx=dct['foreach_idx'],
nested_tf_module=object_hook(dct['nested_tf_module']))
for key, value in dct.items():
if key == RESOLVED_MODULE_ENTRY_NAME:
resolved_classes = []
for resolved_module in dct[RESOLVED_MODULE_ENTRY_NAME]:
if isinstance(resolved_module, str):
resolved_classes.append(object_hook(json.loads(resolved_module)))
dct_obj[RESOLVED_MODULE_ENTRY_NAME] = resolved_classes
if isinstance(key, str) and 'tf_source_modules' in key and 'file_path' in key:
tf_definition_key = json.loads(key)
tf_definition_key_obj = TFDefinitionKey(file_path=tf_definition_key["file_path"], tf_source_modules=object_hook(
tf_definition_key["tf_source_modules"]))
dct_obj[tf_definition_key_obj] = value
del dct_obj[key]
return dct_obj
return dct
except (KeyError, TypeError, JSONDecodeError):
return dct


def get_jsonpath_from_evaluated_key(evaluated_key: str) -> JSONPath:
evaluated_key = evaluated_key.replace("/", ".")
Expand Down
46 changes: 43 additions & 3 deletions checkov/terraform/modules/module_objects.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import annotations

import json
from typing import Optional, Any


class TFModule:
__slots__ = ("path", "name", "foreach_idx", "nested_tf_module")

def __init__(self, path: str, name: str, nested_tf_module: Optional[TFModule] = None, foreach_idx: Optional[int | str] = None) -> None:
def __init__(self, path: str, name: str, nested_tf_module: Optional[TFModule] = None,
foreach_idx: Optional[int | str] = None) -> None:
self.path = path
self.name = name
self.foreach_idx = foreach_idx
Expand All @@ -20,14 +21,32 @@ def __eq__(self, other: Any) -> bool:
def __lt__(self, other: Any) -> bool:
if not isinstance(other, TFModule):
return False
return (self.path, self.name, self.nested_tf_module, self.foreach_idx) < (other.path, other.name, other.nested_tf_module, other.foreach_idx)
return (self.path, self.name, self.nested_tf_module, self.foreach_idx) < (
other.path, other.name, other.nested_tf_module, other.foreach_idx)

def __repr__(self) -> str:
return f'path:{self.path}, name:{self.name}, nested_tf_module:{self.nested_tf_module}, foreach_idx:{self.foreach_idx}'

def __hash__(self) -> int:
return hash((self.path, self.name, self.nested_tf_module, self.foreach_idx))

def __iter__(self):
yield from {
"path": self.path,
"name": self.name,
"foreach_idx": self.foreach_idx,
"nested_tf_module": dict(self.nested_tf_module) if self.nested_tf_module else None
}.items()

def __str__(self):
from checkov.common.util.json_utils import CustomJSONEncoder
return json.dumps(dict(self), cls=CustomJSONEncoder)

@staticmethod
def from_json(json_dct):
return TFModule(path=json_dct['path'], name=json_dct['name'], foreach_idx=json_dct['foreach_idx'],
nested_tf_module=TFModule.from_json(json_dct['nested_tf_module']))


class TFDefinitionKey:
__slots__ = ("tf_source_modules", "file_path")
Expand All @@ -51,3 +70,24 @@ def __repr__(self) -> str:

def __hash__(self) -> int:
return hash((self.file_path, self.tf_source_modules))

def __iter__(self):
yield from {
"file_path": self.file_path,
"tf_source_modules": dict(self.tf_source_modules) if self.tf_source_modules else None
}.items()

def __str__(self):
from checkov.common.util.json_utils import CustomJSONEncoder
return json.dumps(self.to_json(), cls=CustomJSONEncoder)

def to_json(self):
to_return = {"file_path": self.file_path, "tf_source_modules": None}
if self.tf_source_modules:
to_return["tf_source_modules"] = dict(self.tf_source_modules)
return to_return

@staticmethod
def from_json(json_dct):
return TFDefinitionKey(file_path=json_dct['file_path'],
tf_source_modules=TFModule.from_json(json_dct['tf_source_modules']))
4 changes: 2 additions & 2 deletions checkov/terraform/modules/module_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import re

from checkov.common.util.consts import DEFAULT_EXTERNAL_MODULES_DIR
from checkov.common.util.json_utils import CustomJSONEncoder
from checkov.common.util.json_utils import CustomJSONEncoder, object_hook

if TYPE_CHECKING:
from typing_extensions import TypeAlias
Expand Down Expand Up @@ -285,4 +285,4 @@ def clean_parser_types_lst(values: list[Any]) -> list[Any]:


def serialize_definitions(tf_definitions: dict[str, _Hcl2Payload]) -> dict[str, _Hcl2Payload]:
return json.loads(json.dumps(tf_definitions, cls=CustomJSONEncoder))
return json.loads(json.dumps(tf_definitions, cls=CustomJSONEncoder), object_hook=object_hook)
29 changes: 28 additions & 1 deletion tests/terraform/parser/test_module.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
import os
import unittest
import shutil

import hcl2

from checkov.terraform.modules.module_utils import validate_malformed_definitions, clean_bad_definitions
from checkov.terraform.modules.module_utils import validate_malformed_definitions, clean_bad_definitions, \
clean_parser_types, serialize_definitions
from checkov.terraform.parser import Parser
from checkov.terraform.tf_parser import TFParser
from checkov.common.util.consts import DEFAULT_EXTERNAL_MODULES_DIR


class ModuleTest(unittest.TestCase):

def setUp(self) -> None:
from checkov.terraform.module_loading.registry import ModuleLoaderRegistry

# needs to be reset, because the cache belongs to the class not instance
ModuleLoaderRegistry.module_content_cache = {}

self.resources_dir = os.path.realpath(os.path.join(os.path.dirname(os.path.realpath(__file__)), "./resources"))
self.external_module_path = ''

def tearDown(self) -> None:
if os.path.exists(self.external_module_path):
shutil.rmtree(self.external_module_path)

def test_module_double_slash_cleanup(self):
with open(os.path.join(os.path.dirname(__file__), 'resources', 'double_slash.tf')) as f:
tf = hcl2.load(f)
Expand Down Expand Up @@ -41,3 +59,12 @@ def test_module_double_slash_cleanup_string(self):
print(module)
self.assertEqual(1, len(module.blocks))
self.assertEqual('ingress.annotations.kubernetes\\.io/ingress\\.class', module.blocks[0].attributes['set.name'])

def test_parse_hcl_module_serialize_definitions(self):
parser = TFParser()
directory = os.path.join(self.resources_dir, "parser_nested_modules")
self.external_module_path = os.path.join(directory, DEFAULT_EXTERNAL_MODULES_DIR)
tf_definitions = parser.parse_directory(directory=directory, out_evaluations_context={})
tf_definitions = clean_parser_types(tf_definitions)
tf_definitions_encoded = serialize_definitions(tf_definitions)
self.assertEqual(tf_definitions_encoded, tf_definitions)
1 change: 0 additions & 1 deletion tests/terraform/parser/test_new_parser_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import shutil
import unittest
from pathlib import Path
from unittest import mock

import pytest

Expand Down