Update KafkaEventBroker to support SASL_SSL and PLAINTEXT protocols.
alwx committed Oct 7, 2020
1 parent 5be14cf commit 9f0a6b4
Showing 2 changed files with 92 additions and 19 deletions.
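Per the commit message, the broker previously handled only the SASL_PLAINTEXT and SSL protocols; this commit adds PLAINTEXT and SASL_SSL and renames the `host` argument to `url`. As a rough illustration of the simplest new mode, a minimal sketch (assuming kafka-python is installed and a broker is reachable; the address and event payload are placeholders):

    from rasa.core.brokers.kafka import KafkaEventBroker

    # Unauthenticated, unencrypted connection to a local development broker.
    broker = KafkaEventBroker(
        url="localhost:9092",
        topic="rasa_core_events",
        security_protocol="PLAINTEXT",
    )
    broker.publish({"event": "user", "text": "hello"})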
105 changes: 89 additions & 16 deletions rasa/core/brokers/kafka.py
@@ -1,6 +1,6 @@
import json
import logging
-from typing import Optional
+from typing import Any, Text, List, Optional, Union, Dict

from rasa.core.brokers.broker import EventBroker
from rasa.shared.utils.io import DEFAULT_ENCODING
@@ -11,22 +11,66 @@
class KafkaEventBroker(EventBroker):
def __init__(
self,
-        host,
-        sasl_username=None,
-        sasl_password=None,
-        ssl_cafile=None,
-        ssl_certfile=None,
-        ssl_keyfile=None,
-        ssl_check_hostname=False,
-        topic="rasa_core_events",
-        security_protocol="SASL_PLAINTEXT",
-        loglevel=logging.ERROR,
+        url: Union[Text, List[Text], None],
+        topic: Text = "rasa_core_events",
+        client_id: Optional[Text] = None,
+        group_id: Optional[Text] = None,
+        sasl_username: Union[Text, int, None] = None,
+        sasl_password: Optional[Text] = None,
+        ssl_cafile: Optional[Text] = None,
+        ssl_certfile: Optional[Text] = None,
+        ssl_keyfile: Optional[Text] = None,
+        ssl_check_hostname: bool = False,
+        security_protocol: Text = "SASL_PLAINTEXT",
+        loglevel: Union[int, Text] = logging.ERROR,
+        **kwargs: Any,
) -> None:
"""Kafka event broker.
Args:
url: 'url[:port]' string (or list of 'url[:port]'
strings) that the consumer should contact to bootstrap initial
cluster metadata. This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. The default port is 9092. If no servers are
specified, it will default to `localhost:9092`.
topic: Topics to subscribe to. If not set, call subscribe() or assign()
before consuming records
client_id: A name for this client. This string is passed in each request
to servers and can be used to identify specific server-side log entries
that correspond to this client. Also submitted to `GroupCoordinator` for
logging with respect to consumer group administration.
Default: ‘kafka-python-{version}’
group_id: The name of the consumer group to join for dynamic partition
assignment (if enabled), and to use for fetching and committing offsets.
If None, auto-partition assignment (via group coordinator) and offset
commits are disabled. Default: None
sasl_username: Username for sasl PLAIN authentication.
Required if `sasl_mechanism` is `PLAIN`.
sasl_password: Password for sasl PLAIN authentication.
Required if `sasl_mechanism` is PLAIN.
ssl_cafile: Optional filename of ca file to use in certificate
verification. Default: None.
ssl_certfile: Optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to
establish the certificate's authenticity. Default: None.
ssl_keyfile: Optional filename containing the client private key.
Default: None.
ssl_check_hostname: Flag to configure whether ssl handshake
should verify that the certificate matches the brokers hostname.
Default: False.
security_protocol: Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Default: PLAINTEXT.
loglevel: Logging level of the kafka logger.
"""
self.producer = None
-        self.host = host
+        self.url = url
self.topic = topic
-        self.security_protocol = security_protocol
+        self.client_id = client_id
+        self.group_id = group_id
+        self.security_protocol = security_protocol.upper()
self.sasl_username = sasl_username
self.sasl_password = sasl_password
self.ssl_cafile = ssl_cafile
@@ -51,9 +95,18 @@ def publish(self, event) -> None:
def _create_producer(self) -> None:
import kafka

-        if self.security_protocol == "SASL_PLAINTEXT":
+        if self.security_protocol == "PLAINTEXT":
+            # Unauthenticated, unencrypted connection.
+            self.producer = kafka.KafkaProducer(
+                bootstrap_servers=self.url,
+                client_id=self.client_id,
+                value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
+                security_protocol="PLAINTEXT",
+                ssl_check_hostname=False,
+            )
+        elif self.security_protocol == "SASL_PLAINTEXT":
self.producer = kafka.KafkaProducer(
-            bootstrap_servers=[self.host],
+            bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
sasl_plain_username=self.sasl_username,
sasl_plain_password=self.sasl_password,
@@ -62,14 +115,34 @@ def _create_producer(self) -> None:
)
elif self.security_protocol == "SSL":
self.producer = kafka.KafkaProducer(
-            bootstrap_servers=[self.host],
+            bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
ssl_keyfile=self.ssl_keyfile,
ssl_check_hostname=False,
security_protocol=self.security_protocol,
)
elif self.security_protocol == "SASL_SSL":
self.consumer = kafka.KafkaConsumer(
self.topic,
bootstrap_servers=self.url,
client_id=self.client_id,
group_id=self.group_id,
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username=self.sasl_username,
sasl_plain_password=self.sasl_password,
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
ssl_keyfile=self.ssl_keyfile,
ssl_check_hostname=self.ssl_check_hostname,
)
else:
raise ValueError(
f"Cannot initialise `KafkaEventBroker` "
f"with security protocol '{self.security_protocol}'."
)

def _publish(self, event) -> None:
self.producer.send(self.topic, event)
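Two behavioural details of the new dispatch are worth noting: `security_protocol` is upper-cased in `__init__`, so lowercase values from a YAML config still match, and any protocol outside the four supported ones now raises immediately instead of silently leaving `self.producer` unset. A small sketch (illustrative values, assuming kafka-python is installed; `_create_producer` is called directly here only to show the error path):

    from rasa.core.brokers.kafka import KafkaEventBroker

    # Lowercase input works because __init__ normalises it with .upper().
    broker = KafkaEventBroker(url="localhost:9092", security_protocol="plaintext")
    assert broker.security_protocol == "PLAINTEXT"

    # Unsupported protocols fail fast when the producer is created.
    bad = KafkaEventBroker(url="localhost:9092", security_protocol="SASL_GSSAPI")
    try:
        bad._create_producer()
    except ValueError as error:
        print(error)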
6 changes: 3 additions & 3 deletions tests/core/test_broker.py
@@ -201,13 +201,13 @@ def test_kafka_broker_from_config():

expected = KafkaEventBroker(
"localhost",
"username",
"password",
sasl_username="username",
sasl_password="password",
topic="topic",
security_protocol="SASL_PLAINTEXT",
)

-    assert actual.host == expected.host
+    assert actual.url == expected.url
assert actual.sasl_username == expected.sasl_username
assert actual.sasl_password == expected.sasl_password
assert actual.topic == expected.topic
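The updated test passes `sasl_username` and `sasl_password` as keyword arguments and compares `url` instead of the removed `host` attribute, but it only exercises the SASL_PLAINTEXT path. For comparison, a SASL_SSL broker would also take the TLS-related constructor arguments (a hypothetical configuration; the hostname and file paths are placeholders):

    from rasa.core.brokers.kafka import KafkaEventBroker

    broker = KafkaEventBroker(
        url="broker.example.com:9093",
        sasl_username="username",
        sasl_password="password",
        ssl_cafile="ca.pem",        # CA bundle used to verify the broker certificate
        ssl_certfile="client.pem",  # client certificate chain in PEM format
        ssl_keyfile="client.key",   # client private key
        ssl_check_hostname=True,    # verify the certificate matches the broker hostname
        topic="rasa_core_events",
        security_protocol="SASL_SSL",
    )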
