
KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions #11382

Merged: 5 commits into apache:trunk on Jan 27, 2022

Conversation

TheKnowles (Contributor) commented Oct 5, 2021

This change gives source connectors the option to set "errors.tolerance" to "all" so they can handle or ignore producer exceptions. In the event the producer cannot write to Kafka, the connector's commitRecord() callback is invoked with a null RecordMetadata. This is new behavior for the errors.tolerance setting. The default behavior is still to kill the task unconditionally if errors.tolerance is "none".

A unit test has been added to validate that the producer callback is invoked on failure. The source task ignores the exception and the task is not killed.
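For connector developers, the practical consequence is that commitRecord() may now be called with a null RecordMetadata when errors.tolerance is "all". A minimal sketch of how a task might handle that case (the class name and logging are illustrative, not part of this PR; other required SourceTask methods are omitted):

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public abstract class ProducerAwareSourceTask extends SourceTask {
        private static final Logger log = LoggerFactory.getLogger(ProducerAwareSourceTask.class);

        @Override
        public void commitRecord(SourceRecord record, RecordMetadata metadata) {
            // With errors.tolerance=all, a null metadata means the producer failed to write
            // this record and the failure was tolerated; the task keeps running.
            if (metadata == null) {
                log.warn("Record destined for topic {} was not written to Kafka", record.topic());
                return;
            }
            // Normal path: the record was acknowledged by the broker.
            log.trace("Committed record to {}-{} at offset {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }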

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

mimaison added the "kip (Requires or implements a KIP)" label on Oct 6, 2021
TheKnowles (Contributor Author):
Unrelated tests locally and in Jenkins appear flaky. All tests related to this change pass deterministically.

… source connectors to ignore producer exceptions. The connector will receive null RecordMetadata in the commitRecord callback in lieu of the task failing unconditionally.
TheKnowles (Contributor Author) commented Nov 19, 2021

Rebased, squashed, and force pushed for merging post KIP vote. Tests related to this change pass locally. There are a handful of unrelated nondeterministic test failures.

edit: Fixed a related test that was missed until CI picked it up. lastSendFailed state was removed in WorkerSourceTask.

@@ -366,7 +367,11 @@ private boolean sendRecords() {
if (e != null) {
log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
Contributor:

Should we modify this line to respect the errors.log.enable (and possibly errors.log.include.messages) properties?

I wonder if it might be useful to still unconditionally set producerSendException (or perhaps even convert that field from an AtomicReference<Throwable> to some kind of list, and append to it here) and then modify the contents (and possibly also name) of maybeThrowProducerSendException to have our error-handling logic. Thoughts?

Contributor:

Actually, that may complicate things by causing records to be given to SourceTask::commitRecord out of order (a record that caused a producer failure may be committed after a record that was dispatched to the producer after it). So probably best to keep the error-handling logic here, but I do still wonder if we can respect the logging-related configuration properties.

TheKnowles (Contributor Author):

Now that this could be a tolerated error, it makes sense to have it respect the errors.log.enable configuration, but the log line would be duplicated: it would be written unconditionally when we do not tolerate the error, and behind a config check when we do.

Are you envisioning something like this?

if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
    if (errorLogEnabled) { // get this value from the config in some manner
        log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
        log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
    }
    commitTaskRecord(preTransformRecord, null);
} else {
    log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
    log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
    producerSendException.compareAndSet(null, e);
}

I would need to look more closely at the other layers of objects on top of the SourceTask. enableErrorLog() is available in the ConnectorConfig, but only the SinkConnectorConfig makes use of it. I would need to spin up some additional infrastructure. Not sure if I would want to add WorkerErrantRecordReporter to WorkerSourceTask or have the configuration pass down in some other manner.

Contributor:

Yes, I was thinking the behavior could be something like that code snippet, although we'd also want to respect the errors.log.include.messages property and would probably want the format of the error messages to be similar to the error messages we emit in other places where messages are tolerated (such as when conversion or transformation fails).

TheKnowles (Contributor Author):

The error retry handling infrastructure predominantly concerns itself with the sink side of the house, so much so that any refactoring I would want to do would probably necessitate a KIP of its own. To that end, I have added an additional executeFailed() function to RetryWithToleranceOperator to allow the source worker to handle error logging with all of the existing infrastructure/configuration that exists for sink tasks.

I toyed around with the idea of having the new executeFailed() fire without a tolerance type check. This would work for failing/ignoring as expected, but there would be no mechanism to then decide whether we should call commitRecord(). We could block on the future from executeFailed() and then check withinToleranceLimits(), but that introduces nondeterminism with interrupt/execution exceptions.

mimaison (Member) left a comment:

Thanks @TheKnowles for the PR. I've made a first pass and left a few comments.

log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
mimaison (Member):

We can use == to compare enums.
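That is, the tolerance check quoted above could compare the enum constants directly:

    if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) { ... }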

TheKnowles (Contributor Author):

+1

// executeFailed here allows the use of existing logging infrastructure/configuration
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
preTransformRecord, e);
commitTaskRecord(preTransformRecord, null);
mimaison (Member):

Should we have a debug/trace log in this path?

TheKnowles (Contributor Author):

Previously it was suggested to have the tolerance operator handle this via the logging report. I would personally find it useful to have it in the Connect log regardless of the tolerance error logging configuration. I've moved the error/debug log lines above the tolerance check so they log in all instances.

Contributor:

We should not be logging at ERROR level for every single record if we aren't failing the task unless the user has explicitly enabled this by setting errors.log.enable to true in their connector config.

mimaison (Member):

Let's keep the existing trace and error log lines in the else block.
My suggestion is to add a line at the debug or trace level in the if block so users can know if an error is ignored.

TheKnowles (Contributor Author):

My misunderstanding, thank you both for the feedback. Update made.
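Putting the pieces of this thread together, the producer callback logic would look roughly like the following (a sketch assembled from the snippets quoted in this review, not necessarily the exact merged code):

    if (e != null) {
        if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
            // Tolerated: note the skipped record at a low log level, report it through the
            // existing error-handling infrastructure (which honors errors.log.enable and
            // errors.log.include.messages), and hand the task a null RecordMetadata.
            log.trace("{} Ignoring failed record (errors.tolerance=all)", WorkerSourceTask.this);
            retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
                    preTransformRecord, e);
            commitTaskRecord(preTransformRecord, null);
        } else {
            // Not tolerated: keep the existing error/trace logging and fail the task.
            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
            producerSendException.compareAndSet(null, e);
        }
    }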

// For source connectors that want to skip kafka producer errors.
// They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
// to write to kafka.
public synchronized ToleranceType getErrorToleranceType() {
mimaison (Member):

Does this need to be synchronized?

TheKnowles (Contributor Author):

It does not. The type is immutable and thread safe. I had dug through the ticket that retroactively made this class thread safe, and it seemed like a good idea at the time to slap a synchronized on it to match the rest of the class, but it is not necessary at all. Removed.

Future<Void> errantRecordFuture = context.report();
if (!withinToleranceLimits()) {
errorHandlingMetrics.recordError();
throw new ConnectException("Tolerance exceeded in error handler", error);
mimaison (Member):

Now that this message can come from 2 different paths, should we add some context to the message to disambiguate them?

TheKnowles (Contributor Author):

I added some context to the string error message denoting it was a Source Worker. I am open to suggestions on how verbose this message should be.
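For example, the message might call out the source worker path (the exact wording here is illustrative):

    throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);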

@@ -222,6 +222,13 @@ private void createWorkerTask() {
createWorkerTask(TargetState.STARTED);
}

private void createWorkerTaskWithErrorToleration() {
mimaison (Member):

Can we reuse the createWorkerTask() method just below by passing a RetryWithToleranceOperator argument instead of creating the WorkerSourceTask object here?

TheKnowles (Contributor Author):

+1 I have refactored the constructors to be cleaner with various parameter lists.


expectSendRecordOnce();
expectSendRecordProducerCallbackFail();
sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class));
mimaison (Member):

Instead of EasyMock.anyObject(RecordMetadata.class) should we use EasyMock.isNull() to assert we indeed pass null to the task in case there was a failure?

TheKnowles (Contributor Author):

+1
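The expectation would then pin down the null metadata explicitly, along these lines:

    sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());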

…r task creation in test. Misc. code cleanup.
TheKnowles (Contributor Author):

> Thanks @TheKnowles for the PR. I've made a first pass and left a few comments.

@mimaison Thank you for reviewing. I've replied to each comment above and pushed changes.

TheKnowles (Contributor Author):

Happy to squash and force push once everyone is pleased with the changes.

mimaison (Member):

@TheKnowles Don't worry about squashing everything, it's done automatically when we merge PRs.

Thanks for the quick update, I'll take another look.

mimaison (Member) left a comment:

Thanks for the PR, LGTM

mimaison (Member):

@C0urante Do you have further comments or shall I just merge?

C0urante (Contributor) left a comment:

LGTM, thanks Knowles!

mimaison merged commit 9f2f63e into apache:trunk on Jan 27, 2022
mimaison (Member):

Thanks @TheKnowles for this contribution! Sorry it took so long between getting votes on the KIP and reviews on your PR. This feature will be in the next minor release, Kafka 3.2.0.

Labels: connect, kip (Requires or implements a KIP)