forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
messaging.go
155 lines (123 loc) · 3.63 KB
/
messaging.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package test
import (
"io"
"net/url"
"sync"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/messaging"
)
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
mu sync.Mutex
index uint64 // highest index
conns []*MessagingConn // list of all connections
messagesByTopicID map[uint64][]*messaging.Message // message by topic
PublishFunc func(*messaging.Message) (uint64, error)
ConnFunc func(topicID uint64) influxdb.MessagingConn
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewMessagingClient() *MessagingClient {
c := &MessagingClient{
messagesByTopicID: make(map[uint64][]*messaging.Message),
}
c.PublishFunc = c.DefaultPublishFunc
c.ConnFunc = c.DefaultConnFunc
return c
}
func (c *MessagingClient) Open(path string) error { return nil }
// Close closes all open connections.
func (c *MessagingClient) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
for _, conn := range c.conns {
conn.Close()
}
return nil
}
func (c *MessagingClient) URLs() []url.URL { return []url.URL{{Host: "local"}} }
func (c *MessagingClient) SetURLs([]url.URL) {}
func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) { return c.PublishFunc(m) }
// DefaultPublishFunc sets an autoincrementing index on the message and sends it to each topic connection.
func (c *MessagingClient) DefaultPublishFunc(m *messaging.Message) (uint64, error) {
c.mu.Lock()
defer c.mu.Unlock()
// Increment index and assign it to message.
c.index++
m.Index = c.index
// Append message to the topic.
c.messagesByTopicID[m.TopicID] = append(c.messagesByTopicID[m.TopicID], m)
// Send to each connection for the topic.
for _, conn := range c.conns {
if conn.topicID == m.TopicID {
conn.Send(m)
}
}
return m.Index, nil
}
func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn {
return c.ConnFunc(topicID)
}
// DefaultConnFunc returns a connection for a specific topic.
func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn {
c.mu.Lock()
defer c.mu.Unlock()
// Create new connection.
conn := NewMessagingConn(topicID)
// Track connections.
c.conns = append(c.conns, conn)
return conn
}
// Sync blocks until a given index has been sent through the client.
func (c *MessagingClient) Sync(index uint64) {
for {
c.mu.Lock()
if c.index >= index {
c.mu.Unlock()
time.Sleep(10 * time.Millisecond)
return
}
c.mu.Unlock()
// Otherwise wait momentarily and check again.
time.Sleep(1 * time.Millisecond)
}
}
func (c *MessagingClient) SetLogOutput(_ io.Writer) {}
// MessagingConn represents a mockable connection implementing influxdb.MessagingConn.
type MessagingConn struct {
mu sync.Mutex
topicID uint64
index uint64
c chan *messaging.Message
}
// NewMessagingConn returns a new instance of MessagingConn.
func NewMessagingConn(topicID uint64) *MessagingConn {
return &MessagingConn{
topicID: topicID,
}
}
// Open starts the stream from a given index.
func (c *MessagingConn) Open(index uint64, streaming bool) error {
// TODO: Fill connection stream with existing messages.
c.c = make(chan *messaging.Message, 1024)
return nil
}
// Close closes the streaming channel.
func (c *MessagingConn) Close() error {
close(c.c)
return nil
}
// C returns a channel for streaming message.
func (c *MessagingConn) C() <-chan *messaging.Message { return c.c }
func (c *MessagingConn) Send(m *messaging.Message) {
// Ignore any old messages.
c.mu.Lock()
if m.Index <= c.index {
c.mu.Unlock()
return
}
c.index = m.Index
c.mu.Unlock()
// Send message to channel.
c.c <- m
}