Skip to content

Commit

Permalink
Change to event name iteration before query
Browse files Browse the repository at this point in the history
  • Loading branch information
Margarida Silva committed Sep 16, 2020
1 parent ee04f15 commit 100420f
Showing 1 changed file with 56 additions and 39 deletions.
95 changes: 56 additions & 39 deletions src/appier_extras/parts/admin/models/event.py
Expand Up @@ -37,10 +37,9 @@
__license__ = "Apache License, Version 2.0"
""" The license for the module """

import datetime
import fnmatch
import json
import socket
import datetime

import appier

Expand Down Expand Up @@ -119,21 +118,25 @@ def format(cls, arguments, all = False):
def notify_g(cls, name, handlers = None, arguments = {}):
logger = appier.get_logger()
logger.debug("Triggering '%s' event ..." % name)

names = ["*"]
names_a = name.split(".")[:-1]
for name_i in range(len(names_a)):
name_join = ".".join([n for n in names_a[0:name_i + 1]])
names.append("%s.*" % name_join)

kwargs = dict()
kwargs["name"] = { "$in": names }
if handlers: kwargs["handler"] = {"$in" : handlers}
events = cls.find_e(**kwargs)

# verifies if the event name matches the name provided,
# allowing for wildcards to be used in event handlers names
for event in events:
if fnmatch.fnmatch(name, event.name):
event.name = name
event.notify(arguments=arguments)
for event in events:
event.name = name
event.notify(arguments = arguments)

@appier.operation(name = "Notify")
def notify(self, arguments = {}, delay = True):
cls = self.__class__
delay_s = ("a delayed" if delay else "an immediate")
delay_s = "a delayed" if delay else "an immediate"
logger = appier.get_logger()
logger.debug(
"Notifying handler '%s' for '%s' in %s fashion ..." %\
Expand Down Expand Up @@ -228,21 +231,60 @@ def notify_nexmo(self, arguments = {}):

@classmethod
def notify_kafka(cls, arguments = {}):
producer = cls._get_kafka()

name = arguments["event"]
params = arguments["params"]
payload = params.get("payload", dict())

if "topic" in arguments: topic = arguments["topic"]
else: topic = name.split(".")[0]

data = json.dumps(
dict(
name = name,
origin = appier.get_name(),
hostname = socket.gethostname(),
datatype = "json",
timestamp = datetime.datetime.now(),
payload = payload
)
)
data_b = appier.legacy.bytes(data, encoding = "utf-8", force = True)

producer.send(topic, data_b)
producer.flush()

@appier.operation(name = "Duplicate", factory = True)
def duplicate_s(self):
cls = self.__class__
event = cls(
name = self.name,
handler = self.handler,
arguments = self.arguments
)
event.save()
return event

@classmethod
def _get_kafka(cls):
if hasattr(cls, "_kafka"): return cls._kafka

appier.ensure_pip("kafka", package = "kafka-python")
import kafka

kafka_server = appier.conf("KAFKA_SERVER", "localhost:9092")
client_id = appier.conf("KAFKA_PRODUCER_CLIENT_ID", None)
compression_type = appier.conf("KAFKA_COMPRESSION_TYPE", None)
retries = appier.conf("KAFKA_RETRIES", 0)
batch_size = appier.conf("KAFKA_BATCH_SIZE", 16384)
linger_ms = appier.conf("KAFKA_LINGER", 0)
connections_max_idle_ms = appier.conf(
"KAFKA_CONNECTIONS_MAX_IDLE", 540000)
connections_max_idle_ms = appier.conf("KAFKA_CONNECTIONS_MAX_IDLE", 540000)
max_request_size = appier.conf("KAFKA_MAX_REQUEST_SIZE", 1048576)
retry_backoff_ms = appier.conf("KAFKA_RETRY_BACKOFF", 100)
request_timeout_ms = appier.conf("KAFKA_REQUEST_TIMEOUT", 30000)

producer = kafka.KafkaProducer(
cls._kafka = kafka.KafkaProducer(
bootstrap_servers = kafka_server,
client_id = client_id,
compression_type = compression_type,
Expand All @@ -255,29 +297,4 @@ def notify_kafka(cls, arguments = {}):
request_timeout_ms = request_timeout_ms
)

topic = arguments["topic"] if arguments["topic"] else arguments["event"].split(".")[0]
name = arguments["event"]
origin = appier.get_name()
hostname = socket.gethostname()
payload = arguments["params"].get("payload", dict())

producer.send(topic, json.dumps(dict(
name = name,
origin = origin,
hostname = hostname,
datatype = "json",
timestamp = datetime.datetime.now(),
payload = json.dumps(payload)
)).encode("utf-8"))
producer.flush()

@appier.operation(name = "Duplicate", factory = True)
def duplicate_s(self):
cls = self.__class__
event = cls(
name = self.name,
handler = self.handler,
arguments = self.arguments
)
event.save()
return event
return cls._kafka

0 comments on commit 100420f

Please sign in to comment.