-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transaction][buffer] Add basic operation of transaction #4738
Conversation
@sijie PTAL |
import org.apache.bookkeeper.mledger.Position; | ||
import org.apache.pulsar.transaction.impl.common.TxnID; | ||
|
||
public interface TransactionCursor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add comment for this interface.
also please add comments for the @param
in following methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your reminder. I will add later.
|
||
@Override | ||
public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return CompletableFuture.failedFuture(new UnsupportedOperationException());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
@@ -99,6 +99,8 @@ | |||
long committedAtLedgerId, | |||
long committedAtEntryId); | |||
|
|||
CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId, ByteBuf marker); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure why do you need marker
in commitTxn
and abortTxn
. The caller of this interface doesn't provide the marker. The marker should be part of the implementation not the interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the marker in data ledgers is different from the transaction log?
|
||
@Override | ||
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) { | ||
CompletableFuture<TransactionMeta> getFuture = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need additional getFuture here.
it should just be :
return txnCursor.getTxnMeta(txnID, false);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep
} | ||
|
||
@Override | ||
public CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that sequenceId is not used in publishMessage. sequenceId
should be used for de-duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. I should add it to TxnCtx.
ByteBuf marker) { | ||
CompletableFuture<Void> commitFuture = new CompletableFuture<>(); | ||
|
||
publishMessage(marker, new TxnCtx() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The marker should be generated within this implementation.
|
||
CompletableFuture<Void> abortFuture = new CompletableFuture<>(); | ||
|
||
publishMessage(marker, new TxnCtx() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
marker should be prepared in this implementation.
import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException; | ||
import org.apache.pulsar.transaction.impl.common.TxnID; | ||
|
||
public class TransactionCursorImpl implements TransactionCursor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the transaction cursor is used for persistent transaction buffer. but you are using an in-memory implementation of transaction metadata. this doesn't seem to be right.
@Override | ||
void close(); | ||
|
||
CompletableFuture<Void> closeBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closeAsync()?
@@ -125,7 +123,5 @@ | |||
/** | |||
* {@inheritDoc} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change the comment. since it doesn't inherit from AutoCloseable
anymore.
* @param ledgerId the transaction committed ledger id | ||
* @return | ||
*/ | ||
CompletableFuture<Set<TxnID>> getRemoveTxns(long ledgerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getAllTxnsCommitedAtLedger ?
* @param ledgerId the remove transaction id | ||
* @return | ||
*/ | ||
CompletableFuture<Void> removeCommittedLedger(long ledgerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removeTxnsCommittedAtLedger
CompletableFuture<Set<TxnID>> getRemoveTxns(long ledgerId); | ||
|
||
/** | ||
* Remove transaction from index. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the comments please
} | ||
|
||
|
||
dataLedgers.forEach(dataLedger -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
purgeFuture need to wait all dataLedgers are deleted. but the implementation here doesn't guarantee it.
private CompletableFuture<Void> deleteTxn(TxnID txnID) { | ||
CompletableFuture<Void> deleteFuture = new CompletableFuture<>(); | ||
|
||
txnCursor.getTxnMeta(txnID, false).thenCompose(meta -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return txnCursor.getTxnMeta(txnID, false)
.thenCompose(meta -> meta.readEntries(...))
.thenCompose(positions -> {
...
});
Marker commitMarker = Marker.builder().txnID(txnID).status(TxnStatus.COMMITTED).build(); | ||
ByteBuf marker = Unpooled.wrappedBuffer(commitMarker.serialize()); | ||
|
||
txnCursor.getTxnMeta(txnID, false).thenCompose(meta -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return txnCursor.getTxnMeta(txnID, false)
.thenCompose(meta -> publishMarker())
.thenCompose(position -> txnCursor.commitTxn(...));
|
||
txnCursor.getTxnMeta(txnID, false).thenCompose(meta -> { | ||
meta.readEntries(meta.numEntries(), -1L).thenCompose(longPositionSortedMap -> { | ||
longPositionSortedMap.values().forEach(position -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompletableFuture<Void> asyncMarkDelete(position);
List<CompletableFuture<Void>> deleteFutures = longPositionSortedMap.values()
.stream()
.map(position -> asyncMarkDelete())
.collect(Collectors.toList());
return CompletableFuture.allOf(deleteFutures).thenApply(ignored -> null);
public CompletableFuture<List<TransactionEntry>> readNext(int numEntries) { | ||
CompletableFuture<List<TransactionEntry>> readFuture = new CompletableFuture<>(); | ||
|
||
meta.readEntries(numEntries, currentSeuquenceId).thenCompose(entries -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return meta.readEntries(numEntries, currentSequenceId)
.thenCompose(entries -> readEntry(entries);
meta.readEntries(...).whenComplete((entries, cause) -> {
if (null != cause) {
///
} else {
....
}
});
@@ -123,9 +121,9 @@ | |||
CompletableFuture<Void> purgeTxns(List<Long> dataLedgers); | |||
|
|||
/** | |||
* {@inheritDoc} | |||
* Close the buffer asynchronous. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Close the buffer asynchronous. | |
* Close the buffer asynchronously. |
import org.apache.pulsar.transaction.impl.common.TxnID; | ||
|
||
/** | ||
* The transaction Cursor maintained the index of all transaction. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* The transaction Cursor maintained the index of all transaction. | |
* The transaction Cursor maintains the index of all transactions. |
package org.apache.pulsar.transaction.buffer.exceptions; | ||
|
||
/** | ||
* Exception when get a non exist committed ledger. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception is thrown when no transactions found committed at a given ledger.
|
||
|
||
@Override | ||
public String getProducerName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The producer name should be transaction id. Because pulsar is using producer name for de-duplications. You need to guarantee the messages produced by a transaction is not duplicated when we produce them to the transaction buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. yes.
return FutureUtil.waitForAll(removeFutures); | ||
} | ||
|
||
private CompletableFuture<Void> deleteLedger(long dataledger) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should call it deleteLedger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about cleanTxnsOnLedger()
?
public CompletableFuture<Void> removeTxnsCommittedAtLedger(long ledgerId) { | ||
|
||
synchronized (committedTxnIndex) { | ||
Set<TxnID> txnIDS = committedTxnIndex.remove(ledgerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
txnIDs can be null
if (startSequenceId != -1L) { | ||
readEntries = entries.tailMap(startSequenceId); | ||
} | ||
if (startSequenceId == -2L) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define constants for -1L
and -2L
} | ||
|
||
if (num != 0) { | ||
result.put(-2L, PositionImpl.earliest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this used for?
} | ||
|
||
@Override | ||
public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to synchronize the method when you modifying the txnStatus
return abortFuture; | ||
} | ||
|
||
this.txnStatus = TxnStatus.ABORTED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synchronize accessing txnStatus
@sijie This pr is ready for review. Please take a look when you have time. Thanks. |
...fer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
Outdated
Show resolved
Hide resolved
...fer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
Outdated
Show resolved
Hide resolved
...fer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
Outdated
Show resolved
Hide resolved
...fer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
Show resolved
Hide resolved
...fer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
Outdated
Show resolved
Hide resolved
run java8 tests |
--- *Modifications* Add primary operation of transaction. Keep all actions persistently. Describe the modifications you've done. - add commit operation - add abort operation - add openreader operation *TODO* - add purge txn operation - add tests
--- (If this PR fixes a github issue, please add `Fixes #<xyz>`) Fixes #<xyz> (or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue) Master Issue: #<xyz> *Motivation* Describe here the context, and why you're making that change. What is the problem you're trying to solve. *Modifications* Describe the modifications you've done. *Verify this change* Please pick either of following options. - This change is a trivial rework / code cleanup without any test coverage. - This change is already covered by existing tests, such as *(please describe tests)*. - This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment* - *Extended integration test for recovery after broker failure*
run java8 tests |
run integration tests |
run java8 tests |
3 similar comments
run java8 tests |
run java8 tests |
run java8 tests |
run cpp tests |
run java8 tests |
4 similar comments
run java8 tests |
run java8 tests |
run java8 tests |
run java8 tests |
ci error
|
run java8 tests |
Modifications
Add primary operation of transaction. Keep all actions persistently. (except the index persistent, the persistent logic will add in next pull request)
Describe the modifications you've done.