Update KafkaEventBroker to support SASL_SSL and PLAINTEXT protocols #6943

Merged: 12 commits, Oct 8, 2020
1 change: 1 addition & 0 deletions changelog/6943.improvement.md
@@ -0,0 +1 @@
Update `KafkaEventBroker` to support `SASL_SSL` and `PLAINTEXT` protocols.
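
For reference, a minimal sketch (not part of the diff) of how an endpoints file like the fixtures below is turned into a broker, using the existing `read_endpoint_config` and `from_endpoint_config` hooks; the fixture path assumes a Rasa source checkout:

    # Build a KafkaEventBroker from one of the endpoint fixtures added in this PR.
    from rasa.core.brokers.kafka import KafkaEventBroker
    from rasa.utils.endpoints import read_endpoint_config

    cfg = read_endpoint_config(
        "data/test_endpoints/event_brokers/kafka_sasl_ssl_endpoint.yml", "event_broker"
    )
    broker = KafkaEventBroker.from_endpoint_config(cfg)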
3 changes: 3 additions & 0 deletions data/test_endpoints/event_brokers/kafka_invalid_security_protocol.yml
@@ -0,0 +1,3 @@
event_broker:
  security_protocol: SOMETHING
  type: kafka
6 changes: 2 additions & 4 deletions data/test_endpoints/event_brokers/kafka_plaintext_endpoint.yml
@@ -1,7 +1,5 @@
event_broker:
  url: localhost
  sasl_username: username
  sasl_password: password
  topic: topic
  security_protocol: SASL_PLAINTEXT
  client_id: kafka-python-rasa
  security_protocol: PLAINTEXT
  type: kafka
4 changes: 4 additions & 0 deletions data/test_endpoints/event_brokers/kafka_plaintext_endpoint_no_url.yml
@@ -0,0 +1,4 @@
event_broker:
  client_id: kafka-python-rasa
  security_protocol: PLAINTEXT
  type: kafka
7 changes: 7 additions & 0 deletions data/test_endpoints/event_brokers/kafka_sasl_plaintext_endpoint.yml
@@ -0,0 +1,7 @@
event_broker:
  url: localhost
  sasl_username: username
  sasl_password: password
  topic: topic
  security_protocol: SASL_PLAINTEXT
  type: kafka
7 changes: 7 additions & 0 deletions data/test_endpoints/event_brokers/kafka_sasl_ssl_endpoint.yml
@@ -0,0 +1,7 @@
event_broker:
  url: localhost
  sasl_username: username
  sasl_password: password
  topic: topic
  security_protocol: SASL_SSL
  type: kafka
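
A direct-constructor sketch equivalent to this fixture (also showing that `url` may be a list of 'host[:port]' strings, per the docstring below); credentials and host are placeholders matching the fixture above:

    # Sketch: SASL_SSL configured via the constructor instead of an endpoints file.
    from rasa.core.brokers.kafka import KafkaEventBroker

    broker = KafkaEventBroker(
        ["localhost:9092"],
        sasl_username="username",
        sasl_password="password",
        topic="topic",
        security_protocol="SASL_SSL",
    )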
4 changes: 0 additions & 4 deletions data/test_endpoints/event_brokers/kafka_ssl_endpoint.yml
@@ -2,8 +2,4 @@ event_broker:
  url: localhost
  topic: topic
  security_protocol: SSL
  ssl_cafile: CARoot.pem
  ssl_certfile: certificate.pem
  ssl_keyfile: key.pem
  ssl_check_hostname: True
  type: kafka
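
The certificate options dropped from this fixture are still accepted by the constructor and used by the `SSL` branch of `_create_producer` below; a direct-constructor sketch, where the .pem file names are placeholders rather than files shipped with Rasa:

    # Sketch: SSL certificate options passed directly to the constructor.
    from rasa.core.brokers.kafka import KafkaEventBroker

    broker = KafkaEventBroker(
        "localhost",
        topic="topic",
        security_protocol="SSL",
        ssl_cafile="CARoot.pem",
        ssl_certfile="certificate.pem",
        ssl_keyfile="key.pem",
        ssl_check_hostname=True,
    )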
101 changes: 85 additions & 16 deletions rasa/core/brokers/kafka.py
@@ -1,6 +1,6 @@
import json
import logging
from typing import Optional
from typing import Any, Text, List, Optional, Union, Dict

from rasa.core.brokers.broker import EventBroker
from rasa.shared.utils.io import DEFAULT_ENCODING
@@ -11,29 +11,69 @@
class KafkaEventBroker(EventBroker):
    def __init__(
        self,
        host,
        sasl_username=None,
        sasl_password=None,
        ssl_cafile=None,
        ssl_certfile=None,
        ssl_keyfile=None,
        ssl_check_hostname=False,
        topic="rasa_core_events",
        security_protocol="SASL_PLAINTEXT",
        loglevel=logging.ERROR,
        url: Union[Text, List[Text], None],
        topic: Text = "rasa_core_events",
        client_id: Optional[Text] = None,
        group_id: Optional[Text] = None,
        sasl_username: Optional[Text] = None,
        sasl_password: Optional[Text] = None,
        ssl_cafile: Optional[Text] = None,
        ssl_certfile: Optional[Text] = None,
        ssl_keyfile: Optional[Text] = None,
        ssl_check_hostname: bool = False,
        security_protocol: Text = "SASL_PLAINTEXT",
        loglevel: Union[int, Text] = logging.ERROR,
        **kwargs: Any,
    ) -> None:
        """Kafka event broker.

        Args:
            url: 'url[:port]' string (or list of 'url[:port]' strings) that the
                producer should contact to bootstrap initial cluster metadata.
                This does not have to be the full node list. It just needs to
                have at least one broker that will respond to a Metadata API
                request.
            topic: The topic events are published to.
            client_id: A name for this client. This string is passed in each
                request to servers and can be used to identify specific
                server-side log entries that correspond to this client. Also
                submitted to `GroupCoordinator` for logging with respect to
                consumer group administration.
            group_id: The name of the consumer group to join for dynamic
                partition assignment (if enabled), and to use for fetching and
                committing offsets. If None, auto-partition assignment (via
                group coordinator) and offset commits are disabled.
            sasl_username: Username for plain authentication.
            sasl_password: Password for plain authentication.
            ssl_cafile: Optional filename of a CA file to use in certificate
                verification.
            ssl_certfile: Optional filename of a file in PEM format containing
                the client certificate, as well as any CA certificates needed
                to establish the certificate's authenticity.
            ssl_keyfile: Optional filename containing the client private key.
            ssl_check_hostname: Flag to configure whether the SSL handshake
                should verify that the certificate matches the broker's
                hostname.
            security_protocol: Protocol used to communicate with brokers.
                Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
            loglevel: Logging level of the kafka logger.
        """
        import kafka

        self.producer = None
        self.host = host
        self.url = url
        self.topic = topic
        self.security_protocol = security_protocol
        self.client_id = client_id
        self.group_id = group_id
        self.security_protocol = security_protocol.upper()
        self.sasl_username = sasl_username
        self.sasl_password = sasl_password
        self.ssl_cafile = ssl_cafile
        self.ssl_certfile = ssl_certfile
        self.ssl_keyfile = ssl_keyfile
        self.ssl_check_hostname = ssl_check_hostname

        self.producer: Optional[kafka.KafkaProducer] = None

        logging.getLogger("kafka").setLevel(loglevel)
    @classmethod
@@ -51,9 +51,18 @@ def publish(self, event) -> None:
    def _create_producer(self) -> None:
        import kafka

        if self.security_protocol == "SASL_PLAINTEXT":
        if self.security_protocol == "PLAINTEXT":
            self.producer = kafka.KafkaProducer(
                bootstrap_servers=self.url,
                value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
                client_id=self.client_id,
                security_protocol="PLAINTEXT",
                ssl_check_hostname=False,
            )
        elif self.security_protocol == "SASL_PLAINTEXT":
            self.producer = kafka.KafkaProducer(
                bootstrap_servers=[self.host],
                bootstrap_servers=self.url,
                value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
                sasl_plain_username=self.sasl_username,
                sasl_plain_password=self.sasl_password,
@@ -62,14 +111,34 @@ def _create_producer(self) -> None:
            )
        elif self.security_protocol == "SSL":
            self.producer = kafka.KafkaProducer(
                bootstrap_servers=[self.host],
                bootstrap_servers=self.url,
                value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
                ssl_cafile=self.ssl_cafile,
                ssl_certfile=self.ssl_certfile,
                ssl_keyfile=self.ssl_keyfile,
                ssl_check_hostname=False,
                security_protocol=self.security_protocol,
            )
        elif self.security_protocol == "SASL_SSL":
            self.producer = kafka.KafkaProducer(
                bootstrap_servers=self.url,
                value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
                client_id=self.client_id,
                security_protocol="SASL_SSL",
                sasl_mechanism="PLAIN",
                sasl_plain_username=self.sasl_username,
                sasl_plain_password=self.sasl_password,
                ssl_cafile=self.ssl_cafile,
                ssl_certfile=self.ssl_certfile,
                ssl_keyfile=self.ssl_keyfile,
                ssl_check_hostname=self.ssl_check_hostname,
            )
        else:
            raise ValueError(
                f"Cannot initialise `KafkaEventBroker`: "
                f"Invalid `security_protocol` ('{self.security_protocol}')."
            )

    def _publish(self, event) -> None:
        self.producer.send(self.topic, event)
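
End to end, the new `PLAINTEXT` path added above can be exercised as in this sketch, assuming a Kafka broker reachable on localhost:9092 (otherwise `kafka.errors.NoBrokersAvailable` is raised, which is exactly what the tests below rely on):

    # Sketch: publish a single event over the new PLAINTEXT protocol.
    from rasa.core.brokers.kafka import KafkaEventBroker

    broker = KafkaEventBroker(
        "localhost:9092",
        topic="rasa_core_events",
        client_id="kafka-python-rasa",
        security_protocol="PLAINTEXT",
    )
    broker.publish({"event": "restart"})  # dicts are JSON-serialized by the producer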
47 changes: 37 additions & 10 deletions tests/core/test_broker.py
@@ -1,14 +1,14 @@
import json
import logging
from pathlib import Path
import textwrap

from typing import Union, Text, List, Optional, Type

import kafka
import pytest
from _pytest.logging import LogCaptureFixture

from pathlib import Path
from typing import Union, Text, List, Optional, Type, Any
from _pytest.logging import LogCaptureFixture
from _pytest.monkeypatch import MonkeyPatch
from tests.core.conftest import DEFAULT_ENDPOINTS_FILE

import rasa.shared.utils.io
import rasa.utils.io
@@ -19,7 +19,6 @@
from rasa.core.brokers.sql import SQLEventBroker
from rasa.shared.core.events import Event, Restarted, SlotSet, UserUttered
from rasa.utils.endpoints import EndpointConfig, read_endpoint_config
from tests.core.conftest import DEFAULT_ENDPOINTS_FILE

TEST_EVENTS = [
    UserUttered("/greet", {"name": "greet", "confidence": 1.0}, []),
@@ -194,25 +193,53 @@ def test_load_non_existent_custom_broker_name():


def test_kafka_broker_from_config():
    endpoints_path = "data/test_endpoints/event_brokers/kafka_plaintext_endpoint.yml"
    endpoints_path = (
        "data/test_endpoints/event_brokers/kafka_sasl_plaintext_endpoint.yml"
    )
    cfg = read_endpoint_config(endpoints_path, "event_broker")

    actual = KafkaEventBroker.from_endpoint_config(cfg)

    expected = KafkaEventBroker(
        "localhost",
        "username",
        "password",
        sasl_username="username",
        sasl_password="password",
        topic="topic",
        security_protocol="SASL_PLAINTEXT",
    )

    assert actual.host == expected.host
    assert actual.url == expected.url
    assert actual.sasl_username == expected.sasl_username
    assert actual.sasl_password == expected.sasl_password
    assert actual.topic == expected.topic

@pytest.mark.parametrize(
    "file,exception",
    [
        # `_create_producer()` raises a `kafka.errors.NoBrokersAvailable`
        # exception, which means that the configuration seems correct but a
        # connection to the broker cannot be established
        ("kafka_sasl_plaintext_endpoint.yml", kafka.errors.NoBrokersAvailable),
[Review comment, Contributor]: Optional: parametrize directly over the YAML strings instead of the filenames, and get rid of the files. I think it's a bit cleaner having the content in the test, especially for small files, but really this is up to you. (A sketch of this suggestion follows the test below.)
("kafka_plaintext_endpoint.yml", kafka.errors.NoBrokersAvailable),
("kafka_sasl_ssl_endpoint.yml", kafka.errors.NoBrokersAvailable),
("kafka_ssl_endpoint.yml", kafka.errors.NoBrokersAvailable),
# `ValueError` exception is raised when the `security_protocol` is incorrect
("kafka_invalid_security_protocol.yml", ValueError),
# `TypeError` exception is raised when there is no `url` specified
("kafka_plaintext_endpoint_no_url.yml", TypeError),
],
)
def test_kafka_broker_security_protocols(file: Text, exception: Exception):
endpoints_path = f"data/test_endpoints/event_brokers/{file}"
cfg = read_endpoint_config(endpoints_path, "event_broker")

actual = KafkaEventBroker.from_endpoint_config(cfg)
with pytest.raises(exception):
# noinspection PyProtectedMember
actual._create_producer()
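

A sketch of the reviewer's suggestion above: parametrize directly over inline YAML rather than fixture files. The test name, `tmp_path` usage, and inlined YAML are illustrative only, not part of this PR; the YAML is written to a temp file because `read_endpoint_config` expects a path.

@pytest.mark.parametrize(
    "endpoint_yaml,exception",
    [
        (
            textwrap.dedent(
                """
                event_broker:
                  security_protocol: SOMETHING
                  type: kafka
                """
            ),
            ValueError,
        ),
    ],
)
def test_kafka_broker_from_inline_yaml(
    tmp_path: Path, endpoint_yaml: Text, exception: Type[Exception]
):
    endpoints_file = tmp_path / "endpoints.yml"
    endpoints_file.write_text(endpoint_yaml)
    cfg = read_endpoint_config(str(endpoints_file), "event_broker")

    broker = KafkaEventBroker.from_endpoint_config(cfg)
    with pytest.raises(exception):
        broker._create_producer()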


def test_no_pika_logs_if_no_debug_mode(caplog: LogCaptureFixture):
    from rasa.core.brokers import pika
