Skip to content

Commit

Permalink
Update: refine reconnect signal
Browse files Browse the repository at this point in the history
  • Loading branch information
ifooth committed Nov 20, 2023
1 parent e81ae27 commit b5e7de3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
3 changes: 1 addition & 2 deletions watch/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (w *Watcher) loopHeartbeat() error {
for {
select {
case <-w.vas.Ctx.Done():
logs.V(1).Infof("stream heartbeat stoped because of %s", w.vas.Ctx.Err().Error())
logs.Infof("stream heartbeat will closed because of %s", w.vas.Ctx.Err().Error())
return

case <-tick.C:
Expand All @@ -77,7 +77,6 @@ func (w *Watcher) loopHeartbeat() error {
logs.Warnf("stream heartbeat failed, notify reconnect upstream, err: %v, rid: %s", err, w.vas.Rid)

w.NotifyReconnect(types.ReconnectSignal{Reason: "stream heartbeat failed"})
return
}
logs.V(1).Infof("stream heartbeat successfully, rid: %s", w.vas.Rid)
}
Expand Down
26 changes: 17 additions & 9 deletions watch/reconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,40 @@ func (w *Watcher) NotifyReconnect(signal types.ReconnectSignal) {
}

func (w *Watcher) waitForReconnectSignal() {
for { // nolint
w.vas.Wg.Add(1)
defer w.vas.Wg.Done()

for {
select {
case <-w.vas.Ctx.Done():
logs.Warnf("watch reconnect signal will closed because of %s", w.vas.Ctx.Err().Error())
return

case signal := <-w.reconnectChan:
logs.Infof("received reconnect signal, reason: %s, rid: %s", signal.String(), w.vas.Rid)

if w.reconnecting.Load() {
logs.Warnf("received reconnect signal, but stream is already reconnecting, ignore this signal.")
return
continue
}

// stop the previous watch stream before close conn.
w.StopWatch()
w.tryReconnect(w.vas.Rid)
return
w.reconnecting.Store(true)
go func() {
defer w.reconnecting.Store(false)

w.StopWatch()
w.tryReconnect(w.vas.Rid)
}()
}
}
}

// tryReconnect, Use NotifyReconnect method instead of direct call
func (w *Watcher) tryReconnect(rid string) {
st := time.Now()
logs.Infof("start to reconnect the upstream server, rid: %s", rid)

w.reconnecting.Store(true)
// set reconnecting to false.
defer w.reconnecting.Store(false)

retry := tools.NewRetryPolicy(5, [2]uint{500, 15000})
for {
subRid := rid + strconv.FormatUint(uint64(retry.RetryCount()), 10)
Expand Down
10 changes: 6 additions & 4 deletions watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ func (w *Watcher) buildVas() (*kit.Vas, context.CancelFunc) {
// New return a Watcher
func New(u upstream.Upstream, opts option.WatchOptions) (*Watcher, error) {
w := &Watcher{
opts: opts,
upstream: u,
opts: opts,
upstream: u,
reconnectChan: make(chan types.ReconnectSignal, 3),
reconnecting: atomic.NewBool(false),
}

mh := sfs.SidecarMetaHeader{
BizID: w.opts.BizID,
Fingerprint: w.opts.Fingerprint,
Expand All @@ -86,8 +89,7 @@ func New(u upstream.Upstream, opts option.WatchOptions) (*Watcher, error) {
// StartWatch start watch stream
func (w *Watcher) StartWatch() error {
w.vas, w.cancel = w.buildVas()
w.reconnectChan = make(chan types.ReconnectSignal, 5)
w.reconnecting = atomic.NewBool(false)

var err error
apps := []sfs.SideAppMeta{}
for _, subscriber := range w.subscribers {
Expand Down

0 comments on commit b5e7de3

Please sign in to comment.