KAFKA-8591: WorkerConfigTransformer NPE on connector configuration reloading #6991

Merged
merged 3 commits into from Jul 9, 2019


@nachomdo
Contributor

commented Jun 24, 2019

A bug in WorkerConfigTransformer prevents the connector configuration reload when the ConfigData TTL expires.

The issue boils down to the fact that worker.herder().restartConnector is receiving a null callback.

[2019-06-17 14:34:12,320] INFO Scheduling a restart of connector workshop-incremental in 60000 ms (org.apache.kafka.connect.runtime.WorkerConfigTransformer:88)
[2019-06-17 14:34:12,321] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:227)
java.lang.NullPointerException
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1187)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1183)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:273)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:219)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
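
To illustrate the failure mode in the trace above, here is a minimal, self-contained sketch. It assumes a simplified `Callback` interface and a hypothetical `ToyHerder` that queues restart requests and processes them on `tick()`; neither is the real Kafka Connect class. The point is only that a null callback is accepted when the request is queued and is dereferenced later, on the work thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-ins for illustration; these are NOT the real Kafka Connect classes.
class NullCallbackDemo {

    /** Simplified analogue of a completion callback. */
    interface Callback<T> {
        void onCompletion(Throwable error, T result);
    }

    /** Toy herder that queues restart requests and runs them on tick(). */
    static class ToyHerder {
        private final Queue<Runnable> requests = new ArrayDeque<>();

        void restartConnector(String connectorName, Callback<Void> cb) {
            // The callback is not touched here; it is only dereferenced
            // when the queued request is processed.
            requests.add(() -> cb.onCompletion(null, null));
        }

        void tick() {
            Runnable next;
            while ((next = requests.poll()) != null) {
                next.run();
            }
        }
    }

    /** Returns true if processing the queued request threw a NullPointerException. */
    static boolean tickThrowsNpe(Callback<Void> cb) {
        ToyHerder herder = new ToyHerder();
        herder.restartConnector("my-connector", cb);
        try {
            herder.tick();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Null callback: the failure surfaces in tick(), not at the call site.
        System.out.println("null callback throws NPE: " + tickThrowsNpe(null));

        // Non-null callback that only logs errors: tick() completes normally.
        Callback<Void> logging = (error, result) -> {
            if (error != null) {
                System.err.println("Unexpected error during connector restart: " + error);
            }
        };
        System.out.println("logging callback throws NPE: " + tickThrowsNpe(logging));
    }
}
```

In this sketch the exception escapes `tick()`, which mirrors why the log shows the herder work thread exiting rather than a failure where the restart was scheduled.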

This patch keeps the same behaviour as before in WorkerConfigTransformer, in that any error returned from the callback is ignored. Do we still want to behave this way, or would we like to handle potential errors? Maybe we can rely on scheduleReload retrying whenever the TTL expires or the connector task fails due to a stale configuration.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@nachomdo nachomdo changed the title WorkerConfigTransformer NPE on connector configuration reloading KAFKA-8591: WorkerConfigTransformer NPE on connector configuration reloading Jun 24, 2019

@nachomdo nachomdo force-pushed the nachomdo:config-provider-bug branch 2 times, most recently Jun 24, 2019

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java Outdated
@@ -86,7 +87,8 @@ private void scheduleReload(String connectorName, String path, long ttl) {
 }
 }
 log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
-HerderRequest request = worker.herder().restartConnector(ttl, connectorName, null);
+FutureCallback<Void> cb = new FutureCallback<>();

@rayokota

rayokota Jul 2, 2019

Contributor

Perhaps instead

Callback<Void> cb = new Callback<Void>() {
    @Override
    public void onCompletion(Throwable error, Void result) {
        if (error != null) {
            log.error("Unexpected error during connector restart: ", error);
        }
    }
};
@rayokota
Contributor

left a comment

Thanks for the PR @nachomdo ! Left a small comment.

nachomdo added 2 commits Jun 24, 2019

@nachomdo nachomdo force-pushed the nachomdo:config-provider-bug branch to aee5672 Jul 3, 2019

@nachomdo

Contributor Author

commented Jul 3, 2019

Feedback applied ⚡️ Thanks for the 👀 @rayokota! 🏆

@rayokota
Contributor

left a comment

Thanks @nachomdo ! LGTM

@rhauch, can you review and merge when you get a chance?

@hachikuji
Contributor

left a comment

Thanks for the fix. Left a couple small comments.

@@ -95,13 +96,13 @@ public void testReplaceVariableWithTTLAndScheduleRestart() {
 @Test
 public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
 EasyMock.expect(worker.herder()).andReturn(herder);
-EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
+EasyMock.expect(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), anyObject())).andReturn(requestId);

@hachikuji

hachikuji Jul 3, 2019

Contributor

nit: extra space before anyObject

@@ -84,8 +86,7 @@ public void testReplaceVariableWithTTL() {
 @Test
 public void testReplaceVariableWithTTLAndScheduleRestart() {
 EasyMock.expect(worker.herder()).andReturn(herder);
-EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
-
+EasyMock.expect(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), anyObject())).andReturn(requestId);

@hachikuji

hachikuji Jul 3, 2019

Contributor

Hmm.. These tests all pass even if the callback is null. Perhaps we can use EasyMock.notNull()?
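
The difference between the two matchers can be sketched without the mocking library. The snippet below is plain Java; the predicates `ANY_OBJECT` and `NOT_NULL` and the helper `expectationAccepts` are hypothetical names that only mimic the semantics of EasyMock's `anyObject()` (matches any argument, including null) and `notNull()` (matches only non-null arguments). It is not EasyMock code.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Plain-Java illustration; the predicates only mimic the semantics of
// EasyMock's anyObject() and notNull() argument matchers.
class MatcherDemo {

    // Accepts every argument, including null (like anyObject()).
    static final Predicate<Object> ANY_OBJECT = arg -> true;

    // Accepts only non-null arguments (like notNull()).
    static final Predicate<Object> NOT_NULL = Objects::nonNull;

    /** Hypothetical check: would an expectation using this matcher accept the callback argument? */
    static boolean expectationAccepts(Predicate<Object> callbackMatcher, Object callback) {
        return callbackMatcher.test(callback);
    }

    public static void main(String[] args) {
        Object nullCallback = null;          // what the buggy call site passed
        Object realCallback = new Object();  // stand-in for a real callback

        // The permissive matcher cannot tell the buggy and fixed call sites apart.
        System.out.println("anyObject vs null: " + expectationAccepts(ANY_OBJECT, nullCallback));
        System.out.println("anyObject vs real: " + expectationAccepts(ANY_OBJECT, realCallback));

        // The stricter matcher rejects the null callback, so a regression would fail the test.
        System.out.println("notNull vs null: " + expectationAccepts(NOT_NULL, nullCallback));
        System.out.println("notNull vs real: " + expectationAccepts(NOT_NULL, realCallback));
    }
}
```

With the stricter matcher, an expectation on `restartConnector` would only be satisfied when a callback is actually supplied, which is the behaviour the fix is meant to guarantee.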

@hachikuji

Contributor

commented Jul 8, 2019

retest this please

@hachikuji
Contributor

left a comment

Thanks for the patch!

@hachikuji hachikuji merged commit 289ac09 into apache:trunk Jul 9, 2019

1 of 3 checks passed

JDK 11 and Scala 2.13 FAILURE 11627 tests run, 67 skipped, 1 failed.
JDK 8 and Scala 2.11 FAILURE 11627 tests run, 67 skipped, 1 failed.
JDK 11 and Scala 2.12 SUCCESS 11627 tests run, 67 skipped, 0 failed.
hachikuji added a commit that referenced this pull request Jul 9, 2019
KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (#6991)

A bug in `WorkerConfigTransformer` prevents the connector configuration reload when the ConfigData TTL expires. 

The issue boils down to the fact that `worker.herder().restartConnector` is receiving a null callback. 

```
[2019-06-17 14:34:12,320] INFO Scheduling a restart of connector workshop-incremental in 60000 ms (org.apache.kafka.connect.runtime.WorkerConfigTransformer:88)
[2019-06-17 14:34:12,321] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:227)
java.lang.NullPointerException
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1187)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1183)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:273)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:219)
```
This patch adds a callback which just logs the error.

Reviewers: Robert Yokota <rayokota@gmail.com>, Jason Gustafson <jason@confluent.io>
rayokota added a commit to confluentinc/kafka that referenced this pull request Jul 16, 2019
KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)

ijuma added a commit to confluentinc/kafka that referenced this pull request Jul 20, 2019
Merge remote-tracking branch 'apache-github/2.3' into ccs-2.3
* apache-github/2.3:
  MINOR: Update documentation for enabling optimizations (apache#7099)
  MINOR: Remove stale streams producer retry default docs. (apache#6844)
  KAFKA-8635; Skip client poll in Sender loop when no request is sent (apache#7085)
  KAFKA-8615: Change to track partition time breaks TimestampExtractor (apache#7054)
  KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (apache#7094)
  KAFKA-8602: Separate PR for 2.3 branch (apache#7092)
  KAFKA-8530; Check for topic authorization errors in OffsetFetch response (apache#6928)
  KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (apache#7086)
  KAFKA-8637: WriteBatch objects leak off-heap memory (apache#7050)
  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (apache#7021)
  HOT FIX: close RocksDB objects in correct order (apache#7076)
  KAFKA-7157: Fix handling of nulls in TimestampConverter (apache#7070)
  KAFKA-6605: Fix NPE in Flatten when optional Struct is null (apache#5705)
  Fixes #8198 KStreams testing docs use non-existent method pipe (apache#6678)
  KAFKA-5998: fix checkpointableOffsets handling (apache#7030)
  KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (apache#7072)
  KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)
  MINOR: add upgrade text (apache#7013)
  Bump version to 2.3.1-SNAPSHOT
xiowu0 added a commit to linkedin/kafka that referenced this pull request Aug 22, 2019
[LI-CHERRY-PICK] [4fdfe2b] KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)

TICKET = KAFKA-8591
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [4fdfe2b]
ORIGINAL_DESCRIPTION =

A bug in `WorkerConfigTransformer` prevents the connector configuration reload when the ConfigData TTL expires.

The issue boils down to the fact that `worker.herder().restartConnector` is receiving a null callback.

```
[2019-06-17 14:34:12,320] INFO Scheduling a restart of connector workshop-incremental in 60000 ms (org.apache.kafka.connect.runtime.WorkerConfigTransformer:88)
[2019-06-17 14:34:12,321] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:227)
java.lang.NullPointerException
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1187)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1183)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:273)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:219)
```
This patch adds a callback which just logs the error.

Reviewers: Robert Yokota <rayokota@gmail.com>, Jason Gustafson <jason@confluent.io>
(cherry picked from commit 4fdfe2b)