Adding capability of running automated process rules tests #127

Merged
6 commits merged on May 26, 2022
6 changes: 5 additions & 1 deletion JUSTFILE
@@ -105,4 +105,8 @@ gen-report:
allure generate tests/allure/results --clean -o tests/allure/reports && cp tests/allure/reports/history/* tests/allure/results/history/. && allure open tests/allure/reports

run-tests:
helm test cloudbeat-tests --namespace kube-system
helm test cloudbeat-tests --namespace kube-system --logs

build-load-run-tests: build-pytest-docker load-pytest-kind run-tests

prepare-local-helm-cluster: create-kind-cluster build-cloudbeat load-cloudbeat-image deploy-local-tests-helm
111 changes: 107 additions & 4 deletions tests/commonlib/io_utils.py
@@ -96,18 +96,121 @@ def exec_command(container_name: str, command: str, param_value: str, resource:
# return []

if container_name == '':
raise Exception(f"Unknown {container_name} is sent")
raise Exception("Unknown container name is sent")

current_resource = Path(resource)
if not current_resource.is_file():
raise Exception(f"File {resource} does not exist or mount missing.")
raise Exception(
f"File {resource} does not exist or mount missing.")

if command == 'chmod':
os.chmod(path=resource, mode=int(param_value))
elif command == 'chown':
uid_gid = param_value.split(':')
if len(uid_gid) != 2:
raise Exception("User and group parameter shall be separated by ':' ")
raise Exception(
"User and group parameter shall be separated by ':' ")
shutil.chown(path=resource, user=uid_gid[0], group=uid_gid[1])
else:
raise Exception(f"Command '{command}' still not implemented in test framework")
raise Exception(
f"Command '{command}' still not implemented in test framework")

@staticmethod
Reviewer comment (Contributor): Can we please add some description to the function?

def edit_process_file(container_name: str, dictionary, resource: str):
"""
This function edits the command arguments in a pod manifest (process) file
@param container_name: Container node
@param dictionary: Process parameters to set/unset
@param resource: File / Resource path
@return: None
"""
if container_name == '':
raise Exception(f"Unknown container name is sent")

current_resource = Path(resource)
if not current_resource.is_file():
raise Exception(
f"File {resource} does not exist or mount missing.")

# Open and load the YAML into variable
with current_resource.open() as f:
r_file = yaml.safe_load(f)

# Get process configuration arguments
command = r_file["spec"]["containers"][0]["command"]

# Collect set/unset keys and values from the dictionary
set_dict = dictionary.get("set", {})
unset_list = dictionary.get("unset", [])

# Cycle across set items from the dictionary
for skey, svalue in set_dict.items():
# Find if set key exists already in the configuration arguments
if any(skey == x.split("=")[0] for x in command):
# Replace the value of the key with the new value from the set items
command = [skey + "=" + svalue if skey == x.split("=")[0] else x for x in command]
else:
# If the key does not exist in the configuration arguments, append the key/value from the set items
command.append(skey + "=" + svalue)

# Cycle across unset items from the dictionary
for uskey in unset_list:
# Filter out the unset keys from the configuration arguments
command = [x for x in command if uskey != x.split("=")[0]]

# Override the configuration arguments with the newly built ones
r_file["spec"]["containers"][0]["command"] = command

# Write the newly built configuration arguments back to the file
with current_resource.open(mode="w") as f:
yaml.dump(r_file, f)

@staticmethod
def edit_config_file(container_name: str, dictionary, resource: str):
"""
This function edits a YAML config file by setting and unsetting keys
@param container_name: Container node
@param dictionary: Config parameters to set/unset
@param resource: Config path
@return: None
"""
if container_name == '':
raise Exception("Unknown container name is sent")

current_resource = Path(resource)
if not current_resource.is_file():
raise Exception(
f"File {resource} does not exist or mount missing.")

# Open and load the YAML into variable
with current_resource.open() as f:
r_file = yaml.safe_load(f)

# Collect set/unset keys and values from the dictionary
set_dict = dictionary.get("set", {})
unset_list = dictionary.get("unset", [])

# Shallow-merge the set items over the loaded config (set items take priority)
r_file = {**r_file, **set_dict}

# Cycle across unset items from the dictionary
for uskey in unset_list:
# Parse the dot-separated key path
keys = uskey.split('.')
key_to_del = keys.pop()
p = r_file

# Advance inside the dictionary for nested keys
for key in keys:
p = p.get(key, None)
if p is None:
# Non existing nested key
break
# Remove the leaf key only when the full path exists
if p and key_to_del in p:
del p[key_to_del]

# Write the newly built config back to the file
with current_resource.open(mode="w") as f:
yaml.dump(r_file, f)
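
For reference, here is a minimal, self-contained sketch of the set/unset semantics these two helpers implement. The flag names, config keys, and values below are illustrative assumptions, not values taken from this PR.

# Self-contained sketch of the set/unset semantics used by edit_process_file and
# edit_config_file above; all flag names, keys, and values are illustrative.

# edit_process_file: rewrite "key=value" entries in a container command list.
command = ["kube-apiserver", "--profiling=true", "--insecure-port=8080"]
process_dict = {"set": {"--profiling": "false"}, "unset": ["--insecure-port"]}

for skey, svalue in process_dict.get("set", {}).items():
    if any(skey == arg.split("=")[0] for arg in command):
        # Overwrite the existing argument with the new value
        command = [f"{skey}={svalue}" if skey == arg.split("=")[0] else arg for arg in command]
    else:
        # Append an argument that is not present yet
        command.append(f"{skey}={svalue}")

for uskey in process_dict.get("unset", []):
    # Drop any argument whose key matches an unset entry
    command = [arg for arg in command if uskey != arg.split("=")[0]]

print(command)  # ['kube-apiserver', '--profiling=false']

# edit_config_file: shallow-merge set keys, then delete dot-separated unset paths.
config = {"address": "0.0.0.0", "authentication": {"anonymous": {"enabled": True}}}
config_dict = {"set": {"address": "127.0.0.1"}, "unset": ["authentication.anonymous.enabled"]}

config = {**config, **config_dict.get("set", {})}  # set items win on top-level keys

for uskey in config_dict.get("unset", []):
    keys = uskey.split(".")
    key_to_del = keys.pop()
    node = config
    for key in keys:
        node = node.get(key, None)
        if node is None:
            break
    if node and key_to_del in node:
        del node[key_to_del]

print(config)  # {'address': '127.0.0.1', 'authentication': {'anonymous': {}}}
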
16 changes: 11 additions & 5 deletions tests/commonlib/kubernetes.py
@@ -18,6 +18,7 @@ def __init__(self, is_in_cluster_config: bool = False):
self.core_v1_client = client.CoreV1Api()
self.app_api = client.AppsV1Api()
self.rbac_api = client.RbacAuthorizationV1Api()
self.coordination_v1_api = client.CoordinationV1Api()
self.api_client = client.api_client.ApiClient(configuration=self.config)

self.dispatch_list = {
@@ -28,7 +29,8 @@ def __init__(self, is_in_cluster_config: bool = False):
'Role': self.rbac_api.list_namespaced_role,
'RoleBinding': self.rbac_api.list_namespaced_role_binding,
'ClusterRoleBinding': self.rbac_api.list_cluster_role_binding,
'ClusterRole': self.rbac_api.list_cluster_role
'ClusterRole': self.rbac_api.list_cluster_role,
'Lease': self.coordination_v1_api.list_namespaced_lease,
}

self.dispatch_delete = {
@@ -39,7 +41,8 @@ def __init__(self, is_in_cluster_config: bool = False):
'Role': self.rbac_api.delete_namespaced_role,
'RoleBinding': self.rbac_api.delete_namespaced_role_binding,
'ClusterRoleBinding': self.rbac_api.delete_cluster_role_binding,
'ClusterRole': self.rbac_api.delete_cluster_role
'ClusterRole': self.rbac_api.delete_cluster_role,
'Lease': self.coordination_v1_api.delete_namespaced_lease
}

self.dispatch_patch = {
@@ -50,7 +53,8 @@ def __init__(self, is_in_cluster_config: bool = False):
'Role': self.rbac_api.patch_namespaced_role,
'RoleBinding': self.rbac_api.patch_namespaced_role_binding,
'ClusterRoleBinding': self.rbac_api.patch_cluster_role_binding,
'ClusterRole': self.rbac_api.patch_cluster_role
'ClusterRole': self.rbac_api.patch_cluster_role,
'Lease': self.coordination_v1_api.patch_namespaced_lease
}

self.dispatch_create = {
@@ -61,7 +65,8 @@ def __init__(self, is_in_cluster_config: bool = False):
'Role': self.rbac_api.create_namespaced_role,
'RoleBinding': self.rbac_api.create_namespaced_role_binding,
'ClusterRoleBinding': self.rbac_api.create_cluster_role_binding,
'ClusterRole': self.rbac_api.create_cluster_role
'ClusterRole': self.rbac_api.create_cluster_role,
'Lease': self.coordination_v1_api.create_namespaced_lease
}

self.dispatch_get = {
@@ -72,7 +77,8 @@ def __init__(self, is_in_cluster_config: bool = False):
'Role': self.rbac_api.read_namespaced_role,
'RoleBinding': self.rbac_api.read_namespaced_role_binding,
'ClusterRoleBinding': self.rbac_api.read_cluster_role_binding,
'ClusterRole': self.rbac_api.read_cluster_role
'ClusterRole': self.rbac_api.read_cluster_role,
'Lease': self.coordination_v1_api.read_namespaced_lease
}

def get_agent_pod_instances(self, agent_name: str, namespace: str):
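
As a brief, hedged sketch of what the new 'Lease' dispatch entries resolve to: they map to the CoordinationV1Api calls of the official Kubernetes Python client, so tests can list, read, patch, create, and delete Lease objects through the same kind-keyed interface as the other resources. The namespace and lease name below are illustrative, not taken from this diff.

# What the new 'Lease' dispatch entries resolve to in the official client;
# namespace and lease name are illustrative values.
from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() when running inside a pod
coordination = client.CoordinationV1Api()

# Equivalent of dispatch_list['Lease'](namespace=...)
leases = coordination.list_namespaced_lease(namespace="kube-system")

# Equivalent of dispatch_get['Lease'](name=..., namespace=...)
lease = coordination.read_namespaced_lease(name="example-lease", namespace="kube-system")
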
62 changes: 62 additions & 0 deletions tests/product/tests/test_process_api_server_rules.py
@@ -0,0 +1,62 @@
"""
Kubernetes CIS rules verification.
This module verifies correctness of retrieved findings by manipulating audit and remediation actions
"""
from datetime import datetime

import pytest
import time

from commonlib.utils import get_evaluation
from product.tests.tests.process.process_test_cases import *


@pytest.mark.rules
@pytest.mark.parametrize(
("rule_tag", "dictionary", "resource", "expected"),
api_server_rules,
)
def test_process_api_server(config_node_pre_test,
rule_tag,
dictionary,
resource,
expected):
"""
This data-driven test verifies rules and findings returned by the cloudbeat agent.
To add new cases, the @pytest.mark.parametrize section shall be updated.
Setup and teardown actions are defined in the data method.
This test runs against a cloudbeat agent instance, modifies process configuration arguments on the node,
and verifies that cloudbeat returns the correct finding.
@param rule_tag: Name of rule to be verified.
@param dictionary: Set and Unset dictionary
@param resource: Full path to resource / file
@param expected: Result to be found in finding evaluation field.
@return: None - Test Pass / Fail result is generated.
"""
k8s_client, api_client, cloudbeat_agent = config_node_pre_test

if not "edit_process_file" in dir(api_client):
pytest.skip("skipping process rules run in non-containerized api_client")

# Currently a single node is used; in the future this may be extended to all nodes.
node = k8s_client.get_cluster_nodes()[0]
pods = k8s_client.get_agent_pod_instances(agent_name=cloudbeat_agent.name, namespace=cloudbeat_agent.namespace)

api_client.edit_process_file(container_name=node.metadata.name,
dictionary=dictionary,
resource=resource)

# Wait for process reboot
# TODO: Implement a more optimal way of waiting
time.sleep(60)

evaluation = get_evaluation(
k8s=k8s_client,
timeout=cloudbeat_agent.findings_timeout,
pod_name=pods[0].metadata.name,
namespace=cloudbeat_agent.namespace,
rule_tag=rule_tag,
exec_timestamp=datetime.utcnow()
)

assert evaluation == expected, f"Rule {rule_tag} verification failed."
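
The parametrized cases are imported from product.tests.tests.process.process_test_cases. As a hypothetical illustration only, one api_server_rules entry could take the following shape; the rule tag, flag, manifest path, and expected evaluation are assumptions, not values from this PR.

# Hypothetical shape of one case consumed by the test above; every value is illustrative.
api_server_rules = [
    (
        "CIS 1.2.21",                                     # rule_tag looked up in the findings
        {"set": {"--profiling": "false"}, "unset": []},   # dictionary of process args to set/unset
        "/etc/kubernetes/manifests/kube-apiserver.yaml",  # resource: manifest file edited on the node
        "passed",                                         # expected evaluation of the finding
    ),
]
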
62 changes: 62 additions & 0 deletions tests/product/tests/test_process_controller_manager_rules.py
@@ -0,0 +1,62 @@
"""
Kubernetes CIS rules verification.
This module verifies correctness of retrieved findings by manipulating audit and remediation actions
"""
from datetime import datetime

import pytest
import time

from commonlib.utils import get_evaluation
from product.tests.tests.process.process_test_cases import *


@pytest.mark.rules
@pytest.mark.parametrize(
("rule_tag", "dictionary", "resource", "expected"),
controller_manager_rules,
)
def test_process_controller_manager(config_node_pre_test,
rule_tag,
dictionary,
resource,
expected):
"""
This data-driven test verifies rules and findings returned by the cloudbeat agent.
To add new cases, the @pytest.mark.parametrize section shall be updated.
Setup and teardown actions are defined in the data method.
This test runs against a cloudbeat agent instance, modifies process configuration arguments on the node,
and verifies that cloudbeat returns the correct finding.
@param rule_tag: Name of rule to be verified.
@param dictionary: Set and Unset dictionary
@param resource: Full path to resource / file
@param expected: Result to be found in finding evaluation field.
@return: None - Test Pass / Fail result is generated.
"""
k8s_client, api_client, cloudbeat_agent = config_node_pre_test

if not "edit_process_file" in dir(api_client):
pytest.skip("skipping process rules run in non-containerized api_client")

# Currently a single node is used; in the future this may be extended to all nodes.
node = k8s_client.get_cluster_nodes()[0]
pods = k8s_client.get_agent_pod_instances(agent_name=cloudbeat_agent.name, namespace=cloudbeat_agent.namespace)

api_client.edit_process_file(container_name=node.metadata.name,
dictionary=dictionary,
resource=resource)

# Wait for process reboot
# TODO: Implement a more optimal way of waiting
time.sleep(60)

evaluation = get_evaluation(
k8s=k8s_client,
timeout=cloudbeat_agent.findings_timeout,
pod_name=pods[0].metadata.name,
namespace=cloudbeat_agent.namespace,
rule_tag=rule_tag,
exec_timestamp=datetime.utcnow()
)

assert evaluation == expected, f"Rule {rule_tag} verification failed."
62 changes: 62 additions & 0 deletions tests/product/tests/test_process_etcd_rules.py
@@ -0,0 +1,62 @@
"""
Kubernetes CIS rules verification.
This module verifies correctness of retrieved findings by manipulating audit and remediation actions
"""
from datetime import datetime

import pytest
import time

from commonlib.utils import get_evaluation
from product.tests.tests.process.process_test_cases import *


@pytest.mark.rules
@pytest.mark.parametrize(
("rule_tag", "dictionary", "resource", "expected"),
etcd_rules,
)
def test_process_etcd(config_node_pre_test,
rule_tag,
dictionary,
resource,
expected):
"""
This data-driven test verifies rules and findings returned by the cloudbeat agent.
To add new cases, the @pytest.mark.parametrize section shall be updated.
Setup and teardown actions are defined in the data method.
This test runs against a cloudbeat agent instance, modifies process configuration arguments on the node,
and verifies that cloudbeat returns the correct finding.
@param rule_tag: Name of rule to be verified.
@param dictionary: Set and Unset dictionary
@param resource: Full path to resource / file
@param expected: Result to be found in finding evaluation field.
@return: None - Test Pass / Fail result is generated.
"""
k8s_client, api_client, cloudbeat_agent = config_node_pre_test

if not "edit_process_file" in dir(api_client):
pytest.skip("skipping process rules run in non-containerized api_client")

# Currently a single node is used; in the future this may be extended to all nodes.
node = k8s_client.get_cluster_nodes()[0]
pods = k8s_client.get_agent_pod_instances(agent_name=cloudbeat_agent.name, namespace=cloudbeat_agent.namespace)

api_client.edit_process_file(container_name=node.metadata.name,
dictionary=dictionary,
resource=resource)

# Wait for process reboot
# TODO: Implement a more optimal way of waiting
time.sleep(60)

evaluation = get_evaluation(
k8s=k8s_client,
timeout=cloudbeat_agent.findings_timeout,
pod_name=pods[0].metadata.name,
namespace=cloudbeat_agent.namespace,
rule_tag=rule_tag,
exec_timestamp=datetime.utcnow()
)

assert evaluation == expected, f"Rule {rule_tag} verification failed."