KAFKA-9674: corruption should also cleanup producer and recreate #8242
Conversation
Due to time constraints, I feel unblocking trunk is higher priority than test coverage on the active task creator; adding a ticket to track later: https://issues.apache.org/jira/browse/KAFKA-9676
Thanks @abbccdda ! I had a couple of high-level comments.
```java
            log.error(uncleanMessage, producerException);
            producerCloseExceptions.putIfAbsent(task.id(), producerException);
        }
    }
}
```

```java
task.revive();
```
Should we try to revive the task if there was an exception closing/re-creating the task producer?
For closing, yes -- we close dirty anyway. And I don't think that creating a producer can fail (if we are worried about it, we should just not catch the exception but die...?)
```java
if (firstEntry.getValue() instanceof KafkaException) {
    log.error("Hit Kafka exception while closing first task {} producer", firstEntry.getKey());
    throw firstEntry.getValue();
} else {
    throw new RuntimeException(
        "Unexpected failure to close " + producerCloseExceptions.size() +
        " task(s) producers [" + producerCloseExceptions.keySet() + "]. " +
        "First unexpected exception (for task " + firstEntry.getKey() + ") follows.", firstEntry.getValue());
}
```
These two cases don't seem to be different. I'd recommend just always wrapping the exception and throwing (currently the else block). If we just re-throw the first exception, reading the stack trace becomes very confusing, especially since a lot of those exceptions don't even include the stack trace.
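To illustrate the suggestion, here is a minimal, self-contained sketch (not the actual Kafka Streams code; the class and method names are stand-ins) of always wrapping the first collected close-exception in a new `RuntimeException`, so the stack trace points at the rethrow site, with the remaining failures attached as suppressed exceptions:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the reviewer's suggestion: unconditionally wrap the
// first close-exception (the current "else" branch) instead of re-throwing
// Kafka exceptions raw, so the resulting stack trace stays readable.
public class CloseExceptionCollector {

    static RuntimeException wrapFirst(final Map<String, RuntimeException> closeExceptions) {
        final Map.Entry<String, RuntimeException> firstEntry =
            closeExceptions.entrySet().iterator().next();
        final RuntimeException wrapped = new RuntimeException(
            "Unexpected failure to close " + closeExceptions.size() +
            " task producer(s) [" + closeExceptions.keySet() + "]. " +
            "First unexpected exception (for task " + firstEntry.getKey() + ") follows.",
            firstEntry.getValue());
        // Attach the remaining failures as suppressed exceptions so none are lost.
        closeExceptions.values().stream()
            .skip(1)
            .forEach(wrapped::addSuppressed);
        return wrapped;
    }

    public static void main(final String[] args) {
        final Map<String, RuntimeException> failures = new LinkedHashMap<>();
        failures.put("0_0", new IllegalStateException("producer fenced"));
        failures.put("0_1", new IllegalStateException("broker unreachable"));
        final RuntimeException e = wrapFirst(failures);
        System.out.println(e.getMessage());
        System.out.println(e.getCause().getMessage());
    }
}
```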
In newest trunk we always call `task.closeDirty`.
We should wrap `KafkaException` as `StreamsException` but rethrow all other `RuntimeException`s unwrapped (at least this is the pattern we use everywhere else, and thus we should follow it here, too).
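A sketch of that pattern, with minimal stand-in exception classes (the real ones are `org.apache.kafka.common.KafkaException` and `org.apache.kafka.streams.errors.StreamsException`; in Kafka, `StreamsException` actually extends `KafkaException`, which this simplified version does not model):

```java
// Hypothetical sketch: wrap client/broker errors in StreamsException, but let
// everything else (e.g. IllegalStateException) bubble up unwrapped.
public class RethrowPattern {

    static class KafkaException extends RuntimeException {
        KafkaException(final String message) { super(message); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) { super(message, cause); }
    }

    static RuntimeException classify(final RuntimeException e, final String taskId) {
        if (e instanceof KafkaException) {
            return new StreamsException("Failed to close producer for task " + taskId, e);
        }
        return e;  // rethrow all other RuntimeExceptions unwrapped
    }

    public static void main(final String[] args) {
        System.out.println(classify(new KafkaException("fenced"), "0_0").getClass().getSimpleName());
        System.out.println(classify(new IllegalStateException("bug"), "0_0").getClass().getSimpleName());
    }
}
```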
```java
void createTaskProducer(final TaskId taskId) {
    final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
    final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
    producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
    log.info("Creating producer client for task {}", taskId);
    taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
}
```
How about instead keeping this private and only exposing `reOpenTaskProducerIfNeeded`, which would take care of doing nothing if there's no task producer, etc.? I'm concerned that otherwise, someone might call `createTaskProducer` when there's already one there, leading to a "producer leak".
test this please
In newest trunk, we do not call `closeAndRemoveTaskProducerIfNeeded` during `handleCorrupted`; the error message seems to be coming from the old version where we closed the producer inside `task#close`.
So I'm wondering: during task-corruption handling, could we still reuse the existing producer? Task corruption can only be thrown during:
- restoration (changelog reader), where the producers have not yet been used to send a single record.
- creation with EOS (processor state manager), where the producers likewise have not yet been used to send a single record.
So I feel we do not need to close / recreate a new producer for handleCorruption. WDYT?
@vvcephei @guozhangwang Thanks for the review! After some offline discussion we believe fixing this issue is not urgent at the moment, as John's refactoring on the producer should already handle the case of closing the producer instead of inside
```java
// We need to recreate the producer as it could potentially be in illegal state.
if (task.isActive()) {
    try {
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
```
nit: `task.id()` -> `taskId`
```java
        activeTaskCreator.createTaskProducer(taskId);
    } catch (final RuntimeException producerException) {
        final String uncleanMessage = String.format("Failed to close task %s producer cleanly. " +
            "Attempting to close remaining task producers before re-throwing:", task.id());
```
as above
```java
final String uncleanMessage = String.format("Failed to close task %s producer cleanly. " +
    "Attempting to close remaining task producers before re-throwing:", task.id());
log.error(uncleanMessage, producerException);
producerCloseExceptions.putIfAbsent(task.id(), producerException);
```
same
```java
    producerCloseExceptions.entrySet().iterator().next();

if (firstEntry.getValue() instanceof KafkaException) {
    log.error("Hit Kafka exception while closing first task {} producer", firstEntry.getKey());
```
Do we need to log here? All errors are logged in L163 already (and I think we would log it again in upper layers).
@abbccdda Overall nice find -- working on the KIP-447 PR I was also wondering if we would need to create a new producer for this case.
Closing for now
The task producer cleanup doesn't currently involve handling of task corruption. This change adds recreation of the task producer, to avoid reusing a producer left in a fatal state in the next cycle.