Skip to content

Commit

Permalink
[Transaction] Transaction buffer snapshot implementation. (#9490)
Browse files Browse the repository at this point in the history
## Motivation
Add a transaction buffer snapshot so that ongoing and aborted transactions can be recovered.

## Implementation
snapshot metadata: 
```
// Metadata kept in a snapshot for one aborted transaction.
public class AbortTxnMetadata {
    // The two halves of the transaction ID (presumably its most/least significant bits — confirm).
    long txnIdMostBits;
    long txnIdLeastBits;
    // NOTE(review): presumably the (ledger, entry) position associated with the abort — confirm.
    long ledgerId;
    long entryId;
}

// Snapshot payload for one topic's transaction buffer: the max read position
// plus the list of aborted transactions.
public class TransactionBufferSnapshot {
    // Name of the topic this snapshot belongs to.
    private String topicName;
    // Max read position, stored as a (ledger ID, entry ID) pair.
    private long maxReadPositionLedgerId;
    private long maxReadPositionEntryId;
    // Aborted transactions recorded in this snapshot.
    private List<AbortTxnMetadata> aborts;
}
```
- For now, only the max read position and aborted transactions are stored; a follow-up will store ongoing transactions.
- implement this by a namespace event topic. `__transaction_buffer_snapshot`
- add `transactionBufferSnapshotMaxTransactionCount` and `transactionBufferSnapshotMinTimeInMillis` to control when a snapshot is taken
- add `SystemTopicBaseTxnBufferSnapshotService` to create the snapshot `Writer` and `Reader` and to cache one system topic client per namespace.
  • Loading branch information
congbobo184 committed Mar 10, 2021
1 parent 1d6aa57 commit 488d384
Show file tree
Hide file tree
Showing 33 changed files with 1,415 additions and 161 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,13 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
transactionCoordinatorEnabled=false
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider

# Transaction count used to control when the transaction buffer takes a snapshot
transactionBufferSnapshotMaxTransactionCount=1000

# Minimum interval between transaction buffer snapshots
# Unit : millisecond
transactionBufferSnapshotMinTimeInMillis=5000

### --- Packages management service configuration variables (begin) --- ###

# Enable the packages management service or not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int numCacheExecutorThreadPoolSize = 10;

// Sizes the "pulsar-transaction" scheduled executor that PulsarService creates when the
// transaction coordinator is enabled; defaults to the number of available processors.
@FieldContext(
    category = CATEGORY_SERVER,
    doc = "Number of threads to use for the pulsar transaction executor."
        + " The executor in the thread pool performs transaction recovery"
)
private int numTransactionExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors();

@FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests")
private int maxConcurrentHttpRequests = 1024;

Expand Down Expand Up @@ -1950,6 +1957,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String transactionBufferProviderClassName =
"org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider";

// Transaction-count setting that controls when the transaction buffer takes a snapshot.
@FieldContext(category = CATEGORY_TRANSACTION, doc = "Transaction buffer take snapshot transaction count")
private int transactionBufferSnapshotMaxTransactionCount = 1000;

// Minimum interval between transaction buffer snapshots, in milliseconds.
@FieldContext(category = CATEGORY_TRANSACTION, doc = "Transaction buffer take snapshot min interval time")
private int transactionBufferSnapshotMinTimeInMillis = 5000;

/**** --- KeyStore TLS config variables --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
Expand Down Expand Up @@ -210,6 +212,7 @@ public class PulsarService implements AutoCloseable {
private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
private TransactionBufferClient transactionBufferClient;
private ScheduledExecutorService transactionExecutor;
private HashedWheelTimer transactionTimer;

private BrokerInterceptor brokerInterceptor;
Expand All @@ -221,6 +224,7 @@ public class PulsarService implements AutoCloseable {

private MetadataStoreExtended localMetadataStore;
private CoordinationService coordinationService;
private TransactionBufferSnapshotService transactionBufferSnapshotService;

private MetadataStoreExtended configurationMetadataStore;
private PulsarResources pulsarResources;
Expand Down Expand Up @@ -360,6 +364,11 @@ public void close() throws PulsarServerException {
adminClient = null;
}

if (transactionBufferSnapshotService != null) {
transactionBufferSnapshotService.close();
transactionBufferSnapshotService = null;
}

if (client != null) {
client.close();
client = null;
Expand Down Expand Up @@ -419,6 +428,7 @@ public void close() throws PulsarServerException {

state = State.Closed;
isClosedCondition.signalAll();

} catch (Exception e) {
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
Expand Down Expand Up @@ -651,6 +661,10 @@ public Boolean get() {

// Register pulsar system namespaces and start transaction meta store service
if (config.isTransactionCoordinatorEnabled()) {
this.transactionExecutor = Executors.newScheduledThreadPool(
config.getNumTransactionExecutorThreadPoolSize(),
new DefaultThreadFactory("pulsar-transaction"));
this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

/**
 * {@link TransactionBufferSnapshotService} implementation backed by namespace event system topics.
 *
 * <p>For every namespace, the transaction buffer snapshot system topic name is derived through
 * {@link NamespaceEventsSystemTopicFactory#getSystemTopicName}, and one {@link SystemTopicClient}
 * is cached under that system topic name. Readers and writers for any topic of the namespace are
 * created from the cached client.
 */
public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {

    /** Cached clients, keyed by the transaction buffer snapshot system topic name. */
    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;

    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

    /**
     * Create the service.
     *
     * @param client the Pulsar client handed to the {@link NamespaceEventsSystemTopicFactory}
     */
    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
        this.clients = new ConcurrentHashMap<>();
    }

    /**
     * Create a snapshot writer on the system topic of the namespace that owns {@code topicName}.
     *
     * @param topicName the topic whose namespace selects the system topic
     * @return a future completing with the writer, or failing if the system topic name cannot be resolved
     */
    @Override
    public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
        return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
    }

    /**
     * Look up, or lazily create and cache, the system topic client for the namespace of {@code topicName}.
     *
     * @param topicName the topic whose namespace selects the system topic
     * @return an already-completed future holding the client, or a failed future when no system
     *         topic name could be resolved
     */
    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
            TopicName topicName) {
        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
        if (systemTopicName == null) {
            return FutureUtil.failedFuture(
                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
                            + "because the topicName is null!"));
        }
        // computeIfAbsent guarantees a single client per system topic even under concurrent callers.
        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
                (v) -> namespaceEventsSystemTopicFactory
                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
    }

    /**
     * Create a snapshot reader on the system topic of the namespace that owns {@code topicName}.
     *
     * @param topicName the topic whose namespace selects the system topic
     * @return a future completing with the reader, or failing if the system topic name cannot be resolved
     */
    @Override
    public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
        return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
    }

    /**
     * Evict a client from the cache once it has neither readers nor writers left.
     *
     * <p>NOTE(review): the cache is keyed by the system topic name computed in
     * {@code getTransactionBufferSystemTopicClient}, so {@code topicName} must be that same name for
     * the eviction to take effect — confirm against callers.
     *
     * @param topicName the cache key of the client
     * @param transactionBufferSystemTopicClient the client whose readers and writers are inspected
     */
    @Override
    public void removeClient(TopicName topicName,
                             TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
        if (transactionBufferSystemTopicClient.getReaders().size() == 0
                && transactionBufferSystemTopicClient.getWriters().size() == 0) {
            clients.remove(topicName);
        }
    }

    /**
     * Close every cached client and empty the cache.
     *
     * <p>All clients are attempted even when one of them fails to close, so a single failure does
     * not leak the remaining clients. The first failure is rethrown after the loop with any later
     * failures attached as suppressed exceptions.
     *
     * @throws Exception the first exception raised while closing a client
     */
    @Override
    public void close() throws Exception {
        Exception firstFailure = null;
        for (SystemTopicClient<TransactionBufferSnapshot> cachedClient : clients.values()) {
            try {
                cachedClient.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        // Drop the closed clients so they can never be handed out again.
        clients.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic

private final Map<NamespaceName, AtomicInteger> ownedBundlesCountPerNamespace = new ConcurrentHashMap<>();

private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader>>
private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
readerCaches = new ConcurrentHashMap<>();

private final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
Expand All @@ -74,11 +74,10 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
CompletableFuture<Void> result = new CompletableFuture<>();

createSystemTopicFactoryIfNeeded();
SystemTopicClient systemTopicClient =
namespaceEventsSystemTopicFactory.createSystemTopic(topicName.getNamespaceObject(),
EventType.TOPIC_POLICY);
SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());

CompletableFuture<SystemTopicClient.Writer> writerFuture = systemTopicClient.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture = systemTopicClient.newWriterAsync();
writerFuture.whenComplete((writer, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
Expand Down Expand Up @@ -159,9 +158,8 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa
result.complete(null);
return result;
}
SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory
.createSystemTopic(topicName.getNamespaceObject()
, EventType.TOPIC_POLICY);
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newReaderAsync().thenAccept(r ->
fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, result));
return result;
Expand All @@ -177,11 +175,11 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
result.complete(null);
} else {
SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(namespace
, EventType.TOPIC_POLICY);
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
policyCacheInitMap.put(namespace, false);
CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture =
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
systemTopicClient.newReaderAsync();
readerCaches.put(namespace, readerCompletableFuture);
readerCompletableFuture.whenComplete((reader, ex) -> {
Expand All @@ -202,7 +200,8 @@ public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle n
NamespaceName namespace = namespaceBundle.getNamespaceObject();
AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = readerCaches.remove(namespace);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
readerCaches.remove(namespace);
if (readerCompletableFuture != null) {
readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
ownedBundlesCountPerNamespace.remove(namespace);
Expand Down Expand Up @@ -237,7 +236,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
});
}

private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture<Void> future) {
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
Expand All @@ -260,7 +259,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture
});
}

private void readMorePolicies(SystemTopicClient.Reader reader) {
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.readNextAsync().whenComplete((msg, ex) -> {
if (ex == null) {
refreshTopicPoliciesCache(msg);
Expand Down Expand Up @@ -304,8 +303,8 @@ private void createSystemTopicFactoryIfNeeded() {
}
}

private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader reader, TopicName topicName,
TopicPolicies policies,
private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<PulsarEvent> reader,
TopicName topicName, TopicPolicies policies,
CompletableFuture<TopicPolicies> future) {
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.common.naming.TopicName;

/**
* Service that hands out readers and writers for transaction buffer snapshots and manages the
* underlying {@link TransactionBufferSystemTopicClient} cache.
*/
public interface TransactionBufferSnapshotService {

/**
* Create a transaction buffer snapshot writer.
*
* @param topicName {@link TopicName} the topic name
*
* @return a future that completes with the snapshot {@link Writer}
*/
CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName);

/**
* Create a transaction buffer snapshot reader.
*
* @param topicName {@link TopicName} the topic name
*
* @return a future that completes with the snapshot {@link Reader}
*/
CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName);

/**
* Remove a topic client from cache.
*
* @param topicName {@link TopicName} the topic name the client is cached under
* (NOTE(review): presumably the system topic name — confirm against the implementation)
* @param transactionBufferSystemTopicClient {@link TransactionBufferSystemTopicClient} the topic client
*/
void removeClient(TopicName topicName, TransactionBufferSystemTopicClient transactionBufferSystemTopicClient);

/**
* Close transaction buffer snapshot service.
*
* @throws Exception if the service fails to close
*/
void close() throws Exception;

}
Loading

0 comments on commit 488d384

Please sign in to comment.