-
Notifications
You must be signed in to change notification settings - Fork 1
/
ccontainer.go
157 lines (141 loc) · 4 KB
/
ccontainer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package ccontainer
import (
"context"
proto "github.com/aperturerobotics/protobuf-go-lite"
"github.com/aperturerobotics/util/broadcast"
)
// CContainer is a concurrent container.
type CContainer[T comparable] struct {
bcast broadcast.Broadcast
val T
equal func(a, b T) bool
}
// NewCContainer builds a CContainer with an initial value.
func NewCContainer[T comparable](val T) *CContainer[T] {
return &CContainer[T]{val: val}
}
// NewCContainerWithEqual builds a CContainer with an initial value and a comparator.
func NewCContainerWithEqual[T comparable](val T, isEqual func(a, b T) bool) *CContainer[T] {
return &CContainer[T]{val: val, equal: isEqual}
}
// NewCContainerVT constructs a CContainer that uses VTEqual to check for equality.
func NewCContainerVT[T proto.EqualVT[T]](val T) *CContainer[T] {
return NewCContainerWithEqual[T](val, proto.CompareEqualVT[T]())
}
// GetValue returns the immediate value of the container.
func (c *CContainer[T]) GetValue() T {
var val T
c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
val = c.val
})
return val
}
// SetValue sets the ccontainer value.
func (c *CContainer[T]) SetValue(val T) {
c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
if !c.compare(c.val, val) {
c.val = val
broadcast()
}
})
}
// SwapValue locks the container, calls the callback, and stores the return value.
//
// Returns the updated value.
// If cb is nil returns the current value without changes.
func (c *CContainer[T]) SwapValue(cb func(val T) T) T {
var val T
c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
val = c.val
if cb != nil {
val = cb(val)
if !c.compare(c.val, val) {
c.val = val
broadcast()
}
}
})
return val
}
// WaitValueWithValidator waits for any value that matches the validator in the container.
// errCh is an optional channel to read an error from.
func (c *CContainer[T]) WaitValueWithValidator(
ctx context.Context,
valid func(v T) (bool, error),
errCh <-chan error,
) (T, error) {
var ok bool
var err error
var emptyValue T
for {
var val T
var wake <-chan struct{}
c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
val = c.val
wake = getWaitCh()
})
if valid != nil {
ok, err = valid(val)
} else {
ok = !c.compare(val, emptyValue)
err = nil
}
if err != nil {
return emptyValue, err
}
if ok {
return val, nil
}
select {
case <-ctx.Done():
return emptyValue, ctx.Err()
case err, ok := <-errCh:
if !ok {
// errCh was non-nil but was closed
// treat this as context canceled
return emptyValue, context.Canceled
}
if err != nil {
return emptyValue, err
}
case <-wake:
// woken, value changed
}
}
}
// WaitValue waits for any non-nil value in the container.
// errCh is an optional channel to read an error from.
func (c *CContainer[T]) WaitValue(ctx context.Context, errCh <-chan error) (T, error) {
return c.WaitValueWithValidator(ctx, func(v T) (bool, error) {
var emptyValue T
return !c.compare(emptyValue, v), nil
}, errCh)
}
// WaitValueChange waits for a value that is different than the given.
// errCh is an optional channel to read an error from.
func (c *CContainer[T]) WaitValueChange(ctx context.Context, old T, errCh <-chan error) (T, error) {
return c.WaitValueWithValidator(ctx, func(v T) (bool, error) {
return !c.compare(old, v), nil
}, errCh)
}
// WaitValueEmpty waits for an empty value.
// errCh is an optional channel to read an error from.
func (c *CContainer[T]) WaitValueEmpty(ctx context.Context, errCh <-chan error) error {
_, err := c.WaitValueWithValidator(ctx, func(v T) (bool, error) {
var emptyValue T
return c.compare(emptyValue, v), nil
}, errCh)
return err
}
// compare checks of two values are equal
func (c *CContainer[T]) compare(a, b T) bool {
if a == b {
return true
}
if c.equal != nil && c.equal(a, b) {
return true
}
return false
}
// _ is a type assertion
var _ Watchable[struct{}] = ((*CContainer[struct{}])(nil))