KAFKA-20549: Implemented PRODUCE RPC handler for DLQ state manager. [5/N]#22368
Conversation
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. Some initial comments.
| Object isEnabled = props.get(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG); | ||
| if (isEnabled instanceof Boolean) { | ||
| return (boolean) isEnabled; | ||
| if (isEnabled instanceof String) { |
There was a problem hiding this comment.
I'm surprised that this is necessary. This config has a boolean type already, so I wonder whether that's being lost somehow on its way to the metadata cache. For other similar configs, such as TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, the code just does Config.getBoolean.
There was a problem hiding this comment.
Ok, will encapsulate in LogConfig
|
|
||
| @Test | ||
| public void testIsDlqEnabledOnTopicReturnsTrue() { | ||
| public void testIsDlqEnabledOnTopicReturnsFalseValue() { |
There was a problem hiding this comment.
nit: why FalseValue instead of False?
There was a problem hiding this comment.
nit: This method could call the overload beneath will fully specified arguments.
| } | ||
|
|
||
| private void addRequestToNodeMap(Node node, ProduceRequestHandler handler) { | ||
| if (!handler.isBatchable()) { |
There was a problem hiding this comment.
This could do with some explanation I feel.
| } | ||
|
|
||
| if (tpData.numPartitions().isEmpty()) { | ||
| throw new ConfigException(String.format("DLQ topic partition count not be found for share group %s with DLQ topic %s.", param.groupId(), dlqTopic.get())); |
|
|
||
| private Header[] headers(long offset) { | ||
| List<Header> headers = new ArrayList<>(); | ||
| headers.add(new RecordHeader("__dlq.errors.topic", recordTopic().getBytes(StandardCharsets.UTF_8))); |
There was a problem hiding this comment.
I would define some static strings for the header keys.
| ProduceResponse produceResponse = ((ProduceResponse) response.responseBody()); | ||
| ProduceResponseData.TopicProduceResponseCollection produceResponseCollection = produceResponse.data().responses(); | ||
| if (produceResponseCollection.isEmpty()) { | ||
| LOG.error("Received empty produce response for {} to dlq topic node {}.", this, dlqPartitionLeaderNode()); |
There was a problem hiding this comment.
nit: Let's be consistent with use of . in the log messages. I would tend not to include in log messages, but I would definitely go for consistency in this source file.
|
@AndrewJSchofield Thanks for the review, inc comments. |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
This looks like a good initial implementation of DLQ produce.
* Add ShareGroupDLQManager instance creation code in BrokerServer and pass along the instance to SharePartitionManager to be handed over to SharePartition. NOTE: Merge after #22368 Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com> Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
ShareGroupDLQStateManager.Reviewers: Andrew Schofield aschofield@confluent.io