forked from kubernetes/ingress-gce
/
syncer.go
164 lines (142 loc) · 3.86 KB
/
syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syncers
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)
// syncerCore is the unit of work driven by syncer.
// The syncer loop calls sync() once per iteration and treats a non-nil
// error as a failed attempt that goes through the backoff/retry path.
type syncerCore interface {
// sync performs a single sync pass and reports whether it failed.
sync() error
}
// syncer is a NEG syncer skeleton.
// It handles state transitions and backoff retry operations.
// The sync work itself is delegated to core; syncer only decides when
// core.sync() runs (on start, on a Sync() signal, or on a retry timer).
type syncer struct {
// metadata
// NegSyncerKey is embedded; its String() is used in log and error messages,
// and its Namespace/Name fields are used to look up the service for events.
NegSyncerKey
// negName is the NEG name reported in sync-failure events.
negName string
// NEG sync function
core syncerCore
// event recording
// serviceLister is used to look up the service that failure events attach to.
serviceLister cache.Indexer
recorder record.EventRecorder
// syncer states
// stateLock guards stopped and shuttingDown.
stateLock sync.Mutex
// stopped is true before Start and after Stop.
stopped bool
// shuttingDown is set by Stop and cleared by the sync goroutine right
// before it exits, so it is true while a stopped loop is still winding down.
shuttingDown bool
// sync signal and retry handling
// syncCh carries sync signals to the sync goroutine. It is created with a
// buffer of one when the syncer starts and is closed by Stop.
syncCh chan interface{}
// clock provides the retry timer (clock.After).
clock clock.Clock
// backoff supplies the delay before the next retry and is reset after a
// successful sync.
backoff backoffHandler
}
// newSyncer builds a syncer for the given NEG in the stopped state.
// The returned syncer does nothing until Start is called. It uses the real
// clock and an exponential backoff handler configured from maxRetries,
// minRetryDelay and maxRetryDelay.
func newSyncer(negSyncerKey NegSyncerKey, networkEndpointGroupName string, serviceLister cache.Indexer, recorder record.EventRecorder, core syncerCore) *syncer {
	s := new(syncer)

	// identity
	s.NegSyncerKey = negSyncerKey
	s.negName = networkEndpointGroupName

	// collaborators
	s.core = core
	s.serviceLister = serviceLister
	s.recorder = recorder

	// initial state: not running, not winding down
	s.stopped = true
	s.shuttingDown = false

	// retry machinery
	s.clock = clock.RealClock{}
	s.backoff = NewExponentialBackendOffHandler(maxRetries, minRetryDelay, maxRetryDelay)

	return s
}
// Start launches the sync loop in a background goroutine.
//
// It returns an error if the syncer is already running, or if Stop was
// called and the previous sync goroutine has not exited yet. The loop runs
// core.sync() immediately, and then again each time a signal arrives on the
// sync channel or a retry timer fires after a failed sync. The loop exits
// when the sync channel is closed (see Stop).
func (s *syncer) Start() error {
	// Check the state and move to "running" inside one critical section.
	// Doing the checks and the transition under separate lock acquisitions
	// would let two concurrent Start calls both pass the checks and spawn
	// two sync loops.
	s.stateLock.Lock()
	if !s.stopped {
		s.stateLock.Unlock()
		return fmt.Errorf("NEG syncer for %s is already running.", s.NegSyncerKey.String())
	}
	if s.shuttingDown {
		s.stateLock.Unlock()
		return fmt.Errorf("NEG syncer for %s is shutting down. ", s.NegSyncerKey.String())
	}
	s.stopped = false
	s.syncCh = make(chan interface{}, 1)
	// The goroutine below works on this channel for its whole lifetime;
	// capture it so the loop never reads the struct field unsynchronized.
	syncCh := s.syncCh
	s.stateLock.Unlock()

	glog.V(2).Infof("Starting NEG syncer for service port %s", s.NegSyncerKey.String())
	go func() {
		for {
			// A nil channel is never ready in select: equivalent to never retry.
			var retryCh <-chan time.Time

			if err := s.core.sync(); err != nil {
				delay, retryErr := s.backoff.NextRetryDelay()
				retryMesg := ""
				if retryErr == ErrRetriesExceeded {
					// Retries exhausted: leave retryCh nil and wait for the
					// next explicit sync signal.
					retryMesg = "(will not retry)"
				} else {
					retryCh = s.clock.After(delay)
					retryMesg = "(will retry)"
				}

				// Surface the failure on the service, if it can still be found.
				if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
					s.recorder.Eventf(svc, apiv1.EventTypeWarning, "SyncNetworkEndpointGroupFailed", "Failed to sync NEG %q %s: %v", s.negName, retryMesg, err)
				}
			} else {
				s.backoff.ResetRetryDelay()
			}

			select {
			case _, open := <-syncCh:
				if !open {
					// Stop closed the channel: mark shutdown as complete so
					// the syncer can be started again, then exit the loop.
					s.stateLock.Lock()
					s.shuttingDown = false
					s.stateLock.Unlock()
					glog.V(2).Infof("Stopping NEG syncer for %s", s.NegSyncerKey.String())
					return
				}
			case <-retryCh:
				// continue to sync
			}
		}
	}()
	return nil
}
// init moves the syncer into the running state: it installs a fresh sync
// signal channel with a buffer of one and clears the stopped flag, all
// under stateLock.
func (s *syncer) init() {
	s.stateLock.Lock()
	s.syncCh = make(chan interface{}, 1)
	s.stopped = false
	s.stateLock.Unlock()
}
// Stop asks the sync loop to exit. It marks the syncer as stopped and
// shutting down, then closes the sync channel so the loop notices on its
// next select. Calling Stop on a syncer that is already stopped is a no-op.
func (s *syncer) Stop() {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.stopped {
		// Never started or already stopped: nothing to close.
		return
	}

	glog.V(2).Infof("Stopping NEG syncer for service port %s", s.NegSyncerKey.String())
	s.stopped = true
	s.shuttingDown = true
	close(s.syncCh)
}
// Sync signals the sync loop to run another sync.
//
// It returns true if the signal was queued. It returns false if the syncer
// is stopped, or if a signal is already pending (the sync channel has a
// buffer of one, so additional signals are dropped rather than blocking).
func (s *syncer) Sync() bool {
	// Hold stateLock across both the stopped check and the send. Stop closes
	// the channel under the same lock, so without this a Stop that lands
	// between the check and the send would make the send panic ("send on
	// closed channel"). The send is non-blocking, so the lock is held only
	// briefly.
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.stopped {
		glog.Warningf("NEG syncer for %s is already stopped.", s.NegSyncerKey.String())
		return false
	}

	select {
	case s.syncCh <- struct{}{}:
		return true
	default:
		return false
	}
}
// IsStopped reports whether the syncer is in the stopped state, i.e. it has
// not been started yet or Stop has been called since the last start.
func (s *syncer) IsStopped() bool {
	s.stateLock.Lock()
	isStopped := s.stopped
	s.stateLock.Unlock()
	return isStopped
}
// IsShuttingDown reports whether Stop has been called but the sync
// goroutine has not yet observed the closed channel and cleared the flag.
func (s *syncer) IsShuttingDown() bool {
	s.stateLock.Lock()
	inShutdown := s.shuttingDown
	s.stateLock.Unlock()
	return inShutdown
}