New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix #199: Do not stop to replicate when producer throws exception #220
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
consumer1.receive(10); | ||
|
||
// Restrict backlog quota limit to 1 | ||
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we setup the test this way? If you want, you can create another test.
- limit quota to 1 message
- Publish 10 messages
- verify receive times out after first message is received(since quota is full)
- Increase quota to 10
- Verify we receive the remaining 9 messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@saandrews
I modified the test a little:
- limit quota to 1
- Publish 1 message and wait for reflecting backlog limitation ※
- Publish 9 messages and verify they will be pended
※ If I produce 10 messages at once in interval of reflecting backlog limitation, they will be sended
log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName, | ||
replicator.localCluster, replicator.remoteCluster); | ||
// cursor shoud be rewinded since it was incremented when readMoreEntries | ||
replicator.cursor.rewind(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@merlimat Do you see any side effect if we rewind it for every exception. All failed pending messages would reach here and invoke rewind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rewind is a cheap operation, it just resets the readPosition
to markDeletePosition + 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
* Fix #199: Do not stop to replicate when producer throws exception * Fix log messages * testReplicatorProducerClosing shoud be executed at last since it closes pulsar2/pulsar3 * Add unit test for replication resumption on backlog exceeded
* Fixed Lookup service.
fixes apache#220 if using `computeIfAbsent `, `consumerManagerFuture.complete(null)` will store in `consumerTopicManagers`, and `getTopicConsumerManager ` will always get future null cache for key which should getTopic again.
Motivation
This closes #199
Modifications
When the replication producer catches an exception,
rewind cursor and continue readMoreEntries (if possible) rather than return immediately.
An unit test for simulating #199 is also added.
Result
Replicator will continue to replicate even when producer exception.