MINOR: Include response in request log #3801

Closed · wants to merge 6 commits · Changes from all commits
@@ -24,8 +24,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException
import org.I0Itec.zkclient.{IZkChildListener, IZkStateListener}
import org.apache.kafka.common.utils.Time

-import scala.collection.JavaConverters._
-
/**
* Handle the notificationMessage.
*/
93 changes: 52 additions & 41 deletions core/src/main/scala/kafka/network/RequestChannel.scala
@@ -27,8 +27,8 @@ import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest}
import kafka.server.QuotaId
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.network.{ListenerName, Send}
-import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.protocol.{ApiKeys, Protocol}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
@@ -39,6 +39,8 @@ import scala.reflect.ClassTag
object RequestChannel extends Logging {
private val requestLogger = Logger.getLogger("kafka.request.logger")

+def isRequestLoggingEnabled: Boolean = requestLogger.isDebugEnabled
+
sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest

@@ -90,7 +92,7 @@ object RequestChannel extends Logging {
math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
}

-def updateRequestMetrics(networkThreadTimeNanos: Long) {
+def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response) {
val endTimeNanos = Time.SYSTEM.nanoseconds
// In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote
// processing time is really small. This value is set in KafkaApis from a request handling thread.
@@ -103,15 +105,23 @@ object RequestChannel extends Logging {
if (apiRemoteCompleteTimeNanos < 0)
apiRemoteCompleteTimeNanos = responseCompleteTimeNanos

-def nanosToMs(nanos: Long) = math.max(TimeUnit.NANOSECONDS.toMillis(nanos), 0)
+/**
+ * Converts nanos to millis with micros precision as additional decimal places in the request log have a low
+ * signal-to-noise ratio. When it comes to metrics, there is little difference either way as we round the value
+ * to the nearest long.
+ */
+def nanosToMs(nanos: Long): Double = {
+  val positiveNanos = math.max(nanos, 0)
+  TimeUnit.NANOSECONDS.toMicros(positiveNanos).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
+}

-val requestQueueTime = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
-val apiLocalTime = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
-val apiRemoteTime = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
-val apiThrottleTime = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
-val responseQueueTime = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
-val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
-val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
+val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
+val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
Contributor: +1 on naming these with units
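As a quick aside, a minimal, self-contained sketch of the helper above (plain JDK only; NanosToMsExample is an illustrative name, not part of the patch) shows the precision and clamping behaviour:

import java.util.concurrent.TimeUnit

object NanosToMsExample extends App {
  // Mirrors the patch's helper: millisecond values with microsecond precision.
  def nanosToMs(nanos: Long): Double = {
    val positiveNanos = math.max(nanos, 0)
    TimeUnit.NANOSECONDS.toMicros(positiveNanos).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
  }

  println(nanosToMs(1234567L)) // 1.234 -- sub-microsecond detail is dropped
  println(nanosToMs(-5000L))   // 0.0   -- negative deltas are clamped to zero
}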

val fetchMetricNames =
if (header.apiKey == ApiKeys.FETCH) {
val isFromFollower = body[FetchRequest].isFromFollower
@@ -125,13 +135,13 @@
metricNames.foreach { metricName =>
val m = RequestMetrics.metricsMap(metricName)
m.requestRate.mark()
-m.requestQueueTimeHist.update(requestQueueTime)
-m.localTimeHist.update(apiLocalTime)
-m.remoteTimeHist.update(apiRemoteTime)
-m.throttleTimeHist.update(apiThrottleTime)
-m.responseQueueTimeHist.update(responseQueueTime)
-m.responseSendTimeHist.update(responseSendTime)
-m.totalTimeHist.update(totalTime)
+m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
+m.localTimeHist.update(Math.round(apiLocalTimeMs))
+m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
+m.throttleTimeHist.update(Math.round(apiThrottleTimeMs))
+m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
+m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
+m.totalTimeHist.update(Math.round(totalTimeMs))
}

// Records network handler thread usage. This is included towards the request quota for the
@@ -142,28 +152,26 @@
// the total time spent on authentication, which may be significant for SASL/SSL.
recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))

-if (requestLogger.isDebugEnabled) {
+if (isRequestLoggingEnabled) {
val detailsEnabled = requestLogger.isTraceEnabled
-def nanosToMs(nanos: Long) = TimeUnit.NANOSECONDS.toMicros(math.max(nanos, 0)).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
-val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
-val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
-val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
-val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
-val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
-val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
-
-requestLogger.debug(s"Completed request:${requestDesc(detailsEnabled)} from connection ${context.connectionId};" +
-  s"totalTime:$totalTimeMs," +
-  s"requestQueueTime:$requestQueueTimeMs," +
-  s"localTime:$apiLocalTimeMs," +
-  s"remoteTime:$apiRemoteTimeMs," +
-  s"throttleTime:$apiThrottleTimeMs," +
-  s"responseQueueTime:$responseQueueTimeMs," +
-  s"sendTime:$responseSendTimeMs," +
-  s"securityProtocol:${context.securityProtocol}," +
-  s"principal:${context.principal}," +
-  s"listener:${context.listenerName.value}")
+val responseString = response.responseAsString.getOrElse(
+  throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
+
+val builder = new StringBuilder(256)
+builder.append("Completed request:").append(requestDesc(detailsEnabled))
+  .append(",response:").append(responseString)
+  .append(" from connection ").append(context.connectionId)
+  .append(";totalTime:").append(totalTimeMs)
+  .append(",requestQueueTime:").append(requestQueueTimeMs)
+  .append(",localTime:").append(apiLocalTimeMs)
+  .append(",remoteTime:").append(apiRemoteTimeMs)
+  .append(",throttleTime:").append(apiThrottleTimeMs)
+  .append(",responseQueueTime:").append(responseQueueTimeMs)
+  .append(",sendTime:").append(responseSendTimeMs)
+  .append(",securityProtocol:").append(context.securityProtocol)
+  .append(",principal:").append(session.principal)
+  .append(",listener:").append(context.listenerName.value)
+requestLogger.debug(builder.toString)
}
}
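With kafka.request.logger at DEBUG, the builder above emits one line per completed request, shaped roughly as follows (all values, and the exact rendering of the request and response structs, are illustrative):

Completed request:{api_key=METADATA,api_version=4,correlation_id=12,client_id=admin-1},response:{throttle_time_ms=0,...} from connection 127.0.0.1:9092-127.0.0.1:53174;totalTime:1.341,requestQueueTime:0.133,localTime:0.925,remoteTime:0.0,throttleTime:0.022,responseQueueTime:0.092,sendTime:0.169,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT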

@@ -183,13 +191,16 @@ object RequestChannel extends Logging {

}

-class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction) {
+/** responseAsString should only be defined if request logging is enabled */
+class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction,
+               val responseAsString: Option[String]) {
Contributor: This is kind of unfortunate, but I guess it's a consequence of only having the Send at this point. Not seeing any great alternatives, but I guess we could build the string representation into the Send object itself. That, or maybe we should just keep the Response object around a bit longer when request logging is enabled.

Contributor Author: We have various Send implementations though, so it seems like it would be messier than this. Or am I missing something? I avoided retaining the actual response object as it could be pretty large in some cases. An example is a down-converted FetchResponse.

Contributor: There are really only 2-3 implementations of Send. What I had in mind was potentially adding the string representation as a member field which was set upon construction and surfaced through toString. Might not be any better than this option though, just shifts the ugliness elsewhere.

Contributor Author (@ijuma, Sep 7, 2017): There are 5 implementations of Send, although I'm not sure we'd have to do it for all of them. Also, we only want to compute the toString eagerly if request logging is enabled. From that perspective, it seemed best to leave this ugliness in the RequestChannel classes instead of spreading it through other classes that shouldn't care about this.

Contributor: Yes, I agree. Just thinking through the other options. For what it's worth, it seems like the only reason we keep the request body around is to log this message. That means we could do a similar trick and convert it to a string. I guess this is less important because we already have logic to free the records in produce requests though.

Contributor Author: OK, I'll file a separate JIRA for the request body changes.
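For reference, the alternative floated in this thread — a Send that carries its own string representation — might look roughly like the sketch below. It assumes the Send interface of this era (destination/completed/writeTo/size), and LoggableSend is a hypothetical name; this is not what the PR does.

import java.nio.channels.GatheringByteChannel
import org.apache.kafka.common.network.Send

// Hypothetical: wrap a Send, eagerly capturing the response's string form at
// construction time and surfacing it through toString.
class LoggableSend(delegate: Send, responseString: String) extends Send {
  override def destination(): String = delegate.destination()
  override def completed(): Boolean = delegate.completed()
  override def size(): Long = delegate.size()
  override def writeTo(channel: GatheringByteChannel): Long = delegate.writeTo(channel)
  override def toString: String = responseString
}

The PR instead keeps the string on Response, so the eager toString cost stays inside RequestChannel and is only paid when request logging is enabled.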

request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds

def processor: Int = request.processor

-override def toString = s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction)"
+override def toString =
+  s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction, responseAsString=$responseAsString)"
}

trait ResponseAction
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/network/SocketServer.scala
@@ -36,8 +36,8 @@ import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.types.SchemaException
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.utils.{KafkaThread, Time}
@@ -484,7 +484,7 @@ private[kafka] class Processor(val id: Int,
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
-updateRequestMetrics(curr.request)
+updateRequestMetrics(curr)
trace("Socket server received empty response to send, registering for read: " + curr)
val channelId = curr.request.context.connectionId
if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
@@ -494,7 +494,7 @@
throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
sendResponse(curr, responseSend)
case RequestChannel.CloseConnectionAction =>
-updateRequestMetrics(curr.request)
+updateRequestMetrics(curr)
trace("Closing socket connection actively according to the response code.")
close(selector, curr.request.context.connectionId)
}
@@ -511,7 +511,7 @@
// `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
if (channel(connectionId).isEmpty) {
warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
-response.request.updateRequestMetrics(0L)
+response.request.updateRequestMetrics(0L, response)
}
// Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
// removed from the Selector after discarding any pending staged receives.
@@ -561,22 +561,23 @@
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
-updateRequestMetrics(resp.request)
+updateRequestMetrics(resp)
selector.unmute(send.destination)
}
}

-private def updateRequestMetrics(request: RequestChannel.Request) {
+private def updateRequestMetrics(response: RequestChannel.Response) {
+  val request = response.request
val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
-request.updateRequestMetrics(networkThreadTimeNanos)
+request.updateRequestMetrics(networkThreadTimeNanos, response)
}

private def processDisconnected() {
selector.disconnected.keySet.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
-inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
+inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
9 changes: 6 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2001,16 +2001,19 @@ class KafkaApis(val requestChannel: RequestChannel,
private def closeConnection(request: RequestChannel.Request): Unit = {
// This case is used when the request handler has encountered an error, but the client
// does not expect a response (e.g. when produce request has acks set to 0)
-requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction))
+requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None))
}

private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
-requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction))
+val responseString =
+  if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.header.apiVersion))
+  else None
+requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
case None =>
-requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction))
+requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
}
}
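Note that responseString is only materialized when the request logger is enabled. In the stock Kafka packaging that is controlled via log4j.properties, along these lines (appender name taken from the default config; adjust to taste):

# DEBUG emits the summary line shown earlier; TRACE additionally includes full request details
log4j.logger.kafka.request.logger=DEBUG, requestAppender
log4j.additivity.kafka.request.logger=false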

4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -54,14 +54,14 @@ class KafkaRequestHandler(id: Int,

req match {
case RequestChannel.ShutdownRequest =>
debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
latch.countDown()
return

case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, request))
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
apis.handle(request)
} catch {
case e: FatalExitError =>
@@ -177,7 +177,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
}, 2, TimeUnit.SECONDS)
consumer.poll(0)

-def sendRecords(numRecords: Int, topic: String = this.topic) {
+def sendRecords(numRecords: Int, topic: String) {
var remainingRecords = numRecords
val endTimeMs = System.currentTimeMillis + 20000
while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) {
@@ -201,7 +201,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
}

private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
-numRecords: Int = 1,
+numRecords: Int,
startingOffset: Int = 0,
topic: String = topic,
part: Int = part) {
@@ -529,7 +529,7 @@ class TransactionsTest extends KafkaServerTestHarness {
consumer
}

private def createReadUncommittedConsumer(group: String = "group") = {
private def createReadUncommittedConsumer(group: String) = {
val props = new Properties()
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -268,7 +268,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
}

private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val payload = counter.toString
counter += 1
@@ -222,7 +222,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
cleanerManager
}

private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
private def createLog(segmentSize: Int, cleanupPolicy: String): Log = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
logProps.put(LogConfig.RetentionMsProp, 1: Integer)
@@ -243,7 +243,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
log
}

-private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
+private def makeLog(dir: File = logDir, config: LogConfig) =
Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1270,12 +1270,12 @@ class LogCleanerTest extends JUnitSuite {
partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
}

-private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => LogAppendInfo = {
+private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Seq[Int] => LogAppendInfo = {
appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
}

private def appendIdempotentAsLeader(log: Log, producerId: Long,
-producerEpoch: Short = 0,
+producerEpoch: Short,
isTransactional: Boolean = false): Seq[Int] => LogAppendInfo = {
var sequence = 0
keys: Seq[Int] => {
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -333,7 +333,7 @@ class LogSegmentTest {
private def endTxnRecords(controlRecordType: ControlRecordType,
producerId: Long,
producerEpoch: Short,
-offset: Long = 0L,
+offset: Long,
partitionLeaderEpoch: Int = 0,
coordinatorEpoch: Int = 0,
timestamp: Long = RecordBatch.NO_TIMESTAMP): MemoryRecords = {
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -947,9 +947,9 @@ class LogValidatorTest {
isFromClient = true)
}

-private def createRecords(magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
+private def createRecords(magicValue: Byte,
timestamp: Long = RecordBatch.NO_TIMESTAMP,
-codec: CompressionType = CompressionType.NONE): MemoryRecords = {
+codec: CompressionType): MemoryRecords = {
val buf = ByteBuffer.allocate(512)
val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
builder.appendWithOffset(0, timestamp, null, "hello".getBytes)