KAFKA-3720 : Deprecated BufferExhaustedException and also removed its use and the related sensor metric #1417

Closed
wants to merge 3 commits into from

Conversation

MayureshGharat
Contributor

@MayureshGharat MayureshGharat commented May 20, 2016

BufferExhaustedException is no longer thrown by the new producer. Removed it from the catch clause, deprecated the exception class, and removed the corresponding metrics.

@ijuma
Contributor

ijuma commented May 20, 2016

Thanks @MayureshGharat. During the request timeout KIP, was there a discussion about a metric to replace the buffer-exhausted one?

@ijuma
Contributor

ijuma commented May 20, 2016

cc @junrao

@MayureshGharat
Contributor Author

MayureshGharat commented May 20, 2016

@ijuma I don't think KIP-19 had any discussion about that. Since we now fail a request by default when there is not enough memory, the behavior is the same as BufferExhaustedException, so we should probably add another metric to replace the old one.
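For context, a minimal sketch of the producer settings involved (values are only illustrative defaults, not a recommendation):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// block.on.buffer.full is deprecated by KIP-19; instead, max.block.ms bounds how
// long send() may block waiting for metadata or for buffer memory before failing
// the request with a TimeoutException.
props.put("max.block.ms", "60000");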

@MayureshGharat
Contributor Author

@junrao @ijuma any update on this?

@junrao
Contributor

junrao commented Jun 2, 2016

Thanks for the patch. Could you fix the compilation error?

[ant:checkstyle] intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:27:8: Unused import - org.apache.kafka.common.metrics.stats.Rate.
FAILED

@ijuma
Contributor

ijuma commented Jun 2, 2016

@junrao any thoughts on whether we should add a metric to replace the buffer exhausted one?

@junrao
Contributor

junrao commented Jun 3, 2016

@ijuma : Yes, instead of removing the buffer-exhausted-records sensor, we can probably just update it when BufferPool throws a TimeoutException.

@MayureshGharat
Contributor Author

@junrao Thanks. I will upload a new PR.

@@ -472,6 +472,12 @@ private static int parseAcks(String acksString) {
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (TimeoutException e) {
    this.errors.record();
    this.metrics.sensor("buffer-exhausted-records").record();
Contributor

Are all cases of this exception due to buffer exhaustion? I thought it could also happen in other cases. One option would be to keep BufferExhaustedException and have it inherit from TimeoutException. Thoughts?

Contributor Author

@ijuma Actually yeah, it can also happen when updating metadata. My bad. This seems like a viable solution, or we can throw a BufferExhaustedException from the BufferPool.allocate() method instead of TimeoutException, with a proper message about what happened. What do you think?

Contributor

If I understand you correctly, that's indeed what I had in mind. We throw BufferExhaustedException from BufferPool.allocate. To make it compatible with code that expects a TimeoutException, we make it a subclass of TimeoutException. And then here we catch BufferExhaustedException.

That would mean not deprecating BufferExhaustedException, but updating its documentation to say that it's thrown when a buffer allocation times out.
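A minimal sketch of that shape (an illustration of the proposal, not the final patch; the constructor and the BufferPool variable names are assumptions):

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.errors.TimeoutException;

/**
 * Thrown when the producer cannot allocate memory for a record because the buffer
 * is too full. Extending TimeoutException keeps code that already catches
 * TimeoutException working unchanged.
 */
public class BufferExhaustedException extends TimeoutException {

    private static final long serialVersionUID = 1L;

    public BufferExhaustedException(String message) {
        super(message);
    }
}

BufferPool.allocate would then throw it when the wait exceeds the configured max block time, e.g.:

if (waitingTimeElapsed) {
    throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time "
            + maxTimeToBlockMs + " ms.");
}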

What do you think @junrao?

Contributor Author

Sounds good to me. Will wait for @junrao to comment.

Contributor

Ping @junrao.

Contributor Author

@junrao PING.

@kawamuray
Contributor

@ijuma @MayureshGharat

Can we proceed with this PR?
I'm looking for this PR to be completed so that the buffer-exhausted-rate metric becomes meaningful again, as it's an important metric for me.
Even for other users' sake, I think this PR should move forward ASAP; as of now, users can wrongly conclude there was no buffer exhaustion by watching this existing but non-functional metric, which always reads zero.

The idea you two are discussing sounds reasonable to me too, +1.

@MayureshGharat
Contributor Author

MayureshGharat commented Nov 18, 2016

@junrao : Would you mind taking a look at the comments @ijuma and I discussed? I can re-submit a quick PR for this if we have a conclusion :)

@junrao
Contributor

junrao commented Nov 18, 2016

@MayureshGharat : Sorry for the delay. The approach that you and @ijuma described sounds good to me.

@MayureshGharat
Contributor Author

@ijuma I have updated the PR. Would you mind taking another look?

this.metrics.sensor("buffer-exhausted-records").record();
if (this.interceptors != null)
    this.interceptors.onSendError(record, tp, e);
throw e;
Contributor

@kawamuray kawamuray Nov 22, 2016

I think this is going to be a slight spec change. Before this, a TimeoutException thrown either while waiting for metadata or while waiting for buffer allocation was caught by the following clause for ApiException (since TimeoutException extends RetriableException, which is an ApiException), so a FutureFailure was returned instead of throwing, and the callback was triggered too.

As a result of this change, the two TimeoutException cases are treated differently:

  • timeout occurred while waiting for a metadata update => callback called, FutureFailure returned instead of throwing
  • timeout occurred while waiting for buffer allocation => callback not called, exception thrown

This sounds confusing and inconsistent. I think we can either 1. always return a FutureFailure for a TimeoutException, or 2. always throw for a timeout.
IMO, by the design of KafkaProducer (method blocking is part of the interface), the latter makes more sense to me.
WDYT?
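To make the difference concrete from a caller's point of view, here is a small self-contained sketch (broker address, topic name, and serializer settings are placeholders) of where each timeout would surface after this change:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;

public class SendTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "5000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
            try {
                // With the change discussed here, a timeout while waiting for buffer
                // allocation is thrown synchronously from send() and lands in the
                // catch block below; a timeout while waiting for metadata is still
                // reported asynchronously through the callback / returned future.
                Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        // asynchronous failures (including metadata-wait timeouts) arrive here
                    }
                });
            } catch (TimeoutException e) {
                // buffer-allocation timeout thrown directly by send()
            }
        }
    }
}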

Contributor

This is a good point. The current implementation is a bit misleading with regard to the javadoc, which states:

* @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>.

We don't actually throw that exception unless you do Future.get. It seems to me that the change in this PR actually fixes the implementation to match the specified contract.

Another option is to change the javadoc to match the implementation (probably less likely to break users) and then we would simply have a check in the ApiException catch block to record the metric for BufferExhaustedException. This seems safer.
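A rough sketch of that safer option (following the variable names visible in the diff hunks above; an illustration, not a committed patch):

} catch (ApiException e) {
    // keep the existing control flow: record the error, run the callback, and
    // return a failed future instead of throwing
    if (callback != null)
        callback.onCompletion(null, e);
    this.errors.record();
    // only the metric is special-cased for buffer exhaustion
    if (e instanceof BufferExhaustedException)
        this.metrics.sensor("buffer-exhausted-records").record();
    if (this.interceptors != null)
        this.interceptors.onSendError(record, tp, e);
    return new FutureFailure(e);
}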

Thoughts?

Contributor

@kawamuray kawamuray Nov 22, 2016

Right, it's indeed a much safer option.
Still, I think we'd better throw here, for the following reasons:

  • It is more semantically correct. Callback and Future should be used to provide the result of asynchronous (background) processing. However, these two TimeoutExceptions occur while the KafkaProducer is still doing synchronous (foreground) processing, and that (the producer has to do some foreground work before it appends the record to the accumulator) is exactly why a caller of producer#send is forced to wait until the result is known, so the caller should receive the result of that call synchronously.
  • Assuming this is going to ship with 0.10.2.0, a breaking change in behavior isn't preferred, but it is allowed if we leave a proper note in the "breaking changes" section.
  • We can expect this breaking change to be relatively harmless, since most users who use the producer in sensitive situations are already using it as shown below. Users may see some new error logs, but it still doesn't break the whole processing (maybe I'm biased, feel free to object if you disagree :D)
try {
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // logging
        }
    });
} catch (RuntimeException e) { // or maybe KafkaException, TimeoutException, whatever
    // logging
}

Contributor

@kawamuray kawamuray left a comment

Left just one suggestion.

@ijuma
Copy link
Contributor

ijuma commented Aug 2, 2020

Superseded by #8399.

@ijuma ijuma closed this Aug 2, 2020