// Package channels implements named, buffered engine channels: each channel is
// created before the engine starts and delivers every published message to the
// callbacks registered on it.
package channels
import (
"errors"
"fmt"
"github.com/TIBCOSoftware/flogo-lib/logger"
"strings"
"strconv"
)
// Package-level registry state shared by the functions in this file.
var (
	// channels maps a channel name to its implementation.
	channels = make(map[string]*channelImpl)

	// active is set by Start and cleared by Stop; New refuses to create
	// channels while it is true.
	active bool
)
// Channel is the handle callers use to subscribe to, and publish on, a named
// engine channel.
type Channel interface {
	// RegisterCallback adds a callback to be invoked with the messages
	// published on the channel; it returns an error if the callback cannot
	// be added.
	RegisterCallback(callback OnMessage) error
	// Publish sends msg on the channel.
	Publish(msg interface{})
	// PublishNoWait attempts to send msg without blocking and reports
	// whether the message was sent.
	PublishNoWait(msg interface{}) bool
}
// OnMessage is the signature of a callback that receives a message published
// on a channel.
type OnMessage func(msg interface{})
// New creates a channel with the given name and buffer size and registers it
// in the package-level registry.
//
// Channels have to be created before the engine starts: once Start has marked
// the package active, New returns an error. It also returns an error when
// bufferSize is negative (make would panic on a negative size) or when a
// channel with the same name is already registered.
func New(name string, bufferSize int) (Channel, error) {
	if active {
		return nil, errors.New("cannot create channel after engine has been started")
	}

	// Reject the size up front instead of letting make panic below.
	if bufferSize < 0 {
		return nil, fmt.Errorf("invalid buffer size %d for channel: %s", bufferSize, name)
	}

	if _, dup := channels[name]; dup {
		return nil, errors.New("channel already exists: " + name)
	}

	channel := &channelImpl{name: name, ch: make(chan interface{}, bufferSize)}
	channels[name] = channel

	return channel, nil
}
// Count reports how many channels are currently registered.
func Count() int {
	total := len(channels)
	return total
}
// Get returns the channel registered under name, or nil when no channel with
// that name exists.
func Get(name string) Channel {
	// Return an untyped nil explicitly on a miss: handing back the map's zero
	// value would wrap a nil *channelImpl in a non-nil Channel interface, so a
	// caller's `== nil` check would never fire.
	if channel, ok := channels[name]; ok {
		return channel
	}
	return nil
}
// Start marks the package as active and starts every registered channel.
//
// If a channel fails to start, the channels started so far are stopped again
// and an error naming the failing channel is returned. Errors raised while
// stopping during that rollback are logged as warnings rather than dropped.
// The package stays marked active after a failed Start.
func Start() error {
	active = true

	// Track what has been started so a failure can be rolled back.
	started := make([]*channelImpl, 0, len(channels))

	for _, channel := range channels {
		if err := channel.Start(); err != nil {
			for _, startedChannel := range started {
				if stopErr := startedChannel.Stop(); stopErr != nil {
					logger.Warnf("error stopping channel '%s', error: %s", startedChannel.name, stopErr.Error())
				}
			}
			return fmt.Errorf("failed to start channel '%s', error: %s", channel.name, err.Error())
		}

		logger.Debugf("Started Engine Channel: %s", channel.name)
		started = append(started, channel)
	}

	return nil
}
// Stop stops every registered channel and then marks the package as inactive.
// A failure to stop an individual channel is logged as a warning and does not
// interrupt the loop; Stop itself always returns nil.
func Stop() error {
	for _, ch := range channels {
		if stopErr := ch.Stop(); stopErr != nil {
			logger.Warnf("error stopping channel '%s', error: %s", ch.name, stopErr.Error())
		}
	}

	active = false
	return nil
}
// channelImpl is the implementation behind the Channel values handed out by
// New and Get; it wraps a Go channel and a list of callbacks.
type channelImpl struct {
	// name is the key the channel is registered under.
	name string
	// callbacks are invoked for each message received by processEvents.
	callbacks []OnMessage
	// ch carries published messages to processEvents.
	ch chan interface{}
	// active is set by Start and cleared by Stop; RegisterCallback refuses
	// new callbacks while it is true.
	active bool
}
// Start marks the channel as active and launches processEvents in its own
// goroutine. It always returns nil.
func (c *channelImpl) Start() error {
	c.active = true
	go c.processEvents()
	return nil
}
// Stop marks the channel as inactive and closes the underlying Go channel,
// which makes processEvents return once the remaining buffered messages have
// been received.
//
// Stop is a no-op on a channel that is not active, so calling it again after
// a previous Stop (for example after Start rolled the channel back) does not
// panic with a close of an already-closed channel. It always returns nil.
func (c *channelImpl) Stop() error {
	if !c.active {
		return nil
	}

	c.active = false
	close(c.ch)
	return nil
}
// RegisterCallback adds callback to the list of functions invoked for every
// message received on the channel.
//
// It returns an error when the channel is active (callbacks cannot be added
// after Start) or when callback is nil, because invoking a nil callback in
// processEvents would panic.
func (c *channelImpl) RegisterCallback(callback OnMessage) error {
	if c.active {
		return errors.New("cannot add listener after channel has been started")
	}
	if callback == nil {
		return errors.New("cannot add nil listener")
	}

	c.callbacks = append(c.callbacks, callback)
	return nil
}
// Publish sends msg on the underlying Go channel, blocking until the message
// is buffered or received.
// NOTE(review): Stop closes the underlying channel, and a send on a closed
// channel panics — confirm callers never publish after Stop.
func (c *channelImpl) Publish(msg interface{}) {
	c.ch <- msg
}
// PublishNoWait tries to send msg on the underlying Go channel without
// blocking. It returns true when the message was accepted and false when the
// send could not proceed immediately.
func (c *channelImpl) PublishNoWait(msg interface{}) bool {
	select {
	case c.ch <- msg:
		return true
	default:
		return false
	}
}
// processEvents receives messages from the underlying Go channel until it is
// closed, dispatching each message to every registered callback in a separate
// goroutine per callback.
func (c *channelImpl) processEvents() {
	// Ranging over the channel ends when the channel has been closed.
	for msg := range c.ch {
		for _, handler := range c.callbacks {
			go handler(msg)
		}
	}
}
// Decode splits a channel descriptor of the form "name" or "name:bufferSize"
// into the channel name and its buffer size.
//
// The descriptor is split at the first ':' provided it is not the first
// character; otherwise the whole descriptor is returned as the name with a
// buffer size of 0. When the size part is not a valid integer, or is negative
// (a negative size would make channel creation panic), a warning is logged
// and the buffer size defaults to 0.
func Decode(channelDescriptor string) (string, int) {
	idx := strings.Index(channelDescriptor, ":")
	if idx <= 0 {
		// No separator, or an empty name before it: treat it all as the name.
		return channelDescriptor, 0
	}

	chanName := channelDescriptor[:idx]
	rawSize := channelDescriptor[idx+1:]
	buffSize := 0

	bSize, err := strconv.Atoi(rawSize)
	if err != nil || bSize < 0 {
		logger.Warnf("invalid channel buffer size '%s', defaulting to buffer size of %d", rawSize, buffSize)
		return chanName, buffSize
	}

	return chanName, bSize
}