/
model.go
108 lines (89 loc) · 2.62 KB
/
model.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package db
import (
"strconv"
"github.com/Shopify/Sarama"
"github.com/vmihailenco/msgpack"
)
type Paging struct {
Limit int `json:"limit"`
Pages []int `json:"pages"`
}
type SearchRequest struct {
Search string `json:"search"`
Earliest int64 `json:"earliest"`
Latest int64 `json:"latest"`
Page int `json:"page"`
Offset []byte `json:"offset"`
Paging *Paging `json:"paging"`
}
type SearchResponse struct {
Total int `json:"total"`
Earliest int64 `json:"earliest"`
Latest int64 `json:"latest"`
Rows []*Message `json:"rows"`
Page int `json:"page"`
OffsetUsed bool `json:"offsetUsed"`
Offsets map[string][]byte `json:"offsets"`
Took int64 `json:"took"`
TimedOut bool `json:"timedOut"`
}
func (r *SearchResponse) TotalPages() int {
if r.Offsets != nil {
return len(r.Offsets)
}
return 0
}
func (r *SearchResponse) CleanOffsets() {
r.Offsets = make(map[string][]byte)
}
func (r *SearchResponse) AddOffset(page int, offset []byte) {
if r.Offsets == nil {
r.Offsets = make(map[string][]byte)
}
r.Offsets[strconv.Itoa(page)] = offset
}
func (r *SearchRequest) SkipRows() int {
return (r.Page - 1) * r.Paging.Limit
}
type Tag struct {
Value string `json:"value"`
Details string `json:"details,omitempty"`
}
type BinaryData struct {
Value []byte `json:"value"`
Size int `json:"size"`
Type string `json:"type"`
}
type Message struct {
Key BinaryData `json:"key"`
Value BinaryData `json:"value"`
Topic string `json:"topic"`
Partition int32 `json:"partition"`
Offset int64 `json:"offset"`
Timestamp int64 `json:"timestamp"`
BlockTimestamp int64 `json:"blockTimestamp"`
Tags map[string]Tag `json:"tags"`
}
func (m *Message) AddTag(tag, value, details string) {
m.Tags[tag] = Tag{Value: value, Details: details}
}
func NewMessage(msg *sarama.ConsumerMessage) *Message {
return &Message{
Key: BinaryData{Value: msg.Key, Size: len(msg.Key)},
Value: BinaryData{Value: msg.Value, Size: len(msg.Value)},
Topic: msg.Topic,
Partition: msg.Partition,
Offset: msg.Offset,
Timestamp: msg.Timestamp.UnixNano(),
BlockTimestamp: msg.BlockTimestamp.UnixNano(),
Tags: make(map[string]Tag),
}
}
func MessageFromBytes(blob []byte) (*Message, error) {
var out Message
err := msgpack.Unmarshal(blob, &out)
return &out, err
}
func (m *Message) ToBytes() ([]byte, error) {
return msgpack.Marshal(&m)
}