
[feat] [perf] Support for transaction consistency test with PerformanceProducer. #19780

@thetumbled

Description


Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently, bin/pulsar-perf does not support verifying exactly-once semantics. Like Kafka, Pulsar only supports exactly-once semantics in a specific working pattern, namely consume-transform-produce, while I am trying to achieve exactly-once semantics in other working patterns. For details, see #19744.
This tool verifies transaction consistency in the producer-only working pattern. We can use it to produce messages with or without transactions and run the experiment in any disruptive environment: for example, restarting brokers hundreds of times, unloading topics hundreds of times, or using tc qdisc to drop packets. Finally, we consume the produced messages without interference and check data integrity, that is, that no message is lost or duplicated.

Solution

Implement the logic in PerformanceProducer.

Message Format

To make data integrity easy to check, we define the message content as an integer incrementing from 0 up to N-1. For example, if we start the perf process with --num-messages 100000000, we then read back the content successfully produced to the broker and verify that it constitutes exactly the range [0, 100000000), which can be implemented easily with a RangeSet.
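A minimal sketch of this check in Java, using only the standard library instead of Guava's RangeSet: the consumed numbers are folded into disjoint half-open ranges, and any number that is already covered counts as a duplicate. The names IntegrityChecker, check and coversExactly are hypothetical, not taken from the tool. The duplicate rate is computed as duplicates / total × 100%, which is consistent with the figures in the logs of the experiments.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical checker, not the PulsarCommonUtils class seen in the logs.
// It folds consumed numbers into disjoint half-open ranges [lo, hi), which is
// what a RangeSet does, and counts every value already covered as a duplicate.
final class IntegrityChecker {

    static final class Report {
        final TreeMap<Long, Long> ranges; // lo -> hi (exclusive)
        final long duplicates;
        final long total;

        Report(TreeMap<Long, Long> ranges, long duplicates, long total) {
            this.ranges = ranges;
            this.duplicates = duplicates;
            this.total = total;
        }

        int rangeCount() {
            return ranges.size();
        }

        // True when the distinct values are exactly [0, n): nothing was lost.
        boolean coversExactly(long n) {
            return ranges.size() == 1
                    && ranges.firstKey() == 0L
                    && ranges.firstEntry().getValue() == n;
        }

        // Duplicates as a percentage of all consumed messages.
        double duplicateRatePercent() {
            return total == 0 ? 0.0 : duplicates * 100.0 / total;
        }
    }

    static Report check(Iterable<Long> consumed) {
        TreeMap<Long, Long> ranges = new TreeMap<>();
        long duplicates = 0;
        long total = 0;
        for (long v : consumed) {
            total++;
            Map.Entry<Long, Long> left = ranges.floorEntry(v);
            if (left != null && v < left.getValue()) {
                duplicates++; // v is already inside an existing range
                continue;
            }
            long lo = v;
            long hi = v + 1;
            if (left != null && left.getValue() == v) {
                lo = left.getKey(); // extend the range that ends right before v
                ranges.remove(lo);
            }
            Long rightHi = ranges.remove(v + 1);
            if (rightHi != null) {
                hi = rightHi; // merge with the range that starts right after v
            }
            ranges.put(lo, hi);
        }
        return new Report(ranges, duplicates, total);
    }
}
```

Because the consumer usually reads values in nearly increasing order, the map stays small even for 100 million messages; a java.util.BitSet indexed by message number would be an equally valid choice when N is known in advance.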

Client-Side Persistence Scheme

The client may crash and restart while producing messages in transactions, so it needs to persist the messages that have been sent in each transaction to avoid message loss or duplication.
I adopt a simple approach: every message the producer sends in a transaction is persisted to a local file tmpX.data. If the transaction commits successfully, the content of tmpX.data is cleared. If the transaction is aborted or its commit fails, the messages of that transaction are resent in a new transaction.
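The commit-or-resend rule can be sketched as follows. SimpleTxnClient is a hypothetical interface standing in for a transactional producer (it is not the Pulsar client API), and the in-memory pending list stands in for tmpX.data. The retry cap maxAttempts is an assumption added so that the sketch cannot loop forever.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal transaction interface used only for this sketch;
// it is NOT the Pulsar client API.
interface SimpleTxnClient {
    long begin();                       // open a transaction and return its id
    void send(long txnId, long number); // send one message inside the transaction
    boolean commit(long txnId);         // true if committed; false if aborted or failed
}

final class ResendOnAbort {

    // Sends `batch` in a transaction. If the transaction is aborted or the
    // commit fails, the persisted messages are resent in a NEW transaction.
    // Returns how many transactions were needed.
    static int produce(List<Long> batch, SimpleTxnClient client, int maxAttempts) {
        // In-memory stand-in for tmpX.data: messages sent but not yet committed.
        List<Long> pending = new ArrayList<>(batch);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            long txn = client.begin();
            for (long n : pending) {
                client.send(txn, n);
            }
            if (client.commit(txn)) {
                pending.clear(); // committed: clear the persisted messages
                return attempt;
            }
            // Aborted or commit failed: keep `pending` and retry in a new transaction.
        }
        throw new IllegalStateException("no commit after " + maxAttempts + " transactions");
    }
}
```

Resending in a new transaction, rather than retrying the old one, is what keeps the aborted messages invisible to consumers while the same numbers are delivered exactly once by the transaction that finally commits.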
Each transaction is assigned its own tmp file, such as tmp0.data, which persists the messages sent in that transaction. Since many transactions run concurrently, many tmp files are created, and access to these files is guarded by concurrency control.
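A sketch of one per-transaction file, assuming one number per line and a synchronized instance per tmpX.data file as the concurrency control. TxnMessageLog and its methods are hypothetical names, not the code from #19781.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper around one tmpX.data file. Each concurrently running
// transaction owns one instance; `synchronized` serializes access to the file.
final class TxnMessageLog {
    private final Path file;

    TxnMessageLog(Path dir, int slot) throws IOException {
        this.file = dir.resolve("tmp" + slot + ".data");
        if (!Files.exists(file)) {
            Files.createFile(file); // an existing file is kept: it may hold data from before a crash
        }
    }

    // Persist a message number when it is sent in the current transaction.
    synchronized void append(long number) throws IOException {
        Files.write(file, (number + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND, StandardOpenOption.SYNC);
    }

    // Transaction committed: clear the file.
    synchronized void onCommitted() throws IOException {
        Files.write(file, new byte[0], StandardOpenOption.TRUNCATE_EXISTING);
    }

    // Transaction aborted or commit failed: numbers to resend in a new transaction.
    synchronized List<Long> pendingForResend() throws IOException {
        List<Long> out = new ArrayList<>();
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            if (!line.isEmpty()) {
                out.add(Long.parseLong(line));
            }
        }
        return out;
    }
}
```

Writing with SYNC on every append is the simple, safe choice for a test tool; a higher-throughput variant might batch the fsync per transaction instead.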
When the client shuts down, it takes a snapshot of the current number of the message generator IncrementedNumberMessageFormatter. When the client restarts, it reads the value from the snapshot and continues generating numbers from the state before the shutdown.
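A sketch of the snapshot and restore step under stated assumptions: the counter is a plain AtomicLong, the snapshot is a small text file, and it is replaced by writing a temporary file and renaming it. SnapshottingCounter is a hypothetical stand-in, not the actual IncrementedNumberMessageFormatter API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the generator state; not the real
// IncrementedNumberMessageFormatter API.
final class SnapshottingCounter {
    private final Path snapshot;
    private final AtomicLong next;

    // Restores the counter from the snapshot if one exists, else starts at 0.
    SnapshottingCounter(Path snapshot) throws IOException {
        this.snapshot = snapshot;
        long start = 0;
        if (Files.exists(snapshot)) {
            String text = new String(Files.readAllBytes(snapshot), StandardCharsets.UTF_8);
            start = Long.parseLong(text.trim());
        }
        this.next = new AtomicLong(start);
    }

    // Next message content: 0, 1, 2, ...
    long nextNumber() {
        return next.getAndIncrement();
    }

    // Called on shutdown: write a temporary file, then rename it over the
    // snapshot so an interrupted write never leaves a half-written snapshot.
    void saveSnapshot() throws IOException {
        Path tmp = snapshot.resolveSibling(snapshot.getFileName() + ".tmp");
        Files.write(tmp, Long.toString(next.get()).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, snapshot, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

In a real client, saveSnapshot() would typically be called from a JVM shutdown hook (Runtime.getRuntime().addShutdownHook); such a hook does not run on a hard kill like kill -9.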

For the detailed implementation, see #19781.
My experiment results are shown below.

Usage

Non-transactional produce with topic unload

Produce 100 million messages, unloading the topic every 80 seconds, 50 times in total.

bin/pulsar-perf produce --num-messages 100000000 --rate 100000 --format-class org.apache.pulsar.testclient.IncrementedNumberMessageFormatter --send-timeout 30 --resend-on-failure  persistent://test/test/testTxn10

The script for unloading the topic:

#!/bin/bash
# Script for unloading a topic periodically.

interval=80      # interval in seconds between two unloads.
unloadCount=50   # total number of unloads.
topic="persistent://test/test/testTxn10"

while [ "$unloadCount" -gt 0 ]
do
  echo "unload topic: $topic"
  bin/pulsar-admin topics unload "$topic"
  unloadCount=$((unloadCount - 1))
  sleep "$interval"
done

The result is as follows:

09:59:54.250 [main] INFO  org.example.utils.PulsarCommonUtils - only one range exists. RangeSet:[[0..100000000)]
09:59:54.254 [main] INFO  org.example.utils.PulsarCommonUtils - duplicate message count:11317, total message count:100011317, duplicate rate:0.011315719400035498%

11317 messages are duplicated, while no message is lost, since the consumed values form the single range [0, 100000000).

Transactional produce with repeated broker restarts.

Produce 100 million messages, restarting the broker every 120 seconds, 50 times in total.

nohup bin/pulsar-perf produce --producer-name "txnProducer" -txn -nmt 800 --num-messages 100000000 --txn-test-enabled --max-txn 20 --rate 100000 --format-class org.apache.pulsar.testclient.IncrementedNumberMessageFormatter --send-timeout 0 persistent://test/test/testTxn10 > produce.txt &

The script for restarting the broker:

#!/bin/bash
# Script for restarting the broker periodically.

interval=120     # interval in seconds between two restarts.
restartCount=50  # total number of restarts.

while [ "$restartCount" -gt 0 ]
do
  bin/pulsar-daemon restart broker
  restartCount=$((restartCount - 1))
  sleep "$interval"
done

The result is as follows:

19:21:23.507 [main] INFO  org.example.utils.PulsarCommonUtils - only one range exists. RangeSet:[[0..100000000)]
19:21:23.510 [main] INFO  org.example.utils.PulsarCommonUtils - duplicate message count:800, total message count:100000800, duplicate rate:7.999936000511996E-4%

800 messages are duplicated, all caused by a single transaction, since we produce 800 messages per transaction (-nmt 800). So exactly-once semantics is not guaranteed in the producer-only working pattern!
The reasons for the duplication are described in #19629, and I have fixed them in #19744.

Below is the experiment result after merging PR #19662; you can reproduce the experiment with the two PRs #19662 and #19781.

Transactional produce with repeated broker restarts (PR merged).

nohup bin/pulsar-perf produce --producer-name "txnProducer" -txn -nmt 800 --num-messages 100000000 --txn-test-enabled --max-txn 30 --rate 200000 --format-class org.apache.pulsar.testclient.IncrementedNumberMessageFormatter --send-timeout 0 -threads 4 persistent://test/test/testTxn10 > produce.txt &

13:17:03.543 [main] INFO  org.example.utils.PulsarCommonUtils - only one range exists. RangeSet:[[0..100000000)]
13:17:03.546 [main] INFO  org.example.utils.PulsarCommonUtils - duplicate message count:0, total message count:100000000, duplicate rate:0.0%

No message is duplicated or lost.

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: Stale, type/enhancement