Skip to content

Commit

Permalink
Avoid 429 responses by slowing down sync if needed.
Browse files Browse the repository at this point in the history
* If only 50% of remaining is left, stall next sync until (reset / remaining) seconds
* Bugfix - used registerSyncEvent() instead of triggerSync() - would only work in production with SW active.
* Simplify performGuardedJob() using navigator.locks.
  • Loading branch information
dfahlander committed Oct 7, 2023
1 parent a52b1e2 commit 3c4e698
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 150 deletions.
2 changes: 1 addition & 1 deletion addons/dexie-cloud/src/isEagerSyncDisabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { DexieCloudDB } from './db/DexieCloudDB';
export function isEagerSyncDisabled(db: DexieCloudDB) {
return (
db.cloud.options?.disableEagerSync ||
db.cloud.currentUser.value?.userId !== 'ok' ||
db.cloud.currentUser.value?.license?.status !== 'ok' ||
!db.cloud.options?.databaseUrl
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { registerSyncEvent } from '../sync/registerSyncEvent';
import { TXExpandos } from '../types/TXExpandos';
import { outstandingTransactions } from './outstandingTransaction';
import { isEagerSyncDisabled } from '../isEagerSyncDisabled';
import { triggerSync } from '../sync/triggerSync';

export interface MutationTrackingMiddlewareArgs {
currentUserObservable: BehaviorSubject<UserLogin>;
Expand Down Expand Up @@ -97,7 +98,7 @@ export function createMutationTrackingMiddleware({
tx.mutationsAdded &&
!isEagerSyncDisabled(db)
) {
registerSyncEvent(db, 'push');
triggerSync(db, 'push');
}
removeTransaction();
};
Expand Down
128 changes: 8 additions & 120 deletions addons/dexie-cloud/src/sync/performGuardedJob.ts
Original file line number Diff line number Diff line change
@@ -1,125 +1,13 @@
import { liveQuery, Table } from 'dexie';
import { MINUTES, SECONDS } from '../helpers/date-constants';
import { DexieCloudDB } from '../db/DexieCloudDB';
import { GuardedJob } from '../db/entities/GuardedJob';
import { myId } from './myId';
import { from } from 'rxjs';
import { filter, timeout } from 'rxjs/operators';
import { DexieCloudDB } from "../db/DexieCloudDB";

const GUARDED_JOB_HEARTBEAT = 1 * SECONDS;
const GUARDED_JOB_TIMEOUT = 1 * MINUTES;

export async function performGuardedJob(
export function performGuardedJob<T>(
db: DexieCloudDB,
jobName: string,
jobsTableName: string,
job: () => Promise<any>,
{ awaitRemoteJob }: { awaitRemoteJob?: boolean } = {}
): Promise<void> {
// Start working.
//
// Check if someone else is working on this already.
//
const jobsTable = db.table(jobsTableName) as Table<GuardedJob, string>;

async function aquireLock() {
const gotTheLock = await db.transaction('rw!', jobsTableName, async () => {
const currentWork = await jobsTable.get(jobName);
if (!currentWork) {
// No one else is working. Let's record that we are.
await jobsTable.add(
{
nodeId: myId,
started: new Date(),
heartbeat: new Date()
},
jobName
);
return true;
} else if (
currentWork.heartbeat.getTime() <
Date.now() - GUARDED_JOB_TIMEOUT
) {
console.warn(
`Latest ${jobName} worker seem to have died.\n`,
`The dead job started:`,
currentWork.started,
`\n`,
`Last heart beat was:`,
currentWork.heartbeat,
'\n',
`We're now taking over!`
);
// Now, take over!
await jobsTable.put(
{
nodeId: myId,
started: new Date(),
heartbeat: new Date()
},
jobName
);
return true;
}
return false;
});

if (gotTheLock) return true;

// Someone else took the job.
if (awaitRemoteJob) {
try {
const jobDoneObservable = from(
liveQuery(() => jobsTable.get(jobName))
).pipe(
timeout(GUARDED_JOB_TIMEOUT),
filter((job) => !job)
); // Wait til job is not there anymore.
await jobDoneObservable.toPromise();
return false;
} catch (err) {
if (err.name !== 'TimeoutError') {
throw err;
}
// Timeout stopped us! Try aquire the lock now.
// It will likely succeed this time unless
// another client took it.
return await aquireLock();
}
}
return false;
}

if (await aquireLock()) {
// We own the lock entry and can do our job undisturbed.
// We're not within a transaction, but these type of locks
// spans over transactions.

// Start our heart beat during the job.
// Use setInterval to make sure we are updating heartbeat even during long-lived fetch calls.
const heartbeat = setInterval(() => {
jobsTable.update(
jobName,
(job) => {
if (job.nodeId === myId) {
job.heartbeat = new Date();
}
}
);
}, GUARDED_JOB_HEARTBEAT);

try {
return await job();
} finally {
// Stop heartbeat
clearInterval(heartbeat);
// Remove the persisted job state:
await db.transaction('rw!', jobsTableName, async () => {
const currentWork = await jobsTable.get(jobName);
if (currentWork && currentWork.nodeId === myId) {
await jobsTable.delete(jobName);
}
});
}
job: () => Promise<T>
): Promise<T> {
if (typeof navigator === 'undefined' || !navigator.locks) {
// No support for guarding jobs. IE11, node.js, etc.
return job();
}
return navigator.locks.request(db.name + '|' + jobName, () => job());
}
34 changes: 34 additions & 0 deletions addons/dexie-cloud/src/sync/ratelimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { DexieCloudDB } from '../db/DexieCloudDB';

// If we get Ratelimit-Limit and Ratelimit-Remaining where Ratelimit-Remaining is below
// (Ratelimit-Limit / 2), we should delay the next sync by (Ratelimit-Reset / Ratelimit-Remaining)
// seconds (given that there is a Ratelimit-Reset header).

let syncRatelimitDelays = new WeakMap<DexieCloudDB, Date>();

export async function checkSyncRateLimitDelay(db: DexieCloudDB) {
const delatMilliseconds = (syncRatelimitDelays.get(db)?.getTime() ?? 0) - Date.now();
if (delatMilliseconds > 0) {
console.debug(`Stalling sync request ${delatMilliseconds} ms to spare ratelimits`);
await new Promise(resolve => setTimeout(resolve, delatMilliseconds));
}
}

export function updateSyncRateLimitDelays(db: DexieCloudDB, res: Response) {
const limit = res.headers.get('Ratelimit-Limit');
const remaining = res.headers.get('Ratelimit-Remaining');
const reset = res.headers.get('Ratelimit-Reset');
if (limit && remaining && reset) {
const limitNum = Number(limit);
const remainingNum = Math.max(0, Number(remaining));
const willResetInSeconds = Number(reset);
if (remainingNum < limitNum / 2) {
const delay = Math.ceil(willResetInSeconds / (remainingNum + 1));
syncRatelimitDelays.set(db, new Date(Date.now() + delay * 1000));
console.debug(`Sync ratelimit delay set to ${delay} seconds`);
} else {
syncRatelimitDelays.delete(db);
console.debug(`Sync ratelimit delay cleared`);
}
}
}
27 changes: 2 additions & 25 deletions addons/dexie-cloud/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,10 @@ import { isOnline } from './isOnline';
import { updateBaseRevs } from './updateBaseRevs';
import { getLatestRevisionsPerTable } from './getLatestRevisionsPerTable';
import { applyServerChanges } from './applyServerChanges';
import { checkSyncRateLimitDelay } from './ratelimit';

export const CURRENT_SYNC_WORKER = 'currentSyncWorker';

export let numberOfSyncRequests = 0;

/*
TODO:
1. V: Rätta flödet och gör det persistent mellan transaktioner
2. Sync-requestet ska autenticera sig med nuvarande användare.
MEN:
Vissa medskickade operationer kan vara gjorda av annan användare.
Därför: Om några av client-changes är andra användare, så måste de användarnas
tokens följa med som extra parameter till fetch-requestet.
Servern skall då validera och genomföra dessa operationer baserat på alternativt token.
Kanske kan vi skita i det flödet just nu och hindra att det uppstår istället.
Hur? Jo, genom:
1. Användare är ANONYMOUS
2. Data laddas ned.
3. Data modifieras.
4. Användare loggar in.
5. Sync: Några inledande requests är ANONYMOUS men autenticeras som användaren.
X: Se till att vi förhandlar initialt sync state eller uppdaterat sync state (tabell aliases etc)
Y: Använd Bison hjälpare för streamad BISON?
*/

export interface SyncOptions {
isInitialSync?: boolean;
cancelToken?: { cancelled: boolean };
Expand Down Expand Up @@ -361,6 +337,7 @@ async function _sync(
});
if (!done) {
console.debug('MORE SYNC NEEDED. Go for it again!');
await checkSyncRateLimitDelay(db);
return await _sync(db, options, schema, { isInitialSync, cancelToken });
}
console.debug('SYNC DONE', { isInitialSync });
Expand Down
8 changes: 7 additions & 1 deletion addons/dexie-cloud/src/sync/syncIfPossible.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DexieCloudDB } from '../db/DexieCloudDB';
import { sync, CURRENT_SYNC_WORKER, SyncOptions } from './sync';
import { DexieCloudOptions } from '../DexieCloudOptions';
import { assert, DexieCloudSchema } from 'dexie-cloud-common';
import { checkSyncRateLimitDelay } from './ratelimit';

const ongoingSyncs = new WeakMap<
DexieCloudDB,
Expand Down Expand Up @@ -61,19 +62,24 @@ export function syncIfPossible(
);
}
}

const promise = _syncIfPossible();
ongoingSyncs.set(db, { promise, pull: options?.purpose !== 'push' });
return promise;

async function _syncIfPossible() {
try {
// Check if should delay sync due to ratelimit:
await checkSyncRateLimitDelay(db);

// Check if we need to lock the sync job. Not needed if we are the service worker.
if (db.cloud.isServiceWorkerDB) {
// We are the dedicated sync SW:
await sync(db, cloudOptions, cloudSchema, options);
} else if (!db.cloud.usingServiceWorker) {
// We use a flow that is better suited for the case when multiple workers want to
// do the same thing.
await performGuardedJob(db, CURRENT_SYNC_WORKER, '$jobs', () =>
await performGuardedJob(db, CURRENT_SYNC_WORKER, () =>
sync(db, cloudOptions, cloudSchema, options)
);
} else {
Expand Down
9 changes: 7 additions & 2 deletions addons/dexie-cloud/src/sync/syncWithServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from 'dexie-cloud-common';
import { encodeIdsForServer } from './encodeIdsForServer';
import { UserLogin } from '../db/entities/UserLogin';
import { updateSyncRateLimitDelays } from './ratelimit';
//import {BisonWebStreamReader} from "dreambase-library/dist/typeson-simplified/BisonWebStreamReader";

export async function syncWithServer(
Expand All @@ -33,14 +34,16 @@ export async function syncWithServer(
'Content-Type': 'application/tson',
};
const updatedUser = await loadAccessToken(db);
/*
if (updatedUser?.license && changes.length > 0) {
/*if (updatedUser.license.status === 'expired') {
if (updatedUser.license.status === 'expired') {
throw new Error(`License has expired`);
}
if (updatedUser.license.status === 'deactivated') {
throw new Error(`License deactivated`);
}*/
}
}
*/
const accessToken = updatedUser?.accessToken;
if (accessToken) {
headers.Authorization = `Bearer ${accessToken}`;
Expand Down Expand Up @@ -75,6 +78,8 @@ export async function syncWithServer(
phase: 'pulling',
});

updateSyncRateLimitDelays(db, res);

if (!res.ok) {
throw new HttpError(res);
}
Expand Down

0 comments on commit 3c4e698

Please sign in to comment.