Skip to content

Commit

Permalink
Fix compression timestamps (#759)
Browse files Browse the repository at this point in the history
Fix message version/timestamp for compressed messages on kafka >= 0.10.0.0

Use default timestamp of time.Now and message version 1 if possible.

The first messages timestamp of a set is a proxy for the actual
timestamp of the group in many cases.
  • Loading branch information
rtreffer authored and eapache committed Oct 6, 2016
1 parent af0513c commit 133322f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
17 changes: 13 additions & 4 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ func (ps *produceSet) add(msg *ProducerMessage) error {

set.msgs = append(set.msgs, msg)
msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) && !msg.Timestamp.IsZero() {
msgToSend.Timestamp = msg.Timestamp
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
if msg.Timestamp.IsZero() {
msgToSend.Timestamp = time.Now()
} else {
msgToSend.Timestamp = msg.Timestamp
}
msgToSend.Version = 1
}
set.setToSend.addMessage(msgToSend)
Expand Down Expand Up @@ -90,11 +94,16 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{
compMsg := &Message{
Codec: ps.parent.conf.Producer.Compression,
Key: nil,
Value: payload,
})
}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
compMsg.Version = 1
compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp
}
req.AddMessage(topic, partition, compMsg)
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions produce_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,45 @@ func TestProduceSetRequestBuilding(t *testing.T) {
t.Error("Wrong number of topics in request")
}
}

func TestProduceSetCompressedRequestBuilding(t *testing.T) {
parent, ps := makeProduceSet()
parent.conf.Producer.RequiredAcks = WaitForAll
parent.conf.Producer.Timeout = 10 * time.Second
parent.conf.Producer.Compression = CompressionGZIP
parent.conf.Version = V0_10_0_0

msg := &ProducerMessage{
Topic: "t1",
Partition: 0,
Key: StringEncoder(TestMessage),
Value: StringEncoder(TestMessage),
Timestamp: time.Now(),
}
for i := 0; i < 10; i++ {
safeAddMessage(t, ps, msg)
}

req := ps.buildRequest()

if req.Version != 2 {
t.Error("Wrong request version")
}

for _, msgBlock := range req.msgSets["t1"][0].Messages {
msg := msgBlock.Msg
err := msg.decodeSet()
if err != nil {
t.Error("Failed to decode set from payload")
}
for _, compMsgBlock := range msg.Set.Messages {
compMsg := compMsgBlock.Msg
if compMsg.Version != 1 {
t.Error("Wrong compressed message version")
}
}
if msg.Version != 1 {
t.Error("Wrong compressed parent message version")
}
}
}

0 comments on commit 133322f

Please sign in to comment.