-
Notifications
You must be signed in to change notification settings - Fork 0
/
task_scheduler.go
63 lines (57 loc) · 1.3 KB
/
task_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main
import (
"context"
"crypto/rand"
"sync"
"time"
)
// Status values that Track reads from a task's status channel.
const (
	// StatusEnded (0) makes Track decrement the ongoing-task counter and
	// stop tracking.
	StatusEnded = iota
	// StatusOngoing (1) makes Track increment the ongoing-task counter.
	StatusOngoing
)
// Scheduler holds the settings and bookkeeping used to launch named tasks
// and to count how many of them are currently ongoing.
type Scheduler struct {
	// Ctx is a context carried by the scheduler. NewTask and Track do not
	// read it.
	Ctx context.Context
	// MaxTasks is not referenced by NewTask or Track.
	// NOTE(review): presumably an overall task cap — confirm against callers.
	MaxTasks int
	// TaskDeadline is not referenced by NewTask or Track.
	// NOTE(review): presumably a per-task time limit — confirm against callers.
	TaskDeadline time.Duration
	// DefinedTasks maps a task name to the Task that NewTask launches for it.
	DefinedTasks map[string]Task
	// Ongoing is the counter Track increments on StatusOngoing and
	// decrements on StatusEnded; NewTask compares it with MaxConcurrentTasks.
	Ongoing int
	// Lock is held by Track while it updates Ongoing.
	Lock sync.Mutex
	// MaxConcurrentTasks is the value of Ongoing at or above which NewTask
	// postpones a task instead of starting it.
	MaxConcurrentTasks int
	// RetryAfter is how long NewTask waits before retrying a postponed task.
	RetryAfter time.Duration
}
// NewTask launches the task registered in DefinedTasks under the name task,
// passing it payload, id and a pointer to a fresh status channel. A Track
// goroutine is started alongside it to consume that channel.
//
// If no task is registered under that name, the request is logged to LStderr
// and dropped. If Ongoing has reached MaxConcurrentTasks, nothing is started
// now: a goroutine sleeps for RetryAfter and then calls NewTask again with the
// same arguments. In every case NewTask itself returns without waiting.
func (sched *Scheduler) NewTask(payload, id string, task string) {
	// Resolve the task before doing anything else. Indexing the map with an
	// unknown name yields a nil function, and calling that in a goroutine
	// would panic and bring the whole process down.
	run, ok := sched.DefinedTasks[task]
	if !ok || run == nil {
		LStderr.Printf("error: unknown task %q, not scheduling", task)
		return
	}

	// NOTE(review): Ongoing is read here without holding Lock, while Track
	// updates it under Lock. The counter also only moves once a task reports
	// its status, so this limit is a best-effort check.
	if sched.Ongoing >= sched.MaxConcurrentTasks {
		go func() {
			time.Sleep(sched.RetryAfter)
			sched.NewTask(payload, id, task)
		}()
		LStderr.Printf("Max concurrent task count reached, waiting and retrying in %s ...", sched.RetryAfter.String())
		return
	}

	// Random identifier handed to Track. A failure to generate it is only
	// logged; the task is still started.
	uuid := make([]byte, 15)
	if _, err := rand.Read(uuid); err != nil {
		LStderr.Println("error:", err)
	}

	// Buffer of one so the task's first status send does not have to wait
	// for the tracker goroutine to be scheduled.
	status := make(chan int, 1)
	go sched.Track(uuid, &status)
	go run(payload, id, &status)
}
// Track consumes the status values of a single task from *status and keeps
// sched.Ongoing up to date: StatusOngoing increments the counter, StatusEnded
// decrements it and ends tracking. Any other value is ignored.
//
// uuid is accepted for the caller's benefit but is not used here.
//
// Track blocks until StatusEnded is received, so it is meant to run in its
// own goroutine. Receiving from a closed channel yields 0, which equals
// StatusEnded, so closing the channel also ends tracking.
func (sched *Scheduler) Track(uuid []byte, status *chan int) {
	for {
		// A blocking receive picks up each status as soon as it is sent,
		// without waking up periodically to poll.
		n := <-*status

		// Keep the critical section to the counter update and always release
		// the lock before returning. Returning with the lock held would
		// block every other tracker of this scheduler forever.
		sched.Lock.Lock()
		switch n {
		case StatusOngoing:
			sched.Ongoing++
		case StatusEnded:
			sched.Ongoing--
		}
		sched.Lock.Unlock()

		if n == StatusEnded {
			LStdout.Println("Received task finished - ending ongoing task track")
			return
		}
	}
}