diff --git a/changelog/5317.improvement.rst b/changelog/5317.improvement.rst new file mode 100644 index 000000000000..e685c340cd4d --- /dev/null +++ b/changelog/5317.improvement.rst @@ -0,0 +1,10 @@ +Events exported using ``rasa export`` receive a message header if published through a +``PikaEventBroker``. The header is added to the message's ``BasicProperties.headers`` +under the ``rasa-export-process-id`` key +(``rasa.core.constants.RASA_EXPORT_PROCESS_ID_HEADER_NAME``). The value is a +UUID4 generated at each call of ``rasa export``. The resulting header is a key-value +pair that looks as follows: + +.. code-block:: text + + 'rasa-export-process-id': 'd3b3d3ffe2bd4f379ccf21214ccfb261' diff --git a/rasa/cli/export.py b/rasa/cli/export.py index 03f3c7500685..c03c5c0f5d4e 100644 --- a/rasa/cli/export.py +++ b/rasa/cli/export.py @@ -14,7 +14,7 @@ if typing.TYPE_CHECKING: from rasa.core.brokers.broker import EventBroker - from rasa.core.brokers.pika import PikaEventBroker + from rasa.core.brokers.pika import PikaEventBroker, PikaProducer from rasa.core.tracker_store import TrackerStore from rasa.core.exporter import Exporter from rasa.core.utils import AvailableEndpoints @@ -145,9 +145,9 @@ def _prepare_event_broker(event_broker: "EventBroker") -> None: In addition, wait until the event broker reports a `ready` state. """ - from rasa.core.brokers.pika import PikaEventBroker + from rasa.core.brokers.pika import PikaEventBroker, PikaProducer - if isinstance(event_broker, PikaEventBroker): + if isinstance(event_broker, (PikaEventBroker, PikaProducer)): event_broker.should_keep_unpublished_messages = False event_broker.raise_on_failure = True diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 45949ff4fd6d..0c8ed8682ee7 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -5,7 +5,7 @@ import typing from collections import deque from threading import Thread -from typing import Callable, Deque, Dict, Optional, Text, Union +from typing import Callable, Deque, Dict, Optional, Text, Union, Any from rasa.constants import ( DEFAULT_LOG_LEVEL_LIBRARIES, @@ -326,6 +326,7 @@ def _run_pika_io_loop_in_thread(self) -> None: thread.start() def _run_pika_io_loop(self) -> None: + # noinspection PyUnresolvedReferences self._pika_connection.ioloop.start() def is_ready( @@ -353,18 +354,28 @@ def is_ready( return False def publish( - self, event: Dict, retries: int = 60, retry_delay_in_seconds: int = 5 + self, + event: Dict[Text, Any], + retries: int = 60, + retry_delay_in_seconds: int = 5, + headers: Optional[Dict[Text, Text]] = None, ) -> None: """Publish `event` into Pika queue. - Perform `retries` publish attempts with `retry_delay_in_seconds` between them. - """ + Args: + event: Serialised event to be published. + retries: Number of retries if publishing fails + retry_delay_in_seconds: Delay in seconds between retries. + headers: Message headers to append to the published message (key-value + dictionary). The headers can be retrieved in the consumer from the + `headers` attribute of the message's `BasicProperties`. + """ body = json.dumps(event) while retries: try: - self._publish(body) + self._publish(body, headers) return except Exception as e: logger.error( @@ -383,28 +394,48 @@ def publish( "'{}':\n{}".format(self.queue, self.host, body) ) - @property - def _message_properties(self) -> "BasicProperties": - """Create RabbitMQ message properties. + def _get_message_properties( + self, headers: Optional[Dict[Text, Text]] = None + ) -> "BasicProperties": + """Create RabbitMQ message `BasicProperties`. + + The `app_id` property is set to the value of `self.rasa_environment` if + present, and the message delivery mode is set to 2 (persistent). In + addition, the `headers` property is set if supplied. + + Args: + headers: Message headers to add to the message properties of the + published message (key-value dictionary). The headers can be retrieved in + the consumer from the `headers` attribute of the message's + `BasicProperties`. Returns: - pika.spec.BasicProperties with the `RASA_ENVIRONMENT` environment - variable as the properties' `app_id` value. If this variable is unset, empty - pika.spec.BasicProperties. + `pika.spec.BasicProperties` with the `RASA_ENVIRONMENT` environment variable + as the properties' `app_id` value, `delivery_mode`=2 and `headers` as the + properties' headers. """ from pika.spec import BasicProperties - kwargs = {"app_id": self.rasa_environment} if self.rasa_environment else {} + # make message persistent + kwargs = {"delivery_mode": 2} + + if self.rasa_environment: + kwargs["app_id"] = self.rasa_environment - return BasicProperties(delivery_mode=2, **kwargs) # make message persistent + if headers: + kwargs["headers"] = headers - def _basic_publish(self, body: Text) -> None: + return BasicProperties(**kwargs) + + def _basic_publish( + self, body: Text, headers: Optional[Dict[Text, Text]] = None + ) -> None: self.channel.basic_publish( "", self.queue, body.encode(DEFAULT_ENCODING), - properties=self._message_properties, + properties=self._get_message_properties(headers), ) logger.debug( @@ -412,11 +443,11 @@ def _basic_publish(self, body: Text) -> None: f"'{self.host}':\n{body}" ) - def _publish(self, body: Text) -> None: + def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> None: if self._pika_connection.is_closed: # Try to reset connection self._run_pika() - self._basic_publish(body) + self._basic_publish(body, headers) elif not self.channel and self.should_keep_unpublished_messages: logger.warning( f"RabbitMQ channel has not been assigned. Adding message to " @@ -426,7 +457,7 @@ def _publish(self, body: Text) -> None: ) self._unpublished_messages.append(body) else: - self._basic_publish(body) + self._basic_publish(body, headers) def create_rabbitmq_ssl_options( diff --git a/rasa/core/constants.py b/rasa/core/constants.py index cbbf2b76e63a..a4b2799b523c 100644 --- a/rasa/core/constants.py +++ b/rasa/core/constants.py @@ -58,3 +58,6 @@ RESPOND_PREFIX = "respond_" DEFAULT_CATEGORICAL_SLOT_VALUE = "__other__" + +# RabbitMQ message property header added to events published using `rasa export` +RASA_EXPORT_PROCESS_ID_HEADER_NAME = "rasa-export-process-id" diff --git a/rasa/core/exporter.py b/rasa/core/exporter.py index 527fda24d70a..d529895990aa 100644 --- a/rasa/core/exporter.py +++ b/rasa/core/exporter.py @@ -1,11 +1,14 @@ import itertools import logging +import uuid from typing import Text, Optional, List, Set, Dict, Any from tqdm import tqdm import rasa.cli.utils as cli_utils from rasa.core.brokers.broker import EventBroker +from rasa.core.brokers.pika import PikaProducer, PikaEventBroker +from rasa.core.constants import RASA_EXPORT_PROCESS_ID_HEADER_NAME from rasa.core.tracker_store import TrackerStore from rasa.core.trackers import EventVerbosity from rasa.exceptions import ( @@ -57,6 +60,9 @@ def publish_events(self) -> int: Exits if the publishing of events is interrupted due to an error. In that case, the CLI command to continue the export where it was interrupted is printed. + Returns: + The number of successfully published events. + """ events = self._fetch_events_within_time_range() @@ -67,10 +73,12 @@ def publish_events(self) -> int: published_events = 0 current_timestamp = None + headers = self._get_message_headers() + for event in tqdm(events, "events"): # noinspection PyBroadException try: - self.event_broker.publish(event) + self._publish_with_message_headers(event, headers) published_events += 1 current_timestamp = event["timestamp"] except Exception as e: @@ -81,6 +89,35 @@ def publish_events(self) -> int: return published_events + def _get_message_headers(self) -> Optional[Dict[Text, Text]]: + """Generate a message header for publishing events to a `PikaEventBroker`. + + Returns: + Message headers with a randomly generated uuid under the + `RASA_EXPORT_PROCESS_ID_HEADER_NAME` key if `self.event_broker` is a + `PikaEventBroker`, else `None`. + + """ + if isinstance(self.event_broker, (PikaEventBroker, PikaProducer)): + return {RASA_EXPORT_PROCESS_ID_HEADER_NAME: uuid.uuid4().hex} + + return None + + def _publish_with_message_headers( + self, event: Dict[Text, Any], headers: Optional[Dict[Text, Text]] + ) -> None: + """Publish `event` to a message broker with `headers`. + + Args: + event: Serialized event to be published. + headers: Message headers to be published if `self.event_broker` is a + `PikaEventBroker`. + """ + if isinstance(self.event_broker, (PikaEventBroker, PikaProducer)): + self.event_broker.publish(event=event, headers=headers) + else: + self.event_broker.publish(event) + def _get_conversation_ids_in_tracker(self) -> Set[Text]: """Fetch conversation IDs in `self.tracker_store`. diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index 4e3453013f0f..5c33d62665d3 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -39,12 +39,12 @@ def test_pika_message_property_app_id(monkeypatch: MonkeyPatch): # unset RASA_ENVIRONMENT env var results in empty App ID monkeypatch.delenv("RASA_ENVIRONMENT", raising=False) - assert not pika_producer._message_properties.app_id + assert not pika_producer._get_message_properties().app_id # setting it to some value results in that value as the App ID rasa_environment = "some-test-environment" monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment) - assert pika_producer._message_properties.app_id == rasa_environment + assert pika_producer._get_message_properties().app_id == rasa_environment def test_no_broker_in_config(): diff --git a/tests/core/test_exporter.py b/tests/core/test_exporter.py index a91be89f159b..6c858e9f3855 100644 --- a/tests/core/test_exporter.py +++ b/tests/core/test_exporter.py @@ -4,9 +4,11 @@ from unittest.mock import Mock import pytest -from _pytest.logging import LogCaptureFixture import rasa.utils.io as io_utils +from rasa.core.brokers.pika import PikaEventBroker +from rasa.core.brokers.sql import SQLEventBroker +from rasa.core.constants import RASA_EXPORT_PROCESS_ID_HEADER_NAME from rasa.core.trackers import DialogueStateTracker from rasa.exceptions import ( NoConversationsInTrackerStoreError, @@ -197,8 +199,52 @@ def test_sort_and_select_events_by_timestamp_error(): exporter._sort_and_select_events_by_timestamp(events) -def _add_conversation_id_to_event(event: Dict, conversation_id: Text): - event["sender_id"] = conversation_id +def test_get_message_headers_pika_event_broker(): + event_broker = Mock(spec=PikaEventBroker) + exporter = MockExporter(event_broker=event_broker) + + # noinspection PyProtectedMember + headers = exporter._get_message_headers() + + assert headers.get(RASA_EXPORT_PROCESS_ID_HEADER_NAME) + + +def test_get_message_headers_non_pika_broker(): + event_broker = Mock() + exporter = MockExporter(event_broker=event_broker) + + # noinspection PyProtectedMember + assert exporter._get_message_headers() is None + + +def test_publish_with_headers_pika_event_broker(): + event_broker = Mock(spec=PikaEventBroker) + exporter = MockExporter(event_broker=event_broker) + + headers = {"some": "header"} + event = {"some": "event"} + + # noinspection PyProtectedMember + exporter._publish_with_message_headers(event, headers) + + # the `PikaEventBroker`'s `publish()` method was called with both + # the `event` and `headers` arguments + event_broker.publish.assert_called_with(event=event, headers=headers) + + +def test_publish_with_headers_non_pika_event_broker(): + event_broker = Mock(SQLEventBroker) + exporter = MockExporter(event_broker=event_broker) + + headers = {"some": "header"} + event = {"some": "event"} + + # noinspection PyProtectedMember + exporter._publish_with_message_headers(event, headers) + + # the `SQLEventBroker`'s `publish()` method was called with only the `event` + # argument + event_broker.publish.assert_called_with(event) def test_publishing_error():