Skip to content

Commit

Permalink
Merge pull request #5318 from RasaHQ/export-header
Browse files Browse the repository at this point in the history
add RabbitMQ message header to "rasa export"
  • Loading branch information
tmbo committed Feb 26, 2020
2 parents 4234433 + 9ecb243 commit ebfcfc9
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 27 deletions.
10 changes: 10 additions & 0 deletions changelog/5317.improvement.rst
@@ -0,0 +1,10 @@
Events exported using ``rasa export`` receive a message header if published through a
``PikaEventBroker``. The header is added to the message's ``BasicProperties.headers``
under the ``rasa-export-process-id`` key
(``rasa.core.constants.RASA_EXPORT_PROCESS_ID_HEADER_NAME``). The value is a
UUID4 generated at each call of ``rasa export``. The resulting header is a key-value
pair that looks as follows:

.. code-block:: text

    'rasa-export-process-id': 'd3b3d3ffe2bd4f379ccf21214ccfb261'
6 changes: 3 additions & 3 deletions rasa/cli/export.py
Expand Up @@ -14,7 +14,7 @@

if typing.TYPE_CHECKING:
from rasa.core.brokers.broker import EventBroker
from rasa.core.brokers.pika import PikaEventBroker
from rasa.core.brokers.pika import PikaEventBroker, PikaProducer
from rasa.core.tracker_store import TrackerStore
from rasa.core.exporter import Exporter
from rasa.core.utils import AvailableEndpoints
Expand Down Expand Up @@ -145,9 +145,9 @@ def _prepare_event_broker(event_broker: "EventBroker") -> None:
In addition, wait until the event broker reports a `ready` state.
"""
from rasa.core.brokers.pika import PikaEventBroker
from rasa.core.brokers.pika import PikaEventBroker, PikaProducer

if isinstance(event_broker, PikaEventBroker):
if isinstance(event_broker, (PikaEventBroker, PikaProducer)):
event_broker.should_keep_unpublished_messages = False
event_broker.raise_on_failure = True

Expand Down
67 changes: 49 additions & 18 deletions rasa/core/brokers/pika.py
Expand Up @@ -5,7 +5,7 @@
import typing
from collections import deque
from threading import Thread
from typing import Callable, Deque, Dict, Optional, Text, Union
from typing import Callable, Deque, Dict, Optional, Text, Union, Any

from rasa.constants import (
DEFAULT_LOG_LEVEL_LIBRARIES,
Expand Down Expand Up @@ -326,6 +326,7 @@ def _run_pika_io_loop_in_thread(self) -> None:
thread.start()

def _run_pika_io_loop(self) -> None:
    """Start the I/O loop of the underlying Pika connection."""
    # noinspection PyUnresolvedReferences
    io_loop = self._pika_connection.ioloop
    io_loop.start()

def is_ready(
Expand Down Expand Up @@ -353,18 +354,28 @@ def is_ready(
return False

def publish(
self, event: Dict, retries: int = 60, retry_delay_in_seconds: int = 5
self,
event: Dict[Text, Any],
retries: int = 60,
retry_delay_in_seconds: int = 5,
headers: Optional[Dict[Text, Text]] = None,
) -> None:
"""Publish `event` into Pika queue.
Perform `retries` publish attempts with `retry_delay_in_seconds` between them.
"""
Args:
event: Serialised event to be published.
retries: Number of retries if publishing fails
retry_delay_in_seconds: Delay in seconds between retries.
headers: Message headers to append to the published message (key-value
dictionary). The headers can be retrieved in the consumer from the
`headers` attribute of the message's `BasicProperties`.
"""
body = json.dumps(event)

while retries:
try:
self._publish(body)
self._publish(body, headers)
return
except Exception as e:
logger.error(
Expand All @@ -383,40 +394,60 @@ def publish(
"'{}':\n{}".format(self.queue, self.host, body)
)

@property
def _message_properties(self) -> "BasicProperties":
"""Create RabbitMQ message properties.
def _get_message_properties(
self, headers: Optional[Dict[Text, Text]] = None
) -> "BasicProperties":
"""Create RabbitMQ message `BasicProperties`.
The `app_id` property is set to the value of `self.rasa_environment` if
present, and the message delivery mode is set to 2 (persistent). In
addition, the `headers` property is set if supplied.
Args:
headers: Message headers to add to the message properties of the
published message (key-value dictionary). The headers can be retrieved in
the consumer from the `headers` attribute of the message's
`BasicProperties`.
Returns:
pika.spec.BasicProperties with the `RASA_ENVIRONMENT` environment
variable as the properties' `app_id` value. If this variable is unset, empty
pika.spec.BasicProperties.
`pika.spec.BasicProperties` with the `RASA_ENVIRONMENT` environment variable
as the properties' `app_id` value, `delivery_mode`=2 and `headers` as the
properties' headers.
"""
from pika.spec import BasicProperties

kwargs = {"app_id": self.rasa_environment} if self.rasa_environment else {}
# make message persistent
kwargs = {"delivery_mode": 2}

if self.rasa_environment:
kwargs["app_id"] = self.rasa_environment

return BasicProperties(delivery_mode=2, **kwargs) # make message persistent
if headers:
kwargs["headers"] = headers

def _basic_publish(self, body: Text) -> None:
return BasicProperties(**kwargs)

def _basic_publish(
self, body: Text, headers: Optional[Dict[Text, Text]] = None
) -> None:
self.channel.basic_publish(
"",
self.queue,
body.encode(DEFAULT_ENCODING),
properties=self._message_properties,
properties=self._get_message_properties(headers),
)

logger.debug(
f"Published Pika events to queue '{self.queue}' on host "
f"'{self.host}':\n{body}"
)

def _publish(self, body: Text) -> None:
def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> None:
if self._pika_connection.is_closed:
# Try to reset connection
self._run_pika()
self._basic_publish(body)
self._basic_publish(body, headers)
elif not self.channel and self.should_keep_unpublished_messages:
logger.warning(
f"RabbitMQ channel has not been assigned. Adding message to "
Expand All @@ -426,7 +457,7 @@ def _publish(self, body: Text) -> None:
)
self._unpublished_messages.append(body)
else:
self._basic_publish(body)
self._basic_publish(body, headers)


def create_rabbitmq_ssl_options(
Expand Down
3 changes: 3 additions & 0 deletions rasa/core/constants.py
Expand Up @@ -58,3 +58,6 @@
RESPOND_PREFIX = "respond_"

DEFAULT_CATEGORICAL_SLOT_VALUE = "__other__"

# Key of the RabbitMQ message header (stored in the message's
# `BasicProperties.headers`) that is attached to events published using
# `rasa export`
RASA_EXPORT_PROCESS_ID_HEADER_NAME = "rasa-export-process-id"
39 changes: 38 additions & 1 deletion rasa/core/exporter.py
@@ -1,11 +1,14 @@
import itertools
import logging
import uuid
from typing import Text, Optional, List, Set, Dict, Any

from tqdm import tqdm

import rasa.cli.utils as cli_utils
from rasa.core.brokers.broker import EventBroker
from rasa.core.brokers.pika import PikaProducer, PikaEventBroker
from rasa.core.constants import RASA_EXPORT_PROCESS_ID_HEADER_NAME
from rasa.core.tracker_store import TrackerStore
from rasa.core.trackers import EventVerbosity
from rasa.exceptions import (
Expand Down Expand Up @@ -57,6 +60,9 @@ def publish_events(self) -> int:
Exits if the publishing of events is interrupted due to an error. In that case,
the CLI command to continue the export where it was interrupted is printed.
Returns:
The number of successfully published events.
"""
events = self._fetch_events_within_time_range()

Expand All @@ -67,10 +73,12 @@ def publish_events(self) -> int:
published_events = 0
current_timestamp = None

headers = self._get_message_headers()

for event in tqdm(events, "events"):
# noinspection PyBroadException
try:
self.event_broker.publish(event)
self._publish_with_message_headers(event, headers)
published_events += 1
current_timestamp = event["timestamp"]
except Exception as e:
Expand All @@ -81,6 +89,35 @@ def publish_events(self) -> int:

return published_events

def _get_message_headers(self) -> Optional[Dict[Text, Text]]:
    """Build the message headers attached to events sent to a Pika broker.

    Returns:
        A single-entry dictionary mapping `RASA_EXPORT_PROCESS_ID_HEADER_NAME`
        to a freshly generated UUID4 hex string if `self.event_broker` is a
        `PikaEventBroker` or a `PikaProducer`, otherwise `None`.
    """
    publishes_to_pika = isinstance(
        self.event_broker, (PikaEventBroker, PikaProducer)
    )
    if not publishes_to_pika:
        return None

    return {RASA_EXPORT_PROCESS_ID_HEADER_NAME: uuid.uuid4().hex}

def _publish_with_message_headers(
    self, event: Dict[Text, Any], headers: Optional[Dict[Text, Text]]
) -> None:
    """Publish `event` through `self.event_broker`.

    `headers` are only forwarded if the broker is a `PikaEventBroker` or a
    `PikaProducer`; any other broker is called with the event alone.

    Args:
        event: Serialized event to be published.
        headers: Message headers passed to the broker's `publish()` method for
            Pika-based brokers.
    """
    broker = self.event_broker

    if not isinstance(broker, (PikaEventBroker, PikaProducer)):
        broker.publish(event)
        return

    broker.publish(event=event, headers=headers)

def _get_conversation_ids_in_tracker(self) -> Set[Text]:
"""Fetch conversation IDs in `self.tracker_store`.
Expand Down
4 changes: 2 additions & 2 deletions tests/core/test_broker.py
Expand Up @@ -39,12 +39,12 @@ def test_pika_message_property_app_id(monkeypatch: MonkeyPatch):

# unset RASA_ENVIRONMENT env var results in empty App ID
monkeypatch.delenv("RASA_ENVIRONMENT", raising=False)
assert not pika_producer._message_properties.app_id
assert not pika_producer._get_message_properties().app_id

# setting it to some value results in that value as the App ID
rasa_environment = "some-test-environment"
monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment)
assert pika_producer._message_properties.app_id == rasa_environment
assert pika_producer._get_message_properties().app_id == rasa_environment


def test_no_broker_in_config():
Expand Down
52 changes: 49 additions & 3 deletions tests/core/test_exporter.py
Expand Up @@ -4,9 +4,11 @@
from unittest.mock import Mock

import pytest
from _pytest.logging import LogCaptureFixture

import rasa.utils.io as io_utils
from rasa.core.brokers.pika import PikaEventBroker
from rasa.core.brokers.sql import SQLEventBroker
from rasa.core.constants import RASA_EXPORT_PROCESS_ID_HEADER_NAME
from rasa.core.trackers import DialogueStateTracker
from rasa.exceptions import (
NoConversationsInTrackerStoreError,
Expand Down Expand Up @@ -197,8 +199,52 @@ def test_sort_and_select_events_by_timestamp_error():
exporter._sort_and_select_events_by_timestamp(events)


def _add_conversation_id_to_event(event: Dict, conversation_id: Text):
event["sender_id"] = conversation_id
def test_get_message_headers_pika_event_broker():
    """Headers generated for a `PikaEventBroker` contain an export process ID."""
    pika_broker = Mock(spec=PikaEventBroker)

    # noinspection PyProtectedMember
    message_headers = MockExporter(event_broker=pika_broker)._get_message_headers()

    assert message_headers.get(RASA_EXPORT_PROCESS_ID_HEADER_NAME)


def test_get_message_headers_non_pika_broker():
    """No headers are generated for a broker that is not Pika-based."""
    generic_broker = Mock()

    # noinspection PyProtectedMember
    generated_headers = MockExporter(
        event_broker=generic_broker
    )._get_message_headers()

    assert generated_headers is None


def test_publish_with_headers_pika_event_broker():
    """A `PikaEventBroker` receives both the event and the message headers."""
    pika_broker = Mock(spec=PikaEventBroker)
    event, headers = {"some": "event"}, {"some": "header"}

    # noinspection PyProtectedMember
    MockExporter(event_broker=pika_broker)._publish_with_message_headers(
        event, headers
    )

    # `publish()` must have been invoked with `event` and `headers` passed as
    # keyword arguments
    pika_broker.publish.assert_called_with(event=event, headers=headers)


def test_publish_with_headers_non_pika_event_broker():
    """A non-Pika broker is called with the event only, never with headers."""
    # `spec` is passed by keyword, matching the Pika broker tests in this module
    event_broker = Mock(spec=SQLEventBroker)
    exporter = MockExporter(event_broker=event_broker)

    headers = {"some": "header"}
    event = {"some": "event"}

    # noinspection PyProtectedMember
    exporter._publish_with_message_headers(event, headers)

    # the `SQLEventBroker`'s `publish()` method was called with only the `event`
    # argument
    event_broker.publish.assert_called_with(event)


def test_publishing_error():
Expand Down

0 comments on commit ebfcfc9

Please sign in to comment.