/
node.go
176 lines (159 loc) · 4.57 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package integration
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"google.golang.org/grpc"
agentutils "github.com/docker/swarmkit/agent/testutils"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/node"
"github.com/docker/swarmkit/testutils"
)
// testNode wraps a *node.Node for use by the integration tests. It keeps the
// configuration the node was created from and the node's state directory so
// that tests can restart the node or clean up after it.
type testNode struct {
// config is the configuration passed to node.New; it is reused (and
// modified) when the node is re-created.
config *node.Config
// node is the current swarm node instance.
node *node.Node
// stateDir is the node's on-disk state directory.
stateDir string
}
// generateCerts generates/overwrites TLS certificates for a node inside the
// "certificates" subdirectory of tmpDir.
//
// The root CA certificate is always written to disk; the root CA key is only
// written when writeKey is true. Afterwards a node certificate for the given
// nodeID, role and org is issued by rootCA and saved next to it. The first
// error encountered is returned.
func generateCerts(tmpDir string, rootCA *ca.RootCA, nodeID, role, org string, writeKey bool) error {
	rootSigner, err := rootCA.Signer()
	if err != nil {
		return err
	}

	dir := filepath.Join(tmpDir, "certificates")
	if err = os.MkdirAll(dir, 0o700); err != nil {
		return err
	}

	paths := ca.NewConfigPaths(dir)
	if err = os.WriteFile(paths.RootCA.Cert, rootSigner.Cert, 0o644); err != nil {
		return err
	}
	if writeKey {
		if err = os.WriteFile(paths.RootCA.Key, rootSigner.Key, 0o600); err != nil {
			return err
		}
	}

	nodeKeys := ca.NewKeyReadWriter(paths.Node, nil, nil)
	_, _, err = rootCA.IssueAndSaveNewCertificates(nodeKeys, nodeID, role, org)
	return err
}
// newTestNode creates a new node backed by a fresh temporary state directory.
// If joinAddr is non-empty the node is configured to join the existing cluster
// at that address using joinToken; if joinAddr is an empty string, then a new
// cluster will be initialized. It uses TestExecutor as executor. If lateBind
// is set, the remote API port is not bound; otherwise the remote API listens
// on an ephemeral localhost port. fips is passed through to the node config.
//
// On failure no testNode is returned and the temporary directory is removed.
func newTestNode(joinAddr, joinToken string, lateBind bool, fips bool) (*testNode, error) {
	tmpDir, err := os.MkdirTemp("", "swarmkit-integration-")
	if err != nil {
		return nil, err
	}

	cfg := &node.Config{
		ListenControlAPI: filepath.Join(tmpDir, "control.sock"),
		JoinAddr:         joinAddr,
		StateDir:         tmpDir,
		Executor:         &agentutils.TestExecutor{},
		JoinToken:        joinToken,
		FIPS:             fips,
	}
	if !lateBind {
		cfg.ListenRemoteAPI = "127.0.0.1:0"
	}

	// Named n rather than node so the imported node package is not shadowed.
	n, err := node.New(cfg)
	if err != nil {
		// The caller never learns tmpDir on failure, so remove it here
		// instead of leaking it. Cleanup is best effort: the error from
		// node.New is the one worth reporting.
		_ = os.RemoveAll(tmpDir)
		return nil, err
	}

	return &testNode{
		config:   cfg,
		node:     n,
		stateDir: tmpDir,
	}, nil
}
// Pause stops the node and replaces it with a newly created swarm node that
// reuses the same configuration, and therefore the same state directory.
//
// The new node listens on the remote API address the old node reported, or on
// an ephemeral localhost port if that address could not be obtained.
// forceNewCluster is copied into the configuration of the new node.
func (n *testNode) Pause(forceNewCluster bool) error {
	listenAddr, addrErr := n.node.RemoteAPIAddr()
	if addrErr != nil {
		listenAddr = "127.0.0.1:0"
	}

	if stopErr := n.stop(); stopErr != nil {
		return stopErr
	}

	n.config.ListenRemoteAPI = listenAddr
	// If JoinAddr is set, the node will connect to the join addr and ignore any
	// other remotes that are stored in the raft directory.
	n.config.JoinAddr = ""
	n.config.JoinToken = ""
	n.config.ForceNewCluster = forceNewCluster

	restarted, err := node.New(n.config)
	if err != nil {
		return err
	}

	n.node = restarted
	return nil
}
// stop stops the underlying node using a context bounded by opsTimeout.
//
// Stopping a node that was never started (or is already stopped) is not
// treated as an error. On any other failure a dump of all goroutine stacks is
// written to stderr and an error naming the node's role and ID is returned;
// the underlying error is wrapped so callers can inspect it with errors.Is or
// errors.As.
func (n *testNode) stop() error {
	ctx, cancel := context.WithTimeout(context.Background(), opsTimeout)
	defer cancel()

	// The role is captured before Stop is called and used in the error
	// message below (presumably the manager is no longer reported once the
	// node has been stopped -- TODO confirm).
	isManager := n.IsManager()

	err := n.node.Stop(ctx)
	if err == nil {
		return nil
	}
	// If the error is from trying to stop an already stopped node, ignore it.
	if strings.Contains(err.Error(), "node: not started") {
		return nil
	}

	// TODO(aaronl): This stack dumping may be removed in the
	// future once context deadline issues while shutting down
	// nodes are resolved.
	//
	// runtime.Stack truncates its output to the buffer size, so keep doubling
	// the buffer until the whole dump fits.
	buf := make([]byte, 1024)
	for {
		written := runtime.Stack(buf, true)
		if written < len(buf) {
			buf = buf[:written]
			break
		}
		buf = make([]byte, 2*len(buf))
	}
	// Best-effort diagnostics: a failed write must not mask the stop error.
	_, _ = os.Stderr.Write(buf)

	role := "worker"
	if isManager {
		role = "manager"
	}
	return fmt.Errorf("error stop %s %s: %w", role, n.node.NodeID(), err)
}
// Stop stops the node and, if that succeeded, removes its state directory.
// The state directory is left in place when stopping the node fails.
func (n *testNode) Stop() error {
	err := n.stop()
	if err == nil {
		err = os.RemoveAll(n.stateDir)
	}
	return err
}
// ControlClient returns a grpc client for the ControlAPI of the node.
//
// It listens on the node's control socket with a context limited to
// opsTimeout and polls until a non-nil connection is received; if the poll
// gives up first, its error is returned.
//
// NOTE(review): earlier documentation said this panics for non-manager nodes.
// Nothing in this function panics; verify the behaviour for such nodes.
func (n *testNode) ControlClient(ctx context.Context) (api.ControlClient, error) {
	ctx, cancel := context.WithTimeout(ctx, opsTimeout)
	defer cancel()

	var conn *grpc.ClientConn
	conns := n.node.ListenControlSocket(ctx)

	// tryReceive does a non-blocking receive so the poller can retry until a
	// usable (non-nil) connection shows up.
	tryReceive := func() error {
		select {
		case conn = <-conns:
		default:
		}
		if conn != nil {
			return nil
		}
		return fmt.Errorf("didn't get control api connection")
	}

	err := testutils.PollFuncWithTimeout(nil, tryReceive, opsTimeout)
	if err != nil {
		return nil, err
	}
	return api.NewControlClient(conn), nil
}
// IsManager reports whether the underlying node currently returns a non-nil
// manager.
func (n *testNode) IsManager() bool {
	mgr := n.node.Manager()
	return mgr != nil
}