/
consumer.go
42 lines (34 loc) · 944 Bytes
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package inmem
import (
	"sync"
	"time"

	"github.com/blendle/go-streamprocessor/stream"
)
// NewConsumer returns a consumer that can iterate over messages on a stream.
//
// The messages returned by the topic named by c.ConsumerTopic are delivered
// one by one on the channel exposed by Consumer.Messages. That channel is
// closed once every message has been delivered, or once the consumer has been
// closed, whichever happens first.
func (c *Client) NewConsumer() stream.Consumer {
	consumer := &Consumer{
		messages: make(chan *stream.Message),
		done:     make(chan struct{}),
	}

	topic := c.store.NewTopic(c.ConsumerTopic)

	go func() {
		// Closing the channel lets receivers that range over Messages
		// terminate, both on normal completion and on early shutdown.
		defer close(consumer.messages)

		for _, msg := range topic.Messages() {
			m := &stream.Message{
				Value:     msg.Value,
				Key:       msg.Key,
				Timestamp: time.Now(),
			}

			// The messages channel is unbuffered, so a plain send would block
			// forever if the caller stopped receiving. Also watching done
			// lets Close release this goroutine instead of leaking it.
			select {
			case consumer.messages <- m:
			case <-consumer.done:
				return
			}
		}
	}()

	return consumer
}

// Consumer implements the stream.Consumer interface for the in-memory client.
type Consumer struct {
	// messages carries the messages handed to the caller via Messages.
	messages chan *stream.Message

	// done is closed by Close to tell the delivering goroutine to stop.
	done chan struct{}

	// closeOnce guards done so that Close can be called more than once.
	closeOnce sync.Once
}

// Messages returns the read channel for the messages that are returned by the
// stream.
func (c *Consumer) Messages() <-chan *stream.Message {
	return c.messages
}

// Close closes the consumer connection.
//
// It signals the goroutine started by NewConsumer to stop delivering messages;
// that goroutine then closes the Messages channel as it exits. Close does not
// wait for the goroutine to finish. It is safe to call Close more than once,
// and it always returns nil.
func (c *Consumer) Close() error {
	c.closeOnce.Do(func() {
		// A Consumer that was not built by NewConsumer has no done channel;
		// closing a nil channel would panic.
		if c.done != nil {
			close(c.done)
		}
	})

	return nil
}