Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ app_id message property #4715

Merged
merged 8 commits into from Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.rst
Expand Up @@ -12,10 +12,13 @@ This project adheres to `Semantic Versioning`_ starting with version 1.0.

Added
-----
- ``PikaEventProducer`` adds the RabbitMQ ``App ID`` message property to published
messages with the value of the ``RASA_ENVIRONMENT`` environment variable. The
message property will not be assigned if this environment variable isn't set.

Changed
-------
- updated mattermost connector documentation to be more clear.
- Updated Mattermost connector documentation to be more clear.
- Updated format strings to f-strings where appropriate.

Removed
Expand Down
58 changes: 47 additions & 11 deletions rasa/core/brokers/pika.py
Expand Up @@ -12,10 +12,11 @@
from rasa.constants import ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
from rasa.core.brokers.event_channel import EventChannel
from rasa.utils.endpoints import EndpointConfig
from rasa.utils.io import DEFAULT_ENCODING

if typing.TYPE_CHECKING:
from pika.adapters.blocking_connection import BlockingChannel
from pika import SelectConnection, BlockingConnection
from pika import SelectConnection, BlockingConnection, BasicProperties
from pika.channel import Channel
from pika.connection import Parameters, Connection

Expand Down Expand Up @@ -139,16 +140,16 @@ def initialise_pika_channel(
"""Initialise a Pika channel with a durable queue.

Args:
host: Pika host
queue: Pika queue to declare
username: username for authentication with Pika host
password: password for authentication with Pika host
port: port of the Pika host
connection_attempts: number of channel attempts before giving up
retry_delay_in_seconds: delay in seconds between channel attempts
host: Pika host.
queue: Pika queue to declare.
username: Username for authentication with Pika host.
password: Password for authentication with Pika host.
port: port of the Pika host.
connection_attempts: Number of channel attempts before giving up.
retry_delay_in_seconds: Delay in seconds between channel attempts.

Returns:
Pika `BlockingChannel` with declared queue
Pika `BlockingChannel` with declared queue.

"""

Expand Down Expand Up @@ -206,6 +207,17 @@ def __init__(
ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
),
):
"""RabbitMQ event producer.

Args:
host: Pika host.
username: Username for authentication with Pika host.
password: Password for authentication with Pika host.
port: port of the Pika host.
queue: Pika queue to declare.
loglevel: Logging level.

"""
logging.getLogger("pika").setLevel(loglevel)

self.queue = queue
Expand All @@ -224,6 +236,10 @@ def __del__(self) -> None:
close_pika_channel(self.channel)
close_pika_connection(self.channel.connection)

@property
def rasa_environment(self) -> Optional[Text]:
return os.environ.get("RASA_ENVIRONMENT")

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
Expand Down Expand Up @@ -285,7 +301,6 @@ def publish(
body = json.dumps(event)

while retries:
# noinspection PyBroadException
try:
self._publish(body)
return
Expand All @@ -304,6 +319,22 @@ def publish(
"'{}':\n{}".format(self.queue, self.host, body)
)

@property
def _message_properties(self) -> "BasicProperties":
"""Create RabbitMQ message properties.

Returns:
pika.spec.BasicProperties with the `RASA_ENVIRONMENT` environment
variable as the properties' app_id value. If this variable is unset, empty
ricwo marked this conversation as resolved.
Show resolved Hide resolved
pika.spec.BasicProperties.

"""
from pika.spec import BasicProperties

kwargs = {"app_id": self.rasa_environment} if self.rasa_environment else {}
wochinge marked this conversation as resolved.
Show resolved Hide resolved

return BasicProperties(**kwargs)

def _publish(self, body: Text) -> None:
if self._pika_connection.is_closed:
# Try to reset connection
Expand All @@ -317,7 +348,12 @@ def _publish(self, body: Text) -> None:
)
self._unpublished_messages.append(body)
else:
self.channel.basic_publish("", self.queue, body)
self.channel.basic_publish(
"",
self.queue,
body.encode(DEFAULT_ENCODING),
properties=self._message_properties,
)

logger.debug(
f"Published Pika events to queue '{self.queue}' on host "
Expand Down
19 changes: 19 additions & 0 deletions tests/core/test_broker.py
@@ -1,4 +1,7 @@
import json
from unittest.mock import patch

from _pytest.monkeypatch import MonkeyPatch

import rasa.core.brokers.utils as broker_utils
from rasa.core.brokers.file_producer import FileProducer
Expand Down Expand Up @@ -28,6 +31,22 @@ def test_pika_broker_from_config():
assert actual.queue == "queue"


# noinspection PyProtectedMember
def test_pika_message_property_app_id(monkeypatch: MonkeyPatch):
# patch PikaProducer so it doesn't try to connect to RabbitMQ on init
with patch.object(PikaProducer, "_run_pika", lambda _: None):
pika_producer = PikaProducer("", "", "")

# unset RASA_ENVIRONMENT env var results in empty App ID
monkeypatch.delenv("RASA_ENVIRONMENT", raising=False)
assert not pika_producer._message_properties.app_id

# setting it to some value results in that value as the App ID
rasa_environment = "some-test-environment"
monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment)
assert pika_producer._message_properties.app_id == rasa_environment


def test_no_broker_in_config():
cfg = read_endpoint_config(DEFAULT_ENDPOINTS_FILE, "event_broker")

Expand Down