-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.ts
116 lines (99 loc) · 2.95 KB
/
main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/** TCP port passed to `Deno.serve` and printed in the startup message. */
const PORT = 8080;
/**
 * Upper bound on the number of tasks in the scheduler's `running` list; a task
 * scheduled while that list is full is placed in `pending` instead.
 */
const MAX_QUEUE_SIZE = 4;
/**
 * JSON body accepted by `POST /queue/tasks`: each property name is a task key
 * and its value is that task's duration in milliseconds.
 */
type TasksQueueRequestBody = Record<string, number>;
/** Snapshot of the scheduler returned by `TaskScheduler.status()`. */
type Status = {
  /** Tasks waiting to be started. */
  pending: Task[];
  /** Tasks that have been started and have not finished yet. */
  running: Task[];
};
/**
 * A unit of simulated work identified by `key` that completes `duration`
 * milliseconds after it is started.
 */
class Task {
  constructor(public key: string, public duration: number) {}

  /**
   * Logs that the task has started and arranges for `onDone` to be invoked
   * through `setTimeout` after `duration` milliseconds.
   *
   * @param onDone Callback invoked once the timer fires.
   */
  run(onDone: VoidFunction) {
    const { key, duration } = this;
    console.log(`Task ${key} started. Running for ${duration}ms`);
    setTimeout(onDone, duration);
  }
}
/**
 * In-memory scheduler that keeps at most `MAX_QUEUE_SIZE` tasks running at
 * once and never runs two tasks with the same key at the same time.
 *
 * A task that cannot start immediately is appended to `pending`. Each time a
 * running task finishes, the first pending task whose key is not currently
 * running is started.
 */
class TaskScheduler {
  private readonly pending: Task[] = [];
  private readonly running: Task[] = [];

  /**
   * Starts `task` immediately when there is a free slot and no running task
   * shares its key; otherwise appends it to the pending list.
   *
   * @param task Task to start or enqueue.
   */
  scheduleTask(task: Task): void {
    if (this.running.length >= MAX_QUEUE_SIZE) {
      console.log(`Max queue size exceeded, ${task.key} is pending`);
      this.pending.push(task);
      return;
    }
    if (this.isRunning(task.key)) {
      console.log(`Task ${task.key} is already running. Queuing`);
      this.pending.push(task);
      return;
    }

    this.running.push(task);
    task.run(() => this.handleTaskDone(task));
  }

  /**
   * Starts the first pending task whose key is not currently running, if any.
   * At most one task is taken from the pending list per call.
   */
  scheduleNextTask(): void {
    console.log("Scheduling next task");
    if (this.pending.length === 0) {
      console.log("No pending tasks");
      return;
    }

    const nextViableTaskIndex = this.pending.findIndex(
      ({ key }) => !this.isRunning(key),
    );
    if (nextViableTaskIndex === -1) {
      console.log("No viable next task found");
      return;
    }

    // Remove the task from `pending` before rescheduling so it is never in both lists.
    const [task] = this.pending.splice(nextViableTaskIndex, 1);
    this.scheduleTask(task);
  }

  /**
   * Schedules every task in `tasks`, in order.
   *
   * @param tasks Tasks to pass to `scheduleTask` one by one.
   */
  append(tasks: Task[]): void {
    console.log(`Apending ${tasks.length} tasks`);
    for (const task of tasks) {
      this.scheduleTask(task);
    }
  }

  /**
   * Returns the current pending and running tasks.
   *
   * The arrays are shallow copies, so callers cannot add, remove or reorder
   * the scheduler's own entries through the returned value.
   */
  status(): Status {
    return {
      pending: [...this.pending],
      running: [...this.running],
    };
  }

  /** Tells whether a task with the given key is currently in `running`. */
  private isRunning(key: string): boolean {
    return this.running.some((runningTask) => runningTask.key === key);
  }

  /** Removes a finished task from `running` and tries to start a pending one. */
  private handleTaskDone(task: Task): void {
    console.log(`Task ${task.key} finished in ${task.duration}`);
    const runningTaskIndex = this.running.indexOf(task);
    if (runningTaskIndex > -1) {
      this.running.splice(runningTaskIndex, 1);
    }
    this.scheduleNextTask();
  }
}
// Module-level scheduler instance that `handler` uses for both status queries and new tasks.
const taskScheduler = new TaskScheduler();
/**
 * Checks that a parsed JSON value is a plain object whose values are all
 * finite, non-negative numbers, i.e. usable as task durations.
 */
const isTasksQueueRequestBody = (
  value: unknown,
): value is TasksQueueRequestBody =>
  typeof value === "object" &&
  value !== null &&
  !Array.isArray(value) &&
  Object.values(value).every(
    (duration) =>
      typeof duration === "number" &&
      Number.isFinite(duration) &&
      duration >= 0,
  );

/**
 * HTTP entry point for the task queue.
 *
 * - `GET` on a path ending in `/queue/status` responds with the scheduler's
 *   pending and running tasks as JSON.
 * - `POST` on a path ending in `/queue/tasks` expects a JSON object mapping
 *   task keys to durations in milliseconds and schedules one task per entry.
 *   It responds `200 ok` on success, `400 nok` when the body is not valid
 *   JSON or not such an object, and `500 nok` if scheduling itself fails.
 *
 * Every other request gets an empty `404`.
 *
 * @param request Incoming request.
 * @returns The response to send back.
 */
const handler = async (request: Request): Promise<Response> => {
  // Route on the path alone so a query string cannot defeat the suffix match.
  const { pathname } = new URL(request.url);

  if (request.method === "GET" && pathname.endsWith("/queue/status")) {
    return new Response(JSON.stringify(taskScheduler.status()), {
      status: 200,
      headers: {
        "content-type": "application/json",
      },
    });
  }

  if (request.method === "POST" && pathname.endsWith("/queue/tasks")) {
    let body: unknown;
    try {
      body = await request.json();
    } catch (e) {
      console.error("Rejecting request: body is not valid JSON", e);
      return new Response("nok", { status: 400 });
    }

    // Validate before scheduling so malformed input never reaches setTimeout.
    if (!isTasksQueueRequestBody(body)) {
      console.error("Rejecting request: body is not a key-to-duration object");
      return new Response("nok", { status: 400 });
    }

    try {
      taskScheduler.append(
        Object.entries(body).map(([key, duration]) => new Task(key, duration)),
      );
      return new Response("ok", { status: 200 });
    } catch (e) {
      console.error("Failed to schedule tasks", e);
      return new Response("nok", { status: 500 });
    }
  }

  return new Response("", { status: 404 });
};
// Print the local URL, then start the HTTP server on PORT with `handler` as the request handler.
console.log(`HTTP server running. Access it at: http://localhost:${PORT}/`);
Deno.serve({ port: PORT }, handler);