Flakiness issues with subscriptionKeySharedUseConsistentHashing=true / PIP-119 in CPP tests #13965

Closed
lhotari opened this issue Jan 26, 2022 · 17 comments
Labels: type/bug, type/flaky-tests

@lhotari (Member) commented Jan 26, 2022

Describe the bug

Quoting @BewareMyPower from #13963

I tried to use three Java consumers with Key_Shared subscription to consume the topic produced by C++ test KeySharedConsumerTest.testMultiTopics. Sometimes not all messages can be received as well. It looks like there is something wrong with the consistent hashing implementation of Key_Shared dispatcher.

I also made similar observations based on C++ test logs:

Example of failures:

FAILED TESTS (3/279):
    9941 ms: ./main KeySharedConsumerTest.testMultiTopics (try #1)
    6242 ms: ./main KeySharedConsumerTest.testKeyBasedBatching (try #1)
    9740 ms: ./main KeySharedConsumerTest.testMultiTopics (try #2)

Full logs in https://github.com/apache/pulsar/suites/5064608592/artifacts/150614790

2022-01-26 11:26:24.372 INFO  [140238723073792] MultiTopicsConsumerImpl:95 | Successfully Subscribed to Topics
2022-01-26 11:26:33.950 INFO  [140238845213440] KeySharedConsumerTest:124 | messagesPerConsumer: {0 => 1098, 1 => 811, 2 => 1027}
/pulsar/pulsar-client-cpp/tests/KeySharedConsumerTest.cc:129: Failure
Value of: expectedNumTotalMessages
  Actual: 3000
Expected: numTotalMessages
Which is: 2936
2022-01-26 11:26:33.951 INFO  [140238845213440] ClientImpl:496 | Closing Pulsar client with 3 producers and 3 consumers

To Reproduce
Steps to reproduce the behavior:

  1. Set subscriptionKeySharedUseConsistentHashing=true
  2. Produce messages to multiple topics
  3. Consume them using Key_Shared subscriptions

Expected behavior
There shouldn't be any message loss.

@BewareMyPower (Contributor) commented Jan 26, 2022

Yeah, here is a screenshot of my Java consumer application. The topic was created by the C++ UT and received 3000 messages from the C++ producer. The Java consumers should have received 3000 messages in total, and sometimes it works well.


The code is:

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

public class Consume {

    // Drain one consumer until receive() times out, counting the messages.
    private static int receive(Consumer<byte[]> consumer) throws PulsarClientException {
        int n = 0;
        while (true) {
            final Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
            if (msg == null) {
                break;
            }
            n++;
            System.out.println("Received " + new String(msg.getValue())
                    + " from " + msg.getMessageId() + ", key: " + msg.getKey());
        }
        return n;
    }

    public static void main(String[] args) throws PulsarClientException {
        final PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
        final ConsumerBuilder<byte[]> builder = client.newConsumer()
                .topicsPattern(".*KeySharedConsumerTest-multi-topics.*")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscriptionName("my-sub-1");
        final Consumer<byte[]> consumer1 = builder.clone().subscribe();
        final Consumer<byte[]> consumer2 = builder.clone().subscribe();
        final Consumer<byte[]> consumer3 = builder.clone().subscribe();
        int n1 = receive(consumer1);
        int n2 = receive(consumer2);
        int n3 = receive(consumer3);
        System.out.println("n1: " + n1 + ", n2: " + n2 + ", n3: " + n3 + ", total: " + (n1 + n2 + n3));
        client.close();
    }
}

But I cannot easily reproduce it with a Java UT at the moment.

@BewareMyPower (Contributor)

It's weird that it's hard to reproduce in a unit test but easy to reproduce against a standalone broker.

Here is my project to reproduce it: https://github.com/BewareMyPower/pulsar-issue-13965-reproduce

/cc @codelipenghui

@lhotari (Member, Author) commented Jan 26, 2022

Great work on the repro case, @BewareMyPower!

@lhotari (Member, Author) commented Jan 26, 2022

The problem can be reproduced with Pulsar 2.8.2 and Pulsar 2.7.4 too.

A quick way to start Pulsar 2.8.2 standalone with subscriptionKeySharedUseConsistentHashing=true is with this docker command:

docker run -it --name pulsar --rm -p 8080:8080 -p 6650:6650 -e subscriptionKeySharedUseConsistentHashing=true apachepulsar/pulsar:2.8.2 bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker --no-stream-storage"

@lhotari (Member, Author) commented Jan 26, 2022

The result in the repro seems to be the same with subscriptionKeySharedUseConsistentHashing=false.

@lhotari (Member, Author) commented Jan 26, 2022

I also tried starting the container without --rm and then stopping and restarting it:

docker run -it --name pulsar -p 8080:8080 -p 6650:6650 -e subscriptionKeySharedUseConsistentHashing=true apachepulsar/pulsar:2.8.2 bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker --no-stream-storage"

then running

mvn exec:java -Dexec.mainClass=Produce
mvn exec:java -Dexec.mainClass=Consume
mvn exec:java -Dexec.mainClass=Consume
docker stop pulsar
docker start pulsar
mvn exec:java -Dexec.mainClass=Consume
mvn exec:java -Dexec.mainClass=Consume

The problem reproduces after restart too.

@BewareMyPower (Contributor) commented Jan 26, 2022

Yeah. It's weird: I also tried the C++ client to consume the topics produced by the Java client with subscriptionKeySharedUseConsistentHashing=false, and the message loss still happened.

#include <assert.h>
#include <iostream>
#include <vector>
#include <pulsar/Client.h>
using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");
    std::vector<Consumer> consumers(3);
    for (size_t i = 0; i < consumers.size(); i++) {
        ConsumerConfiguration conf;
        conf.setConsumerType(ConsumerType::ConsumerKeyShared);
        conf.setSchema(SchemaInfo(STRING, "String", ""));
        conf.setSubscriptionInitialPosition(InitialPositionEarliest);
        conf.setPatternAutoDiscoveryPeriod(1);
        auto result =
            client.subscribeWithRegex(".*KeySharedConsumerTest-multi-topics.*", "my-sub", conf, consumers[i]);
        assert(result == ResultOk);
    }

    std::vector<int> numReceivedList;
    int numTotal = 0;
    for (Consumer& consumer : consumers) {
        int n = 0;
        Message msg;
        while (true) {
            auto result = consumer.receive(msg, 3000);
            if (result == ResultTimeout) {
                break;
            }
            assert(result == ResultOk);
            n++;
        }
        numReceivedList.emplace_back(n);
        numTotal += n;
    }

    for (int n : numReceivedList) {
        std::cout << n << std::endl;
    }
    std::cout << numTotal << std::endl;
    client.close();
}

The code above is nearly the same as Consume.java in my repo. And a sample output is:

154
0
0
154

But when I run the C++ UT, it now never fails when subscriptionKeySharedUseConsistentHashing=false:

./tests/main --gtest_filter='*testMultiTopics'

Here are 5 test results in a row:

2022-01-26 23:45:25.235 INFO  [0x113994600] KeySharedConsumerTest:124 | messagesPerConsumer: {0 => 1442, 1 => 738, 2 => 820}
2022-01-26 23:45:58.855 INFO  [0x1111d1600] KeySharedConsumerTest:124 | messagesPerConsumer: {0 => 1421, 1 => 786, 2 => 793}
2022-01-26 23:46:30.804 INFO  [0x11da5d600] KeySharedConsumerTest:124 | messagesPerConsumer: {0 => 1415, 1 => 779, 2 => 806}
2022-01-26 23:47:34.651 INFO  [0x1111e2600] KeySharedConsumerTest:124 | messagesPerConsumer: {0 => 1396, 1 => 798, 2 => 806}
2022-01-26 23:47:52.039 INFO  [0x111313600] KeySharedConsumerTest:124 | messagesPerConsumer: {0 => 1371, 1 => 751, 2 => 878}

@merlimat (Contributor)

The messages should not get acknowledged (and thus lost), right?

@merlimat (Contributor)

I just checked, adding consumer acks, and the messages are there in the backlog, so this shouldn't be characterized as "message loss".

@merlimat (Contributor)

I think there is a problem with the repro code above and with the original C++ test; the production code is actually correct here.

The repro code is doing:

  • Create 3 producers and publish on 3 different topics
  • Create 3 consumers on the 3 topics with key-shared, starting from "earliest"
  • The problem is that consumers are created after the messages are published, therefore the pre-fetching is already done
  • The first consumer will get pushed 1K messages when it's first added, and then the other keys will get stalled.

There are 2 problems in the way the repro code (and the test) consume messages:

  • There's no ack, therefore the keys that were sent to the first consumer cannot be sent to any other consumer
  • The test is sequentially receiving from the 3 consumers, stopping when a consumer has no more messages for 1 second.

What happens is that 1000 messages are pushed to the 1st consumer, and then the other consumers are stalled because the 1st consumer didn't acknowledge.

@merlimat (Contributor)

A couple of ways to fix the test:

  1. Create the consumers before the messages are published
  2. Use 1 thread per consumer to call receive() (and don't forget to ack), or use a message listener (see the sketch below)
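
A minimal Java sketch of fix 2, reusing the topic pattern and subscription from the repro above (the class name ConcurrentConsume is illustrative, not the actual test code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

public class ConcurrentConsume {
    public static void main(String[] args) throws Exception {
        final PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();
        final AtomicInteger total = new AtomicInteger();
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final Consumer<byte[]> consumer = client.newConsumer()
                    .topicsPattern(".*KeySharedConsumerTest-multi-topics.*")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .subscriptionName("my-sub-1")
                    .subscribe();
            // One receive loop per consumer so no consumer blocks the others,
            // and ack every message so the dispatcher can make progress on all keys.
            final Thread t = new Thread(() -> {
                try {
                    while (true) {
                        final Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
                        if (msg == null) {
                            break;
                        }
                        consumer.acknowledge(msg);
                        total.incrementAndGet();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            t.start();
            threads.add(t);
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("total: " + total.get());
        client.close();
    }
}

Fix 1 is even simpler in the repro: subscribe the three consumers before running the producer.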

@BewareMyPower (Contributor)

Create the consumers before the messages are published

Yes, I applied BewareMyPower/pulsar-issue-13965-reproduce@8e85c77 and the test works now.

But I'm still confused about what makes the difference: the C++ test KeySharedConsumerTest.testMultiTopics easily fails with subscriptionKeySharedUseConsistentHashing=true but always succeeds with subscriptionKeySharedUseConsistentHashing=false.

@BewareMyPower (Contributor)

Use 1 thread per consumer to call receive() (and don't forget to ack) or use message listener

I've tried both solutions above in the C++ client and they work well. But I'm confused about why they work; could you explain a little more? @merlimat

@BewareMyPower (Contributor)

I found another problem with subscriptionKeySharedUseConsistentHashing=true. I changed the C++ tests so that each consumer calls receive in a separate thread. After that, testMultiTopics never failed, even with subscriptionKeySharedUseConsistentHashing=true.

However, testKeyBasedBatching sometimes failed. In this test, a producer is created to send messages with interleaved keys and key-based batching, i.e. it sends: ABABABAB...

Then, two consumers are created to consume these messages. I expect each consumer to consume one key. However, sometimes it fails with:

2022-01-27 20:50:02.410 INFO  [0x10e646600] KeySharedConsumerTest:149 | messagesPerConsumer: {0 => 200, 1 => 0}
pulsar-client-cpp/tests/KeySharedConsumerTest.cc:160: Failure
Expected: (fabs(count - expectedMessagesPerConsumer)) < (expectedMessagesPerConsumer * PERCENT_ERROR), actual: 100 vs 50
pulsar-client-cpp/tests/KeySharedConsumerTest.cc:260: Failure
Expected equality of these values:
  messagesPerConsumer[i]
    Which is: 200
  NUM_MESSAGES_PER_KEY
    Which is: 100

We can see one consumer received all messages. Is this expected behavior when subscriptionKeySharedUseConsistentHashing=true? @merlimat
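
For reference, the producer side of this pattern looks roughly like the following in the Java client (a hedged sketch; the failing test itself is C++, and the topic name and message count here are illustrative). With BatcherBuilder.KEY_BASED, each batch contains messages for only one key, so a Key_Shared dispatcher can route whole batches by key:

import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class KeyBasedBatchingProduce {
    public static void main(String[] args) throws Exception {
        final PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();
        // Key-based batching groups outgoing messages into per-key batches.
        final Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("key-based-batching-repro") // illustrative topic name
                .enableBatching(true)
                .batcherBuilder(BatcherBuilder.KEY_BASED)
                .create();
        final String[] keys = {"A", "B"};
        for (int i = 0; i < 200; i++) {
            // Interleave the keys: A, B, A, B, ...
            producer.newMessage().key(keys[i % 2]).value("msg-" + i).sendAsync();
        }
        producer.flush();
        producer.close();
        client.close();
    }
}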

@merlimat (Contributor)

@BewareMyPower

I've tried both solutions above in C++ client and it works well. But I'm confused about why it works, could you explain a little more? @merlimat

While it was clear for the repro code that you shared, I couldn't find the problem in the C++ test (since it was acking the messages).

However, the testKeyBasedBatching failed sometimes. In this test, a producer is created to send messages with interleaved keys and key based batching. i.e. send: ABABABAB......

The random behavior is due to the auto-generated consumer names. The selection of consumers works like this:

  • Take the consumer name and place 100 points in the hash ring
  • The points are selected like hash(consumerName + "0"), hash(consumerName + "1"), hash(consumerName + "2") ...

For 2 given message keys, you're not guaranteed that they will be assigned to 2 different consumers, although statistically, the keys will be evenly distributed across consumers.

To make the test deterministic, you can set distinct consumer names on the 2 consumers.
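
A simplified Java sketch of that selection scheme, for illustration only (the CRC32 hash and class name are stand-ins and this is not the broker's actual implementation; the 100-points-per-consumer figure comes from the description above):

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Simplified model of a consistent-hash consumer selector: each consumer places
// 100 points on a ring, and a key is routed to the consumer owning the first
// point at or after hash(key).
public class HashRingSketch {
    private static final int POINTS_PER_CONSUMER = 100;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    private static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    void addConsumer(String consumerName) {
        // Points are placed at hash(consumerName + "0"), hash(consumerName + "1"), ...
        for (int i = 0; i < POINTS_PER_CONSUMER; i++) {
            ring.put(hash(consumerName + i), consumerName);
        }
    }

    String selectConsumer(String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return e != null ? e.getValue() : ring.firstEntry().getValue();
    }

    public static void main(String[] args) {
        HashRingSketch selector = new HashRingSketch();
        selector.addConsumer("consumer-a");
        selector.addConsumer("consumer-b");
        // With only two keys, both may land on the same consumer; the even
        // distribution is only statistical across many keys.
        for (String key : List.of("A", "B")) {
            System.out.println(key + " -> " + selector.selectConsumer(key));
        }
    }
}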

@codelipenghui removed the release/blocker label Jan 28, 2022
@lhotari changed the title from "Message loss with subscriptionKeySharedUseConsistentHashing=true / PIP-119 detected in CPP tests" to "Flakiness issues with subscriptionKeySharedUseConsistentHashing=true / PIP-119 in CPP tests" Jan 31, 2022
@lhotari (Member, Author) commented Jan 31, 2022

I just checked, adding consumer acks, and the messages are there in the backlog, so this shouldn't be characterized as "message loss".

I agree. I renamed the issue. I'll close this issue since it seems to be addressed. @BewareMyPower Please reopen if there's more to do.

@lhotari closed this as completed Jan 31, 2022
@lightistor

We are facing this issue, which yields a constantly increasing backlog with millions of messages. Is there a fix for it, or guidance on how to implement our consumers? We end up with 2 out of 16 consumer service pods that can subscribe, with the others idling and not receiving any messages.
