Skip to content

Commit

Permalink
[fix][transaction] avoid too many ServiceUnitNotReadyException for tr…
Browse files Browse the repository at this point in the history
…ansaction buffer handler (apache#14894)

### Motivation

1. Added max concurrent request limitation for transaction buffer client
2. Add the request to the pending request queue after reaching the concurrent request limitation
3. Avoid duplicated lookup cache invalidation
  • Loading branch information
codelipenghui authored and nicklixinyang committed Apr 20, 2022
1 parent 81101ae commit 5da2681
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 109 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Expand Up @@ -1369,6 +1369,9 @@ transactionBufferSnapshotMaxTransactionCount=1000
# Unit : millisecond
transactionBufferSnapshotMinTimeInMillis=5000

# The max concurrent requests for transaction buffer client, default is 1000
transactionBufferClientMaxConcurrentRequests=1000

### --- Packages management service configuration variables (begin) --- ###

# Enable the packages management service or not
Expand Down
Expand Up @@ -2469,6 +2469,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int transactionBufferSnapshotMinTimeInMillis = 5000;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "The max concurrent requests for transaction buffer client."
)
private int transactionBufferClientMaxConcurrentRequests = 1000;

/**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
Expand Down
Expand Up @@ -751,7 +751,8 @@ public void start() throws PulsarServerException {
this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer);
transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer,
config.getTransactionBufferClientMaxConcurrentRequests());

transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
Expand Down
Expand Up @@ -39,8 +39,10 @@ private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
this.tbHandler = tbHandler;
}

public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer) {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer);
public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer,
int maxConcurrentRequests) {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer,
maxConcurrentRequests);
return new TransactionBufferClientImpl(handler);
}

Expand Down Expand Up @@ -74,4 +76,14 @@ public CompletableFuture<TxnID> abortTxnOnSubscription(String topic, String subs
public void close() {
tbHandler.close();
}

@Override
public int getAvailableRequestCredits() {
return tbHandler.getAvailableRequestCredits();
}

@Override
public int getPendingRequestsCount() {
return tbHandler.getPendingRequestsCount();
}
}

Large diffs are not rendered by default.

Expand Up @@ -27,12 +27,9 @@
import lombok.Cleanup;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
Expand Down Expand Up @@ -84,7 +81,7 @@ protected void setup() throws Exception {
admin.namespaces().createNamespace(namespace, 10);
admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions);
tbClient = TransactionBufferClientImpl.create(pulsarClient,
new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")));
new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000);
}

@Override
Expand Down Expand Up @@ -163,19 +160,19 @@ public void testTransactionBufferClientTimeout() throws Exception {
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer);
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
CompletableFuture<TxnID> endFuture =
transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);

Field field = TransactionBufferHandlerImpl.class.getDeclaredField("pendingRequests");
Field field = TransactionBufferHandlerImpl.class.getDeclaredField("outstandingRequests");
field.setAccessible(true);
ConcurrentSkipListMap<Long, Object> pendingRequests =
ConcurrentSkipListMap<Long, Object> outstandingRequests =
(ConcurrentSkipListMap<Long, Object>) field.get(transactionBufferHandler);

assertEquals(pendingRequests.size(), 1);
assertEquals(outstandingRequests.size(), 1);

Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
if (pendingRequests.size() == 0) {
if (outstandingRequests.size() == 0) {
return true;
}
return false;
Expand Down Expand Up @@ -206,7 +203,7 @@ public void testTransactionBufferChannelUnActive() {
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer);
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
try {
transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1).get();
fail();
Expand Down Expand Up @@ -246,8 +243,8 @@ public void testTransactionBufferLookUp() throws Exception {
}

@Test
public void testTransactionBufferHandlerSemaphore() throws Exception {
String topic = "persistent://" + namespace + "/testTransactionBufferHandlerSemaphore";
public void testTransactionBufferRequestCredits() throws Exception {
String topic = "persistent://" + namespace + "/testTransactionBufferRequestCredits";
String subName = "test";

String abortTopic = topic + "_abort_sub";
Expand All @@ -260,11 +257,17 @@ public void testTransactionBufferHandlerSemaphore() throws Exception {
admin.topics().createSubscription(commitTopic, subName, MessageId.earliest);

tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get();

tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get();

tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get();
tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get();

assertEquals(tbClient.getAvailableRequestCredits(), 1000);
}

@Test
public void testTransactionBufferPendingRequests() throws Exception {

}

@Test
Expand All @@ -291,22 +294,4 @@ public void testEndSubNotExist() throws Exception {
tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get();
tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get();
}

private void waitPendingAckInit(String topic, String sub) throws Exception {

boolean exist = false;
for (int i = 0; i < getPulsarServiceList().size(); i++) {
CompletableFuture<Optional<Topic>> completableFuture = getPulsarServiceList().get(i)
.getBrokerService().getTopics().get(topic);
if (completableFuture != null) {
PersistentSubscription persistentSubscription =
(PersistentSubscription) completableFuture.get().get().getSubscription(sub);
Awaitility.await().untilAsserted(() ->
assertEquals(persistentSubscription.getTransactionPendingAckStats().state, "Ready"));
exist = true;
}
}

assertTrue(exist);
}
}
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer;

import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;

@Test(groups = "broker")
public class TransactionBufferHandlerImplTest {

@Test
public void testRequestCredits() {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarClient, null, 1000));
doNothing().when(handler).endTxn(any());
for (int i = 0; i < 500; i++) {
handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
}
assertEquals(handler.getAvailableRequestCredits(), 500);
for (int i = 0; i < 500; i++) {
handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
}
assertEquals(handler.getAvailableRequestCredits(), 0);
handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
assertEquals(handler.getPendingRequestsCount(), 1);
handler.onResponse(null);
assertEquals(handler.getAvailableRequestCredits(), 0);
assertEquals(handler.getPendingRequestsCount(), 0);
}

@Test
public void testMinRequestCredits() {
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(null, null, 50));
assertEquals(handler.getAvailableRequestCredits(), 100);
}
}
Expand Up @@ -91,4 +91,8 @@ CompletableFuture<TxnID> abortTxnOnSubscription(String topic,
long lowWaterMark);

void close();

int getAvailableRequestCredits();

int getPendingRequestsCount();
}
Expand Up @@ -72,4 +72,8 @@ CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription,
* Release resources.
*/
void close();

int getAvailableRequestCredits();

int getPendingRequestsCount();
}

0 comments on commit 5da2681

Please sign in to comment.