Skip to content

Commit

Permalink
Make timestamp fields of type time.Time and time.Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-arista committed Nov 1, 2017
1 parent 5959a18 commit ed80c61
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 60 deletions.
8 changes: 1 addition & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,20 +532,14 @@ func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*Cons
}
prelude = false

millis := batch.FirstTimestamp + rec.TimestampDelta
timestamp := time.Time{}
if millis >= 0 {
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}

if offset >= child.offset {
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: rec.Key,
Value: rec.Value,
Offset: offset,
Timestamp: timestamp,
Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
Headers: rec.Headers,
})
child.offset = offset + 1
Expand Down
22 changes: 3 additions & 19 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,9 @@ func (m *Message) encode(pe packetEncoder) error {
pe.putInt8(attributes)

if m.Version >= 1 {
timestamp := int64(-1)

if !m.Timestamp.Before(time.Unix(0, 0)) {
timestamp = m.Timestamp.UnixNano() / int64(time.Millisecond)
} else if !m.Timestamp.IsZero() {
return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", m.Timestamp)}
if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
return err
}

pe.putInt64(timestamp)
}

err := pe.putBytes(m.Key)
Expand Down Expand Up @@ -133,19 +127,9 @@ func (m *Message) decode(pd packetDecoder) (err error) {
m.Codec = CompressionCodec(attribute & compressionCodecMask)

if m.Version == 1 {
millis, err := pd.getInt64()
if err != nil {
if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
return err
}

// negative timestamps are invalid, in these cases we should return
// a zero time
timestamp := time.Time{}
if millis >= 0 {
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}

m.Timestamp = timestamp
}

m.Key, err = pd.getBytes()
Expand Down
10 changes: 7 additions & 3 deletions record.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

const (
controlMask = 0x20
)
Expand Down Expand Up @@ -29,7 +31,7 @@ func (h *RecordHeader) decode(pd packetDecoder) (err error) {

type Record struct {
Attributes int8
TimestampDelta int64
TimestampDelta time.Duration
OffsetDelta int64
Key []byte
Value []byte
Expand All @@ -41,7 +43,7 @@ type Record struct {
func (r *Record) encode(pe packetEncoder) error {
pe.push(&r.length)
pe.putInt8(r.Attributes)
pe.putVarint(r.TimestampDelta)
pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
pe.putVarint(r.OffsetDelta)
if err := pe.putVarintBytes(r.Key); err != nil {
return err
Expand Down Expand Up @@ -69,9 +71,11 @@ func (r *Record) decode(pd packetDecoder) (err error) {
return err
}

if r.TimestampDelta, err = pd.getVarint(); err != nil {
timestamp, err := pd.getVarint()
if err != nil {
return err
}
r.TimestampDelta = time.Duration(timestamp) * time.Millisecond

if r.OffsetDelta, err = pd.getVarint(); err != nil {
return err
Expand Down
20 changes: 14 additions & 6 deletions record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"fmt"
"io/ioutil"
"time"

"github.com/eapache/go-xerial-snappy"
"github.com/pierrec/lz4"
Expand Down Expand Up @@ -41,8 +42,8 @@ type RecordBatch struct {
Codec CompressionCodec
Control bool
LastOffsetDelta int32
FirstTimestamp int64
MaxTimestamp int64
FirstTimestamp time.Time
MaxTimestamp time.Time
ProducerID int64
ProducerEpoch int16
FirstSequence int32
Expand All @@ -64,8 +65,15 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
pe.push(newCRC32Field(crcCastagnoli))
pe.putInt16(b.computeAttributes())
pe.putInt32(b.LastOffsetDelta)
pe.putInt64(b.FirstTimestamp)
pe.putInt64(b.MaxTimestamp)

if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
return err
}

if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
return err
}

pe.putInt64(b.ProducerID)
pe.putInt16(b.ProducerEpoch)
pe.putInt32(b.FirstSequence)
Expand Down Expand Up @@ -122,11 +130,11 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
return err
}

if b.FirstTimestamp, err = pd.getInt64(); err != nil {
if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
return err
}

if b.MaxTimestamp, err = pd.getInt64(); err != nil {
if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
return err
}

Expand Down
66 changes: 41 additions & 25 deletions record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
)
Expand All @@ -17,8 +18,13 @@ var recordBatchTestCases = []struct {
oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
}{
{
name: "empty record",
batch: RecordBatch{Version: 2, Records: []*Record{}},
name: "empty record",
batch: RecordBatch{
Version: 2,
FirstTimestamp: time.Unix(0, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{},
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 49, // Length
Expand All @@ -36,8 +42,14 @@ var recordBatchTestCases = []struct {
},
},
{
name: "control batch",
batch: RecordBatch{Version: 2, Control: true, Records: []*Record{}},
name: "control batch",
batch: RecordBatch{
Version: 2,
Control: true,
FirstTimestamp: time.Unix(0, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{},
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 49, // Length
Expand All @@ -58,9 +70,10 @@ var recordBatchTestCases = []struct {
name: "uncompressed record",
batch: RecordBatch{
Version: 2,
FirstTimestamp: 10,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5,
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Expand All @@ -74,10 +87,10 @@ var recordBatchTestCases = []struct {
0, 0, 0, 70, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
219, 71, 20, 201, // CRC
84, 121, 97, 253, // CRC
0, 0, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
Expand All @@ -103,9 +116,10 @@ var recordBatchTestCases = []struct {
batch: RecordBatch{
Version: 2,
Codec: CompressionGZIP,
FirstTimestamp: 10,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5,
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Expand All @@ -118,11 +132,11 @@ var recordBatchTestCases = []struct {
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 94, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
15, 156, 184, 78, // CRC
2, // Version
159, 236, 182, 189, // CRC
0, 1, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
Expand All @@ -136,10 +150,10 @@ var recordBatchTestCases = []struct {
0, 0, 0, 94, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
144, 168, 0, 33, // CRC
0, 216, 14, 210, // CRC
0, 1, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
Expand All @@ -154,9 +168,10 @@ var recordBatchTestCases = []struct {
batch: RecordBatch{
Version: 2,
Codec: CompressionSnappy,
FirstTimestamp: 10,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5,
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Expand All @@ -169,11 +184,11 @@ var recordBatchTestCases = []struct {
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 72, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
95, 173, 35, 17, // CRC
2, // Version
21, 0, 159, 97, // CRC
0, 2, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
Expand All @@ -187,9 +202,10 @@ var recordBatchTestCases = []struct {
batch: RecordBatch{
Version: 2,
Codec: CompressionLZ4,
FirstTimestamp: 10,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5,
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Expand All @@ -202,11 +218,11 @@ var recordBatchTestCases = []struct {
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 89, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
129, 238, 43, 82, // CRC
2, // Version
169, 74, 119, 197, // CRC
0, 3, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
Expand Down
40 changes: 40 additions & 0 deletions timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package sarama

import (
"fmt"
"time"
)

type Timestamp struct {
*time.Time
}

func (t Timestamp) encode(pe packetEncoder) error {
timestamp := int64(-1)

if !t.Before(time.Unix(0, 0)) {
timestamp = t.UnixNano() / int64(time.Millisecond)
} else if !t.IsZero() {
return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", t)}
}

pe.putInt64(timestamp)
return nil
}

func (t Timestamp) decode(pd packetDecoder) error {
millis, err := pd.getInt64()
if err != nil {
return err
}

// negative timestamps are invalid, in these cases we should return
// a zero time
timestamp := time.Time{}
if millis >= 0 {
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}

*t.Time = timestamp
return nil
}

0 comments on commit ed80c61

Please sign in to comment.