generated from hashicorp/packer-plugin-scaffolding
/
step_watch_source.go
225 lines (182 loc) · 5.89 KB
/
step_watch_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
//go:generate packer-sdc struct-markdown
//go:generate packer-sdc mapstructure-to-hcl2 -type WatchSourceConfig
package supervisor
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/hashicorp/packer-plugin-sdk/multistep"
"github.com/hashicorp/packer-plugin-sdk/retry"
vmopv1alpha1 "github.com/vmware-tanzu/vm-operator/api/v1alpha1"
"k8s.io/apimachinery/pkg/fields"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Default value and state-bag keys used by the watch-source step.
const (
// DefaultWatchTimeoutSec is the timeout, in seconds, that
// WatchSourceConfig.Prepare applies when WatchSourceTimeoutSec is left at zero.
DefaultWatchTimeoutSec = 1800
// StateKeyVMIP is the state-bag key under which StepWatchSource.Run stores
// the IP reported in the source VM's status.
StateKeyVMIP = "vm_ip"
// StateKeyCommunicateIP is the state-bag key under which StepWatchSource.Run
// stores the ingress IP read from the VMService object.
StateKeyCommunicateIP = "ip"
)
var (
// Mu guards IsWatchingVM; waitForVMReady holds it around every write to the flag.
Mu sync.Mutex
// IsWatchingVM is set to true by waitForVMReady once the watch on the VM
// object has been established, and reset to false when that function returns.
// Per the inline note in waitForVMReady, it is used when mocking the watch
// process in tests.
IsWatchingVM bool
)
// WatchSourceConfig holds the user-facing settings for the step that waits
// for the source VM to become ready.
type WatchSourceConfig struct {
// The timeout in seconds to wait for the source VM to be ready. Defaults to `1800`.
WatchSourceTimeoutSec int `mapstructure:"watch_source_timeout_sec"`
}
// Prepare validates the configuration and fills in defaults.
//
// A zero WatchSourceTimeoutSec is replaced with DefaultWatchTimeoutSec. A
// negative value is rejected: StepWatchSource.Run feeds the value to
// context.WithTimeout, and a negative duration produces a context that has
// already expired, so the step would fail immediately with a misleading
// "timed out" error.
//
// It returns nil when the configuration is valid, otherwise a slice with one
// error per problem found.
func (c *WatchSourceConfig) Prepare() []error {
	if c.WatchSourceTimeoutSec < 0 {
		return []error{
			fmt.Errorf("watch_source_timeout_sec must not be negative, got %d", c.WatchSourceTimeoutSec),
		}
	}

	if c.WatchSourceTimeoutSec == 0 {
		c.WatchSourceTimeoutSec = DefaultWatchTimeoutSec
	}

	return nil
}
// StepWatchSource is the build step that waits for the source VM to obtain an
// IP and, when a VMService was created, for that service to obtain an ingress IP.
type StepWatchSource struct {
// Config supplies WatchSourceTimeoutSec, which Run uses as the overall
// deadline for the step. Run dereferences it, so it must be non-nil.
Config *WatchSourceConfig
// SourceName and Namespace identify the VM (and the VMService object of the
// same name) to look up. initStep fills them in from the state bag.
SourceName, Namespace string
// KubeWatchClient is the client used to watch the VM and to fetch the
// VMService object. initStep fills it in from the state bag.
KubeWatchClient client.WithWatch
}
// Run blocks until the source VM reports an IPv4 address and, if a VMService
// was created for it, until that service reports an ingress IP.
//
// Both waits share a single deadline of Config.WatchSourceTimeoutSec seconds.
// On success the VM IP is stored under StateKeyVMIP (and the ingress IP under
// StateKeyCommunicateIP) and ActionContinue is returned. On any failure the
// error is stored under "error" in the state bag and ActionHalt is returned.
func (s *StepWatchSource) Run(ctx context.Context, state multistep.StateBag) multistep.StepAction {
	logger := state.Get("logger").(*PackerLogger)
	logger.Info("Waiting for the source VM to be powered-on and accessible...")

	// Whatever ends up in runErr is published to the state bag on the way out,
	// so every failure path below only has to set it and halt.
	var runErr error
	defer func() {
		if runErr == nil {
			return
		}
		state.Put("error", runErr)
	}()

	runErr = s.initStep(state)
	if runErr != nil {
		return multistep.ActionHalt
	}

	timeout := time.Duration(s.Config.WatchSourceTimeoutSec) * time.Second
	watchCtx, stop := context.WithTimeout(ctx, timeout)
	defer stop()

	var sourceIP string
	if sourceIP, runErr = s.waitForVMReady(watchCtx, logger); runErr != nil {
		return multistep.ActionHalt
	}
	state.Put(StateKeyVMIP, sourceIP)

	// Only get the VM ingress IP if the VM service has been created (i.e. communicator is not 'none').
	if created, _ := state.Get(StateKeyVMServiceCreated).(bool); created {
		var serviceIP string
		if serviceIP, runErr = s.getVMIngressIP(watchCtx, logger); runErr != nil {
			return multistep.ActionHalt
		}
		state.Put(StateKeyCommunicateIP, serviceIP)
	}

	logger.Info("Source VM is now ready in Supervisor cluster")
	return multistep.ActionContinue
}
// Cleanup is intentionally a no-op; it does not touch the state bag or the step.
func (s *StepWatchSource) Cleanup(state multistep.StateBag) {}
// initStep copies the source name, namespace and Kubernetes client from the
// state bag into the step.
//
// It returns an error if CheckRequiredStates reports a problem with any of
// the three keys, or if a stored value does not have the expected type. The
// step's fields are only assigned once all three values have been read
// successfully, so a failure leaves the step unchanged.
func (s *StepWatchSource) initStep(state multistep.StateBag) error {
	err := CheckRequiredStates(state,
		StateKeyKubeClient,
		StateKeySupervisorNamespace,
		StateKeySourceName,
	)
	if err != nil {
		return err
	}

	name, isString := state.Get(StateKeySourceName).(string)
	if !isString {
		return fmt.Errorf("failed to cast %s to type string", StateKeySourceName)
	}

	ns, isString := state.Get(StateKeySupervisorNamespace).(string)
	if !isString {
		return fmt.Errorf("failed to cast %s to type string", StateKeySupervisorNamespace)
	}

	watchClient, isWatchClient := state.Get(StateKeyKubeClient).(client.WithWatch)
	if !isWatchClient {
		return fmt.Errorf("failed to cast %s to type client.WithWatch", StateKeyKubeClient)
	}

	s.SourceName, s.Namespace, s.KubeWatchClient = name, ns, watchClient
	return nil
}
// waitForVMReady watches the VirtualMachine object named s.SourceName in
// s.Namespace and returns its IP as soon as the object's status carries a
// valid IPv4 address.
//
// While the watch is active the package-level IsWatchingVM flag is true; it
// is reset to false, and the watch is stopped, when the function returns.
//
// An error is returned when the watch cannot be established, when ctx is done
// before an IPv4 address shows up, or when the watch's result channel is
// closed before an IPv4 address shows up.
func (s *StepWatchSource) waitForVMReady(ctx context.Context, logger *PackerLogger) (string, error) {
	vmWatch, err := s.KubeWatchClient.Watch(ctx, &vmopv1alpha1.VirtualMachineList{}, &client.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", s.SourceName),
		Namespace:     s.Namespace,
	})
	if err != nil {
		logger.Error("Failed to watch the VM object in Supervisor cluster")
		return "", err
	}

	defer func() {
		vmWatch.Stop()

		Mu.Lock()
		IsWatchingVM = false // This is used when mocking the watch process in test.
		Mu.Unlock()
	}()

	Mu.Lock()
	IsWatchingVM = true
	Mu.Unlock()

	for {
		select {
		case event, open := <-vmWatch.ResultChan():
			if !open {
				// A closed channel delivers zero-value events forever. Without
				// this check the loop would spin on them until ctx expires.
				if ctx.Err() != nil {
					// The channel may be closed as a consequence of ctx ending;
					// report that case exactly like the ctx.Done() branch does.
					return "", fmt.Errorf("timed out watching for source VM object to be ready")
				}
				return "", fmt.Errorf("watch on source VM object was closed before the VM became ready")
			}

			if event.Object == nil {
				continue
			}

			vmObj, ok := event.Object.(*vmopv1alpha1.VirtualMachine)
			if !ok {
				continue
			}

			// Only an IPv4 address counts as ready. ParseIP returns nil for an
			// empty or malformed string, so no separate empty check is needed.
			vmIP := vmObj.Status.VmIp
			if ip := net.ParseIP(vmIP); ip != nil && ip.To4() != nil {
				logger.Info("Successfully obtained the source VM IP: %s", vmIP)
				return vmIP, nil
			}

			// If the code reaches here, then the VM object is not ready yet.
			// Provide additional logging based on the current VM power state.
			if vmObj.Status.PowerState == vmopv1alpha1.VirtualMachinePoweredOn {
				logger.Info("Source VM is powered-on, waiting for an IP to be assigned...")
			} else {
				logger.Info("Source VM is NOT powered-on yet, continue watching...")
			}

		case <-ctx.Done():
			return "", fmt.Errorf("timed out watching for source VM object to be ready")
		}
	}
}
// getVMIngressIP fetches the VirtualMachineService object named s.SourceName
// in s.Namespace and returns the IP of its first load-balancer ingress entry.
//
// The lookup runs through retry.Config with a 5-second retry delay. A failed
// Get or an empty ingress IP counts as a retryable error; an error matching
// context.DeadlineExceeded is not retried. If the retry run ends with any
// error, a generic timed-out error is returned in its place.
func (s *StepWatchSource) getVMIngressIP(ctx context.Context, logger *PackerLogger) (string, error) {
	logger.Info("Getting source VM ingress IP from the VMService object")

	key := client.ObjectKey{
		Name:      s.SourceName,
		Namespace: s.Namespace,
	}
	svc := &vmopv1alpha1.VirtualMachineService{}
	ingressIP := ""

	// lookup performs one attempt and records the IP in ingressIP on success.
	lookup := func(ctx context.Context) error {
		if getErr := s.KubeWatchClient.Get(ctx, key, svc); getErr != nil {
			logger.Error("Failed to get the VMService object in Supervisor cluster")
			return getErr
		}

		entries := svc.Status.LoadBalancer.Ingress
		if len(entries) > 0 && entries[0].IP != "" {
			logger.Info("Successfully retrieved the source VM ingress IP: %s", entries[0].IP)
			ingressIP = entries[0].IP
			return nil
		}

		logger.Info("VMService object's ingress IP is empty, continue checking...")
		return errors.New("VMService object's ingress IP is empty, continue checking...")
	}

	retryCfg := retry.Config{
		RetryDelay: func() time.Duration {
			return 5 * time.Second
		},
		ShouldRetry: func(err error) bool {
			return !errors.Is(err, context.DeadlineExceeded)
		},
	}

	if retryCfg.Run(ctx, lookup) != nil {
		return "", fmt.Errorf("timed out checking for VMService object's ingress IP")
	}

	return ingressIP, nil
}