Skip to content

Commit

Permalink
Keep STOMP connection open (release-engineering#141)
Browse files Browse the repository at this point in the history
This would save a lot of time on consecutive POST requests.

Inactive connection is be closed/disconnected automatically after
a short time.

JIRA: RHELWF-10659
  • Loading branch information
hluk committed Feb 21, 2024
1 parent 991c85f commit 3fc31c4
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
21 changes: 13 additions & 8 deletions resultsdb/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import abc
import json
from threading import Lock

import pkg_resources
import stomp
Expand Down Expand Up @@ -216,6 +217,11 @@ def __init__(self, **kwargs):
if getattr(self, attr, None) is None:
raise ValueError(f"Missing {attr!r} option for STOMP messaging plugin")

self.conn_lock = Lock()
self.conn = stomp.connect.StompConnection11(**self.connection)
if self.use_ssl:
self.conn.set_ssl(**self.ssl_args)

def publish(self, msg):
# Add telemetry information. This includes an extra key
# traceparent.
Expand All @@ -227,17 +233,16 @@ def publish(self, msg):

@retry(stop=STOMP_RETRY_STOP, wait=STOMP_RETRY_WAIT, reraise=True)
def _publish_with_retry(self, **kwargs):
conn = stomp.connect.StompConnection11(**self.connection)
with self.conn_lock:
if not self.conn.is_connected():
log.info("Connecting to message bus")

if self.use_ssl:
conn.set_ssl(**self.ssl_args)
# Inactive connection is be closed/disconnected automatically
# after a short time.
self.conn.connect(wait=True)

conn.connect(wait=True)
try:
conn.send(**kwargs)
self.conn.send(**kwargs)
log.debug("Published message through stomp: %s", kwargs["body"])
finally:
conn.disconnect()


def load_messaging_plugin(name, plugin_args):
Expand Down
3 changes: 0 additions & 3 deletions testing/test_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def test_stomp_publish(self, mock_stomp):
plugin.publish({})
mock_stomp().connect.assert_called_once()
mock_stomp().send.assert_called_once()
mock_stomp().disconnect.assert_called_once()

def test_stomp_publish_connect_failed(self, mock_stomp):
plugin = messaging.load_messaging_plugin("stomp", MESSAGE_BUS_KWARGS)
Expand All @@ -214,7 +213,6 @@ def test_stomp_publish_connect_failed(self, mock_stomp):

assert mock_stomp().connect.call_count == 3
mock_stomp().send.assert_not_called()
mock_stomp().disconnect.assert_not_called()

def test_stomp_publish_send_failed(self, mock_stomp):
plugin = messaging.load_messaging_plugin("stomp", MESSAGE_BUS_KWARGS)
Expand All @@ -225,7 +223,6 @@ def test_stomp_publish_send_failed(self, mock_stomp):

assert mock_stomp().connect.call_count == 3
assert mock_stomp().send.call_count == 3
assert mock_stomp().disconnect.call_count == 3


class TestGetResultsParseArgs:
Expand Down

0 comments on commit 3fc31c4

Please sign in to comment.