-
Notifications
You must be signed in to change notification settings - Fork 287
/
multicluster.go
160 lines (137 loc) Β· 5.16 KB
/
multicluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package framework
import (
"fmt"
"sync"
"testing"
"time"
"github.com/aws/eks-anywhere/internal/pkg/api"
"github.com/aws/eks-anywhere/pkg/retrier"
)
// MulticlusterE2ETest drives end-to-end tests that involve one management
// cluster plus zero or more workload clusters attached to it.
type MulticlusterE2ETest struct {
	// T is the test handle used for logging and failing the test.
	T *testing.T
	// ManagementCluster is the cluster that manages the workload clusters.
	ManagementCluster *ClusterE2ETest
	// WorkloadClusters maps workload cluster names to their test wrappers.
	WorkloadClusters WorkloadClusters
	// MaxConcurrentWorkers defines the max number of workers for concurrent operations.
	// If it's -1, it will use one worker per job.
	MaxConcurrentWorkers int
	// workloadClusterNameCount is the counter used by NewWorkloadClusterName
	// to generate unique workload cluster names. Not thread safe.
	workloadClusterNameCount int
}
// NewMulticlusterE2ETest builds a MulticlusterE2ETest around the given
// management cluster. Every workload cluster passed in is pointed at the
// management cluster and assigned a unique generated name before being
// registered via WithWorkloadClusters.
func NewMulticlusterE2ETest(t *testing.T, managementCluster *ClusterE2ETest, workloadClusters ...*ClusterE2ETest) *MulticlusterE2ETest {
	multi := &MulticlusterE2ETest{
		T:                    t,
		ManagementCluster:    managementCluster,
		MaxConcurrentWorkers: -1,
		WorkloadClusters:     make(WorkloadClusters, len(workloadClusters)),
	}
	for _, workload := range workloadClusters {
		workload.clusterFillers = append(workload.clusterFillers, api.WithManagementCluster(managementCluster.ClusterName))
		workload.ClusterName = multi.NewWorkloadClusterName()
		multi.WithWorkloadClusters(workload)
	}
	return multi
}
// WithWorkloadClusters adds ClusterE2ETest's as workload clusters to the test.
// Each cluster is registered under its ClusterName, wrapped together with the
// management cluster's kubeconfig path.
func (m *MulticlusterE2ETest) WithWorkloadClusters(workloadClusters ...*ClusterE2ETest) {
	for _, cluster := range workloadClusters {
		wrapped := &WorkloadCluster{
			ClusterE2ETest:                  cluster,
			ManagementClusterKubeconfigFile: m.ManagementCluster.KubeconfigFilePath,
		}
		m.WorkloadClusters[cluster.ClusterName] = wrapped
	}
}
// NewWorkloadClusterName returns a new unique name for a workload cluster based on the management cluster name.
// Names follow the pattern "<management-name>-w-<counter>". This is not thread safe.
func (m *MulticlusterE2ETest) NewWorkloadClusterName() string {
	name := fmt.Sprintf("%s-w-%d", m.ManagementCluster.ClusterName, m.workloadClusterNameCount)
	m.workloadClusterNameCount++
	return name
}
// RunInWorkloadClusters runs the given flow sequentially against every
// registered workload cluster, logging the cluster name before each run.
func (m *MulticlusterE2ETest) RunInWorkloadClusters(flow func(*WorkloadCluster)) {
	for clusterName, workload := range m.WorkloadClusters {
		m.T.Logf("Running test flow in workload cluster %s", clusterName)
		flow(workload)
	}
}
// RunConcurrentlyInWorkloadClusters executes the given flow concurrently for all workload
// clusters. It respects MaxConcurrentWorkers.
func (m *MulticlusterE2ETest) RunConcurrentlyInWorkloadClusters(flow func(*WorkloadCluster)) {
	jobs := make([]func(), 0, len(m.WorkloadClusters))
	for name, wc := range m.WorkloadClusters {
		// Shadow BOTH loop variables: the closures run after the loop has
		// advanced and, before Go 1.22, range variables are shared across
		// iterations. The original shadowed only wc, so every job could log
		// the same (last) cluster name.
		name, w := name, wc
		jobs = append(jobs, func() {
			m.T.Logf("Running test flow in workload cluster %s", name)
			flow(w)
		})
	}
	m.RunConcurrently(jobs...)
}
// RunConcurrently runs the given jobs concurrently using no more than MaxConcurrentWorkers workers.
// If MaxConcurrentWorkers is non-positive (e.g. the -1 default), it uses one worker per job.
func (m *MulticlusterE2ETest) RunConcurrently(flows ...func()) {
	// Nothing to do; also avoids spawning workers that immediately exit.
	if len(flows) == 0 {
		return
	}

	workerNum := m.MaxConcurrentWorkers
	// Guard <= 0 rather than < 0: with workerNum == 0 no goroutine would ever
	// read from the jobs channel and the unbuffered send below would deadlock.
	if workerNum <= 0 {
		workerNum = len(flows)
	}

	wg := &sync.WaitGroup{}
	jobs := make(chan func())
	wg.Add(workerNum)
	for i := 0; i < workerNum; i++ {
		go func() {
			defer wg.Done()
			for job := range jobs {
				job()
			}
		}()
	}

	for _, flow := range flows {
		jobs <- flow
	}
	// Only the sender closes the channel; workers drain remaining jobs and exit.
	close(jobs)
	wg.Wait()
}
// CreateManagementClusterForVersion generates a cluster config for the given
// eks-a version and then creates the management cluster with the CLI.
func (m *MulticlusterE2ETest) CreateManagementClusterForVersion(eksaVersion string, opts ...CommandOpt) {
	m.ManagementCluster.GenerateClusterConfigForVersion(eksaVersion)
	m.CreateManagementCluster(opts...)
}
// CreateManagementClusterWithConfig first generates a cluster config based on the management cluster test's
// previous configuration and proceeds to create a management cluster with the CLI.
func (m *MulticlusterE2ETest) CreateManagementClusterWithConfig(opts ...CommandOpt) {
	m.ManagementCluster.GenerateClusterConfig()
	m.ManagementCluster.CreateCluster(opts...)
}
// CreateManagementCluster creates the management cluster with the CLI,
// assuming its cluster config has already been generated.
func (m *MulticlusterE2ETest) CreateManagementCluster(opts ...CommandOpt) {
	m.ManagementCluster.CreateCluster(opts...)
}
// CreateTinkerbellManagementCluster runs tinkerbell related steps for cluster creation:
// it generates the hardware config and powers off the hardware before creating the cluster.
func (m *MulticlusterE2ETest) CreateTinkerbellManagementCluster(opts ...CommandOpt) {
	m.ManagementCluster.GenerateHardwareConfig()
	m.ManagementCluster.PowerOffHardware()
	m.ManagementCluster.CreateCluster(opts...)
}
// DeleteManagementCluster deletes the management cluster.
func (m *MulticlusterE2ETest) DeleteManagementCluster() {
	m.ManagementCluster.DeleteCluster()
}
// DeleteTinkerbellManagementCluster runs tinkerbell related steps for cluster deletion:
// it stops early if the test already failed, deletes the cluster, and then
// validates the hardware was decommissioned.
func (m *MulticlusterE2ETest) DeleteTinkerbellManagementCluster() {
	m.ManagementCluster.StopIfFailed()
	m.ManagementCluster.DeleteCluster()
	m.ManagementCluster.ValidateHardwareDecommissioned()
}
// PushWorkloadClusterToGit builds the workload cluster config file for git and pushing changes to git.
// The push is retried up to 10 times with a 5 second wait between attempts;
// the test fails fatally if all attempts error out.
func (m *MulticlusterE2ETest) PushWorkloadClusterToGit(w *WorkloadCluster, opts ...api.ClusterConfigFiller) {
	push := func() error {
		return m.ManagementCluster.pushWorkloadClusterToGit(w, opts...)
	}
	if err := retrier.Retry(10, 5*time.Second, push); err != nil {
		w.T.Fatalf("Error pushing workload cluster changes to git: %v", err)
	}
}
// DeleteWorkloadClusterFromGit deletes a workload cluster config file and pushes the changes to git.
// The deletion is retried up to 10 times with a 5 second wait between attempts;
// the test fails fatally if all attempts error out.
func (m *MulticlusterE2ETest) DeleteWorkloadClusterFromGit(w *WorkloadCluster) {
	remove := func() error {
		return m.ManagementCluster.deleteWorkloadClusterFromGit(w)
	}
	if err := retrier.Retry(10, 5*time.Second, remove); err != nil {
		w.T.Fatalf("Error deleting workload cluster changes from git: %v", err)
	}
}