forked from cloudfoundry/bosh-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
concrete_manager.go
141 lines (110 loc) · 2.77 KB
/
concrete_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package task
import (
"encoding/json"
"path"
bosherr "github.com/cloudfoundry/bosh-utils/errors"
boshlog "github.com/cloudfoundry/bosh-utils/logger"
boshsys "github.com/cloudfoundry/bosh-utils/system"
)
// concreteManagerProvider is a stateless ManagerProvider whose NewManager
// method builds file-backed task managers.
type concreteManagerProvider struct{}

// NewManagerProvider returns the default ManagerProvider implementation.
func NewManagerProvider() ManagerProvider {
	var provider concreteManagerProvider
	return provider
}
// NewManager builds a Manager that stores its task infos in the file
// "tasks.json" located under dir.
func (provider concreteManagerProvider) NewManager(
	logger boshlog.Logger,
	fs boshsys.FileSystem,
	dir string,
) Manager {
	tasksPath := path.Join(dir, "tasks.json")

	return NewManager(logger, fs, tasksPath)
}
// concreteManager keeps task infos in memory and persists them as JSON in the
// file at tasksPath, using fs for all file access.
type concreteManager struct {
logger boshlog.Logger
fs boshsys.FileSystem
// fsSem carries functions to the goroutine started by NewManager
// (processFsFuncs), which receives and runs them one at a time.
fsSem chan func()
// tasksPath is the file the task infos are read from and written to.
tasksPath string
// taskInfos holds the known task infos; AddInfo keys entries by Info.TaskID.
// Access to taskInfos must be synchronized via fsSem
taskInfos map[string]Info
}
// NewManager creates a Manager that persists task infos to tasksPath through
// fs. It starts with an empty in-memory set of infos and launches the
// goroutine (processFsFuncs) that runs every function sent on fsSem.
func NewManager(logger boshlog.Logger, fs boshsys.FileSystem, tasksPath string) Manager {
	manager := new(concreteManager)
	manager.logger = logger
	manager.fs = fs
	manager.tasksPath = tasksPath
	manager.taskInfos = map[string]Info{}
	// Unbuffered: a send completes only once the worker has picked it up.
	manager.fsSem = make(chan func())

	go manager.processFsFuncs()

	return manager
}
// GetInfos reloads the task infos from the tasks file, refreshes the in-memory
// copy and returns the infos as a slice. The order of the slice follows map
// iteration and is therefore unspecified; the slice is nil when there are no
// infos.
//
// The read, the refresh and the copy into the result slice all happen inside
// one function sent on fsSem, so they are serialized with AddInfo and
// RemoveInfo. If reading fails, the error is returned and the in-memory infos
// are left as they were.
func (m *concreteManager) GetInfos() ([]Info, error) {
	type result struct {
		infos []Info
		err   error
	}
	resultCh := make(chan result)

	m.fsSem <- func() {
		taskInfos, err := m.readInfos()
		if err != nil {
			// readInfos returns a nil map on failure; storing it would
			// discard the cached infos and leave a nil map behind.
			resultCh <- result{err: err}
			return
		}
		m.taskInfos = taskInfos

		// Copy here rather than in the caller: the map is now m.taskInfos,
		// which AddInfo and RemoveInfo mutate, so it must only be ranged
		// over while running under fsSem.
		var infos []Info
		for _, taskInfo := range taskInfos {
			infos = append(infos, taskInfo)
		}
		resultCh <- result{infos: infos}
	}

	res := <-resultCh
	return res.infos, res.err
}
// AddInfo stores taskInfo in memory under its TaskID, replacing any entry with
// the same ID, and then writes the complete set of infos to the tasks file.
// Both steps run inside a function sent on fsSem. It returns the error from
// the write, if any; the in-memory entry is kept even when the write fails.
func (m *concreteManager) AddInfo(taskInfo Info) error {
	errCh := make(chan error)

	m.fsSem <- func() {
		// Writing to a nil map panics, so make sure the map exists before
		// inserting into it.
		if m.taskInfos == nil {
			m.taskInfos = make(map[string]Info)
		}
		m.taskInfos[taskInfo.TaskID] = taskInfo
		errCh <- m.writeInfos(m.taskInfos)
	}

	return <-errCh
}
// RemoveInfo drops the info stored under taskID (a no-op if there is none) and
// then writes the remaining infos to the tasks file. Both steps run inside a
// function sent on fsSem. It returns the error from the write, if any.
func (m *concreteManager) RemoveInfo(taskID string) error {
	done := make(chan error)

	remove := func() {
		delete(m.taskInfos, taskID)
		done <- m.writeInfos(m.taskInfos)
	}
	m.fsSem <- remove

	return <-done
}
// processFsFuncs loops forever, receiving functions from fsSem and running
// each one to completion before receiving the next. The logger's HandlePanic
// is deferred with a label identifying this loop.
func (m *concreteManager) processFsFuncs() {
	defer m.logger.HandlePanic("Task Manager Process Fs Funcs")

	for {
		(<-m.fsSem)()
	}
}
// readInfos loads the task infos from the tasks file. A missing file yields an
// empty, non-nil map and no error. A read or JSON decoding failure yields a
// nil map and a wrapped error.
func (m *concreteManager) readInfos() (map[string]Info, error) {
	infos := map[string]Info{}

	if !m.fs.FileExists(m.tasksPath) {
		return infos, nil
	}

	contents, readErr := m.fs.ReadFile(m.tasksPath)
	if readErr != nil {
		return nil, bosherr.WrapError(readErr, "Reading tasks json")
	}

	if decodeErr := json.Unmarshal(contents, &infos); decodeErr != nil {
		return nil, bosherr.WrapError(decodeErr, "Unmarshaling tasks json")
	}

	return infos, nil
}
// writeInfos encodes taskInfos as JSON and writes the result to the tasks
// file. It returns a wrapped error if either the encoding or the write fails.
func (m *concreteManager) writeInfos(taskInfos map[string]Info) error {
	encoded, encodeErr := json.Marshal(taskInfos)
	if encodeErr != nil {
		return bosherr.WrapError(encodeErr, "Marshalling tasks json")
	}

	if writeErr := m.fs.WriteFile(m.tasksPath, encoded); writeErr != nil {
		return bosherr.WrapError(writeErr, "Writing tasks json")
	}

	return nil
}