forked from micro/micro
-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream.go
executable file
·128 lines (109 loc) · 2.87 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package client
import (
gocontext "context"
"encoding/json"
"time"
pb "github.com/2637309949/micro/v3/proto/events"
"github.com/2637309949/micro/v3/service/client"
"github.com/2637309949/micro/v3/service/context"
"github.com/2637309949/micro/v3/service/events"
"github.com/2637309949/micro/v3/service/events/util"
log "github.com/2637309949/micro/v3/service/logger"
)
// NewStream returns an initialized stream service
func NewStream() events.Stream {
return new(stream)
}
type stream struct {
Client pb.StreamService
}
func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
// parse the options
options := events.PublishOptions{
Timestamp: time.Now(),
}
for _, o := range opts {
o(&options)
}
// encode the message if it's not already encoded
var payload []byte
if p, ok := msg.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(msg)
if err != nil {
return events.ErrEncodingMessage
}
payload = p
}
// execute the RPC
_, err := s.client().Publish(context.DefaultContext, &pb.PublishRequest{
Topic: topic,
Payload: payload,
Metadata: options.Metadata,
Timestamp: options.Timestamp.Unix(),
}, client.WithAuthToken())
return err
}
func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) {
// parse options
options := events.ConsumeOptions{
AutoAck: true,
Context: gocontext.TODO(),
}
for _, o := range opts {
o(&options)
}
subReq := &pb.ConsumeRequest{
Topic: topic,
Group: options.Group,
Offset: options.Offset.Unix(),
AutoAck: options.AutoAck,
AckWait: options.AckWait.Nanoseconds(),
RetryLimit: int64(options.GetRetryLimit()),
}
// start the stream
// TODO: potentially pass in the context defined by the user
stream, err := s.client().Consume(context.DefaultContext, subReq, client.WithAuthToken())
if err != nil {
return nil, err
}
evChan := make(chan events.Event)
go func() {
for {
ev, err := stream.Recv()
if err != nil {
log.Errorf("Error receiving from stream %s", err)
close(evChan)
stream.Close()
return
}
evt := util.DeserializeEvent(ev)
if !options.AutoAck {
evt.SetNackFunc(func() error {
return stream.SendMsg(&pb.AckRequest{Id: evt.ID, Success: false})
})
evt.SetAckFunc(func() error {
return stream.SendMsg(&pb.AckRequest{Id: evt.ID, Success: true})
})
}
select {
case evChan <- evt:
case <-options.Context.Done():
log.Info("Consuming stream context canceled")
close(evChan)
stream.Close()
return
}
}
}()
return evChan, nil
}
// this is a tmp solution since the client isn't initialized when NewStream is called. There is a
// fix in the works in another PR.
func (s *stream) client() pb.StreamService {
if s.Client == nil {
s.Client = pb.NewStreamService("events", client.DefaultClient)
}
return s.Client
}