Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3710: MemoryOffsetBackingStore shutdown #1383

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.connect.storage;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this
Expand All @@ -40,7 +42,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);

protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
protected ExecutorService executor = Executors.newSingleThreadExecutor();
protected ExecutorService executor;

public MemoryOffsetBackingStore() {

Expand All @@ -51,12 +53,26 @@ public void configure(WorkerConfig config) {
}

@Override
public synchronized void start() {
public void start() {
executor = Executors.newSingleThreadExecutor();
}

@Override
public synchronized void stop() {
// Nothing to do since this doesn't maintain any outstanding connections/data
public void stop() {
if (executor != null) {
executor.shutdown();
// Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!executor.shutdownNow().isEmpty()) {
throw new ConnectException("Failed to stop MemoryOffsetBackingStore. Exiting without cleanly " +
"shutting down pending tasks and/or callbacks.");
}
executor = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() only triggers shutdown, but it does not wait for the tasks to be completed. How about adding a awaitTermination ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ishiihara I did not want to block in a synchronized method: deadlock risk. Can you guess why start() and stop() need to be synchronized?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davispw I think this was just done defensively since the ExecutorService means some state will be mutated in another thread (you'll notice the Kafka*BackingStore implementations aren't synchronized). Since we're only touching the executor field here and it should be thread safe anyway, I think we can drop the synchronization.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK will drop synchronization and do awaitTermination(timeout). @ewencp What do you think is a reasonable timeout before giving up? Note, get() and set() tasks should be quick but caller's callback could block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

30s maybe? Timeouts like these should be fairly liberal, we really just want to bound how long we're willing to wait in the worst case, but usually it shouldn't block for any significant amount of time. (All "user" callbacks for this class are internal to Connect, and they don't do anything that should take a long time).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, 30s it is. For comparison, KafkaOffsetBackingStore.stop() uses a KafkaBasedLog which ultimately calls Thread.join() with no timeout, and throws a ConnectException if it is interrupted; but there are other paths where it just logs shutdown failures.

}
}

@Override
Expand Down