Skip to content

Commit

Permalink
feat(ext/kv) add backoffSchedule to enqueue (#21474)
Browse files Browse the repository at this point in the history
Also reduces the time to run `kv_queue_undelivered_test.ts` test from
100 seconds down to 3 seconds.

closes #21437
  • Loading branch information
igorzi committed Dec 13, 2023
1 parent 0ceae7a commit 86769b0
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 8 deletions.
3 changes: 2 additions & 1 deletion cli/tests/unit/kv_queue_undelivered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ queueTest("queue with undelivered", async (db) => {
try {
await db.enqueue("test", {
keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]],
backoffSchedule: [10, 20],
});
await sleep(100000);
await sleep(3000);
const undelivered = await collect(db.list({ prefix: ["queue_failed"] }));
assertEquals(undelivered.length, 2);
assertEquals(undelivered[0].key, ["queue_failed", "a"]);
Expand Down
35 changes: 35 additions & 0 deletions cli/tests/unit/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,24 @@ queueTest("queue retries", async (db) => {
assertEquals(4, count);
});

queueTest("queue retries with backoffSchedule", async (db) => {
let count = 0;
const listener = db.listenQueue((_msg) => {
count += 1;
throw new TypeError("dequeue error");
});
try {
await db.enqueue("test", { backoffSchedule: [1] });
await sleep(2000);
} finally {
db.close();
await listener;
}

// There should have been 1 attempt + 1 retry
assertEquals(2, count);
});

queueTest("multiple listenQueues", async (db) => {
const numListens = 10;
let count = 0;
Expand Down Expand Up @@ -1876,6 +1894,23 @@ Deno.test({
},
});

dbTest("invalid backoffSchedule", async (db) => {
await assertRejects(
async () => {
await db.enqueue("foo", { backoffSchedule: [1, 1, 1, 1, 1, 1] });
},
TypeError,
"invalid backoffSchedule",
);
await assertRejects(
async () => {
await db.enqueue("foo", { backoffSchedule: [3600001] });
},
TypeError,
"invalid backoffSchedule",
);
});

dbTest("atomic operation is exposed", (db) => {
assert(Deno.AtomicOperation);
const ao = db.atomic();
Expand Down
24 changes: 21 additions & 3 deletions cli/tsc/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,11 @@ declare namespace Deno {
*/
enqueue(
value: unknown,
options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
options?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
): this;
/**
* Commit the operation to the KV store. Returns a value indicating whether
Expand Down Expand Up @@ -1995,14 +1999,28 @@ declare namespace Deno {
* listener after several attempts. The values are set to the value of
* the queued message.
*
* The `backoffSchedule` option can be used to specify the retry policy for
* failed message delivery. Each element in the array represents the number of
* milliseconds to wait before retrying the delivery. For example,
* `[1000, 5000, 10000]` means that a failed delivery will be retried
* at most 3 times, with 1 second, 5 seconds, and 10 seconds delay
* between each retry.
*
* ```ts
* const db = await Deno.openKv();
* await db.enqueue("bar", { keysIfUndelivered: [["foo", "bar"]] });
* await db.enqueue("bar", {
* keysIfUndelivered: [["foo", "bar"]],
* backoffSchedule: [1000, 5000, 10000],
* });
* ```
*/
enqueue(
value: unknown,
options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
options?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
): Promise<KvCommitResult>;

/**
Expand Down
36 changes: 32 additions & 4 deletions ext/kv/01_db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ function validateQueueDelay(delay: number) {
}
}

const maxQueueBackoffIntervals = 5;
const maxQueueBackoffInterval = 60 * 60 * 1000;

function validateBackoffSchedule(backoffSchedule: number[]) {
if (backoffSchedule.length > maxQueueBackoffIntervals) {
throw new TypeError("invalid backoffSchedule");
}
for (const interval of backoffSchedule) {
if (interval < 0 || interval > maxQueueBackoffInterval || isNaN(interval)) {
throw new TypeError("invalid backoffSchedule");
}
}
}

interface RawKvEntry {
key: Deno.KvKey;
value: RawValue;
Expand Down Expand Up @@ -224,18 +238,25 @@ class Kv {

async enqueue(
message: unknown,
opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
opts?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
) {
if (opts?.delay !== undefined) {
validateQueueDelay(opts?.delay);
}
if (opts?.backoffSchedule !== undefined) {
validateBackoffSchedule(opts?.backoffSchedule);
}

const enqueues = [
[
core.serialize(message, { forStorage: true }),
opts?.delay ?? 0,
opts?.keysIfUndelivered ?? [],
null,
opts?.backoffSchedule ?? null,
],
];

Expand Down Expand Up @@ -468,16 +489,23 @@ class AtomicOperation {

enqueue(
message: unknown,
opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
opts?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
): this {
if (opts?.delay !== undefined) {
validateQueueDelay(opts?.delay);
}
if (opts?.backoffSchedule !== undefined) {
validateBackoffSchedule(opts?.backoffSchedule);
}
this.#enqueues.push([
core.serialize(message, { forStorage: true }),
opts?.delay ?? 0,
opts?.keysIfUndelivered ?? [],
null,
opts?.backoffSchedule ?? null,
]);
return this;
}
Expand Down
3 changes: 3 additions & 0 deletions ext/kv/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,9 @@ where

for enqueue in &enqueues {
total_payload_size += check_enqueue_payload_size(&enqueue.payload)?;
if let Some(schedule) = enqueue.backoff_schedule.as_ref() {
total_payload_size += 4 * schedule.len();
}
}

if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES {
Expand Down

0 comments on commit 86769b0

Please sign in to comment.