Skip to content

Commit

Permalink
Merge pull request #7018 from RasaHQ/security-protocols-fix
Browse files Browse the repository at this point in the history
Create correct KafkaProducer for PLAINTEXT and SASL_SSL security protocols.
  • Loading branch information
rasabot committed Oct 16, 2020
2 parents 4823d4a + 1c26d58 commit f7719c7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
1 change: 1 addition & 0 deletions changelog/7018.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Create correct `KafkaProducer` for `PLAINTEXT` and `SASL_SSL` security protocols.
24 changes: 11 additions & 13 deletions rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def __init__(
url: Union[Text, List[Text], None],
topic: Text = "rasa_core_events",
client_id: Optional[Text] = None,
group_id: Optional[Text] = None,
sasl_username: Optional[Text] = None,
sasl_password: Optional[Text] = None,
ssl_cafile: Optional[Text] = None,
Expand Down Expand Up @@ -63,7 +62,6 @@ def __init__(
self.url = url
self.topic = topic
self.client_id = client_id
self.group_id = group_id
self.security_protocol = security_protocol.upper()
self.sasl_username = sasl_username
self.sasl_password = sasl_password
Expand Down Expand Up @@ -92,16 +90,16 @@ def _create_producer(self) -> None:
import kafka

if self.security_protocol == "PLAINTEXT":
self.producer = kafka.KafkaConsumer(
self.topic,
bootstrap_servers=self.url,
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
group_id=self.group_id,
security_protocol="PLAINTEXT",
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
security_protocol=self.security_protocol,
ssl_check_hostname=False,
)
elif self.security_protocol == "SASL_PLAINTEXT":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
sasl_plain_username=self.sasl_username,
Expand All @@ -111,6 +109,7 @@ def _create_producer(self) -> None:
)
elif self.security_protocol == "SSL":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
ssl_cafile=self.ssl_cafile,
Expand All @@ -120,19 +119,18 @@ def _create_producer(self) -> None:
security_protocol=self.security_protocol,
)
elif self.security_protocol == "SASL_SSL":
self.producer = kafka.KafkaConsumer(
self.topic,
bootstrap_servers=self.url,
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
group_id=self.group_id,
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
sasl_plain_username=self.sasl_username,
sasl_plain_password=self.sasl_password,
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
ssl_keyfile=self.ssl_keyfile,
ssl_check_hostname=self.ssl_check_hostname,
security_protocol=self.security_protocol,
sasl_mechanism="PLAIN",
)
else:
raise ValueError(
Expand Down

0 comments on commit f7719c7

Please sign in to comment.