/
que.go
152 lines (130 loc) · 3.26 KB
/
que.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"log"
"math"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
/*
The queue service scans the working directory for job files.
The working directory should contain per-user subdirectories, e.g.:
	arne/
	bartel/
	...
The in-memory representation is a cache and may be out of date at any point.
The queue service decides which job to hand out to a node when asked to do so.
*/
var (
	// Users is the in-memory cache of the on-disk queue: it maps a user
	// name (their subdirectory) to that user's job list and fair-share
	// accounting. Guarded by WLock/WUnlock; may be stale at any point.
	Users = make(map[string]*User) // maps user -> joblist
)
// GiveJob is an RPC-callable method: it picks the next job off the queue
// and returns its ID for the node at nodeAddr to run.
// Returns "" when no user currently has a queued job.
func GiveJob(nodeAddr string) string {
	WLock()
	defer WUnlock()

	username := nextUser()
	if username == "" {
		return ""
	}
	u := Users[username]
	u.FairShare += 1 // 1 second penalty because a job has started
	return u.giveJob(nodeAddr).ID
}
// AddFairShare parses s as "<username>/<seconds>" and charges that many
// seconds of used share to the user. Returns "" on success, or an error
// message when the user is unknown.
func AddFairShare(s string) string {
	username := BaseDir(s)
	amount := atoi(s[len(username)+1:])

	WLock()
	defer WUnlock()

	user := Users[username]
	if user == nil {
		return "no user " + username
	}
	log.Println("AddFairShare", username, amount)
	user.FairShare += float64(amount)
	return "" // ok
}
// nextUser returns the name of the user with queued jobs whose fair share
// is lowest, or "" when nobody has a job waiting.
func nextUser() string {
	best := ""
	bestShare := math.Inf(1)
	for name, u := range Users {
		if !u.HasJob() {
			continue // only consider users with work queued
		}
		if u.FairShare < bestShare {
			bestShare = u.FairShare
			best = name
		}
	}
	return best
}
// LoadJobs (re-)loads all jobs in the working directory by scanning each
// per-user subdirectory. Called upon program startup.
// Any I/O error is fatal (Fatal is assumed to terminate on non-nil error).
func LoadJobs() {
	dir, err := os.Open(".")
	Fatal(err)
	defer dir.Close() // was leaked: the directory handle was never closed
	subdirs, err2 := dir.Readdir(-1)
	Fatal(err2)
	for _, d := range subdirs {
		if d.IsDir() {
			LoadUserJobs(d.Name())
		}
	}
}
// LoadUserJobs (re-)loads all jobs in the user's subdirectory dir,
// replacing that user's cached job list with a freshly scanned, sorted one.
// A job is any non-hidden file ending in ".mx3". Returns "" (ok).
func LoadUserJobs(dir string) string {
	log.Println("LoadUserJobs", dir)
	var newJobs []*Job
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		// Propagate walk errors: when err != nil, info may be nil and
		// dereferencing it below would panic.
		if err != nil {
			return err
		}
		if strings.HasSuffix(path, ".mx3") && !strings.HasPrefix(info.Name(), ".") {
			ID := thisAddr + "/" + path
			log.Println("addingJob", ID)
			job := &Job{ID: ID}
			job.Update()
			newJobs = append(newJobs, job)
		}
		return nil
	})
	Fatal(err) // TODO: recover? — check before using partial results below
	// Sort by ID; joblist shares newJobs' backing array, so newJobs is sorted too.
	l := joblist(newJobs)
	sort.Sort(&l)
	WLock()
	defer WUnlock()
	if _, ok := Users[dir]; !ok {
		Users[dir] = NewUser()
	}
	Users[dir].Jobs = newJobs
	Users[dir].nextPtr = 0 // restart round-robin/next-job pointer for the fresh list
	return ""
}
// joblist implements sort.Interface, ordering jobs by ID.
// Value receivers are the idiomatic choice for a slice type; *joblist
// (as passed to sort.Sort elsewhere) still satisfies sort.Interface
// through the promoted value methods, so callers are unaffected.
type joblist []*Job

func (l joblist) Len() int           { return len(l) }
func (l joblist) Less(i, j int) bool { return l[i].ID < l[j].ID }
func (l joblist) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }
// UpdateJob is an RPC-callable function that refreshes the in-memory cached
// info about the job identified by jobURL. Called, e.g., after a node has
// finished a job. Returns "" when the job is unknown (empty conventionally
// means error); the success message is unused but handy for human callers.
func UpdateJob(jobURL string) string {
	WLock()
	defer WUnlock()

	if j := JobByName(jobURL); j != nil {
		j.Update()
		return "updated " + jobURL // not used, but handy if called by Human.
	}
	log.Println("update", jobURL, ": no such job")
	return "" // empty conventionally means error
}
// RunShareDecay periodically multiplies every user's FairShare by a
// constant factor so that it decays exponentially with the half-life
// given by *flag_halflife. Runs forever; intended as a goroutine.
func RunShareDecay() {
	halflife := *flag_halflife
	step := halflife / 100 // update many times per half-life for a smooth curve
	// Per-step factor: 0.5^(step/halflife), so 100 steps halve the share.
	factor := math.Pow(0.5, float64(step)/float64(halflife))
	for {
		time.Sleep(step)
		WLock()
		for _, u := range Users {
			u.FairShare *= factor
		}
		WUnlock()
	}
}