-
Notifications
You must be signed in to change notification settings - Fork 10
/
observed_worker.go
126 lines (113 loc) · 2.15 KB
/
observed_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package monitor
import (
"crypto/rand"
"errors"
"fmt"
"log"
"sync"
)
// Lifecycle states reported by MonitoredWorker.GetState.
const (
	// Stopped is the zero value: the state of a worker that has never been
	// started, and the state recorded when a stop signal is honored.
	Stopped = iota
	// Running is set by Start right before the worker goroutine is launched.
	Running
	// Failed is recorded when BeforeRun or DoWork returns an error.
	Failed
	// Completed is recorded when DoWork reports that the work is done.
	Completed
)
// DiscretWork is a unit of work that MonitoredWorker drives step by step.
// The methods are listed in the order the worker uses them.
type DiscretWork interface {
	// BeforeRun is called by Start before the worker goroutine is launched.
	// A non-nil error aborts the start and is returned to the caller.
	BeforeRun() error
	// DoWork performs one step. The worker goroutine keeps calling it until
	// it returns true (work finished) or a non-nil error (work failed).
	DoWork() (bool, error)
	// GetProgress returns an implementation-defined progress value; it is
	// passed through unchanged by MonitoredWorker.GetProgress.
	GetProgress() interface{}
	// AfterStop is called by Stop once the worker goroutine has exited.
	AfterStop() error
}
// genUid returns a random identifier built from 16 bytes of crypto/rand
// output, rendered as upper-case hex in five dash-separated groups of
// 4, 2, 2, 2 and 6 bytes (36 characters in total).
func genUid() string {
	var raw [16]byte
	// NOTE(review): the error from rand.Read is discarded here, as the
	// function has no way to report it to the caller.
	rand.Read(raw[:])
	return fmt.Sprintf("%X-%X-%X-%X-%X", raw[:4], raw[4:6], raw[6:8], raw[8:10], raw[10:])
}
// MonitoredWorker runs a DiscretWork on a background goroutine and tracks its
// lifecycle state (Stopped, Running, Failed or Completed).
//
// A zero MonitoredWorker with Itw set is ready to use. Because the struct
// contains mutexes and wait groups it must not be copied after first use;
// always call its methods through a pointer.
type MonitoredWorker struct {
	lc    sync.Mutex     // serializes Start and Stop against each other
	Itw   DiscretWork    // the work driven by this worker
	wgrun sync.WaitGroup // counts the live worker goroutine; released in wgoroute
	guid  string         // lazily generated identifier (see GetId); guarded by mu
	state int            // current lifecycle state; guarded by mu
	chsig chan int       // control channel, recreated by every successful Start
	stwg  sync.WaitGroup // NOTE(review): not referenced by any method below; kept as-is

	// mu guards guid and state. It is deliberately separate from lc: Stop
	// holds lc while it waits for the goroutine to exit, and the goroutine
	// must still be able to publish its final state and read the id.
	mu sync.Mutex
}

// setState records a new lifecycle state under mu.
func (mw *MonitoredWorker) setState(s int) {
	mw.mu.Lock()
	mw.state = s
	mw.mu.Unlock()
}

// wgoroute is the body of the worker goroutine launched by Start. It calls
// Itw.DoWork in a loop until the work reports completion, returns an error,
// or a Stopped signal arrives on the control channel, records the resulting
// state, and finally releases wgrun.
func (mw *MonitoredWorker) wgoroute() {
	id := mw.GetId()
	log.Println("info: work start", id)
	defer func() {
		log.Print("info: realease work guid ", id)
		mw.wgrun.Done()
	}()
	// Bind the channel created by the Start that launched this goroutine, so
	// a later Start replacing mw.chsig is never observed from here.
	sig := mw.chsig
	for {
		select {
		case newState := <-sig:
			if newState == Stopped {
				mw.setState(newState)
				log.Println("info: work stopped")
				return
			}
		default:
			// No signal pending: perform one more step of work.
			isdone, err := mw.Itw.DoWork()
			if err != nil {
				log.Println("error: guid", id, " work failed", err)
				mw.setState(Failed)
				return
			}
			if isdone {
				mw.setState(Completed)
				log.Println("info: work done")
				return
			}
		}
	}
}

// GetState returns the current lifecycle state: Stopped, Running, Failed or
// Completed. It is safe to call while the worker goroutine is running.
func (mw *MonitoredWorker) GetState() int {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	return mw.state
}

// GetId returns the worker's identifier, generating it with genUid on first
// use. Subsequent calls return the same value. It is safe for concurrent use.
func (mw *MonitoredWorker) GetId() string {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	if len(mw.guid) == 0 {
		mw.guid = genUid()
	}
	return mw.guid
}

// Start launches the worker goroutine.
//
// It returns an error, without starting anything, when the worker is already
// Running or has Completed, when no work is assigned (Itw is nil), or when
// Itw.BeforeRun fails; in the last case the state becomes Failed and the
// BeforeRun error is returned unchanged. A worker in the Stopped or Failed
// state may be started again.
func (mw *MonitoredWorker) Start() error {
	mw.lc.Lock()
	defer mw.lc.Unlock()
	switch mw.GetState() {
	case Completed:
		return errors.New("error: try run completed job")
	case Running:
		return errors.New("error: try run runing job")
	}
	// Report a missing work item as an error instead of a nil dereference.
	if mw.Itw == nil {
		return errors.New("error: no work assigned")
	}
	if err := mw.Itw.BeforeRun(); err != nil {
		mw.setState(Failed)
		return err
	}
	// Buffer of one lets Stop deliver its signal without blocking even if
	// the goroutine finishes on its own at the same moment.
	mw.chsig = make(chan int, 1)
	mw.setState(Running)
	mw.wgrun.Add(1)
	go mw.wgoroute()
	return nil
}

// Stop signals a Running worker to stop, waits for its goroutine to exit,
// and then calls Itw.AfterStop, returning its result. It returns an error if
// the worker is not Running.
func (mw *MonitoredWorker) Stop() error {
	mw.lc.Lock()
	defer mw.lc.Unlock()
	if mw.GetState() != Running {
		return errors.New("error: imposible stop non runing job")
	}
	mw.chsig <- Stopped
	mw.wgrun.Wait()
	// The goroutine has exited, so nothing receives from the channel anymore.
	close(mw.chsig)
	return mw.Itw.AfterStop()
}

// Wait blocks until the worker goroutine has exited. It returns immediately
// when no goroutine is running.
func (mw *MonitoredWorker) Wait() {
	mw.wgrun.Wait()
}

// GetProgress returns whatever Itw.GetProgress reports.
func (mw *MonitoredWorker) GetProgress() interface{} {
	return mw.Itw.GetProgress()
}