Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion charon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ def validate(self) -> bool:
return True

def umb_target(self) -> str:
return f"amqps://{self.__umb_host.strip()}:{self.__umb_host_port}"
if self.ssl_enabled():
return f"amqps://{self.__umb_host.strip()}:{self.__umb_host_port}"
else:
return f"amqp://{self.__umb_host.strip()}:{self.__umb_host_port}"

def result_queue(self) -> str:
return self.__result_queue.strip()
Expand Down
24 changes: 15 additions & 9 deletions charon/pkgs/radas_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,25 @@ def __init__(self, payload: Any, rconf: RadasConfig):

def on_start(self, event):
self._container = event.container
self.log.debug("Start creating connection for sender")
self.log.debug("Start creating connection for sender to %s", self.rconf.umb_target())
conn = self._container.connect(
url=self.rconf.umb_target(),
ssl_domain=self._ssl
ssl_domain=self._ssl,
heartbeat=500
)
self.log.debug("Connection to %s is created.", conn.hostname)
if conn:
self.log.debug("Start creating sender")
self._sender = self._container.create_sender(conn, self.rconf.request_channel())
self.log.debug("Sender created. Remote address: %s", self._sender.target.address)

def on_connection_opened(self, event):
conn = event.connection
self.log.debug("Connection to %s is created.", conn.hostname)

def on_sendable(self, event):
if not self._message_sent:
msg = Message(body=self.payload, durable=True)
self.log.debug("Sending message: %s to %s", msg.id, event.sender.target.address)
self.log.debug("Sending message: %s to %s", msg.body, event.sender.target.address)
self._send_msg(msg)
self._message = msg
self._message_sent = True
Expand All @@ -232,7 +238,7 @@ def on_released(self, event):
self._handle_failed_delivery("Released")

def on_accepted(self, event):
self.log.info("Message accepted by receiver: %s", event.delivery)
self.log.info("Message accepted by receiver: %s", event.delivery.link.target.address)
self.status = "success"
self.close() # Close connection after confirmation

Expand All @@ -251,26 +257,26 @@ def close(self):
def _send_msg(self, msg: Message):
if self._sender and self._sender.credit > 0:
self._sender.send(msg)
self.log.debug("Message %s sent", msg.id)
self.log.debug("Message %s sent", msg.body)
else:
self.log.warning("Sender not ready or no credit available")

def _handle_failed_delivery(self, reason: str):
if self._pending:
msg = self._pending
self.log.warning("Message %s failed for reason: %s", msg.id, reason)
self.log.warning("Message %s failed for reason: %s", msg.body, reason)
max_retries = self.rconf.radas_sign_timeout_retry_count()
if self._retried < max_retries:
# Schedule retry
self._retried = self._retried + 1
self.log.info("Scheduling retry %s/%s for message %s",
self._retried, max_retries, msg.id)
self._retried, max_retries, msg.body)
# Schedule retry after delay
if self._container:
self._container.schedule(self.rconf.radas_sign_timeout_retry_interval(), self)
else:
# Max retries exceeded
self.log.error("Message %s failed after %s retries", msg.id, max_retries)
self.log.error("Message %s failed after %s retries", msg.body, max_retries)
self.status = "failed"
self._pending = None
else:
Expand Down