Add new SQL event broker and use it for local Rasa X #4097

Merged
merged 10 commits into from Jul 30, 2019
3 changes: 2 additions & 1 deletion .gitignore
@@ -80,4 +80,5 @@ examples/moodbot/models*
docs/core/key
docs/core/key.pub
failed_stories.md
errors.json
errors.json
events.db
6 changes: 4 additions & 2 deletions CHANGELOG.rst
@@ -13,13 +13,15 @@ Added
-----
- ``TrainingFileImporter`` interface to support customizing the process of loading
  training data
- Fill slots for custom templates
- fill slots for custom templates
- new event broker class: ``SQLProducer``. This event broker is now used when running locally with
  Rasa X

Changed
-------
- ``Agent.update_model()`` and ``Agent.handle_message()`` now work without needing to set a domain
  or a policy ensemble
- Update pytype to ``2019.7.11``
- update pytype to ``2019.7.11``

Removed
-------
4 changes: 4 additions & 0 deletions data/test_endpoints/event_brokers/sql_endpoint.yml
@@ -0,0 +1,4 @@
event_broker:
  db: ":memory:"
  dialect: "sqlite"
  type: "sql"
56 changes: 54 additions & 2 deletions docs/api/event-brokers.rst
@@ -27,7 +27,11 @@ tracker looks like this:
The ``event`` field takes the event's ``type_name`` (for more on event
types, check out the :ref:`events` docs).

Rasa enables two possible brokers producers: Pika Event Broker and Kafka Event Broker.
Rasa enables three possible broker types:

- `Pika Event Broker`_
- `Kafka Event Broker`_
- `SQL Event Broker`_

Pika Event Broker
-----------------
@@ -108,7 +112,7 @@ example:
Kafka Event Broker
------------------

It is possible to use `Kafka <https://kafka.apache.org/>`_ as main broker to you events. In this example
It is possible to use `Kafka <https://kafka.apache.org/>`_ as main broker for your events. In this example
we are going to use the `python-kafka <https://kafka-python.readthedocs.io/en/master/usage.html>`_
library, a Kafka client written in Python.

@@ -231,3 +235,51 @@ according to the security protocol being used. The following implementation show

    for message in consumer:
        print(message.value)

SQL Event Broker
-----------------

It is possible to use an SQL database as an event broker. Connections to databases are established using
`SQLAlchemy <https://www.sqlalchemy.org/>`_, a Python library which can interact with many
different types of SQL databases, such as `SQLite <https://sqlite.org/index.html>`_,
`PostgreSQL <https://www.postgresql.org/>`_ and more. The default Rasa installation allows connections to SQLite
and PostgreSQL databases. For other options, see the
`SQLAlchemy documentation on SQL dialects <https://docs.sqlalchemy.org/en/13/dialects/index.html>`_.


Adding a SQL Event Broker Using the Endpoint Configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can use an endpoint configuration file to instruct Rasa to stream
all events to your SQL event broker. To do so, add an ``event_broker`` section to your
endpoint configuration, e.g. ``endpoints.yml``. For example, a valid SQLite configuration
could look like the following:

.. code-block:: yaml

    event_broker:
      type: SQL
      dialect: sqlite
      db: events.db

PostgreSQL databases can be used as well:

.. code-block:: yaml

    event_broker:
      type: SQL
      host: 127.0.0.1
      port: 5432
      dialect: postgresql
      username: myuser
      password: mypassword
      db: mydatabase

Then, instruct Rasa to use the endpoint configuration and SQL producer by adding
``--endpoints <path to your endpoint configuration>``, as in the following example:

.. code-block:: shell

    rasa run -m models --endpoints endpoints.yml

Rasa will then create a table called ``events``, where all events will be added.
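
As a rough illustration of what consuming that table might look like, the sketch below builds an in-memory SQLite database with the same three columns that ``SQLProducer.SQLEvent`` declares (``id``, ``sender_id``, ``data``), stores one JSON-encoded event, and reads it back with Python's built-in ``sqlite3`` module. The helper name ``read_events`` and the sample event are assumptions made for this example; they are not part of Rasa.

```python
import json
import sqlite3


def read_events(connection, sender_id):
    """Return the decoded events stored for one sender, oldest first."""
    rows = connection.execute(
        "SELECT data FROM events WHERE sender_id = ? ORDER BY id", (sender_id,)
    ).fetchall()
    # Each row holds the event as a JSON string in the ``data`` column.
    return [json.loads(data) for (data,) in rows]


# Stand-in for events.db: same column layout as the ``events`` table.
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE events ("
    "id INTEGER PRIMARY KEY, sender_id VARCHAR(255), data TEXT)"
)

# Hypothetical event, serialized with json.dumps as the broker does.
sample = {"sender_id": "default", "event": "restart"}
connection.execute(
    "INSERT INTO events (sender_id, data) VALUES (?, ?)",
    (sample["sender_id"], json.dumps(sample)),
)
connection.commit()

events = read_events(connection, "default")
print([event["event"] for event in events])
```

Against a real setup you would presumably connect to the ``events.db`` file (or the PostgreSQL database) rather than ``:memory:``, and skip the table creation, since the broker creates the table itself.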
34 changes: 17 additions & 17 deletions rasa/cli/x.py
@@ -28,7 +28,7 @@

logger = logging.getLogger(__name__)

DEFAULT_TRACKER_DB = "tracker.db"
DEFAULT_EVENTS_DB = "events.db"

if typing.TYPE_CHECKING:
    from rasa.core.utils import AvailableEndpoints
@@ -115,32 +115,32 @@ def _overwrite_endpoints_for_local_x(
        wait_time_between_pulls=2,
    )

    overwrite_existing_tracker_store = False
    if endpoints.tracker_store and not _is_correct_tracker_store(
        endpoints.tracker_store
    ):
    overwrite_existing_event_broker = False
    if endpoints.event_broker and not _is_correct_event_broker(endpoints.event_broker):
        print_error(
            "Rasa X currently only supports a SQLite tracker store with path '{}' "
            "Rasa X currently only supports a SQLite event broker with path '{}' "
            "when running locally. You can deploy Rasa X with Docker "
            "(https://rasa.com/docs/rasa-x/deploy/) if you want to use "
            "other tracker store configurations.".format(DEFAULT_TRACKER_DB)
            "other event broker configurations.".format(DEFAULT_EVENTS_DB)
        )
        overwrite_existing_tracker_store = questionary.confirm(
            "Do you want to continue with the default SQLite tracker store?"
        overwrite_existing_event_broker = questionary.confirm(
            "Do you want to continue with the default SQLite event broker?"
        ).ask()

        if not overwrite_existing_tracker_store:
        if not overwrite_existing_event_broker:
            exit(0)

    if not endpoints.tracker_store or overwrite_existing_tracker_store:
        endpoints.tracker_store = EndpointConfig(type="sql", db=DEFAULT_TRACKER_DB)
    if not endpoints.event_broker or overwrite_existing_event_broker:
        endpoints.event_broker = EndpointConfig(type="sql", db=DEFAULT_EVENTS_DB)


def _is_correct_tracker_store(tracker_endpoint: EndpointConfig) -> bool:
    return (
        tracker_endpoint.type == "sql"
        and tracker_endpoint.kwargs.get("dialect", "").lower() == "sqlite"
        and tracker_endpoint.kwargs.get("db") == DEFAULT_TRACKER_DB
def _is_correct_event_broker(event_broker: EndpointConfig) -> bool:
    return all(
        [
            event_broker.type == "sql",
            event_broker.kwargs.get("dialect", "").lower() == "sqlite",
            event_broker.kwargs.get("db") == DEFAULT_EVENTS_DB,
        ]
    )
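
To see which configurations the new check accepts, here is a small self-contained sketch. ``FakeEndpointConfig`` is a hypothetical stand-in that only mimics the ``type`` and ``kwargs`` attributes used above; it is not Rasa's ``EndpointConfig``, and the function is renamed to make clear it is a copy for illustration.

```python
DEFAULT_EVENTS_DB = "events.db"


class FakeEndpointConfig:
    """Hypothetical stand-in exposing only ``type`` and ``kwargs``."""

    def __init__(self, type=None, **kwargs):
        self.type = type
        self.kwargs = kwargs


def is_correct_event_broker(event_broker):
    # Same three conditions as _is_correct_event_broker in the diff.
    return all(
        [
            event_broker.type == "sql",
            event_broker.kwargs.get("dialect", "").lower() == "sqlite",
            event_broker.kwargs.get("db") == DEFAULT_EVENTS_DB,
        ]
    )


valid = FakeEndpointConfig(type="sql", dialect="SQLite", db="events.db")
wrong_db = FakeEndpointConfig(type="sql", dialect="sqlite", db="other.db")
no_dialect = FakeEndpointConfig(type="sql", db="events.db")

print(is_correct_event_broker(valid))
print(is_correct_event_broker(wrong_db))
print(is_correct_event_broker(no_dialect))
```

Note that a configuration without an explicit ``dialect`` fails the check, because the lookup falls back to an empty string rather than to ``sqlite``.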


51 changes: 51 additions & 0 deletions rasa/core/broker.py
@@ -17,6 +17,8 @@ def from_endpoint_config(
        return None
    elif broker_config.type == "pika" or broker_config.type is None:
        return PikaProducer.from_endpoint_config(broker_config)
    elif broker_config.type.lower() == "sql":
        return SQLProducer.from_endpoint_config(broker_config)
    elif broker_config.type == "file":
        return FileProducer.from_endpoint_config(broker_config)
    elif broker_config.type == "kafka":
@@ -223,3 +225,52 @@ def _publish(self, event):

    def _close(self):
        self.producer.close()


class SQLProducer(EventChannel):
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class SQLEvent(Base):
        from sqlalchemy import Column, Integer, String, Float, Text

        __tablename__ = "events"
        id = Column(Integer, primary_key=True)
        sender_id = Column(String(255))
        data = Column(Text)

    def __init__(
        self,
        dialect: Text = "sqlite",
        host: Optional[Text] = None,
        port: Optional[int] = None,
        db: Text = "events.db",
        username: Text = None,
        password: Text = None,
        login_db: Optional[Text] = None,
    ):
        from rasa.core.tracker_store import SQLTrackerStore
        from sqlalchemy.orm import sessionmaker
        from sqlalchemy import create_engine

        engine_url = SQLTrackerStore.get_db_url(
            dialect, host, port, db, username, password, login_db
        )

        logger.debug("SQLProducer: Connecting to database: {}".format(engine_url))

        self.engine = create_engine(engine_url)
        self.Base.metadata.create_all(self.engine)
        self.session = sessionmaker(bind=self.engine)()

    @classmethod
    def from_endpoint_config(cls, broker_config: EndpointConfig) -> "EventChannel":
        return cls(host=broker_config.url, **broker_config.kwargs)

    def publish(self, event: Dict[Text, Any]) -> None:
        """Publishes a json-formatted Rasa Core event into an event queue."""
        self.session.add(
            self.SQLEvent(sender_id=event.get("sender_id"), data=json.dumps(event))
        )
        self.session.commit()
4 changes: 2 additions & 2 deletions rasa/core/tracker_store.py
@@ -312,7 +312,7 @@ def __init__(
        from sqlalchemy.orm import sessionmaker
        from sqlalchemy import create_engine

        engine_url = self._get_db_url(
        engine_url = self.get_db_url(
            dialect, host, port, db, username, password, login_db
        )
        logger.debug(
@@ -355,7 +355,7 @@ def __init__(
        super(SQLTrackerStore, self).__init__(domain, event_broker)

    @staticmethod
    def _get_db_url(
    def get_db_url(
        dialect: Text = "sqlite",
        host: Optional[Text] = None,
        port: Optional[int] = None,
6 changes: 3 additions & 3 deletions tests/cli/test_rasa_x.py
@@ -55,9 +55,9 @@ def test_prepare_credentials_if_already_valid(tmpdir_factory):


def test_if_endpoint_config_is_valid_in_local_mode():
    config = EndpointConfig(type="sql", dialect="sqlite", db=x.DEFAULT_TRACKER_DB)
    config = EndpointConfig(type="sql", dialect="sqlite", db=x.DEFAULT_EVENTS_DB)

    assert x._is_correct_tracker_store(config)
    assert x._is_correct_event_broker(config)


@pytest.mark.parametrize(
@@ -71,4 +71,4 @@ def test_if_endpoint_config_is_valid_in_local_mode():
def test_if_endpoint_config_is_invalid_in_local_mode(kwargs):
    config = EndpointConfig(**kwargs)

    assert not x._is_correct_tracker_store(config)
    assert not x._is_correct_event_broker(config)
29 changes: 28 additions & 1 deletion tests/core/test_broker.py
@@ -1,7 +1,7 @@
import json

from rasa.core import broker
from rasa.core.broker import FileProducer, PikaProducer, KafkaProducer
from rasa.core.broker import FileProducer, PikaProducer, KafkaProducer, SQLProducer
from rasa.core.events import Event, Restarted, SlotSet, UserUttered
from rasa.utils.endpoints import EndpointConfig, read_endpoint_config
from tests.core.conftest import DEFAULT_ENDPOINTS_FILE
@@ -33,6 +33,33 @@ def test_no_broker_in_config():
    assert actual is None


def test_sql_broker_from_config():
    cfg = read_endpoint_config(
        "data/test_endpoints/event_brokers/sql_endpoint.yml", "event_broker"
    )
    actual = broker.from_endpoint_config(cfg)

    assert isinstance(actual, SQLProducer)
    assert actual.engine.name == "sqlite"


def test_sql_broker_logs_to_sql_db():
    cfg = read_endpoint_config(
        "data/test_endpoints/event_brokers/sql_endpoint.yml", "event_broker"
    )
    actual = broker.from_endpoint_config(cfg)

    for e in TEST_EVENTS:
        actual.publish(e.as_dict())

    events_types = [
        json.loads(event.data)["event"]
        for event in actual.session.query(actual.SQLEvent).all()
    ]

    assert events_types == ["user", "slot", "restart"]


def test_file_broker_from_config():
    cfg = read_endpoint_config(
        "data/test_endpoints/event_brokers/file_endpoint.yml", "event_broker"
6 changes: 3 additions & 3 deletions tests/core/test_tracker_stores.py
@@ -134,7 +134,7 @@ def test_tracker_store_from_invalid_string(default_domain):
    ],
)
def test_get_db_url_with_fully_specified_url(full_url):
    assert SQLTrackerStore._get_db_url(host=full_url) == full_url
    assert SQLTrackerStore.get_db_url(host=full_url) == full_url


def test_get_db_url_with_port_in_host():
@@ -145,7 +145,7 @@ def test_get_db_url_with_port_in_host():
    expected = "{}://{}/{}".format(dialect, host, db)

    assert (
        str(SQLTrackerStore._get_db_url(dialect="postgresql", host=host, db=db))
        str(SQLTrackerStore.get_db_url(dialect="postgresql", host=host, db=db))
        == expected
    )

@@ -155,7 +155,7 @@ def test_get_db_url_with_correct_host():

    assert (
        str(
            SQLTrackerStore._get_db_url(
            SQLTrackerStore.get_db_url(
                dialect="postgresql", host="localhost", port=5005, db="mydb"
            )
        )