From 4a6d9b8f13158780ef75ef5a5d160f3bc01f5bb1 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 25 Aug 2017 11:16:03 +0100 Subject: [PATCH] MINOR: Remove try/finally from processNewResponses --- .../scala/kafka/network/SocketServer.scala | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index e6f6662a21b81..37382e4f294d2 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -476,29 +476,25 @@ private[kafka] class Processor(val id: Int, } private def processNewResponses() { - var curr = requestChannel.receiveResponse(id) - while (curr != null) { - try { - curr.responseAction match { - case RequestChannel.NoOpAction => - // There is no response to send to the client, we need to read more pipelined requests - // that are sitting in the server's socket buffer - updateRequestMetrics(curr.request) - trace("Socket server received empty response to send, registering for read: " + curr) - val channelId = curr.request.connectionId - if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) - selector.unmute(channelId) - case RequestChannel.SendAction => - val responseSend = curr.responseSend.getOrElse( - throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr")) - sendResponse(curr, responseSend) - case RequestChannel.CloseConnectionAction => - updateRequestMetrics(curr.request) - trace("Closing socket connection actively according to the response code.") - close(selector, curr.request.connectionId) - } - } finally { - curr = requestChannel.receiveResponse(id) + var curr: RequestChannel.Response = null + while ({curr = requestChannel.receiveResponse(id); curr != null}) { + curr.responseAction match { + case RequestChannel.NoOpAction => + // There is no response to send to the client, we need to read more pipelined requests + // that are sitting in the server's socket buffer + updateRequestMetrics(curr.request) + trace("Socket server received empty response to send, registering for read: " + curr) + val channelId = curr.request.connectionId + if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) + selector.unmute(channelId) + case RequestChannel.SendAction => + val responseSend = curr.responseSend.getOrElse( + throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr")) + sendResponse(curr, responseSend) + case RequestChannel.CloseConnectionAction => + updateRequestMetrics(curr.request) + trace("Closing socket connection actively according to the response code.") + close(selector, curr.request.connectionId) } } }