-
Notifications
You must be signed in to change notification settings - Fork 1
/
helpers.go
169 lines (149 loc) · 4.41 KB
/
helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package client
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
// Waiter is informed of current height, decided whether to quit early
type Waiter func(delta int64) (abort error)
// DefaultWaitStrategy is the standard backoff algorithm,
// but you can plug in another one
func DefaultWaitStrategy(delta int64) (abort error) {
if delta > 10 {
return fmt.Errorf("waiting for %d blocks... aborting", delta)
} else if delta > 0 {
// estimate of wait time....
// wait half a second for the next block (in progress)
// plus one second for every full block
delay := time.Duration(delta-1)*time.Second + 500*time.Millisecond
time.Sleep(delay)
}
return nil
}
// Wait for height will poll status at reasonable intervals until
// the block at the given height is available.
//
// If waiter is nil, we use DefaultWaitStrategy, but you can also
// provide your own implementation
func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
if waiter == nil {
waiter = DefaultWaitStrategy
}
delta := int64(1)
for delta > 0 {
s, err := c.Status(context.Background())
if err != nil {
return err
}
delta = h - s.SyncInfo.LatestBlockHeight
// wait for the time, or abort early
if err := waiter(delta); err != nil {
return err
}
}
return nil
}
// WaitForOneEvent subscribes to a websocket event for the given
// event time and returns upon receiving it one time, or
// when the timeout duration has expired.
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (types.TMEventData, error) {
const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// register for the next event of this type
eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(eventValue).String())
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %w", err)
}
// make sure to un-register after the test is over
defer func() {
if deferErr := c.UnsubscribeAll(ctx, subscriber); deferErr != nil {
panic(err)
}
}()
select {
case event := <-eventCh:
return event.Data, nil
case <-ctx.Done():
return nil, errors.New("timed out waiting for event")
}
}
var (
// ErrClientRunning is returned by Start when the client is already running.
ErrClientRunning = errors.New("client already running")
// ErrClientNotRunning is returned by Stop when the client is not running.
ErrClientNotRunning = errors.New("client is not running")
)
// RunState is a helper that a client implementation can embed to implement
// common plumbing for keeping track of run state and logging.
//
// TODO(creachadair): This type is a temporary measure, and will be removed.
// See the discussion on #6971.
type RunState struct {
Logger log.Logger
mu sync.Mutex
name string
isRunning bool
quit chan struct{}
}
// NewRunState returns a new unstarted run state tracker with the given logging
// label and log sink. If logger == nil, a no-op logger is provided by default.
func NewRunState(name string, logger log.Logger) *RunState {
if logger == nil {
logger = log.NewNopLogger()
}
return &RunState{
name: name,
Logger: logger,
}
}
// Start sets the state to running, or reports an error.
func (r *RunState) Start() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.isRunning {
r.Logger.Error("not starting client, it is already started", "client", r.name)
return ErrClientRunning
}
r.Logger.Info("starting client", "client", r.name)
r.isRunning = true
r.quit = make(chan struct{})
return nil
}
// Stop sets the state to not running, or reports an error.
func (r *RunState) Stop() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.isRunning {
r.Logger.Error("not stopping client; it is already stopped", "client", r.name)
return ErrClientNotRunning
}
r.Logger.Info("stopping client", "client", r.name)
r.isRunning = false
close(r.quit)
return nil
}
// SetLogger updates the log sink.
func (r *RunState) SetLogger(logger log.Logger) {
r.mu.Lock()
defer r.mu.Unlock()
r.Logger = logger
}
// IsRunning reports whether the state is running.
func (r *RunState) IsRunning() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.isRunning
}
// Quit returns a channel that is closed when a call to Stop succeeds.
func (r *RunState) Quit() <-chan struct{} {
r.mu.Lock()
defer r.mu.Unlock()
return r.quit
}