Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 105 additions & 4 deletions charon/pkgs/radas_signature_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import os
import asyncio
import sys
import uuid
from typing import List, Any, Tuple, Callable, Dict, Optional
from charon.config import get_config, RadasConfig
from charon.pkgs.oras_client import OrasClient
from proton import Event
from proton import SSLDomain, Message, Event
from proton.handlers import MessagingHandler
from proton.reactor import Container

logger = logging.getLogger(__name__)

Expand All @@ -48,6 +50,7 @@ def __init__(self, sign_result_loc: str, request_id: str) -> None:
super().__init__()
self.sign_result_loc = sign_result_loc
self.request_id = request_id
self.conn = None
self.sign_result_status: Optional[str] = None
self.sign_result_errors: List[str] = []

Expand All @@ -63,8 +66,23 @@ def on_start(self, event: Event) -> None:
# explicit check to pass the type checker
if rconf is None:
sys.exit(1)
conn = event.container.connect(rconf.umb_target())
event.container.create_receiver(conn, rconf.result_queue())

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_credentials(
rconf.client_ca(),
rconf.client_key(),
rconf.client_key_password()
)
ssl_domain.set_trusted_ca_db(rconf.root_ca())
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)

self.conn = event.container.connect(
url=rconf.umb_target(),
ssl_domain=ssl_domain
)
event.container.create_receiver(
self.conn, rconf.result_queue(), dynamic=True
)
logger.info("Listening on %s, queue: %s", rconf.umb_target(), rconf.result_queue())

def on_message(self, event: Event) -> None:
Expand Down Expand Up @@ -122,6 +140,60 @@ def _process_message(self, msg: Any) -> None:
logger.info("Number of files pulled: %d, path: %s", len(files), files[0])


class RadasSender(MessagingHandler):
"""
This simple sender will send given string massage to UMB message queue to request signing.
Attributes:
payload (str): payload json string for radas to read,
this value construct from the cmd flag
"""
def __init__(self, payload: str):
super().__init__()
self.payload = payload
self.container = None
self.conn = None
self.sender = None

def on_start(self, event):
"""
On start callback
"""
conf = get_config()
if not (conf and conf.is_radas_enabled()):
sys.exit(1)

rconf = conf.get_radas_config()
if rconf is None:
sys.exit(1)

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_credentials(
rconf.client_ca(),
rconf.client_key(),
rconf.client_key_password()
)
ssl_domain.set_trusted_ca_db(rconf.root_ca())
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)

self.container = event.container
self.conn = event.container.connect(
url=rconf.umb_target(),
ssl_domain=ssl_domain
)
self.sender = event.container.create_sender(self.conn, rconf.request_queue())

def on_sendable(self):
"""
On message able to send callback
"""
request = self.payload
msg = Message(body=request)
if self.sender:
self.sender.send(msg)
if self.container:
self.container.stop()


def generate_radas_sign(top_level: str, sign_result_loc: str) -> Tuple[List[str], List[str]]:
"""
Generate .asc files based on RADAS sign result json file
Expand Down Expand Up @@ -215,4 +287,33 @@ def sign_in_radas(repo_url: str,
result_path: str,
ignore_patterns: List[str],
radas_config: RadasConfig):
logger.info("Start signing for %s", repo_url)
"""
This function will be responsible to do the overall controlling of the whole process,
like trigger the send and register the receiver, and control the wait and timeout there.
"""
logger.debug("params. repo_url: %s, requester: %s, sign_key: %s, result_path: %s,"
"radas_config: %s", repo_url, requester, sign_key, result_path, radas_config)
request_id = str(uuid.uuid4())
exclude = ignore_patterns if ignore_patterns else []

payload = {
"request_id": request_id,
"requested_by": requester,
"type": "mrrc",
"file_reference": repo_url,
"sig_keyname": sign_key,
"exclude": exclude
}

listener = RadasReceiver(result_path, request_id)
sender = RadasSender(json.dumps(payload))

try:
Container(sender).run()
logger.info("Successfully sent signing request ID: %s", request_id)
Container(listener).run()
finally:
if listener.conn is not None:
listener.conn.close()
if sender.conn is not None:
sender.conn.close()
98 changes: 98 additions & 0 deletions tests/test_radas_send_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import tempfile
import os
from unittest import mock
import unittest
from charon.pkgs.radas_signature_handler import sign_in_radas


class RadasSignHandlerTest(unittest.TestCase):
def setUp(self) -> None:
super().setUp()

def tearDown(self) -> None:
super().tearDown()

def test_sign_in_radas_normal_flow(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Mock configuration
mock_config = mock.MagicMock()
mock_config.is_radas_enabled.return_value = True
mock_radas_config = mock.MagicMock()
mock_config.get_radas_config.return_value = mock_radas_config

# Mock Container run to avoid real AMQP connection
with mock.patch(
"charon.pkgs.radas_signature_handler.Container") as mock_container, \
mock.patch(
"charon.pkgs.radas_signature_handler.get_config", return_value=mock_config), \
mock.patch(
"charon.pkgs.radas_signature_handler.uuid.uuid4", return_value="mocked-uuid"):

test_result_path = os.path.join(tmpdir, "results")
os.makedirs(test_result_path)

sign_in_radas(
repo_url="quay.io/test/repo",
requester="test-user",
sign_key="test-key",
result_path=test_result_path,
ignore_patterns=[],
radas_config=mock_radas_config
)

# Verify Container.run() was called twice (sender and receiver)
self.assertEqual(mock_container.call_count, 2)

# Verify request ID propagation
receiver_call = mock_container.call_args_list[1]
self.assertEqual(receiver_call.args[0].request_id, "mocked-uuid")

def test_sign_in_radas_with_disabled_config(self):
mock_config = mock.MagicMock()
mock_config.is_radas_enabled.return_value = False

with mock.patch(
"charon.pkgs.radas_signature_handler.get_config", return_value=mock_config), \
self.assertRaises(SystemExit):

sign_in_radas(
repo_url="quay.io/test/repo",
requester="test-user",
sign_key="test-key",
result_path="/tmp/results",
ignore_patterns=[],
radas_config=mock.MagicMock()
)

def test_sign_in_radas_connection_cleanup(self):
mock_config = mock.MagicMock()
mock_config.is_radas_enabled.return_value = True
mock_radas_config = mock.MagicMock()

with mock.patch("charon.pkgs.radas_signature_handler.Container") as mock_container, \
mock.patch("charon.pkgs.radas_signature_handler.get_config", return_value=mock_config):

mock_sender_conn = mock.MagicMock()
mock_listener_conn = mock.MagicMock()

def container_side_effect(*args, **kwargs):
if args[0].__class__.__name__ == "RadasReceiver":
args[0].conn = mock_listener_conn
elif args[0].__class__.__name__ == "RadasSender":
args[0].conn = mock_sender_conn
return mock.MagicMock()

mock_container.side_effect = container_side_effect

sign_in_radas(
repo_url="quay.io/test/repo",
requester="test-user",
sign_key="test-key",
result_path="/tmp/results",
ignore_patterns=[],
radas_config=mock_radas_config
)

# Verify connections are closed
mock_sender_conn.close.assert_called_once()
mock_listener_conn.close.assert_called_once()