Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions charon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,69 @@
logger = logging.getLogger(__name__)


class RadasConfig(object):
def __init__(self, data: Dict):
self.__umb_host: str = data.get("umb_host", None)
self.__umb_host_port: str = data.get("umb_host_port", "5671")
self.__result_queue: str = data.get("result_queue", None)
self.__request_queue: str = data.get("request_queue", None)
self.__client_ca: str = data.get("client_ca", None)
self.__client_key: str = data.get("client_key", None)
self.__client_key_pass_file: str = data.get("client_key_pass_file", None)
self.__root_ca: str = data.get("root_ca", "/etc/pki/tls/certs/ca-bundle.crt")

def validate(self) -> bool:
if not self.__umb_host:
logger.error("Missing host name setting for UMB!")
return False
if not self.__result_queue:
logger.error("Missing the queue setting to receive siging result in UMB!")
return False
if not self.__request_queue:
logger.error("Missing the queue setting to send signing request in UMB!")
return False
if self.__client_ca and not os.access(self.__client_ca, os.R_OK):
logger.error("The client CA file is not valid!")
return False
if self.__client_key and not os.access(self.__client_key, os.R_OK):
logger.error("The client key file is not valid!")
return False
if self.__client_key_pass_file and not os.access(self.__client_key_pass_file, os.R_OK):
logger.error("The client key password file is not valid!")
return False
if self.__root_ca and not os.access(self.__root_ca, os.R_OK):
logger.error("The root ca file is not valid!")
return False
return True

def umb_target(self) -> str:
return f'amqps://{self.__umb_host}:{self.__umb_host_port}'

def result_queue(self) -> str:
return self.__result_queue

def request_queue(self) -> str:
return self.__request_queue

def client_ca(self) -> str:
return self.__client_ca

def client_key(self) -> str:
return self.__client_key

def client_key_password(self) -> str:
pass_file = self.__client_key_pass_file
if os.access(pass_file, os.R_OK):
with open(pass_file, 'r') as f:
return f.read()
elif pass_file:
logger.warning("The key password file is not accessible. Will ignore the password.")
return ""

def root_ca(self) -> str:
return self.__root_ca


class CharonConfig(object):
"""CharonConfig is used to store all configurations for charon
tools.
Expand All @@ -39,6 +102,9 @@ def __init__(self, data: Dict):
self.__ignore_signature_suffix: Dict = data.get("ignore_signature_suffix", None)
self.__signature_command: str = data.get("detach_signature_command", None)
self.__aws_cf_enable: bool = data.get("aws_cf_enable", False)
radas_config: Dict = data.get("radas", None)
if radas_config:
self.__radas_config__: RadasConfig = RadasConfig(radas_config)

def get_ignore_patterns(self) -> List[str]:
return self.__ignore_patterns
Expand Down Expand Up @@ -67,6 +133,9 @@ def get_detach_signature_command(self) -> str:
def is_aws_cf_enable(self) -> bool:
return self.__aws_cf_enable

def get_radas_config(self) -> RadasConfig:
return self.__radas_config__


def get_config(cfgPath=None) -> CharonConfig:
config_file_path = cfgPath
Expand Down
38 changes: 38 additions & 0 deletions charon/schemas/charon.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,44 @@
"type": "string",
"description": "signature command to be used for signature"
},
"radas": {
"type": "object",
"descrition": "",
"properties": {
"umb_host": {
"type": "string",
"description": "The host of UMB"
},
"umb_host_port": {
"type": "string",
"description": "The port of UMB host"
},
"result_queue": {
"type": "string",
"description": "The queue in UMB to receive radas signing result"
},
"request_queue": {
"type": "string",
"description": "The queue in UMB to send signing request to RADAS"
},
"client_ca": {
"type": "string",
"description": "the client ca file path"
},
"client_key": {
"type": "string",
"description": "the client key file path"
},
"client_key_pass_file":{
"type": "string",
"description": "the file contains password of the client key"
},
"root_ca": {
"type": "string",
"description": "the root ca file path"
}
}
},
"targets": {
"type": "object",
"patternProperties": {
Expand Down
226 changes: 226 additions & 0 deletions tests/test_config_radas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""
Copyright (C) 2022 Red Hat, Inc. (https://github.com/Commonjava/charon)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
import os
import charon.config as config
import shutil
import tempfile
from tests.base import BaseTest
from charon.utils.files import overwrite_file


class RadasConfigTest(unittest.TestCase):
def setUp(self) -> None:
self.__base = BaseTest()
self.__prepare_ca()

def tearDown(self) -> None:
self.__base.tearDown()
self.__clear_ca()

def test_full_radas_config(self):
radas_settings = """
radas:
umb_host: test.umb.api.com
result_queue: queue.result.test
request_queue: queue.request.test
client_ca: {}
client_key: {}
client_key_pass_file: {}
root_ca: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path,
self.__client_key_pass_file, self.__root_ca)
print(radas_settings)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertTrue(rconf.validate())

def test_missing_umb_host(self):
radas_settings = """
radas:
result_queue: queue.result.test
request_queue: queue.request.test
client_ca: {}
client_key: {}
client_key_pass_file: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertFalse(rconf.validate())

def test_missing_result_queue(self):
radas_settings = """
radas:
umb_host: test.umb.api.com
request_queue: queue.request.test
client_ca: {}
client_key: {}
client_key_pass_file: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertFalse(rconf.validate())

def test_missing_request_queue(self):
radas_settings = """
radas:
umb_host: test.umb.api.com
result_queue: queue.result.test
client_ca: {}
client_key: {}
client_key_pass_file: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertFalse(rconf.validate())

def test_unaccessible_client_ca(self):
radas_settings = """
radas:
umb_host: test.umb.api.com
result_queue: queue.result.test
request_queue: queue.request.test
client_ca: {}
client_key: {}
client_key_pass_file: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file)
os.remove(self.__client_ca_path)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertFalse(rconf.validate())

def test_unaccessible_client_key(self):
radas_settings = """
radas:
umb_host: test.umb.api.com
result_queue: queue.result.test
request_queue: queue.request.test
client_ca: {}
client_key: {}
client_key_pass_file: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file)
os.remove(self.__client_key_path)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertFalse(rconf.validate())

def test_unaccessible_client_password_file(self):
radas_settings = """
radas:
umb_host: test.umb.api.com
result_queue: queue.result.test
request_queue: queue.request.test
client_ca: {}
client_key: {}
client_key_pass_file: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path, self.__client_key_pass_file)
os.remove(self.__client_key_pass_file)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertFalse(rconf.validate())

def test_unaccessible_root_ca(self):
radas_settings = """
radas:
umb_host: test.umb.api.com
result_queue: queue.result.test
request_queue: queue.request.test
client_ca: {}
client_key: {}
client_key_pass_file: {}
root_ca: {}

targets:
ga:
- bucket: charon-test
""".format(self.__client_ca_path, self.__client_key_path,
self.__client_key_pass_file, self.__root_ca)
os.remove(self.__root_ca)
self.__change_config_content(radas_settings)
conf = config.get_config()
self.assertIsNotNone(conf)
rconf = conf.get_radas_config()
self.assertIsNotNone(rconf)
self.assertFalse(rconf.validate())

def __change_config_content(self, content: str):
self.__base.change_home()
config_base = self.__base.get_config_base()
os.mkdir(config_base)
self.__base.prepare_config(config_base, content)

def __prepare_ca(self):
self.__tempdir = tempfile.mkdtemp()
self.__client_ca_path = os.path.join(self.__tempdir, "client_ca.crt")
self.__client_key_path = os.path.join(self.__tempdir, "client_key.crt")
self.__client_key_pass_file = os.path.join(self.__tempdir, "client_key_password.txt")
self.__root_ca = os.path.join(self.__tempdir, "root_ca.crt")
overwrite_file(self.__client_ca_path, "client ca")
overwrite_file(self.__client_key_path, "client key")
overwrite_file(self.__client_key_pass_file, "it's password")
overwrite_file(self.__root_ca, "root ca")

def __clear_ca(self):
shutil.rmtree(self.__tempdir)