-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add streaming dispatcher. #9056
Add streaming dispatcher. #9056
Conversation
Add debug print for ut.
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
@@ -1915,6 +1920,21 @@ void notifyCursors() { | |||
} | |||
} | |||
|
|||
void notifyWaitingEntryCallBack() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void notifyWaitingEntryCallBack() { | |
void notifyWaitingEntryCallBacks() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
category = CATEGORY_SERVER, | ||
doc = "Whether to use streaming read dispatcher." | ||
) | ||
private boolean streamingDispatch = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you mark this feature as a preview feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the doc.
@@ -667,6 +667,12 @@ | |||
) | |||
private boolean preciseDispatcherFlowControl = false; | |||
|
|||
@FieldContext( | |||
category = CATEGORY_SERVER, | |||
doc = "Whether to use streaming read dispatcher." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great to add more documentation about this feature.
public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers | ||
implements StreamingDispatcher { | ||
|
||
private StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MarvinCai Overall looks good to me. Can you add more tests to the newly introduced classes?
*/ | ||
@Slf4j | ||
@RequiredArgsConstructor | ||
public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, WaitingEntryCallBack { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MarvinCai Can you write a test case for this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's StreamingEntryReaderTests class, it's folded by github.
@codelipenghui @hangc0276 Can you review this pull request? |
@codelipenghui Can you review this? |
@MarvinCai Can you rebase this pull request? @codelipenghui Can you review it? |
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
@sijie Rebased to newer version of master and resolved conflict. |
/pulsarbot run-failure-checks |
Related to #3804 Trying to streamline the dispatcher's read requests to manager ledger instead of micro batch. Created a StreamingEntryReader that can streamline read request to managed ledger. Created StreamingDispatcher interface with necessary method to interact with StreamingEntryReader. Created PersistentStreamingDispatcherSingleActive/MultipleConsumer that make use of StreamingEntryReader to read entries from managed ledger. Add config to use streaming dispatcher. (cherry picked from commit 8cfaf48)
Related to #3804
Motivation
Trying to streamline the dispatcher's read requests to manager ledger instead of micro batch.
Modifications
Created a StreamingEntryReader that can streamline read request to managed ledger.
Created StreamingDispatcher interface with necessary method to interact with StreamingEntryReader.
Created PersistentStreamingDispatcherSingleActive/MultipleConsumer that make use of StreamingEntryReader to read entries from managed ledger.
Add config to use streaming dispatcher.
Verifying this change
(Please pick either of the following options)
This change added tests and can be verified as follows:
Add unit tests.
Does this pull request potentially affect one of the following parts:
Documentation