diff --git a/charon/config.py b/charon/config.py index 86f826ea..35efe6eb 100644 --- a/charon/config.py +++ b/charon/config.py @@ -24,6 +24,69 @@ logger = logging.getLogger(__name__) +class RadasConfig(object): + def __init__(self, data: Dict): + self.__umb_host: str = data.get("umb_host", None) + self.__umb_host_port: str = data.get("umb_host_port", "5671") + self.__result_queue: str = data.get("result_queue", None) + self.__request_queue: str = data.get("request_queue", None) + self.__client_ca: str = data.get("client_ca", None) + self.__client_key: str = data.get("client_key", None) + self.__client_key_pass_file: str = data.get("client_key_pass_file", None) + self.__root_ca: str = data.get("root_ca", "/etc/pki/tls/certs/ca-bundle.crt") + + def validate(self) -> bool: + if not self.__umb_host: + logger.error("Missing host name setting for UMB!") + return False + if not self.__result_queue: + logger.error("Missing the queue setting to receive siging result in UMB!") + return False + if not self.__request_queue: + logger.error("Missing the queue setting to send signing request in UMB!") + return False + if self.__client_ca and not os.access(self.__client_ca, os.R_OK): + logger.error("The client CA file is not valid!") + return False + if self.__client_key and not os.access(self.__client_key, os.R_OK): + logger.error("The client key file is not valid!") + return False + if self.__client_key_pass_file and not os.access(self.__client_key_pass_file, os.R_OK): + logger.error("The client key password file is not valid!") + return False + if self.__root_ca and not os.access(self.__root_ca, os.R_OK): + logger.error("The root ca file is not valid!") + return False + return True + + def umb_target(self) -> str: + return f'amqps://{self.__umb_host}:{self.__umb_host_port}' + + def result_queue(self) -> str: + return self.__result_queue + + def request_queue(self) -> str: + return self.__request_queue + + def client_ca(self) -> str: + return self.__client_ca + + def client_key(self) -> str: + return self.__client_key + + def client_key_password(self) -> str: + pass_file = self.__client_key_pass_file + if os.access(pass_file, os.R_OK): + with open(pass_file, 'r') as f: + return f.read() + elif pass_file: + logger.warning("The key password file is not accessible. Will ignore the password.") + return "" + + def root_ca(self) -> str: + return self.__root_ca + + class CharonConfig(object): """CharonConfig is used to store all configurations for charon tools. @@ -39,6 +102,9 @@ def __init__(self, data: Dict): self.__ignore_signature_suffix: Dict = data.get("ignore_signature_suffix", None) self.__signature_command: str = data.get("detach_signature_command", None) self.__aws_cf_enable: bool = data.get("aws_cf_enable", False) + radas_config: Dict = data.get("radas", None) + if radas_config: + self.__radas_config__: RadasConfig = RadasConfig(radas_config) def get_ignore_patterns(self) -> List[str]: return self.__ignore_patterns @@ -67,6 +133,9 @@ def get_detach_signature_command(self) -> str: def is_aws_cf_enable(self) -> bool: return self.__aws_cf_enable + def get_radas_config(self) -> RadasConfig: + return self.__radas_config__ + def get_config(cfgPath=None) -> CharonConfig: config_file_path = cfgPath diff --git a/charon/schemas/charon.json b/charon/schemas/charon.json index f6a931d1..3ffde818 100644 --- a/charon/schemas/charon.json +++ b/charon/schemas/charon.json @@ -30,6 +30,44 @@ "type": "string", "description": "signature command to be used for signature" }, + "radas": { + "type": "object", + "descrition": "", + "properties": { + "umb_host": { + "type": "string", + "description": "The host of UMB" + }, + "umb_host_port": { + "type": "string", + "description": "The port of UMB host" + }, + "result_queue": { + "type": "string", + "description": "The queue in UMB to receive radas signing result" + }, + "request_queue": { + "type": "string", + "description": "The queue in UMB to send signing request to RADAS" + }, + "client_ca": { + "type": "string", + "description": "the client ca file path" + }, + "client_key": { + "type": "string", + "description": "the client key file path" + }, + "client_key_pass_file":{ + "type": "string", + "description": "the file contains password of the client key" + }, + "root_ca": { + "type": "string", + "description": "the root ca file path" + } + } + }, "targets": { "type": "object", "patternProperties": { diff --git a/tests/test_config_radas.py b/tests/test_config_radas.py new file mode 100644 index 00000000..152dc1c2 --- /dev/null +++ b/tests/test_config_radas.py @@ -0,0 +1,226 @@ +""" +Copyright (C) 2022 Red Hat, Inc. (https://github.com/Commonjava/charon) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import unittest +import os +import charon.config as config +import shutil +import tempfile +from tests.base import BaseTest +from charon.utils.files import overwrite_file + + +class RadasConfigTest(unittest.TestCase): + def setUp(self) -> None: + self.__base = BaseTest() + self.__prepare_ca() + + def tearDown(self) -> None: + self.__base.tearDown() + self.__clear_ca() + + def test_full_radas_config(self): + radas_settings = """ +radas: + umb_host: test.umb.api.com + result_queue: queue.result.test + request_queue: queue.request.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + root_ca: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, + self.__client_key_pass_file, self.__root_ca) + print(radas_settings) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertTrue(rconf.validate()) + + def test_missing_umb_host(self): + radas_settings = """ +radas: + result_queue: queue.result.test + request_queue: queue.request.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertFalse(rconf.validate()) + + def test_missing_result_queue(self): + radas_settings = """ +radas: + umb_host: test.umb.api.com + request_queue: queue.request.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertFalse(rconf.validate()) + + def test_missing_request_queue(self): + radas_settings = """ +radas: + umb_host: test.umb.api.com + result_queue: queue.result.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertFalse(rconf.validate()) + + def test_unaccessible_client_ca(self): + radas_settings = """ +radas: + umb_host: test.umb.api.com + result_queue: queue.result.test + request_queue: queue.request.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file) + os.remove(self.__client_ca_path) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertFalse(rconf.validate()) + + def test_unaccessible_client_key(self): + radas_settings = """ +radas: + umb_host: test.umb.api.com + result_queue: queue.result.test + request_queue: queue.request.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file) + os.remove(self.__client_key_path) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertFalse(rconf.validate()) + + def test_unaccessible_client_password_file(self): + radas_settings = """ +radas: + umb_host: test.umb.api.com + result_queue: queue.result.test + request_queue: queue.request.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file) + os.remove(self.__client_key_pass_file) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertFalse(rconf.validate()) + + def test_unaccessible_root_ca(self): + radas_settings = """ +radas: + umb_host: test.umb.api.com + result_queue: queue.result.test + request_queue: queue.request.test + client_ca: {} + client_key: {} + client_key_pass_file: {} + root_ca: {} + +targets: + ga: + - bucket: charon-test + """.format(self.__client_ca_path, self.__client_key_path, + self.__client_key_pass_file, self.__root_ca) + os.remove(self.__root_ca) + self.__change_config_content(radas_settings) + conf = config.get_config() + self.assertIsNotNone(conf) + rconf = conf.get_radas_config() + self.assertIsNotNone(rconf) + self.assertFalse(rconf.validate()) + + def __change_config_content(self, content: str): + self.__base.change_home() + config_base = self.__base.get_config_base() + os.mkdir(config_base) + self.__base.prepare_config(config_base, content) + + def __prepare_ca(self): + self.__tempdir = tempfile.mkdtemp() + self.__client_ca_path = os.path.join(self.__tempdir, "client_ca.crt") + self.__client_key_path = os.path.join(self.__tempdir, "client_key.crt") + self.__client_key_pass_file = os.path.join(self.__tempdir, "client_key_password.txt") + self.__root_ca = os.path.join(self.__tempdir, "root_ca.crt") + overwrite_file(self.__client_ca_path, "client ca") + overwrite_file(self.__client_key_path, "client key") + overwrite_file(self.__client_key_pass_file, "it's password") + overwrite_file(self.__root_ca, "root ca") + + def __clear_ca(self): + shutil.rmtree(self.__tempdir)