Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
"topics_config": "s3://<redacted>/topics.json",
"token_provider_url": "https://<redacted>",
"token_public_key_url": "https://<redacted>",
"kafka_bootstrap_server": "localhost:9092"
"kafka_bootstrap_server": "localhost:9092",
"event_bus_arn": "arn:aws:events:<redacted>"
}
72 changes: 54 additions & 18 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

aws_session = boto3.Session()
aws_s3 = aws_session.resource('s3', verify=False)
aws_eventbridge = boto3.client('events')

if CONFIG["topics_config"].startswith("s3://"):
name_parts = CONFIG["topics_config"].split('/')
Expand All @@ -61,6 +62,12 @@
ACCESS = json.load(file)

TOKEN_PROVIDER_URL = CONFIG["token_provider_url"]

if "event_bus_arn" in CONFIG:
EVENT_BUS_ARN = CONFIG["event_bus_arn"]
else:
EVENT_BUS_ARN = ""

logger.info("Loaded configs")

token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"]
Expand All @@ -85,7 +92,7 @@
kafka_producer = Producer(producer_config)
logger.info("Initialized kafka producer")

def kafkaWrite(topicName, message):
def kafka_write(topicName, message):
logger.info(f"Sending to kafka {topicName}")
error = []
kafka_producer.produce(topicName,
Expand All @@ -94,34 +101,49 @@ def kafkaWrite(topicName, message):
callback = lambda err, msg: error.append(err) if err is not None else None)
kafka_producer.flush()
if error:
logger.error(error)
return 500
else:
logger.info("OK")
return 202

def getApi():
raise Exception(error)

def event_bridge_write(topicName, message):
if not EVENT_BUS_ARN:
logger.info("No EventBus Arn - skipping")
return

logger.info(f"Sending to eventBridge {topicName}")
response = aws_eventbridge.put_events(
Entries=[
{
"Source": topicName,
'DetailType': 'JSON',
'Detail': json.dumps(message),
'EventBusName': EVENT_BUS_ARN,
}
]
)
if response["FailedEntryCount"] > 0:
raise Exception(response)

def get_api():
return {
"statusCode": 200,
"body": API
}

def getToken():
def get_token():
logger.info("Handling GET Token")
return {
"statusCode": 303,
"headers": {"Location": TOKEN_PROVIDER_URL}
}

def getTopics():
def get_topics():
logger.info("Handling GET Topics")
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps([topicName for topicName in TOPICS])
}

def getTopicSchema(topicName):
def get_topic_schema(topicName):
logger.info(f"Handling GET TopicSchema({topicName})")
if topicName not in TOPICS:
return { "statusCode": 404 }
Expand All @@ -132,7 +154,7 @@ def getTopicSchema(topicName):
"body": json.dumps(TOPICS[topicName])
}

def postTopicMessage(topicName, topicMessage, tokenEncoded):
def post_topic_message(topicName, topicMessage, tokenEncoded):
logger.info(f"Handling POST {topicName}")
try:
token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
Expand All @@ -159,21 +181,35 @@ def postTopicMessage(topicName, topicMessage, tokenEncoded):
"body": e.message
}

return {"statusCode": kafkaWrite(topicName, topicMessage)}
wasError = False
try:
kafka_write(topicName, topicMessage)
except Exception as e:
logger.error(str(e))
wasError = True
try:
event_bridge_write(topicName, topicMessage)
except Exception as e:
logger.error(str(e))
wasError = True
if wasError:
return {"statusCode": 500}
else:
return {"statusCode": 202}

def lambda_handler(event, context):
try:
if event["resource"].lower() == "/api":
return getApi()
return get_api()
if event["resource"].lower() == "/token":
return getToken()
return get_token()
if event["resource"].lower() == "/topics":
return getTopics()
return get_topics()
if event["resource"].lower() == "/topics/{topic_name}":
if event["httpMethod"] == "GET":
return getTopicSchema(event["pathParameters"]["topic_name"].lower())
return get_topic_schema(event["pathParameters"]["topic_name"].lower())
if event["httpMethod"] == "POST":
return postTopicMessage(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), event["headers"]["bearer"])
return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), event["headers"]["bearer"])
if event["resource"].lower() == "/terminate":
sys.exit("TERMINATING")
return {"statusCode": 404}
Expand Down