-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow consumer to seek to message id from within Pulsar client #848
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.. just minor comments.
@@ -1910,6 +1910,7 @@ boolean isValidPosition(PositionImpl position) { | |||
log.debug("IsValid position: {} -- last: {}", position, last); | |||
} | |||
|
|||
// if (position.equals(PositionImpl.earliest || position.equals(PositionImpl))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ups, forgot those in there
Consumer consumer = consumerFuture.getNow(null); | ||
Subscription subscription = consumer.getSubscription(); | ||
MessageIdData msgIdData = seek.getMessageId(); | ||
// MessageIdImpl msgId = MessageIdImpl.earliest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, will cleanup
try { | ||
seekAsync(messageId).get(); | ||
} catch (ExecutionException | InterruptedException e) { | ||
throw new PulsarClientException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.getCause()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's interrupted we might not have the "cause". Though I can refactor and have 2 sections here.
}).exceptionally(ex -> { | ||
log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex); | ||
ctx.writeAndFlush(Commands.newError(seek.getRequestId(), ServerError.UnknownError, | ||
"Error when resetting subscription: " + ex.getMessage())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ex.getCause().getMessage()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Motivation
Currently we have the option to "reset" a subscription to a specific timestamp or message id through the Pulsar admin API.
In some cases, it might be useful to allow the same operation to be done from the
Consumer
API. One such example is related to the Pulsar-Kafka wrapper, in which we need to emulate operations likeseekToBeginning()
,seekToEnd()
orseekTo()
.Modifications
Added new request command in wire protocol to issue a "seek" command to the broker, relative to the subscription associated with the current consumer.