Skip to content

Commit

Permalink
Merge 873d344 into 807d9b0
Browse files Browse the repository at this point in the history
  • Loading branch information
ricwo committed Mar 17, 2020
2 parents 807d9b0 + 873d344 commit 6884610
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 86 deletions.
21 changes: 21 additions & 0 deletions changelog/5258.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Added ``PikaEventBroker`` (:ref:`event-brokers-pika`) support for publishing to
multiple queues. Messages are now published to a ``fanout`` exchange with name
``rasa-exchange`` (see
`here <https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-fanout>`_
for more information on ``fanout`` exchanges).

The former ``queue`` key is deprecated. Queues should now be
specified as a list in the ``endpoints.yml`` event broker config under a new key
``queues``. Example config:

.. code-block:: yaml
event_broker:
type: pika
url: localhost
username: username
password: password
queues:
- queue-1
- queue-2
- queue-3
8 changes: 6 additions & 2 deletions data/test_endpoints/event_brokers/pika_endpoint.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
event_broker:
type: pika
url: localhost
username: username
password: password
queue: queue
type: pika
queues:
- queue-1
# you may supply more than one queue to publish to
# - queue-2
# - queue-3
19 changes: 10 additions & 9 deletions docs/api/event-brokers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ Rasa enables three possible broker types:
Pika Event Broker
-----------------

The example implementation we're going to show you here uses `Pika <https://pika.readthedocs.io>`_,
the Python client library for `RabbitMQ <https://www.rabbitmq.com>`_.
the Python client library for `RabbitMQ <https://www.rabbitmq.com>`_.
The example implementation we're going to show you here uses
`Pika <https://pika.readthedocs.io>`_, the Python client library for
`RabbitMQ <https://www.rabbitmq.com>`_.

Adding a Pika Event Broker Using the Endpoint Configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -72,7 +73,7 @@ Here is how you add it using Python code:
pika_broker = PikaEventBroker('localhost',
'username',
'password',
queue='rasa_core_events')
queues=['rasa_events'])
tracker_store = InMemoryTrackerStore(db=db, event_broker=pika_broker)
Expand Down Expand Up @@ -109,7 +110,7 @@ example:
# start consumption of channel
channel = connection.channel()
channel.basic_consume(_callback,
queue='rasa_core_events',
queue='rasa_events',
no_ack=True)
channel.start_consuming()
Expand Down Expand Up @@ -163,7 +164,7 @@ The code below shows an example on how to instantiate a Kafka producer in you sc
from rasa.core.tracker_store import InMemoryTrackerStore
kafka_broker = KafkaEventBroker(host='localhost:9092',
topic='rasa_core_events')
topic='rasa_events')
tracker_store = InMemoryTrackerStore(event_broker=kafka_broker)
Expand All @@ -182,7 +183,7 @@ list of strings. e.g.:
kafka_broker = KafkaEventBroker(host=['kafka_broker_1:9092',
'kafka_broker_2:2030',
'kafka_broker_3:9092'],
topic='rasa_core_events')
topic='rasa_events')
Authentication and authorization
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -203,7 +204,7 @@ previously configured in the broker server.
sasl_plain_username='kafka_username',
sasl_plain_password='kafka_password',
security_protocol='SASL_PLAINTEXT',
topic='rasa_core_events')
topic='rasa_events')
If the clients or the brokers in the Kafka cluster are located in different
Expand All @@ -220,7 +221,7 @@ be provided as arguments, as well as the CA's root certificate.
ssl_keyfile='key.pem',
ssl_check_hostname=True,
security_protocol='SSL',
topic='rasa_core_events')
topic='rasa_events')
If the ``ssl_check_hostname`` parameter is enabled, the clients will verify
if the broker's hostname matches the certificate. It's used on client's connections
Expand All @@ -238,7 +239,7 @@ according to the security protocol being used. The following implementation show
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer('rasa_core_events',
consumer = KafkaConsumer('rasa_events',
bootstrap_servers=['localhost:29093'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
security_protocol='SSL',
Expand Down
20 changes: 14 additions & 6 deletions rasa/cli/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

if typing.TYPE_CHECKING:
from rasa.core.brokers.broker import EventBroker
from rasa.core.brokers.pika import PikaEventBroker, PikaProducer
from rasa.core.brokers.pika import PikaEventBroker
from rasa.core.tracker_store import TrackerStore
from rasa.core.exporter import Exporter
from rasa.core.utils import AvailableEndpoints
Expand All @@ -26,16 +26,24 @@
def add_subparser(
subparsers: argparse._SubParsersAction, parents: List[argparse.ArgumentParser]
) -> None:
shell_parser = subparsers.add_parser(
"""Add subparser for `rasa export`.
Args:
subparsers: Subparsers action object to which `argparse.ArgumentParser`
objects can be added.
parents: `argparse.ArgumentParser` objects whose arguments should also be
included.
"""
export_parser = subparsers.add_parser(
"export",
parents=parents,
conflict_handler="resolve",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
help="Export conversations using an event broker.",
)
shell_parser.set_defaults(func=export_trackers)
export_parser.set_defaults(func=export_trackers)

arguments.set_export_arguments(shell_parser)
arguments.set_export_arguments(export_parser)


def _get_tracker_store(endpoints: "AvailableEndpoints") -> "TrackerStore":
Expand Down Expand Up @@ -145,9 +153,9 @@ def _prepare_event_broker(event_broker: "EventBroker") -> None:
In addition, wait until the event broker reports a `ready` state.
"""
from rasa.core.brokers.pika import PikaEventBroker, PikaProducer
from rasa.core.brokers.pika import PikaEventBroker

if isinstance(event_broker, (PikaEventBroker, PikaProducer)):
if isinstance(event_broker, PikaEventBroker):
event_broker.should_keep_unpublished_messages = False
event_broker.raise_on_failure = True

Expand Down
1 change: 1 addition & 0 deletions rasa/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
DOCS_URL_ACTIONS = DOCS_BASE_URL + "/core/actions/"
DOCS_URL_CONNECTORS = DOCS_BASE_URL + "/user-guide/connectors/"
DOCS_URL_EVENT_BROKERS = DOCS_BASE_URL + "/api/event-brokers/"
DOCS_URL_PIKA_EVENT_BROKER = DOCS_URL_EVENT_BROKERS + "#pika-event-broker"
DOCS_URL_TRACKER_STORES = DOCS_BASE_URL + "/api/tracker-stores/"
DOCS_URL_PIPELINE = DOCS_BASE_URL + "/nlu/choosing-a-pipeline/"
DOCS_URL_COMPONENTS = DOCS_BASE_URL + "/nlu/components/"
Expand Down
4 changes: 0 additions & 4 deletions rasa/core/brokers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def create(
obj: Union["EventBroker", EndpointConfig, None],
) -> Optional["EventBroker"]:
"""Factory to create an event broker."""

if isinstance(obj, EventBroker):
return obj

Expand All @@ -29,7 +28,6 @@ def from_endpoint_config(cls, broker_config: EndpointConfig) -> "EventBroker":

def publish(self, event: Dict[Text, Any]) -> None:
"""Publishes a json-formatted Rasa Core event into an event queue."""

raise NotImplementedError("Event broker must implement the `publish` method.")

def is_ready(self) -> bool:
Expand All @@ -42,7 +40,6 @@ def is_ready(self) -> bool:

def close(self) -> None:
"""Close the connection to an event broker."""

# default implementation does nothing
pass

Expand Down Expand Up @@ -81,7 +78,6 @@ def _create_from_endpoint_config(

def _load_from_module_string(broker_config: EndpointConfig,) -> Optional["EventBroker"]:
"""Instantiate an event broker based on its class name."""

try:
event_broker_class = common.class_from_module_path(broker_config.type)
return event_broker_class.from_endpoint_config(broker_config)
Expand Down
Loading

0 comments on commit 6884610

Please sign in to comment.