From bfb253b28be5a8367533aa36712d9a0bcc443109 Mon Sep 17 00:00:00 2001 From: Kostya Golikov Date: Fri, 7 Aug 2015 18:03:02 +0300 Subject: [PATCH] Reducing code duplication and emphasizing pieces that actually changing --- .../producer/async/DefaultEventHandler.scala | 71 +++++++++---------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index a6179a902f7fa..6061702a006cb 100755 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -122,14 +122,17 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } } - def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = { - val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size) - events.foreach{e => + def serialize(rawKeyedMessages: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = { + val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](rawKeyedMessages.size) + rawKeyedMessages.foreach { keyedMessage => try { - if(e.hasKey) - serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message))) - else - serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message))) + val serializedMessage = + if(keyedMessage.hasKey) + new Message(key = keyEncoder.toBytes(keyedMessage.key), bytes = encoder.toBytes(keyedMessage.message)) + else + new Message(bytes = encoder.toBytes(keyedMessage.message)) + + serializedMessages += keyedMessage.copy(message = serializedMessage) } catch { case t: Throwable => producerStats.serializationErrorRate.mark() @@ -138,7 +141,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } else { // currently, if in async mode, we just log the serialization error. We need to revisit // this when doing kafka-496 - error("Error serializing message for topic %s".format(e.topic), t) + error("Error serializing message for topic %s".format(keyedMessage.topic), t) } } } @@ -177,7 +180,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, dataPerTopicPartition.append(message) } Some(ret) - }catch { // Swallow recoverable exceptions and return None so that they can be retried. + } catch { // Swallow recoverable exceptions and return None so that they can be retried. case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None @@ -293,40 +296,34 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = { - /** enforce the compressed.topics config here. - * If the compression codec is anything other than NoCompressionCodec, - * Enable compression only for specified topics if any - * If the list of compressed topics is empty, then enable the specified compression codec for all topics - * If the compression codec is NoCompressionCodec, compression is disabled for all topics - */ try { val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) => val rawMessages = messages.map(_.message) - (topicAndPartition, - config.compressionCodec match { - case NoCompressionCodec => - debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition)) - new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*) - case _ => - config.compressedTopics.size match { - case 0 => + val codec = config.compressionCodec match { + case NoCompressionCodec => + debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition)) + NoCompressionCodec + case _ => + config.compressedTopics.size match { + case 0 => + debug("Sending %d messages with compression codec %d to %s" + .format(messages.size, config.compressionCodec.codec, topicAndPartition)) + config.compressionCodec + case _ => + if (config.compressedTopics.contains(topicAndPartition.topic)) { debug("Sending %d messages with compression codec %d to %s" .format(messages.size, config.compressionCodec.codec, topicAndPartition)) - new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*) - case _ => - if (config.compressedTopics.contains(topicAndPartition.topic)) { - debug("Sending %d messages with compression codec %d to %s" - .format(messages.size, config.compressionCodec.codec, topicAndPartition)) - new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*) - } - else { - debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s" - .format(messages.size, topicAndPartition, config.compressedTopics.toString)) - new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*) - } - } + config.compressionCodec + } + else { + debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s" + .format(messages.size, topicAndPartition, config.compressedTopics)) + NoCompressionCodec + } + } } - ) + + topicAndPartition -> new ByteBufferMessageSet(codec, rawMessages: _*) } Some(messagesPerTopicPartition) } catch {