Skip to content

Commit

Permalink
Merge 64a1024 into c1e2473
Browse files Browse the repository at this point in the history
  • Loading branch information
ricwo committed Mar 9, 2020
2 parents c1e2473 + 64a1024 commit c81e041
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 72 deletions.
15 changes: 15 additions & 0 deletions changelog/5258.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Added ``PikaEventBroker`` support for publishing to multiple queues. Multiple queues
can be specified as a list in the ``endpoints.yml`` event broker config under a new key
``queues``. Example config:

.. code-block:: yaml
event_broker:
type: pika
url: localhost
username: username
password: password
queues:
- queue-1
- queue-2
- queue-3
8 changes: 6 additions & 2 deletions data/test_endpoints/event_brokers/pika_endpoint.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
event_broker:
type: pika
url: localhost
username: username
password: password
queue: queue
type: pika
queue: queue # to publish to a single queue
# alternatively, you can publish to multiple queues with:
# queues:
# - queue-1
# - queue-2
5 changes: 3 additions & 2 deletions docs/api/event-brokers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ Rasa enables three possible broker types:
Pika Event Broker
-----------------

The example implementation we're going to show you here uses `Pika <https://pika.readthedocs.io>`_ ,
the Python client library for `RabbitMQ <https://www.rabbitmq.com>`_.
The example implementation we're going to show you here uses
`Pika <https://pika.readthedocs.io>`_ , the Python client library for
`RabbitMQ <https://www.rabbitmq.com>`_.

Adding a Pika Event Broker Using the Endpoint Configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
6 changes: 3 additions & 3 deletions rasa/cli/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

if typing.TYPE_CHECKING:
from rasa.core.brokers.broker import EventBroker
from rasa.core.brokers.pika import PikaEventBroker, PikaProducer
from rasa.core.brokers.pika import PikaEventBroker
from rasa.core.tracker_store import TrackerStore
from rasa.core.exporter import Exporter
from rasa.core.utils import AvailableEndpoints
Expand Down Expand Up @@ -145,9 +145,9 @@ def _prepare_event_broker(event_broker: "EventBroker") -> None:
In addition, wait until the event broker reports a `ready` state.
"""
from rasa.core.brokers.pika import PikaEventBroker, PikaProducer
from rasa.core.brokers.pika import PikaEventBroker

if isinstance(event_broker, (PikaEventBroker, PikaProducer)):
if isinstance(event_broker, PikaEventBroker):
event_broker.should_keep_unpublished_messages = False
event_broker.raise_on_failure = True

Expand Down
1 change: 1 addition & 0 deletions rasa/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
DOCS_URL_ACTIONS = DOCS_BASE_URL + "/core/actions/"
DOCS_URL_CONNECTORS = DOCS_BASE_URL + "/user-guide/connectors/"
DOCS_URL_EVENT_BROKERS = DOCS_BASE_URL + "/api/event-brokers/"
DOCS_URL_PIKA_EVENT_BROKER = DOCS_URL_EVENT_BROKERS + "#pika-event-broker"
DOCS_URL_TRACKER_STORES = DOCS_BASE_URL + "/api/tracker-stores/"
DOCS_URL_PIPELINE = DOCS_BASE_URL + "/nlu/choosing-a-pipeline/"
DOCS_URL_COMPONENTS = DOCS_BASE_URL + "/nlu/components/"
Expand Down
5 changes: 1 addition & 4 deletions rasa/core/brokers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def create(
obj: Union["EventBroker", EndpointConfig, None],
) -> Optional["EventBroker"]:
"""Factory to create an event broker."""

if isinstance(obj, EventBroker):
return obj

Expand All @@ -29,20 +28,19 @@ def from_endpoint_config(cls, broker_config: EndpointConfig) -> "EventBroker":

def publish(self, event: Dict[Text, Any]) -> None:
"""Publishes a json-formatted Rasa Core event into an event queue."""

raise NotImplementedError("Event broker must implement the `publish` method.")

def is_ready(self) -> bool:
"""Determine whether or not the event broker is ready.
Returns:
`True` by default, but this may be overridden by subclasses.
"""
return True

def close(self) -> None:
"""Close the connection to an event broker."""

# default implementation does nothing
pass

Expand Down Expand Up @@ -81,7 +79,6 @@ def _create_from_endpoint_config(

def _load_from_module_string(broker_config: EndpointConfig,) -> Optional["EventBroker"]:
"""Instantiate an event broker based on its class name."""

try:
event_broker_class = common.class_from_module_path(broker_config.type)
return event_broker_class.from_endpoint_config(broker_config)
Expand Down
Loading

0 comments on commit c81e041

Please sign in to comment.