Permalink
Browse files

MirrorMaker with shallow.iterator.enable=true produces unreadble mess…

…ages; patched by Jun Rao; reviewed by Neha Narkhede; kafka-732
  • Loading branch information...
1 parent eae1bd5 commit 771760ce23f00ba86b916420d8e209b2611b23c0 @junrao junrao committed Mar 7, 2013
@@ -125,12 +125,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
- /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
- * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
- * overhead of decompression.
- * */
- val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false)
-
/**
* Client id is specified by the kafka consumer client, used to distinguish different clients
*/
@@ -34,7 +34,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
- val enableShallowIterator: Boolean,
val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
@@ -83,11 +82,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
.format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
}
- localCurrent =
- if (enableShallowIterator)
- currentDataChunk.messages.shallowIterator
- else
- currentDataChunk.messages.iterator
+ localCurrent = currentDataChunk.messages.iterator
current.set(localCurrent)
}
@@ -26,12 +26,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
- val enableShallowIterator: Boolean,
val clientId: String)
extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
private val iter: ConsumerIterator[K,V] =
- new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId)
+ new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId)
/**
* Create an iterator over messages in the stream.
@@ -195,7 +195,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
- queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId)
+ queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
(queue, stream)
})
).flatten.toList
@@ -695,7 +695,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
config.consumerTimeoutMs,
keyDecoder,
valueDecoder,
- config.shallowIteratorEnable,
config.clientId)
(queue, stream)
}).toList
@@ -78,7 +78,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
consumerConfig.consumerTimeoutMs,
new StringDecoder(),
new StringDecoder(),
- enableShallowIterator = false,
clientId = "")
val receivedMessages = (0 until 5).map(i => iter.next.message).toList

0 comments on commit 771760c

Please sign in to comment.