/
manager.go
114 lines (104 loc) · 3.89 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package virtualservice
import (
"context"
appmesh "github.com/aws/aws-app-mesh-controller-for-k8s/apis/appmesh/v1beta2"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws/services"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/references"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/virtualservice"
"github.com/aws/aws-app-mesh-controller-for-k8s/test/framework/k8s"
"github.com/aws/aws-app-mesh-controller-for-k8s/test/framework/utils"
appmeshsdk "github.com/aws/aws-sdk-go/service/appmesh"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Manager bundles helpers for observing VirtualService resources: waiting on
// their state in the Kubernetes API and comparing them against AWS App Mesh.
type Manager interface {
	// WaitUntilVirtualServiceActive waits for vs to report the
	// VirtualServiceActive condition as true and returns the VirtualService
	// that was observed in that state.
	WaitUntilVirtualServiceActive(ctx context.Context, vs *appmesh.VirtualService) (*appmesh.VirtualService, error)
	// WaitUntilVirtualServiceDeleted waits until vs can no longer be found
	// through the Kubernetes client.
	WaitUntilVirtualServiceDeleted(ctx context.Context, vs *appmesh.VirtualService) error
	// CheckVirtualServiceInAWS compares the spec App Mesh reports for vs in
	// mesh ms with the spec built from the Kubernetes objects, returning an
	// error when they differ.
	CheckVirtualServiceInAWS(ctx context.Context, ms *appmesh.Mesh, vs *appmesh.VirtualService) error
}
// NewManager returns a Manager that reads Kubernetes objects through
// k8sClient and queries App Mesh through appMeshSDK.
func NewManager(k8sClient client.Client, appMeshSDK services.AppMesh) Manager {
	mgr := new(defaultManager)
	mgr.k8sClient = k8sClient
	mgr.appMeshSDK = appMeshSDK
	return mgr
}
// defaultManager is the Manager implementation returned by NewManager.
type defaultManager struct {
	// k8sClient is used to fetch VirtualService, VirtualNode and
	// VirtualRouter objects.
	k8sClient client.Client
	// appMeshSDK is used to describe virtual services in AWS App Mesh.
	appMeshSDK services.AppMesh
}
// WaitUntilVirtualServiceActive polls the Kubernetes API every
// utils.PollIntervalShort until vs carries a VirtualServiceActive condition
// whose status is ConditionTrue, or until ctx is done.
//
// Failed Get calls are tolerated until utils.PollRetries of them have been
// absorbed; the next failure aborts the wait and is returned.
//
// On success the observed VirtualService is returned. On any error the
// returned VirtualService is nil, so callers never receive a stale or
// partially populated object together with a non-nil error.
func (m *defaultManager) WaitUntilVirtualServiceActive(ctx context.Context, vs *appmesh.VirtualService) (*appmesh.VirtualService, error) {
	observedVS := &appmesh.VirtualService{}
	// retryCount is never reset, so it counts failed Gets across the whole
	// wait rather than consecutive failures.
	retryCount := 0
	err := wait.PollImmediateUntil(utils.PollIntervalShort, func() (bool, error) {
		if err := m.k8sClient.Get(ctx, k8s.NamespacedName(vs), observedVS); err != nil {
			if retryCount >= utils.PollRetries {
				return false, err
			}
			retryCount++
			return false, nil
		}
		for _, condition := range observedVS.Status.Conditions {
			if condition.Type == appmesh.VirtualServiceActive && condition.Status == corev1.ConditionTrue {
				return true, nil
			}
		}
		return false, nil
	}, ctx.Done())
	if err != nil {
		return nil, err
	}
	return observedVS, nil
}
// WaitUntilVirtualServiceDeleted polls the Kubernetes API every
// utils.PollIntervalShort until a Get for vs reports NotFound, or until ctx
// is done. Any other Get error stops the wait and is returned.
func (m *defaultManager) WaitUntilVirtualServiceDeleted(ctx context.Context, vs *appmesh.VirtualService) error {
	// Scratch object for Get to decode into; its contents are never read.
	current := new(appmesh.VirtualService)
	isGone := func() (bool, error) {
		err := m.k8sClient.Get(ctx, k8s.NamespacedName(vs), current)
		switch {
		case err == nil:
			// Still present: keep polling.
			return false, nil
		case apierrs.IsNotFound(err):
			return true, nil
		default:
			return false, err
		}
	}
	return wait.PollImmediateUntil(utils.PollIntervalShort, isGone, ctx.Done())
}
// CheckVirtualServiceInAWS verifies that the virtual service App Mesh holds
// for vs in mesh ms matches what the Kubernetes objects describe.
//
// It fetches every VirtualNode and VirtualRouter referenced by vs, builds the
// desired SDK spec from them, describes the virtual service in App Mesh, and
// compares the two specs with empty values treated as equal. When they
// differ, the returned error's message is the diff between desired and actual
// spec. Errors from the Kubernetes client, the spec builder, or the App Mesh
// call are returned unchanged.
func (m *defaultManager) CheckVirtualServiceInAWS(ctx context.Context, ms *appmesh.Mesh, vs *appmesh.VirtualService) error {
	// TODO: handle aws throttling

	// Resolve the VirtualNodes referenced by vs.
	nodes := make(map[types.NamespacedName]*appmesh.VirtualNode)
	for _, ref := range virtualservice.ExtractVirtualNodeReferences(vs) {
		node := new(appmesh.VirtualNode)
		err := m.k8sClient.Get(ctx, references.ObjectKeyForVirtualNodeReference(vs, ref), node)
		if err != nil {
			return err
		}
		nodes[k8s.NamespacedName(node)] = node
	}

	// Resolve the VirtualRouters referenced by vs.
	routers := make(map[types.NamespacedName]*appmesh.VirtualRouter)
	for _, ref := range virtualservice.ExtractVirtualRouterReferences(vs) {
		router := new(appmesh.VirtualRouter)
		err := m.k8sClient.Get(ctx, references.ObjectKeyForVirtualRouterReference(vs, ref), router)
		if err != nil {
			return err
		}
		routers[k8s.NamespacedName(router)] = router
	}

	wantSpec, err := virtualservice.BuildSDKVirtualServiceSpec(vs, nodes, routers)
	if err != nil {
		return err
	}

	input := &appmeshsdk.DescribeVirtualServiceInput{
		MeshName:           ms.Spec.AWSName,
		MeshOwner:          ms.Spec.MeshOwner,
		VirtualServiceName: vs.Spec.AWSName,
	}
	described, err := m.appMeshSDK.DescribeVirtualServiceWithContext(ctx, input)
	if err != nil {
		return err
	}

	equateEmpty := cmpopts.EquateEmpty()
	if cmp.Equal(wantSpec, described.VirtualService.Spec, equateEmpty) {
		return nil
	}
	return errors.New(cmp.Diff(wantSpec, described.VirtualService.Spec, equateEmpty))
}