Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,086 changes: 1,543 additions & 1,543 deletions docs/utilities/batch-processing.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,26 @@
// Validate typed handler configurations
ValidateTypedHandlerConfiguration();

// Check if typed handlers are configured (not yet fully supported in attributes)
// Check if typed handlers are configured
if (IsTypedHandlerConfigured())
{
throw new NotSupportedException("Typed record handlers are not yet fully supported with BatchProcessorAttribute. Please use direct typed batch processor calls for typed processing.");
// Create typed aspect handler
return eventType switch
{
BatchEventType.DynamoDbStream => CreateTypedBatchProcessingAspectHandler(() => TypedDynamoDbStreamBatchProcessor.TypedInstance, args),
BatchEventType.KinesisDataStream => CreateTypedBatchProcessingAspectHandler(() => TypedKinesisEventBatchProcessor.TypedInstance, args),
BatchEventType.Sqs => CreateTypedBatchProcessingAspectHandler(() => TypedSqsBatchProcessor.TypedInstance, args),
_ => throw new ArgumentOutOfRangeException($"{eventType}", eventType, "Unsupported event type.")
};
}

// Create aspect handler
// Create traditional aspect handler
return eventType switch
{
BatchEventType.DynamoDbStream => CreateBatchProcessingAspectHandler(() => DynamoDbStreamBatchProcessor.Instance),
BatchEventType.KinesisDataStream => CreateBatchProcessingAspectHandler(() => KinesisEventBatchProcessor.Instance),
BatchEventType.Sqs => CreateBatchProcessingAspectHandler(() => SqsBatchProcessor.Instance),
_ => throw new ArgumentOutOfRangeException(nameof(eventType), eventType, "Unsupported event type.")
_ => throw new ArgumentOutOfRangeException($"{eventType}", eventType, "Unsupported event type.")
};
}

Expand Down Expand Up @@ -395,6 +402,146 @@
});
}

private TypedBatchProcessingAspectHandler<TEvent, TRecord> CreateTypedBatchProcessingAspectHandler<TEvent, TRecord>(Func<ITypedBatchProcessor<TEvent, TRecord>> defaultTypedBatchProcessorProvider, IReadOnlyList<object> args)
{
// Create typed batch processor
ITypedBatchProcessor<TEvent, TRecord> typedBatchProcessor;
if (BatchProcessor != null && BatchProcessor.IsAssignableTo(TypedBatchProcessorTypes[GetEventTypeFromArgs(args)]))
{
try
{
typedBatchProcessor = (ITypedBatchProcessor<TEvent, TRecord>)Activator.CreateInstance(BatchProcessor)!;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of: '{BatchProcessor.Name}'.", ex);
}
}
else
{
typedBatchProcessor = defaultTypedBatchProcessorProvider.Invoke();
}

// Create deserialization options
var deserializationOptions = new DeserializationOptions
{
ErrorPolicy = DeserializationErrorPolicy
};

if (JsonSerializerContext != null)
{
try
{
var jsonSerializerContext = (JsonSerializerContext)Activator.CreateInstance(JsonSerializerContext)!;
deserializationOptions.JsonSerializerContext = jsonSerializerContext;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of JsonSerializerContext: '{JsonSerializerContext.Name}'.", ex);
}
}

// Create processing options
var errorHandlingPolicy = Enum.TryParse(PowertoolsConfigurations.Instance.BatchProcessingErrorHandlingPolicy, true, out BatchProcessorErrorHandlingPolicy errHandlingPolicy)
? errHandlingPolicy
: ErrorHandlingPolicy;
if (ErrorHandlingPolicy != BatchProcessorErrorHandlingPolicy.DeriveFromEvent)
{
errorHandlingPolicy = ErrorHandlingPolicy;
}

var processingOptions = new ProcessingOptions
{
CancellationToken = CancellationToken.None,
ErrorHandlingPolicy = errorHandlingPolicy,
MaxDegreeOfParallelism = MaxDegreeOfParallelism,
BatchParallelProcessingEnabled = BatchParallelProcessingEnabled,
ThrowOnFullBatchFailure = ThrowOnFullBatchFailure
};

// Create typed handler wrapper
object typedHandler = null;
bool hasContext = false;

if (TypedRecordHandler != null)
{
try
{
typedHandler = Activator.CreateInstance(TypedRecordHandler)!;
hasContext = false;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of: '{TypedRecordHandler.Name}'.", ex);
}
}
else if (TypedRecordHandlerProvider != null)
{
try
{
var provider = Activator.CreateInstance(TypedRecordHandlerProvider)!;
// Assume the provider has a Create() method that returns the handler
var createMethod = TypedRecordHandlerProvider.GetMethod("Create");

Check warning on line 484 in libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs

View workflow job for this annotation

GitHub Actions / build

'this' argument does not satisfy 'DynamicallyAccessedMemberTypes.PublicMethods' in call to 'System.Type.GetMethod(String)'. The return value of method 'AWS.Lambda.Powertools.BatchProcessing.BatchProcessorAttribute.TypedRecordHandlerProvider.get' does not have matching annotations. The source value must declare at least the same requirements as those declared on the target location it is assigned to.
if (createMethod == null)
{
throw new InvalidOperationException($"TypedRecordHandlerProvider '{TypedRecordHandlerProvider.Name}' must have a 'Create()' method.");
}
typedHandler = createMethod.Invoke(provider, null)!;
hasContext = false;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of typed record handler using provider: '{TypedRecordHandlerProvider.Name}'.", ex);
}
}
else if (TypedRecordHandlerWithContext != null)
{
try
{
typedHandler = Activator.CreateInstance(TypedRecordHandlerWithContext)!;
hasContext = true;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of: '{TypedRecordHandlerWithContext.Name}'.", ex);
}
}
else if (TypedRecordHandlerWithContextProvider != null)
{
try
{
var provider = Activator.CreateInstance(TypedRecordHandlerWithContextProvider)!;
// Assume the provider has a Create() method that returns the handler
var createMethod = TypedRecordHandlerWithContextProvider.GetMethod("Create");

Check warning on line 515 in libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs

View workflow job for this annotation

GitHub Actions / build

'this' argument does not satisfy 'DynamicallyAccessedMemberTypes.PublicMethods' in call to 'System.Type.GetMethod(String)'. The return value of method 'AWS.Lambda.Powertools.BatchProcessing.BatchProcessorAttribute.TypedRecordHandlerWithContextProvider.get' does not have matching annotations. The source value must declare at least the same requirements as those declared on the target location it is assigned to.
if (createMethod == null)
{
throw new InvalidOperationException($"TypedRecordHandlerWithContextProvider '{TypedRecordHandlerWithContextProvider.Name}' must have a 'Create()' method.");
}
typedHandler = createMethod.Invoke(provider, null)!;
hasContext = true;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of typed record handler with context using provider: '{TypedRecordHandlerWithContextProvider.Name}'.", ex);
}
}
else
{
throw new InvalidOperationException("A typed record handler or typed record handler provider is required.");
}

return new TypedBatchProcessingAspectHandler<TEvent, TRecord>(typedBatchProcessor, typedHandler, hasContext, deserializationOptions, processingOptions);
}

private static BatchEventType GetEventTypeFromArgs(IReadOnlyList<object> args)
{
if (args == null || args.Count == 0 || !EventTypes.TryGetValue(args[0].GetType(), out var eventType))
{
throw new ArgumentException($"The first function handler parameter must be of one of the following types: {string.Join(',', EventTypes.Keys.Select(x => $"'{x.Namespace}'"))}.");
}
return eventType;
}

private void ValidateTypedHandlerConfiguration()
{
// Ensure only one type of handler is configured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Amazon.Lambda.DynamoDBEvents;
using AWS.Lambda.Powertools.BatchProcessing.Exceptions;
using AWS.Lambda.Powertools.BatchProcessing.Internal;
using AWS.Lambda.Powertools.Common;

namespace AWS.Lambda.Powertools.BatchProcessing.DynamoDb;

Expand All @@ -18,19 +19,66 @@ public class TypedDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor, I
private readonly IDeserializationService _deserializationService;
private readonly IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> _recordDataExtractor;

/// <summary>
/// The singleton instance of the typed DynamoDB stream batch processor.
/// </summary>
private static ITypedBatchProcessor<DynamoDBEvent, DynamoDBEvent.DynamodbStreamRecord> _typedInstance;

/// <summary>
/// Gets the typed instance.
/// </summary>
/// <value>The typed instance.</value>
public static ITypedBatchProcessor<DynamoDBEvent, DynamoDBEvent.DynamodbStreamRecord> TypedInstance =>
_typedInstance ??= new TypedDynamoDbStreamBatchProcessor();

/// <summary>
/// Return the typed instance ProcessingResult
/// </summary>
public new static ProcessingResult<DynamoDBEvent.DynamodbStreamRecord> Result => _typedInstance?.ProcessingResult;


/// <summary>
/// Initializes a new instance of the TypedDynamoDbStreamBatchProcessor class.
/// </summary>
/// <param name="powertoolsConfigurations">The Powertools configurations.</param>
/// <param name="deserializationService">The deserialization service. If null, uses JsonDeserializationService.Instance.</param>
/// <param name="recordDataExtractor">The record data extractor. If null, uses DynamoDbRecordDataExtractor.Instance.</param>
public TypedDynamoDbStreamBatchProcessor(IDeserializationService deserializationService = null,
public TypedDynamoDbStreamBatchProcessor(
IPowertoolsConfigurations powertoolsConfigurations,
IDeserializationService deserializationService = null,
IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> recordDataExtractor = null)
{
_deserializationService = deserializationService ?? JsonDeserializationService.Instance;
_recordDataExtractor = recordDataExtractor ?? DynamoDbRecordDataExtractor.Instance;
}

/// <summary>
/// Initializes a new instance of the TypedDynamoDbStreamBatchProcessor class with custom deserialization service.
/// </summary>
/// <param name="deserializationService">The deserialization service. If null, uses JsonDeserializationService.Instance.</param>
public TypedDynamoDbStreamBatchProcessor(IDeserializationService deserializationService)
: this(PowertoolsConfigurations.Instance, deserializationService, null)
{
}

/// <summary>
/// Initializes a new instance of the TypedDynamoDbStreamBatchProcessor class with custom services.
/// </summary>
/// <param name="deserializationService">The deserialization service. If null, uses JsonDeserializationService.Instance.</param>
/// <param name="recordDataExtractor">The record data extractor. If null, uses DynamoDbRecordDataExtractor.Instance.</param>
public TypedDynamoDbStreamBatchProcessor(IDeserializationService deserializationService,
IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> recordDataExtractor)
: this(PowertoolsConfigurations.Instance, deserializationService, recordDataExtractor)
{
}

/// <summary>
/// Default constructor for when consumers create a custom typed batch processor.
/// </summary>
public TypedDynamoDbStreamBatchProcessor() : this(PowertoolsConfigurations.Instance)
{
}

/// <inheritdoc />
public async Task<ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>> ProcessAsync<T>(
DynamoDBEvent @event,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

using System;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Amazon.Lambda.Core;

namespace AWS.Lambda.Powertools.BatchProcessing.Internal;

internal class TypedBatchProcessingAspectHandler<TEvent, TRecord> : IBatchProcessingAspectHandler
{
private readonly ITypedBatchProcessor<TEvent, TRecord> _typedBatchProcessor;
private readonly object _typedHandler;
private readonly bool _hasContext;
private readonly DeserializationOptions _deserializationOptions;
private readonly ProcessingOptions _processingOptions;

public TypedBatchProcessingAspectHandler(
ITypedBatchProcessor<TEvent, TRecord> typedBatchProcessor,
object typedHandler,
bool hasContext,
DeserializationOptions deserializationOptions,
ProcessingOptions processingOptions)
{
_typedBatchProcessor = typedBatchProcessor;
_typedHandler = typedHandler;
_hasContext = hasContext;
_deserializationOptions = deserializationOptions;
_processingOptions = processingOptions;
}

public async Task HandleAsync(object[] args)
{
// Try get event from args
if (args?.FirstOrDefault() is not TEvent @event)
{
throw new InvalidOperationException($"The first function handler parameter must be of type: '{typeof(TEvent).Namespace}'.");
}

// Get Lambda context if available and needed
ILambdaContext context = null;
if (_hasContext && args.Length > 1 && args[1] is ILambdaContext lambdaContext)
{
context = lambdaContext;
}

// Use reflection to call the appropriate ProcessAsync method on the typed batch processor
await CallTypedProcessAsync(@event, context);
}

private async Task CallTypedProcessAsync(TEvent @event, ILambdaContext context)
{
// Get the generic type argument from the handler
var handlerType = _typedHandler.GetType();
var handlerInterface = handlerType.GetInterfaces()
.FirstOrDefault(i => i.IsGenericType &&
(i.GetGenericTypeDefinition() == typeof(ITypedRecordHandler<>) ||
i.GetGenericTypeDefinition() == typeof(ITypedRecordHandlerWithContext<>)));

if (handlerInterface == null)
{
throw new InvalidOperationException($"Handler type '{handlerType.Name}' does not implement ITypedRecordHandler<T> or ITypedRecordHandlerWithContext<T>.");
}

var dataType = handlerInterface.GetGenericArguments()[0];

// Find the appropriate ProcessAsync method on the typed batch processor
MethodInfo processMethod;
if (_hasContext && context != null)
{
// Look for ProcessAsync<T>(TEvent, ITypedRecordHandlerWithContext<T>, ILambdaContext, DeserializationOptions, ProcessingOptions)
processMethod = _typedBatchProcessor.GetType().GetMethods()
.FirstOrDefault(m => m.Name == "ProcessAsync" &&
m.IsGenericMethodDefinition &&
m.GetParameters().Length == 5 &&
m.GetParameters()[1].ParameterType.IsGenericType &&
m.GetParameters()[1].ParameterType.GetGenericTypeDefinition() == typeof(ITypedRecordHandlerWithContext<>) &&
m.GetParameters()[2].ParameterType == typeof(ILambdaContext) &&
m.GetParameters()[3].ParameterType == typeof(DeserializationOptions) &&
m.GetParameters()[4].ParameterType == typeof(ProcessingOptions));
}
else
{
// Look for ProcessAsync<T>(TEvent, ITypedRecordHandler<T>, DeserializationOptions, ProcessingOptions)
processMethod = _typedBatchProcessor.GetType().GetMethods()
.FirstOrDefault(m => m.Name == "ProcessAsync" &&
m.IsGenericMethodDefinition &&
m.GetParameters().Length == 4 &&
m.GetParameters()[1].ParameterType.IsGenericType &&
m.GetParameters()[1].ParameterType.GetGenericTypeDefinition() == typeof(ITypedRecordHandler<>) &&
m.GetParameters()[2].ParameterType == typeof(DeserializationOptions) &&
m.GetParameters()[3].ParameterType == typeof(ProcessingOptions));
}

if (processMethod == null)
{
throw new InvalidOperationException($"Could not find appropriate ProcessAsync method on typed batch processor for handler type '{handlerType.Name}'.");
}

// Make the method generic with the data type
var genericProcessMethod = processMethod.MakeGenericMethod(dataType);

// Call the method
Task processTask;
if (_hasContext && context != null)
{
processTask = (Task)genericProcessMethod.Invoke(_typedBatchProcessor, new object[] { @event, _typedHandler, context, _deserializationOptions, _processingOptions });
}
else
{
processTask = (Task)genericProcessMethod.Invoke(_typedBatchProcessor, new object[] { @event, _typedHandler, _deserializationOptions, _processingOptions });
}

await processTask;
}
}
Loading
Loading