Skip to content

Commit

Permalink
Merge pull request #73 from Rotorsoft/fix-infinite-retry
Browse files Browse the repository at this point in the history
fix: pause non-flowing subscription
  • Loading branch information
Rotorsoft committed May 30, 2024
2 parents 7c72623 + 78c541c commit fdba51a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 14 deletions.
11 changes: 11 additions & 0 deletions libs/eventually-broker/src/cluster/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ export const work = async (
[],
position: lastResponse.id
};
if(!retryable) {
sub.loop.pause();
}
sendState(state);
return retryable;
}
Expand Down Expand Up @@ -295,12 +298,20 @@ export const work = async (
position: sub.state.position
};
pump(sub, trigger, sub.state.retryTimeoutSecs * 1000 * sub.retry_count);
} else {
sub.loop.pause();
}
};

const pumpChannel: TriggerCallback = (trigger) => {
subs.forEach((sub) => {
sub.retry_count = 0;
if(
trigger.operation === "REFRESH" ||
trigger.operation === "RESTART"
) {
sub.loop.resume();
}
pump(sub, trigger);
});
};
Expand Down
44 changes: 30 additions & 14 deletions libs/eventually/src/utils/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { sleep } from "./utils";
/**
* Scheduler statuses
*/
export type Status = "running" | "stopping" | "stopped";
export type Status = "running" | "paused" | "stopping" | "stopped";

/**
* Scheduler actions
Expand All @@ -28,6 +28,8 @@ export type Action = {
export interface Schedule extends Disposable {
push: (action: Action) => void;
stop: () => Promise<void>;
pause: () => void;
resume: () => void;
status: () => Status;
pending: () => number;
}
Expand All @@ -40,12 +42,14 @@ export interface Schedule extends Disposable {
export const scheduler = (name: string): Schedule => {
const queue: Array<Action> = [];
const delayed = new Map<string, NodeJS.Timeout>();
let status: Status = "stopped";
let _status: Status = "stopped";

log().green().trace(`Schedule "${name}" created`);

const status = (): Status => (_status)

const schedule = (action: Action): void => {
if (status === "stopping") return;
if (_status === "stopping" || _status === "paused") return;
clearTimeout(delayed.get(action.id));
delayed.set(
action.id,
Expand All @@ -57,43 +61,53 @@ export const scheduler = (name: string): Schedule => {
};

const enqueue = (action: Action): void => {
if (status === "stopping") return;
if (_status === "stopping" || _status === "paused") return;
queue.push(action);
setImmediate(dequeue);
};

const dequeue = async (): Promise<void> => {
if (status === "stopping") return;
status = "running";
while (queue.length && status === "running") {
if (_status === "stopping" || _status === "paused") return;
_status = "running";
while (queue.length && _status === "running") {
const action = queue.shift();
if (action) {
const result = await action.action();
action.callback && action.callback(action.id, result);
}
}
status = "stopped";
if(status() !== "paused") _status = "stopped";
};

const stop = async (): Promise<void> => {
if (status === "stopping") return;
status = "stopping";
if (_status === "stopping") return;
_status = "stopping";
delayed.forEach((timeout) => clearTimeout(timeout));
delayed.clear();
for (
let attempt = 1;
queue.length && status === "stopping" && attempt <= 10;
queue.length && _status === "stopping" && attempt <= 10;
attempt++
) {
log()
.red()
.trace(
`Schedule "${name}" - ${status} [${queue.length}] (${attempt})...`
`Schedule "${name}" - ${_status} [${queue.length}] (${attempt})...`
);
await sleep(1000);
}
queue.length = 0;
status = "stopped";
_status = "stopped";
};

const pause = (): void => {
if (_status !== "running") return;
_status = "paused";
};

const resume = (): void => {
if (_status !== "paused") return;
_status = "running";
};

return {
Expand All @@ -103,7 +117,9 @@ export const scheduler = (name: string): Schedule => {
action.delay ? schedule(action) : enqueue(action);
},
stop,
status: () => status,
pause,
resume,
status: () => _status,
pending: () => delayed.size
};
};

0 comments on commit fdba51a

Please sign in to comment.