Merge pull request #34 from BeeMargarida/ms/notify_kafka

Event Notify using Kafka
joamag committed Sep 16, 2020
2 parents a9ed70e + 100420f commit ae5a1ab
Showing 1 changed file with 74 additions and 3 deletions.
77 changes: 74 additions & 3 deletions src/appier_extras/parts/admin/models/event.py
@@ -38,6 +38,8 @@
""" The license for the module """

import json
import socket
import datetime

import appier

@@ -116,15 +118,25 @@ def format(cls, arguments, all = False):
    @classmethod
    def notify_g(cls, name, handlers = None, arguments = {}):
        logger = appier.get_logger()
        logger.debug("Triggering '%s' event ..." % name)

        # builds the complete sequence of names to be matched: the
        # concrete event name, the global wildcard and one wildcard
        # per hierarchy level (eg: "user.*" for "user.created")
        names = [name, "*"]
        names_a = name.split(".")[:-1]
        for name_i in range(len(names_a)):
            name_join = ".".join(names_a[0:name_i + 1])
            names.append("%s.*" % name_join)

        kwargs = dict()
        kwargs["name"] = {"$in" : names}
        if handlers: kwargs["handler"] = {"$in" : handlers}
        events = cls.find_e(**kwargs)
        for event in events:
            event.name = name
            event.notify(arguments = arguments)
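
For clarity, a minimal standalone sketch of the wildcard expansion above, assuming a hypothetical event named "user.created.after" (the name is purely illustrative):

name = "user.created.after"
names = [name, "*"]
names_a = name.split(".")[:-1]
for name_i in range(len(names_a)):
    names.append("%s.*" % ".".join(names_a[0:name_i + 1]))
print(names)  # ['user.created.after', '*', 'user.*', 'user.created.*']

A handler registered under "user.*" is therefore notified for every event in the "user" namespace, while "*" catches all events.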

    @appier.operation(name = "Notify")
    def notify(self, arguments = {}, delay = True):
        cls = self.__class__
        delay_s = "a delayed" if delay else "an immediate"
        logger = appier.get_logger()
        logger.debug(
            "Notifying handler '%s' for '%s' in %s fashion ..." %\
@@ -217,6 +229,32 @@ def notify_nexmo(self, arguments = {}):
        api = nexmo.API()
        return cls._retry(lambda: api.send_sms(sender, receiver, text), count = retries)

    @classmethod
    def notify_kafka(cls, arguments = {}):
        producer = cls._get_kafka()

        name = arguments["event"]
        params = arguments["params"]
        payload = params.get("payload", dict())

        # the target topic defaults to the top level namespace
        # of the event name (eg: "user" for "user.created")
        if "topic" in arguments: topic = arguments["topic"]
        else: topic = name.split(".")[0]

        data = json.dumps(
            dict(
                name = name,
                origin = appier.get_name(),
                hostname = socket.gethostname(),
                datatype = "json",
                timestamp = datetime.datetime.now().isoformat(),
                payload = payload
            )
        )
        data_b = appier.legacy.bytes(data, encoding = "utf-8", force = True)

        producer.send(topic, data_b)
        producer.flush()
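
Messages reach the topic as UTF-8 encoded JSON, so any Kafka client can consume them. A minimal consumer sketch using the same kafka-python package, assuming the default topic naming and a local broker (topic name and server address are illustrative):

import json

import kafka

consumer = kafka.KafkaConsumer(
    "user",
    bootstrap_servers = "localhost:9092",
    auto_offset_reset = "earliest"
)
for record in consumer:
    # every record value is a JSON document with the envelope
    # built by notify_kafka (name, origin, hostname, payload, etc.)
    event = json.loads(record.value.decode("utf-8"))
    print(event["name"], event["payload"])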

    @appier.operation(name = "Duplicate", factory = True)
    def duplicate_s(self):
        cls = self.__class__
@@ -227,3 +265,36 @@ def duplicate_s(self):
        )
        event.save()
        return event

    @classmethod
    def _get_kafka(cls):
        # memoizes the producer at the class level so that a single
        # connection is shared by all Kafka notifications
        if hasattr(cls, "_kafka"): return cls._kafka

        appier.ensure_pip("kafka", package = "kafka-python")
        import kafka

        kafka_server = appier.conf("KAFKA_SERVER", "localhost:9092")
        client_id = appier.conf("KAFKA_PRODUCER_CLIENT_ID", None)
        compression_type = appier.conf("KAFKA_COMPRESSION_TYPE", None)
        retries = appier.conf("KAFKA_RETRIES", 0, cast = int)
        batch_size = appier.conf("KAFKA_BATCH_SIZE", 16384, cast = int)
        linger_ms = appier.conf("KAFKA_LINGER", 0, cast = int)
        connections_max_idle_ms = appier.conf("KAFKA_CONNECTIONS_MAX_IDLE", 540000, cast = int)
        max_request_size = appier.conf("KAFKA_MAX_REQUEST_SIZE", 1048576, cast = int)
        retry_backoff_ms = appier.conf("KAFKA_RETRY_BACKOFF", 100, cast = int)
        request_timeout_ms = appier.conf("KAFKA_REQUEST_TIMEOUT", 30000, cast = int)

        cls._kafka = kafka.KafkaProducer(
            bootstrap_servers = kafka_server,
            client_id = client_id,
            compression_type = compression_type,
            retries = retries,
            batch_size = batch_size,
            linger_ms = linger_ms,
            connections_max_idle_ms = connections_max_idle_ms,
            max_request_size = max_request_size,
            retry_backoff_ms = retry_backoff_ms,
            request_timeout_ms = request_timeout_ms
        )

        return cls._kafka
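
Since every option above goes through appier's configuration layer, the producer can be tuned without code changes by exporting the corresponding KAFKA_* variables before the process starts; a sketch with purely illustrative values:

import os

# any subset of the KAFKA_* names read above may be set, typically
# as real environment variables rather than in code
os.environ["KAFKA_SERVER"] = "broker-1:9092,broker-2:9092"
os.environ["KAFKA_COMPRESSION_TYPE"] = "gzip"
os.environ["KAFKA_RETRIES"] = "3"

Because the producer is memoized in cls._kafka, these values are read only once, when the first Kafka notification creates the producer.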
