Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - support seek() on Reader #4031

Merged
merged 2 commits into from
Apr 23, 2019

Conversation

lovelle
Copy link
Contributor

@lovelle lovelle commented Apr 12, 2019

Motivation

Trying to fix #3976

According to what was discussed in pull #3983 it would be an acceptable solution
to add seek() command to Reader in order to reset a non durable cursor after
Reader instance was built.

Modifications

  • Bugfix reset() by timestamp on a non-durable consumer, previously the
    cached cursor was not present, therefore the state set by reset() was missed
    resulting in a reset() at the beginning of the cursor instead of a reset()
    at the expected position.
  • Copy seek() commands to Reader interface from Consumer interface.
  • Fix inconsistency with lastDequeuedMessage field after seek() command was
    performed successfully.
  • Fix consumer discarding messages on receive (after seek() command) due to
    messages being present on acknowledge grouping tacker.

Verifying this change

  • Make sure that the change passes the CI checks.

@ConcurrencyPractitioner
Copy link
Contributor

ConcurrencyPractitioner commented Apr 12, 2019

@lovelle It looks good!

@ConcurrencyPractitioner
Copy link
Contributor

BTW, I don't know if you have considered other test scenarios. I think your test is only calling a seek() to the beginning of a sequence of messages you sent right (i.e. equivalent to seek(MessageId.EARLIEST)). Have you ever tried seeking for example to the middle of the sequence of messages and saw what happened?

I'm asking this because in my test, only seek(firstPublishTimeStamp) works, otherwise, as you saw in my testoutput, despite the fact that seek appeared to be successful, we still started processing from my-message-0.

@lovelle lovelle force-pushed the feature/support_seek_on_reader_impl branch from f333d36 to 80db9aa Compare April 12, 2019 18:19
@lovelle
Copy link
Contributor Author

lovelle commented Apr 12, 2019

@ConcurrencyPractitioner great suggestion! please take a look now, I've just add more tests according to what you said, please let me know what you think! Thanks 👍

Added test seeks in the middle of the sequence of messages using MessageId and works great !

@lovelle lovelle force-pushed the feature/support_seek_on_reader_impl branch 3 times, most recently from cb61dbe to 5f157a2 Compare April 12, 2019 19:51
@Test
public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
final int numOfMessage = 100;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you numOfMessage as an argument to this test and try with edge cases like
numOfMessage = [1, 9, 100]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By this you mean having a generic method doing all the asserts and receiving by arguments all the parameters?

* the individual partitions.
*
* @param messageId the message id where to reposition the reader
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the operation be renamed as reset instead of seek. I mean I would assume seek to just find the message id but not change the state of subscription.

Copy link
Contributor Author

@lovelle lovelle Apr 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mm I don't agree here, imho keeping seek() would be easier for the user understanding because of already existing seek on Consumer.

Also, what I think that is great of naming it seek() is that it behaves exactly the same as seeking on a file e.g. Linux which a lot of people already know what is meant for.

public void flushAndClean() {
flush();
lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
pendingIndividualAcks.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't flush empty out pendingIndividualAcks

https://github.com/apache/pulsar/blob/5f157a2a82eff0202a9fc0920bad67f5aabcde87/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L170

If yes then I would not create a new function - I would just add lastCumulativeAck = (MessageIdImpl) MessageId.earliest; to the flush() function.

Copy link
Contributor Author

@lovelle lovelle Apr 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, flush will empty pendingIndividualAcks but that doesn't mean that when you call flush() (do the effective redelivery of the message) also wants to lost track of lastCumulativeAck.

I added flushAndClean() because here we need to start over again and forget every message tracked, the same as when the constructor occurs.

@ConcurrencyPractitioner
Copy link
Contributor

ConcurrencyPractitioner commented Apr 13, 2019

Sorry, just one last test case.
Have you considered testing the seek(long startPublishTime) to the middle of a sequence of messages?
I think seek(MessageId id) was expected to work, but what about the other seek method?

If seek(long startPublishTime) works as well, then I'm all for this issue.

@lovelle lovelle force-pushed the feature/support_seek_on_reader_impl branch from 5f157a2 to 0a5ea4c Compare April 13, 2019 01:38
@ConcurrencyPractitioner
Copy link
Contributor

Oh, so @lovelle
I added the following test in my local repo and ran it:

    @Test
    public void testReaderIsAbleToSeekWithTime() throws Exception {
        final String topic = "persistent://my-property/my-ns/testReaderIsAbleToSeekWithTime";
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
        final long timeMs = System.currentTimeMillis();
        log.info("Current system time is: " + timeMs);
        for (int i = 0; i < 10; i++) {
           String message = "my-message-" + i;
           producer.send(message.getBytes("UTF-8"));
           Thread.sleep(1000);
       }

       log.info("Seeking to time: " + (timeMs + 3000));
       Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
               .startMessageId(MessageId.earliest).create();

       reader.seek(timeMs + 3000);

       Message<byte[]> msg = null;
       Set<String> messageSet = Sets.newHashSet();
       for (int i = 3; i < 10; i++) {
           msg = reader.readNext();

           String receivedMessage = new String(msg.getData(), "UTF-8");
           log.info("Received message: [{}]", receivedMessage);
           String expectedMessage = "my-message-" + i;
           testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
       }
       reader.close();
       producer.close();
    }

And this was the test output:

[ERROR] testReaderIsAbleToSeekWithTime(org.apache.pulsar.client.api.TopicReaderTest)  Time elapsed: 10.112 s  <<< FAILURE!
java.lang.AssertionError: Received message my-message-0 did not match the expected message my-message-3 expected [my-message-3] but found [my-message-0]

I think seek(long timestamp) in your code is also suffering from the same problems that I am.

@lovelle
Copy link
Contributor Author

lovelle commented Apr 13, 2019

@ConcurrencyPractitioner

Yes, you are right, reset on ManagedCursorImpl tells moving the cursor to 3:4 position but after this I still receive [3:0..3:9]
e.g:

13:36:22.428 [bookkeeper-ml-workers-OrderedExecutor-5-0:org.apache.bookkeeper.mledger.impl.ManagedCursorImpl@822] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/testReaderIsAbleToSeekWithTime] Initiate reset position to 3:4 on cursor reader-f3710e27ca
13:36:22.429 [bookkeeper-ml-workers-OrderedExecutor-5-0:org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$7@861] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/testReaderIsAbleToSeekWithTime] reset position to 3:4 before current read position 3:10 on cursor reader-f3710e27ca
13:36:25.195 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:0:-1:0
13:36:25.196 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:1:-1:0
13:36:25.197 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:2:-1:0
13:36:25.198 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:3:-1:0
13:36:25.199 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:4:-1:0
13:36:25.199 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:5:-1:0
13:36:25.200 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:6:-1:0
13:36:25.201 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:7:-1:0
13:36:25.201 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:8:-1:0
13:36:25.201 [main:org.apache.pulsar.client.api.TopicReaderTest@653] INFO  org.apache.pulsar.client.api.TopicReaderTest - received msg -> 3:9:-1:0

I will keep debugging this to see if I find something. 👍

@lovelle lovelle force-pushed the feature/support_seek_on_reader_impl branch from 0a5ea4c to 71f10d9 Compare April 15, 2019 23:36
*Motivation*

Trying to fix apache#3976

According to what was discussed in pull apache#3983 it would be an acceptable solution
to add seek() command to Reader in order to reset a non durable cursor after
Reader instance was build.

*Modifications*

  - Bugfix reset() by timestamp on a non-durable consumer, previously the
    cached cursor was not present, therefore the state set by reset() was missed
    resulting in a reset() at the beginning of the cursor instead of a reset()
    at the expected position.
  - Copy seek() commands to Reader interface from Consumer interface.
  - Fix inconsistency with lastDequeuedMessage field after seek() command was
    performed successfully.
  - Fix consumer discarding messages on receive (after seek() command) due to
    messages being present on acknowledge grouping tacker.
  - Add functional test to assert seek() with timestamp behaviour from Reader to
    reach the beginning of the topic.
  - Add functional test to assert seek() with timestamp behaviour from Reader to
    reach the middle of the topic.
  - Add functional test to assert seek() with MessageId behaviour from Reader to
    reach the middle of the topic.
@lovelle lovelle force-pushed the feature/support_seek_on_reader_impl branch from 71f10d9 to 12c8ab9 Compare April 15, 2019 23:52
@lovelle
Copy link
Contributor Author

lovelle commented Apr 15, 2019

@ConcurrencyPractitioner the last mentioned missed test case is now solved and added, please take a look now 👍

@sijie @jai1 this pull is ready to be fully reviewed, thanks!.

@ConcurrencyPractitioner
Copy link
Contributor

👍 I see now, you made some changes to ManagedLedgerImpl. I could see now why seek(long timestamp) was not working. Nice work!

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sijie
Copy link
Member

sijie commented Apr 16, 2019

run java8 tests

@sijie
Copy link
Member

sijie commented Apr 18, 2019

@jai1 can you please take a look again?

@sijie
Copy link
Member

sijie commented Apr 22, 2019

ping @jai1

@sijie sijie added area/client type/feature The PR added a new feature or issue requested a new feature labels Apr 23, 2019
@sijie sijie added this to the 2.4.0 milestone Apr 23, 2019
@sijie sijie merged commit c70438c into apache:master Apr 23, 2019
@lovelle lovelle deleted the feature/support_seek_on_reader_impl branch April 23, 2019 20:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support opening a reader by publish time
5 participants