-
Notifications
You must be signed in to change notification settings - Fork 13
/
instancevalidator.go
163 lines (137 loc) · 5.35 KB
/
instancevalidator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package agent
import (
"fmt"
"sync"
"github.com/Axway/agent-sdk/pkg/util"
defs "github.com/Axway/agent-sdk/pkg/apic/definitions"
apiV1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
"github.com/Axway/agent-sdk/pkg/jobs"
utilErrors "github.com/Axway/agent-sdk/pkg/util/errors"
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck"
"github.com/Axway/agent-sdk/pkg/util/log"
)
// instanceValidator is the job type whose Execute method checks the cached API
// service instances against the dataplane using the registered API validator,
// deleting instances (and then services) that are no longer backed by an API.
type instanceValidator struct {
// Embeds jobs.Job.
jobs.Job
// logger is the component-scoped logger assigned in newInstanceValidator.
logger log.FieldLogger
// cacheLock is held for the full duration of validateAPIOnDataplane.
cacheLock *sync.Mutex
}
// newInstanceValidator builds an instanceValidator with a logger tagged with
// the "instanceValidator" component and a freshly allocated cache mutex.
func newInstanceValidator() *instanceValidator {
	validator := new(instanceValidator)
	validator.logger = logger.WithComponent("instanceValidator")
	validator.cacheLock = new(sync.Mutex)
	return validator
}
// Ready reports whether the global health check status is currently OK.
func (j *instanceValidator) Ready() bool {
	if status, _ := hc.GetGlobalStatus(); status != string(hc.OK) {
		return false
	}
	return true
}
// Status returns nil while the global health check status is OK. Otherwise it
// logs and returns an error stating that the agent is marked as not running.
func (j *instanceValidator) Status() error {
	j.logger.Trace("status check")

	status, _ := hc.GetGlobalStatus()
	if status == string(hc.OK) {
		return nil
	}

	notRunning := fmt.Errorf("agent is marked as not running")
	j.logger.WithError(notRunning).Trace("status failed")
	return notRunning
}
// Execute runs a single validation pass when an API validator has been
// registered, and is a no-op (apart from trace logging) when none has.
// It always returns nil.
func (j *instanceValidator) Execute() error {
	if getAPIValidator() == nil {
		j.logger.Trace("no registered validator")
		j.logger.Trace("finished executing")
		return nil
	}

	j.logger.Trace("executing")

	// The publishing lock is released by the deferred call, i.e. only after
	// the final trace below has been written.
	PublishingLock()
	defer PublishingUnlock()

	j.validateAPIOnDataplane()

	j.logger.Trace("finished executing")
	return nil
}
// validateAPIOnDataplane walks every API service instance key held in the
// cache and asks the registered API validator whether the instance's external
// API ID (and stage, when present) still exists on the dataplane. Instances
// the validator rejects are deleted via deleteServiceInstance. Instances that
// cannot be read from the cache, or that carry no external API ID, are
// skipped. After all instances have been checked, validateServices is run.
//
// cacheLock is held for the whole pass, including the validateServices call.
func (j *instanceValidator) validateAPIOnDataplane() {
	j.cacheLock.Lock()
	defer j.cacheLock.Unlock()

	j.logger.Trace("validating api service instances on dataplane")

	for _, key := range agent.cacheManager.GetAPIServiceInstanceKeys() {
		logger := j.logger.WithField("instanceCacheID", key)
		logger.Tracef("validating")

		instance, err := agent.cacheManager.GetAPIServiceInstanceByID(key)
		if err != nil {
			logger.WithError(err).Trace("could not get instance from cache")
			continue
		}
		logger = logger.WithField("name", instance.Name)

		// Capture the lookup error here: previously it was discarded and the
		// check below tested the (always nil) error of the cache lookup above,
		// so a failed lookup was never reported.
		externalAPIID, err := util.GetAgentDetailsValue(instance, defs.AttrExternalAPIID)
		if err != nil {
			logger.WithError(err).Trace("could not get instance external id")
		} else if externalAPIID == "" {
			logger.Trace("could not get instance external id")
		}
		if externalAPIID == "" {
			continue // skip service instances without external api id
		}
		logger = logger.WithField("externalAPIID", externalAPIID)

		// The stage is optional (it may be empty for dataplanes that do not
		// support it), so a failed lookup is deliberately treated as "no stage".
		externalAPIStage, _ := util.GetAgentDetailsValue(instance, defs.AttrExternalAPIStage)
		if externalAPIStage != "" {
			logger = logger.WithField("externalAPIStage", externalAPIStage)
		}

		// The primary key is only used as extra context below, so a failed
		// lookup is deliberately treated as "no primary key".
		externalPrimaryKey, _ := util.GetAgentDetailsValue(instance, defs.AttrExternalAPIPrimaryKey)
		if externalPrimaryKey != "" {
			logger = logger.WithField("externalPrimaryKey", externalPrimaryKey)
		}

		// externalAPIID is guaranteed non-empty at this point, so the
		// validator's verdict alone decides whether the instance is removed.
		logger.Trace("validating API Instance on dataplane")
		apiValidator := getAPIValidator()
		if !apiValidator(externalAPIID, externalAPIStage) {
			logger.Trace("removing API Instance no longer on dataplane")
			j.deleteServiceInstance(logger, instance, externalPrimaryKey, externalAPIID)
		}
	}

	j.validateServices()
}
// validateServices walks every API service key held in the cache and deletes,
// via deleteService, each service whose cached instance count is zero.
// Keys whose service is no longer in the cache are skipped.
//
// It is invoked from validateAPIOnDataplane while cacheLock is held.
func (j *instanceValidator) validateServices() {
	// Log through the validator's own logger so these entries carry the same
	// component field as the rest of the job's output.
	j.logger.Trace("validating api services have at least one instance on dataplane")

	for _, key := range agent.cacheManager.GetAPIServiceKeys() {
		logger := j.logger.WithField("serviceCacheID", key)
		logger.Tracef("validating")

		service := agent.cacheManager.GetAPIServiceWithPrimaryKey(key)
		if service == nil {
			logger.Trace("service was no longer in the cache")
			continue
		}
		logger = logger.WithField("name", service.Name)

		// Query the count once so the value that is logged is the same value
		// the delete decision is based on.
		instanceCount := agent.cacheManager.GetAPIServiceInstanceCount(service.Name)
		logger = logger.WithField("instanceCount", instanceCount)
		if instanceCount == 0 {
			logger.Trace("service has no more instances")
			j.deleteService(logger, service)
		}
	}
}
// deleteServiceInstance deletes the given API service instance through the
// apic client, addressing it by ri.Name. When that call succeeds, the instance
// is also removed from the cache by ri.Metadata.ID; when it fails, the wrapped
// error is logged and the cache is left untouched.
//
// primaryKey and apiID are accepted but not read by this function.
func (j *instanceValidator) deleteServiceInstance(logger log.FieldLogger, ri *apiV1.ResourceInstance, primaryKey, apiID string) {
	instanceLog := logger.WithField("instanceTitle", ri.Title)
	instanceLog.Infof("API no longer exists on the dataplane, deleting the API Service Instance")

	if deleteErr := agent.apicClient.DeleteAPIServiceInstance(ri.Name); deleteErr != nil {
		wrapped := utilErrors.Wrap(ErrDeletingServiceInstanceItem, deleteErr.Error()).FormatError(ri.Title)
		instanceLog.WithError(wrapped).Error("deleting instance")
		return
	}

	agent.cacheManager.DeleteAPIServiceInstance(ri.Metadata.ID)
	instanceLog.Debugf("Deleted API Service Instance item from Amplify Central")
}
// deleteService deletes the given API service through the apic client,
// addressing it by ri.Name. When that call succeeds, the service is also
// removed from the cache under the same name; when it fails, the wrapped error
// is logged and the cache is left untouched.
func (j *instanceValidator) deleteService(logger log.FieldLogger, ri *apiV1.ResourceInstance) {
	serviceLog := logger.WithField("serviceTitle", ri.Title)
	serviceLog.Infof("API Service no longer has a service instance; deleting the API Service")

	// deleting the service will delete all associated resources, including the consumerInstance
	if deleteErr := agent.apicClient.DeleteServiceByName(ri.Name); deleteErr != nil {
		wrapped := utilErrors.Wrap(ErrDeletingService, deleteErr.Error()).FormatError(ri.Title)
		serviceLog.WithError(wrapped).Error("deleting service")
		return
	}

	agent.cacheManager.DeleteAPIService(ri.Name)
	serviceLog.Debugf("Deleted API Service from Amplify Central")
}