
KAFKA-6709: Message down-conversion causes Out Of Memory on broker #4871

Merged: 47 commits into apache:trunk on May 31, 2018

Conversation

@dhruvilshah3 (Contributor) commented Apr 13, 2018

Implementation for lazy down-conversion in a chunked manner for efficient memory usage during down-conversion. This pull request is mainly to get initial feedback on the direction of the patch. The patch includes all the main components from KIP-283.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dhruvilshah3 dhruvilshah3 changed the title KAFKA 6709: Message down-conversion causes Out Of Memory on broker KAFKA-6709: Message down-conversion causes Out Of Memory on broker Apr 13, 2018
@dhruvilshah3 dhruvilshah3 changed the title KAFKA-6709: Message down-conversion causes Out Of Memory on broker [WIP] KAFKA-6709: Message down-conversion causes Out Of Memory on broker Apr 27, 2018
@dhruvilshah3 dhruvilshah3 changed the title [WIP] KAFKA-6709: Message down-conversion causes Out Of Memory on broker KAFKA-6709: Message down-conversion causes Out Of Memory on broker May 3, 2018
@dhruvilshah3 (Contributor Author):

Retest this please

@dhruvilshah3 (Contributor Author):

@hachikuji could you please take a look? Thanks!

@@ -556,7 +557,7 @@ public void write(ByteBuffer buffer, Object o) {
}

@Override
-    public Records read(ByteBuffer buffer) {
+    public MemoryRecords read(ByteBuffer buffer) {
Contributor:

We have a check above in write for FileRecords. Now that we have additional types to worry about, maybe we should change that check to !(o instanceof MemoryRecords)?
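For illustration, a minimal sketch of the suggested guard (the wrapper class name and the write path around it are assumptions; only MemoryRecords is from the codebase):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.MemoryRecords;

// Sketch only: instead of special-casing FileRecords, reject anything that is
// not already fully materialized in memory.
class RecordsFieldSketch {
    public void write(ByteBuffer buffer, Object o) {
        if (!(o instanceof MemoryRecords))
            throw new IllegalArgumentException("Unexpected records type: " + o.getClass().getName());
        MemoryRecords records = (MemoryRecords) o;
        buffer.put(records.buffer().duplicate()); // copy without disturbing the source position
    }
}
```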

// as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
// down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
// client.
val converted = new LazyDownConversionRecords(tp, unconvertedFetchResponse.records, magic, fetchContext.getFetchOffset(tp).get, Time.SYSTEM)
Contributor:

We can use the time field instead of Time.SYSTEM
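The code comment in the diff above describes KIP-283's lazy, chunked down-conversion. As a minimal, hypothetical illustration of that idea (none of these names come from the patch): each batch is converted only when the next chunk is pulled, so at most one converted batch is held in memory at a time.

```java
import java.util.Iterator;
import java.util.function.UnaryOperator;

// Hypothetical sketch of chunked, lazy down-conversion: conversion happens
// on demand as the consumer of the iterator advances, never all at once.
class LazyChunkedConversion<T> implements Iterable<T> {
    private final Iterable<T> batches;
    private final UnaryOperator<T> downConvert;

    LazyChunkedConversion(Iterable<T> batches, UnaryOperator<T> downConvert) {
        this.batches = batches;
        this.downConvert = downConvert;
    }

    @Override
    public Iterator<T> iterator() {
        Iterator<T> it = batches.iterator();
        return new Iterator<T>() {
            @Override
            public boolean hasNext() { return it.hasNext(); }
            @Override
            public T next() { return downConvert.apply(it.next()); } // convert on demand
        };
    }
}
```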

FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedFetchResponse.logStartOffset, unconvertedFetchResponse.abortedTransactions,
converted)
}
}.getOrElse(new FetchResponse.PartitionData[BaseRecords](unconvertedFetchResponse.error, unconvertedFetchResponse.highWatermark,
Contributor:

Well, if we have to construct a new instance anyway, I'd suggest simplifying this inner def a little bit. We can avoid the PartitionData object and pass the Records instance directly. Then we can construct the new PartitionData in the caller.

-  private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse): Unit = {
+  private def sendResponseMaybeThrottle(request: RequestChannel.Request,
+                                        createResponse: Int => AbstractResponse,
+                                        processingStatsCallback: Option[FetchResponseStats => Unit] = None): Unit = {
Contributor:

It is a little odd to have the generic sendResponseMaybeThrottle method take a callback that only makes sense when sending a fetch response. Maybe we should make the mechanism more generic? For example, maybe it can be an onComplete callback which gives access to the Send object?
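A hypothetical sketch of that more generic shape, in Java for illustration (the Send placeholder and method names are illustrative, not the patch's actual API):

```java
import java.util.Optional;
import java.util.function.Consumer;

// Illustrative only: the send path accepts an optional completion hook that
// receives the Send object, instead of a fetch-specific stats callback.
class ResponseSenderSketch {
    interface Send { } // stand-in for the network Send abstraction

    void sendResponseMaybeThrottle(Object request, Optional<Consumer<Send>> onComplete) {
        Send send = new Send() { }; // build the response send (elided)
        // ... enqueue `send` on the channel ...
        onComplete.ifPresent(cb -> cb.accept(send)); // generic hook, usable beyond fetch
    }
}
```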

@@ -16,6 +16,7 @@
*/
package kafka.server


Contributor:

nit: probably unintentional

import kafka.common.TopicAndPartition
import kafka.log.LogConfig
import kafka.network.SocketServer
import kafka.security.auth._
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.clients.consumer.{OffsetAndMetadata, _}
Contributor:

nit: we can get rid of OffsetAndMetadata in this import

@hachikuji (Contributor) left a comment:

Thanks, left a few more comments. LGTM overall.

-    def processingStatsCallback(recordConversionStats: FetchResponseStats): Unit = {
-      recordConversionStats.foreach { case (tp, info) =>
-        updateRecordsProcessingStats(request, tp, info)
+    def onComplete(send: Send): Unit = {
Contributor:

nit: we could name this something more descriptive like updateConversionStats.

/**
* {@inheritDoc}
* Note that we do not have a way to return the exact size of down-converted messages, so we return the size of the
* pre-down-converted messages. The consumer however expects at least one full batch of messages to be sent out so
Contributor:

This comment is out of place after we moved the logic into the constructor.


-    public static final RecordsProcessingStats EMPTY = new RecordsProcessingStats(0L, 0, -1);
+    public static final RecordConversionStats EMPTY = new RecordConversionStats(0L, 0, -1);
Contributor:

Not from this patch, but it seems more natural to use 0 for conversionTimeNanos here. If we didn't convert anything, then I'd expect it took no time 😉.

Also, maybe we can add a no-arg constructor which initializes everything to 0. We have this in at least one other location.
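A hedged sketch of what that could look like (the field names are assumptions inferred from the constructor arguments above):

```java
// Sketch: EMPTY defined via a no-arg constructor that zeroes everything,
// so "no conversion" also means zero conversion time.
public class RecordConversionStats {
    public static final RecordConversionStats EMPTY = new RecordConversionStats();

    private final long temporaryMemoryBytes; // assumed field names
    private final int numRecordsConverted;
    private final long conversionTimeNanos;

    public RecordConversionStats() {
        this(0L, 0, 0L);
    }

    public RecordConversionStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
        this.temporaryMemoryBytes = temporaryMemoryBytes;
        this.numRecordsConverted = numRecordsConverted;
        this.conversionTimeNanos = conversionTimeNanos;
    }
}
```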

}

private void doTestConversion(CompressionType compressionType, byte toMagic, DownConversionTest test) throws IOException {
System.out.println("{" + compressionType + ", " + toMagic + ", " + test + "}");
Contributor:

Can we remove this? We typically don't have printlns in test code.

Contributor Author:

If a particular test case is flaky and fails occasionally, wouldn't this give us more information about which particular case is being flaky?

Contributor Author:

With the parameterized test, we don't require this println, so I removed it.


@Test
public void testConversion() throws IOException {
for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
Contributor:

Another way to do this is using a parameterized test case. You can see MemoryRecordsBuilderTest for an example of how to do this.
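For reference, a minimal JUnit 4 parameterized test along those lines (class and parameter names are illustrative, not from the patch):

```java
import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Illustrative sketch: each magic value becomes a separate test case, so a
// failure report names the exact combination that broke.
@RunWith(Parameterized.class)
public class DownConversionMagicTest {
    private final byte toMagic;

    public DownConversionMagicTest(byte toMagic) {
        this.toMagic = toMagic;
    }

    @Parameterized.Parameters(name = "toMagic={0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[][]{{(byte) 0}, {(byte) 1}, {(byte) 2}});
    }

    @Test
    public void testConversion() {
        // exercise down-conversion to `toMagic` here
    }
}
```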

-    request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
-    if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
+  abstract class Response(val request: Request) {
+    val nowNs = Time.SYSTEM.nanoseconds
Contributor:

We could probably do this in a locally block since we don't need nowNs except to initialize the fields below.

-      val responseString =
-        if (response.responseSend.isDefined)
-          response.responseAsString.getOrElse(
+      val responseString = response match {
Contributor:

Can we give Response a responseString method and skip this matching? The default implementation can return an empty string.

updateRequestMetrics(response)

// Invoke send completion callback
response match {
Contributor:

Similar to the comment above. If you find yourself matching on types, it is worth considering whether we can instead add a method to the type. Here we could have Response.onComplete, and the default implementation can do nothing.
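The pattern being suggested, sketched in Java for illustration (the Scala code under review would do the equivalent; all names here are hypothetical):

```java
// Illustration of "add a method to the type instead of matching on it":
// the base class supplies a no-op default, subtypes override as needed,
// and the call site shrinks to a plain virtual call.
abstract class Response {
    void onComplete() { } // default: nothing to do
}

class SendResponse extends Response {
    private final Runnable callback;

    SendResponse(Runnable callback) { this.callback = callback; }

    @Override
    void onComplete() { callback.run(); }
}

class ProcessorSketch {
    void completeSend(Response response) {
        response.onComplete(); // no type matching at the call site
    }
}
```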

import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshakeResponse}
+import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshakeResponse, Resource => RResource, ResourceType => RResourceType, _}
Contributor:

nit: the Sasl* imports are redundant given the wildcard. I sort of hate all the flexibility that scala gives for importing things.

@@ -471,7 +470,7 @@ class ReplicaManager(val config: KafkaConfig,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
-                    processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
+                    processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
Contributor:

nit: one more rename

override def onComplete: Option[Send => Unit] = onCompleteCallback

override def toString: String =
s"Response(responseType=SendResponse, request=$request, responseSend=$responseSend), responseAsString=$responseAsString"
Contributor:

nit: there are a lot of "responses" here. Maybe we simplify a little:

s"Response(type=Send, request=$request, send=$responseSend), asString=$responseAsString"

Also, it looks like we have changed the location of the end parenthesis; was that intentional?

@@ -2255,23 +2271,25 @@ class KafkaApis(val requestChannel: RequestChannel,
sendErrorResponseExemptThrottle(request, e)
}

// Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
Contributor:

We seem to have lost this comment.

-          throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
-        else ""
+      val responseString = response.responseString.getOrElse("responseAsString should always be defined if request logging is enabled")
Contributor:

Should we be throwing IllegalStateException?

* @param channelThrottlingCallback Callback for channel throttling
*/
-class ThrottledChannel(val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: (ResponseAction) => Unit)
+class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: (Response) => Unit)
Contributor:

nit: parentheses are unnecessary

@@ -730,7 +736,8 @@ private[kafka] class Processor(val id: Int,
}
}

-  private def updateRequestMetrics(response: RequestChannel.Response) {
+  private def updateRequestMetrics(response: RequestChannel.Response): Unit = {
+    // Record the amount of time this request spent on the network thread
Contributor:

Was this comment added intentionally? The request metrics include more than how much time was spent on the network thread.

@hachikuji (Contributor) left a comment:

Thanks, LGTM. Just a few additional comments.


-  def throttle(session: Session, clientId: String, throttleTimeMs: Int,
-               channelThrottlingCallback: (ResponseAction) => Unit) {
+  def throttle(request: RequestChannel.Request, throttleTimeMs: Int, channelThrottlingCallback: (Response) => Unit): Unit = {
Contributor:

nit: unneeded parentheses

-                         val responseAsString: Option[String]) {
-    request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
-    if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
+  abstract class Response(val request: Request) {
Contributor:

It seems more appropriate to treat this as a ResponseAction instead of a Response itself, especially given the new throttling behavior.

Contributor Author:

I will consider taking this up in a follow-on patch.

@hachikuji (Contributor):

retest this please

@hachikuji hachikuji merged commit 837f31d into apache:trunk May 31, 2018
// Length => Int32
// ...
// TODO: check if there is a better way to encapsulate this logic, perhaps in DefaultRecordBatch
log.debug("Constructing fake message batch for topic-partition {" + topicPartition() + "} for remaining length " + remaining);
Contributor:

We should not do string concatenation like this. Java only has strict evaluation, so we do this operation even if the log is not enabled. Please use slf4j's interpolation. Also, no TODOs please.
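For illustration, the same statement using slf4j placeholders (the wrapper class and parameter types are illustrative; the slf4j API is real):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// With {} placeholders, slf4j defers message formatting until it knows the
// debug level is enabled; the concatenated version always builds the string.
class LoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(LoggingSketch.class);

    void logFakeBatch(Object topicPartition, int remaining) {
        log.debug("Constructing fake message batch for topic-partition {} for remaining length {}",
                topicPartition, remaining);
    }
}
```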

Contributor Author:

I will address this in a follow-on PR

@omkreddy (Contributor) commented Jun 7, 2018

@hachikuji @dhruvilshah3
I am observing the exceptions below while running old-consumer-related code (kafka.tools.TestLogCleaning) on compacted topics; I do not observe them on normal topics. Just wanted to let you know.

Server log:

[2018-06-07 18:15:58,164] WARN [SocketServer brokerId=0] Unexpected error from /10.200.5.75; closing connection (org.apache.kafka.common.network.Selector)
java.util.NoSuchElementException
	at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
	at org.apache.kafka.common.record.LazyDownConversionRecordsSend.writeTo(LazyDownConversionRecordsSend.java:59)
	at org.apache.kafka.common.record.RecordsSend.writeTo(RecordsSend.java:58)
	at org.apache.kafka.common.record.MultiRecordsSend.writeTo(MultiRecordsSend.java:93)
	at org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:339)
	at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:310)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:509)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
	at kafka.network.Processor.poll(SocketServer.scala:680)
	at kafka.network.Processor.run(SocketServer.scala:585)
	at java.base/java.lang.Thread.run(Thread.java:844)

Client reconnects fail repeatedly with ClosedChannelException:

[2018-06-07 18:15:39,176] INFO Reconnect due to error: (kafka.consumer.SimpleConsumer:68)
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:88)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:135)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:134)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:133)
	at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:122)
	at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:37)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-06-07 18:15:39,176] DEBUG Disconnecting from 10.200.5.75:9092 (kafka.consumer.SimpleConsumer:62)

@dhruvilshah3 (Contributor Author):

@omkreddy Thanks for reporting this. I will email you to get some more information about the issue.

@ijuma (Contributor) commented Jun 7, 2018

@omkreddy I suggest filing a JIRA with more details.

@omkreddy (Contributor) commented Jun 7, 2018

Provided some more details to Dhruvil and requested him to raise a JIRA if he thinks it's an issue.

ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
KAFKA-6709: Message down-conversion causes Out Of Memory on broker [KIP-283] (apache#4871)

Implementation for lazy down-conversion in a chunked manner for efficient memory usage during down-conversion. This pull request is mainly to get initial feedback on the direction of the patch. The patch includes all the main components from KIP-283.

Reviewers: Jason Gustafson <jason@confluent.io>