Can't produce compressed messages with timestamp #757

Closed
niels-s opened this issue Oct 4, 2016 · 1 comment · Fixed by #759

niels-s commented Oct 4, 2016

Versions

Sarama Version: Sarama af0513c
Kafka Version: 0.10.0.1
Go Version: 1.7

Configuration

I've got an example snippet over here

For Kafka I'm using the following configuration:

INFO KafkaConfig values:
    advertised.host.name = null
    metric.reporters = []
    quota.producer.default = 9223372036854775807
    offsets.topic.num.partitions = 50
    log.flush.interval.messages = 9223372036854775807
    auto.create.topics.enable = true
    controller.socket.timeout.ms = 30000
    log.flush.interval.ms = null
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    replica.socket.receive.buffer.bytes = 65536
    min.insync.replicas = 1
    replica.fetch.wait.max.ms = 500
    num.recovery.threads.per.data.dir = 1
    ssl.keystore.type = JKS
    sasl.mechanism.inter.broker.protocol = GSSAPI
    default.replication.factor = 1
    ssl.truststore.password = null
    log.preallocate = false
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    fetch.purgatory.purge.interval.requests = 1000
    ssl.endpoint.identification.algorithm = null
    replica.socket.timeout.ms = 30000
    message.max.bytes = 1000012
    num.io.threads = 8
    offsets.commit.required.acks = -1
    log.flush.offset.checkpoint.interval.ms = 60000
    delete.topic.enable = false
    quota.window.size.seconds = 1
    ssl.truststore.type = JKS
    offsets.commit.timeout.ms = 5000
    quota.window.num = 11
    zookeeper.connect = localhost:2181
    authorizer.class.name =
    num.replica.fetchers = 1
    log.retention.ms = null
    log.roll.jitter.hours = 0
    log.cleaner.enable = true
    offsets.load.buffer.size = 5242880
    log.cleaner.delete.retention.ms = 86400000
    ssl.client.auth = none
    controlled.shutdown.max.retries = 3
    queued.max.requests = 500
    offsets.topic.replication.factor = 3
    log.cleaner.threads = 1
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    socket.request.max.bytes = 104857600
    ssl.trustmanager.algorithm = PKIX
    zookeeper.session.timeout.ms = 6000
    log.retention.bytes = -1
    log.message.timestamp.type = CreateTime
    sasl.kerberos.min.time.before.relogin = 60000
    zookeeper.set.acl = false
    connections.max.idle.ms = 600000
    offsets.retention.minutes = 1440
    replica.fetch.backoff.ms = 1000
    inter.broker.protocol.version = 0.10.0-IV1
    log.retention.hours = 168
    num.partitions = 1
    broker.id.generation.enable = true
    listeners = null
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    log.roll.ms = null
    log.flush.scheduler.interval.ms = 9223372036854775807
    ssl.cipher.suites = null
    log.index.size.max.bytes = 10485760
    ssl.keymanager.algorithm = SunX509
    security.inter.broker.protocol = PLAINTEXT
    replica.fetch.max.bytes = 1048576
    advertised.port = null
    log.cleaner.dedupe.buffer.size = 134217728
    replica.high.watermark.checkpoint.interval.ms = 5000
    log.cleaner.io.buffer.size = 524288
    sasl.kerberos.ticket.renew.window.factor = 0.8
    zookeeper.connection.timeout.ms = 6000
    controlled.shutdown.retry.backoff.ms = 5000
    log.roll.hours = 168
    log.cleanup.policy = delete
    host.name =
    log.roll.jitter.ms = null
    max.connections.per.ip = 2147483647
    offsets.topic.segment.bytes = 104857600
    background.threads = 10
    quota.consumer.default = 9223372036854775807
    request.timeout.ms = 30000
    log.message.format.version = 0.10.0-IV1
    log.index.interval.bytes = 4096
    log.dir = /tmp/kafka-logs
    log.segment.bytes = 1073741824
    log.cleaner.backoff.ms = 15000
    offset.metadata.max.bytes = 4096
    ssl.truststore.location = null
    group.max.session.timeout.ms = 300000
    ssl.keystore.password = null
    zookeeper.sync.time.ms = 2000
    port = 9092
    log.retention.minutes = null
    log.segment.delete.delay.ms = 60000
    log.dirs = /tmp/kafka-logs
    controlled.shutdown.enable = true
    compression.type = producer
    max.connections.per.ip.overrides =
    log.message.timestamp.difference.max.ms = 9223372036854775807
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    auto.leader.rebalance.enable = true
    leader.imbalance.check.interval.seconds = 300
    log.cleaner.min.cleanable.ratio = 0.5
    replica.lag.time.max.ms = 10000
    num.network.threads = 3
    ssl.key.password = null
    reserved.broker.max.id = 1000
    metrics.num.samples = 2
    socket.send.buffer.bytes = 102400
    ssl.protocol = TLS
    socket.receive.buffer.bytes = 102400
    ssl.keystore.location = null
    replica.fetch.min.bytes = 1
    broker.rack = null
    unclean.leader.election.enable = true
    sasl.enabled.mechanisms = [GSSAPI]
    group.min.session.timeout.ms = 6000
    log.cleaner.io.buffer.load.factor = 0.9
    offsets.retention.check.interval.ms = 600000
    producer.purgatory.purge.interval.requests = 1000
    metrics.sample.window.ms = 30000
    broker.id = 0
    offsets.topic.compression.codec = 0
    log.retention.check.interval.ms = 300000
    advertised.listeners = null
    leader.imbalance.per.broker.percentage = 10

Logs

[sarama] 2016/10/04 16:11:36 Initializing new client
[sarama] 2016/10/04 16:11:36 client/metadata fetching metadata for all topics from broker localhost:9092
[sarama] 2016/10/04 16:11:36 Connected to broker at localhost:9092 (unregistered)
[sarama] 2016/10/04 16:11:36 client/brokers registered new broker #0 at 192.168.8.101:9092
[sarama] 2016/10/04 16:11:36 Successfully initialized new client
[sarama] 2016/10/04 16:11:36 Producer shutting down.
[sarama] 2016/10/04 16:11:36 producer/broker/0 starting up
[sarama] 2016/10/04 16:11:36 producer/broker/0 state change to [open] on bla/0
[sarama] 2016/10/04 16:11:36 Connected to broker at 192.168.8.101:9092 (registered as #0)
[sarama] 2016/10/04 16:11:36 Closing Client

Kafka logging

[2016-10-04 15:50:52,568] ERROR [Replica Manager on Broker 0]: Error processing append operation on partition test-1 (kafka.server.ReplicaManager)
java.lang.IllegalStateException: Compressed message has magic value 0 but inner message has magic value 1
    at kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:143)
    at kafka.message.ByteBufferMessageSet$$anon$1.liftedTree2$1(ByteBufferMessageSet.scala:111)
    at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:109)
    at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:367)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:380)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:335)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:438)
    at kafka.log.Log.liftedTree1$1(Log.scala:339)
    at kafka.log.Log.append(Log.scala:338)
    at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:441)
    at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:427)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:427)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:409)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:395)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:395)
    at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:331)
    at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)

Problem Description

When I produce a message with Timestamp set and compression enabled on the producer, it fails with an error. When I produce without compression, everything works and the correct timestamp is applied to the Kafka message.

Contributor

rtreffer commented Oct 5, 2016

I think the bug is at buildRequest in produce_set.go: https://github.com/Shopify/sarama/blob/HEAD/produce_set.go#L93

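The wrapper message created for a compressed set should copy the lowest timestamp and the highest version from the inner messages. Right now the newly generated wrapper message ends up with version 0, which would explain the broker error: the compressed (outer) message has magic value 0 while the inner message has magic value 1.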
