Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction log implemention #5570

Open
wants to merge 40 commits into
base: master
from

Conversation

@congbobo184
Copy link
Contributor

congbobo184 commented Nov 6, 2019

Master Issue: PIP31

Motivation

Implemention of PIP31, transaction log

Modifications

Add TransactionMetadataStore implemention by managed ledger

Verifying this change

Add the tests for it

congbobo added 10 commits Oct 28, 2019
…lement

# Conflicts:
#	pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@congbobo184 congbobo184 changed the title Transaction log implement Transaction log implemention Nov 6, 2019
congbobo added 4 commits Nov 7, 2019
optional TxnStatus new_status = 5;
repeated string partitions = 6;
repeated Subscription subscriptions = 7;
optional uint64 txn_timeout_ms = 8;

This comment has been minimized.

Copy link
@sijie

sijie Nov 11, 2019

Member

txn_timeout_ms, txn_start_time and txn_last_modification_time are already in the transaction metadata entry. I don't think we need to prefix them with txn_.

*/
@Data
public class TxnSubscription {
private String topic;

This comment has been minimized.

Copy link
@sijie

sijie Nov 11, 2019

Member

make the variables final?



/**
* The provider that offers topic-base-memory implementation of {@link TransactionMetadataStore}.

This comment has been minimized.

Copy link
@sijie

sijie Nov 11, 2019

Member

what does 'topic-base-memory' mean here?

this.readOnlyCursor = managedLedgerFactory
.openReadOnlyCursor(tcId,
PositionImpl.earliest, new ManagedLedgerConfig());
CountDownLatch countDownLatch = new CountDownLatch(1);

This comment has been minimized.

Copy link
@sijie

sijie Nov 11, 2019

Member

I don't think it is a good idea to have a loop in the constructor and we shouldn't use synchronous operations. since all Pulsar internals are asynchronous.

List<Entry> entries = readOnlyCursor.readEntries(2);
countDownLatch.await();
countDownLatch = new CountDownLatch(1);
new Thread(new ReadOnce(countDownLatch,

This comment has been minimized.

Copy link
@sijie

sijie Nov 11, 2019

Member

we shouldn't start a thread for each entry it receives.

congbobo added 2 commits Nov 12, 2019
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Nov 13, 2019

@sijie please review again. Thanks!

enum TxnStatus {
OPEN = 0;
COMMITTING = 1;
COMMITTED = 2;
ABORTING = 3;
ABORTED = 4;
}

message TransactionMetadataEntry {

enum TransactionMetadataOp {
NEW = 0;
ADD_PARTITION = 1;
ADD_SUBSCRIPTION = 2;
UPDATE = 3;
}

optional TransactionMetadataOp metadata_op = 1;
optional uint64 txnid_least_bits = 2 [default = 0];
optional uint64 txnid_most_bits = 3 [default = 0];
optional TxnStatus expected_status = 4;
optional TxnStatus new_status = 5;
repeated string partitions = 6;
repeated Subscription subscriptions = 7;
optional uint64 timeout_ms = 8;
optional uint64 start_time = 9;
optional uint64 last_modification_time = 10;
}

Comment on lines +648 to +676

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

I think PulsarApi.proto only for wire protocol, is it better to move TransactionMetadataEntry to a split proto file? @sijie please help confirm.

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
Comment on lines 48 to 52

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

is it better to create a proto file in pulsar-transaction-common module? so that we don't need to add pulsar common dependency.

* @return a future represents the result of this operation.
* it returns {@link TxnMeta} of the given transaction.
*/
CompletableFuture<TxnMeta> getTxnMeta(TxnID txnid);
CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID);

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

We'd better use getTxnMetaAsync here, since this is an asynchronous method, and the java doc also need to describe this is a asynchronous method. please check others.

/**
* Update the {@link State}.
*
* @param state the transaction metadata store state {@link State}
* @return a future represents the result of this operation
*/
CompletableFuture<Void> updateMetadataStoreState(State state);

/**
* Set the txn sequenceId.
*
* @param sequenceId the transaction sequenceId for new transaction Id
* @return a future represents the result of this operation
*/
CompletableFuture<Void> setTxnSequenceId(long sequenceId);

Comment on lines 135 to 150

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

Shall we need to expose these method in TransactionMetadataStore interface?

* @throws InvalidTxnStatusException if the transaction is not in
* {@link TxnStatus#OPEN}
*/
TxnMeta addTxnSubscription(List<TxnSubscription> subscriptions)

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

Please use addAckedPartitions

}

private static List<TxnSubscription> subscriptionToTxnSubscription(List<Subscription> subscriptions) {
List<TxnSubscription> txnSubscriptions = new ArrayList<>(subscriptions.size());

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

Please considering reuse memory, this call lead more memory workload.


private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);

private ConcurrentMap<TxnID, TxnMeta> txnMetaMap = new ConcurrentHashMap<>();

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

It's better to maintain cache in TransactionMetastore

transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING);
readOnlyCursor
.asyncReadEntries(100,
new ReaderReadEntriesCallback(countDownLatch, txnMetaMap, sequenceId,

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

every 100 messages will create a ReaderReadEntriesCallback instance, i think here can refer to approach like consumer read messages or PulsarRecordCursor in Pulsar SQL.

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

Also, expose configs for each read batch size.

}
currentCountDownLatch.countDown();
if (!readOnlyCursor.hasMoreEntries()) {
originalCountDownLatch.await();

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

read only cursor can be closed while have no available messages?

@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
try {
if (readOnlyCursor.hasMoreEntries()) {

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 14, 2019

Contributor

should check meta store state here

default CompletableFuture<TxnStatus> getTxnStatus(TxnID txnID) {
return getTxnMetaAsync(txnID).thenApply(TxnMeta::status);
}
Comment on lines 52 to 54

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 20, 2019

Contributor

It's better to rename to getTxnStatusAsync since all async method end with Async

*
* @param txnid transaction id
* @param timeOut the timeout time

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 20, 2019

Contributor
Suggested change
* @param timeOut the timeout time
* @param timeOut the timeout duration of the transaction in mills
/**
* Exception is thrown when update the state incorrect in transaction store.
*/
public class TxnStoreStateUpdateException extends CoordinatorException {

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Nov 20, 2019

Contributor

Do we need this exception? i think the meta store should handle the state change internal

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 3, 2019

run Integration Tests

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 3, 2019

run java8 tests

* @param numberOfEntriesToRead the number of reading entry
* @param callback the callback to executing when reading entry async finished
*/
void readAsync(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx);

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Dec 4, 2019

Contributor

Both write read and replay should use TransactionMetadataEntry not Entry or some other types.

/**
* Exception is thrown when a operation of transaction is executed in a error transaction metadata store state.
*/
public class TransactionMetadataStoreStateException extends CoordinatorException {

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Dec 4, 2019

Contributor

It is TransactionStateException not MetadataStoreStateException, right?

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 5, 2019

run java8 tests

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 5, 2019

run cpp tests

1 similar comment
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 5, 2019

run cpp tests

congbobo added 2 commits Dec 5, 2019
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 5, 2019

run java8 tests

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 9, 2019

run cpp tests

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 9, 2019

run java8 tests

congbobo added 4 commits Dec 9, 2019
…lement

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
#	pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
#	pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java
#	pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
#	pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
#	pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 10, 2019

run java8 tests

1 similar comment
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 10, 2019

run java8 tests

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 10, 2019

congbobo added 2 commits Dec 11, 2019
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 11, 2019

run java8 tests

1 similar comment
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 12, 2019

run java8 tests

@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 13, 2019

1 similar comment
@congbobo184

This comment has been minimized.

Copy link
Contributor Author

congbobo184 commented Dec 19, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.