Skip to content

Commit

Permalink
[Transaction] Transaction log (#8658)
Browse files Browse the repository at this point in the history
Master Issue: [PIP31](https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support)
### Motivation
Implemention of [PIP31](https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support), transaction log

### Modifications
Add TransactionMetadataStore implemention by managed ledger
### Verifying this change
Add the tests for it
  • Loading branch information
congbobo184 authored Nov 22, 2020
1 parent 642461c commit c25bab8
Show file tree
Hide file tree
Showing 30 changed files with 2,687 additions and 85 deletions.
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/*.graffle</exclude>
<exclude>**/*.hgrm</exclude>
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude>
Expand Down Expand Up @@ -1227,6 +1228,7 @@ flexible messaging model and an intuitive client API.</description>
and are included in source tree for convenience -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;

import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;

import org.apache.pulsar.common.api.proto.PulsarApi.TxnAction;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;

import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;

import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -36,7 +41,7 @@
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -135,12 +140,12 @@ public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
}
}

public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId) {
public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.newTransaction();
return store.newTransaction(timeoutInMills);
}

public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
Expand Down Expand Up @@ -179,14 +184,14 @@ public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus,
return store.updateTxnStatus(txnId, newStatus, expectedStatus);
}

public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<PulsarApi.MessageIdData> messageIdDataList) {
public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<MessageIdData> messageIdDataList) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
TxnStatus newStatus;
switch (txnAction) {
case PulsarApi.TxnAction.COMMIT_VALUE:
case TxnAction.COMMIT_VALUE:
newStatus = TxnStatus.COMMITTING;
break;
case PulsarApi.TxnAction.ABORT_VALUE:
case TxnAction.ABORT_VALUE:
newStatus = TxnStatus.ABORTING;
break;
default:
Expand All @@ -210,7 +215,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<P
}

private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction,
List<PulsarApi.MessageIdData> messageIdDataList) {
List<MessageIdData> messageIdDataList) {
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
Expand All @@ -221,10 +226,10 @@ private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAc

txnMeta.ackedPartitions().forEach(tbSub -> {
CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
if (TxnAction.COMMIT_VALUE == txnAction) {
actionFuture = tbClient.commitTxnOnSubscription(
tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
} else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
} else if (TxnAction.ABORT_VALUE == txnAction) {
actionFuture = tbClient.abortTxnOnSubscription(
tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
} else {
Expand All @@ -234,20 +239,20 @@ private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAc
});

List<MessageId> messageIdList = new ArrayList<>();
for (PulsarApi.MessageIdData messageIdData : messageIdDataList) {
for (MessageIdData messageIdData : messageIdDataList) {
messageIdList.add(new MessageIdImpl(
messageIdData.getLedgerId(), messageIdData.getEntryId(), messageIdData.getPartition()));
messageIdData.recycle();
}

txnMeta.producedPartitions().forEach(partition -> {
CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
if (TxnAction.COMMIT_VALUE == txnAction) {
actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
messageIdList.stream().filter(
msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
TopicName.get(partition).getPartitionIndex()).collect(Collectors.toList()));
} else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
} else if (TxnAction.ABORT_VALUE == txnAction) {
actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
messageIdList.stream().filter(
msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.netty.util.concurrent.Promise;

import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
Expand Down Expand Up @@ -129,7 +128,7 @@
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1669,7 +1668,7 @@ protected void handleNewTxn(CommandNewTxn command) {
log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress);
}
TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId)
service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, command.getTxnTtlSeconds())
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1827,13 +1826,9 @@ protected void handleAddSubscriptionToTxn(PulsarApi.CommandAddSubscriptionToTxn
log.debug("Receive add published partition to txn request {} from {} with txnId {}",
command.getRequestId(), remoteAddress, txnID);
}
List<TransactionSubscription> subscriptionList = command.getSubscriptionList().stream()
.map(subscription -> TransactionSubscription.builder()
.topic(subscription.getTopic())
.subscription(subscription.getSubscription())
.build())
.collect(Collectors.toList());
service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, subscriptionList)

service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionList()))
.whenComplete(((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;

/**
* The metadata for the transaction in the transaction buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.transaction.buffer.exceptions;

import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;

/**
* Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand All @@ -41,7 +41,7 @@
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;

/**
* The in-memory implementation of {@link TransactionBuffer}.
Expand Down Expand Up @@ -280,7 +280,7 @@ public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(
}

@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, List<PulsarApi.MessageIdData> messageIdDataList) {
public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> messageIdDataList) {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
try {
TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
Expand All @@ -307,7 +307,7 @@ private void addTxnToTxnIdex(TxnID txnId, long committedAtLedgerId) {
}

@Override
public CompletableFuture<Void> abortTxn(TxnID txnID, List<PulsarApi.MessageIdData> sendMessageIdList) {
public CompletableFuture<Void> abortTxn(TxnID txnID, List<MessageIdData> sendMessageIdList) {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -71,9 +71,9 @@ public void testNewTransaction() throws ExecutionException, InterruptedException
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 3);
TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1)).get();
TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2)).get();
TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 0).get();
TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 0).get();
Assert.assertEquals(0, txnID0.getMostSigBits());
Assert.assertEquals(1, txnID1.getMostSigBits());
Assert.assertEquals(2, txnID2.getMostSigBits());
Expand All @@ -88,7 +88,7 @@ public void testAddProducedPartitionToTxn() throws ExecutionException, Interrupt
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
List<String> partitions = new ArrayList<>();
partitions.add("ptn-0");
partitions.add("ptn-1");
Expand All @@ -105,7 +105,7 @@ public void testAddAckedPartitionToTxn() throws ExecutionException, InterruptedE
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
List<TransactionSubscription> partitions = new ArrayList<>();
partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down
23 changes: 23 additions & 0 deletions pulsar-transaction/coordinator/generate_protobuf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


PROTOC=${PROTOC:-protoc}
${PROTOC} --java_out=pulsar-transaction/coordinator/src/main/java pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
42 changes: 42 additions & 0 deletions pulsar-transaction/coordinator/generate_protobuf_docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Fail script in case of errors
set -e

ROOT_DIR=$(git rev-parse --show-toplevel)
COMMON_DIR=$ROOT_DIR/
cd $COMMON_DIR

BUILD_IMAGE_NAME="${BUILD_IMAGE_NAME:-apachepulsar/pulsar-build}"
BUILD_IMAGE_VERSION="${BUILD_IMAGE_VERSION:-ubuntu-16.04}"

IMAGE="$BUILD_IMAGE_NAME:$BUILD_IMAGE_VERSION"

echo $IMAGE

# Force to pull image in case it was updated
docker pull $IMAGE

WORKDIR=/workdir
docker run -i \
-v ${COMMON_DIR}:${WORKDIR} $IMAGE \
bash -c "cd ${WORKDIR}; PROTOC=/pulsar/protobuf/src/protoc ./pulsar-transaction/coordinator/generate_protobuf.sh"

Loading

0 comments on commit c25bab8

Please sign in to comment.