Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions charon/cmd/cmd_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ def sign(
try:
current = datetime.datetime.now().strftime("%Y%m%d%I%M")
_decide_mode("radas_sign", current, is_quiet=quiet, is_debug=debug)
if dryrun:
logger.info("Running in dry-run mode, no files will signed.")
conf = get_config(config)
if not conf:
logger.error("The charon configuration is not valid!")
Expand Down
3 changes: 3 additions & 0 deletions charon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ def client_key_password(self) -> str:
def root_ca(self) -> str:
return self.__root_ca.strip()

def ssl_enabled(self) -> bool:
return bool(self.__client_ca and self.__client_key and self.__root_ca)

def quay_radas_registry_config(self) -> Optional[str]:
if self.__quay_radas_registry_config:
return self.__quay_radas_registry_config.strip()
Expand Down
126 changes: 66 additions & 60 deletions charon/pkgs/radas_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,57 +53,59 @@ def __init__(self, sign_result_loc: str, request_id: str, rconf: RadasConfig) ->
super().__init__()
self.sign_result_loc = sign_result_loc
self.request_id = request_id
self.conn: Optional[Connection] = None
self.message_handled = False
self.sign_result_status: Optional[str] = None
self.sign_result_errors: List[str] = []
self.rconf = rconf
self.start_time = 0.0
self.timeout_check_delay = 30.0
self.ssl = SSLDomain(SSLDomain.MODE_CLIENT)
self.ssl.set_trusted_ca_db(self.rconf.root_ca())
self.ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
self.ssl.set_credentials(
self.rconf.client_ca(),
self.rconf.client_key(),
self.rconf.client_key_password()
)
self._conn: Optional[Connection] = None
self._message_handled = False
self._start_time = 0.0
self._timeout_check_delay = 30.0
self._ssl: Optional[SSLDomain] = None
if rconf.ssl_enabled():
self._ssl = SSLDomain(SSLDomain.MODE_CLIENT)
self._ssl.set_trusted_ca_db(self.rconf.root_ca())
self._ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
self._ssl.set_credentials(
self.rconf.client_ca(),
self.rconf.client_key(),
self.rconf.client_key_password()
)
self.log = logging.getLogger("charon.pkgs.radas_sign.RadasReceiver")

def on_start(self, event: Event) -> None:
umb_target = self.rconf.umb_target()
container = event.container
self.conn = container.connect(
self._conn = container.connect(
url=umb_target,
ssl_domain=self.ssl,
ssl_domain=self._ssl,
heartbeat=500
)
receiver = container.create_receiver(
context=self.conn, source=self.rconf.result_queue(),
context=self._conn, source=self.rconf.result_queue(),
)
self.log.info("Listening on %s, queue: %s",
umb_target,
receiver.source.address)
self.start_time = time.time()
container.schedule(self.timeout_check_delay, self)
self._start_time = time.time()
container.schedule(self._timeout_check_delay, self)

def on_timer_task(self, event: Event) -> None:
current = time.time()
timeout = self.rconf.receiver_timeout()
idle_time = current - self.start_time
idle_time = current - self._start_time
self.log.debug("Checking timeout: passed %s seconds, timeout time %s seconds",
idle_time, timeout)
if idle_time > self.rconf.receiver_timeout():
self.log.error("The receiver did not receive messages for more than %s seconds,"
" and needs to stop receiving and quit.", timeout)
self._close(event)
else:
event.container.schedule(self.timeout_check_delay, self)
event.container.schedule(self._timeout_check_delay, self)

def on_message(self, event: Event) -> None:
self.log.debug("Got message: %s", event.message.body)
self._process_message(event.message.body)
if self.message_handled:
if self._message_handled:
self.log.debug("The signing result is handled.")
self._close(event)

Expand Down Expand Up @@ -137,7 +139,7 @@ def _process_message(self, msg: Any) -> None:
)
return

self.message_handled = True
self._message_handled = True
self.log.info(
"Start to process the sign event message, request_id %s is matched", msg_request_id
)
Expand Down Expand Up @@ -171,56 +173,60 @@ class RadasSender(MessagingHandler):
this value construct from the cmd flag
rconf (RadasConfig): the configurations for the radas messaging
system.
status (str): tell if status for message sending, only "success"
means the message is sent successfully.
"""
def __init__(self, payload: Any, rconf: RadasConfig):
super(RadasSender, self).__init__()
self.payload = payload
self.rconf = rconf
self.message_sent = False # Flag to track if message was sent
self.status: Optional[str] = None
self.retried = 0
self.pending: Optional[Message] = None
self.message: Optional[Message] = None
self.container: Optional[Container] = None
self.sender: Optional[Sender] = None
self.ssl = SSLDomain(SSLDomain.MODE_CLIENT)
self.ssl.set_trusted_ca_db(self.rconf.root_ca())
self.ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
self.ssl.set_credentials(
self.rconf.client_ca(),
self.rconf.client_key(),
self.rconf.client_key_password()
)
self._message_sent = False # Flag to track if message was sent
self._retried = 0
self._pending: Optional[Message] = None
self._message: Optional[Message] = None
self._container: Optional[Container] = None
self._sender: Optional[Sender] = None
self._ssl: Optional[SSLDomain] = None
if self.rconf.ssl_enabled():
self._ssl = SSLDomain(SSLDomain.MODE_CLIENT)
self._ssl.set_trusted_ca_db(self.rconf.root_ca())
self._ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
self._ssl.set_credentials(
self.rconf.client_ca(),
self.rconf.client_key(),
self.rconf.client_key_password()
)
self.log = logging.getLogger("charon.pkgs.radas_sign.RadasSender")

def on_start(self, event):
self.container = event.container
conn = self.container.connect(
self._container = event.container
conn = self._container.connect(
url=self.rconf.umb_target(),
ssl_domain=self.ssl
ssl_domain=self._ssl
)
if conn:
self.sender = self.container.create_sender(conn, self.rconf.request_queue())
self._sender = self._container.create_sender(conn, self.rconf.request_queue())

def on_sendable(self, event):
if not self.message_sent:
if not self._message_sent:
msg = Message(body=self.payload, durable=True)
self.log.debug("Sending message: %s to %s", msg.id, event.sender.target.address)
self._send_msg(msg)
self.message = msg
self.message_sent = True
self._message = msg
self._message_sent = True

def on_error(self, event):
self.log.error("Error happened during message sending, reason %s",
event.description)
self.status = "failed"

def on_rejected(self, event):
self.pending = self.message
self._pending = self._message
self._handle_failed_delivery("Rejected")

def on_released(self, event):
self.pending = self.message
self._pending = self._message
self._handle_failed_delivery("Released")

def on_accepted(self, event):
Expand All @@ -229,42 +235,42 @@ def on_accepted(self, event):
self.close() # Close connection after confirmation

def on_timer_task(self, event):
message_to_retry = self.message
message_to_retry = self._message
self._send_msg(message_to_retry)
self.pending = None
self._pending = None

def close(self):
self.log.info("Message has been sent successfully, close connection")
if self.sender:
self.sender.close()
if self.container:
self.container.stop()
if self._sender:
self._sender.close()
if self._container:
self._container.stop()

def _send_msg(self, msg: Message):
if self.sender and self.sender.credit > 0:
self.sender.send(msg)
if self._sender and self._sender.credit > 0:
self._sender.send(msg)
self.log.debug("Message %s sent", msg.id)
else:
self.log.warning("Sender not ready or no credit available")

def _handle_failed_delivery(self, reason: str):
if self.pending:
msg = self.pending
if self._pending:
msg = self._pending
self.log.warning("Message %s failed for reason: %s", msg.id, reason)
max_retries = self.rconf.radas_sign_timeout_retry_count()
if self.retried < max_retries:
if self._retried < max_retries:
# Schedule retry
self.retried = self.retried + 1
self._retried = self._retried + 1
self.log.info("Scheduling retry %s/%s for message %s",
self.retried, max_retries, msg.id)
self._retried, max_retries, msg.id)
# Schedule retry after delay
if self.container:
self.container.schedule(self.rconf.radas_sign_timeout_retry_interval(), self)
if self._container:
self._container.schedule(self.rconf.radas_sign_timeout_retry_interval(), self)
else:
# Max retries exceeded
self.log.error("Message %s failed after %s retries", msg.id, max_retries)
self.status = "failed"
self.pending = None
self._pending = None
else:
self.log.info("Message has been sent successfully, close connection")
self.close()
Expand Down
12 changes: 6 additions & 6 deletions tests/test_radas_sign_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def tearDown(self) -> None:
super().tearDown()

def reset_receiver(self, r_receiver: RadasReceiver) -> None:
r_receiver.message_handled = False
r_receiver._message_handled = False
r_receiver.sign_result_errors = []
r_receiver.sign_result_status = None

Expand Down Expand Up @@ -54,8 +54,8 @@ def test_radas_receiver(self):
r_receiver.on_start(event)
self.assertEqual(mock_container.connect.call_count, 1)
self.assertEqual(mock_container.create_receiver.call_count, 1)
self.assertTrue(r_receiver.start_time > 0.0)
self.assertTrue(r_receiver.start_time < time.time())
self.assertTrue(r_receiver._start_time > 0.0)
self.assertTrue(r_receiver._start_time < time.time())
self.assertEqual(mock_container.schedule.call_count, 1)

# test on_message: unmatched case
Expand All @@ -71,7 +71,7 @@ def test_radas_receiver(self):
r_receiver.on_message(event)
self.assertEqual(event.connection.close.call_count, 0)
self.assertEqual(mock_container.stop.call_count, 0)
self.assertFalse(r_receiver.message_handled)
self.assertFalse(r_receiver._message_handled)
self.assertIsNone(r_receiver.sign_result_status)
self.assertEqual(r_receiver.sign_result_errors, [])
self.assertEqual(oras_client.call_count, 0)
Expand All @@ -90,7 +90,7 @@ def test_radas_receiver(self):
r_receiver.on_message(event)
self.assertEqual(event.connection.close.call_count, 1)
self.assertEqual(mock_container.stop.call_count, 1)
self.assertTrue(r_receiver.message_handled)
self.assertTrue(r_receiver._message_handled)
self.assertEqual(r_receiver.sign_result_status, "failed")
self.assertEqual(r_receiver.sign_result_errors, ["error1", "error2"])
self.assertEqual(oras_client.call_count, 0)
Expand All @@ -109,7 +109,7 @@ def test_radas_receiver(self):
r_receiver.on_message(event)
self.assertEqual(event.connection.close.call_count, 2)
self.assertEqual(mock_container.stop.call_count, 2)
self.assertTrue(r_receiver.message_handled)
self.assertTrue(r_receiver._message_handled)
self.assertEqual(r_receiver.sign_result_status, "success")
self.assertEqual(r_receiver.sign_result_errors, [])
self.assertEqual(oras_client.call_count, 1)
Expand Down
28 changes: 14 additions & 14 deletions tests/test_radas_sign_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def test_radas_sender(self):
self.assertEqual(ssl_domain.call_count, 1)
self.assertEqual(r_sender.payload, json_payload)
self.assertIs(r_sender.rconf, mock_radas_config)
self.assertIsNone(r_sender.message)
self.assertIsNone(r_sender.pending)
self.assertIsNone(r_sender._message)
self.assertIsNone(r_sender._pending)

# test on_start
mock_sender = mock.MagicMock()
Expand All @@ -57,30 +57,30 @@ def test_radas_sender(self):
# test on_sendable
mock_sender.credit = 1
r_sender.on_sendable(event)
self.assertIsNotNone(r_sender.message)
self.assertIsNotNone(r_sender._message)
self.assertEqual(mock_sender.send.call_count, 1)

# test on_accepted
r_sender.on_accepted(event)
self.assertEqual(r_sender.status, "success")
self.assertEqual(r_sender.retried, 0)
self.assertEqual(r_sender.sender.close.call_count, 1)
self.assertEqual(r_sender.container.stop.call_count, 1)
self.assertEqual(r_sender._retried, 0)
self.assertEqual(r_sender._sender.close.call_count, 1)
self.assertEqual(r_sender._container.stop.call_count, 1)

# test on_rejected
r_sender.on_rejected(event)
self.assertIsNone(r_sender.pending)
self.assertEqual(r_sender.retried, 1)
self.assertEqual(r_sender.container.schedule.call_count, 1)
self.assertIsNone(r_sender._pending)
self.assertEqual(r_sender._retried, 1)
self.assertEqual(r_sender._container.schedule.call_count, 1)

# test on_released
r_sender.on_released(event)
self.assertIsNone(r_sender.pending)
self.assertEqual(r_sender.retried, 2)
self.assertEqual(r_sender.container.schedule.call_count, 2)
self.assertIsNone(r_sender._pending)
self.assertEqual(r_sender._retried, 2)
self.assertEqual(r_sender._container.schedule.call_count, 2)

# test on_released
r_sender.on_timer_task(event)
self.assertIsNone(r_sender.pending)
self.assertEqual(r_sender.retried, 2)
self.assertIsNone(r_sender._pending)
self.assertEqual(r_sender._retried, 2)
self.assertEqual(mock_sender.send.call_count, 2)