Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/check_pr_release_notes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ jobs:
with:
github-repository: ${{ github.repository }}
pr-number: ${{ github.event.number }}
title: '(\*\*[Rr]elease [Nn]otes\*\*|[Rr]elease [Nn]otes:)'
15 changes: 15 additions & 0 deletions src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
59 changes: 59 additions & 0 deletions src/conf_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

# Module providing reusable configuration directory resolution.
# Resolution order:
# 1. CONF_DIR env var if it exists and points to a directory
# 2. <project_root>/conf (project_root = parent of this file's directory)
# 3. <this_module_dir>/conf (flattened deployment)
# 4. Fallback to <project_root>/conf even if missing (subsequent file operations will raise)


def resolve_conf_dir(env_var: str = "CONF_DIR"):
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
current_dir = os.path.dirname(__file__)

env_conf = os.environ.get(env_var)
invalid_env = None
conf_dir = None

if env_conf:
candidate = os.path.abspath(env_conf)
if os.path.isdir(candidate):
conf_dir = candidate
else:
invalid_env = candidate

if conf_dir is None:
parent_conf = os.path.join(project_root, "conf")
if os.path.isdir(parent_conf):
conf_dir = parent_conf

if conf_dir is None:
current_conf = os.path.join(current_dir, "conf")
if os.path.isdir(current_conf):
conf_dir = current_conf

if conf_dir is None:
conf_dir = os.path.join(project_root, "conf")

return conf_dir, invalid_env


CONF_DIR, INVALID_CONF_ENV = resolve_conf_dir()

__all__ = ["resolve_conf_dir", "CONF_DIR", "INVALID_CONF_ENV"]
131 changes: 77 additions & 54 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#
#
# Copyright 2024 ABSA Group Limited
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
import base64
import json
import logging
Expand All @@ -27,9 +27,14 @@
from jsonschema import validate
from jsonschema.exceptions import ValidationError

# Resolve project root (parent directory of this file's directory)
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
_CONF_DIR = os.path.join(_PROJECT_ROOT, 'conf')
try:
from .conf_path import CONF_DIR, INVALID_CONF_ENV
except ImportError: # fallback when executed outside package context
from conf_path import CONF_DIR, INVALID_CONF_ENV

# Use imported symbols for internal variables
_CONF_DIR = CONF_DIR
_INVALID_CONF_ENV = INVALID_CONF_ENV

sys.path.append(os.path.join(os.path.dirname(__file__)))

Expand All @@ -44,95 +49,112 @@
logger.setLevel(log_level)
logger.addHandler(logging.StreamHandler())
logger.debug("Initialized LOGGER")
logger.debug(f"Using CONF_DIR={_CONF_DIR}")
if _INVALID_CONF_ENV:
logger.warning(
f"CONF_DIR env var set to non-existent path: {_INVALID_CONF_ENV}; fell back to {_CONF_DIR}"
)

with open(os.path.join(_CONF_DIR, 'api.yaml'), 'r') as file:
with open(os.path.join(_CONF_DIR, "api.yaml"), "r") as file:
API = file.read()
logger.debug("Loaded API definition")

TOPICS = {}
with open(os.path.join(_CONF_DIR, 'topic_runs.json'), 'r') as file:
with open(os.path.join(_CONF_DIR, "topic_runs.json"), "r") as file:
TOPICS["public.cps.za.runs"] = json.load(file)
with open(os.path.join(_CONF_DIR, 'topic_dlchange.json'), 'r') as file:
with open(os.path.join(_CONF_DIR, "topic_dlchange.json"), "r") as file:
TOPICS["public.cps.za.dlchange"] = json.load(file)
with open(os.path.join(_CONF_DIR, 'topic_test.json'), 'r') as file:
with open(os.path.join(_CONF_DIR, "topic_test.json"), "r") as file:
TOPICS["public.cps.za.test"] = json.load(file)
logger.debug("Loaded TOPICS")

with open(os.path.join(_CONF_DIR, 'config.json'), 'r') as file:
with open(os.path.join(_CONF_DIR, "config.json"), "r") as file:
CONFIG = json.load(file)
logger.debug("Loaded main CONFIG")

aws_s3 = boto3.Session().resource('s3', verify=False)
aws_s3 = boto3.Session().resource("s3", verify=False)
logger.debug("Initialized AWS S3 Client")

if CONFIG["access_config"].startswith("s3://"):
name_parts = CONFIG["access_config"].split('/')
name_parts = CONFIG["access_config"].split("/")
bucket_name = name_parts[2]
bucket_object = "/".join(name_parts[3:])
ACCESS = json.loads(aws_s3.Bucket(bucket_name).Object(bucket_object).get()["Body"].read().decode("utf-8"))
ACCESS = json.loads(
aws_s3.Bucket(bucket_name)
.Object(bucket_object)
.get()["Body"]
.read()
.decode("utf-8")
)
else:
with open(CONFIG["access_config"], "r") as file:
ACCESS = json.load(file)
logger.debug("Loaded ACCESS definitions")

TOKEN_PROVIDER_URL = CONFIG["token_provider_url"]
token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"]
TOKEN_PUBLIC_KEY = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded))
token_public_key_encoded = requests.get(
CONFIG["token_public_key_url"], verify=False
).json()["key"]
TOKEN_PUBLIC_KEY = serialization.load_der_public_key(
base64.b64decode(token_public_key_encoded)
)
logger.debug("Loaded TOKEN_PUBLIC_KEY")

writer_eventbridge.init(logger, CONFIG)
writer_kafka.init(logger, CONFIG)
writer_postgres.init(logger)


def _error_response(status, err_type, message):
return {
"statusCode": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({
"success": False,
"statusCode": status,
"errors": [{"type": err_type, "message": message}]
})
"body": json.dumps(
{
"success": False,
"statusCode": status,
"errors": [{"type": err_type, "message": message}],
}
),
}


def get_api():
return {
"statusCode": 200,
"body": API
}
return {"statusCode": 200, "body": API}


def get_token():
logger.debug("Handling GET Token")
return {
"statusCode": 303,
"headers": {"Location": TOKEN_PROVIDER_URL}
}

return {"statusCode": 303, "headers": {"Location": TOKEN_PROVIDER_URL}}


def get_topics():
logger.debug("Handling GET Topics")
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps([topicName for topicName in TOPICS])
"body": json.dumps([topicName for topicName in TOPICS]),
}



def get_topic_schema(topicName):
logger.debug(f"Handling GET TopicSchema({topicName})")
if topicName not in TOPICS:
return _error_response(404, "topic", f"Topic '{topicName}' not found")

return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(TOPICS[topicName])
"body": json.dumps(TOPICS[topicName]),
}


def post_topic_message(topicName, topicMessage, tokenEncoded):
logger.debug(f"Handling POST {topicName}")
try:
token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
except Exception:
return _error_response(401, "auth", "Invalid or missing token")
return _error_response(401, "auth", "Invalid or missing token")

if topicName not in TOPICS:
return _error_response(404, "topic", f"Topic '{topicName}' not found")
Expand All @@ -144,8 +166,8 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
try:
validate(instance=topicMessage, schema=TOPICS[topicName])
except ValidationError as e:
return _error_response(400, "validation", e.message)
return _error_response(400, "validation", e.message)

# Run all writers independently (avoid short-circuit so failures in one don't skip others)
kafka_ok, kafka_err = writer_kafka.write(topicName, topicMessage)
eventbridge_ok, eventbridge_err = writer_eventbridge.write(topicName, topicMessage)
Expand All @@ -163,31 +185,28 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
return {
"statusCode": 500,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({
"success": False,
"statusCode": 500,
"errors": errors
})
"body": json.dumps({"success": False, "statusCode": 500, "errors": errors}),
}

return {
"statusCode": 202,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({
"success": True,
"statusCode": 202
})
"body": json.dumps({"success": True, "statusCode": 202}),
}


def extract_token(eventHeaders):
# Initial implementation used bearer header directly
if "bearer" in eventHeaders:
return eventHeaders["bearer"]

if "Authorization" in eventHeaders and eventHeaders["Authorization"].startswith("Bearer "):
return eventHeaders["Authorization"][len("Bearer "):]

return "" # Will result in 401

if "Authorization" in eventHeaders and eventHeaders["Authorization"].startswith(
"Bearer "
):
return eventHeaders["Authorization"][len("Bearer ") :]

return "" # Will result in 401


def lambda_handler(event, context):
try:
Expand All @@ -201,7 +220,11 @@ def lambda_handler(event, context):
if event["httpMethod"] == "GET":
return get_topic_schema(event["pathParameters"]["topic_name"].lower())
if event["httpMethod"] == "POST":
return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), extract_token(event["headers"]))
return post_topic_message(
event["pathParameters"]["topic_name"].lower(),
json.loads(event["body"]),
extract_token(event["headers"]),
)
if event["resource"].lower() == "/terminate":
sys.exit("TERMINATING")
return _error_response(404, "route", "Resource not found")
Expand Down
18 changes: 10 additions & 8 deletions src/writer_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

import boto3


def init(logger, CONFIG):
global _logger
global EVENT_BUS_ARN
global aws_eventbridge

_logger = logger
aws_eventbridge = boto3.client('events')

aws_eventbridge = boto3.client("events")
EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else ""
_logger.debug("Initialized EVENTBRIDGE writer")


def write(topicName, message):
if not EVENT_BUS_ARN:
_logger.debug("No EventBus Arn - skipping")
Expand All @@ -24,9 +26,9 @@ def write(topicName, message):
Entries=[
{
"Source": topicName,
'DetailType': 'JSON',
'Detail': json.dumps(message),
'EventBusName': EVENT_BUS_ARN,
"DetailType": "JSON",
"Detail": json.dumps(message),
"EventBusName": EVENT_BUS_ARN,
}
]
)
Expand All @@ -35,8 +37,8 @@ def write(topicName, message):
_logger.error(msg)
return False, msg
except Exception as e:
err_msg = f'The EventBridge writer failed with unknown error: {str(e)}'
err_msg = f"The EventBridge writer failed with unknown error: {str(e)}"
_logger.error(err_msg)
return False, err_msg

return True, None
Loading