
[FLINK-5048] [Kafka Consumer] Change thread model of FlinkKafkaConsumer to better handle shutdown/interrupt situations #2789

Closed
wants to merge 2 commits

Conversation

StephanEwen (Contributor)

NOTE: Only the second commit is relevant; the first commit merely prepares for it by cleaning up some code in the Flink Kafka Consumers for 0.9 and 0.10.

Rationale

Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate thread that operates Kafka's consumer. That thread was shielded from interrupts, because the Kafka Consumer does not handle thread interrupts well.

Since that thread was also the thread that emitted records, it would block in the network stack (backpressure) or in chained operators. The latter case led to situations where cancellation became very slow, unless that thread was interrupted (which it could not be).

Core changes

This commit changes the thread model:

  • A spawned consumer thread polls a batch of records from the KafkaConsumer and pushes the batch into a size-one blocking queue.
  • The main thread of the task pulls the record batches from the blocking queue and emits the records.

The "batches" are the fetch batches from Kafka's consumer, there is no additional buffering or so that would impact latency.

The thread-to-thread handover of the record batches is handled by a class Handover, which is essentially a size-one blocking queue with the additional ability to gracefully wake up the consumer thread when the main thread decides to shut down. That way, no interrupts are needed on the KafkaConsumerThread.
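A minimal sketch of that handover idea (simplified for illustration, not the exact class from this PR; the actual class additionally forwards errors from the consumer thread to the main thread):

import java.io.Closeable;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

/**
 * Sketch: a size-one blocking queue between the consumer thread (producer side)
 * and the task's main thread (consumer side), with graceful wakeup and close.
 */
public final class Handover implements Closeable {

    private final Object lock = new Object();

    private ConsumerRecords<byte[], byte[]> next;
    private boolean producerWakeup;
    private boolean closed;

    /** Consumer thread: hands over a batch, blocking while the previous one is untaken. */
    public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null && !producerWakeup && !closed) {
                lock.wait();
            }
            if (closed) {
                throw new ClosedException();
            }
            if (producerWakeup) {
                producerWakeup = false;
                throw new WakeupException();
            }
            next = records;
            lock.notifyAll();
        }
    }

    /** Main task thread: takes the next batch, blocking until one is available. */
    public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null && !closed) {
                lock.wait();
            }
            if (closed) {
                throw new ClosedException();
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }

    /** Gracefully wakes up a consumer thread blocked in produce(). */
    public void wakeupProducer() {
        synchronized (lock) {
            producerWakeup = true;
            lock.notifyAll();
        }
    }

    /** Idempotent: wakes up both sides; further produce()/pollNext() calls fail fast. */
    @Override
    public void close() {
        synchronized (lock) {
            closed = true;
            lock.notifyAll();
        }
    }

    public static final class ClosedException extends IllegalStateException {}
}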

This also pulls the KafkaConsumerThread out of the fetcher class, for some code cleanup (scope simplifications).
The method calls whose signatures changed between Kafka 0.9 and 0.10 are handled via a "call bridge", which reduces the code changes needed in the fetchers for each method that has to be adapted.
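Sketched roughly, the bridge looks like this (simplified; the actual classes live in the version-specific connector modules):

import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Sketch of the call bridge. Compiled against the Kafka 0.9 client, the call
 * below binds to assign(List). A subclass in the 0.10 module contains the
 * identical-looking source line, but compiled against the 0.10 client it binds
 * to assign(Collection). The fetcher only ever talks to the bridge, so it
 * needs no version-specific code.
 */
public class KafkaConsumerCallBridge {

    public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
        consumer.assign(topicPartitions);
    }
}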

Tests

This adjusts some tests, but it removes the "short retention IT cases" for the Kafka 0.9 and 0.10 consumers.
While that type of test makes sense for the 0.8 consumer, for the newer ones these tests exercise purely Kafka code and no Flink code.

In addition, they are virtually impossible to run stably and fast, because they rely on an artificial slowdown in the KafkaConsumer threads. That type of unhealthy interference is exactly what this patch prevents ;-)

…er to better handle shutdown/interrupt situations

Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawns a separate thread that operates Kafka's consumer.
That thread was shielded from interrupts, because the Kafka Consumer does not handle thread interrupts well.
Since that thread was also the thread that emitted records, it would block in the network stack (backpressure) or in chained operators.
The latter case led to situations where cancellation became very slow, unless that thread was interrupted (which it could not be).

This commit changes the thread model:
  - A spawned consumer thread polls a batch of records from the KafkaConsumer and pushes the
    batch of records into a blocking queue (size one)
  - The main thread of the task will pull the record batches from the blocking queue and
    emit the records.
@tzulitai (Contributor) left a comment:

For now I've only skimmed through the changes, and I really like them overall. I think the solution solves shutting down the KafkaConsumer on cancellation quite elegantly. The pre-hotfix code cleanups in the 1st commit seem good to me too.

Only some very minor comments for the first review. I'd like to look into the PR more over the weekend, especially the tests, which I haven't looked at yet, and do some double checks on the cancellation flow. I would probably also need to check that some of the previous behaviours related to offset committing / offset state initialization aren't broken due to the re-scope to the new KafkaConsumerThread.

public class KafkaConsumerThread extends Thread {

/** Logger for this consumer */
final Logger log;
Contributor:

Private?

Contributor Author:

I left this package-private, because it is accessed by the nested class for the commit callback.
If I make it private, the compiler has to inject a bridge method.

I guess making it private is correct, though; it better documents how it should be used.
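For illustration, a tiny hypothetical example of the accessor issue (names made up; on compilers before Java 11's nestmates, javac emits a synthetic accessor for private members touched from a nested class):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Outer {

    // If this field were private, javac would emit a synthetic package-private
    // accessor method so the nested class below could read it.
    final Logger log = LoggerFactory.getLogger(Outer.class);

    // hypothetical nested class, standing in for the commit callback
    class CommitCallback {
        void onComplete() {
            log.info("offsets committed"); // access from the nested class
        }
    }
}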


import static org.apache.flink.util.Preconditions.checkNotNull;

public final class Handover implements Closeable {
Contributor:

Would be great if this class has some Javadoc too ;)

Contributor Author:

agreed :-)

}

@VisibleForTesting
Object getLock() {
Contributor:

This method doesn't seem to be used, even in the tests.


private ConsumerRecords<byte[], byte[]> next;
private Throwable error;
private boolean wakeup;
@tzulitai (Contributor), Nov 11, 2016:

Can we perhaps rename this to producerWakeup? Since it only affects the producer side of the handover, the renaming would make it less confusing.


// ------------------------------------------------------------------------

public static final class ClosedException extends IllegalStateException {
Contributor:

Not really sure if extending IllegalStateException is good here, because ClosedException will be rethrown to the fetcher even on a normal call to Handover#close().

I understand it's there to make the cancellation process faster, but a normal close() after poll() was called doesn't seem like an illegal state to me.


/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
private volatile ExceptionProxy errorHandler;
/** The thread that runs the proper KafkaConsumer and hand the record batches to this fetcher */
Contributor:

nit: 'proper' confused me a bit at first. Perhaps 'actual'?

@tzulitai (Contributor) left a comment:

Great work Stephan. I've given the changes a second pass, and they look good to me.

The only major suggestions I have are about how we let the new consumer thread / fetcher thread call close() on the handover, and whether or not we should suppress the fetcher from throwing Handover.ClosedException further (left comments inline).

Perhaps we should also wait for @rmetzger to review, as he might notice catches from the original thread model design that I have overlooked.

* This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
* changing {@code assign(List)} to {@code assign(Collection)}.
*
* Because of that, we need to two versions whose compiled code goes against different method signatures.
Contributor:

nit: we need "to" two versions <-- redundant "to".

public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {

@Override
public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
Contributor:

Do the type parameters for the key / value of KafkaConsumer need to be generic? It seems like we will only be using <byte[], byte[]> anyway.

Contributor Author:

I think generic is nice, because for this method the key/value types do not matter. That way it is more future-proof.

import org.junit.Test;

@SuppressWarnings("serial")
public class KafkaShortRetention010ITCase extends KafkaShortRetentionTestBase {
Contributor:

+1 to remove these tests for 0.9+ connectors.

Contributor:

I think with this removal, we can also completely remove the runFailOnAutoOffsetResetNone() from the KafkaShortRetentionTestBase.

The 0.8 connector runs runFailOnAutoOffsetResetNoneEager() instead of runFailOnAutoOffsetResetNone(). I think this is what we should actually also be doing for the 0.9+ connectors, testing only the eager version, because that's a Flink-specific behaviour (just pointing this out; we can add this as a separate future task, as it probably requires some work on 0.9+).

@@ -143,133 +123,26 @@ public Kafka09Fetcher(

@Override
public void runFetchLoop() throws Exception {
Contributor:

We will be throwing all exceptions, even if it's a Handover.ClosedException, correct?

I wonder if it makes sense to suppress Handover.ClosedException so that it does not reach the main task thread, and to only restore the interruption state that follows cancel()? So basically, we catch InterruptedException over the whole runFetchLoop() scope.

This matches the previous exception-passing behaviour: when cancel() was called on the fetcher, we wouldn't throw any other exceptions, only restore the interruption state to the main task thread.

Contributor Author:

To be safe, I think the ClosedExceptions should be re-thrown, as should all others.
Just in case we overlook something and the consumer thread closes the handover by itself. Any abnormal termination of the fetch loop should result in an exception - that is the safest we can do.

Contributor:

Ok, I agree to be safe.

Also, I just realized that "end of stream" shouldn't lead to the ClosedException, only "cancellation", "fetcher error", "consumer error", and (hopefully not) any other stuff we overlooked will. So, basically, like what you said, only abnormal terminations. In that case, let's keep it this way.
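In sketch form, the agreed-upon loop structure (simplified; 'running', 'handover', 'consumerThread', and emitRecord(...) stand in for the fetcher's actual members):

@Override
public void runFetchLoop() throws Exception {
    // Start the thread that runs the actual KafkaConsumer and fills the handover.
    consumerThread.start();
    try {
        while (running) {
            // Blocks until the consumer thread hands over the next batch.
            // On abnormal termination (cancellation, fetcher error, consumer error)
            // this throws Handover.ClosedException, which is deliberately rethrown
            // rather than suppressed; a clean end of stream exits via 'running'.
            ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            for (ConsumerRecord<byte[], byte[]> record : records) {
                emitRecord(record);
            }
        }
    }
    finally {
        // make sure the consumer thread terminates in all cases
        consumerThread.shutdown();
    }
}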

running = false;

// this wakes up the consumer if it is blocked handing over records
handover.close();
Contributor:

Can we actually call handover.wakeupProducer() here, and call handover.close() in the finally clause of the run() loop?

I don't think it really matters that much in our case, but IMO this way the cancellation flow between the fetcher loop and the consumer thread will be clearer.

My thinking is that only the KafkaConsumerThread actually calls close() on the handover, which immediately rethrows a Handover.ClosedException to the fetcher thread on blocking handover.pollNext() calls. The fetcher thread only calls shutdown() on the KafkaConsumerThread, either on cancellation (in which case pollNext() can still immediately throw either a Handover.ClosedException or an InterruptedException, depending on which arrives first) or on a normal clean exit.

Contributor Author:

I followed this partly, but kept an eager call to handover.close() just to make the consumer thread cancellation doubly safe.
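Roughly, the resulting cancellation path (a sketch with assumed member names, following the discussion above):

public void cancel() {
    // stop the main fetch loop
    running = false;

    // Eager close: wakes up the consumer thread if it is blocked handing over
    // records, and makes further pollNext() calls fail fast. Since close() is
    // idempotent, a second close() from the consumer thread's cleanup is harmless.
    handover.close();

    // Shut down the consumer thread, which also needs to break out of a
    // blocking KafkaConsumer.poll(), typically via KafkaConsumer.wakeup().
    consumerThread.shutdown();
}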

@@ -323,8 +329,154 @@ else if (partition.topic().equals("another")) {

// check that there were no errors in the fetcher
final Throwable caughtError = error.get();
if (caughtError != null) {
if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
Contributor:

Perhaps we should suppress the fetcher from throwing Handover.ClosedException, as it doesn't really make sense to the main thread. Please see my comments above.

Properties consumerProps = new Properties();
consumerProps.putAll(standardProps);
consumerProps.putAll(secureProps);
consumerProps.setProperty("fetch.message.max.bytes", "100");
Contributor:

I think we shouldn't be setting fetch.message.max.bytes here. The config key for this setting has changed across Kafka versions (for 0.9+ it's max.partition.fetch.bytes). The version-specific standardProps already set values for this config.

So the original props, which contain only standardProps and secureProps, should be enough for the test to work.

Contributor Author:

Leftover from getting the "short retention" tests to run with the modified source. Will undo.

public void runAutoOffsetResetTest() throws Exception {
final String topic = "auto-offset-reset-test";

final int parallelism = 1;
final int elementsPerPartition = 50000;

Properties tprops = new Properties();
tprops.setProperty("retention.ms", "250");
tprops.setProperty("retention.ms", "100");
Contributor:

Is this change necessary?

runProducerConsumerTest(500, 2, 2);
}

private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
Contributor:

nit: Can we move this private method down to the bottom of the file? Not entirely necessary, just that I have a preference for keeping private methods after the public ones.

// ------------------------------------------------------------------------

@SuppressWarnings("unchecked")
static ConsumerRecords<byte[], byte[]> createTestRecords() {
Contributor:

Might as well make this private.

@StephanEwen (Contributor Author)

Thanks for the review, @tzulitai

I will go ahead and merge this, addressing the comments.

@StephanEwen (Contributor Author)

I would actually like to not change how/when handover.close() is called. It is (probably) called more often than necessary, but since it is an idempotent operation, that does not matter.

The code is designed to lead to the quickest wakeup/termination possible in all cases:

  • cancellation
  • end of stream
  • error in the fetcher
  • error in the consumer

Also note that errors and close() do not overwrite each other, so it is fine if one is called after the other.

Also, both the fetcher and the KafkaConsumerThread are written to be self-contained and encapsulate all the necessary logic. That means they do not rely on each other to call handover.close() in any situation - that makes the design more robust.

@tzulitai (Contributor)

Also, both the fetcher and the KafkaConsumerThread are written to be self-contained and encapsulate all the necessary logic. That means they do not rely on each other to call handover.close() in any situation - that makes the design more robust.

I think that makes sense. My suggestion would definitely make the fetcher thread rely on the KafkaConsumerThread to make the correct calls.
Agree to keep it as is :)

@StephanEwen (Contributor Author)

Manually merged in a66e7ad
