-
Notifications
You must be signed in to change notification settings - Fork 4
/
refresher.go
123 lines (107 loc) · 3.33 KB
/
refresher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/* Copyright 2019 DevFactory FZ LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
package time
import (
"sync"
"time"
)
// Refresher is a general-purpose periodic job runner that can be started and
// stopped. Implementations are meant to be safe for use from multiple
// goroutines.
type Refresher interface {
	// StartRefreshing begins the asynchronous periodic execution of the
	// refresh action.
	StartRefreshing()

	// StopRefreshing ends the asynchronous periodic execution of the refresh
	// action.
	StopRefreshing()

	// IsRefreshing reports whether the Refresher is currently started.
	IsRefreshing() bool

	// GetRefreshCount returns how many times the refresh action has been run.
	GetRefreshCount() int
}
// channelRefresher is a Refresher that runs action once for every message
// received on channel while it is started.
type channelRefresher struct {
	// action is the job executed on every refresh.
	action func()
	// channel delivers the events that trigger a refresh.
	channel <-chan time.Time
	// refreshDoneChan carries the stop signal to the background goroutine.
	refreshDoneChan chan interface{}

	// refreshPadlock is held by the exported methods while they read or
	// change isRefreshing and while GetRefreshCount reads refreshCounter.
	refreshPadlock *sync.Mutex
	// isRefreshing is true between StartRefreshing and StopRefreshing.
	isRefreshing bool
	// refreshCounter counts completed runs of action.
	refreshCounter int
}
// NewTickerRefresher returns a Refresher whose refresh events come from the
// channel of a time.Ticker firing every autoRefreshPeriod. The remaining
// arguments are passed to NewRefresher unchanged.
//
// If action() takes longer than autoRefreshPeriod, a tick that fired in the
// meantime is already waiting in the ticker's channel, so the next action()
// starts immediately after the previous one ends.
// NOTE(review): per the time.Ticker documentation ticks are dropped for slow
// receivers, so not every missed period is replayed.
//
// No reference to the ticker is kept, so it cannot be stopped later.
func NewTickerRefresher(action func(), refreshOnCreate, autoRefresh bool, autoRefreshPeriod time.Duration) Refresher {
	return NewRefresher(time.NewTicker(autoRefreshPeriod).C, action, refreshOnCreate, autoRefresh)
}
// NewRefresher returns a new thread safe Refresher that, while started, runs
// action() once for every message received from channel. Messages are
// handled one at a time, in the order they are received.
//
// When refreshOnCreate is true, action() is run once synchronously before
// NewRefresher returns. When autoRefresh is true, the Refresher is started
// before it is returned; otherwise the caller has to call StartRefreshing.
func NewRefresher(channel <-chan time.Time, action func(), refreshOnCreate, autoRefresh bool) Refresher {
	r := new(channelRefresher)
	r.refreshPadlock = new(sync.Mutex)
	// Buffer of one, so a stop signal can be left for the worker to pick up.
	r.refreshDoneChan = make(chan interface{}, 1)
	r.channel = channel
	r.action = action

	if refreshOnCreate {
		r.runAction()
	}
	if autoRefresh {
		r.StartRefreshing()
	}
	return r
}
// runAction executes the configured action once and, after it returns,
// counts the run in refreshCounter.
//
// NOTE(review): the counter is updated without holding refreshPadlock, while
// GetRefreshCount reads it under that lock; confirm whether this is intended.
func (r *channelRefresher) runAction() {
	perform := r.action
	perform()
	r.refreshCounter++
}
// StartRefreshing starts a background goroutine that runs the action once for
// every message received from the refresher's channel, until a stop signal
// arrives on refreshDoneChan. Calling it while the refresher is already
// started does nothing.
func (r *channelRefresher) StartRefreshing() {
	r.refreshPadlock.Lock()
	defer r.refreshPadlock.Unlock()

	if r.isRefreshing {
		return
	}
	r.isRefreshing = true

	go func() {
		for {
			select {
			case <-r.channel:
				r.runAction()
			case <-r.refreshDoneChan:
				// A bare break here would only leave the select and the loop
				// would keep serving ticks forever; return ends the goroutine.
				return
			}
		}
	}()
}
// StopRefreshing sends a stop signal on refreshDoneChan and marks the
// refresher as not started. It does nothing when the refresher is not
// started. The signal is sent while refreshPadlock is held.
func (r *channelRefresher) StopRefreshing() {
	r.refreshPadlock.Lock()
	defer r.refreshPadlock.Unlock()

	if r.isRefreshing {
		r.refreshDoneChan <- 0
		r.isRefreshing = false
	}
}
// GetRefreshCount returns the current value of refreshCounter, read while
// holding refreshPadlock.
func (r *channelRefresher) GetRefreshCount() int {
	r.refreshPadlock.Lock()
	count := r.refreshCounter
	r.refreshPadlock.Unlock()
	return count
}
// IsRefreshing reports the current value of isRefreshing, read while holding
// refreshPadlock. The flag is set by StartRefreshing and cleared by
// StopRefreshing.
func (r *channelRefresher) IsRefreshing() bool {
	r.refreshPadlock.Lock()
	started := r.isRefreshing
	r.refreshPadlock.Unlock()
	return started
}