Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,9 @@ private void sendModackOperations(
// Send modacks
int pendingOperations = 0;
for (ModackRequestData modackRequestData : modackRequestDataList) {
List<String> ackIdsInRequest = new ArrayList<>();
for (List<AckRequestData> ackRequestDataInRequestList :
Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
List<String> ackIdsInRequest = new ArrayList<>();
for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
ackIdsInRequest.add(ackRequestData.getAckId());
if (ackRequestData.hasMessageFuture()) {
Expand Down Expand Up @@ -511,9 +511,10 @@ public void onFailure(Throwable t) {
// Remove from our pending operations
ackOperationsWaiter.incrementPendingCount(-1);

Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);

if (!getExactlyOnceDeliveryEnabled()) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
return;
}

Expand Down Expand Up @@ -578,9 +579,6 @@ public void run() {
currentBackoffMillis,
TimeUnit.MILLISECONDS);
}

Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.core.Distribution;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
Expand Down Expand Up @@ -71,8 +72,8 @@ public class StreamingSubscriberConnectionTest {
"TRANSIENT_FAILURE_SERVICE_UNAVAILABLE";
private static final String PERMANENT_FAILURE_OTHER = "I_DO_NOT_MATCH_ANY_KNOWN_ERRORS";

private static int MOCK_ACK_EXTENSION_DEFAULT = 10;
private static Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(10);
private static int MOCK_ACK_EXTENSION_DEFAULT_SECONDS = 10;
private static Duration ACK_EXPIRATION_PADDING_DEFAULT_DURATION = Duration.ofSeconds(10);
private static int MAX_DURATION_PER_ACK_EXTENSION_DEFAULT_SECONDS = 10;

@Before
Expand Down Expand Up @@ -105,7 +106,8 @@ public void testSendAckOperationsExactlyOnceDisabledNoMessageFutures() {

ModackRequestData modackRequestDataSuccess =
new ModackRequestData(
MOCK_ACK_EXTENSION_DEFAULT, AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build());
MOCK_ACK_EXTENSION_DEFAULT_SECONDS,
AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build());
modackRequestDataList.add(modackRequestDataSuccess);

ModackRequestData modackRequestDataNack =
Expand Down Expand Up @@ -138,7 +140,8 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() {
Map<String, String> errorInfoMetadataMapInitialRequest = new HashMap<String, String>();
List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();

ModackRequestData modackRequestDataDefault = new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT);
ModackRequestData modackRequestDataDefault =
new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS);

// Nack SUCCESS
SettableApiFuture<AckResponse> messageFutureSuccessExpected = SettableApiFuture.create();
Expand Down Expand Up @@ -218,14 +221,14 @@ public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() {
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(ackIdsInitialRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();

ModifyAckDeadlineRequest modifyAckDeadlineRequestRetry =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(ackIdsRetryRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();

// Set mock grpc responses
Expand Down Expand Up @@ -450,6 +453,53 @@ public void testSetFailureResponseOutstandingMessages() {
});
}

@Test
public void testMaxPerRequestChanges() {
// Setup mocks
List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();
List<AckRequestData> ackRequestDataList = new ArrayList<AckRequestData>();

int numAckIds = 3000;
int numMaxPerRequestChanges = 1000;

List<String> mockAckIds = new ArrayList<String>();

for (int i = 0; i < numAckIds; i++) {
String mockAckId = "MOCK-ACK-ID-" + i;
mockAckIds.add(mockAckId);
ackRequestDataList.add(AckRequestData.newBuilder(mockAckId).build());
}

modackRequestDataList.add(
new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT_SECONDS, ackRequestDataList));

// Instantiate class and run operation(s)
StreamingSubscriberConnection streamingSubscriberConnection =
getStreamingSubscriberConnection(false);
streamingSubscriberConnection.sendAckOperations(ackRequestDataList);
streamingSubscriberConnection.sendModackOperations(modackRequestDataList);

// Assert expected behavior
for (List<String> mockAckIdsInRequest : Lists.partition(mockAckIds, numMaxPerRequestChanges)) {
AcknowledgeRequest expectedAcknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(mockAckIdsInRequest)
.build();
verify(mockSubscriberStub.acknowledgeCallable(), times(1))
.futureCall(expectedAcknowledgeRequest);

ModifyAckDeadlineRequest expectedModifyAckDeadlineRequest =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.addAllAckIds(mockAckIdsInRequest)
.setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT_SECONDS)
.build();
verify(mockSubscriberStub.modifyAckDeadlineCallable(), times(1))
.futureCall(expectedModifyAckDeadlineRequest);
}
}

private StreamingSubscriberConnection getStreamingSubscriberConnection(
boolean exactlyOnceDeliveryEnabled) {
StreamingSubscriberConnection streamingSubscriberConnection =
Expand All @@ -466,7 +516,7 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde
StreamingSubscriberConnection.Builder builder) {
return builder
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT_DURATION)
.setAckLatencyDistribution(mock(Distribution.class))
.setSubscriberStub(mockSubscriberStub)
.setChannelAffinity(0)
Expand Down