Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions backend/compact-connect/lambdas/python/common/cc_common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,20 @@ def rate_limiting_table_name(self):
def rate_limiting_table(self):
return boto3.resource('dynamodb').Table(self.rate_limiting_table_name)

@property
def event_state_table_name(self):
return os.environ['EVENT_STATE_TABLE_NAME']

@property
def event_state_table(self):
return boto3.resource('dynamodb').Table(self.event_state_table_name)

@cached_property
def event_state_client(self):
from cc_common.event_state_client import EventStateClient

return EventStateClient(self)

@cached_property
def allowed_origins(self):
return json.loads(os.environ['ALLOWED_ORIGINS'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def reconcile_unsettled_transactions(self, compact: str, settled_transactions: l
'Deleting matched unsettled transactions',
compact=compact,
count=len(matched_unsettled),
settled_transaction_ids=settled_transaction_ids
settled_transaction_ids=settled_transaction_ids,
)
with self.config.transaction_history_table.batch_writer() as batch:
for tx in matched_unsettled:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ def __call__(
) -> dict[str, Any]: ...


class JurisdictionNotificationMethod(Protocol):
"""Protocol for Jurisdiction encumbrance notification methods."""

def __call__(
self, *, compact: str, jurisdiction: str, template_variables: EncumbranceNotificationTemplateVariables
) -> dict[str, Any]: ...


class EmailServiceClient:
"""
Client for sending email notifications through the email notification service lambda.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import time
from datetime import timedelta
from enum import StrEnum
from uuid import UUID

from cc_common.config import _Config, logger


class RecipientType(StrEnum):
"""Enum for notification recipient types."""

PROVIDER = 'provider'
STATE = 'state'


class NotificationStatus(StrEnum):
"""Enum for notification delivery status."""

SUCCESS = 'SUCCESS'
FAILED = 'FAILED'


class EventType(StrEnum):
"""Enum for encumbrance event types."""

LICENSE_ENCUMBRANCE = 'license.encumbrance'
LICENSE_ENCUMBRANCE_LIFTED = 'license.encumbranceLifted'
PRIVILEGE_ENCUMBRANCE = 'privilege.encumbrance'
PRIVILEGE_ENCUMBRANCE_LIFTED = 'privilege.encumbranceLifted'


class EventStateClient:
"""Client interface for event state table operations to track notification delivery state."""

def __init__(self, config: _Config):
self.config = config

def record_notification_attempt(
self,
*,
compact: str,
message_id: str,
recipient_type: RecipientType,
status: NotificationStatus,
provider_id: UUID,
event_type: EventType,
event_time: str,
jurisdiction: str | None = None,
error_message: str | None = None,
ttl_weeks: int = 4,
) -> None:
"""
Record a notification attempt to the event state table.

:param compact: The compact identifier
:param message_id: SQS message ID
:param recipient_type: RecipientType enum or string ('provider' or 'state')
:param status: NotificationStatus enum or string ('SUCCESS' or 'FAILED')
:param provider_id: Provider ID
:param event_type: EventType enum or string (e.g., 'license.encumbrance')
:param event_time: Event timestamp
:param jurisdiction: Jurisdiction code (for state notifications)
:param error_message: Error message if failed
:param ttl_weeks: TTL in weeks (default 4 weeks)
"""
# Build partition and sort keys
pk = f'COMPACT#{compact}#SQS_MESSAGE#{message_id}'

sk = f'NOTIFICATION#{recipient_type}#{jurisdiction or ""}'

# Calculate TTL
ttl = int(time.time()) + int(timedelta(weeks=ttl_weeks).total_seconds())

# Build item (ensure all values are DynamoDB-compatible types)
item = {
'pk': pk,
'sk': sk,
'status': status,
'providerId': str(provider_id), # Convert UUID to string for DynamoDB
'eventType': event_type,
'eventTime': str(event_time), # Ensure string format for DynamoDB
'ttl': ttl,
}

# Add optional fields
if jurisdiction:
item['jurisdiction'] = jurisdiction

if error_message:
item['errorMessage'] = error_message

# Write to table
self.config.event_state_table.put_item(Item=item)
logger.debug('Recorded notification attempt', pk=pk, sk=sk, status=status)

def _get_notification_attempts(self, *, compact: str, message_id: str) -> dict[str, dict]:
"""
Query all notification attempts for a message.

:param compact: The compact identifier
:param message_id: SQS message ID
:return: Dict mapping SK to item data
"""
pk = f'COMPACT#{compact}#SQS_MESSAGE#{message_id}'

response = self.config.event_state_table.query(
KeyConditionExpression='pk = :pk',
ExpressionAttributeValues={':pk': pk},
ConsistentRead=True,
)

return {item['sk']: item for item in response.get('Items', [])}


class NotificationTracker:
"""
Helper class to track which notifications have been sent for an SQS message.
Provides convenient methods to check status and determine what needs to be sent.
Encapsulates the EventStateClient to simplify handler interfaces.
"""

def __init__(self, *, compact: str, message_id: str):
from cc_common.config import config

self.compact = compact
self.message_id = message_id
self.event_state_client = config.event_state_client
self._attempts = self.event_state_client._get_notification_attempts( # noqa: SLF001 meant for use within the notification tracker
compact=compact, message_id=message_id
)

def should_send_provider_notification(self) -> bool:
"""
Check if provider notification needs to be sent.

:return: True if notification should be sent, False otherwise
"""
sk = f'NOTIFICATION#{RecipientType.PROVIDER}#'
return self._attempts.get(sk, {}).get('status') != 'SUCCESS'

def should_send_state_notification(self, jurisdiction: str) -> bool:
"""
Check if state notification needs to be sent.

:param jurisdiction: Jurisdiction code
:return: True if notification should be sent, False otherwise
"""
sk = f'NOTIFICATION#{RecipientType.STATE}#{jurisdiction}'
return self._attempts.get(sk, {}).get('status') != 'SUCCESS'

def record_success(
self,
*,
recipient_type: RecipientType,
provider_id: UUID,
event_type: EventType,
event_time: str,
jurisdiction: str | None = None,
) -> None:
"""
Record a successful notification.

:param recipient_type: RecipientType enum or string ('provider' or 'state')
:param provider_id: Provider ID
:param event_type: EventType enum or string
:param event_time: Event timestamp
:param jurisdiction: Jurisdiction code (for state notifications)
"""
try:
self.event_state_client.record_notification_attempt(
compact=self.compact,
message_id=self.message_id,
recipient_type=recipient_type,
status=NotificationStatus.SUCCESS,
provider_id=provider_id,
event_type=event_type,
event_time=event_time,
jurisdiction=jurisdiction,
)
except Exception as e: # noqa: BLE001
# If this cannot be written for whatever reason, we swallow the error since the notification itself was
# sent, and this step is just another layer of system redundancy, not business critical. Just log the error
# and move on.
logger.error(
'Unable to record notification success.',
compact=self.compact,
recipient_type=recipient_type,
provider_id=provider_id,
event_type=event_type,
jurisdiction=jurisdiction or 'None',
error=str(e),
)

def record_failure(
self,
*,
recipient_type: RecipientType,
provider_id: UUID,
event_type: EventType,
event_time: str,
error_message: str,
jurisdiction: str | None = None,
) -> None:
"""
Record a failed notification.

:param recipient_type: RecipientType enum or string ('provider' or 'state')
:param provider_id: Provider ID
:param event_type: EventType enum or string
:param event_time: Event timestamp
:param error_message: Error message describing the failure
:param jurisdiction: Jurisdiction code (for state notifications)
"""
try:
self.event_state_client.record_notification_attempt(
compact=self.compact,
message_id=self.message_id,
recipient_type=recipient_type,
status=NotificationStatus.FAILED,
provider_id=provider_id,
event_type=event_type,
event_time=event_time,
jurisdiction=jurisdiction,
error_message=error_message,
)
except Exception as e: # noqa: BLE001
# If this cannot be written, we swallow the error as the lambda will automatically retry and
# attempt to send out the notification again. Just log the error and move on.
logger.error(
'Unable to record notification failure.',
compact=self.compact,
recipient_type=recipient_type,
provider_id=provider_id,
event_type=event_type,
jurisdiction=jurisdiction or 'None',
error=str(e),
)
64 changes: 63 additions & 1 deletion backend/compact-connect/lambdas/python/common/cc_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def authorized(event: dict, context: LambdaContext):


def sqs_handler(fn: Callable):
"""Process messages from the ingest queue.
"""Process messages from an SQS queue.

This handler uses batch item failure reporting:
https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_SQS_Lambda_batch_item_failures_section.html
Expand Down Expand Up @@ -468,6 +468,68 @@ def process_messages(event, context: LambdaContext): # noqa: ARG001 unused-argu
return process_messages


def sqs_handler_with_notification_tracking(fn: Callable):
"""
Process messages from SQS with notification tracking capabilities.

This decorator provides a generic pattern for tracking notification delivery state
across SQS message retries. It creates a NotificationTracker and passes it as a
parameter to the handler function.

The handler function should accept (message: dict, tracker: NotificationTracker) as parameters.
"""

@wraps(fn)
@metrics.log_metrics
@logger.inject_lambda_context
def process_messages(event, context: LambdaContext): # noqa: ARG001 unused-argument
records = event['Records']
logger.info('Starting batch with notification tracking', batch_count=len(records))
batch_failures = []

for record in records:
try:
message = json.loads(record['body'])
message_id = record['messageId']

# Extract compact from message detail for notification tracking
compact = message.get('detail', {}).get('compact')
if not compact:
logger.warning('No compact found in message, skipping notification tracking', message_id=message_id)
# Still process the message but without tracking
fn(message, None)
continue

# Create notification tracker and pass as parameter
from cc_common.event_state_client import NotificationTracker

tracker = NotificationTracker(compact=compact, message_id=message_id)

logger.info(
'Processing message with notification tracking',
message_id=message_id,
compact=compact,
message_attributes=record.get('messageAttributes'),
)
fn(message, tracker)

# When we receive a batch of messages from SQS, letting an exception escape all the way back to AWS is
# really undesirable. Instead, we're going to catch _almost_ any exception raised, note what message we
# were processing, and report those item failures back to AWS.
except Exception as e: # noqa: BLE001 broad-exception-caught
logger.error(
'Failed to process message with notification tracking',
exception=str(e),
message_id=record['messageId'],
)
batch_failures.append({'itemIdentifier': record['messageId']})

logger.info('Completed batch', batch_failures=len(batch_failures))
return {'batchItemFailures': batch_failures}

return process_messages


def delayed_function(delay_seconds: float):
"""
Delay the result of the decorated function by the specified number of seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def setUpClass(cls):
'AWS_DEFAULT_REGION': 'us-east-1',
'BULK_BUCKET_NAME': 'cc-license-data-bulk-bucket',
'EVENT_BUS_NAME': 'license-data-events',
'EVENT_STATE_TABLE_NAME': 'event-state-table',
'PROVIDER_TABLE_NAME': 'provider-table',
'COMPACT_CONFIGURATION_TABLE_NAME': 'compact-configuration-table',
'EMAIL_NOTIFICATION_SERVICE_LAMBDA_NAME': 'email-notification-service',
Expand Down
Loading