Skip to content

Commit

Permalink
Merge 8d8af80 into 6b9016c
Browse files Browse the repository at this point in the history
  • Loading branch information
ricwo committed Nov 2, 2019
2 parents 6b9016c + 8d8af80 commit 3c8e764
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 12 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.rst
Expand Up @@ -12,10 +12,13 @@ This project adheres to `Semantic Versioning`_ starting with version 1.0.

Added
-----
- ``PikaEventProducer`` adds the RabbitMQ ``App ID`` message property to published
messages with the value of the ``RASA_ENVIRONMENT`` environment variable. The
message property will not be assigned if this environment variable isn't set.

Changed
-------
- updated mattermost connector documentation to be more clear.
- Updated Mattermost connector documentation to be more clear.
- Updated format strings to f-strings where appropriate.

Removed
Expand Down
58 changes: 47 additions & 11 deletions rasa/core/brokers/pika.py
Expand Up @@ -12,10 +12,11 @@
from rasa.constants import ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
from rasa.core.brokers.event_channel import EventChannel
from rasa.utils.endpoints import EndpointConfig
from rasa.utils.io import DEFAULT_ENCODING

if typing.TYPE_CHECKING:
from pika.adapters.blocking_connection import BlockingChannel
from pika import SelectConnection, BlockingConnection
from pika import SelectConnection, BlockingConnection, BasicProperties
from pika.channel import Channel
from pika.connection import Parameters, Connection

Expand Down Expand Up @@ -139,16 +140,16 @@ def initialise_pika_channel(
"""Initialise a Pika channel with a durable queue.
Args:
host: Pika host
queue: Pika queue to declare
username: username for authentication with Pika host
password: password for authentication with Pika host
port: port of the Pika host
connection_attempts: number of channel attempts before giving up
retry_delay_in_seconds: delay in seconds between channel attempts
host: Pika host.
queue: Pika queue to declare.
username: Username for authentication with Pika host.
password: Password for authentication with Pika host.
port: port of the Pika host.
connection_attempts: Number of channel attempts before giving up.
retry_delay_in_seconds: Delay in seconds between channel attempts.
Returns:
Pika `BlockingChannel` with declared queue
Pika `BlockingChannel` with declared queue.
"""

Expand Down Expand Up @@ -206,6 +207,17 @@ def __init__(
ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
),
):
"""RabbitMQ event producer.
Args:
host: Pika host.
username: Username for authentication with Pika host.
password: Password for authentication with Pika host.
port: port of the Pika host.
queue: Pika queue to declare.
loglevel: Logging level.
"""
logging.getLogger("pika").setLevel(loglevel)

self.queue = queue
Expand All @@ -224,6 +236,10 @@ def __del__(self) -> None:
close_pika_channel(self.channel)
close_pika_connection(self.channel.connection)

@property
def rasa_environment(self) -> Optional[Text]:
return os.environ.get("RASA_ENVIRONMENT")

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
Expand Down Expand Up @@ -285,7 +301,6 @@ def publish(
body = json.dumps(event)

while retries:
# noinspection PyBroadException
try:
self._publish(body)
return
Expand All @@ -304,6 +319,22 @@ def publish(
"'{}':\n{}".format(self.queue, self.host, body)
)

@property
def _message_properties(self) -> "BasicProperties":
"""Create RabbitMQ message properties.
Returns:
pika.spec.BasicProperties with the `RASA_ENVIRONMENT` environment
variable as the properties' app_id value. If this variable is unset, empty
pika.spec.BasicProperties.
"""
from pika.spec import BasicProperties

kwargs = {"app_id": self.rasa_environment} if self.rasa_environment else {}

return BasicProperties(**kwargs)

def _publish(self, body: Text) -> None:
if self._pika_connection.is_closed:
# Try to reset connection
Expand All @@ -317,7 +348,12 @@ def _publish(self, body: Text) -> None:
)
self._unpublished_messages.append(body)
else:
self.channel.basic_publish("", self.queue, body)
self.channel.basic_publish(
"",
self.queue,
body.encode(DEFAULT_ENCODING),
properties=self._message_properties,
)

logger.debug(
f"Published Pika events to queue '{self.queue}' on host "
Expand Down
19 changes: 19 additions & 0 deletions tests/core/test_broker.py
@@ -1,4 +1,7 @@
import json
from unittest.mock import patch

from _pytest.monkeypatch import MonkeyPatch

import rasa.core.brokers.utils as broker_utils
from rasa.core.brokers.file_producer import FileProducer
Expand Down Expand Up @@ -28,6 +31,22 @@ def test_pika_broker_from_config():
assert actual.queue == "queue"


# noinspection PyProtectedMember
def test_pika_message_property_app_id(monkeypatch: MonkeyPatch):
# patch PikaProducer so it doesn't try to connect to RabbitMQ on init
with patch.object(PikaProducer, "_run_pika", lambda _: None):
pika_producer = PikaProducer("", "", "")

# unset RASA_ENVIRONMENT env var results in empty App ID
monkeypatch.delenv("RASA_ENVIRONMENT", raising=False)
assert not pika_producer._message_properties.app_id

# setting it to some value results in that value as the App ID
rasa_environment = "some-test-environment"
monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment)
assert pika_producer._message_properties.app_id == rasa_environment


def test_no_broker_in_config():
cfg = read_endpoint_config(DEFAULT_ENDPOINTS_FILE, "event_broker")

Expand Down

0 comments on commit 3c8e764

Please sign in to comment.