Skip to content

Commit

Permalink
Merge b534bbe into 322da75
Browse files Browse the repository at this point in the history
  • Loading branch information
tmbo committed Dec 11, 2019
2 parents 322da75 + b534bbe commit 49e9da1
Show file tree
Hide file tree
Showing 25 changed files with 699 additions and 474 deletions.
2 changes: 2 additions & 0 deletions changelog/4801.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Allow creation of natural language interpreter and generator by classname reference
in ``endpoints.yml``.
48 changes: 24 additions & 24 deletions docs/api/event-brokers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ Here is how you add it using Python code:

.. code-block:: python
from rasa.core.event_brokers.pika_producer import PikaProducer
from rasa.core.brokers.pika import PikaEventBroker
from rasa_platform.core.tracker_store import InMemoryTrackerStore
pika_broker = PikaProducer('localhost',
'username',
'password',
queue='rasa_core_events')
pika_broker = PikaEventBroker('localhost',
'username',
'password',
queue='rasa_core_events')
tracker_store = InMemoryTrackerStore(db=db, event_broker=pika_broker)
Expand Down Expand Up @@ -157,11 +157,11 @@ The code below shows an example on how to instantiate a Kafka producer in you sc

.. code-block:: python
from rasa.core.event_brokers.kafka_producer import KafkaProducer
from rasa.core.brokers.kafka import KafkaEventBroker
from rasa.core.tracker_store import InMemoryTrackerStore
kafka_broker = KafkaProducer(host='localhost:9092',
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host='localhost:9092',
topic='rasa_core_events')
tracker_store = InMemoryTrackerStore(event_broker=kafka_broker)
Expand All @@ -177,10 +177,10 @@ list of strings. e.g.:

.. code-block:: python
kafka_broker = KafkaProducer(host=['kafka_broker_1:9092',
'kafka_broker_2:2030',
'kafka_broker_3:9092'],
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host=['kafka_broker_1:9092',
'kafka_broker_2:2030',
'kafka_broker_3:9092'],
topic='rasa_core_events')
Authentication and authorization
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -197,11 +197,11 @@ previously configured in the broker server.

.. code-block:: python
kafka_broker = KafkaProducer(host='kafka_broker:9092',
sasl_plain_username='kafka_username',
sasl_plain_password='kafka_password',
security_protocol='SASL_PLAINTEXT',
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host='kafka_broker:9092',
sasl_plain_username='kafka_username',
sasl_plain_password='kafka_password',
security_protocol='SASL_PLAINTEXT',
topic='rasa_core_events')
If the clients or the brokers in the kafka cluster are located in different
Expand All @@ -212,13 +212,13 @@ be provided as arguments, as well as the CA's root certificate.

.. code-block:: python
kafka_broker = KafkaProducer(host='kafka_broker:9092',
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem',
ssl_check_hostname=True,
security_protocol='SSL',
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host='kafka_broker:9092',
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem',
ssl_check_hostname=True,
security_protocol='SSL',
topic='rasa_core_events')
If the ``ssl_check_hostname`` parameter is enabled, the clients will verify
if the broker's hostname matches the certificate. It's used on client's connections
Expand Down
80 changes: 80 additions & 0 deletions rasa/core/brokers/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging
import warnings
from typing import Any, Dict, Text, Optional, Union

from rasa.utils import common
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)


class EventBroker:
"""Base class for any event broker implementation."""

@staticmethod
def create(
obj: Union["EventBroker", EndpointConfig, None],
) -> Optional["EventBroker"]:
"""Factory to create an event broker."""

if isinstance(obj, EventBroker):
return obj
else:
return _create_from_endpoint_config(obj)

@classmethod
def from_endpoint_config(cls, broker_config: EndpointConfig) -> "EventBroker":
raise NotImplementedError(
"Event broker must implement the `from_endpoint_config` method."
)

def publish(self, event: Dict[Text, Any]) -> None:
"""Publishes a json-formatted Rasa Core event into an event queue."""

raise NotImplementedError("Event broker must implement the `publish` method.")


def _create_from_endpoint_config(
endpoint_config: Optional[EndpointConfig],
) -> Optional["EventBroker"]:
"""Instantiate an event broker based on its configuration."""

if endpoint_config is None:
broker = None
elif endpoint_config.type is None or endpoint_config.type.lower() == "pika":
from rasa.core.brokers.pika import PikaEventBroker

# default broker if no type is set
broker = PikaEventBroker.from_endpoint_config(endpoint_config)
elif endpoint_config.type.lower() == "sql":
from rasa.core.brokers.sql import SQLEventBroker

broker = SQLEventBroker.from_endpoint_config(endpoint_config)
elif endpoint_config.type.lower() == "file":
from rasa.core.brokers.file import FileEventBroker

broker = FileEventBroker.from_endpoint_config(endpoint_config)
elif endpoint_config.type.lower() == "kafka":
from rasa.core.brokers.kafka import KafkaEventBroker

broker = KafkaEventBroker.from_endpoint_config(endpoint_config)
else:
broker = _load_from_module_string(endpoint_config)

if broker:
logger.debug(f"Instantiated event broker to '{broker.__class__.__name__}'.")
return broker


def _load_from_module_string(broker_config: EndpointConfig,) -> Optional["EventBroker"]:
"""Instantiate an event broker based on its class name."""

try:
event_broker_class = common.class_from_module_path(broker_config.type)
return event_broker_class.from_endpoint_config(broker_config)
except (AttributeError, ImportError) as e:
logger.warning(
f"The `EventBroker` type '{broker_config.type}' could not be found. "
f"Not using any event broker. Error: {e}"
)
return None
27 changes: 11 additions & 16 deletions rasa/core/brokers/event_channel.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import logging
from typing import Any, Dict, Text, Optional
import warnings

from rasa.utils.endpoints import EndpointConfig
from rasa.core.brokers.broker import EventBroker

logger = logging.getLogger(__name__)


class EventChannel:
@classmethod
def from_endpoint_config(cls, broker_config: EndpointConfig) -> "EventChannel":
raise NotImplementedError(
"Event broker must implement the `from_endpoint_config` method."
)

def publish(self, event: Dict[Text, Any]) -> None:
"""Publishes a json-formatted Rasa Core event into an event queue."""

raise NotImplementedError("Event broker must implement the `publish` method.")
# noinspection PyAbstractClass
class EventChannel(EventBroker):
warnings.warn(
"The `EventChannel` class is deprecated, please inherit from "
"`EventBroker` instead. `EventChannel` will be removed "
"in future Rasa versions.",
DeprecationWarning,
stacklevel=2,
)
56 changes: 56 additions & 0 deletions rasa/core/brokers/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import logging
import typing
import warnings
from typing import Optional, Text, Dict

from rasa.core.brokers.broker import EventBroker

if typing.TYPE_CHECKING:
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)


class FileEventBroker(EventBroker):
"""Log events to a file in json format.
There will be one event per line and each event is stored as json."""

DEFAULT_LOG_FILE_NAME = "rasa_event.log"

def __init__(self, path: Optional[Text] = None) -> None:
self.path = path or self.DEFAULT_LOG_FILE_NAME
self.event_logger = self._event_logger()

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
) -> Optional["FileEventBroker"]:
if broker_config is None:
return None

# noinspection PyArgumentList
return cls(**broker_config.kwargs)

def _event_logger(self) -> logging.Logger:
"""Instantiate the file logger."""

logger_file = self.path
# noinspection PyTypeChecker
query_logger = logging.getLogger("event-logger")
query_logger.setLevel(logging.INFO)
handler = logging.FileHandler(logger_file)
handler.setFormatter(logging.Formatter("%(message)s"))
query_logger.propagate = False
query_logger.addHandler(handler)

logger.info(f"Logging events to '{logger_file}'.")

return query_logger

def publish(self, event: Dict) -> None:
"""Write event to file."""

self.event_logger.info(json.dumps(event))
self.event_logger.handlers[0].flush()
62 changes: 10 additions & 52 deletions rasa/core/brokers/file_producer.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,13 @@
import json
import logging
import typing
from typing import Optional, Text, Dict
import warnings

from rasa.core.brokers.event_channel import EventChannel
from rasa.core.brokers.file import FileEventBroker

if typing.TYPE_CHECKING:
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)


class FileProducer(EventChannel):
"""Log events to a file in json format.
There will be one event per line and each event is stored as json."""

DEFAULT_LOG_FILE_NAME = "rasa_event.log"

def __init__(self, path: Optional[Text] = None) -> None:
self.path = path or self.DEFAULT_LOG_FILE_NAME
self.event_logger = self._event_logger()

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
) -> Optional["FileProducer"]:
if broker_config is None:
return None

# noinspection PyArgumentList
return cls(**broker_config.kwargs)

def _event_logger(self) -> logging.Logger:
"""Instantiate the file logger."""

logger_file = self.path
# noinspection PyTypeChecker
query_logger = logging.getLogger("event-logger")
query_logger.setLevel(logging.INFO)
handler = logging.FileHandler(logger_file)
handler.setFormatter(logging.Formatter("%(message)s"))
query_logger.propagate = False
query_logger.addHandler(handler)

logger.info(f"Logging events to '{logger_file}'.")

return query_logger

def publish(self, event: Dict) -> None:
"""Write event to file."""

self.event_logger.info(json.dumps(event))
self.event_logger.handlers[0].flush()
class FileProducer(FileEventBroker):
warnings.warn(
"The `FileProducer` class is deprecated, please inherit from "
"`FileEventBroker` instead. `FileProducer` will be removed in "
"future Rasa versions.",
DeprecationWarning,
stacklevel=2,
)
43 changes: 40 additions & 3 deletions rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import json
import logging
import warnings
from typing import Optional

from rasa.core.brokers.event_channel import EventChannel
from rasa.core.brokers.broker import EventBroker
from rasa.utils.io import DEFAULT_ENCODING

logger = logging.getLogger(__name__)


class KafkaProducer(EventChannel):
class KafkaEventBroker(EventBroker):
def __init__(
self,
host,
Expand Down Expand Up @@ -37,7 +38,7 @@ def __init__(
logging.getLogger("kafka").setLevel(loglevel)

@classmethod
def from_endpoint_config(cls, broker_config) -> Optional["KafkaProducer"]:
def from_endpoint_config(cls, broker_config) -> Optional["KafkaEventBroker"]:
if broker_config is None:
return None

Expand Down Expand Up @@ -76,3 +77,39 @@ def _publish(self, event) -> None:

def _close(self) -> None:
self.producer.close()


class KafkaProducer(KafkaEventBroker):
def __init__(
self,
host,
sasl_username=None,
sasl_password=None,
ssl_cafile=None,
ssl_certfile=None,
ssl_keyfile=None,
ssl_check_hostname=False,
topic="rasa_core_events",
security_protocol="SASL_PLAINTEXT",
loglevel=logging.ERROR,
) -> None:
warnings.warn(
"The `KafkaProducer` class is deprecated, please inherit "
"from `KafkaEventBroker` instead. `KafkaProducer` will be "
"removed in future Rasa versions.",
DeprecationWarning,
stacklevel=2,
)

super(KafkaProducer, self).__init__(
host,
sasl_username,
sasl_password,
ssl_cafile,
ssl_certfile,
ssl_keyfile,
ssl_check_hostname,
topic,
security_protocol,
loglevel,
)
Loading

0 comments on commit 49e9da1

Please sign in to comment.