Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(general): add policy metadata filter exception flag #6132

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, bc_integration: BcPlatformIntegration) -> None:
self.ckv_id_to_source_incident_id_mapping: dict[str, str] = {}
self.severity_key = 'severity'
self.filtered_policy_ids: list[str] = []
self.filtered_exception_policy_ids: list[str] = []

def is_valid(self) -> bool:
return (
Expand All @@ -39,7 +40,10 @@ def pre_scan(self) -> None:
if self.bc_integration.customer_run_config_response:
self._handle_customer_run_config(self.bc_integration.customer_run_config_response)
if self.bc_integration.is_prisma_integration():
self._handle_customer_prisma_policy_metadata(self.bc_integration.prisma_policies_response)
# build a list of policy ids included using the --prisma-metadata-filter flag
self._handle_customer_prisma_policy_metadata(self.bc_integration.prisma_policies_response, exclude_policies=False)
# build a list of policy ids excluded using the --prisma-metadata-filter-exception flag
self._handle_customer_prisma_policy_metadata(self.bc_integration.prisma_policies_exception_response, exclude_policies=True)
elif self.bc_integration.public_metadata_response:
self._handle_public_metadata(self.bc_integration.public_metadata_response)
else:
Expand Down Expand Up @@ -158,41 +162,51 @@ def _handle_customer_run_config(self, run_config: dict[str, Any]) -> None:
if source_incident_id:
self.ckv_id_to_source_incident_id_mapping[custom_policy['id']] = source_incident_id

def _handle_customer_prisma_policy_metadata(self, prisma_policy_metadata: list[dict[str, Any]]) -> None:
def _handle_customer_prisma_policy_metadata(self, prisma_policy_metadata: list[dict[str, Any]], exclude_policies: bool) -> None:
policy_ids = list()
if isinstance(prisma_policy_metadata, list):
for metadata in prisma_policy_metadata:
logging.debug(f"Parsing filtered_policy_ids from metadata: {json.dumps(metadata)}")
pc_id = metadata.get('policyId')
if pc_id:
ckv_id = self.get_ckv_id_from_pc_id(pc_id)
if ckv_id:
self.filtered_policy_ids.append(ckv_id)
self._add_ckv_id_for_filtered_cloned_checks()
policy_ids.append(ckv_id)
if exclude_policies:
self.filtered_exception_policy_ids = policy_ids
self._add_ckv_id_for_filtered_cloned_checks(self.filtered_exception_policy_ids, exclude_policies)
else:
self.filtered_policy_ids = policy_ids
self._add_ckv_id_for_filtered_cloned_checks(self.filtered_policy_ids, exclude_policies)

def _add_ckv_id_for_filtered_cloned_checks(self) -> None:
def _add_ckv_id_for_filtered_cloned_checks(self, policy_ids: list[str], exclude_policies: bool) -> None:
"""
Filtered checks are the policies that are returned by --policy-metadata-filter.
Filtered exclusion checks are the policies that are returned by --policy-metadata-filter-exclusion.
Cloned checks are policies that have modified metadata in Prisma (severity, title etc).
Filtered checks do not have a definition if they are cloned, instead they have a sourceIncidentId
which corresponds to the BC ID of the original source check.
This method adds the CKV ID for that source check to the list of filtered policies to ensure it is run.
Example:
Input:
filtered_policy_ids = [ "org_AWS_1609123441" ]
ckv_id_to_source_incident_id_mapping = { "org_AWS_1609123441": "BC__AWS_GENERAL_123" }
bc_id_to_ckv_id_mapping = { "BC__AWS_GENERAL_123": "CKV_AWS_123" }
ckv_id_to_source_incident_id_mapping = { "org_AWS_1609123441": "BC_AWS_GENERAL_123" }
bc_id_to_ckv_id_mapping = { "BC_AWS_GENERAL_123": "CKV_AWS_123" }
Output:
filtered_policy_ids = [ "org_AWS_1609123441", "CKV_AWS_123" ]
"""
ckv_ids = []
for policy_id in self.filtered_policy_ids:
for policy_id in policy_ids:
source_bc_id = self.get_source_incident_id_from_ckv_id(policy_id)
if not source_bc_id:
continue
ckv_id = self.get_ckv_id_from_bc_id(source_bc_id)
if not ckv_id:
continue
ckv_ids.append(ckv_id)
if exclude_policies:
self.filtered_exception_policy_ids += ckv_ids
return
self.filtered_policy_ids += ckv_ids

def pre_runner(self, runner: _BaseRunner) -> None:
Expand Down
22 changes: 13 additions & 9 deletions checkov/common/bridgecrew/platform_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ def clean(self) -> None:
self.setup_api_urls()
self.customer_run_config_response = None
self.runtime_run_config_response = None
self.prisma_policies_response = None
self.prisma_policies_response: dict[str, str] | None = None
self.prisma_policies_exception_response: dict[str, str] | None = None
self.public_metadata_response = None
self.use_s3_integration = False
self.s3_setup_failed = False
Expand Down Expand Up @@ -1061,35 +1062,39 @@ def get_runtime_run_config(self) -> None:
except Exception:
logging.debug('could not get runtime info for this repo')

def get_prisma_build_policies(self, policy_filter: str) -> None:
def get_prisma_build_policies(self, policy_filter: str, policy_filter_exception: str) -> None:
"""
Get Prisma policy for enriching runConfig with metadata
Filters: https://prisma.pan.dev/api/cloud/cspm/policy#operation/get-policy-filters-and-options
:param policy_filter: comma separated filter string. Example, policy.label=A,cloud.type=aws
:param policy_filter_exception: comma separated filter string. Example, policy.label=A,cloud.type=aws
:return:
"""
if self.skip_download is True:
logging.debug("Skipping prisma policy API call")
return
if not policy_filter:
if not policy_filter and not policy_filter_exception:
return
if not self.is_prisma_integration():
return
if not self.bc_api_key or not self.is_integration_configured():
raise Exception(
"Tried to get prisma build policy metadata, "
"but the API key was missing or the integration was not set up")
self.prisma_policies_response = self.get_prisma_policies_for_filter(policy_filter)
self.prisma_policies_exception_response = self.get_prisma_policies_for_filter(policy_filter_exception)

def get_prisma_policies_for_filter(self, policy_filter: str) -> dict[Any, Any] | None:
request = None

filtered_policies = None
try:
token = self.get_auth_token()
headers = merge_dicts(get_prisma_auth_header(token), get_prisma_get_headers(), self.custom_auth_headers)

self.setup_http_manager()
if not self.http:
logging.error("HTTP manager was not correctly created")
return
return filtered_policies

logging.debug(f'Prisma policy URL: {self.prisma_policies_url}')
query_params = convert_prisma_policy_filter_to_dict(policy_filter)
Expand All @@ -1103,14 +1108,13 @@ def get_prisma_build_policies(self, policy_filter: str) -> None:
headers=headers,
fields=query_params,
)
self.prisma_policies_response = json.loads(request.data.decode("utf8"))
logging.debug("Got Prisma build policy metadata")
else:
logging.warning("Skipping get prisma build policies. --policy-metadata-filter will not be applied.")
filtered_policies = json.loads(request.data.decode("utf8"))
except Exception:
response_message = f': {request.status} - {request.reason}' if request else ''
logging.warning(
f"Failed to get prisma build policy metadata from {self.prisma_policies_url}{response_message}", exc_info=True)
return filtered_policies

def get_prisma_policy_filters(self) -> Dict[str, Dict[str, Any]]:
request = None
Expand Down Expand Up @@ -1162,7 +1166,7 @@ def is_valid_policy_filter(policy_filter: dict[str, str], valid_filters: dict[st
logging.warning(f"Filter value not allowed: {filter_value}")
logging.warning("Available options: True")
return False
logging.debug("--policy-metadata-filter is valid")
logging.debug("policy filter is valid")
return True

def get_public_run_config(self) -> None:
Expand Down
11 changes: 11 additions & 0 deletions checkov/common/util/ext_argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,17 @@ def add_parser_args(self) -> None:
self.add(
"--policy-metadata-filter",
help="comma separated key:value string to filter policies based on Prisma Cloud policy metadata. "
"When used with --policy-metadata-filter-exception, the exceptions override any policies selected as"
"a result of the --policy-metadata-filter flag."
"See https://prisma.pan.dev/api/cloud/cspm/policy#operation/get-policy-filters-and-options for "
"information on allowed filters. Format: policy.label=test,cloud.type=aws ",
default=None,
)
self.add(
"--policy-metadata-filter-exception",
help="comma separated key:value string to exclude filtered policies based on Prisma Cloud policy metadata. "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a description of what happens when you use both options (the subtraction logic), to this as well as to the other one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done ✅

"When used with --policy-metadata-filter, the exceptions override any policies selected as"
"a result of the --policy-metadata-filter flag."
"See https://prisma.pan.dev/api/cloud/cspm/policy#operation/get-policy-filters-and-options for "
"information on allowed filters. Format: policy.label=test,cloud.type=aws ",
default=None,
Expand Down
14 changes: 10 additions & 4 deletions checkov/docs_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ def get_compare_key(c: list[str] | tuple[str, ...]) -> list[tuple[str, str, int,


def print_checks(frameworks: Optional[List[str]] = None, use_bc_ids: bool = False,
include_all_checkov_policies: bool = True, filtered_policy_ids: Optional[List[str]] = None) -> None:
include_all_checkov_policies: bool = True, filtered_policy_ids: Optional[List[str]] = None,
filtered_exception_policy_ids: Optional[List[str]] = None) -> None:
framework_list = frameworks if frameworks else ["all"]
printable_checks_list = get_checks(framework_list, use_bc_ids=use_bc_ids,
include_all_checkov_policies=include_all_checkov_policies,
filtered_policy_ids=filtered_policy_ids or [])
filtered_policy_ids=filtered_policy_ids or [],
filtered_exception_policy_ids=filtered_exception_policy_ids or [])
print(
tabulate(printable_checks_list, headers=["Id", "Type", "Entity", "Policy", "IaC", "Resource Link"], tablefmt="github",
showindex=True))
Expand All @@ -83,11 +85,15 @@ def get_check_link(absolute_path: str) -> str:


def get_checks(frameworks: Optional[List[str]] = None, use_bc_ids: bool = False,
include_all_checkov_policies: bool = True, filtered_policy_ids: Optional[List[str]] = None) -> List[Tuple[str, str, int, int, str, str]]:
include_all_checkov_policies: bool = True, filtered_policy_ids: Optional[List[str]] = None,
filtered_exception_policy_ids: Optional[List[str]] = None) -> List[Tuple[str, str, int, int, str, str]]:
framework_list = frameworks if frameworks else ["all"]
printable_checks_list: list[tuple[str, str, str, str, str, str]] = []
filtered_policy_ids = filtered_policy_ids or []
runner_filter = RunnerFilter(include_all_checkov_policies=include_all_checkov_policies, filtered_policy_ids=filtered_policy_ids)
filtered_exception_policy_ids = filtered_exception_policy_ids or []
runner_filter = RunnerFilter(include_all_checkov_policies=include_all_checkov_policies,
filtered_policy_ids=filtered_policy_ids,
filtered_exception_policy_ids=filtered_exception_policy_ids)

def add_from_repository(registry: Union[BaseCheckRegistry, BaseGraphRegistry], checked_type: str, iac: str,
runner_filter: RunnerFilter = runner_filter) -> None:
Expand Down
11 changes: 7 additions & 4 deletions checkov/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ def normalize_config(self) -> None:
if self.config.bc_api_key and not self.config.repo_id and not self.config.list:
self.parser.error('--repo-id is required when using a platform API key')

if self.config.policy_metadata_filter and not (self.config.bc_api_key and self.config.prisma_api_url):
if (self.config.policy_metadata_filter or self.config.policy_metadata_filter_exception) and not (self.config.bc_api_key and self.config.prisma_api_url):
logger.warning(
'--policy-metadata-filter flag was used without a Prisma Cloud API key. Policy filtering will be skipped.'
'--policy-metadata-filter or --policy-metadata-filter-exception flag was used without a Prisma Cloud API key. Policy filtering will be skipped.'
)

logging.debug('Normalizing --framework')
Expand Down Expand Up @@ -460,7 +460,7 @@ def run(self, banner: str = checkov_banner, tool: str = checkov_tool, source_typ
if removed_check_types:
logger.warning(f"Following runners won't run as they are not supported for on-premises integrations: {removed_check_types}")

bc_integration.get_prisma_build_policies(self.config.policy_metadata_filter)
bc_integration.get_prisma_build_policies(self.config.policy_metadata_filter, self.config.policy_metadata_filter_exception)

# set config to make it usable inside the integration features
integration_feature_registry.config = self.config
Expand All @@ -476,7 +476,9 @@ def run(self, banner: str = checkov_banner, tool: str = checkov_tool, source_typ
runner_filter.run_image_referencer = licensing_integration.should_run_image_referencer()

runner_filter.filtered_policy_ids = policy_metadata_integration.filtered_policy_ids
runner_filter.filtered_exception_policy_ids = policy_metadata_integration.filtered_exception_policy_ids
logger.debug(f"Filtered list of policies: {runner_filter.filtered_policy_ids}")
logger.debug(f"Filtered excluded list of policies: {runner_filter.filtered_exception_policy_ids}")

runner_filter.excluded_paths = runner_filter.excluded_paths + list(repo_config_integration.skip_paths)
policy_level_suppression = suppressions_integration.get_policy_level_suppressions()
Expand All @@ -491,7 +493,8 @@ def run(self, banner: str = checkov_banner, tool: str = checkov_tool, source_typ
if self.config.list:
print_checks(frameworks=self.config.framework, use_bc_ids=self.config.output_bc_ids,
include_all_checkov_policies=self.config.include_all_checkov_policies,
filtered_policy_ids=runner_filter.filtered_policy_ids)
filtered_policy_ids=runner_filter.filtered_policy_ids,
filtered_exception_policy_ids=runner_filter.filtered_exception_policy_ids)
return None

baseline = None
Expand Down
17 changes: 15 additions & 2 deletions checkov/runner_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
skip_cve_package: Optional[List[str]] = None,
use_enforcement_rules: bool = False,
filtered_policy_ids: Optional[List[str]] = None,
filtered_exception_policy_ids: Optional[List[str]] = None,
show_progress_bar: Optional[bool] = True,
run_image_referencer: bool = False,
enable_secret_scan_all_files: bool = False,
Expand Down Expand Up @@ -132,6 +133,7 @@ def __init__(
self.var_files = var_files
self.skip_cve_package = skip_cve_package
self.filtered_policy_ids = filtered_policy_ids or []
self.filtered_exception_policy_ids = filtered_exception_policy_ids or []
self.run_image_referencer = run_image_referencer
self.enable_secret_scan_all_files = enable_secret_scan_all_files
self.block_list_secret_scan = block_list_secret_scan
Expand Down Expand Up @@ -229,6 +231,7 @@ def should_run_check(
implicit_run = not self.checks and not check_threshold
is_external = RunnerFilter.is_external_check(check_id)
is_policy_filtered = self.is_policy_filtered(check_id)
is_policy_exception = self.is_policy_exception(check_id)
# True if this check is present in the allow list, or if there is no allow list
# this is not necessarily the return value (need to apply other filters)
should_run_check = (
Expand All @@ -247,6 +250,10 @@ def should_run_check(
if not is_policy_filtered:
logging.debug(f'not is_policy_filtered {check_id}: should_run_check = False')
should_run_check = False
# If a policy is present in the list of filter exception policies, it should not be run - implicitly or explicitly.
if is_policy_exception:
logging.debug(f'is_policy_exception {check_id}: should_run_check = False')
Dismissed Show dismissed Hide dismissed
should_run_check = False

skip_severity = severity and skip_check_threshold and severity.level <= skip_check_threshold.level
explicit_skip = self.skip_checks and self.check_matches(check_id, bc_check_id, self.skip_checks)
Expand Down Expand Up @@ -333,6 +340,11 @@ def is_policy_filtered(self, check_id: str) -> bool:
return True
return check_id in self.filtered_policy_ids

def is_policy_exception(self, check_id: str) -> bool:
if not self.filtered_exception_policy_ids:
return False
return check_id in self.filtered_exception_policy_ids

def to_dict(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for key, value in self.__dict__.items():
Expand Down Expand Up @@ -368,6 +380,7 @@ def from_dict(obj: Dict[str, Any]) -> RunnerFilter:
if use_enforcement_rules is None:
use_enforcement_rules = False
filtered_policy_ids = obj.get('filtered_policy_ids')
filtered_exception_policy_ids = obj.get('filtered_exception_policy_ids')
show_progress_bar = obj.get('show_progress_bar')
if show_progress_bar is None:
show_progress_bar = True
Expand All @@ -379,8 +392,8 @@ def from_dict(obj: Dict[str, Any]) -> RunnerFilter:
runner_filter = RunnerFilter(framework, checks, skip_checks, include_all_checkov_policies,
download_external_modules, external_modules_download_path, evaluate_variables,
runners, skip_framework, excluded_paths, all_external, var_files,
skip_cve_package, use_enforcement_rules, filtered_policy_ids, show_progress_bar,
run_image_referencer, enable_secret_scan_all_files, block_list_secret_scan)
skip_cve_package, use_enforcement_rules, filtered_policy_ids, filtered_exception_policy_ids,
show_progress_bar, run_image_referencer, enable_secret_scan_all_files, block_list_secret_scan)
return runner_filter

def set_suppressed_policies(self, policy_level_suppressions: List[str]) -> None:
Expand Down