Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ credentials.txt
########
# root
.claude/
.codex/
.idea/
.mcp.json
.vscode/
Expand Down
5 changes: 0 additions & 5 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"lodash.mergewith": "^4.6.2",
"mongodb": "6.3",
"morgan": "^1.10.0",
"p-limit": "^7.2.0",
"rrule": "^2.7.2",
"saslprep": "^1.0.3",
"supertokens-node": "^23.0.1",
Expand Down
5 changes: 2 additions & 3 deletions packages/backend/src/sync/services/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import {
updateSync,
} from "@backend/sync/util/sync.queries";
import {
createConcurrencyLimiter,
getChannelExpiration,
isMissingGoogleRefreshToken,
isUsingHttps,
Expand Down Expand Up @@ -419,9 +420,7 @@ class SyncService {
users.push(user._id);
}

const { default: pLimit } = await import("p-limit"); // esm module support
// Limit concurrency to avoid resource exhaustion and API rate limits
const limit = pLimit(5); // Adjust concurrency as needed
const limit = createConcurrencyLimiter(5);

const run = await Promise.all(
users.map((user) =>
Expand Down
65 changes: 65 additions & 0 deletions packages/backend/src/sync/util/sync.util.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { createConcurrencyLimiter } from "@backend/sync/util/sync.util";

const waitForLimiterQueue = async () => {
await Promise.resolve();
await Promise.resolve();
};

describe("sync.util", () => {
describe("createConcurrencyLimiter", () => {
it("limits concurrent task execution", async () => {
const limit = createConcurrencyLimiter(2);
let activeCount = 0;
let maxActiveCount = 0;
const releaseTask: Array<() => void> = [];
const started: number[] = [];

const results = [0, 1, 2, 3].map((taskId) =>
limit(async () => {
started.push(taskId);
activeCount += 1;
maxActiveCount = Math.max(maxActiveCount, activeCount);

await new Promise<void>((resolve) => {
releaseTask.push(resolve);
});

activeCount -= 1;

return taskId;
}),
);

await waitForLimiterQueue();

expect(started).toEqual([0, 1]);
expect(activeCount).toBe(2);
expect(maxActiveCount).toBe(2);

releaseTask[0]?.();
await results[0];
await waitForLimiterQueue();

expect(started).toEqual([0, 1, 2]);
expect(activeCount).toBe(2);
expect(maxActiveCount).toBe(2);

releaseTask[1]?.();
await results[1];
await waitForLimiterQueue();

expect(started).toEqual([0, 1, 2, 3]);
expect(activeCount).toBe(2);
expect(maxActiveCount).toBe(2);

releaseTask[2]?.();
releaseTask[3]?.();

await expect(Promise.all(results)).resolves.toEqual([0, 1, 2, 3]);
});

it("throws when concurrency is less than 1", () => {
expect(() => createConcurrencyLimiter(0)).toThrow(RangeError);
});
});
});
38 changes: 38 additions & 0 deletions packages/backend/src/sync/util/sync.util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,44 @@ export const canDoIncrementalSync = (sync: Schema_Sync) => {
return events.every((event) => event.nextSyncToken !== null);
};

type ConcurrencyLimiter = <Result>(
task: () => PromiseLike<Result> | Result,
) => Promise<Result>;

export const createConcurrencyLimiter = (
concurrency: number,
): ConcurrencyLimiter => {
if (!Number.isInteger(concurrency) || concurrency < 1) {
throw new RangeError("Concurrency must be an integer greater than 0");
}

let activeCount = 0;
const queue: Array<() => void> = [];

const runNext = () => {
activeCount -= 1;
queue.shift()?.();
};

return async <Result>(
task: () => PromiseLike<Result> | Result,
): Promise<Result> => {
if (activeCount >= concurrency) {
await new Promise<void>((resolve) => {
queue.push(resolve);
});
}

activeCount += 1;

try {
return await task();
} finally {
runNext();
}
};
};

export const isUsingHttps = () => getBaseURL().includes("https");

export const logExpirationReminder = (min: number) => {
Expand Down
Loading