Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing BatchManager in sdk-core #2613

Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8abe56f
Adding batchBuffer and testing code
Jul 13, 2021
55d6849
Adding sdk-core tests and fixing request to response mapping
Jul 14, 2021
f6e485a
Updating sdk-core tests with object to mimic batch request entry
Jul 14, 2021
fa01f37
Adding wrapper classes for functions
Jul 14, 2021
7a2c0b9
Making some changes for thread safety and updating tests
Jul 15, 2021
be0d218
Refactoring code and how requests are cleared. Simplifying scheduling…
Jul 15, 2021
d89d35f
Changing scheduled flush code. Need to test in next commit
Jul 15, 2021
7191141
Adding more test cases and scheduling periodically instead of just once
Jul 15, 2021
6efeb64
Refactoring responseMap into a wrapper class
Jul 15, 2021
aec0d5c
Fixing issues related to periodic scheduling. Need to test with multi…
Jul 19, 2021
db83454
Using map of currentIds instead of a singular currentId.
Jul 19, 2021
2c55e01
Fixed race conditions related to cancelling scheduled buffer flush
Jul 20, 2021
4b77e0b
Resetting cancellableFlush's flags after flush is completed
Jul 20, 2021
fe6950a
Multithreading test works. need to clean up code and recommit
Jul 20, 2021
1e3d0d8
Found another way to reset cancellableFlush flags. Also cleaning up code
Jul 20, 2021
ec2f127
Updating request batching tests with multi threading tests
Jul 20, 2021
1f9bf88
Renamed to batchManager and cleaned up variable names to be more self…
Jul 21, 2021
b11b1b9
Refactored sdk-core tests
Jul 21, 2021
581a620
Fixing checkstyle issues
Jul 21, 2021
043619d
Fixing checkstyle issues in the tests as well as refactoring some tests
Jul 21, 2021
102424b
Updating and adding SQS batching tests and removing xml.bind
Jul 21, 2021
4b6ea92
Using existing Md5Utils for Sqs batch tests
Jul 21, 2021
d1efa49
Naming and style changes to address PR comments
Jul 22, 2021
4b4cb85
Refactoring tests to use maps instead of arrays for better request-to…
Jul 22, 2021
2528a65
Name changes, refactoring code and addressing minor PR comments
Jul 22, 2021
d76a4ff
Refactoring SQS integration tests to use identifiable responses
Jul 23, 2021
cff589a
Combining request and response maps into one BatchingMap
Jul 23, 2021
e8089e6
Renaming generics as RequestT, ResponseT etc.
Jul 23, 2021
f478261
Refactoring tests to calculate batchGroupId from request and removing…
Jul 23, 2021
cbfc34a
Cleaning up names and methods
Jul 26, 2021
5c94dd2
Adding the batchGroupId function that I forgot last time
Jul 26, 2021
915be6f
Refactoring batchManager to use builder pattern and an overrideConfig…
Jul 26, 2021
e387b21
Removing need for separate executor (just use scheduled executor)
Jul 26, 2021
0b17fcb
Adding javadocs and renaming variables/classes to be more intuitive
Jul 26, 2021
7849b42
Refactoring incrementing currentId to a separate BatchUtils class
Jul 26, 2021
660803b
Refactoring currentIds map and scheduledFlushes map into the BatchBuffer
Jul 26, 2021
d9a40c3
Removing need to check if flush has executed but need to check if flu…
Jul 26, 2021
826fab3
Small changes to address github PR comments
Jul 27, 2021
424dd62
Removing completionService and just using CompletableFuture.supplyAsync
Jul 27, 2021
d2725ec
Adding code to cover exception thrown from the sendAndBatch function
Jul 27, 2021
89d55f1
Adding exception test and fixing batchingExcution locking
Jul 27, 2021
137b340
Adding lock object back into batchingExecutionContext for both read a…
Jul 27, 2021
7ac8916
Addressing minor github PR comments (making classes final, cleaning u…
Jul 28, 2021
52e6316
Just renaming cancelScheduledFlushIfNeeded method
Jul 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Takes a list of identified requests in addition to a destination and batches the requests into a batch request.
* It then sends the batch request and returns a CompletableFuture of the response.
* @param <T> the type of an outgoing request.
* @param <U> the type of an outgoing batch response.
*/
@FunctionalInterface
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
public interface BatchAndSendFunction<T, U> {
CompletableFuture<U> batchAndSend(List<IdentifiedRequest<T>> identifiedRequests, String destination);
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;

/**
* Implementation of a generic buffer for automatic request batching.
* @param <T> the type of an outgoing request.
* @param <U> the type of an outgoing response.
* @param <V> the type of an outgoing batch response.
*/
@SdkInternalApi
public class BatchManager<T, U, V> {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
// Just a number from the cloudwatch metric publisher for now. Not sure if I should choose a different max task queue size?
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
private static final int MAXIMUM_TASK_QUEUE_SIZE = 128;

private final BatchingMap<T> batchGroupIdToIdToRequest;
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
private final BatchingMap<CompletableFuture<U>> batchGroupIdToIdToResponse;
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, ScheduledFlush> scheduledFlushTasks;
private final Map<String, AtomicInteger> currentIds;
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
private final BatchAndSendFunction<T, V> batchingFunction;
private final UnpackBatchResponseFunction<V, U> unpackResponseFunction;
private final ScheduledExecutorService scheduledExecutor;
private final ThreadFactory threadFactory;
private final ExecutorService executor;
private final Duration maxBatchOpenInMs;
private final int maxBatchItems;

public BatchManager(int maxBatchItems, Duration maxBatchOpenInMs,
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
BatchAndSendFunction<T, V> batchingFunction,
UnpackBatchResponseFunction<V, U> unpackResponseFunction) {
this.batchGroupIdToIdToRequest = new BatchingMap<>();
this.batchGroupIdToIdToResponse = new BatchingMap<>();
this.scheduledFlushTasks = new ConcurrentHashMap<>();
this.currentIds = new ConcurrentHashMap<>();
this.maxBatchItems = maxBatchItems;
this.maxBatchOpenInMs = maxBatchOpenInMs;
this.batchingFunction = batchingFunction;
this.unpackResponseFunction = unpackResponseFunction;
this.threadFactory = new ThreadFactoryBuilder().threadNamePrefix("batch-buffer").build();
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
zoewangg marked this conversation as resolved.
Show resolved Hide resolved
new ArrayBlockingQueue<>(MAXIMUM_TASK_QUEUE_SIZE),
threadFactory);
}

public CompletableFuture<U> sendRequest(T request, String batchGroupId) {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
CompletableFuture<U> response = new CompletableFuture<>();
AtomicInteger currentId = currentIds.computeIfAbsent(batchGroupId, k -> new AtomicInteger(0));
String id = Integer.toString(getCurrentIdAndIncrement(currentId));
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
batchGroupIdToIdToResponse.getNestedMap(batchGroupId)
.put(id, response);
batchGroupIdToIdToRequest.getNestedMap(batchGroupId)
.put(id, request);

if (batchGroupIdToIdToRequest.get(batchGroupId).size() < maxBatchItems || checkIfScheduledFlush(batchGroupId)) {
if (!scheduledFlushTasks.containsKey(batchGroupId)) {
scheduledFlushTasks.put(batchGroupId, scheduleBufferFlush(batchGroupId, maxBatchOpenInMs.toMillis(),
scheduledExecutor));
}
} else {
if (scheduledFlushTasks.containsKey(batchGroupId)) {
// "reset" the flush task timer by cancelling scheduled task then restarting it.
ScheduledFlush scheduledFuture = scheduledFlushTasks.get(batchGroupId);
scheduledFuture.cancel();
if (scheduledFuture.hasExecuted()) {
scheduledFlushTasks.put(batchGroupId, scheduleBufferFlush(batchGroupId, maxBatchOpenInMs.toMillis(),
scheduledExecutor));
return response;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does scheduledFuture.hasExecuted() mean the scheduledFuture has been cancelled or has been executed?

Why do we need to schedule another flush here?

}
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
scheduledFlushTasks.put(batchGroupId, scheduleBufferFlush(batchGroupId, 0, maxBatchOpenInMs.toMillis(),
scheduledExecutor));
}
return response;
}

private void flushBuffer(String batchGroupId) {
executor.submit(() -> internalFlushBuffer(batchGroupId));
}

// Flushes the buffer for the given batchGroupId and fills in the response map with the returned responses.
// Returns exception in completableFuture if batchingFunction.apply throws an exception.
private void internalFlushBuffer(String batchGroupId) {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, T> requestBuffer = batchGroupIdToIdToRequest.get(batchGroupId);
Map<String, T> requestBufferCopy = new HashMap<>(requestBuffer);
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
if (requestBufferCopy.isEmpty()) {
return;
}

List<IdentifiedRequest<T>> requestEntryList = new ArrayList<>();
Iterator<Map.Entry<String, T>> requestIterator = requestBufferCopy.entrySet().iterator();
for (int i = 0; i < maxBatchItems && requestIterator.hasNext(); i++) {
Map.Entry<String, T> entry = requestIterator.next();
requestEntryList.add(new IdentifiedRequest<>(entry.getKey(), entry.getValue()));
requestBuffer.remove(entry.getKey());
}

batchingFunction.batchAndSend(requestEntryList, batchGroupId)
.whenComplete((result, ex) -> handleAndCompleteResponses(batchGroupId, result, ex));
}

private void handleAndCompleteResponses(String batchGroupId, V batchResult, Throwable exception) {
if (exception != null) {
batchGroupIdToIdToResponse.get(batchGroupId)
.values()
.forEach(responseFuture -> responseFuture.completeExceptionally(exception));
} else {
List<IdentifiedResponse<U>> identifiedResponses = unpackResponseFunction.unpackBatchResponse(batchResult);
for (IdentifiedResponse<U> identifiedResponse : identifiedResponses) {
String id = identifiedResponse.getId();
U response = identifiedResponse.getResponse();
batchGroupIdToIdToResponse.get(batchGroupId)
.get(id)
.complete(response);
batchGroupIdToIdToResponse.get(batchGroupId)
.remove(id);
}
}
}

private ScheduledFlush scheduleBufferFlush(String batchGroupId, long timeOutInMs,
ScheduledExecutorService scheduledExecutor) {
return scheduleBufferFlush(batchGroupId, timeOutInMs, timeOutInMs, scheduledExecutor);
}

private ScheduledFlush scheduleBufferFlush(String batchGroupId, long initialDelay, long timeOutInMs,
ScheduledExecutorService scheduledExecutor) {
CancellableFlush flushTask = new CancellableFlush(() -> flushBuffer(batchGroupId));
ScheduledFuture<?> scheduledFuture = scheduledExecutor.scheduleAtFixedRate(() -> {
flushTask.reset();
flushTask.run();
},
initialDelay,
timeOutInMs,
TimeUnit.MILLISECONDS);
return new ScheduledFlush(flushTask, scheduledFuture);
}

// Returns true if a flush is currently being executed.
private boolean checkIfScheduledFlush(String batchGroupId) {
if (scheduledFlushTasks.containsKey(batchGroupId)) {
return scheduledFlushTasks.get(batchGroupId).hasExecuted();
}
return false;
}

public void close() {
try {
scheduledExecutor.shutdownNow();
scheduledFlushTasks.forEach((key, value) -> value.cancel());
batchGroupIdToIdToRequest.forEach((key, value) -> flushBuffer(key));
for (Map<String, CompletableFuture<U>> idToResponse : batchGroupIdToIdToResponse.values()) {
CompletableFuture.allOf(idToResponse.values().toArray(new CompletableFuture[0]))
.get(60, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Interrupted during BatchBuffer shutdown" + e);
} catch (ExecutionException e) {
System.err.println("Failed during graceful metric publisher shutdown." + e);
} catch (TimeoutException e) {
System.err.println("Timed out during graceful metric publisher shutdown." + e);
} finally {
scheduledExecutor.shutdownNow();
}
}

private synchronized int getCurrentIdAndIncrement(AtomicInteger currentId) {
16lim21 marked this conversation as resolved.
Show resolved Hide resolved
int id = currentId.getAndIncrement();
if (id < 0) {
currentId.set(1);
id = 0;
}
return id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Outer map maps a batch group ID (ex. queueUrl, overrideconfig etc.) to a nested map.
* Inner map maps batch id to a request/CompletableFuture response.
* @param <T> the type of an outgoing response
*/
public class BatchingMap<T> implements Map<String, Map<String, T>>{

private final Map<String, Map<String,T>> batchGroupIdToIdToMessage;

public BatchingMap() {
this.batchGroupIdToIdToMessage = new ConcurrentHashMap<>();
}

public Map<String, T> getNestedMap(String destination) {
return batchGroupIdToIdToMessage.computeIfAbsent(destination, k -> new ConcurrentHashMap<>());
}

@Override
public int size() {
return batchGroupIdToIdToMessage.size();
}

/**
* Only empty if every batchGroupId has an empty map since it is possible for a batchGroupId to exist but point to an empty
* map.
*/
@Override
public boolean isEmpty() {
for (Map<String, T> idToMessage : batchGroupIdToIdToMessage.values()) {
if (!idToMessage.isEmpty()) {
return false;
}
}
return true;
}

@Override
public boolean containsKey(Object key) {
return batchGroupIdToIdToMessage.containsKey(key);
}

@Override
public boolean containsValue(Object value) {
throw new UnsupportedOperationException();
}

@Override
public Map<String, T> get(Object key) {
return batchGroupIdToIdToMessage.get(key);
}

@Override
public Map<String, T> put(String key, Map<String, T> value) {
return batchGroupIdToIdToMessage.put(key, value);
}

@Override
public Map<String, T> remove(Object key) {
return batchGroupIdToIdToMessage.remove(key);
}

@Override
public void putAll(Map<? extends String, ? extends Map<String, T>> m) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
for (Map<String, T> idToMessage : batchGroupIdToIdToMessage.values()) {
idToMessage.clear();
}
batchGroupIdToIdToMessage.clear();
}

@Override
public Set<String> keySet() {
return batchGroupIdToIdToMessage.keySet();
}

@Override
public Collection<Map<String, T>> values() {
return batchGroupIdToIdToMessage.values();
}

@Override
public Set<Entry<String, Map<String, T>>> entrySet() {
return batchGroupIdToIdToMessage.entrySet();
}
}