Skip to content

Commit

Permalink
fix(terraform): pod security policy removed in GKE 1.25 (#5675)
Browse files Browse the repository at this point in the history
* fix(terraform) pod security policy removed in 1.25

* handle string versions
  • Loading branch information
JamesWoolfenden committed Dec 21, 2023
1 parent bb5de6b commit 30dcb36
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories
from __future__ import annotations

from typing import Any

from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckCategories, CheckResult


class GKEPodSecurityPolicyEnabled(BaseResourceCheck):

"""
Pod Security Policy was removed from GKE clusters with version >= 1.25.0
"""

class GKEPodSecurityPolicyEnabled(BaseResourceValueCheck):
def __init__(self):
name = "Ensure PodSecurityPolicy controller is enabled on the Kubernetes Engine Clusters"
id = "CKV_GCP_24"
supported_resources = ['google_container_cluster']
categories = [CheckCategories.KUBERNETES]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return "pod_security_policy_config/[0]/enabled"
def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

if conf.get('min_master_version') and isinstance(conf.get('min_master_version'), list):
raw = conf.get('min_master_version')[0]
splitter = raw.split(".")
if len(splitter) >= 2:
str_version = splitter[0] + "." + splitter[1]
version = float(str_version)
if version < 1.25:
if conf.get('pod_security_policy_config') and isinstance(conf.get('pod_security_policy_config'), list):
policy = conf.get('pod_security_policy_config')[0]
if policy.get('enabled') and isinstance(policy.get('enabled'), list):
secure = policy.get('enabled')[0]
if secure:
return CheckResult.PASSED
return CheckResult.FAILED
return CheckResult.UNKNOWN


check = GKEPodSecurityPolicyEnabled()
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
resource "google_container_cluster" "unknown" {
}

resource "google_container_cluster" "unknown2" {
min_master_version = "1.27"
}


resource "google_container_cluster" "pass" {
min_master_version = "1.24"
pod_security_policy_config {
enabled = true
}
}

resource "google_container_cluster" "fail" {
min_master_version = "1.24"
}

resource "google_container_cluster" "fail2" {
min_master_version = "1.24"
pod_security_policy_config {
enabled = false
}
}



Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
import unittest
import os

from checkov.terraform.checks.resource.gcp.GKEPodSecurityPolicyEnabled import check
from checkov.common.models.enums import CheckResult
from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner


class TestGKEPodSecurityPolicyEnabled(unittest.TestCase):

def test_failure(self):
resource_conf = {'name': ['google_cluster'], 'enable_legacy_abac': [False], 'resource_labels': [{'Owner': ['SomeoneNotWorkingHere']}], 'node_config': [{'image_type': ['cos']}], 'ip_allocation_policy': [{}]}
def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

scan_result = check.scan_resource_conf(conf=resource_conf)
self.assertEqual(CheckResult.FAILED, scan_result)
test_files_dir = current_dir + "/example_GKEPodSecurityPolicyEnabled"
report = runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

def test_success(self):
resource_conf = {'name': ['google_cluster'], 'monitoring_service': ['monitoring.googleapis.com'], 'master_authorized_networks_config': [{}], 'master_auth': [{'client_certificate_config': [{'issue_client_certificate': [False]}]}], 'node_config': [{'image_type': ['not-cos']}], 'pod_security_policy_config': [{'enabled': [True]}]}
passing_resources = {
'google_container_cluster.pass',
}
failing_resources = {
'google_container_cluster.fail',
'google_container_cluster.fail2',
}

scan_result = check.scan_resource_conf(conf=resource_conf)
self.assertEqual(CheckResult.PASSED, scan_result)
passed_check_resources = set([c.resource for c in report.passed_checks])
failed_check_resources = set([c.resource for c in report.failed_checks])

self.assertEqual(summary['passed'], len(passing_resources))
self.assertEqual(summary['failed'], len(failing_resources))
self.assertEqual(summary['skipped'], 0)
self.assertEqual(summary['parsing_errors'], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()
unittest.main()

0 comments on commit 30dcb36

Please sign in to comment.