/
swappable.go
94 lines (79 loc) · 2.43 KB
/
swappable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package common
import (
"context"
"fmt"
"sync"
)
// Stoppable represents a resource (a Benthos stream) that can be stopped.
type Stoppable interface {
Stop(ctx context.Context) error
}
// CombineStoppables returns a single Stoppable that will call each provided
// Stoppable in the order they are specified on a Stop. If any stoppable returns
// an error all subsequent stoppables will still be called before an error is
// returned.
func CombineStoppables(stoppables ...Stoppable) Stoppable {
return &combinedStoppables{
stoppables: stoppables,
}
}
type combinedStoppables struct {
stoppables []Stoppable
}
func (c *combinedStoppables) Stop(ctx context.Context) (stopErr error) {
for _, s := range c.stoppables {
if err := s.Stop(ctx); err != nil && stopErr == nil {
stopErr = err
}
}
return
}
// SwappableStopper wraps an active Stoppable resource in a mechanism that
// allows changing the resource for something else after stopping it.
type SwappableStopper struct {
stopped bool
current Stoppable
mut sync.Mutex
}
// NewSwappableStopper creates a new swappable stopper resource around an
// initial stoppable.
func NewSwappableStopper(s Stoppable) *SwappableStopper {
return &SwappableStopper{
current: s,
}
}
// Stop the wrapped resource.
func (s *SwappableStopper) Stop(ctx context.Context) error {
s.mut.Lock()
defer s.mut.Unlock()
if s.stopped {
return nil
}
s.stopped = true
return s.current.Stop(ctx)
}
// Replace the resource with something new only once the existing one is
// stopped. In order to avoid unnecessary start up of the swapping resource we
// accept a closure that constructs it and is only called when we're ready.
func (s *SwappableStopper) Replace(ctx context.Context, fn func() (Stoppable, error)) error {
s.mut.Lock()
defer s.mut.Unlock()
if s.stopped {
// If the outer stream has been stopped then do not create a new one.
return nil
}
// The underlying implementation is expected to continue shutting resources
// down in the background. An error here indicates that it hasn't managed to
// fully clean up before reaching a context deadline.
//
// However, aborting the creation of the replacement would not be
// appropriate as it would leave the service stateless, we therefore stop
// blocking and proceed.
_ = s.current.Stop(ctx)
newStoppable, err := fn()
if err != nil {
return fmt.Errorf("failed to init updated stream: %w", err)
}
s.current = newStoppable
return nil
}