Skip to content

Commit

Permalink
blacked
Browse files Browse the repository at this point in the history
  • Loading branch information
tmbo committed Jun 13, 2019
1 parent 236d362 commit 566207e
Showing 1 changed file with 71 additions and 0 deletions.
71 changes: 71 additions & 0 deletions rasa/core/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,74 @@ def _publish(self, event):

def _close(self):
self.producer.close()


class SQLProducer(EventChannel):
"""Log events to an SQL database.
The consumer is free to delete events it has consumed."""

def __init__(
self,
dialect: Text = "sqlite",
host: Optional[Text] = None,
port: Optional[int] = None,
db: Text = "rasa.db",
username: Text = None,
password: Text = None,
event_broker: Optional[EventChannel] = None,
login_db: Optional[Text] = None,
):

self.host = host
self.topic = topic
self.security_protocol = security_protocol
self.sasl_username = sasl_username
self.sasl_password = sasl_password
self.ssl_cafile = ssl_cafile
self.ssl_certfile = ssl_certfile
self.ssl_keyfile = ssl_keyfile
self.ssl_check_hostname = ssl_check_hostname

logging.getLogger("kafka").setLevel(loglevel)

@classmethod
def from_endpoint_config(cls, broker_config) -> Optional["KafkaProducer"]:
if broker_config is None:
return None

return cls(broker_config.url, **broker_config.kwargs)

def publish(self, event):
self._create_producer()
self._publish(event)
self._close()

def _create_producer(self):
import kafka

if self.security_protocol == "SASL_PLAINTEXT":
self.producer = kafka.KafkaProducer(
bootstrap_servers=[self.host],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
sasl_plain_username=self.sasl_username,
sasl_plain_password=self.sasl_password,
sasl_mechanism="PLAIN",
security_protocol=self.security_protocol,
)
elif self.security_protocol == "SSL":
self.producer = kafka.KafkaProducer(
bootstrap_servers=[self.host],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
ssl_keyfile=self.ssl_keyfile,
ssl_check_hostname=False,
security_protocol=self.security_protocol,
)

def _publish(self, event):
self.producer.send(self.topic, event)

def _close(self):
self.producer.close()

0 comments on commit 566207e

Please sign in to comment.