Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8586: Fail source tasks when producers fail to send records #6993

Merged
merged 5 commits into from Aug 25, 2019

Conversation

@C0urante
Copy link
Contributor

commented Jun 24, 2019

Jira

Previously, if the producer for a source task failed to send a record with a non-retriable error, the record would be silently skipped over. The source task would be allowed to commit offsets for the skipped record, and its status would remain at RUNNING.

The changes here cause source tasks to transition to the FAILED state if their producers fail to send a record with a non-retriable error, and they also change the logic for offset commits to wait for confirmation that records have made it to Kafka before their offsets can be committed.

Tested by running Connect unit tests locally.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
@C0urante

This comment has been minimized.

Copy link
Contributor Author

commented Jun 24, 2019

@wicknicks would you mind taking a look?

@C0urante

This comment has been minimized.

Copy link
Contributor Author

commented Jun 24, 2019

@rayokota would you mind taking a look?

@wicknicks

This comment has been minimized.

Copy link
Contributor

commented Jun 24, 2019

@C0urante do we know what errors cause the producer.send() callback to be invoked? Ideally, with the source connector, we should retry forever (internally by the producer API, and we should never see the if (e!=null) block executed in that callback). I'm wondering if the data loss you observed was a result of overriding any of these configs.

@C0urante

This comment has been minimized.

Copy link
Contributor Author

commented Jun 24, 2019

@wicknicks no, this is not the result of overriding any producer configs. Like is detailed in the Jira, the producer only retries on retriable errors, and TopicAuthorizationExceptions are not retriable.

@wicknicks
Copy link
Contributor

left a comment

some questions/comments

...ime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java Outdated Show resolved Hide resolved
...src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java Outdated Show resolved Hide resolved

@C0urante C0urante force-pushed the C0urante:kafka-8586 branch to 34ce0c2 Jul 16, 2019

@C0urante

This comment has been minimized.

Copy link
Contributor Author

commented Jul 16, 2019

@wicknicks I've rebased on the latest trunk and addressed all of your comments; could you please make a second pass when you get a chance?

@wicknicks
Copy link
Contributor

left a comment

Modulo some comments where you wanted external inputs, LGTM.

@C0urante

This comment has been minimized.

Copy link
Contributor Author

commented Jul 31, 2019

Thanks @wicknicks!

@rhauch could you take a look at this when you get a chance?

@rhauch
Copy link
Contributor

left a comment

Looks pretty good, though I have one comment and one question about potentially removing the second call to maybeThrowProducerSendException().

@rhauch rhauch merged commit 237e83d into apache:trunk Aug 25, 2019

2 of 3 checks passed

JDK 11 and Scala 2.12 FAILURE 11683 tests run, 67 skipped, 1 failed.
Details
JDK 11 and Scala 2.13 SUCCESS 11683 tests run, 67 skipped, 0 failed.
Details
JDK 8 and Scala 2.11 SUCCESS 11683 tests run, 67 skipped, 0 failed.
Details
rhauch added a commit that referenced this pull request Aug 25, 2019
KAFKA-8586: Fail source tasks when producers fail to send records (#6993
)

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
rhauch added a commit that referenced this pull request Aug 25, 2019
KAFKA-8586: Fail source tasks when producers fail to send records (#6993
)

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
rhauch added a commit that referenced this pull request Aug 25, 2019
KAFKA-8586: Fail source tasks when producers fail to send records (#6993
)

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
rhauch added a commit that referenced this pull request Aug 25, 2019
KAFKA-8586: Fail source tasks when producers fail to send records (#6993
)

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
rhauch added a commit that referenced this pull request Aug 25, 2019
KAFKA-8586: Fail source tasks when producers fail to send records (#6993
)

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
rhauch added a commit that referenced this pull request Aug 25, 2019
KAFKA-8586: Fail source tasks when producers fail to send records (#6993
)

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
xiowu0 added a commit to linkedin/kafka that referenced this pull request Aug 27, 2019
[LI-CHERRY-PICK] [1a5062c] KAFKA-8586: Fail source tasks when produce…
…rs fail to send records (apache#6993)

TICKET = KAFKA-8586
LI_DESCRIPTION =

EXIT_CRITERIA = HASH [1a5062c]
ORIGINAL_DESCRIPTION =

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.

(cherry picked from commit 1a5062c)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants
You can’t perform that action at this time.