forked from thatguystone/cog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eio.go
206 lines (160 loc) · 4.41 KB
/
eio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package eio
import (
"fmt"
"runtime"
"strings"
"github.com/iheartradio/cog"
)
// MakeProducer creates a new Producer
type MakeProducer func(args Args) (Producer, error)
// A Producer writes messages
type Producer interface {
// Send everything here
Produce(b []byte)
// Select on this, otherwise a producer might block
Errs() <-chan error
// Some producers rotate (files). Call this to get them to reopen any
// underlying files.
//
// The error is returned here so that logrotate (and friends) can quickly
// determine if rotation succeeded.
Rotate() error
// Tear the producer down and flush any pending messages.
//
// If you use NewProducer(), this is automatically called when the object
// is GCd.
Close() cog.Errors
}
type producer struct {
Producer
}
// TopicProducer is just like Producer, except it allows you to send events to
// specific topics.
type TopicProducer interface {
Producer
// Send to the given topic
ProduceTo(topic string, b []byte)
}
type topicProducer struct {
TopicProducer
}
// MakeConsumer creates a new Consumer
type MakeConsumer func(args Args) (Consumer, error)
// A Consumer reads messages
type Consumer interface {
// Get messages from here.
Next() ([]byte, error)
// Tear the consumer down and wait for it to exit.
//
// If you use NewConsumer(), this is automatically called when the object
// is GCd.
Close() cog.Errors
}
type consumer struct {
Consumer
}
var (
// ClosedErrCh should be returned from Errs() when no errors can be
// produced. This ensures that any receiver immediately returns and doesn't
// block forever.
ClosedErrCh chan error
regdPs = map[string]MakeProducer{}
regdCs = map[string]MakeConsumer{}
)
func init() {
ClosedErrCh = make(chan error)
close(ClosedErrCh)
}
// RegisterProducer registers a Producer for use. Names are case insensitive.
//
// If your producer implements TopicProducer, it will automatically be made
// available.
func RegisterProducer(name string, np MakeProducer) {
lname := strings.ToLower(name)
if _, ok := regdPs[lname]; ok {
panic(fmt.Errorf("producer `%s` already exists", name))
}
regdPs[lname] = np
}
func newProducer(name string, args Args) (Producer, error) {
lname := strings.ToLower(name)
np, ok := regdPs[lname]
if !ok {
return nil, fmt.Errorf("producer `%s` does not exist", name)
}
p, err := np(args)
if err != nil {
return nil, fmt.Errorf("failed to create producer `%s`: %v", name, err)
}
return p, nil
}
// NewProducer creates a new producer with the given arguments
func NewProducer(name string, args Args) (Producer, error) {
p, err := newProducer(name, args)
if err != nil {
return nil, err
}
pp := &producer{Producer: p}
runtime.SetFinalizer(pp, finalizeProducer)
return pp, nil
}
func (p *producer) Close() (es cog.Errors) {
runtime.SetFinalizer(p, nil)
return p.Producer.Close()
}
func finalizeProducer(p *producer) {
go p.Close()
}
// NewTopicProducer creates a new producer, provided that the producer
// implements TopicProducer.
func NewTopicProducer(name string, args Args) (TopicProducer, error) {
p, err := newProducer(name, args)
if err != nil {
return nil, err
}
tp, ok := p.(TopicProducer)
if !ok {
p.Close()
return nil, fmt.Errorf("`%s` does not implement TopicProducer", name)
}
tpp := &topicProducer{TopicProducer: tp}
runtime.SetFinalizer(tpp, finalizeTopicProducer)
return tpp, nil
}
func (tp *topicProducer) Close() (es cog.Errors) {
runtime.SetFinalizer(tp, nil)
return tp.TopicProducer.Close()
}
func finalizeTopicProducer(tp *topicProducer) {
go tp.Close()
}
// RegisterConsumer registers a Consumer for use. Names are case insensitive.
func RegisterConsumer(name string, nc MakeConsumer) {
lname := strings.ToLower(name)
if _, ok := regdCs[lname]; ok {
panic(fmt.Errorf("consumer %s already exists", name))
}
regdCs[lname] = nc
}
// NewConsumer creates a new consumer with the given arguments
func NewConsumer(name string, args Args) (Consumer, error) {
lname := strings.ToLower(name)
nc, ok := regdCs[lname]
if !ok {
return nil, fmt.Errorf("consumer `%s` does not exist", name)
}
c, err := nc(args)
if err != nil {
return nil, fmt.Errorf("failed to create consumer `%s`: %v", name, err)
}
cp := &consumer{Consumer: c}
runtime.SetFinalizer(cp, finalizeConsumer)
return cp, nil
}
func (c *consumer) Close() (es cog.Errors) {
runtime.SetFinalizer(c, nil)
return c.Consumer.Close()
}
func finalizeConsumer(c *consumer) {
go c.Close()
}