/
DefaultMessageDeserializer.go
107 lines (85 loc) · 2.23 KB
/
DefaultMessageDeserializer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package consume
import (
"strconv"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/deviceinsight/kafkactl/v5/internal/output"
)
// DefaultMessageDeserializer renders consumed messages without decoding
// them against a schema. It has no fields, so its zero value is ready to use.
type DefaultMessageDeserializer struct{}
// defaultMessage is the printable view of a consumed message that
// newDefaultMessage builds and Deserialize writes out.
//
// Partition, Offset and Value are always populated (Value may still be a nil
// pointer). Headers, Key and Timestamp stay nil unless the corresponding
// print flag was set; their omitempty tags ask JSON/YAML encoders to leave
// them out in that case.
//
// NOTE(review): field order presumably determines the order of keys in
// structured output, so do not reorder fields without checking the
// output package.
type defaultMessage struct {
	Partition int32
	Offset    int64
	Headers   map[string]string `json:",omitempty" yaml:",omitempty"`
	Key       *string           `json:",omitempty" yaml:",omitempty"`
	Value     *string
	Timestamp *time.Time `json:",omitempty" yaml:",omitempty"`
}
// newDefaultMessage converts a raw consumer message into a defaultMessage.
//
// Partition and Offset are copied as-is, and Value is always encoded with
// encodeBytes using flags.EncodeValue. The remaining fields are optional:
//   - Key is set only when flags.PrintKeys is true (encoded with flags.EncodeKey).
//   - Timestamp is set only when flags.PrintTimestamps is true and the
//     message carries a non-zero timestamp.
//   - Headers is set only when flags.PrintHeaders is true.
func (DefaultMessageDeserializer) newDefaultMessage(msg *sarama.ConsumerMessage, flags Flags) defaultMessage {
	result := defaultMessage{
		Partition: msg.Partition,
		Offset:    msg.Offset,
		Value:     encodeBytes(msg.Value, flags.EncodeValue),
	}

	if flags.PrintKeys {
		result.Key = encodeBytes(msg.Key, flags.EncodeKey)
	}

	if flags.PrintTimestamps && !msg.Timestamp.IsZero() {
		// Points into msg rather than copying the timestamp.
		result.Timestamp = &msg.Timestamp
	}

	if flags.PrintHeaders {
		result.Headers = encodeRecordHeaders(msg.Headers)
	}

	return result
}
// CanDeserialize always returns true with a nil error; the string argument
// is ignored.
func (DefaultMessageDeserializer) CanDeserialize(_ string) (bool, error) {
	return true, nil
}
// Deserialize converts rawMsg with newDefaultMessage and writes it out.
//
// When flags.OutputFormat is set, the message is handed to output.PrintObject
// and that call's error is returned.
//
// Otherwise a single row is built and passed to output.PrintStrings, with the
// columns joined by flags.Separator in this order:
//
//	headers   (only with flags.PrintHeaders; toSortedArray entries joined by ",")
//	partition (only with flags.PrintPartitions)
//	key       (only with flags.PrintKeys)
//	timestamp (only with flags.PrintTimestamps; formatted as RFC 3339)
//	value     (always)
//
// An enabled column whose data is missing is emitted as an empty string so
// the column count stays stable; a nil value is printed as "null". The plain
// text path always returns nil.
func (deserializer DefaultMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error {
	msg := deserializer.newDefaultMessage(rawMsg, flags)

	// Structured output: delegate formatting entirely to the output package.
	if flags.OutputFormat != "" {
		return output.PrintObject(msg, flags.OutputFormat)
	}

	var columns []string

	if flags.PrintHeaders {
		headerColumn := ""
		if msg.Headers != nil {
			headerColumn = strings.Join(toSortedArray(msg.Headers), ",")
		}
		columns = append(columns, headerColumn)
	}

	if flags.PrintPartitions {
		columns = append(columns, strconv.Itoa(int(msg.Partition)))
	}

	if flags.PrintKeys {
		keyColumn := ""
		if msg.Key != nil {
			keyColumn = *msg.Key
		}
		columns = append(columns, keyColumn)
	}

	if flags.PrintTimestamps {
		timestampColumn := ""
		if msg.Timestamp != nil {
			timestampColumn = msg.Timestamp.Format(time.RFC3339)
		}
		columns = append(columns, timestampColumn)
	}

	// The value column is unconditional; a missing value is shown as "null".
	valueColumn := "null"
	if msg.Value != nil {
		valueColumn = *msg.Value
	}
	columns = append(columns, valueColumn)

	output.PrintStrings(strings.Join(columns, flags.Separator))
	return nil
}