KAFKA-6927: Message down-conversion causes Out Of Memory on broker #4871
Changes from 1 commit
core/src/main/scala/kafka/network/SocketServer.scala
@@ -704,7 +704,15 @@ private[kafka] class Processor(val id: Int,
         val response = inflightResponses.remove(send.destination).getOrElse {
           throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
         }
-        updateRequestMetrics(response, send)
+        updateRequestMetrics(response)
+
+        // Invoke send completion callback
+        response match {
+          case response: RequestChannel.SendResponse => response.onComplete.foreach {
+            onComplete => onComplete(send)
+          }
+          case _ =>
+        }
         selector.unmute(send.destination)
       } catch {
         case e: Throwable => processChannelException(send.destination,
@@ -713,17 +721,6 @@ private[kafka] class Processor(val id: Int,
       }
     }

-    private def updateRequestMetrics(response: RequestChannel.Response, send: Send): Unit = {
-      // If we have a callback for updating record processing statistics, invoke that now. This is used for cases
-      // where network threads process records, for example when we lazily down-convert records.
-      (response, send) match {
-        case (response: RequestChannel.SendResponse, send: MultiRecordsSend) if (send.recordConversionStats().size() > 0) =>
-          response.processingStatsCallback.foreach(callback => callback(send.recordConversionStats().asScala.toMap))
-        case _ =>
-      }
-      updateRequestMetrics(response)
-    }
-
     private def updateRequestMetrics(response: RequestChannel.Response): Unit = {
       // Record the amount of time this request spent on the network thread
       val request = response.request

Reviewer comment on "// Record the amount of time this request spent on the network thread": Was this comment added intentionally? The request metrics include more than how much time was spent on the network thread.
core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,9 +43,9 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, RecordBatch, RecordConversionStats, Records}
+import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
@@ -459,7 +459,7 @@ class KafkaApis(val requestChannel: RequestChannel,

     def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
       processingStats.foreach { case (tp, info) =>
-        updateRecordsProcessingStats(request, tp, info)
+        updateRecordConversionStats(request, tp, info)
       }
     }
@@ -533,8 +533,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       })
     }

-    def convertedPartitionData(tp: TopicPartition,
-                               unconvertedFetchResponse: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
+    def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = {
       // Down-conversion of the fetched records is needed when the stored magic version is
       // greater than that supported by the client (as indicated by the fetch request version). If the
       // configured magic version for the topic is less than or equal to that supported by the version of the
@@ -544,9 +543,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       // which were written in the new format prior to the version downgrade.
       replicaManager.getMagic(tp).flatMap { magic =>
         val downConvertMagic = {
-          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedFetchResponse.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
             Some(RecordBatch.MAGIC_VALUE_V0)
-          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedFetchResponse.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
             Some(RecordBatch.MAGIC_VALUE_V1)
           else
             None
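For reference, the version checks above encode a small mapping from the fetch request version (versionId) to the newest record format (magic) the client can read. A minimal sketch of that mapping, using the real RecordBatch constants but a hypothetical helper name that is not part of this patch:

    import org.apache.kafka.common.record.RecordBatch

    // Hypothetical helper, for illustration only: fetch versions 0-1 predate
    // message format v1, and versions 2-3 predate format v2, so stored batches
    // with a newer magic must be down-converted before being sent to such clients.
    def requiredMagic(fetchVersion: Short): Byte =
      if (fetchVersion <= 1) RecordBatch.MAGIC_VALUE_V0
      else if (fetchVersion <= 3) RecordBatch.MAGIC_VALUE_V1
      else RecordBatch.MAGIC_VALUE_V2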
@@ -559,14 +558,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
           // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
           // client.
-          val converted = new LazyDownConversionRecords(tp, unconvertedFetchResponse.records, magic, fetchContext.getFetchOffset(tp).get, Time.SYSTEM)
-          new FetchResponse.PartitionData[BaseRecords](unconvertedFetchResponse.error, unconvertedFetchResponse.highWatermark,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedFetchResponse.logStartOffset, unconvertedFetchResponse.abortedTransactions,
-            converted)
+          new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
         }
-      }.getOrElse(new FetchResponse.PartitionData[BaseRecords](unconvertedFetchResponse.error, unconvertedFetchResponse.highWatermark,
-        unconvertedFetchResponse.lastStableOffset, unconvertedFetchResponse.logStartOffset, unconvertedFetchResponse.abortedTransactions,
-        unconvertedFetchResponse.records))
+      }.getOrElse(unconvertedRecords)
     }

     // the callback for process a fetch response, invoked before throttling
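The KIP-283 idea referenced in the code comment above is what addresses the out-of-memory failure in the issue title: instead of materializing every down-converted batch up front, conversion happens chunk by chunk as the response is written to the socket. A rough sketch of that idea, assuming hypothetical Batch and convertBatch stand-ins (this is not the actual LazyDownConversionRecords implementation):

    // Sketch only: the iterator rewrites one batch at a time as the send drains
    // it, so at most one converted chunk is in memory at once, and at least one
    // batch is always produced for the client.
    final case class Batch(magic: Byte, bytes: Array[Byte])

    def lazilyConverted(batches: Iterator[Batch], toMagic: Byte,
                        convertBatch: (Batch, Byte) => Batch): Iterator[Batch] =
      batches.map { batch =>
        // Only batches newer than the client's supported format are rewritten.
        if (batch.magic > toMagic) convertBatch(batch, toMagic) else batch
      } // Iterator.map is lazy: nothing converts until the send consumes it.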
@@ -584,13 +578,20 @@ class KafkaApis(val requestChannel: RequestChannel,
     // fetch response callback invoked after any throttling
     def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
       def createResponse(requestThrottleTimeMs: Int): FetchResponse[BaseRecords] = {
+        // Down-convert messages for each partition, if needed
         val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
-        unconvertedFetchResponse.responseData().asScala.foreach { case (tp, partitionData) =>
-          if (partitionData.error != Errors.NONE)
+        unconvertedFetchResponse.responseData().asScala.foreach { case (tp, unconvertedPartitionData) =>
+          if (unconvertedPartitionData.error != Errors.NONE)
             debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${partitionData.error.exceptionName}")
-          convertedData.put(tp, convertedPartitionData(tp, partitionData))
+              s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
+          val convertedRecords = convertRecords(tp, unconvertedPartitionData.records)
+          val convertedPartitionData = new FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error,
+            unconvertedPartitionData.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedPartitionData.logStartOffset,
+            unconvertedPartitionData.abortedTransactions, convertedRecords)
+          convertedData.put(tp, convertedPartitionData)
         }
+
+        // Prepare fetch response from converted data
         val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData,
           bandwidthThrottleTimeMs + requestThrottleTimeMs, unconvertedFetchResponse.sessionId())
         response.responseData.asScala.foreach { case (topicPartition, data) =>
@@ -603,16 +604,20 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
         s"metadata=${unconvertedFetchResponse.sessionId()}")

-      def processingStatsCallback(recordConversionStats: FetchResponseStats): Unit = {
-        recordConversionStats.foreach { case (tp, info) =>
-          updateRecordsProcessingStats(request, tp, info)
-        }
-      }
+      def onComplete(send: Send): Unit = {
+        send match {
+          case send: MultiRecordsSend if send.recordConversionStats != null =>
+            send.recordConversionStats.asScala.toMap.foreach {
+              case (tp, stats) => updateRecordConversionStats(request, tp, stats)
+            }
+          case _ =>
+        }
+      }

       if (fetchRequest.isFromFollower)
-        sendResponseExemptThrottle(request, createResponse(0), Some(processingStatsCallback))
+        sendResponseExemptThrottle(request, createResponse(0), Some(onComplete))
       else
-        sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs), Some(processingStatsCallback))
+        sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs), Some(onComplete))
     }

     // When this callback is triggered, the remote API call has completed.

Reviewer comment on "def onComplete(send: Send)": nit: we could name this something more descriptive like
@@ -2207,9 +2212,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }

-  private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition,
-                                           processingStats: RecordConversionStats): Unit = {
-    val conversionCount = processingStats.numRecordsConverted
+  private def updateRecordConversionStats(request: RequestChannel.Request,
+                                          tp: TopicPartition,
+                                          conversionStats: RecordConversionStats): Unit = {
+    val conversionCount = conversionStats.numRecordsConverted
     if (conversionCount > 0) {
       request.header.apiKey match {
         case ApiKeys.PRODUCE =>
@@ -2221,9 +2227,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case _ =>
           throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
       }
-      request.messageConversionsTimeNanos = processingStats.conversionTimeNanos
+      request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
     }
-    request.temporaryMemoryBytes = processingStats.temporaryMemoryBytes
+    request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
   }

   private def handleError(request: RequestChannel.Request, e: Throwable) {
@@ -2237,9 +2243,9 @@ class KafkaApis(val requestChannel: RequestChannel,

   private def sendResponseMaybeThrottle(request: RequestChannel.Request,
                                         createResponse: Int => AbstractResponse,
-                                        processingStatsCallback: Option[FetchResponseStats => Unit] = None): Unit = {
+                                        onComplete: Option[Send => Unit] = None): Unit = {
     quotas.request.maybeRecordAndThrottle(request,
-      throttleTimeMs => sendResponse(request, Some(createResponse(throttleTimeMs)), processingStatsCallback))
+      throttleTimeMs => sendResponse(request, Some(createResponse(throttleTimeMs)), onComplete))
   }

   private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable) {
@@ -2248,9 +2254,9 @@ class KafkaApis(val requestChannel: RequestChannel,

   private def sendResponseExemptThrottle(request: RequestChannel.Request,
                                          response: AbstractResponse,
-                                         processingStatsCallback: Option[FetchResponseStats => Unit] = None): Unit = {
+                                         onComplete: Option[Send => Unit] = None): Unit = {
     quotas.request.maybeRecordExempt(request)
-    sendResponse(request, Some(response), processingStatsCallback)
+    sendResponse(request, Some(response), onComplete)
   }

   private def sendErrorResponseExemptThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
@@ -2281,7 +2287,7 @@ class KafkaApis(val requestChannel: RequestChannel,

   private def sendResponse(request: RequestChannel.Request,
                            responseOpt: Option[AbstractResponse],
-                           processingStatsCallback: Option[FetchResponseStats => Unit]): Unit = {
+                           onComplete: Option[Send => Unit]): Unit = {
     // Update error metrics for each error code in the response including Errors.NONE
     responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
@@ -2291,7 +2297,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responseString =
         if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
         else None
-      new RequestChannel.SendResponse(request, responseSend, responseString, processingStatsCallback)
+      new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
     case None =>
       new RequestChannel.NoOpResponse(request)
   }
Reviewer comment on "new RequestChannel.SendResponse(...)": Similar to the comment above. If you find yourself matching on types, it is worth considering whether we can instead add a method to the type. Here we could have Response.onComplete, and the default implementation can do nothing.
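A minimal sketch of what the reviewer is suggesting, assuming the RequestChannel types visible in the diff above; the constructor shapes here are illustrative, not the merged implementation:

    // Sketch of the suggested refactor: Response exposes onComplete with a
    // no-op default, so Processor can invoke it without matching on subtypes.
    abstract class Response(val request: RequestChannel.Request) {
      // Default implementation does nothing once the send finishes.
      def onComplete: Option[Send => Unit] = None
    }

    class SendResponse(request: RequestChannel.Request,
                       val responseSend: Send,
                       val responseAsString: Option[String],
                       onCompleteCallback: Option[Send => Unit]) extends Response(request) {
      override def onComplete: Option[Send => Unit] = onCompleteCallback
    }

    // Processor's send-completion handling then needs no type match:
    //   response.onComplete.foreach(callback => callback(send))

This keeps the completion hook on the response type itself, so subtypes like NoOpResponse simply inherit the do-nothing default.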