/
fanout.go
156 lines (134 loc) · 2.33 KB
/
fanout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package link
import (
"fmt"
"sync"
"github.com/whiteboxio/flow/pkg/core"
)
// RingLink is one node of the circular doubly linked list that Fanout keeps
// its downstream links in. self is the wrapped link; next and prev point at
// the neighbouring nodes. In a ring of one, both neighbours are the node
// itself; a node that has been removed from the ring has both set to nil.
type RingLink struct {
	self       core.Link
	next, prev *RingLink
}
// Fanout is a link that hands every incoming message to exactly one of the
// links registered in its ring, moving to the next ring node after each
// delivery.
//
// NOTE: New fills this struct with a positional composite literal, so the
// field order below must stay in sync with that constructor.
type Fanout struct {
	// Name is the identifier passed to New.
	Name string
	// ringHead is the ring node that receives the next message; nil while no
	// links are registered.
	ringHead *RingLink
	// Mutex is embedded so that the methods can call ft.Lock/ft.Unlock around
	// ring access.
	*sync.Mutex
	// Connector is embedded; the dispatch loop reads incoming messages from
	// its GetMsgCh channel.
	*core.Connector
}
// New builds a Fanout called name with an empty ring and starts its dispatch
// goroutine before returning it. params is accepted but not consulted here,
// and the returned error is always nil.
func New(name string, params core.Params) (core.Link, error) {
	ft := &Fanout{
		Name:      name,
		Mutex:     &sync.Mutex{},
		Connector: core.NewConnector(),
	}

	// The dispatch loop lives until the connector's message channel is closed.
	go ft.fanout()

	return ft, nil
}
// ConnectTo is not supported by Fanout: downstream links are attached with
// LinkTo or AddLink instead. Calling it always panics.
func (ft *Fanout) ConnectTo(core.Link) error {
	const reason = "Fanout is not supposed to be connected"
	panic(reason)
}
// fanout is the dispatch loop started by New. It drains the channel returned
// by GetMsgCh and passes each message to the link at the ring head, advancing
// the head by one node per message. A message that arrives while the ring is
// empty is acknowledged as failed.
//
// The head is read and advanced under the mutex, the same one AddLink and
// RemoveLink hold while they rewire the ring. Advancing before delivery (and
// delivering outside the lock) means that a link removed while its Recv is
// still running cannot leave ringHead pointing at a detached node, and that
// Recv may itself call back into AddLink/RemoveLink without deadlocking.
func (ft *Fanout) fanout() {
	for msg := range ft.GetMsgCh() {
		ft.Lock()
		head := ft.ringHead
		if head != nil {
			ft.ringHead = head.next
		}
		ft.Unlock()

		if head == nil {
			msg.AckFailed()
			continue
		}
		head.self.Recv(msg)
	}
}
// LinkTo registers every link in links with the ring, in slice order.
// It stops at, and returns, the first error reported by AddLink; links added
// before the failure stay registered. It returns nil when all links were
// added (including when links is empty).
func (ft *Fanout) LinkTo(links []core.Link) error {
	for _, link := range links {
		if err := ft.AddLink(link); err != nil {
			return err
		}
	}
	return nil
}
// AddLink wraps link in a fresh ring node and inserts that node into the
// ring, returning whatever addRingLink reports.
func (ft *Fanout) AddLink(link core.Link) error {
	node := &RingLink{self: link}
	return ft.addRingLink(node)
}
// FindLink looks for the ring node whose wrapped link equals link.
// The walk starts at the current head and follows next pointers until it
// arrives back at the head. It returns the matching node and true, or
// (nil, false) when link is nil, the ring is empty, or no node matches.
// The mutex is held for the whole search.
func (ft *Fanout) FindLink(link core.Link) (*RingLink, bool) {
	ft.Lock()
	defer ft.Unlock()

	if link == nil || ft.ringHead == nil {
		return nil, false
	}

	cur := ft.ringHead
	for {
		if cur.self == link {
			return cur, true
		}
		cur = cur.next
		if cur == ft.ringHead {
			// Completed a full lap without a match.
			return nil, false
		}
	}
}
// addRingLink splices rl into the ring directly after the current head and
// then makes rl the new head. When the ring is empty, rl becomes a ring of
// one whose neighbours are itself. The mutex is held throughout and the
// result is always nil.
func (ft *Fanout) addRingLink(rl *RingLink) error {
	ft.Lock()
	defer ft.Unlock()

	if cur := ft.ringHead; cur != nil {
		// Capture the old successor first so the rewiring below always
		// refers to the node that followed the head before insertion.
		after := cur.next
		rl.next = after
		rl.prev = cur
		after.prev = rl
		cur.next = rl
	} else {
		rl.next = rl
		rl.prev = rl
	}

	ft.ringHead = rl
	return nil
}
// RemoveLink detaches the ring node that wraps link. It returns an error when
// no such node is found, otherwise the result of removeRingLink.
//
// The receiver must be a pointer: removeRingLink may reassign ft.ringHead
// (when the head or the last remaining node is removed), and with a value
// receiver that assignment would land on a copy, leaving the real ringHead
// pointing at a node whose next/prev have been set to nil.
func (ft *Fanout) RemoveLink(link core.Link) error {
	ptr, ok := ft.FindLink(link)
	if !ok {
		return fmt.Errorf("Link could not be found in the ring")
	}
	return ft.removeRingLink(ptr)
}
// removeRingLink unlinks rl from the ring and clears its neighbour pointers.
// If rl was the head, the head moves to rl's successor; if rl was the only
// node, the ring becomes empty. The mutex is held while the ring is modified.
//
// It returns an error when rl is nil, or when rl is already detached (its
// next/prev are nil, e.g. because it was removed earlier); without that
// check a second removal of the same node would dereference a nil pointer.
func (ft *Fanout) removeRingLink(rl *RingLink) error {
	if rl == nil {
		return fmt.Errorf("RingLink is empty")
	}

	ft.Lock()
	defer ft.Unlock()

	if rl.next == nil || rl.prev == nil {
		return fmt.Errorf("RingLink is not attached to a ring")
	}

	next := rl.next
	if rl != next {
		// Bridge the neighbours over rl.
		rl.prev.next = rl.next
		rl.next.prev = rl.prev
		if rl == ft.ringHead {
			ft.ringHead = next
		}
	} else {
		// rl points at itself: it was the only node in the ring.
		ft.ringHead = nil
	}

	// Mark the node as detached.
	rl.next = nil
	rl.prev = nil
	return nil
}
// RingSize returns the number of nodes currently in the ring, counted by
// walking from the head until the walk returns to it; an empty ring yields 0.
//
// The emptiness check is made after taking the mutex: checking before the
// lock would allow the ring to be emptied in between, after which the walk
// would start from a nil head.
func (ft *Fanout) RingSize() int {
	ft.Lock()
	defer ft.Unlock()

	head := ft.ringHead
	if head == nil {
		return 0
	}

	cnt := 1
	for ptr := head.next; ptr != head; ptr = ptr.next {
		cnt++
	}
	return cnt
}