/
consumer_partition_offset.go
171 lines (152 loc) · 5.44 KB
/
consumer_partition_offset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package kafka
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
"time"
log "github.com/sirupsen/logrus"
)
// ConsumerPartitionOffset represents a consumer group commit which can be decoded from the consumer_offsets topic
type ConsumerPartitionOffset struct {
	Group     string    // Consumer group id, decoded from the message key
	Topic     string    // Topic the offset was committed for, decoded from the message key
	Partition int32     // Partition the offset was committed for, decoded from the message key
	Offset    int64     // Committed offset, decoded from the message value
	Timestamp time.Time // Commit timestamp, decoded from the message value
}
// offsetValue holds the fields extracted from the value buffer of an offset
// commit message, before conversion into a ConsumerPartitionOffset.
type offsetValue struct {
	Offset    int64 // Committed offset
	Timestamp int64 // Commit timestamp in milliseconds (caller converts via *1e6 to time.Unix nanoseconds)
}
// newConsumerPartitionOffset decodes a key and value buffer to a ConsumerPartitionOffset entry.
//
// The key buffer carries group, topic and partition. The value buffer starts with an
// int16 version which selects the decoder for the remaining payload. Supported value
// versions are 0, 1 and 3; anything else yields an error.
func newConsumerPartitionOffset(key *bytes.Buffer, value *bytes.Buffer, logger *log.Entry) (*ConsumerPartitionOffset, error) {
	var err error
	entry := ConsumerPartitionOffset{}

	// Decode key which contains group, topic and partition information first
	entry.Group, err = readString(key)
	if err != nil {
		logger.WithFields(log.Fields{
			"error": err.Error(),
		}).Error("failed to decode group from consumer offset")
		return nil, fmt.Errorf("could not decode group from offset key buffer: %v", err)
	}

	entry.Topic, err = readString(key)
	if err != nil {
		logger.WithFields(log.Fields{
			"group": entry.Group,
			"error": err.Error(),
		}).Error("failed to decode topic from consumer offset")
		return nil, fmt.Errorf("could not decode topic from offset key buffer: %v", err)
	}

	err = binary.Read(key, binary.BigEndian, &entry.Partition)
	if err != nil {
		logger.WithFields(log.Fields{
			"group": entry.Group,
			"topic": entry.Topic,
			"error": err.Error(),
		}).Error("failed to decode partition from consumer offset")
		return nil, fmt.Errorf("could not decode partition from offset key buffer: %v", err)
	}

	offsetLogger := logger.WithFields(log.Fields{
		"message_type": "offset",
		"group":        entry.Group,
		"topic":        entry.Topic,
		"partition":    entry.Partition,
	})

	// Decode value version first so that we decode the rest of the message correctly
	var valueVersion int16
	err = binary.Read(value, binary.BigEndian, &valueVersion)
	if err != nil {
		offsetLogger.WithFields(log.Fields{
			"reason": "no value version",
		}).Warn("failed to decode")
		return nil, fmt.Errorf("message value has no version")
	}
	offsetCommit.WithLabelValues(strconv.Itoa(int(valueVersion))).Add(1)

	// Decode message value using the right decoding function for given version.
	// NOTE(review): value version 2 is not handled and falls through to the error
	// below — confirm whether v2 commits can occur on the monitored clusters.
	var decodedValue offsetValue
	switch valueVersion {
	case 0, 1:
		decodedValue, err = decodeOffsetValueV0(value, offsetLogger.WithField("value_version", valueVersion))
	case 3:
		decodedValue, err = decodeOffsetValueV3(value, offsetLogger.WithField("value_version", valueVersion))
	default:
		err = fmt.Errorf("unknown value version to decode offsetValue. Given version: '%v'", valueVersion)
	}
	if err != nil {
		return nil, err
	}

	entry.Offset = decodedValue.Offset
	// Timestamp is in milliseconds; time.Unix wants nanoseconds for its second argument.
	entry.Timestamp = time.Unix(0, decodedValue.Timestamp*1e6)

	return &entry, nil
}
// decodeOffsetValueV0 decodes the value buffer of a version 0/1 offset commit.
// Layout: offset (int64), metadata (string, discarded), commitTimestamp (int64).
func decodeOffsetValueV0(value *bytes.Buffer, logger *log.Entry) (offsetValue, error) {
	var decoded offsetValue

	// fail records which field broke the decode and wraps the error for the caller.
	fail := func(field string, err error) (offsetValue, error) {
		logger.WithFields(log.Fields{
			"error_at": field,
			"error":    err.Error(),
		}).Error("failed to decode offset value")
		return decoded, fmt.Errorf("failed to decode '%s' field for OffsetValue V0: %v", field, err)
	}

	if err := binary.Read(value, binary.BigEndian, &decoded.Offset); err != nil {
		return fail("offset", err)
	}

	// The metadata string is read only to advance the buffer; its content is unused.
	if _, err := readString(value); err != nil {
		return fail("metadata", err)
	}

	if err := binary.Read(value, binary.BigEndian, &decoded.Timestamp); err != nil {
		return fail("timestamp", err)
	}

	return decoded, nil
}
// decodeOffsetValueV3 decodes the value buffer of a version 3 offset commit.
// Layout: offset (int64), leaderEpoch (int32, discarded), metadata (string,
// discarded), commitTimestamp (int64).
func decodeOffsetValueV3(value *bytes.Buffer, logger *log.Entry) (offsetValue, error) {
	// Named 'decoded' rather than 'offsetValue' to avoid shadowing the type name.
	decoded := offsetValue{}

	err := binary.Read(value, binary.BigEndian, &decoded.Offset)
	if err != nil {
		logger.WithFields(log.Fields{
			"error_at": "offset",
			"error":    err.Error(),
		}).Error("failed to decode offset value")
		// Include the value version in the message, consistent with the other fields.
		return decoded, fmt.Errorf("failed to decode 'offset' field for OffsetValue V3: %v", err)
	}

	// leaderEpoch refers to the number of leaders previously assigned by the controller.
	// Every time a leader fails, the controller selects the new leader, increments the
	// current "leader epoch" by 1. The value is read only to advance the buffer.
	var leaderEpoch int32
	err = binary.Read(value, binary.BigEndian, &leaderEpoch)
	if err != nil {
		logger.WithFields(log.Fields{
			"error_at": "leaderEpoch",
			"error":    err.Error(),
		}).Error("failed to decode offset value")
		return decoded, fmt.Errorf("failed to decode 'leaderEpoch' field for OffsetValue V3: %v", err)
	}

	// metadata contains additional information which can optionally be set by a
	// consumer; it is read only to advance the buffer and its content is discarded.
	_, err = readString(value)
	if err != nil {
		logger.WithFields(log.Fields{
			"error_at": "metadata",
			"error":    err.Error(),
		}).Error("failed to decode offset value")
		return decoded, fmt.Errorf("failed to decode 'metadata' field for OffsetValue V3: %v", err)
	}

	err = binary.Read(value, binary.BigEndian, &decoded.Timestamp)
	if err != nil {
		logger.WithFields(log.Fields{
			"error_at": "timestamp",
			"error":    err.Error(),
		}).Error("failed to decode offset value")
		return decoded, fmt.Errorf("failed to decode 'timestamp' field for OffsetValue V3: %v", err)
	}

	return decoded, nil
}