
Fetch Pulsar offsets from Consumer interface instead of Reader #8017

Merged — 13 commits, Apr 6, 2022

Conversation

KKcorps
Contributor

@KKcorps KKcorps commented Jan 13, 2022

The Pulsar plugin fails to consume data when the auto.offset.reset property is set to largest, because the Reader interface always resets to the position after the last message in the topic. There are two possible fixes: use large fetch timeouts so that newly pushed records are consumed before a new Pulsar consumer is created,
OR
Drop the Pulsar Reader interface and use the Consumer interface instead. The Consumer interface can return the last valid message id in the topic, so the PulsarConsumer can begin consumption after that point.
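As a rough sketch of the Consumer-based idea (illustrative only, not the plugin's actual code: the service URL, topic, and subscription name are placeholders, and running it requires a live Pulsar broker):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ConsumerOffsetSketch {
  public static void main(String[] args) throws PulsarClientException {
    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")   // placeholder broker URL
        .build();

    Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")                       // placeholder topic
        .subscriptionName("pinot-sub")           // placeholder subscription
        .subscribe();

    // Unlike Reader, Consumer exposes the last valid message id in the topic,
    // so consumption can be positioned relative to a real message id instead
    // of the Reader's "after the latest message" reset behavior.
    MessageId lastId = consumer.getLastMessageId();
    consumer.seek(lastId);

    consumer.close();
    client.close();
  }
}
```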

@codecov-commenter

codecov-commenter commented Jan 13, 2022

Codecov Report

Merging #8017 (d0fb9a4) into master (35cef48) will decrease coverage by 1.78%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master    #8017      +/-   ##
============================================
- Coverage     71.34%   69.56%   -1.79%     
- Complexity     4215     4279      +64     
============================================
  Files          1596     1664      +68     
  Lines         82778    87342    +4564     
  Branches      12348    13227     +879     
============================================
+ Hits          59062    60758    +1696     
- Misses        19728    22299    +2571     
- Partials       3988     4285     +297     
Flag Coverage Δ
integration1 27.07% <ø> (-1.89%) ⬇️
integration2 ?
unittests1 67.02% <ø> (-1.09%) ⬇️
unittests2 14.13% <ø> (-0.17%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...t/core/plan/StreamingInstanceResponsePlanNode.java 0.00% <0.00%> (-100.00%) ⬇️
...ore/operator/streaming/StreamingResponseUtils.java 0.00% <0.00%> (-100.00%) ⬇️
...ager/realtime/PeerSchemeSplitSegmentCommitter.java 0.00% <0.00%> (-100.00%) ⬇️
...pache/pinot/common/utils/grpc/GrpcQueryClient.java 0.00% <0.00%> (-94.74%) ⬇️
...he/pinot/core/plan/StreamingSelectionPlanNode.java 0.00% <0.00%> (-88.89%) ⬇️
...ator/streaming/StreamingSelectionOnlyOperator.java 0.00% <0.00%> (-87.81%) ⬇️
...re/query/reduce/SelectionOnlyStreamingReducer.java 0.00% <0.00%> (-85.72%) ⬇️
...data/manager/realtime/DefaultSegmentCommitter.java 0.00% <0.00%> (-80.00%) ⬇️
...ller/api/access/BasicAuthAccessControlFactory.java 0.00% <0.00%> (-80.00%) ⬇️
...ctionaryBasedSingleColumnDistinctOnlyExecutor.java 0.00% <0.00%> (-80.00%) ⬇️
... and 523 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 35cef48...d0fb9a4.

@KKcorps KKcorps marked this pull request as ready for review January 16, 2022 21:05
@mathieudruart
Contributor

Hi @KKcorps

We have tested your PR and it seems to miss messages. There appears to be an issue in the method getNextStreamParitionMsgOffsetAtIndex: if the message is part of a Pulsar batch (BatchMessageIdImpl), you add +1 to the entry id every time. That doesn't seem correct, because the next message id should only have the batch index incremented, with the same entry id (all messages inside a Pulsar batch share the same entry id).
We tried this version of the method, and it seems to get all messages correctly:

  @Override
  public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
    MessageIdImpl currentMessageId = MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId());
    MessageId nextMessageId;
    
    long currentLedgerId = currentMessageId.getLedgerId();
    long currentEntryId = currentMessageId.getEntryId();
    int currentPartitionIndex = currentMessageId.getPartitionIndex();
    
    if (currentMessageId instanceof BatchMessageIdImpl) {
      int currentBatchIndex = ((BatchMessageIdImpl) currentMessageId).getBatchIndex();
      int currentBatchSize = ((BatchMessageIdImpl) currentMessageId).getBatchSize();
      
      if (currentBatchIndex < currentBatchSize - 1) {
        nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId,
                currentPartitionIndex, currentBatchIndex + 1, currentBatchSize, 
                ((BatchMessageIdImpl) currentMessageId).getAcker());
      } else {
        nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId + 1,
                currentPartitionIndex, 0, currentBatchSize, ((BatchMessageIdImpl) currentMessageId).getAcker());
      }
    } else {
      nextMessageId =
              DefaultImplementation.newMessageId(currentLedgerId, currentEntryId + 1,
                      currentPartitionIndex);
    }
    return new MessageIdStreamOffset(nextMessageId);
  }
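To make the batch-boundary rule above concrete, here is a minimal, self-contained model of the increment arithmetic (a plain record standing in for Pulsar's BatchMessageIdImpl; the class and field names are illustrative, and no Pulsar dependency is needed):

```java
public class NextOffsetDemo {
  // ledgerId/entryId identify the BookKeeper entry; batchIndex addresses one
  // message inside a batched entry; batchSize is the message count of the batch.
  public record BatchId(long ledgerId, long entryId, int batchIndex, int batchSize) {}

  public static BatchId next(BatchId id) {
    if (id.batchIndex() < id.batchSize() - 1) {
      // Still inside the batch: only the batch index advances, entry id is unchanged.
      return new BatchId(id.ledgerId(), id.entryId(), id.batchIndex() + 1, id.batchSize());
    }
    // Last message of the batch: move to the next entry, batch index resets to 0.
    return new BatchId(id.ledgerId(), id.entryId() + 1, 0, id.batchSize());
  }

  public static void main(String[] args) {
    BatchId mid = next(new BatchId(5, 10, 0, 3)); // within the batch
    BatchId end = next(new BatchId(5, 10, 2, 3)); // at the batch boundary
    System.out.println(mid.entryId() + "," + mid.batchIndex()); // prints 10,1
    System.out.println(end.entryId() + "," + end.batchIndex()); // prints 11,0
  }
}
```

Note how only the boundary step advances the entry id; the original buggy code advanced it on every message, which is exactly what skipped the head of each new batch.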

@KKcorps
Contributor Author

KKcorps commented Feb 3, 2022

Hi,
thanks a lot for pointing it out. I am also not a fan of the increment methodology, but Pulsar doesn't offer a neat interface for getting the next offset the way Kafka does.
I will test and incorporate your patch. I think our tests currently cover only non-batch use cases, which is why they missed this.

@KKcorps
Contributor Author

KKcorps commented Feb 4, 2022

@mathieudruart can you send me some docs/examples to produce batch message scenarios where the code fails?
I need them to fix the test cases.

@mathieudruart
Contributor

@mathieudruart can you send me some docs/examples to produce batch message scenarios where the code fails? I need them to fix the test cases.

Hi @KKcorps, here is an example scenario that reproduces the problem:

  • activate an infinite retention policy on a Pulsar namespace (this makes the problem easier to reproduce): pulsar-admin namespaces set-retention my-tenant/my-ns --size -1 --time -1
  • use a Pulsar client to push messages with batching enabled into a topic of that namespace (the Pulsar Java client enables batching by default)
  • create a table in Pinot linked to that topic (all messages will be loaded correctly)
  • send new batched messages into the topic => the first messages will be skipped (the number of skipped messages depends on the size of the last correctly consumed batch)

Don't hesitate to ask me for clarification.
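The producer side of the steps above could be sketched as follows (illustrative only: the broker URL, topic, and batch settings are placeholders, and this requires a running Pulsar broker; batching is already on by default in the Java client, the explicit settings just make batch boundaries predictable):

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class BatchProducerSketch {
  public static void main(String[] args) throws PulsarClientException {
    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")            // placeholder broker URL
        .build();

    Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("persistent://my-tenant/my-ns/my-topic")   // placeholder topic
        .enableBatching(true)
        .batchingMaxMessages(10)                          // up to 10 messages per batch
        .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
        .create();

    // Async sends let the client accumulate messages into batches;
    // synchronous send() would flush a batch per message.
    for (int i = 0; i < 25; i++) {
      producer.sendAsync("msg-" + i);
    }
    producer.flush();
    producer.close();
    client.close();
  }
}
```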

@KKcorps
Contributor Author

KKcorps commented Feb 20, 2022

@mathieudruart Thanks for the help. I reproduced the scenario and it works fine with the new code. I have also added a few unit test cases for batch ids. The test cases still don't cover your scenario since it falls under integration tests. I have added that as well, but need to sort out a few dependency conflicts in the test package before raising a PR. You can find the code here - https://github.com/KKcorps/incubator-pinot/blob/pulsar_integration_test/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimePulsarIntegrationTest.java

@mayankshriv
Contributor

@npawar @navina please take a look.

@mathieudruart
Contributor

With the latest version of this patch, we have no more issues.

@KKcorps KKcorps requested a review from npawar April 4, 2022 20:48
Contributor

@npawar npawar left a comment


lgtm!
Only 1 optional comment, feel free to merge with or without it

@KKcorps KKcorps merged commit d7be2ef into apache:master Apr 6, 2022
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return partition;
}
}).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create();
Contributor


Any chance this is related to the recent flakiness in #8537?


Successfully merging this pull request may close these issues.

Fix the issue with "pinot-pulsar" module (potentially library conflicts)
6 participants