Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,15 @@ async def async_process_closure():
# whether we create an event loop (Lambda) or schedule it as usual (non-Lambda)
coro = async_process_closure()
if os.getenv(constants.LAMBDA_TASK_ROOT_ENV):
loop = asyncio.get_event_loop() # NOTE: this might return an error starting in Python 3.12 in a few years
# Python 3.14+ will raise RuntimeError if get_event_loop() is called when there's no running loop
# We need to handle both cases: existing loop (container reuse) and no loop (cold start)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

task_instance = loop.create_task(coro)
return loop.run_until_complete(task_instance)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,3 +767,66 @@ async def test_async_process_partial_response_raises_unexpected_batch_type(event
assert "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams" in str(
exc_info.value,
)


def test_async_batch_processor_lambda_cold_start_creates_new_loop(sqs_event_factory, monkeypatch):
    """Test async processing creates new event loop in Lambda cold start (Python 3.14+ compatibility)"""
    import asyncio

    # GIVEN the Lambda marker env var is present (cold start scenario)
    monkeypatch.setenv("LAMBDA_TASK_ROOT", "/var/task")

    # Simulate a cold start: make sure no usable event loop is left around
    try:
        existing_loop = asyncio.get_event_loop()
    except RuntimeError:
        existing_loop = None
    if existing_loop is not None and not existing_loop.is_closed():
        existing_loop.close()

    # Minimal async handler with no external dependencies
    async def record_handler(record: SQSRecord):
        # Yield control once so the event loop actually schedules work
        await asyncio.sleep(0.001)
        return {"processed": record.body}

    event = {"Records": [sqs_event_factory("success"), sqs_event_factory("success")]}
    processor = AsyncBatchProcessor(event_type=EventType.SQS)

    # WHEN invoking async_process_partial_response synchronously (like Lambda handler does)
    result = async_process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
    )

    # THEN every record succeeds, proving a fresh event loop was created and used
    assert result == {"batchItemFailures": []}


def test_async_batch_processor_non_lambda_uses_asyncio_run(sqs_event_factory, monkeypatch):
    """Test async processing uses asyncio.run outside Lambda environment"""
    import asyncio

    # GIVEN Lambda environment is NOT set
    monkeypatch.delenv("LAMBDA_TASK_ROOT", raising=False)

    # Simple async handler without external dependencies
    async def simple_async_handler(record: SQSRecord):
        await asyncio.sleep(0.001)  # Yield control to event loop
        return {"processed": record.body}

    records = [sqs_event_factory("success")]
    event = {"Records": records}
    processor = AsyncBatchProcessor(event_type=EventType.SQS)

    # WHEN calling async_process_partial_response outside Lambda
    result = async_process_partial_response(
        event=event,
        record_handler=simple_async_handler,
        processor=processor,
    )

    # THEN record is processed successfully using asyncio.run()
    # (single assertion — the original had this line duplicated by copy-paste)
    assert result == {"batchItemFailures": []}
1 change: 1 addition & 0 deletions tests/functional/metrics/datadog/test_metrics_datadog.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def test_datadog_write_to_log_with_env_variable(capsys, monkeypatch):
assert logs == json.loads('{"m":"item_sold","v":1,"e":"","t":["product:latte","order:online"]}')


# NOTE(review): `skipif` with no condition argument acts as an unconditional skip in pytest,
# but `@pytest.mark.skip(reason=...)` is the idiomatic, clearer way to express that intent —
# TODO confirm this marker was meant to always skip, then switch to `skip`.
@pytest.mark.skipif(reason="Test temporarily disabled until DD release new version")
def test_datadog_disable_write_to_log_with_env_variable(capsys, monkeypatch):
    # GIVEN DD_FLUSH_TO_LOG env is configured
    monkeypatch.setenv("DD_FLUSH_TO_LOG", "False")
Expand Down