Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jobs:
name: Test ASF Lambda provider
environment:
PROVIDER_OVERRIDE_LAMBDA: "asf"
TEST_PATH: "tests/integration/awslambda/test_lambda.py tests/integration/awslambda/test_lambda_api.py tests/integration/awslambda/test_lambda_common.py tests/integration/awslambda/test_lambda_integration_sqs.py tests/integration/cloudformation/resources/test_lambda.py"
TEST_PATH: "tests/integration/awslambda/test_lambda.py tests/integration/awslambda/test_lambda_api.py tests/integration/awslambda/test_lambda_common.py tests/integration/awslambda/test_lambda_integration_sqs.py tests/integration/cloudformation/resources/test_lambda.py tests/integration/awslambda/test_lambda_integration_dynamodbstreams.py tests/integration/awslambda/test_lambda_integration_kinesis.py"
PYTEST_ARGS: "--reruns 3 --junitxml=target/reports/lambda_asf.xml -o junit_suite_name='lambda_asf'"
COVERAGE_ARGS: "-p"
command: make test-coverage
Expand Down
10 changes: 2 additions & 8 deletions localstack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,8 @@ def in_docker():
KINESIS_PROVIDER = os.environ.get("KINESIS_PROVIDER") or "kinesis-mock"

# Whether or not to handle lambda event sources as synchronous invocations
SYNCHRONOUS_SNS_EVENTS = is_env_true("SYNCHRONOUS_SNS_EVENTS")
SYNCHRONOUS_SQS_EVENTS = is_env_true("SYNCHRONOUS_SQS_EVENTS")
SYNCHRONOUS_API_GATEWAY_EVENTS = is_env_not_false("SYNCHRONOUS_API_GATEWAY_EVENTS")
SYNCHRONOUS_KINESIS_EVENTS = is_env_not_false("SYNCHRONOUS_KINESIS_EVENTS")
SYNCHRONOUS_DYNAMODB_EVENTS = is_env_not_false("SYNCHRONOUS_DYNAMODB_EVENTS")
SYNCHRONOUS_SNS_EVENTS = is_env_true("SYNCHRONOUS_SNS_EVENTS") # DEPRECATED
SYNCHRONOUS_KINESIS_EVENTS = is_env_not_false("SYNCHRONOUS_KINESIS_EVENTS") # DEPRECATED

# randomly inject faults to Kinesis
KINESIS_ERROR_PROBABILITY = float(os.environ.get("KINESIS_ERROR_PROBABILITY", "").strip() or 0.0)
Expand Down Expand Up @@ -787,11 +784,8 @@ def in_docker():
"SQS_ENDPOINT_STRATEGY",
"SQS_PORT_EXTERNAL",
"STEPFUNCTIONS_LAMBDA_ENDPOINT",
"SYNCHRONOUS_API_GATEWAY_EVENTS",
"SYNCHRONOUS_DYNAMODB_EVENTS",
"SYNCHRONOUS_KINESIS_EVENTS",
"SYNCHRONOUS_SNS_EVENTS",
"SYNCHRONOUS_SQS_EVENTS",
"TEST_AWS_ACCOUNT_ID",
"TEST_IAM_USER_ID",
"TEST_IAM_USER_NAME",
Expand Down
186 changes: 154 additions & 32 deletions localstack/services/awslambda/event_source_listeners/adapters.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import logging
import threading
from abc import ABC
from concurrent.futures import Future
from typing import Callable, Optional

from localstack import config
from localstack.aws.api.lambda_ import InvocationType
from localstack.services.awslambda import api_utils
from localstack.services.awslambda.invocation.lambda_models import InvocationError, InvocationResult
Expand All @@ -18,9 +21,34 @@


class EventSourceAdapter(ABC):
"""Adapter for the communication between event source mapping and lambda service"""
"""
Adapter for the communication between event source mapping and lambda service
Generally just a temporary construct to bridge the old and new provider and re-use the existing event source listeners.

Remove this file when sunsetting the legacy provider or when replacing the event source listeners.
"""

def invoke(
self,
function_arn: str,
context: dict,
payload: dict,
invocation_type: InvocationType,
callback: Optional[Callable] = None,
) -> None:
pass

def invoke(self, function_arn, context, payload, invocation_type, callback) -> None:
def invoke_with_statuscode(
self,
function_arn,
context,
payload,
invocation_type,
callback=None,
*,
lock_discriminator,
parallelization_factor
) -> int:
pass

def get_event_sources(self, source_arn: str):
Expand All @@ -31,7 +59,7 @@ class EventSourceLegacyAdapter(EventSourceAdapter):
def __init__(self):
pass

def invoke(self, function_arn, context, payload, invocation_type, callback):
def invoke(self, function_arn, context, payload, invocation_type, callback=None):
from localstack.services.awslambda.lambda_api import run_lambda

run_lambda(
Expand All @@ -42,6 +70,38 @@ def invoke(self, function_arn, context, payload, invocation_type, callback):
callback=callback,
)

def invoke_with_statuscode(
    self,
    function_arn,
    context,
    payload,
    invocation_type,
    callback=None,
    *,
    lock_discriminator,
    parallelization_factor
) -> int:
    """
    Invoke the function through the legacy lambda API and report a status code.

    :returns: the status code carried by the invocation result, or 0 when the
        result object exposes no ``status_code`` attribute
    """
    from localstack.services.awslambda import lambda_executors
    from localstack.services.awslambda.lambda_api import run_lambda

    if config.SYNCHRONOUS_KINESIS_EVENTS:
        # synchronous processing: no per-shard concurrency lock is used
        lock_discriminator = None
    else:
        # ensure a semaphore exists that bounds concurrent invocations per shard
        lambda_executors.LAMBDA_ASYNC_LOCKS.assure_lock_present(
            lock_discriminator, threading.BoundedSemaphore(parallelization_factor)
        )

    invocation_result = run_lambda(
        func_arn=function_arn,
        event=payload,
        context=context,
        asynchronous=(invocation_type == InvocationType.Event),
        callback=callback,
        lock_discriminator=lock_discriminator,
    )
    # the legacy result may not expose a status code; fall back to 0 in that case
    return getattr(invocation_result.result, "status_code", 0)

def get_event_sources(self, source_arn: str) -> list:
from localstack.services.awslambda.lambda_api import get_event_sources

Expand All @@ -58,7 +118,7 @@ class EventSourceAsfAdapter(EventSourceAdapter):
def __init__(self, lambda_service: LambdaService):
self.lambda_service = lambda_service

def invoke(self, function_arn, context, payload, invocation_type, callback):
def invoke(self, function_arn, context, payload, invocation_type, callback=None):

# split ARN ( a bit unnecessary since we build an ARN again in the service)
fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(function_arn).groupdict()
Expand All @@ -74,33 +134,95 @@ def invoke(self, function_arn, context, payload, invocation_type, callback):
payload=to_bytes(json.dumps(payload or {})),
)

def new_callback(ft_result: Future[InvocationResult]) -> None:
try:
result = ft_result.result(timeout=10)
error = None
if isinstance(result, InvocationError):
error = "?"
callback(
result=LegacyInvocationResult(
result=to_str(json.loads(result.payload)),
log_output=result.logs,
),
func_arn="doesntmatter",
event="doesntmatter",
error=error,
)

except Exception as e:
# TODO: map exception to old error format?
LOG.error(e)
callback(
result=None,
func_arn="doesntmatter",
event="doesntmatter",
error=e,
)

ft.add_done_callback(new_callback)
if callback:

def mapped_callback(ft_result: Future[InvocationResult]) -> None:
try:
result = ft_result.result(timeout=10)
error = None
if isinstance(result, InvocationError):
error = "?"
callback(
result=LegacyInvocationResult(
result=to_str(json.loads(result.payload)),
log_output=result.logs,
),
func_arn="doesntmatter",
event="doesntmatter",
error=error,
)

except Exception as e:
# TODO: map exception to old error format?
LOG.debug("Encountered an exception while handling callback", exc_info=True)
callback(
result=None,
func_arn="doesntmatter",
event="doesntmatter",
error=e,
)

ft.add_done_callback(mapped_callback)

def invoke_with_statuscode(
    self,
    function_arn,
    context,
    payload,
    invocation_type,
    callback=None,
    *,
    lock_discriminator,
    parallelization_factor
) -> int:
    """
    Invoke the function through the new (ASF) lambda service and report a status code.

    ``lock_discriminator`` and ``parallelization_factor`` are accepted only for
    signature parity with the legacy adapter; this implementation does not use them.

    :returns: 200 if the invocation was scheduled successfully, 500 otherwise.
        NOTE(review): 200 is returned as soon as the invoke is dispatched — the
        actual function outcome is only surfaced via ``callback``.
    """
    # split ARN ( a bit unnecessary since we build an ARN again in the service)
    fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(function_arn).groupdict()

    try:
        ft = self.lambda_service.invoke(
            # basically function ARN
            function_name=fn_parts["function_name"],
            qualifier=fn_parts["qualifier"],
            region=fn_parts["region_name"],
            account_id=fn_parts["account_id"],
            invocation_type=invocation_type,
            client_context=json.dumps(context or {}),
            payload=to_bytes(json.dumps(payload or {})),
        )

        if callback:

            def mapped_callback(ft_result: Future[InvocationResult]) -> None:
                # translate the new-provider result into the legacy callback shape
                try:
                    result = ft_result.result(timeout=10)
                    error = None
                    if isinstance(result, InvocationError):
                        error = "?"
                    callback(
                        result=LegacyInvocationResult(
                            result=to_str(json.loads(result.payload)),
                            log_output=result.logs,
                        ),
                        # func_arn/event are unused by the consumers of this bridge
                        func_arn="doesntmatter",
                        event="doesntmatter",
                        error=error,
                    )

                except Exception as e:
                    # best-effort: report the failure through the callback instead of raising
                    LOG.debug("Encountered an exception while handling callback", exc_info=True)
                    callback(
                        result=None,
                        func_arn="doesntmatter",
                        event="doesntmatter",
                        error=e,
                    )

            ft.add_done_callback(mapped_callback)

        return 200
    except Exception:
        # scheduling the invoke failed (e.g. unresolvable function) — signal a server-side error
        LOG.debug("Encountered an exception while handling lambda invoke", exc_info=True)
        return 500

def get_event_sources(self, source_arn: str):
# assuming the region/account from function_arn
Expand All @@ -112,5 +234,5 @@ def get_event_sources(self, source_arn: str):
if event_source_arn_matches(
mapped=esm.get("EventSourceArn"), searched=source_arn
):
results.append(esm)
results.append(esm.copy())
return results
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from localstack.services.awslambda.event_source_listeners.stream_event_source_listener import (
StreamEventSourceListener,
)
from localstack.services.awslambda.lambda_api import get_event_sources
from localstack.utils.aws import aws_stack
from localstack.utils.threads import FuncThread

Expand All @@ -23,7 +22,7 @@ def source_type() -> Optional[str]:
return "dynamodb"

def _get_matching_event_sources(self) -> List[Dict]:
event_sources = get_event_sources(source_arn=r".*:dynamodb:.*")
event_sources = self._invoke_adapter.get_event_sources(source_arn=r".*:dynamodb:.*")
return [source for source in event_sources if source["State"] == "Enabled"]

def _get_stream_client(self, region_name):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def start_listeners_for_asf(event_source_mapping: Dict, lambda_service: LambdaSe
"""limited version of start_listeners for the new provider during migration"""
# force import EventSourceListener subclasses
# otherwise they will not be detected by EventSourceListener.get(service_type)
from . import dynamodb_event_source_listener # noqa: F401
from . import kinesis_event_source_listener # noqa: F401
from . import sqs_event_source_listener # noqa: F401

source_arn = event_source_mapping.get("EventSourceArn") or ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from localstack.services.awslambda.event_source_listeners.stream_event_source_listener import (
StreamEventSourceListener,
)
from localstack.services.awslambda.lambda_api import get_event_sources
from localstack.utils.aws import aws_stack
from localstack.utils.common import first_char_to_lower, to_str
from localstack.utils.threads import FuncThread
Expand All @@ -25,7 +24,7 @@ def source_type() -> Optional[str]:
return "kinesis"

def _get_matching_event_sources(self) -> List[Dict]:
event_sources = get_event_sources(source_arn=r".*:kinesis:.*")
event_sources = self._invoke_adapter.get_event_sources(source_arn=r".*:kinesis:.*")
return [source for source in event_sources if source["State"] == "Enabled"]

def _get_stream_client(self, region_name):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
import logging
import math
import threading
import time
from typing import Dict, List, Optional, Tuple

from localstack import config
from localstack.services.awslambda import lambda_executors
from localstack.aws.api.lambda_ import InvocationType
from localstack.services.awslambda.event_source_listeners.adapters import (
EventSourceAdapter,
EventSourceLegacyAdapter,
)
from localstack.services.awslambda.event_source_listeners.event_source_listener import (
EventSourceListener,
)
from localstack.services.awslambda.lambda_api import run_lambda
from localstack.services.awslambda.lambda_executors import InvocationResult
from localstack.services.awslambda.lambda_utils import filter_stream_records
from localstack.utils.aws.aws_stack import extract_region_from_arn
from localstack.utils.aws.message_forwarding import send_event_to_target
Expand Down Expand Up @@ -131,33 +127,29 @@ def start(self, invoke_adapter: Optional[EventSourceAdapter] = None):
)
self._COORDINATOR_THREAD.start()

# TODO: remove lock_discriminator and parallelization_factor when the old lambda provider is gone
def _invoke_lambda(
self, function_arn, payload, lock_discriminator, parallelization_factor
) -> Tuple[bool, int]:
"""
invoke a given lambda function
:returns: True if the invocation was successful (False otherwise) and the status code of the invocation result

# TODO: rework this to properly invoke a lambda through the API. Needs additional restructuring upstream of this function as well.
"""
if not config.SYNCHRONOUS_KINESIS_EVENTS:
lambda_executors.LAMBDA_ASYNC_LOCKS.assure_lock_present(
lock_discriminator, threading.BoundedSemaphore(parallelization_factor)
)
else:
lock_discriminator = None

result = run_lambda(
func_arn=function_arn,
event=payload,
status_code = self._invoke_adapter.invoke_with_statuscode(
function_arn=function_arn,
payload=payload,
invocation_type=InvocationType.RequestResponse,
context={},
asynchronous=not config.SYNCHRONOUS_KINESIS_EVENTS,
lock_discriminator=lock_discriminator,
parallelization_factor=parallelization_factor,
)
if isinstance(result, InvocationResult):
status_code = getattr(result.result, "status_code", 0)
if status_code >= 400:
return False, status_code
return True, status_code
return False, 500

if status_code >= 400:
return False, status_code
return True, status_code

def _get_lambda_event_filters_for_arn(self, function_arn: str, queue_arn: str):
result = []
Expand Down Expand Up @@ -360,7 +352,9 @@ def _monitor_stream_event_sources(self, *args):
"function_arn": source["FunctionArn"],
"stream_arn": stream_arn,
"batch_size": batch_size,
"parallelization_factor": source["ParallelizationFactor"],
"parallelization_factor": source.get(
"ParallelizationFactor", 1
),
"lock_discriminator": lock_discriminator,
"shard_id": shard_id,
"stream_client": stream_client,
Expand Down
Loading