/
receiver.go
151 lines (119 loc) · 2.84 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package protonats
import (
"context"
"io"
"sync"
cn "github.com/cloudevents/sdk-go/protocol/nats/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/nats-io/nats.go"
)
// NatsReceiver is the receiver interface exposed by this package. It embeds
// protocol.Receiver and declares no additional methods of its own.
type NatsReceiver interface {
protocol.Receiver
}
// Compile-time check that *Receiver satisfies protocol.Receiver.
var _ protocol.Receiver = (*Receiver)(nil)
// Receiver hands out NATS messages read from a channel as binding messages.
type Receiver struct {
// incoming is the receive-only channel that Receive reads NATS messages from.
incoming <-chan *nats.Msg
}
// NewReceiver builds a Receiver that reads NATS messages from ch and returns
// it as a NatsReceiver.
func NewReceiver(ch <-chan *nats.Msg) NatsReceiver {
	r := new(Receiver)
	r.incoming = ch
	return r
}
// Receive blocks until a message arrives on the incoming channel, the channel
// is closed, or ctx is done.
//
// On a message it returns that message wrapped with cn.NewMessage and a nil
// error. When the channel has been closed or ctx is done it returns a nil
// message and io.EOF.
func (r *Receiver) Receive(ctx context.Context) (binding.Message, error) {
	select {
	case <-ctx.Done():
		// Cancellation is reported with the same io.EOF as a closed channel.
		return nil, io.EOF
	case msg, open := <-r.incoming:
		if open {
			return cn.NewMessage(msg), nil
		}
		// The channel was closed: no more messages will ever arrive.
		return nil, io.EOF
	}
}
// Consumer subscribes to a NATS subject and exposes the received messages
// through the embedded NatsReceiver.
type Consumer struct {
// NatsReceiver provides Receive; NewConsumerFromConn builds it over ch.
NatsReceiver
// ch is the channel handed to Subscriber.Subscribe by OpenInbound and read by
// the receiver created in NewConsumerFromConn; Close closes it.
ch chan *nats.Msg
// Conn is the NATS connection passed to Subscriber.Subscribe.
Conn *nats.Conn
// Subject is the NATS subject passed to Subscriber.Subscribe.
Subject string
// Subscriber creates the subscription; NewConsumerFromConn defaults it to
// &RegularSubscriber{} before applying options.
Subscriber Subscriber
// subMtx is held by OpenInbound for its whole duration, so Close can wait for
// OpenInbound to finish by acquiring it.
subMtx sync.Mutex
// internalClose carries the stop signal from Close to OpenInbound.
internalClose chan struct{}
// connOwned is set by NewConsumer, which opens Conn itself; when true, Close
// also closes Conn.
connOwned bool
}
// NewConsumer connects to the NATS server at url using natsOpts and builds a
// Consumer for subject on top of that connection, applying opts.
//
// The returned Consumer is marked as owning the connection, so its Close
// method also closes the connection. If building the Consumer fails, the
// freshly opened connection is closed before the error is returned.
func NewConsumer(url, subject string, natsOpts []nats.Option, opts ...ConsumerOption) (*Consumer, error) {
	conn, connErr := nats.Connect(url, natsOpts...)
	if connErr != nil {
		return nil, connErr
	}

	consumer, buildErr := NewConsumerFromConn(conn, subject, opts...)
	if buildErr != nil {
		// The connection was opened here, so release it on failure.
		conn.Close()
		return nil, buildErr
	}

	consumer.connOwned = true
	return consumer, nil
}
// NewConsumerFromConn builds a Consumer for subject on an existing NATS
// connection and then applies opts in order.
//
// The Consumer starts with a RegularSubscriber, an unbuffered message channel
// shared between the subscription and the embedded receiver, and a close
// signal channel with room for one pending signal. The connection is not
// marked as owned, so Close leaves it open. If any option returns an error,
// that error is returned and no Consumer is produced.
func NewConsumerFromConn(conn *nats.Conn, subject string, opts ...ConsumerOption) (*Consumer, error) {
	msgs := make(chan *nats.Msg)

	consumer := &Consumer{
		NatsReceiver:  NewReceiver(msgs),
		ch:            msgs,
		Conn:          conn,
		Subject:       subject,
		Subscriber:    &RegularSubscriber{},
		internalClose: make(chan struct{}, 1),
	}

	if err := consumer.applyOptions(opts...); err != nil {
		return nil, err
	}
	return consumer, nil
}
// OpenInbound subscribes to the consumer's subject and blocks until ctx is
// done or Close sends the internal stop signal, then drains the subscription
// and returns the result of Drain.
//
// subMtx is held for the entire call; Close relies on this to wait for
// OpenInbound to finish. If subscribing fails, the error is returned
// immediately.
func (c *Consumer) OpenInbound(ctx context.Context) error {
	c.subMtx.Lock()
	defer c.subMtx.Unlock()

	// Route messages for Subject into the channel the receiver reads from.
	subscription, err := c.Subscriber.Subscribe(c.Conn, c.Subject, c.ch)
	if err != nil {
		return err
	}

	// Park here until either the internal or the external stop signal fires.
	select {
	case <-c.internalClose:
	case <-ctx.Done():
	}

	// Let queued messages be consumed, then close the subscription.
	return subscription.Drain()
}
// Close stops the consumer: it signals a running OpenInbound to finish, waits
// for it to release subMtx, closes Conn when this consumer owns it, and then
// closes the internal channels. The context argument is ignored and the
// returned error is always nil.
//
// Close is not idempotent: a second call sends on the already closed
// internalClose channel, which panics.
func (c *Consumer) Close(_ context.Context) error {
// Before closing, let's be sure OpenInbound completes.
// We send the stop signal and then lock subMtx, which OpenInbound holds
// until its Drain call has returned.
c.internalClose <- struct{}{}
c.subMtx.Lock()
defer c.subMtx.Unlock()
// Only a connection opened by NewConsumer is closed here; a connection
// supplied through NewConsumerFromConn is left open.
if c.connOwned {
c.Conn.Close()
}
close(c.internalClose)
// A Receiver built over ch reports io.EOF from Receive once ch is closed.
close(c.ch)
return nil
}
// ConsumerOption configures a Consumer while it is being constructed. A
// non-nil error makes NewConsumerFromConn fail with that error.
type ConsumerOption func(*Consumer) error
// applyOptions runs each option against c in the order given. It stops at the
// first option that returns an error and returns that error; later options
// are not run. It returns nil when every option succeeds or when opts is
// empty.
func (c *Consumer) applyOptions(opts ...ConsumerOption) error {
	for i := range opts {
		if err := opts[i](c); err != nil {
			return err
		}
	}
	return nil
}
// Compile-time checks that *Consumer satisfies the protocol interfaces it is
// meant to implement.
var (
	_ protocol.Opener   = (*Consumer)(nil)
	_ protocol.Receiver = (*Consumer)(nil)
	_ protocol.Closer   = (*Consumer)(nil)
)