forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
plug.go
118 lines (102 loc) · 2.4 KB
/
plug.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package plug
import (
"sync"
)
// Plug is a synchronization primitive that holds back other code until it
// has been started and lets that code observe when it has been stopped.
type Plug interface {
	// Start begins operation of the plug and unblocks WaitForStart.
	// It may be called any number of times; only the first call has an
	// effect.
	Start()
	// Stop ends operation of the plug and unblocks WaitForStop.
	// It may be called any number of times; only the first call has an
	// effect. The result of calling Stop before Start is undefined.
	Stop()
	// WaitForStart blocks until Start has been called.
	WaitForStart()
	// WaitForStop blocks until Stop has been called.
	WaitForStop()
	// IsStarted reports whether Start has been called.
	IsStarted() bool
}
// plug is the default Plug implementation. Each lifecycle event is modelled
// as a channel that gets closed exactly once; the matching sync.Once keeps
// repeated Start or Stop calls from closing a channel a second time.
type plug struct {
	// start and stop guard the one-time close of startCh and stopCh.
	start, stop sync.Once
	// startCh is closed by Start and stopCh by Stop; waiters block on a
	// receive from the corresponding channel.
	startCh, stopCh chan struct{}
}
// New returns a Plug backed by freshly allocated start and stop channels.
// When started is true, Start is invoked before the plug is returned, so
// IsStarted reports true and WaitForStart does not block.
func New(started bool) Plug {
	startCh, stopCh := make(chan struct{}), make(chan struct{})
	created := &plug{startCh: startCh, stopCh: stopCh}
	if !started {
		return created
	}
	created.Start()
	return created
}
// Start closes startCh, releasing every WaitForStart caller. The sync.Once
// turns repeated calls into no-ops, so the channel is never closed twice.
func (p *plug) Start() {
	closeStart := func() { close(p.startCh) }
	p.start.Do(closeStart)
}
// Stop closes stopCh, releasing every WaitForStop caller. The sync.Once
// turns repeated calls into no-ops, so the channel is never closed twice.
func (p *plug) Stop() {
	closeStop := func() { close(p.stopCh) }
	p.stop.Do(closeStop)
}
// IsStarted reports whether Start has been called. It never blocks: a
// receive from startCh only succeeds once the channel has been closed, and
// the default case covers the not-yet-started state.
func (p *plug) IsStarted() bool {
	started := false
	select {
	case <-p.startCh:
		started = true
	default:
	}
	return started
}
// WaitForStart blocks until startCh is closed, which happens on the first
// call to Start.
func (p *plug) WaitForStart() {
	started := p.startCh
	<-started
}
// WaitForStop blocks until stopCh is closed, which happens on the first
// call to Stop.
func (p *plug) WaitForStop() {
	stopped := p.stopCh
	<-stopped
}
// Leaser controls access to a lease.
type Leaser interface {
	// AcquireAndHold tries to acquire the lease and then hold it until it
	// expires, is deleted, or another party is observed taking it. A value
	// is sent on the notify channel when the lease is held, and the channel
	// is closed when the lease is lost.
	AcquireAndHold(chan struct{})
	// Release is invoked by Leased.Stop.
	// NOTE(review): presumably this gives up a held lease and tolerates
	// repeated calls, since Stop may run more than once — confirm against
	// the implementations.
	Release()
}
// Leased uses a Leaser to control Start and Stop on a Plug.
// The Plug is embedded, so its methods are promoted onto Leased; Stop is
// the exception, because Leased declares its own Stop method.
type Leased struct {
// Plug supplies the start/stop signalling that Leased delegates to.
Plug
// leaser is the lease controller driven by Run and released by Stop.
leaser Leaser
}
// Compile-time check that *Leased satisfies the Plug interface.
var _ Plug = (*Leased)(nil)
// NewLeased returns a Leased wrapping a new, not-yet-started Plug and the
// given leaser. The Plug is started when the lease is acquired and stopped
// when it is lost; that wiring is performed by Run.
func NewLeased(leaser Leaser) *Leased {
	leased := &Leased{leaser: leaser}
	leased.Plug = New(false)
	return leased
}
// Stop asks the leaser to release the lease and then stops the embedded
// Plug. The release always happens first.
func (l *Leased) Stop() {
	lease := l.leaser
	lease.Release()
	l.Plug.Stop()
}
// Run tries to acquire and hold a lease. The leaser's AcquireAndHold runs in
// its own goroutine and reports through a notification channel: every value
// received triggers Start, and once the channel is closed Run returns, at
// which point the deferred Stop runs.
func (l *Leased) Run() {
	// NOTE(review): the one-slot buffer presumably lets the leaser's first
	// notification be sent before this loop starts receiving — confirm.
	notify := make(chan struct{}, 1)
	go l.leaser.AcquireAndHold(notify)
	defer l.Stop()

	// The range loop ends when the leaser closes notify.
	for range notify {
		l.Start()
	}
}