diff --git a/checkov/common/__init__.py b/checkov/common/__init__.py index e69de29bb2d..98b75088f9a 100644 --- a/checkov/common/__init__.py +++ b/checkov/common/__init__.py @@ -0,0 +1 @@ +from checkov.common.models import * # noqa diff --git a/checkov/common/util/json_utils.py b/checkov/common/util/json_utils.py index 06ffaf4a2de..136980aca5c 100644 --- a/checkov/common/util/json_utils.py +++ b/checkov/common/util/json_utils.py @@ -1,6 +1,8 @@ import datetime import json -from typing import Any +from copy import deepcopy +from json import JSONDecodeError +from typing import Any, Dict from lark import Tree from bc_jsonpath_ng import parse, JSONPath @@ -14,6 +16,7 @@ class CustomJSONEncoder(json.JSONEncoder): def default(self, o: Any) -> Any: + from checkov.terraform.modules.module_objects import TFModule, TFDefinitionKey if isinstance(o, set): return list(o) elif isinstance(o, Tree): @@ -30,9 +33,65 @@ def default(self, o: Any) -> Any: return o.__dict__ elif isinstance(o, type_of_function): return str(o) + elif isinstance(o, TFDefinitionKey): + return str(o) + elif isinstance(o, TFModule): + return str(o) else: return json.JSONEncoder.default(self, o) + def encode(self, obj: Any) -> str: + return super().encode(self._encode(obj)) + + def _encode(self, obj: Any) -> Any: + if isinstance(obj, dict): + return {self.encode_key(k): v for k, v in obj.items()} + else: + return obj + + @staticmethod + def encode_key(key: Any) -> Any: + from checkov.terraform.modules.module_objects import TFModule, TFDefinitionKey + if isinstance(key, TFDefinitionKey): + return str(key) + if isinstance(key, TFModule): + return str(key) + else: + return key + + +def object_hook(dct: Dict[Any, Any]) -> Any: + from checkov.terraform.modules.module_objects import TFModule, TFDefinitionKey + from checkov.common.util.consts import RESOLVED_MODULE_ENTRY_NAME + try: + if dct is None: + return None + if isinstance(dct, dict): + dct_obj = deepcopy(dct) + if 'tf_source_modules' in dct and 'file_path' in dct: + return TFDefinitionKey(file_path=dct["file_path"], + tf_source_modules=object_hook(dct["tf_source_modules"])) + if 'path' in dct and 'name' in dct and 'foreach_idx' in dct and 'nested_tf_module' in dct: + return TFModule(path=dct['path'], name=dct['name'], foreach_idx=dct['foreach_idx'], + nested_tf_module=object_hook(dct['nested_tf_module'])) + for key, value in dct.items(): + if key == RESOLVED_MODULE_ENTRY_NAME: + resolved_classes = [] + for resolved_module in dct[RESOLVED_MODULE_ENTRY_NAME]: + if isinstance(resolved_module, str): + resolved_classes.append(object_hook(json.loads(resolved_module))) + dct_obj[RESOLVED_MODULE_ENTRY_NAME] = resolved_classes + if isinstance(key, str) and 'tf_source_modules' in key and 'file_path' in key: + tf_definition_key = json.loads(key) + tf_definition_key_obj = TFDefinitionKey(file_path=tf_definition_key["file_path"], tf_source_modules=object_hook( + tf_definition_key["tf_source_modules"])) + dct_obj[tf_definition_key_obj] = value + del dct_obj[key] + return dct_obj + return dct + except (KeyError, TypeError, JSONDecodeError): + return dct + def get_jsonpath_from_evaluated_key(evaluated_key: str) -> JSONPath: evaluated_key = evaluated_key.replace("/", ".") diff --git a/checkov/terraform/modules/module_objects.py b/checkov/terraform/modules/module_objects.py index b8c230f7462..d893df90dce 100644 --- a/checkov/terraform/modules/module_objects.py +++ b/checkov/terraform/modules/module_objects.py @@ -1,12 +1,13 @@ from __future__ import annotations - +import json from typing import Optional, Any class TFModule: __slots__ = ("path", "name", "foreach_idx", "nested_tf_module") - def __init__(self, path: str, name: str, nested_tf_module: Optional[TFModule] = None, foreach_idx: Optional[int | str] = None) -> None: + def __init__(self, path: str, name: str, nested_tf_module: Optional[TFModule] = None, + foreach_idx: Optional[int | str] = None) -> None: self.path = path self.name = name self.foreach_idx = foreach_idx @@ -20,7 +21,8 @@ def __eq__(self, other: Any) -> bool: def __lt__(self, other: Any) -> bool: if not isinstance(other, TFModule): return False - return (self.path, self.name, self.nested_tf_module, self.foreach_idx) < (other.path, other.name, other.nested_tf_module, other.foreach_idx) + return (self.path, self.name, self.nested_tf_module, self.foreach_idx) < ( + other.path, other.name, other.nested_tf_module, other.foreach_idx) def __repr__(self) -> str: return f'path:{self.path}, name:{self.name}, nested_tf_module:{self.nested_tf_module}, foreach_idx:{self.foreach_idx}' @@ -28,6 +30,23 @@ def __repr__(self) -> str: def __hash__(self) -> int: return hash((self.path, self.name, self.nested_tf_module, self.foreach_idx)) + def __iter__(self): + yield from { + "path": self.path, + "name": self.name, + "foreach_idx": self.foreach_idx, + "nested_tf_module": dict(self.nested_tf_module) if self.nested_tf_module else None + }.items() + + def __str__(self): + from checkov.common.util.json_utils import CustomJSONEncoder + return json.dumps(dict(self), cls=CustomJSONEncoder) + + @staticmethod + def from_json(json_dct): + return TFModule(path=json_dct['path'], name=json_dct['name'], foreach_idx=json_dct['foreach_idx'], + nested_tf_module=TFModule.from_json(json_dct['nested_tf_module'])) + class TFDefinitionKey: __slots__ = ("tf_source_modules", "file_path") @@ -51,3 +70,24 @@ def __repr__(self) -> str: def __hash__(self) -> int: return hash((self.file_path, self.tf_source_modules)) + + def __iter__(self): + yield from { + "file_path": self.file_path, + "tf_source_modules": dict(self.tf_source_modules) if self.tf_source_modules else None + }.items() + + def __str__(self): + from checkov.common.util.json_utils import CustomJSONEncoder + return json.dumps(self.to_json(), cls=CustomJSONEncoder) + + def to_json(self): + to_return = {"file_path": self.file_path, "tf_source_modules": None} + if self.tf_source_modules: + to_return["tf_source_modules"] = dict(self.tf_source_modules) + return to_return + + @staticmethod + def from_json(json_dct): + return TFDefinitionKey(file_path=json_dct['file_path'], + tf_source_modules=TFModule.from_json(json_dct['tf_source_modules'])) diff --git a/checkov/terraform/modules/module_utils.py b/checkov/terraform/modules/module_utils.py index f68fee82b39..0101f8ecdb8 100644 --- a/checkov/terraform/modules/module_utils.py +++ b/checkov/terraform/modules/module_utils.py @@ -15,7 +15,7 @@ import re from checkov.common.util.consts import DEFAULT_EXTERNAL_MODULES_DIR -from checkov.common.util.json_utils import CustomJSONEncoder +from checkov.common.util.json_utils import CustomJSONEncoder, object_hook if TYPE_CHECKING: from typing_extensions import TypeAlias @@ -285,4 +285,4 @@ def clean_parser_types_lst(values: list[Any]) -> list[Any]: def serialize_definitions(tf_definitions: dict[str, _Hcl2Payload]) -> dict[str, _Hcl2Payload]: - return json.loads(json.dumps(tf_definitions, cls=CustomJSONEncoder)) + return json.loads(json.dumps(tf_definitions, cls=CustomJSONEncoder), object_hook=object_hook) diff --git a/tests/terraform/parser/test_module.py b/tests/terraform/parser/test_module.py index cf144812906..acbadb3e39b 100644 --- a/tests/terraform/parser/test_module.py +++ b/tests/terraform/parser/test_module.py @@ -1,13 +1,31 @@ import os import unittest +import shutil import hcl2 -from checkov.terraform.modules.module_utils import validate_malformed_definitions, clean_bad_definitions +from checkov.terraform.modules.module_utils import validate_malformed_definitions, clean_bad_definitions, \ + clean_parser_types, serialize_definitions from checkov.terraform.parser import Parser +from checkov.terraform.tf_parser import TFParser +from checkov.common.util.consts import DEFAULT_EXTERNAL_MODULES_DIR class ModuleTest(unittest.TestCase): + + def setUp(self) -> None: + from checkov.terraform.module_loading.registry import ModuleLoaderRegistry + + # needs to be reset, because the cache belongs to the class not instance + ModuleLoaderRegistry.module_content_cache = {} + + self.resources_dir = os.path.realpath(os.path.join(os.path.dirname(os.path.realpath(__file__)), "./resources")) + self.external_module_path = '' + + def tearDown(self) -> None: + if os.path.exists(self.external_module_path): + shutil.rmtree(self.external_module_path) + def test_module_double_slash_cleanup(self): with open(os.path.join(os.path.dirname(__file__), 'resources', 'double_slash.tf')) as f: tf = hcl2.load(f) @@ -41,3 +59,12 @@ def test_module_double_slash_cleanup_string(self): print(module) self.assertEqual(1, len(module.blocks)) self.assertEqual('ingress.annotations.kubernetes\\.io/ingress\\.class', module.blocks[0].attributes['set.name']) + + def test_parse_hcl_module_serialize_definitions(self): + parser = TFParser() + directory = os.path.join(self.resources_dir, "parser_nested_modules") + self.external_module_path = os.path.join(directory, DEFAULT_EXTERNAL_MODULES_DIR) + tf_definitions = parser.parse_directory(directory=directory, out_evaluations_context={}) + tf_definitions = clean_parser_types(tf_definitions) + tf_definitions_encoded = serialize_definitions(tf_definitions) + self.assertEqual(tf_definitions_encoded, tf_definitions) diff --git a/tests/terraform/parser/test_new_parser_modules.py b/tests/terraform/parser/test_new_parser_modules.py index 9278a21cde5..24077ae53e4 100644 --- a/tests/terraform/parser/test_new_parser_modules.py +++ b/tests/terraform/parser/test_new_parser_modules.py @@ -2,7 +2,6 @@ import shutil import unittest from pathlib import Path -from unittest import mock import pytest