Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified external class instantiation #4801

Merged
merged 13 commits into from
Dec 11, 2019
2 changes: 2 additions & 0 deletions changelog/4801.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Allow creation of natural language interpreter and generator by classname reference
in ``endpoints.yml``.
48 changes: 24 additions & 24 deletions docs/api/event-brokers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ Here is how you add it using Python code:

.. code-block:: python

from rasa.core.event_brokers.pika_producer import PikaProducer
from rasa.core.brokers.pika import PikaEventBroker
from rasa_platform.core.tracker_store import InMemoryTrackerStore

pika_broker = PikaProducer('localhost',
'username',
'password',
queue='rasa_core_events')
pika_broker = PikaEventBroker('localhost',
'username',
'password',
queue='rasa_core_events')

tracker_store = InMemoryTrackerStore(db=db, event_broker=pika_broker)

Expand Down Expand Up @@ -157,11 +157,11 @@ The code below shows an example on how to instantiate a Kafka producer in you sc

.. code-block:: python

from rasa.core.event_brokers.kafka_producer import KafkaProducer
from rasa.core.brokers.kafka import KafkaEventBroker
from rasa.core.tracker_store import InMemoryTrackerStore

kafka_broker = KafkaProducer(host='localhost:9092',
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host='localhost:9092',
topic='rasa_core_events')

tracker_store = InMemoryTrackerStore(event_broker=kafka_broker)

Expand All @@ -177,10 +177,10 @@ list of strings. e.g.:

.. code-block:: python

kafka_broker = KafkaProducer(host=['kafka_broker_1:9092',
'kafka_broker_2:2030',
'kafka_broker_3:9092'],
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host=['kafka_broker_1:9092',
'kafka_broker_2:2030',
'kafka_broker_3:9092'],
topic='rasa_core_events')

Authentication and authorization
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -197,11 +197,11 @@ previously configured in the broker server.

.. code-block:: python

kafka_broker = KafkaProducer(host='kafka_broker:9092',
sasl_plain_username='kafka_username',
sasl_plain_password='kafka_password',
security_protocol='SASL_PLAINTEXT',
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host='kafka_broker:9092',
sasl_plain_username='kafka_username',
sasl_plain_password='kafka_password',
security_protocol='SASL_PLAINTEXT',
topic='rasa_core_events')


If the clients or the brokers in the kafka cluster are located in different
Expand All @@ -212,13 +212,13 @@ be provided as arguments, as well as the CA's root certificate.

.. code-block:: python

kafka_broker = KafkaProducer(host='kafka_broker:9092',
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem',
ssl_check_hostname=True,
security_protocol='SSL',
topic='rasa_core_events')
kafka_broker = KafkaEventBroker(host='kafka_broker:9092',
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem',
ssl_check_hostname=True,
security_protocol='SSL',
topic='rasa_core_events')

If the ``ssl_check_hostname`` parameter is enabled, the clients will verify
if the broker's hostname matches the certificate. It's used on client's connections
Expand Down
80 changes: 80 additions & 0 deletions rasa/core/brokers/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging
import warnings
from typing import Any, Dict, Text, Optional, Union

from rasa.utils import common
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)


class EventBroker:
"""Base class for any event broker implementation."""

@staticmethod
def create(
obj: Union["EventBroker", EndpointConfig, None],
) -> Optional["EventBroker"]:
"""Factory to create an event broker."""

if isinstance(obj, EventBroker):
return obj
else:
return _create_from_endpoint_config(obj)

@classmethod
def from_endpoint_config(cls, broker_config: EndpointConfig) -> "EventBroker":
raise NotImplementedError(
"Event broker must implement the `from_endpoint_config` method."
)

def publish(self, event: Dict[Text, Any]) -> None:
"""Publishes a json-formatted Rasa Core event into an event queue."""

raise NotImplementedError("Event broker must implement the `publish` method.")


def _create_from_endpoint_config(
endpoint_config: Optional[EndpointConfig],
) -> Optional["EventBroker"]:
"""Instantiate an event broker based on its configuration."""

if endpoint_config is None:
broker = None
elif endpoint_config.type is None or endpoint_config.type.lower() == "pika":
from rasa.core.brokers.pika import PikaEventBroker

# default broker if no type is set
broker = PikaEventBroker.from_endpoint_config(endpoint_config)
elif endpoint_config.type.lower() == "sql":
from rasa.core.brokers.sql import SQLEventBroker

broker = SQLEventBroker.from_endpoint_config(endpoint_config)
elif endpoint_config.type.lower() == "file":
from rasa.core.brokers.file import FileEventBroker

broker = FileEventBroker.from_endpoint_config(endpoint_config)
elif endpoint_config.type.lower() == "kafka":
from rasa.core.brokers.kafka import KafkaEventBroker

broker = KafkaEventBroker.from_endpoint_config(endpoint_config)
else:
broker = _load_from_module_string(endpoint_config)

if broker:
logger.debug(f"Instantiated event broker to '{broker.__class__.__name__}'.")
return broker


def _load_from_module_string(broker_config: EndpointConfig,) -> Optional["EventBroker"]:
"""Instantiate an event broker based on its class name."""

try:
event_broker_class = common.class_from_module_path(broker_config.type)
return event_broker_class.from_endpoint_config(broker_config)
except (AttributeError, ImportError) as e:
logger.warning(
f"The `EventBroker` type '{broker_config.type}' could not be found. "
f"Not using any event broker. Error: {e}"
)
return None
27 changes: 11 additions & 16 deletions rasa/core/brokers/event_channel.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import logging
from typing import Any, Dict, Text, Optional
import warnings

from rasa.utils.endpoints import EndpointConfig
from rasa.core.brokers.broker import EventBroker

logger = logging.getLogger(__name__)


class EventChannel:
@classmethod
def from_endpoint_config(cls, broker_config: EndpointConfig) -> "EventChannel":
raise NotImplementedError(
"Event broker must implement the `from_endpoint_config` method."
)

def publish(self, event: Dict[Text, Any]) -> None:
"""Publishes a json-formatted Rasa Core event into an event queue."""

raise NotImplementedError("Event broker must implement the `publish` method.")
# noinspection PyAbstractClass
class EventChannel(EventBroker):
warnings.warn(
"The `EventChannel` class is deprecated, please inherit from "
"`EventBroker` instead. `EventChannel` will be removed "
"in future Rasa versions.",
DeprecationWarning,
stacklevel=2,
)
56 changes: 56 additions & 0 deletions rasa/core/brokers/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import logging
import typing
import warnings
from typing import Optional, Text, Dict

from rasa.core.brokers.broker import EventBroker

if typing.TYPE_CHECKING:
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)


class FileEventBroker(EventBroker):
"""Log events to a file in json format.

There will be one event per line and each event is stored as json."""

DEFAULT_LOG_FILE_NAME = "rasa_event.log"

def __init__(self, path: Optional[Text] = None) -> None:
self.path = path or self.DEFAULT_LOG_FILE_NAME
self.event_logger = self._event_logger()

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
) -> Optional["FileEventBroker"]:
if broker_config is None:
return None

# noinspection PyArgumentList
return cls(**broker_config.kwargs)

def _event_logger(self) -> logging.Logger:
"""Instantiate the file logger."""

logger_file = self.path
# noinspection PyTypeChecker
query_logger = logging.getLogger("event-logger")
query_logger.setLevel(logging.INFO)
handler = logging.FileHandler(logger_file)
handler.setFormatter(logging.Formatter("%(message)s"))
query_logger.propagate = False
query_logger.addHandler(handler)

logger.info(f"Logging events to '{logger_file}'.")

return query_logger

def publish(self, event: Dict) -> None:
"""Write event to file."""

self.event_logger.info(json.dumps(event))
self.event_logger.handlers[0].flush()
62 changes: 10 additions & 52 deletions rasa/core/brokers/file_producer.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,13 @@
import json
import logging
import typing
from typing import Optional, Text, Dict
import warnings

from rasa.core.brokers.event_channel import EventChannel
from rasa.core.brokers.file import FileEventBroker

if typing.TYPE_CHECKING:
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)


class FileProducer(EventChannel):
"""Log events to a file in json format.

There will be one event per line and each event is stored as json."""

DEFAULT_LOG_FILE_NAME = "rasa_event.log"

def __init__(self, path: Optional[Text] = None) -> None:
self.path = path or self.DEFAULT_LOG_FILE_NAME
self.event_logger = self._event_logger()

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
) -> Optional["FileProducer"]:
if broker_config is None:
return None

# noinspection PyArgumentList
return cls(**broker_config.kwargs)

def _event_logger(self) -> logging.Logger:
"""Instantiate the file logger."""

logger_file = self.path
# noinspection PyTypeChecker
query_logger = logging.getLogger("event-logger")
query_logger.setLevel(logging.INFO)
handler = logging.FileHandler(logger_file)
handler.setFormatter(logging.Formatter("%(message)s"))
query_logger.propagate = False
query_logger.addHandler(handler)

logger.info(f"Logging events to '{logger_file}'.")

return query_logger

def publish(self, event: Dict) -> None:
"""Write event to file."""

self.event_logger.info(json.dumps(event))
self.event_logger.handlers[0].flush()
class FileProducer(FileEventBroker):
warnings.warn(
"The `FileProducer` class is deprecated, please inherit from "
"`FileEventBroker` instead. `FileProducer` will be removed in "
"future Rasa versions.",
DeprecationWarning,
stacklevel=2,
)
43 changes: 40 additions & 3 deletions rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import json
import logging
import warnings
from typing import Optional

from rasa.core.brokers.event_channel import EventChannel
from rasa.core.brokers.broker import EventBroker
from rasa.utils.io import DEFAULT_ENCODING

logger = logging.getLogger(__name__)


class KafkaProducer(EventChannel):
class KafkaEventBroker(EventBroker):
def __init__(
self,
host,
Expand Down Expand Up @@ -37,7 +38,7 @@ def __init__(
logging.getLogger("kafka").setLevel(loglevel)

@classmethod
def from_endpoint_config(cls, broker_config) -> Optional["KafkaProducer"]:
def from_endpoint_config(cls, broker_config) -> Optional["KafkaEventBroker"]:
if broker_config is None:
return None

Expand Down Expand Up @@ -76,3 +77,39 @@ def _publish(self, event) -> None:

def _close(self) -> None:
self.producer.close()


class KafkaProducer(KafkaEventBroker):
def __init__(
self,
host,
sasl_username=None,
sasl_password=None,
ssl_cafile=None,
ssl_certfile=None,
ssl_keyfile=None,
ssl_check_hostname=False,
topic="rasa_core_events",
security_protocol="SASL_PLAINTEXT",
loglevel=logging.ERROR,
) -> None:
warnings.warn(
"The `KafkaProducer` class is deprecated, please inherit "
"from `KafkaEventBroker` instead. `KafkaProducer` will be "
"removed in future Rasa versions.",
DeprecationWarning,
stacklevel=2,
)

super(KafkaProducer, self).__init__(
host,
sasl_username,
sasl_password,
ssl_cafile,
ssl_certfile,
ssl_keyfile,
ssl_check_hostname,
topic,
security_protocol,
loglevel,
)
Loading