/
mock.go
148 lines (121 loc) · 3.22 KB
/
mock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package brokers
import (
"context"
"strconv"
"errors"
"fmt"
"github.com/ARGOeu/argo-messaging/messages"
"strings"
"time"
)
// MockBroker struct
type MockBroker struct {
MsgList []string
Topics map[string]string
TopicTimeIndices map[string][]TimeToOffset
}
type TimeToOffset struct {
Timestamp time.Time
Offset int64
}
// PopulateOne Adds three messages to the mock broker
func (b *MockBroker) PopulateOne() {
msg1 := `{
"messageId": "0",
"attributes":
{
"foo":"bar"
},
"data": "YmFzZTY0ZW5jb2RlZA==",
"publishTime": "2016-02-24T11:55:09.786127994Z"
}`
b.MsgList = make([]string, 0)
b.MsgList = append(b.MsgList, msg1)
}
// PopulateThree Adds three messages to the mock broker
func (b *MockBroker) PopulateThree() {
msg1 := `{
"messageId": "0",
"attributes":
{
"foo":"bar"
},
"data": "YmFzZTY0ZW5jb2RlZA==",
"publishTime": "2016-02-24T11:55:09.786127994Z"
}`
msg2 := `{
"messageId": "1",
"attributes":
{
"foo2":"bar2"
},
"data": "YmFzZTY0ZW5jb2RlZA==",
"publishTime": "2016-02-24T11:55:09.827678754Z"
}`
msg3 := `{
"messageId": "2",
"attributes":
{
"foo2":"bar2"
},
"data": "YmFzZTY0ZW5jb2RlZA==",
"publishTime": "2016-02-24T11:55:09.830417467Z"
}`
b.MsgList = make([]string, 0)
b.MsgList = append(b.MsgList, msg1)
b.MsgList = append(b.MsgList, msg2)
b.MsgList = append(b.MsgList, msg3)
}
// CloseConnections closes open producer, consumer and client
func (b *MockBroker) CloseConnections() {
}
// InitConfig creates a new configuration for kafka broker
func (b *MockBroker) InitConfig() {
}
// Initialize the broker struct
func (b *MockBroker) Initialize(peers []string) {
b.MsgList = make([]string, 0)
}
// Publish function publish a message to the broker
func (b *MockBroker) Publish(topic string, msg messages.Message) (string, string, int, int64, error) {
payload, _ := msg.ExportJSON()
b.MsgList = append(b.MsgList, payload)
off := b.GetMaxOffset(topic) - 1
msgID := strconv.FormatInt(off, 10)
// split the name that SHOULD come in the form of project_uuid.topic_name
s := strings.Split(topic, ".")
return msgID, fmt.Sprintf("%s.%s", s[0], s[1]), 0, int64(len(b.MsgList)), nil
}
// GetOffset returns a current topic's offset
func (b *MockBroker) GetMaxOffset(topic string) int64 {
return int64(len(b.MsgList) + 1)
}
// GetOffset returns a current topic's offset
func (b *MockBroker) GetMinOffset(topic string) int64 {
return int64(len(b.MsgList))
}
// Consume function to consume a message from the broker
func (b *MockBroker) Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error) {
return b.MsgList, nil
}
// Delete topic from the broker
func (b *MockBroker) DeleteTopic(topic string) error {
_, ok := b.Topics[topic]
if !ok {
return errors.New("topic not found on the broker")
}
delete(b.Topics, topic)
return nil
}
func (b *MockBroker) TimeToOffset(topic string, time time.Time) (int64, error) {
topicTimeIndices, ok := b.TopicTimeIndices[topic]
if !ok {
return -1, errors.New("topic not found on the broker")
}
for _, item := range topicTimeIndices {
if item.Timestamp.Equal(time) || item.Timestamp.After(time) {
return item.Offset, nil
}
}
return -1, nil
}