-
Notifications
You must be signed in to change notification settings - Fork 12
/
interval-based.ts
129 lines (110 loc) · 3.29 KB
/
interval-based.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import type { Cron } from '../cron'
/** Internal bookkeeping record the scheduler keeps for each registered task. */
interface ITaskWrapper {
/** Identifier handed back by `registerTask` and looked up by `unregisterTask`. */
id: number
/** Cron whose `getNextDate()` supplies the next execution time. */
cron: Cron
/** Point in time at which the task becomes due; compared against `Date.now()` when tasks are processed. */
nextExecution: Date
/** When true, the task is not rescheduled after running and is removed from the task list. */
isOneTimeTask: boolean
/** Callback invoked when the task is due; its return value is ignored by the scheduler. */
task: () => unknown
}
/**
* A cron scheduler that is based on a single interval.
* Every interval, it checks whether a registered cron task
* was due during the last interval and executes it.
* This scheduler might be more performant depending on the use case,
* because it only creates a single interval for all scheduled crons,
* however depending on the interval and crons, tasks might be executed
* with a delay.
*/
export class IntervalBasedCronScheduler {
  /** Polling period in milliseconds, passed as the delay to `setInterval`. */
  #interval: number
  /** Handle of the running interval, or `undefined` while the scheduler is stopped. */
  #intervalId?: ReturnType<typeof setInterval>
  /** Registered tasks, kept sorted ascending by `nextExecution`. */
  #tasks: ITaskWrapper[] = []
  /** Identifier that the next `registerTask` call will hand out. */
  #nextTaskId = 1

  /**
   * Creates and starts a new scheduler with the given interval.
   *
   * @param interval - Delay in milliseconds between two checks for due tasks.
   */
  public constructor(interval: number) {
    this.#interval = interval
    this.start()
  }

  /**
   * Starts the scheduler.
   *
   * @throws Error if the scheduler is already running.
   */
  public start(): void {
    if (this.#intervalId !== undefined) {
      throw new Error('Scheduler already started.')
    }
    this.#intervalId = setInterval(this.processTasks.bind(this), this.#interval)
  }

  /** Ensures the scheduler is stopped. Calling it on a stopped scheduler is a no-op. */
  public stop(): void {
    // Same `undefined` check as start(), so both methods agree on what "running" means.
    if (this.#intervalId !== undefined) {
      clearInterval(this.#intervalId)
      this.#intervalId = undefined
    }
  }

  /** Inserts a task in the tasks array at the right position sorted by nextExecution time. */
  private insertTask(newTask: ITaskWrapper): void {
    const newTime = newTask.nextExecution.getTime()
    const index = this.#tasks.findIndex(
      (task) => task.nextExecution.getTime() > newTime
    )
    if (index === -1) {
      // No existing task is later than the new one, so it belongs at the end.
      // Passing -1 to splice() would insert it *before* the last element instead.
      this.#tasks.push(newTask)
    } else {
      this.#tasks.splice(index, 0, newTask)
    }
  }

  /**
   * Registers a new task.
   *
   * @param cron - Cron whose `getNextDate()` determines when the task is due.
   * @param task - Callback to invoke when the task is due.
   * @param isOneTimeTask - When true, the task runs once and is then removed.
   * @returns The id of the task, usable with `unregisterTask`.
   */
  public registerTask(
    cron: Cron,
    task: () => unknown,
    isOneTimeTask = false
  ): number {
    const id = this.#nextTaskId
    this.insertTask({
      id,
      cron,
      nextExecution: cron.getNextDate(),
      isOneTimeTask,
      task,
    })
    this.#nextTaskId += 1
    return id
  }

  /**
   * Unregisters a task, causing it to no longer be executed.
   *
   * @param id - The id returned by `registerTask`.
   * @throws Error if no task with the given id is registered.
   */
  public unregisterTask(id: number): void {
    const taskIndex = this.#tasks.findIndex((task) => task.id === id)
    if (taskIndex === -1) throw new Error('Task not found.')
    this.#tasks.splice(taskIndex, 1)
  }

  /** Sorts the tasks array based on the next execution time so that the next task is first in the array. */
  private sortTasks(): void {
    this.#tasks.sort(
      (a, b) => a.nextExecution.getTime() - b.nextExecution.getTime()
    )
  }

  /**
   * Runs every task that is due, reschedules recurring tasks and removes
   * one-time tasks that ran. Exceptions thrown by a task are not caught here.
   */
  private processTasks(): void {
    const now = Date.now()
    let taskExecuted = false
    const finishedOneTimeTasks = new Set<ITaskWrapper>()

    // The array is sorted, so the first task that is not yet due ends the scan.
    for (const task of this.#tasks) {
      if (task.nextExecution.getTime() > now) break

      task.task()
      taskExecuted = true

      if (task.isOneTimeTask) {
        finishedOneTimeTasks.add(task)
      } else {
        task.nextExecution = task.cron.getNextDate()
      }
    }

    if (finishedOneTimeTasks.size > 0) {
      // Remove exactly the one-time tasks that ran, rather than inferring them
      // from their nextExecution value.
      this.#tasks = this.#tasks.filter(
        (task) => !finishedOneTimeTasks.has(task)
      )
    }

    if (taskExecuted) {
      // Rescheduled tasks may have moved; restore the sorted order.
      this.sortTasks()
    }
  }
}