Skip to content

Commit

Permalink
Implementing BatchManager in sdk-core (#2613)
Browse files Browse the repository at this point in the history
* Adding batchBuffer and testing code

* Adding sdk-core tests and fixing request to response mapping

* Updating sdk-core tests with object to mimic batch request entry

* Adding wrapper classes for functions

* Making some changes for thread safety and updating tests

* Refactoring code and how requests are cleared. Simplifying scheduling code as well

* Changing scheduled flush code. Need to test in next commit

* Adding more test cases and scheduling periodically instead of just once

* Refactoring responseMap into a wrapper class

* Fixing issues related to periodic scheduling. Need to test with multithreading

* Using map of currentIds instead of a singular currentId.

* Fixed race conditions related to cancelling scheduled buffer flush

* Resetting cancellableFlush's flags after flush is completed

* Multithreading test works. Need to clean up code and recommit

* Found another way to reset cancellableFlush flags. Also cleaning up code

* Updating request batching tests with multi threading tests

* Renamed to batchManager and cleaned up variable names to be more self-explanatory. Also refactored sdk-core tests

* Refactored sdk-core tests

* Fixing checkstyle issues

* Fixing checkstyle issues in the tests as well as refactoring some tests

* Updating and adding SQS batching tests and removing xml.bind

* Using existing Md5Utils for Sqs batch tests

* Naming and style changes to address PR comments

* Refactoring tests to use maps instead of arrays for better request-to-response correlation checking

* Name changes, refactoring code and addressing minor PR comments

* Refactoring SQS integration tests to use identifiable responses

* Combining request and response maps into one BatchingMap

* Renaming generics as RequestT, ResponseT etc.

* Refactoring tests to calculate batchGroupId from request and removing need for requestBufferCopy

* Cleaning up names and methods

* Adding the batchGroupId function that I forgot last time

* Refactoring batchManager to use builder pattern and an overrideConfiguration to manage configuration values

* Removing need for separate executor (just use scheduled executor)

* Adding javadocs and renaming variables/classes to be more intuitive

* Refactoring incrementing currentId to a separate BatchUtils class

* Refactoring currentIds map and scheduledFlushes map into the BatchBuffer

* Removing need to check if flush has executed but need to check if flush was a manual flush. Also adding batchUtils

* Small changes to address github PR comments

* Removing completionService and just using CompletableFuture.supplyAsync

* Adding code to cover exception thrown from the sendAndBatch function

* Adding exception test and fixing batchingExecution locking

* Adding lock object back into batchingExecutionContext for both read and remove request methods

* Addressing minor github PR comments (making classes final, cleaning up names etc.)

* Just renaming cancelScheduledFlushIfNeeded method

Co-authored-by: Michael Li <mchlia@amazon.com>
  • Loading branch information
16lim21 and Michael Li committed Jul 28, 2021
1 parent d08a5bd commit 329566c
Show file tree
Hide file tree
Showing 15 changed files with 1,611 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.batchutilities.BatchManager;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;

/**
 * Configuration values for the {@link BatchManager}.
 *
 * <p>As implemented here, every value is required: the constructor rejects a {@code null} {@code maxBatchItems},
 * {@code maxBatchOpenInMs} or {@code scheduledExecutor}, and additionally requires the first two to be positive.
 * This class does not apply any default values itself.
 *
 * <p>Instances are immutable; use {@link #toBuilder()} to derive a modified copy.
 */
@SdkPublicApi
public final class BatchOverrideConfiguration implements ToCopyableBuilder<BatchOverrideConfiguration.Builder,
                                                                           BatchOverrideConfiguration> {

    private final Integer maxBatchItems;
    private final Duration maxBatchOpenInMs;
    private final ScheduledExecutorService scheduledExecutor;

    /**
     * Creates a configuration from the values held by {@code builder}.
     *
     * <p>Each value is checked with {@link Validate}: all three must be non-null, and {@code maxBatchItems} and
     * {@code maxBatchOpenInMs} must be positive. A value that fails a check causes {@link Validate} to throw.
     *
     * @param builder the builder whose values are validated and copied.
     */
    public BatchOverrideConfiguration(Builder builder) {
        Validate.notNull(builder.maxBatchItems, "maxBatchItems cannot be null");
        this.maxBatchItems = Validate.isPositive(builder.maxBatchItems, "maxBatchItems");
        Validate.notNull(builder.maxBatchOpenInMs, "maxBatchOpenInMs cannot be null");
        // The field name passed here is used in the validation message, so it must match the real property name.
        this.maxBatchOpenInMs = Validate.isPositive(builder.maxBatchOpenInMs, "maxBatchOpenInMs");
        this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
    }

    /**
     * @return a new, empty {@link Builder}.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * @return the maximum number of messages that are batched together in a single request.
     */
    public Integer maxBatchItems() {
        return maxBatchItems;
    }

    /**
     * @return the maximum amount of time that an outgoing call waits to be batched with messages of the same type.
     * Despite the {@code InMs} suffix in the name, the value is a {@link Duration} and carries its own unit.
     */
    public Duration maxBatchOpenInMs() {
        return maxBatchOpenInMs;
    }

    /**
     * @return the {@link ScheduledExecutorService} supplied through the builder.
     */
    public ScheduledExecutorService scheduledExecutor() {
        return scheduledExecutor;
    }

    /**
     * @return a builder pre-populated with all three values of this configuration.
     */
    @Override
    public Builder toBuilder() {
        return new Builder().maxBatchItems(maxBatchItems)
                            .maxBatchOpenInMs(maxBatchOpenInMs)
                            .scheduledExecutor(scheduledExecutor);
    }

    /**
     * @return a string containing {@code maxBatchItems} and {@code maxBatchOpenInMs}; the executor is not included.
     */
    @Override
    public String toString() {
        return ToString.builder("BatchOverrideConfiguration")
                       .add("maxBatchItems", maxBatchItems)
                       .add("maxBatchOpenInMs", maxBatchOpenInMs)
                       .build();
    }

    /**
     * Mutable builder for {@link BatchOverrideConfiguration}. Setters perform no validation; all checks happen in
     * {@link #build()} via the {@link BatchOverrideConfiguration} constructor.
     */
    public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {

        private Integer maxBatchItems;
        private Duration maxBatchOpenInMs;
        private ScheduledExecutorService scheduledExecutor;

        private Builder() {
        }

        /**
         * @param maxBatchItems the maximum number of messages batched together in a single request.
         * @return this builder, for chaining.
         */
        public Builder maxBatchItems(Integer maxBatchItems) {
            this.maxBatchItems = maxBatchItems;
            return this;
        }

        /**
         * @param maxBatchOpenInMs the maximum amount of time an outgoing call waits to be batched.
         * @return this builder, for chaining.
         */
        public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
            this.maxBatchOpenInMs = maxBatchOpenInMs;
            return this;
        }

        /**
         * @param scheduledExecutor the executor to store in the configuration.
         * @return this builder, for chaining.
         */
        public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
            this.scheduledExecutor = scheduledExecutor;
            return this;
        }

        /**
         * Validates the collected values and creates the configuration.
         *
         * @return a new {@link BatchOverrideConfiguration}.
         */
        public BatchOverrideConfiguration build() {
            return new BatchOverrideConfiguration(this);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkProtectedApi;

/**
* Takes a list of identified requests that belong to the same batch group and combines them into a single batch
* request. It then sends the batch request and returns a CompletableFuture of the batch response.
* @param <RequestT> the type of an individual outgoing request.
* @param <BatchResponseT> the type of the response received for the batch request as a whole.
*/
@FunctionalInterface
@SdkProtectedApi
public interface BatchAndSend<RequestT, BatchResponseT> {
/**
* Builds one batch request from the given requests and sends it.
* @param identifiedRequests the individual requests to combine, each wrapped in an {@link IdentifiableRequest}.
* @param batchGroupId the batch group the requests belong to (the "destination" of the batch request).
* @return a CompletableFuture of the batch response.
*/
CompletableFuture<BatchResponseT> batchAndSend(List<IdentifiableRequest<RequestT>> identifiedRequests, String batchGroupId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import software.amazon.awssdk.annotations.SdkInternalApi;

/**
 * Buffer of pending batch entries, keyed by a generated String ID, together with the {@link ScheduledFlush}
 * currently associated with the buffer.
 *
 * <p>Each entry is a {@link BatchingExecutionContext} pairing a request with the future of its response. A separate
 * counter tracks how many requests are still held, which can differ from the number of entries once
 * {@link #removeRequest(String)} has been called.
 */
@SdkInternalApi
public final class BatchBuffer<RequestT, ResponseT> {
    private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;

    /**
     * Number of requests currently counted as buffered. Incremented by {@link #put}, decremented by
     * {@link #removeRequest(String)} and reset by {@link #clear()}.
     */
    private final AtomicInteger numRequests;

    /**
     * Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
     * BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented every time a new
     * request and response pair is received.
     */
    private final AtomicInteger nextId;

    /**
     * The scheduled flush task associated with this batchBuffer. Declared volatile because it is replaced through
     * {@link #putScheduledFlush(ScheduledFlush)} and, like the other (concurrent) fields of this class, may be read
     * from a different thread than the one that wrote it.
     */
    private volatile ScheduledFlush scheduledFlush;

    /**
     * Creates an empty buffer.
     *
     * @param scheduledFlush the flush task initially associated with this buffer.
     */
    public BatchBuffer(ScheduledFlush scheduledFlush) {
        this.idToBatchContext = new ConcurrentHashMap<>();
        this.numRequests = new AtomicInteger(0);
        this.nextId = new AtomicInteger(0);
        this.scheduledFlush = scheduledFlush;
    }

    /**
     * @return the number of entries (request/response contexts) in the buffer.
     */
    public int size() {
        return idToBatchContext.size();
    }

    /**
     * @return the current value of the request counter.
     */
    public int requestSize() {
        return numRequests.get();
    }

    /**
     * @return true if the request counter is non-zero.
     */
    public boolean hasRequests() {
        return numRequests.get() != 0;
    }

    /**
     * @return true if the buffer holds at least one entry.
     */
    public boolean hasResponses() {
        return !idToBatchContext.isEmpty();
    }

    /**
     * @param key the entry ID to look up.
     * @return true if an entry with this ID is in the buffer.
     */
    public boolean containsKey(String key) {
        return idToBatchContext.containsKey(key);
    }

    /**
     * @param key the entry ID to look up.
     * @return the request stored in the entry.
     * @throws NullPointerException if no entry exists for {@code key}.
     */
    public RequestT getRequest(String key) {
        return idToBatchContext.get(key).request();
    }

    /**
     * @param key the entry ID to look up.
     * @return the response future stored in the entry.
     * @throws NullPointerException if no entry exists for {@code key}.
     */
    public CompletableFuture<ResponseT> getResponse(String key) {
        return idToBatchContext.get(key).response();
    }

    /**
     * @return the flush task currently associated with this buffer.
     */
    public ScheduledFlush getScheduledFlush() {
        return scheduledFlush;
    }

    /**
     * Adds a new entry under a freshly generated ID and increments the request counter.
     *
     * @param request the request to buffer.
     * @param response the future that will hold the response for {@code request}.
     * @return the entry previously stored under the generated ID, or null if there was none. This is presumably
     *         always null, assuming {@code BatchUtils.getAndIncrementId} never repeats an ID.
     */
    public BatchingExecutionContext<RequestT, ResponseT> put(RequestT request, CompletableFuture<ResponseT> response) {
        numRequests.getAndIncrement();
        String id = BatchUtils.getAndIncrementId(nextId);
        return idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
    }

    /**
     * Replaces the flush task associated with this buffer. The previous task is not cancelled by this method.
     *
     * @param scheduledFlush the new flush task.
     */
    public void putScheduledFlush(ScheduledFlush scheduledFlush) {
        this.scheduledFlush = scheduledFlush;
    }

    /**
     * Cancels the flush task currently associated with this buffer.
     */
    public void cancelScheduledFlush() {
        scheduledFlush.cancel();
    }

    /**
     * Asks the entry stored under {@code key} to drop its request and, if the entry reports that it did so,
     * decrements the request counter. The entry itself stays in the buffer.
     *
     * <p>If no entry exists for {@code key} (for example because it was removed concurrently) this is a no-op.
     *
     * @param key the entry ID whose request should be removed.
     */
    public void removeRequest(String key) {
        // Look the entry up once so that a concurrent remove()/clear() cannot cause a NullPointerException here.
        BatchingExecutionContext<RequestT, ResponseT> context = idToBatchContext.get(key);
        if (context != null && context.removeRequest()) {
            numRequests.getAndDecrement();
        }
    }

    /**
     * Removes the whole entry stored under {@code key}. The request counter is not changed.
     *
     * @param key the entry ID to remove.
     * @return the removed entry, or null if there was none.
     */
    public BatchingExecutionContext<RequestT, ResponseT> remove(String key) {
        return idToBatchContext.remove(key);
    }

    /**
     * @return a live view of the entries in the buffer.
     */
    public Collection<BatchingExecutionContext<RequestT, ResponseT>> values() {
        return idToBatchContext.values();
    }

    /**
     * @return a new list containing the response future of every entry in the buffer.
     */
    public Collection<CompletableFuture<ResponseT>> responses() {
        return idToBatchContext.values()
                               .stream()
                               .map(BatchingExecutionContext::response)
                               .collect(Collectors.toList());
    }

    /**
     * @return a live view of the ID-to-entry mappings in the buffer.
     */
    public Set<Map.Entry<String, BatchingExecutionContext<RequestT, ResponseT>>> entrySet() {
        return idToBatchContext.entrySet();
    }

    /**
     * Resets the request counter to zero and removes every entry. The ID counter and the scheduled flush are left
     * untouched.
     */
    public void clear() {
        numRequests.set(0);
        idToBatchContext.clear();
    }

    /**
     * Runs {@code action} for every ID/entry pair in the buffer.
     *
     * @param action the action to apply to each ID and its entry.
     */
    public void forEach(BiConsumer<String, BatchingExecutionContext<RequestT, ResponseT>> action) {
        idToBatchContext.forEach(action);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import software.amazon.awssdk.annotations.SdkProtectedApi;

/**
* Takes a request and extracts the batch key (also referred to as the batchGroupId) as determined by the caller.
* TODO: For right now, the batchKey is a String but this may change as needed in the future.
* @param <RequestT> the type of the request the batch key is extracted from.
*/
@FunctionalInterface
@SdkProtectedApi
public interface BatchKeyMapper<RequestT> {
/**
* Extracts the batch key from a request.
* @param request the request to derive the batch key from.
* @return the batch key for the request.
*/
String getBatchKey(RequestT request);
}

0 comments on commit 329566c

Please sign in to comment.