From 03ff837f2f22ef68c8669fd8b9e389797ae1a039 Mon Sep 17 00:00:00 2001 From: ricwo Date: Thu, 31 Oct 2019 17:03:08 +0100 Subject: [PATCH 1/4] wip --- rasa/core/brokers/pika.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 6912d8dd40f2..30c26da1e960 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -139,16 +139,16 @@ def initialise_pika_channel( """Initialise a Pika channel with a durable queue. Args: - host: Pika host - queue: Pika queue to declare - username: username for authentication with Pika host - password: password for authentication with Pika host - port: port of the Pika host - connection_attempts: number of channel attempts before giving up - retry_delay_in_seconds: delay in seconds between channel attempts + host: Pika host. + queue: Pika queue to declare. + username: Username for authentication with Pika host. + password: Password for authentication with Pika host. + port: port of the Pika host. + connection_attempts: Number of channel attempts before giving up. + retry_delay_in_seconds: Delay in seconds between channel attempts. Returns: - Pika `BlockingChannel` with declared queue + Pika `BlockingChannel` with declared queue. """ @@ -206,6 +206,17 @@ def __init__( ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES ), ): + """RabbitMQ event producer. + + Args: + host: Pika host. + username: Username for authentication with Pika host. + password: Password for authentication with Pika host. + port: port of the Pika host. + queue: Pika queue to declare. + loglevel: Logging level. + + """ logging.getLogger("pika").setLevel(loglevel) self.queue = queue @@ -285,7 +296,6 @@ def publish( body = json.dumps(event) while retries: - # noinspection PyBroadException try: self._publish(body) return From 2c715933d0658f13121ba0c1ce49da059091e19d Mon Sep 17 00:00:00 2001 From: ricwo Date: Thu, 31 Oct 2019 17:54:14 +0100 Subject: [PATCH 2/4] add test --- CHANGELOG.rst | 3 +++ rasa/core/brokers/pika.py | 30 ++++++++++++++++++++++++++++-- tests/core/test_broker.py | 27 +++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d1fab0e0b660..0dbbfbc36516 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,9 @@ This project adheres to `Semantic Versioning`_ starting with version 1.0. Added ----- +- ``PikaEventProducer`` adds the RabbitMQ ``App ID`` message property to published + messages with the value of the ``RASA_ENVIRONMENT`` environment variable. The + message property will not be assigned if this environment variable isn't set. Changed ------- diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 30c26da1e960..59e10dbb9189 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -12,10 +12,11 @@ from rasa.constants import ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES from rasa.core.brokers.event_channel import EventChannel from rasa.utils.endpoints import EndpointConfig +from rasa.utils.io import DEFAULT_ENCODING if typing.TYPE_CHECKING: from pika.adapters.blocking_connection import BlockingChannel - from pika import SelectConnection, BlockingConnection + from pika import SelectConnection, BlockingConnection, BasicProperties from pika.channel import Channel from pika.connection import Parameters, Connection @@ -235,6 +236,10 @@ def __del__(self) -> None: close_pika_channel(self.channel) close_pika_connection(self.channel.connection) + @property + def rasa_environment(self) -> Optional[Text]: + return os.environ.get("RASA_ENVIRONMENT") + @classmethod def from_endpoint_config( cls, broker_config: Optional["EndpointConfig"] @@ -314,6 +319,22 @@ def publish( "'{}':\n{}".format(self.queue, self.host, body) ) + @property + def _message_properties(self) -> "BasicProperties": + """Create RabbitMQ message properties. + + Returns: + pika.spec.BasicProperties with the `RASA_ENVIRONMENT` environment + variable as the properties' app_id value. If this variable is unset, empty + pika.spec.BasicProperties. + + """ + from pika.spec import BasicProperties + + kwargs = {"app_id": self.rasa_environment} if self.rasa_environment else {} + + return BasicProperties(**kwargs) + def _publish(self, body: Text) -> None: if self._pika_connection.is_closed: # Try to reset connection @@ -327,7 +348,12 @@ def _publish(self, body: Text) -> None: ) self._unpublished_messages.append(body) else: - self.channel.basic_publish("", self.queue, body) + self.channel.basic_publish( + "", + self.queue, + body.encode(DEFAULT_ENCODING), + properties=self._message_properties, + ) logger.debug( f"Published Pika events to queue '{self.queue}' on host " diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index aeb0e6be9f3e..136fd20ddcca 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -1,5 +1,8 @@ import json +from _pytest.monkeypatch import MonkeyPatch + +from unittest.mock import patch import rasa.core.brokers.utils as broker_utils from rasa.core.brokers.file_producer import FileProducer from rasa.core.brokers.kafka import KafkaProducer @@ -28,6 +31,30 @@ def test_pika_broker_from_config(): assert actual.queue == "queue" +def test_pika_message_property_app_id(monkeypatch: MonkeyPatch): + rasa_environment = "some-test-environment" + + monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment) + + # patch pika producer init + with patch.object(PikaProducer, "__init__", lambda: None): + pika_producer = PikaProducer() + + # noinspection PyProtectedMember + assert pika_producer._message_properties.app_id == rasa_environment + + +def test_pika_message_property_unset_app_id(monkeypatch: MonkeyPatch): + monkeypatch.delenv("RASA_ENVIRONMENT", raising=False) + + # patch pika producer so it doesn't run `_run_pika()` + with patch.object(PikaProducer, "__init__", lambda: None): + pika_producer = PikaProducer() + + # noinspection PyProtectedMember + assert not pika_producer._message_properties.app_id + + def test_no_broker_in_config(): cfg = read_endpoint_config(DEFAULT_ENDPOINTS_FILE, "event_broker") From 7584ade11fbf007c73dc0db362ca3a13e4aa6246 Mon Sep 17 00:00:00 2001 From: ricwo Date: Fri, 1 Nov 2019 11:58:01 +0100 Subject: [PATCH 3/4] fix test --- tests/core/test_broker.py | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index 136fd20ddcca..e7a47bd17355 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -1,8 +1,8 @@ import json +from unittest.mock import patch from _pytest.monkeypatch import MonkeyPatch -from unittest.mock import patch import rasa.core.brokers.utils as broker_utils from rasa.core.brokers.file_producer import FileProducer from rasa.core.brokers.kafka import KafkaProducer @@ -31,29 +31,21 @@ def test_pika_broker_from_config(): assert actual.queue == "queue" +# noinspection PyProtectedMember def test_pika_message_property_app_id(monkeypatch: MonkeyPatch): - rasa_environment = "some-test-environment" - - monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment) - - # patch pika producer init - with patch.object(PikaProducer, "__init__", lambda: None): - pika_producer = PikaProducer() - - # noinspection PyProtectedMember - assert pika_producer._message_properties.app_id == rasa_environment + # patch PikaProducer so it doesn't try to connect to RabbitMQ on init + with patch.object(PikaProducer, "_run_pika", lambda _: None): + pika_producer = PikaProducer("", "", "") - -def test_pika_message_property_unset_app_id(monkeypatch: MonkeyPatch): + # unset RASA_ENVIRONMENT env var results in empty App ID monkeypatch.delenv("RASA_ENVIRONMENT", raising=False) - - # patch pika producer so it doesn't run `_run_pika()` - with patch.object(PikaProducer, "__init__", lambda: None): - pika_producer = PikaProducer() - - # noinspection PyProtectedMember assert not pika_producer._message_properties.app_id + # setting it to some value results in that value as the App ID + rasa_environment = "some-test-environment" + monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment) + assert pika_producer._message_properties.app_id == rasa_environment + def test_no_broker_in_config(): cfg = read_endpoint_config(DEFAULT_ENDPOINTS_FILE, "event_broker") From 8d8af8049370d2c90c991b69965978f127f5fbd3 Mon Sep 17 00:00:00 2001 From: ricwo Date: Sat, 2 Nov 2019 10:23:37 +0100 Subject: [PATCH 4/4] merge 1.4.x --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b6b9483f7bae..977f2b490371 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -18,7 +18,7 @@ Added Changed ------- -- updated mattermost connector documentation to be more clear. +- Updated Mattermost connector documentation to be more clear. - Updated format strings to f-strings where appropriate. Removed