-
Notifications
You must be signed in to change notification settings - Fork 0
/
x_disruptor.go
101 lines (89 loc) · 2.6 KB
/
x_disruptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package ipc
import (
"sync/atomic"
"time"
"github.com/benz9527/xboot/lib/bits"
"github.com/benz9527/xboot/lib/infra"
"github.com/benz9527/xboot/lib/queue"
)
// disruptorStatus is the lifecycle state of a disruptor. Its underlying type
// is int32 so that a value can be handed to the sync/atomic int32 functions
// through a pointer conversion.
type disruptorStatus int32

// Lifecycle states of a disruptor.
const (
	// disruptorReady is the zero value: the disruptor is not running.
	disruptorReady disruptorStatus = 0
	// disruptorRunning marks a disruptor that has been started.
	disruptorRunning disruptorStatus = 1
)
// xDisruptor pairs one publisher with one subscriber and records whether the
// pair is currently running.
type xDisruptor[T any] struct {
// pub is the publishing side; besides Publisher[T] it must satisfy stopper.
pub interface {
Publisher[T]
stopper
}
// sub is the consuming side; besides Subscriber[T] it must satisfy stopper.
sub interface {
Subscriber[T]
stopper
}
// status is the lifecycle state. The methods of this type read and write it
// only through sync/atomic (via an *int32 conversion).
status disruptorStatus
}
// NewXDisruptor wires a sequencer, a ring buffer, a publisher and a subscriber
// into a Disruptor that starts out in the ready (not running) state.
//
// capacity is passed through bits.RoundupPowOf2ByCeil and then raised to at
// least 2. strategy is handed to both the publisher and the subscriber;
// handler is handed to the subscriber.
func NewXDisruptor[T any](
	capacity uint64,
	strategy BlockStrategy,
	handler EventHandler[T],
) Disruptor[T] {
	size := bits.RoundupPowOf2ByCeil(capacity)
	if size < 2 {
		size = 2
	}

	sequencer := NewXSequencer(size)
	// Advance both cursors once before use: sequences can't start from 0,
	// because 0 will be treated as the nil value.
	sequencer.GetWriteCursor().Next()
	sequencer.GetReadCursor().Next()

	ring := queue.NewXRingBuffer[T](size)
	return &xDisruptor[T]{
		pub:    newXPublisher[T](sequencer, ring, strategy),
		sub:    newXSubscriber[T](ring, handler, sequencer, strategy),
		status: disruptorReady,
	}
}
// Start moves the disruptor from ready to running and starts the subscriber
// first, then the publisher.
//
// It returns an error if the disruptor is not in the ready state, or if either
// component fails to start. On failure the status is reset to ready; if the
// publisher fails after the subscriber has already been started, the
// subscriber is stopped again so that no half-started pipeline is left behind
// and a later Start can retry from a clean state.
func (dis *xDisruptor[T]) Start() error {
	if !atomic.CompareAndSwapInt32((*int32)(&dis.status), int32(disruptorReady), int32(disruptorRunning)) {
		return infra.NewErrorStack("[disruptor] already started")
	}

	if err := dis.sub.Start(); err != nil {
		atomic.StoreInt32((*int32)(&dis.status), int32(disruptorReady))
		return infra.WrapErrorStack(err)
	}

	if err := dis.pub.Start(); err != nil {
		// Roll back the subscriber before publishing the ready status, so a
		// concurrent Start never observes "ready" while it is still running.
		// The rollback is best effort: the publisher's error is the one the
		// caller needs to see, so a failure here is intentionally dropped.
		_ = dis.sub.Stop()
		atomic.StoreInt32((*int32)(&dis.status), int32(disruptorReady))
		return infra.WrapErrorStack(err)
	}
	return nil
}
// Stop moves the disruptor from running to ready and stops the publisher
// first, then the subscriber.
//
// It returns an error if the disruptor is not running. Both components are
// always asked to stop, even when the publisher reports an error, so that the
// subscriber is not left running. The publisher's error takes precedence; the
// subscriber's error is returned only when the publisher stopped cleanly.
func (dis *xDisruptor[T]) Stop() error {
	// The status is switched to ready here, once. It must not be touched again
	// below: a concurrent Start may already have moved it back to running.
	if !atomic.CompareAndSwapInt32((*int32)(&dis.status), int32(disruptorRunning), int32(disruptorReady)) {
		return infra.NewErrorStack("[disruptor] already stopped")
	}

	pubErr := dis.pub.Stop()
	subErr := dis.sub.Stop()

	if pubErr != nil {
		return infra.WrapErrorStack(pubErr)
	}
	if subErr != nil {
		return infra.WrapErrorStack(subErr)
	}
	return nil
}
// IsStopped reports whether the disruptor is in any state other than running.
// The status is read atomically.
func (dis *xDisruptor[T]) IsStopped() bool {
	running := int32(disruptorRunning)
	current := atomic.LoadInt32((*int32)(&dis.status))
	return current != running
}
// Publish forwards event to the underlying publisher and returns its three
// results unchanged.
func (dis *xDisruptor[T]) Publish(event T) (uint64, bool, error) {
	n, ok, err := dis.pub.Publish(event)
	return n, ok, err
}
// PublishTimeout forwards event and timeout to the underlying publisher.
// Nothing is returned to the caller.
func (dis *xDisruptor[T]) PublishTimeout(event T, timeout time.Duration) {
	publisher := dis.pub
	publisher.PublishTimeout(event, timeout)
}
// RegisterSubscriber is a no-op that always returns nil.
//
// A single-pipeline disruptor only supports one subscriber to consume the
// events, and that subscriber is registered at construction time, so the
// argument is ignored.
func (dis *xDisruptor[T]) RegisterSubscriber(sub Subscriber[T]) error {
	_ = sub // intentionally unused
	return nil
}