New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add RabbitMQ message header to "rasa export" #5318
Changes from 6 commits
3bb4953
cc2a459
3fb5940
f3b0f1b
1134ec8
f7dbf13
9ecb243
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
Events exported using ``rasa export`` receive a message header if published through a | ||
``PikaEventBroker``. The header is added to the message's ``BasicProperties.headers`` | ||
under the ``rasa-export-process-id`` key | ||
(``rasa.core.constants.RASA_EXPORT_PROCESS_ID_HEADER_NAME``). The value is a | ||
UUID4 generated at each call of ``rasa export``. The resulting header is a key-value | ||
pair that looks as follows: | ||
|
||
.. code-block:: text | ||
|
||
'rasa-export-process-id': 'd3b3d3ffe2bd4f379ccf21214ccfb261' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,14 @@ | ||
import itertools | ||
import logging | ||
import uuid | ||
from typing import Text, Optional, List, Set, Dict, Any | ||
|
||
from tqdm import tqdm | ||
|
||
import rasa.cli.utils as cli_utils | ||
from rasa.core.brokers.broker import EventBroker | ||
from rasa.core.brokers.pika import PikaProducer, PikaEventBroker | ||
from rasa.core.constants import RASA_EXPORT_PROCESS_ID_HEADER_NAME | ||
from rasa.core.tracker_store import TrackerStore | ||
from rasa.core.trackers import EventVerbosity | ||
from rasa.exceptions import ( | ||
|
@@ -57,6 +60,9 @@ def publish_events(self) -> int: | |
Exits if the publishing of events is interrupted due to an error. In that case, | ||
the CLI command to continue the export where it was interrupted is printed. | ||
|
||
Returns: | ||
The number of successfully published events. | ||
|
||
""" | ||
events = self._fetch_events_within_time_range() | ||
|
||
|
@@ -67,10 +73,12 @@ def publish_events(self) -> int: | |
published_events = 0 | ||
current_timestamp = None | ||
|
||
headers = self._get_message_headers() | ||
|
||
for event in tqdm(events, "events"): | ||
# noinspection PyBroadException | ||
try: | ||
self.event_broker.publish(event) | ||
self._publish_with_message_headers(event, headers) | ||
published_events += 1 | ||
current_timestamp = event["timestamp"] | ||
except Exception as e: | ||
|
@@ -81,6 +89,36 @@ def publish_events(self) -> int: | |
|
||
return published_events | ||
|
||
def _get_message_headers(self) -> Optional[Dict[Text, Text]]: | ||
"""Generate a message header for publishing events to a `PikaEventBroker`. | ||
|
||
Returns: | ||
Message headers with a randomly generated uuid under the | ||
`RASA_EXPORT_PROCESS_ID_HEADER_NAME` key if `self.event_broker` is a | ||
`PikaEventBroker`, else `None`. | ||
|
||
""" | ||
if isinstance(self.event_broker, (PikaEventBroker, PikaProducer)): | ||
return {RASA_EXPORT_PROCESS_ID_HEADER_NAME: uuid.uuid4().hex} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. optional: we could also just have this constant at the top of the this module since it's not used anywhere else There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, you're using this in Rasa X? Then it makes sense to have this in |
||
|
||
return None | ||
|
||
def _publish_with_message_headers( | ||
self, event: Dict[Text, Any], headers: Optional[Dict[Text, Text]] | ||
) -> None: | ||
"""Publish `event` to a message broker with `headers`. | ||
|
||
Args: | ||
event: Serialized event to be published. | ||
headers: Message headers to be published if `self.event_broker` is a | ||
`PikaEventBroker`. | ||
ricwo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
""" | ||
if isinstance(self.event_broker, (PikaEventBroker, PikaProducer)): | ||
self.event_broker.publish(event=event, headers=headers) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. optional: Should we make the headers an attribute so that we don't have different signatures? Also, I guess the headers wouldn't change between messages, right? |
||
else: | ||
self.event_broker.publish(event) | ||
|
||
def _get_conversation_ids_in_tracker(self) -> Set[Text]: | ||
"""Fetch conversation IDs in `self.tracker_store`. | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional: you could use
issubclass
and just check againstPikaEventBroker
which would be a tiny bit more robust