[feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7 #19035

Merged
merged 4 commits into from
Jan 3, 2023

coderzc
Member

@coderzc coderzc commented Dec 23, 2022

PIP: #16763

Motivation

#16763

Modifications

Filter out all delayed messages and skip them when reading entries from bookies.

The logical changes are mainly in ManagedLedgerImpl and OpReadEntry.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Dec 23, 2022
@coderzc coderzc marked this pull request as draft December 23, 2022 02:15
@coderzc coderzc self-assigned this Dec 25, 2022
@coderzc coderzc added this to the 2.12.0 milestone Dec 25, 2022
@coderzc coderzc marked this pull request as ready for review December 25, 2022 12:59
*/
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
- PositionImpl maxPosition);
+ PositionImpl maxPosition, Predicate<PositionImpl> skipCondition);
Contributor

It's better to add a new method.
I mean, we'd better not change the existing API directly.
This class is annotated with

@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable

Member Author

Good idea.

@coderzc coderzc force-pushed the cursor_read_filter branch 5 times, most recently from 1b042a0 to 3aefb59 Compare December 28, 2022 07:42
@coderzc coderzc added type/feature The PR added a new feature or issue requested a new feature area/broker labels Dec 28, 2022
Contributor

@codelipenghui codelipenghui left a comment

Great change!
I like the brilliant solution for skipping the entries.

I just left some minor comments.

* @param ctx opaque context
* @param maxPosition max position can read
*/
void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Contributor

It's better to add a default implementation. Otherwise, we will break the user's existing implementations.

Member Author

default implementation?

Contributor

Yes, I mean just add default

default void asyncReadEntriesWithSkip()
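
The suggestion above can be sketched as follows. This is a minimal illustration, assuming a simplified interface: `CursorSketch`, `LegacyCursor`, and the synchronous `readEntries` signatures are hypothetical stand-ins, not the real `ManagedCursor` API, and the filtering fallback is only one possible default body. The point it shows is that a `default` method lets implementations written before the change keep compiling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for a stable cursor interface (not the real ManagedCursor).
interface CursorSketch {
    // Existing method that third-party implementations already provide.
    List<Long> readEntries(int numberOfEntriesToRead, long maxPosition);

    // New method added as a default, so existing implementations still compile.
    // This fallback reads normally and then drops entries matching skipCondition;
    // an optimized implementation can override it to avoid reading those entries at all.
    default List<Long> readEntriesWithSkip(int numberOfEntriesToRead, long maxPosition,
                                           Predicate<Long> skipCondition) {
        List<Long> result = new ArrayList<>();
        for (Long entryId : readEntries(numberOfEntriesToRead, maxPosition)) {
            if (skipCondition == null || !skipCondition.test(entryId)) {
                result.add(entryId);
            }
        }
        return result;
    }
}

// An implementation written before the new method existed; it needs no changes.
final class LegacyCursor implements CursorSketch {
    @Override
    public List<Long> readEntries(int numberOfEntriesToRead, long maxPosition) {
        List<Long> entries = new ArrayList<>();
        for (long id = 0; id < numberOfEntriesToRead && id <= maxPosition; id++) {
            entries.add(id);
        }
        return entries;
    }
}

final class DefaultMethodDemo {
    public static void main(String[] args) {
        CursorSketch cursor = new LegacyCursor();
        // Skip odd entry ids; prints [0, 2, 4]
        System.out.println(cursor.readEntriesWithSkip(6, 10, id -> id % 2 == 1));
    }
}
```

Adding an abstract method instead would force every external implementer of the interface to change their code at once, which is what the reviewer wants to avoid for a class marked stable.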

Comment on lines 79 to 81
if (lastPosition == null || entriesCount != 0) {
lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
}
Contributor

If returnedEntries is empty and lastPosition is null, returnedEntries.get(entriesCount - 1) is called with index -1 and we will get an exception.

Member

+1

Member Author

@coderzc coderzc Dec 29, 2022

It seems we only need to check entriesCount != 0.
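
A minimal sketch of the guard discussed in this thread, under simplifying assumptions: positions are plain `Long` values rather than `PositionImpl`, and `LastPositionSketch` and `resolveLastPosition` are hypothetical names introduced for illustration.

```java
import java.util.List;

final class LastPositionSketch {
    // Returns the position to report as "last read".
    // Assumption: positions are modeled as Long; the real code uses PositionImpl.
    static Long resolveLastPosition(List<Long> returnedEntries, Long lastPosition) {
        int entriesCount = returnedEntries.size();
        // Only index into the list when it is non-empty. Otherwise keep the caller's
        // value, which may itself be null when nothing was read and nothing was skipped.
        if (entriesCount != 0) {
            lastPosition = returnedEntries.get(entriesCount - 1);
        }
        return lastPosition;
    }

    public static void main(String[] args) {
        System.out.println(resolveLastPosition(List.of(3L, 4L, 5L), 9L)); // prints 5
        System.out.println(resolveLastPosition(List.of(), 9L));           // prints 9
        System.out.println(resolveLastPosition(List.of(), null));         // prints null
    }
}
```

With the original condition `lastPosition == null || entriesCount != 0`, the third call would have evaluated `get(-1)` on an empty list and thrown.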

Comment on lines 2079 to 2084
for (; entryId <= lastEntry; entryId++) {
if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
break;
}
lastValidEntry = entryId;
}
Contributor

Maybe you can try to optimize with only one loop to find the first invalid entry range 😁

Member

+1

Member Author

Good idea.
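
One way to read the single-loop suggestion is sketched below. It is not the code that was merged; `SkipRangeSketch` and `firstValidRange` are hypothetical names, entry ids are assumed to be non-negative so that -1 can serve as a "not found yet" marker, and a `LongPredicate` over entry ids replaces the `Predicate<PositionImpl>` used in the PR.

```java
import java.util.function.LongPredicate;

final class SkipRangeSketch {
    // Returns {first, last} of the first contiguous run of non-skipped entry ids in
    // [firstEntry, lastEntry], or null when every entry in the range is skipped.
    // Assumption: entry ids are non-negative, so -1 can mark "no valid entry yet".
    static long[] firstValidRange(long firstEntry, long lastEntry, LongPredicate skipCondition) {
        long firstValid = -1;
        long lastValid = -1;
        for (long entryId = firstEntry; entryId <= lastEntry; entryId++) {
            if (skipCondition.test(entryId)) {
                if (firstValid != -1) {
                    break; // the run of readable entries has ended
                }
                continue; // still skipping leading entries
            }
            if (firstValid == -1) {
                firstValid = entryId;
            }
            lastValid = entryId;
        }
        return firstValid == -1 ? null : new long[] {firstValid, lastValid};
    }

    public static void main(String[] args) {
        // Entries 0, 1 and 5 play the role of delayed messages.
        long[] range = firstValidRange(0, 7, id -> id < 2 || id == 5);
        System.out.println(range[0] + ".." + range[1]); // prints 2..4
    }
}
```

A single pass both skips the leading filtered entries and finds where the readable run ends, so the caller can issue one ledger read for that range and continue from the entry after it.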

@@ -57,13 +61,13 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
maxPosition = PositionImpl.LATEST;
}
op.maxPosition = maxPosition;
op.skipCondition = skipCondition;
Member

It looks like we forgot to reset it when recycling this object.

Member Author

You are right.
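
The concern in this thread can be illustrated with a small pooled object. This is a sketch under stated assumptions: `PooledReadOp` is a hypothetical class, and a plain, non-thread-safe `ArrayDeque` stands in for whatever object pool the real `OpReadEntry` uses.

```java
import java.util.ArrayDeque;
import java.util.function.Predicate;

// Hypothetical pooled operation object; not the real OpReadEntry.
final class PooledReadOp {
    // Simplified, non-thread-safe pool used only for illustration.
    private static final ArrayDeque<PooledReadOp> POOL = new ArrayDeque<>();

    Object ctx;
    Predicate<Long> skipCondition;

    static PooledReadOp create(Object ctx, Predicate<Long> skipCondition) {
        PooledReadOp op = POOL.isEmpty() ? new PooledReadOp() : POOL.pop();
        op.ctx = ctx;
        op.skipCondition = skipCondition;
        return op;
    }

    void recycle() {
        // Clear every field before returning to the pool, including the newly added one.
        // A forgotten field keeps its referent alive and can leak into a later use.
        ctx = null;
        skipCondition = null;
        POOL.push(this);
    }

    public static void main(String[] args) {
        PooledReadOp first = create("ctx-1", id -> id > 10);
        first.recycle();
        System.out.println(first.skipCondition == null); // prints true
        PooledReadOp second = create("ctx-2", null);
        System.out.println(first == second);             // prints true
    }
}
```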

op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
}

@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, PositionImpl lastPosition) {
Member

I'm not sure if it is good to extend this method because we defined a parameter named context.

Member Author

The context is already passed via opReadEntry.ctx.

Member

@mattisonchao mattisonchao left a comment

Overall LGTM, please apply @codelipenghui's comments.


@codecov-commenter

codecov-commenter commented Dec 29, 2022

Codecov Report

Merging #19035 (fadff0b) into master (08591d9) will decrease coverage by 4.06%.
The diff coverage is 19.71%.


@@             Coverage Diff              @@
##             master   #19035      +/-   ##
============================================
- Coverage     49.85%   45.79%   -4.07%     
- Complexity     8658    11061    +2403     
============================================
  Files           500      772     +272     
  Lines         54930    74360   +19430     
  Branches       5867     8009    +2142     
============================================
+ Hits          27386    34053    +6667     
- Misses        24464    36518   +12054     
- Partials       3080     3789     +709     
Flag Coverage Δ
unittests 45.79% <19.71%> (-4.07%) ⬇️


Impacted Files Coverage Δ
...a/org/apache/bookkeeper/mledger/ManagedCursor.java 46.15% <0.00%> (-39.57%) ⬇️
...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java 53.78% <0.00%> (+0.01%) ⬆️
...ava/org/apache/bookkeeper/mledger/impl/OpScan.java 0.00% <ø> (ø)
...rg/apache/pulsar/broker/delayed/bucket/Bucket.java 0.00% <0.00%> (ø)
...r/delayed/bucket/BucketDelayedDeliveryTracker.java 0.00% <0.00%> (ø)
.../pulsar/broker/delayed/bucket/ImmutableBucket.java 0.00% <0.00%> (ø)
...g/apache/pulsar/broker/lookup/TopicLookupBase.java 56.89% <ø> (+6.05%) ⬆️
...rg/apache/pulsar/broker/web/PulsarWebResource.java 57.41% <ø> (+1.75%) ⬆️
...che/bookkeeper/mledger/impl/ManagedCursorImpl.java 36.70% <44.44%> (+0.02%) ⬆️
...pulsar/broker/admin/impl/PersistentTopicsBase.java 59.47% <72.72%> (+0.75%) ⬆️
... and 331 more

@coderzc coderzc force-pushed the cursor_read_filter branch 2 times, most recently from 4197fb6 to dc17954 Compare December 29, 2022 09:40
@codelipenghui
Contributor

/pulsarbot run-failure-checks

@codelipenghui
Contributor

@coderzc

Error:  Tests run: 556, Failures: 1, Errors: 0, Skipped: 520, Time elapsed: 46.303 s <<< FAILURE! - in org.apache.bookkeeper.mledger.impl.ManagedCursorTest
Error:  testAsyncReadWithMaxSizeByte(org.apache.bookkeeper.mledger.impl.ManagedCursorTest)  Time elapsed: 20.008 s  <<< FAILURE!
org.testng.internal.thread.ThreadTimeoutException: Method org.apache.bookkeeper.mledger.impl.ManagedCursorTest.testAsyncReadWithMaxSizeByte() didn't finish within the time-out 20000
  	at java.base@17.0.5/jdk.internal.misc.Unsafe.park(Native Method)
  	at java.base@17.0.5/java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)
  	at java.base@17.0.5/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:715)
  	at java.base@17.0.5/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
  	at java.base@17.0.5/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:230)
  	at app//org.apache.bookkeeper.mledger.impl.ManagedCursorTest.readAndCheck(ManagedCursorTest.java:572)
  	at app//org.apache.bookkeeper.mledger.impl.ManagedCursorTest.testAsyncReadWithMaxSizeByte(ManagedCursorTest.java:549)
  	at java.base@17.0.5/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  	at java.base@17.0.5/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  	at java.base@17.0.5/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.base@17.0.5/java.lang.reflect.Method.invoke(Method.java:568)
  	at app//org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at app//org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at app//org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at app//org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base@17.0.5/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  	at java.base@17.0.5/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  	at java.base@17.0.5/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  	at java.base@17.0.5/java.lang.Thread.run(Thread.java:833)

Log: https://github.com/apache/pulsar/actions/runs/3799672325/jobs/6471334338#step:10:1958

I noticed this test failed multiple times. It may be related to the changes in this PR.

@coderzc coderzc force-pushed the cursor_read_filter branch 2 times, most recently from b49f41f to c57f05a Compare December 30, 2022 10:38
Labels
area/broker doc-not-needed Your PR changes do not impact docs type/feature The PR added a new feature or issue requested a new feature
4 participants