-
Notifications
You must be signed in to change notification settings - Fork 4
/
default_scheduler.ts
115 lines (96 loc) · 2.32 KB
/
default_scheduler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import {PID} from 'erlang-types'
import ProcessQueue from './process_queue'
/**
 * Cooperative round-robin scheduler for per-process task queues.
 *
 * Each pass of `run()` visits every queue in `queues`, calls at most
 * `reductions_per_process` tasks from it, and then re-arms itself through
 * `invokeLater`. The first pass is started by the constructor, so the
 * scheduler keeps polling from the moment it is created.
 */
class DefaultScheduler {
  /**
   * Set while a task is being called or a pid is being removed. `run()`
   * postpones its pass when it finds this flag set.
   */
  isRunning: boolean
  /** Defers `callback` via `setTimeout`, using the constructor's `throttle` as the delay. */
  invokeLater: (callback: () => void) => void
  /** Upper bound on task calls per queue in a single pass. */
  reductions_per_process: number
  /** Pending tasks, one queue per pid. */
  queues: Map<PID, ProcessQueue>

  /**
   * @param throttle Delay in milliseconds passed to `setTimeout` between passes.
   * @param reductions_per_process Maximum number of tasks called per queue in one pass.
   */
  constructor(throttle: number = 0, reductions_per_process: number = 8) {
    this.isRunning = false
    this.invokeLater = function(callback) {
      setTimeout(callback, throttle)
    }

    // In our case a reduction is equal to a task call
    // Controls how many tasks are called at a time per process
    this.reductions_per_process = reductions_per_process
    this.queues = new Map()
    this.run()
  }

  /**
   * Appends `task` to the queue of `pid`, creating the queue on first use.
   *
   * @param pid Key of the queue the task belongs to.
   * @param task Work to call on a later pass of `run()`.
   */
  addToQueue(pid: PID, task: () => any) {
    let queue = this.queues.get(pid)
    if (!queue) {
      queue = new ProcessQueue(pid)
      this.queues.set(pid, queue)
    }
    queue.add(task)
  }

  /**
   * Drops the queue of `pid` together with any tasks still waiting in it.
   *
   * @param pid Key of the queue to remove.
   */
  removePid(pid: PID) {
    this.isRunning = true
    this.queues.delete(pid)
    // The flag is cleared unconditionally, whatever its value was on entry.
    this.isRunning = false
  }

  /**
   * Defers a call to `run` through `invokeLater`.
   *
   * @param run Function to call once the delay has elapsed.
   */
  _run(run: Function) {
    this.invokeLater(() => {
      run()
    })
  }

  /**
   * Performs one scheduling pass and re-arms the next one.
   *
   * If `isRunning` is set the pass is skipped and only the re-arm happens.
   *
   * @throws The `Error` produced by a failing task. The remaining queues are
   * skipped for this pass, but the next pass is still scheduled so one bad
   * task cannot stop the scheduler permanently.
   */
  run() {
    if (this.isRunning) {
      this._run(this.run.bind(this))
      return
    }

    try {
      for (const queue of this.queues.values()) {
        this._runQueue(queue)
      }
    } finally {
      // Must run even while a task error is propagating; otherwise the
      // polling loop would end with the first failure.
      this._run(this.run.bind(this))
    }
  }

  /**
   * Calls tasks from `queue` until it is empty or the reduction budget for
   * this pass is used up.
   *
   * A value thrown by a task is logged with `console.error`. It is rethrown
   * only when it is an `Error` instance; an `Error` instance *returned* by a
   * task is thrown as well.
   */
  private _runQueue(queue: ProcessQueue) {
    let reductions = 0

    while (!queue.empty() && reductions < this.reductions_per_process) {
      // The task is dequeued before it is called, so a failing task is not retried.
      const task = queue.next()
      let result: unknown

      this.isRunning = true
      try {
        if (task) {
          result = task()
        }
      } catch (e) {
        console.error(e)
        result = e
      } finally {
        this.isRunning = false
      }

      if (result instanceof Error) {
        throw result
      }

      reductions++
    }
  }

  /**
   * Enqueues `task` for `pid` after a delay.
   *
   * @param pid Key of the queue the task belongs to.
   * @param task Work to enqueue.
   * @param dueTime Delay in milliseconds. `0` (the default) uses
   * `invokeLater`, so the constructor's `throttle` delay applies instead.
   */
  addToScheduler(pid: PID, task: () => any, dueTime: number = 0) {
    const enqueue = () => {
      this.addToQueue(pid, task)
    }

    if (dueTime === 0) {
      this.invokeLater(enqueue)
    } else {
      setTimeout(enqueue, dueTime)
    }
  }

  /**
   * Schedules `task` for `pid` as soon as `invokeLater` allows.
   * The task's return value is discarded.
   */
  schedule(pid: PID, task: () => any) {
    this.addToScheduler(pid, () => {
      task()
    })
  }

  /**
   * Schedules `task` for `pid` to be enqueued after `dueTime` milliseconds.
   * The task's return value is discarded.
   */
  scheduleFuture(pid: PID, dueTime: number, task: () => any) {
    this.addToScheduler(
      pid,
      () => {
        task()
      },
      dueTime
    )
  }
}
export default DefaultScheduler