Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Transaction buffer stable position and lowWaterMark implementation. #9195

Conversation

congbobo184
Copy link
Contributor

@congbobo184 congbobo184 commented Jan 12, 2021

Motivation

Transaction buffer stable position and lowWaterMark implementation.

implement

Details view streamnative/community#3

Verifying this change

Add the tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (yes)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)

@sijie sijie requested review from merlimat, codelipenghui and jiazhai and removed request for merlimat January 13, 2021 03:20
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good, left some comments please take a look.

@@ -1774,6 +1774,11 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
}

private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {

if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maxPostion of the OpReadEntry might be null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it not be null

@@ -1789,6 +1794,11 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
lastEntryInLedger = ledger.getLastAddConfirmed();
}

// can read max position entryId
if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above comment

@@ -84,6 +84,5 @@ message MarkersMessageIdData {


/// --- Transaction marker ---
message TxnCommitMarker {
repeated MarkersMessageIdData message_id = 1;
message TxnMarker {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not useful, please remove it from the proto.


@Override
public PositionImpl getMaxReadPosition() {
return PositionImpl.latest;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the in-memory transaction buffer returns latest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in-memory didn't need stable position logical, I don't change it.

@@ -58,6 +63,12 @@ public TopicTransactionBuffer(PersistentTopic topic) {
topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
if (!ongoingTxns.containsKey(txnId)) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LinkedMap will throw NoSuchElementException while the map is empty. And I think the LinkedMap is not a thread-safe container?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it not a thread-safe container, i lock all logical what change this map.


private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();

private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this also need LinkedMap ? Please give comment for these two maps especially the value of the map, I think the first map is the min message position of the transaction and the second map is the max message position of the transaction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we handle lowWaterMark, we should get the first entry of the map to compare the txnID with lowWaterMark. And ongoingTxns is ordered, so we should get the first key position become the maxReadPosition.

}
}

void changeMaxReadPosition(TxnID txnID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void changeMaxReadPosition(TxnID txnID) {
void updateMaxReadPosition(TxnID txnID) {

Comment on lines +45 to +49
private volatile PositionImpl maxReadPosition = PositionImpl.latest;

private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();

private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any test coverage? We'd better to add some unit test to make sure the data should be removed correctly to avoid memory lead and make the maxReadPosition is correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have add the LowWaterMarkTest. Add the abort and commit MaxReadPosition Test.

.enableTransaction(true)
.build();

Thread.sleep(1000 * 3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try to avoid use sleep here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@codelipenghui codelipenghui added this to the 2.8.0 milestone Jan 14, 2021
@congbobo184
Copy link
Contributor Author

/pulsarbot run-failure-checks

@merlimat
Copy link
Contributor

@congbobo184 @codelipenghui Master seems to still be broken after this change:

AILED: produceAbortTest
org.awaitility.core.ConditionTimeoutException: Condition with lambda expression in org.apache.pulsar.client.impl.TransactionEndToEndTest was not fulfilled within 3 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
	at org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
	at org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:864)
	at org.apache.pulsar.client.impl.TransactionEndToEndTest.produceAbortTest(TransactionEndToEndTest.java:220)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
	at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
	at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
	at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
	at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.testng.TestRunner.privateRun(TestRunner.java:764)
	at org.testng.TestRunner.run(TestRunner.java:585)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
	at org.testng.TestNG.runSuites(TestNG.java:1069)
	at org.testng.TestNG.run(TestNG.java:1037)
	at org.testng.remote.AbstractRemoteTestNG.run(AbstractRemoteTestNG.java:115)
	at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:251)
	at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:77)
Caused by: java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.awaitility.core.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:101)
	at org.awaitility.core.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:81)
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:101)
	... 33 more

@codelipenghui
Copy link
Contributor

Thanks @merlimat, we will take a look soon.

@congbobo184
Copy link
Contributor Author

@merlimat #9229 (comment) have solved this problem.

@congbobo184 @codelipenghui Master seems to still be broken after this change:

AILED: produceAbortTest
org.awaitility.core.ConditionTimeoutException: Condition with lambda expression in org.apache.pulsar.client.impl.TransactionEndToEndTest was not fulfilled within 3 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
	at org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
	at org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:864)
	at org.apache.pulsar.client.impl.TransactionEndToEndTest.produceAbortTest(TransactionEndToEndTest.java:220)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
	at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
	at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
	at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
	at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.testng.TestRunner.privateRun(TestRunner.java:764)
	at org.testng.TestRunner.run(TestRunner.java:585)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
	at org.testng.TestNG.runSuites(TestNG.java:1069)
	at org.testng.TestNG.run(TestNG.java:1037)
	at org.testng.remote.AbstractRemoteTestNG.run(AbstractRemoteTestNG.java:115)
	at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:251)
	at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:77)
Caused by: java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.awaitility.core.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:101)
	at org.awaitility.core.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:81)
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:101)
	... 33 more

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants