Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,8 @@
from jsonschema import validate
from jsonschema.exceptions import ValidationError

# Import writer modules with explicit ImportError fallback
try:
from . import writer_eventbridge
from . import writer_kafka
from . import writer_postgres
except ImportError: # fallback when executed outside package context
import writer_eventbridge # type: ignore[no-redef]
import writer_kafka # type: ignore[no-redef]
import writer_postgres # type: ignore[no-redef]

# Import configuration directory symbols with explicit ImportError fallback
try:
from .conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef]
except ImportError: # fallback when executed outside package context
from conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef]
from src.writers import writer_eventbridge, writer_kafka, writer_postgres
from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV

# Internal aliases used by rest of module
_CONF_DIR = CONF_DIR
Expand Down
15 changes: 15 additions & 0 deletions src/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
10 changes: 8 additions & 2 deletions src/conf_path.py → src/utils/conf_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""Module providing reusable configuration directory resolution.
Resolution order:
1. CONF_DIR env var if it exists and points to a directory
2. <project_root>/conf (project_root = parent of this file's directory)
2. <project_root>/conf
3. <this_module_dir>/conf (flattened deployment)
4. Fallback to <project_root>/conf even if missing (subsequent file operations will raise)
"""
Expand All @@ -34,9 +34,12 @@ def resolve_conf_dir(env_var: str = "CONF_DIR"):
Tuple (conf_dir, invalid_env) where conf_dir is the chosen directory path and
invalid_env is the rejected env var path if provided but invalid, else None.
"""
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Simplified project root: two levels up from this file (../../)
parent_utils_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
project_root = os.path.abspath(os.path.join(parent_utils_dir, ".."))
current_dir = os.path.dirname(__file__)

# Scenario 1: Environment variable if it exists and points to a directory
env_conf = os.environ.get(env_var)
invalid_env = None
conf_dir = None
Expand All @@ -48,16 +51,19 @@ def resolve_conf_dir(env_var: str = "CONF_DIR"):
else:
invalid_env = candidate

# Scenario 2: Use <project_root>/conf if present and not already satisfied by env var
if conf_dir is None:
parent_conf = os.path.join(project_root, "conf")
if os.path.isdir(parent_conf):
conf_dir = parent_conf

# Scenario 3: Use <this_module_dir>/conf for flattened deployments.
if conf_dir is None:
current_conf = os.path.join(current_dir, "conf")
if os.path.isdir(current_conf):
conf_dir = current_conf

# Scenario 4: Final fallback to <project_root>/conf even if it does not exist.
if conf_dir is None:
conf_dir = os.path.join(project_root, "conf")

Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions src/trace_logging.py → src/utils/trace_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import logging
from typing import Any, Dict

from .logging_levels import TRACE_LEVEL
from .safe_serialization import safe_serialize_for_log
from src.utils.logging_levels import TRACE_LEVEL
from src.utils.safe_serialization import safe_serialize_for_log


def log_payload_at_trace(logger: logging.Logger, writer_name: str, topic_name: str, message: Dict[str, Any]) -> None:
Expand Down
15 changes: 15 additions & 0 deletions src/writers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
198 changes: 99 additions & 99 deletions src/writer_eventbridge.py → src/writers/writer_eventbridge.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,99 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""EventBridge writer module.
Provides initialization and write functionality for publishing events to AWS EventBridge.
"""
import json
import logging
from typing import Any, Dict, Optional, Tuple, List
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from .trace_logging import log_payload_at_trace
STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None}
def init(logger: logging.Logger, config: Dict[str, Any]) -> None:
"""Initialize the EventBridge writer.
Args:
logger: Shared application logger.
config: Configuration dictionary (expects optional 'event_bus_arn').
"""
STATE["logger"] = logger
STATE["client"] = boto3.client("events")
STATE["event_bus_arn"] = config.get("event_bus_arn", "")
STATE["logger"].debug("Initialized EVENTBRIDGE writer")
def _format_failed_entries(entries: List[Dict[str, Any]]) -> str:
failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e]
# Keep message concise but informative
return json.dumps(failed) if failed else "[]"
def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Publish a message to EventBridge.
Args:
topic_name: Source topic name used as event Source.
message: JSON-serializable payload.
Returns:
Tuple of success flag and optional error message.
"""
logger = STATE["logger"]
event_bus_arn = STATE["event_bus_arn"]
client = STATE["client"]
if not event_bus_arn:
logger.debug("No EventBus Arn - skipping")
return True, None
if client is None: # defensive
logger.debug("EventBridge client not initialized - skipping")
return True, None
log_payload_at_trace(logger, "EventBridge", topic_name, message)
try:
logger.debug("Sending to eventBridge %s", topic_name)
response = client.put_events(
Entries=[
{
"Source": topic_name,
"DetailType": "JSON",
"Detail": json.dumps(message),
"EventBusName": event_bus_arn,
}
]
)
failed_count = response.get("FailedEntryCount", 0)
if failed_count > 0:
entries = response.get("Entries", [])
failed_repr = _format_failed_entries(entries)
msg = f"{failed_count} EventBridge entries failed: {failed_repr}"
logger.error(msg)
return False, msg
except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors
logger.exception("EventBridge put_events call failed")
return False, str(err)
# Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400)
return True, None
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""EventBridge writer module.

Provides initialization and write functionality for publishing events to AWS EventBridge.
"""

import json
import logging
from typing import Any, Dict, Optional, Tuple, List

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from src.utils.trace_logging import log_payload_at_trace

STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None}


def init(logger: logging.Logger, config: Dict[str, Any]) -> None:
"""Initialize the EventBridge writer.

Args:
logger: Shared application logger.
config: Configuration dictionary (expects optional 'event_bus_arn').
"""
STATE["logger"] = logger
STATE["client"] = boto3.client("events")
STATE["event_bus_arn"] = config.get("event_bus_arn", "")
STATE["logger"].debug("Initialized EVENTBRIDGE writer")


def _format_failed_entries(entries: List[Dict[str, Any]]) -> str:
failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e]
# Keep message concise but informative
return json.dumps(failed) if failed else "[]"


def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Publish a message to EventBridge.

Args:
topic_name: Source topic name used as event Source.
message: JSON-serializable payload.
Returns:
Tuple of success flag and optional error message.
"""
logger = STATE["logger"]
event_bus_arn = STATE["event_bus_arn"]
client = STATE["client"]

if not event_bus_arn:
logger.debug("No EventBus Arn - skipping")
return True, None
if client is None: # defensive
logger.debug("EventBridge client not initialized - skipping")
return True, None

log_payload_at_trace(logger, "EventBridge", topic_name, message)

try:
logger.debug("Sending to eventBridge %s", topic_name)
response = client.put_events(
Entries=[
{
"Source": topic_name,
"DetailType": "JSON",
"Detail": json.dumps(message),
"EventBusName": event_bus_arn,
}
]
)
failed_count = response.get("FailedEntryCount", 0)
if failed_count > 0:
entries = response.get("Entries", [])
failed_repr = _format_failed_entries(entries)
msg = f"{failed_count} EventBridge entries failed: {failed_repr}"
logger.error(msg)
return False, msg
except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors
logger.exception("EventBridge put_events call failed")
return False, str(err)

# Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400)
return True, None
Loading