Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing BatchManager in sdk-core #2613

Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8abe56f
Adding batchBuffer and testing code
Jul 13, 2021
55d6849
Adding sdk-core tests and fixing request to response mapping
Jul 14, 2021
f6e485a
Updating sdk-core tests with object to mimic batch request entry
Jul 14, 2021
fa01f37
Adding wrapper classes for functions
Jul 14, 2021
7a2c0b9
Making some changes for thread safety and updating tests
Jul 15, 2021
be0d218
Refactoring code and how requests are cleared. Simplifying scheduling…
Jul 15, 2021
d89d35f
Changing scheduled flush code. Need to test in next commit
Jul 15, 2021
7191141
Adding more test cases and scheduling periodically instead of just once
Jul 15, 2021
6efeb64
Refactoring responseMap into a wrapper class
Jul 15, 2021
aec0d5c
Fixing issues related to periodic scheduling. Need to test with multi…
Jul 19, 2021
db83454
Using map of currentIds instead of a singular currentId.
Jul 19, 2021
2c55e01
Fixed race conditions related to cancelling scheduled buffer flush
Jul 20, 2021
4b77e0b
Resetting cancellableFlush's flags after flush is completed
Jul 20, 2021
fe6950a
Multithreading test works. need to clean up code and recommit
Jul 20, 2021
1e3d0d8
Found another way to reset cancellableFlush flags. Also cleaning up code
Jul 20, 2021
ec2f127
Updating request batching tests with multi threading tests
Jul 20, 2021
1f9bf88
Renamed to batchManager and cleaned up variable names to be more self…
Jul 21, 2021
b11b1b9
Refactored sdk-core tests
Jul 21, 2021
581a620
Fixing checkstyle issues
Jul 21, 2021
043619d
Fixing checkstyle issues in the tests as well as refactoring some tests
Jul 21, 2021
102424b
Updating and adding SQS batching tests and removing xml.bind
Jul 21, 2021
4b6ea92
Using existing Md5Utils for Sqs batch tests
Jul 21, 2021
d1efa49
Naming and style changes to address PR comments
Jul 22, 2021
4b4cb85
Refactoring tests to use maps instead of arrays for better request-to…
Jul 22, 2021
2528a65
Name changes, refactoring code and addressing minor PR comments
Jul 22, 2021
d76a4ff
Refactoring SQS integration tests to use identifiable responses
Jul 23, 2021
cff589a
Combining request and response maps into one BatchingMap
Jul 23, 2021
e8089e6
Renaming generics as RequestT, ResponseT etc.
Jul 23, 2021
f478261
Refactoring tests to calculate batchGroupId from request and removing…
Jul 23, 2021
cbfc34a
Cleaning up names and methods
Jul 26, 2021
5c94dd2
Adding the batchGroupId function that I forgot last time
Jul 26, 2021
915be6f
Refactoring batchManager to use builder pattern and an overrideConfig…
Jul 26, 2021
e387b21
Removing need for separate executor (just use scheduled executor)
Jul 26, 2021
0b17fcb
Adding javadocs and renaming variables/classes to be more intuitive
Jul 26, 2021
7849b42
Refactoring incrementing currentId to a separate BatchUtils class
Jul 26, 2021
660803b
Refactoring currentIds map and scheduledFlushes map into the BatchBuffer
Jul 26, 2021
d9a40c3
Removing need to check if flush has executed but need to check if flu…
Jul 26, 2021
826fab3
Small changes to address github PR comments
Jul 27, 2021
424dd62
Removing completionService and just using CompletableFuture.supplyAsync
Jul 27, 2021
d2725ec
Adding code to cover exception thrown from the sendAndBatch function
Jul 27, 2021
89d55f1
Adding exception test and fixing batchingExcution locking
Jul 27, 2021
137b340
Adding lock object back into batchingExecutionContext for both read a…
Jul 27, 2021
7ac8916
Addressing minor github PR comments (making classes final, cleaning u…
Jul 28, 2021
52e6316
Just renaming cancelScheduledFlushIfNeeded method
Jul 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkProtectedApi;

/**
* Takes a list of identified requests in addition to a destination and batches the requests into a batch request.
* It then sends the batch request and returns a CompletableFuture of the response.
* @param <RequestT> the type of an outgoing request.
* @param <ResponseT> the type of an outgoing batch response.
*/
@FunctionalInterface
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
@SdkProtectedApi
public interface BatchAndSendFunction<RequestT, ResponseT> {
CompletableFuture<ResponseT> batchAndSend(List<IdentifiableRequest<RequestT>> identifiedRequests, String batchGroupId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;

@SdkInternalApi
public class BatchContext<RequestT, ResponseT> {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved

private RequestT request;
private final CompletableFuture<ResponseT> response;

public BatchContext(RequestT request, CompletableFuture<ResponseT> response) {
this.request = request;
this.response = response;
}

public RequestT request() {
return request;
}

public CompletableFuture<ResponseT> response() {
return response;
}

public RequestT removeRequest() {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
RequestT ret = request;
request = null;
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.SdkAutoCloseable;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;

/**
* Implementation of a generic buffer for automatic request batching.
* @param <RequestT> the type of an outgoing request.
* @param <ResponseT> the type of an outgoing response.
* @param <BatchResponseT> the type of an outgoing batch response.
*/
@SdkInternalApi
public class BatchManager<RequestT, ResponseT, BatchResponseT> implements SdkAutoCloseable {
// TODO: Just a number from the CloudwatchMetricPublisher for now. Should I choose a different max task queue size?
private static final int MAXIMUM_TASK_QUEUE_SIZE = 128;

private static final Logger log = Logger.loggerFor(BatchManager.class);
private final int maxBatchItems;
private final Duration maxBatchOpenInMs;
private final BatchingMap<RequestT, ResponseT> requestsAndResponsesMaps;
private final Map<String, ScheduledFlush> scheduledFlushTasks;
private final Map<String, AtomicInteger> currentIds;
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
private final BatchAndSendFunction<RequestT, BatchResponseT> batchingFunction;
private final BatchResponseMapperFunction<BatchResponseT, ResponseT> mapResponsesFunction;
private final GetBatchGroupIdFunction<RequestT> batchGroupIdFunction;
16lim21 marked this conversation as resolved.
Show resolved Hide resolved

/**
* The executor that executes {@link #flushBuffer}
*/
private final ExecutorService executor;

/**
* A scheduled executor that periodically schedules {@link #flushBuffer} on the {@link #executor} thread. Note: The scheduled
* executor should never execute the flush task itself because that would modify the request and response maps which should
* only ever be modified from the {@link #executor} thread.
*/
private final ScheduledExecutorService scheduledExecutor;

public BatchManager(int maxBatchItems, Duration maxBatchOpenInMs, ScheduledExecutorService scheduledExecutor,
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
BatchAndSendFunction<RequestT, BatchResponseT> batchingFunction,
BatchResponseMapperFunction<BatchResponseT, ResponseT> mapResponsesFunction,
GetBatchGroupIdFunction<RequestT> batchGroupIdFunction) {
this.requestsAndResponsesMaps = new BatchingMap<>();
this.scheduledFlushTasks = new ConcurrentHashMap<>();
this.currentIds = new ConcurrentHashMap<>();
this.maxBatchItems = maxBatchItems;
this.maxBatchOpenInMs = maxBatchOpenInMs;
this.batchingFunction = batchingFunction;
this.mapResponsesFunction = mapResponsesFunction;
this.batchGroupIdFunction = batchGroupIdFunction;
ThreadFactory threadFactory = new ThreadFactoryBuilder().threadNamePrefix("batch-buffer").build();
this.scheduledExecutor = scheduledExecutor;
this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
zoewangg marked this conversation as resolved.
Show resolved Hide resolved
new ArrayBlockingQueue<>(MAXIMUM_TASK_QUEUE_SIZE),
threadFactory);
}

public CompletableFuture<ResponseT> sendRequest(RequestT request) {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
String batchGroupId = batchGroupIdFunction.getBatchGroupId(request);
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
CompletableFuture<ResponseT> response = new CompletableFuture<>();
AtomicInteger currentId = currentIds.computeIfAbsent(batchGroupId, k -> new AtomicInteger(0));
String id = Integer.toString(getCurrentIdAndIncrement(currentId));
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
requestsAndResponsesMaps.getNestedMap(batchGroupId)
.put(id, request, response);

if (requestsAndResponsesMaps.get(batchGroupId).requestSize() < maxBatchItems) {
scheduledFlushTasks.computeIfAbsent(batchGroupId, k -> scheduleBufferFlush(batchGroupId, maxBatchOpenInMs.toMillis(),
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
scheduledExecutor));
} else {
CompletableFuture<ResponseT> cancelledResponse = cancelScheduledFlushIfNeeded(response, batchGroupId);
if (cancelledResponse != null) {
return cancelledResponse;
}
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
}
return response;
}

private CompletableFuture<ResponseT> cancelScheduledFlushIfNeeded(CompletableFuture<ResponseT> response,
String batchGroupId) {
if (scheduledFlushTasks.containsKey(batchGroupId)) {
// "reset" the flush task timer by cancelling scheduled task then restarting it.
ScheduledFlush scheduledFuture = scheduledFlushTasks.get(batchGroupId);
scheduledFuture.cancel();
// If scheduledFuture hasExecuted, do not perform a manual flush (initialDelay == 0), just return the response.
if (scheduledFuture.hasExecuted()) {
scheduledFlushTasks.put(batchGroupId, scheduleBufferFlush(batchGroupId, maxBatchOpenInMs.toMillis(),
scheduledExecutor));
return response;
}
}
scheduledFlushTasks.put(batchGroupId, scheduleBufferFlush(batchGroupId, 0, maxBatchOpenInMs.toMillis(),
scheduledExecutor));
return null;
}

private Future<?> flushBuffer(String batchGroupId) {
return executor.submit(() -> internalFlushBuffer(batchGroupId));
}

// Flushes the buffer for the given batchGroupId and fills in the response map with the returned responses.
// Returns exception in completableFuture if batchingFunction.apply throws an exception.
private void internalFlushBuffer(String batchGroupId) {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
BatchingGroupMap<RequestT, ResponseT> requestBuffer = requestsAndResponsesMaps.get(batchGroupId);
if (!requestBuffer.hasRequests()) {
return;
}

List<IdentifiableRequest<RequestT>> requestEntryList = new ArrayList<>();
Iterator<Map.Entry<String, BatchContext<RequestT, ResponseT>>> requestIterator = requestBuffer.entrySet().iterator();
while (requestEntryList.size() < maxBatchItems && requestIterator.hasNext()) {
Map.Entry<String, BatchContext<RequestT, ResponseT>> entry = requestIterator.next();
RequestT request = entry.getValue().request();
if (request != null) {
requestEntryList.add(new IdentifiableRequest<>(entry.getKey(), request));
requestBuffer.removeRequest(entry.getKey());
}
}
if (!requestEntryList.isEmpty()) {
batchingFunction.batchAndSend(requestEntryList, batchGroupId)
.whenComplete((result, ex) -> handleAndCompleteResponses(batchGroupId, result, ex));
}
}

private void handleAndCompleteResponses(String batchGroupId, BatchResponseT batchResult, Throwable exception) {
if (exception != null) {
requestsAndResponsesMaps.get(batchGroupId)
.values()
.forEach(batchContext -> batchContext.response().completeExceptionally(exception));
} else {
List<IdentifiableResponse<ResponseT>> identifiedResponses = mapResponsesFunction.mapBatchResponse(batchResult);
for (IdentifiableResponse<ResponseT> identifiedResponse : identifiedResponses) {
String id = identifiedResponse.id();
ResponseT response = identifiedResponse.response();
requestsAndResponsesMaps.get(batchGroupId)
.getResponse(id)
.complete(response);
requestsAndResponsesMaps.get(batchGroupId)
.remove(id);
}
}
}

private ScheduledFlush scheduleBufferFlush(String batchGroupId, long timeOutInMs,
ScheduledExecutorService scheduledExecutor) {
return scheduleBufferFlush(batchGroupId, timeOutInMs, timeOutInMs, scheduledExecutor);
}

private ScheduledFlush scheduleBufferFlush(String batchGroupId, long initialDelay, long timeOutInMs,
ScheduledExecutorService scheduledExecutor) {
CancellableFlush flushTask = new CancellableFlush(() -> flushBuffer(batchGroupId));
ScheduledFuture<?> scheduledFuture = scheduledExecutor.scheduleAtFixedRate(() -> {
flushTask.reset();
flushTask.run();
}, initialDelay, timeOutInMs, TimeUnit.MILLISECONDS);
return new ScheduledFlush(flushTask, scheduledFuture);
}

public void close() {
try {
scheduledExecutor.shutdownNow();
scheduledFlushTasks.forEach((key, value) -> value.cancel());
requestsAndResponsesMaps.forEach((key, value) -> flushBuffer(key));
for (BatchingGroupMap<RequestT, ResponseT> idToResponse : requestsAndResponsesMaps.values()) {
CompletableFuture.allOf(idToResponse.responses().toArray(new CompletableFuture[0]))
.get(60, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn(() -> "Interrupted during BatchBuffer shutdown" + e);
} catch (ExecutionException e) {
log.warn(() -> "Failed during graceful metric publisher shutdown." + e);
} catch (TimeoutException e) {
log.warn(() -> "Timed out during graceful metric publisher shutdown." + e);
} finally {
scheduledExecutor.shutdownNow();
}
}

private synchronized int getCurrentIdAndIncrement(AtomicInteger currentId) {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
int id = currentId.getAndIncrement();
if (id < 0) {
currentId.set(1);
id = 0;
}
return id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.List;
import software.amazon.awssdk.annotations.SdkProtectedApi;

/**
* Unpacks the batch response, then transforms individual entries to the appropriate response type. Each entry's batch ID
* is mapped to the individual response entry.
* @param <BatchResponseT> the type of an outgoing batch response.
* @param <ResponseT> the type of an outgoing response.
*/
@FunctionalInterface
@SdkProtectedApi
public interface BatchResponseMapperFunction<BatchResponseT, ResponseT> {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
List<IdentifiableResponse<ResponseT>> mapBatchResponse(BatchResponseT batchResponse);
}