diff --git a/foundations/server/packages/client/src/blob.ts b/foundations/server/packages/client/src/blob.ts
index 65171c86119..30b30324ae3 100644
--- a/foundations/server/packages/client/src/blob.ts
+++ b/foundations/server/packages/client/src/blob.ts
@@ -46,6 +46,8 @@ export class BlobClient {
   ): Promise<void> {
     let written = 0
     const chunkSize = 50 * 1024 * 1024
+    let emptyChunkRetries = 0
+    const maxEmptyChunkRetries = 3

     // Use ranges to iterate through the file with retry if required.
     while (written < size) {
@@ -65,6 +67,24 @@ export class BlobClient {
         })
         const chunk = Buffer.concat(chunks)

+        // Check for empty chunk to prevent infinite loop
+        if (chunk.length === 0) {
+          emptyChunkRetries++
+          if (emptyChunkRetries >= maxEmptyChunkRetries) {
+            ctx.error('Empty chunk received multiple times, aborting', { name, written, size, emptyChunkRetries })
+            await new Promise((resolve) => {
+              writable.end(resolve)
+            })
+            throw new Error(
+              `Empty chunk received ${emptyChunkRetries} times for blob ${name} at offset ${written}/${size}`
+            )
+          }
+          ctx.warn('Empty chunk received, retrying', { name, written, size, retry: emptyChunkRetries })
+          await new Promise((resolve) => setTimeout(resolve, 100 * emptyChunkRetries))
+          continue
+        }
+        emptyChunkRetries = 0 // Reset on successful non-empty chunk
+
         await new Promise((resolve, reject) => {
           writable.write(chunk, (err) => {
             if (err != null) {
@@ -86,13 +106,13 @@ export class BlobClient {
           ctx.info('No such key', { name })
           return
         }
-        if (i > 4) {
+        if (i >= 4) {
           await new Promise((resolve) => {
             writable.end(resolve)
           })
           throw err
         }
-        await new Promise((resolve) => setTimeout(resolve, 10))
+        await new Promise((resolve) => setTimeout(resolve, 100 * (i + 1))) // retry with increasing delay
       }
     }
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index 5a630113002..6672bc6ac4c 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -108,8 +108,8 @@ export async function backup (
     timeout: 0,
     skipDomains: [],
     connectTimeout: 30000,
-    skipBlobContentTypes: ['video/', 'image/', 'audio/'],
-    blobDownloadLimit: 5,
+    skipBlobContentTypes: ['video/', 'audio/'],
+    blobDownloadLimit: 2,
     keepSnapshots: 7 * 12
   }
 ): Promise<BackupResult> {
@@ -349,7 +349,12 @@ export async function backup (
       }
     }

+    let retryCount = 0
+    const maxRetries = 5
     while (true) {
+      if (canceled()) {
+        return { changed: 0, needRetrieveChunks: [] }
+      }
       try {
         const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(ctx, domain, idx))
         if (domain === DOMAIN_BLOB) {
@@ -360,6 +365,7 @@ export async function backup (

         idx = currentChunk.idx
         ops++
+        retryCount = 0 // Reset retry count on success

         let needRetrieve: RetriavableChunks = new Map()
@@ -446,13 +452,19 @@ export async function backup (
           break
         }
       } catch (err: any) {
-        ctx.error('failed to load chunks', { error: err })
+        retryCount++
+        ctx.error('failed to load chunks', { error: err, retryCount, maxRetries })
         if (idx !== undefined) {
           await ctx.with('closeChunk', {}, async () => {
             await connection.closeChunk(ctx, idx as number)
           })
         }
-        // Try again
+        if (retryCount >= maxRetries) {
+          ctx.error('Max retries exceeded in loadChangesFromServer', { domain, retryCount })
+          throw new Error(`Max retries (${maxRetries}) exceeded while loading chunks for domain ${domain}`)
+        }
+        // Try again with delay
+        await new Promise((resolve) => setTimeout(resolve, 1000 * retryCount))
         idx = undefined
         processed = 0
       }
@@ -596,6 +608,8 @@ export async function backup (
     } catch (err) {}

     let lastSize = 0
+    const chunkRetryCount = new Map<RetriavableChunks, number>()
+    const maxChunkRetries = 3

     while (needRetrieveChunks.length > 0) {
       if (canceled()) {
@@ -659,8 +673,22 @@ export async function backup (
           }
           ops++
         } catch (err: any) {
-          ctx.error('error loading docs', { domain, err, workspace: workspaceId })
-          // Put back.
+          const currentRetry = (chunkRetryCount.get(needRetrieve) ?? 0) + 1
+          chunkRetryCount.set(needRetrieve, currentRetry)
+          ctx.error('error loading docs', {
+            domain,
+            err,
+            workspace: workspaceId,
+            retry: currentRetry,
+            maxRetries: maxChunkRetries
+          })
+          if (currentRetry >= maxChunkRetries) {
+            ctx.error('Max retries exceeded for chunk, skipping', { domain, chunkSize: needRetrieve.size })
+            // Skip this chunk after max retries
+            continue
+          }
+          // Put back with delay.
+          await new Promise((resolve) => setTimeout(resolve, 1000 * currentRetry))
           needRetrieveChunks.push(needRetrieve)
           continue
         }
@@ -763,7 +791,7 @@ export async function backup (
             const descrJson = JSON.stringify(d)

             if (blob.size > options.blobDownloadLimit * 1024 * 1024) {
-              ctx.info('skip blob download, limit excheed', {
+              ctx.info('skip blob download, limit exceeded', {
                 blob: blob._id,
                 provider: blob.provider,
                 size: Math.round(blob.size / (1024 * 1024)),
@@ -1185,7 +1213,7 @@ export async function backup (
     if (!canceled() && domainChanges > 0) {
       backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete
       backupInfo.migrations.forcedFullCheck = forcedFullCheck
-      backupInfo.migrations.forcedCompaction = forcedCompact
+      backupInfo.migrations.forcedCompact = forcedCompact
       backupInfo.dataSize = result.dataSize
       backupInfo.blobsSize = result.blobsSize
       backupInfo.backupSize = result.backupSize
diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts
index 3f4e57f4529..76887d2d94e 100644
--- a/server/backup/src/service.ts
+++ b/server/backup/src/service.ts
@@ -61,7 +61,7 @@ export interface BackupConfig {
 }

 class BackupWorker {
-  downloadLimit: number = 5
+  downloadLimit: number = 2
   workspacesToBackup = new Map()
   rateLimiter: RateLimiter
@@ -336,7 +336,7 @@ class BackupWorker {
         connectTimeout: 5 * 60 * 1000, // 5 minutes to connect
         keepSnapshots: this.config.KeepSnapshots,
         blobDownloadLimit: this.downloadLimit,
-        skipBlobContentTypes: ['video/', 'audio/', 'image/'],
+        skipBlobContentTypes: ['video/', 'audio/'],
         fullVerify: this.fullCheck,
         progress: (progress) => {
           return notify?.(progress) ?? Promise.resolve()
diff --git a/server/backup/src/types.ts b/server/backup/src/types.ts
index 964f1d6e0e4..9df9a255286 100644
--- a/server/backup/src/types.ts
+++ b/server/backup/src/types.ts
@@ -75,6 +75,19 @@ export interface BackupSnapshot {
   stIndex: number // Snapshot index
 }

+/**
+ * Known migration keys for backup info
+ * @public
+ */
+export interface BackupMigrations {
+  /** Flag indicating zero-size check has been performed */
+  zeroCheckSize?: boolean
+  /** Version of forced compaction migration */
+  forcedCompact?: string
+  /** Version of forced full check migration */
+  forcedFullCheck?: string
+}
+
 /**
  * @public
  */
@@ -88,7 +101,7 @@ export interface BackupInfo {
   // A hash of current domain transactions, so we could skip all other checks if same.
   domainHashes: Record<Domain, string>

-  migrations: Record<string, boolean | string>
+  migrations: BackupMigrations
   dataSize?: number
   blobsSize?: number
   backupSize?: number
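
Note on the retry strategy: all three retry sites added above share the same shape: count consecutive failures, give up once a fixed cap is reached, and otherwise sleep for a linearly growing delay before trying again (100 ms per attempt for empty chunks in blob.ts, 1 s per attempt for chunk loading in backup.ts). Below is a minimal, self-contained TypeScript sketch of that shape; `withRetries` and its parameters are illustrative names, not identifiers from this codebase.

// Sketch of the bounded-retry-with-linear-backoff pattern used in this diff.
// All names are illustrative; only the control flow mirrors the patch.
async function withRetries<T> (
  op: () => Promise<T>,
  maxRetries: number,
  baseDelayMs: number,
  onRetry?: (attempt: number, err: unknown) => void
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await op()
    } catch (err) {
      if (attempt >= maxRetries) {
        // Give up: surface the last error instead of looping forever.
        throw err
      }
      onRetry?.(attempt, err)
      // Linear backoff: 1x, 2x, 3x the base delay, like `1000 * retryCount` above.
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * attempt))
    }
  }
}

With a small cap like 3 to 5 attempts, linear backoff keeps the worst-case total wait bounded and predictable (baseDelayMs * maxRetries * (maxRetries - 1) / 2 in the failure case); exponential backoff would be the usual choice only for larger retry budgets.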
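On the types.ts change: typing `migrations` as the closed `BackupMigrations` interface rather than `Record<string, boolean | string>` means a misspelled key, like the `forcedCompaction` corrected in backup.ts above, fails to compile instead of being written and silently ignored. A small illustration follows; the interface body is as in the patch, while the assigned value is made up.

interface BackupMigrations {
  zeroCheckSize?: boolean
  forcedCompact?: string
  forcedFullCheck?: string
}

const migrations: BackupMigrations = {}
migrations.forcedCompact = 'v2' // OK: known key (value is illustrative)
// migrations.forcedCompaction = 'v2'
// ^ no longer compiles: Property 'forcedCompaction' does not exist on type 'BackupMigrations'.
//   Under the old Record<string, boolean | string> type this typo was accepted silently.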