/
instances_reconcile_reactor.go
127 lines (108 loc) · 4.67 KB
/
instances_reconcile_reactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package cloudmap
import (
"context"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws/services"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
)
const (
defaultInstancesReconcileReactorRequestChanBuffer = 10
)
// instancesReconcileReactor manages the asynchronous execution for instances reconcile.
type instancesReconcileReactor interface {
// Submit submits a instances reconcile request, it will asynchronously drive cloudMap service's subset to match desiredState.
Submit(ctx context.Context, service serviceSummary, subset serviceSubset, readyInstanceInfoByID map[string]instanceInfo, unreadyInstanceInfoByID map[string]instanceInfo) <-chan error
}
// newDefaultInstancesReconcileReactor constructs new defaultInstancesReconcileReactor
func newDefaultInstancesReconcileReactor(ctx context.Context, k8sClient client.Client, cloudMapSDK services.CloudMap, log logr.Logger) *defaultInstancesReconcileReactor {
instancesCache := newDefaultInstancesCache(cloudMapSDK)
reactor := &defaultInstancesReconcileReactor{
cloudMapSDK: cloudMapSDK,
instancesCache: instancesCache,
reconcileRequestChan: make(chan instancesReconcileRequest, defaultInstancesReconcileReactorRequestChanBuffer),
reconcileTaskByServiceSubset: make(map[serviceSubsetID]*instancesReconcileTask),
reconcileTaskByServiceSubsetMutex: sync.RWMutex{},
log: log,
}
go reactor.reactorLoop(ctx)
return reactor
}
var _ instancesReconcileReactor = &defaultInstancesReconcileReactor{}
type defaultInstancesReconcileReactor struct {
cloudMapSDK services.CloudMap
instancesCache instancesCache
// channel to receive reconcile requests
reconcileRequestChan chan instancesReconcileRequest
// reconcileTask by serviceSubsetID
reconcileTaskByServiceSubset map[serviceSubsetID]*instancesReconcileTask
// protects reconcileTaskByServiceSubset
reconcileTaskByServiceSubsetMutex sync.RWMutex
log logr.Logger
}
type instancesReconcileRequest struct {
service serviceSummary
subset serviceSubset
readyInstanceInfoByID map[string]instanceInfo
unreadyInstanceInfoByID map[string]instanceInfo
resultChan chan<- error
}
func (r *defaultInstancesReconcileReactor) Submit(ctx context.Context, service serviceSummary, subset serviceSubset, readyInstanceInfoByID map[string]instanceInfo, unreadyInstanceInfoByID map[string]instanceInfo) <-chan error {
resultChan := make(chan error, 1)
reconcileRequest := instancesReconcileRequest{
service: service,
subset: subset,
readyInstanceInfoByID: readyInstanceInfoByID,
unreadyInstanceInfoByID: unreadyInstanceInfoByID,
resultChan: resultChan,
}
select {
case <-ctx.Done():
resultChan <- ctx.Err()
close(resultChan)
case r.reconcileRequestChan <- reconcileRequest:
}
return resultChan
}
func (r *defaultInstancesReconcileReactor) reactorLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case request := <-r.reconcileRequestChan:
r.dispatch(ctx, request)
}
}
}
// dispatch cloudMap service subset reconcile task into existing task or new task.
// note: dispatch should run in a single goroutine.
func (r *defaultInstancesReconcileReactor) dispatch(ctx context.Context, request instancesReconcileRequest) {
serviceSubsetID := serviceSubsetID{serviceID: request.service.serviceID, subsetID: request.subset.SubsetID()}
r.reconcileTaskByServiceSubsetMutex.RLock()
reconcileTask, ok := r.reconcileTaskByServiceSubset[serviceSubsetID]
r.reconcileTaskByServiceSubsetMutex.RUnlock()
if ok {
select {
case <-reconcileTask.done: // existing task already quited.
r.dispatchToNewTask(ctx, serviceSubsetID, request)
case reconcileTask.instancesReconcileRequestChan <- request: // existing task accepted our new request.
}
} else {
r.dispatchToNewTask(ctx, serviceSubsetID, request)
}
}
// dispatch new cloudMap service subset reconcile task to run on a new task.
func (r *defaultInstancesReconcileReactor) dispatchToNewTask(ctx context.Context, serviceSubsetID serviceSubsetID, request instancesReconcileRequest) {
reconcileTask := newInstancesReconcileTask(r.cloudMapSDK, r.instancesCache, r.log, make(chan struct{}))
r.reconcileTaskByServiceSubsetMutex.Lock()
r.reconcileTaskByServiceSubset[serviceSubsetID] = reconcileTask
r.reconcileTaskByServiceSubsetMutex.Unlock()
go func() {
reconcileTask.Run(ctx)
r.reconcileTaskByServiceSubsetMutex.Lock()
delete(r.reconcileTaskByServiceSubset, serviceSubsetID)
r.reconcileTaskByServiceSubsetMutex.Unlock()
close(reconcileTask.done)
}()
reconcileTask.instancesReconcileRequestChan <- request
}