Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions foundations/server/packages/client/src/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class BlobClient {
): Promise<void> {
let written = 0
const chunkSize = 50 * 1024 * 1024
let emptyChunkRetries = 0
const maxEmptyChunkRetries = 3

// Use ranges to iterate through the file, with retry if required.
while (written < size) {
Expand All @@ -65,6 +67,24 @@ export class BlobClient {
})
const chunk = Buffer.concat(chunks)

// Check for empty chunk to prevent infinite loop
if (chunk.length === 0) {
emptyChunkRetries++
if (emptyChunkRetries >= maxEmptyChunkRetries) {
ctx.error('Empty chunk received multiple times, aborting', { name, written, size, emptyChunkRetries })
await new Promise<void>((resolve) => {
writable.end(resolve)
})
throw new Error(
`Empty chunk received ${emptyChunkRetries} times for blob ${name} at offset ${written}/${size}`
)
}
ctx.warn('Empty chunk received, retrying', { name, written, size, retry: emptyChunkRetries })
await new Promise<void>((resolve) => setTimeout(resolve, 100 * emptyChunkRetries))
continue
}
emptyChunkRetries = 0 // Reset on successful non-empty chunk

await new Promise<void>((resolve, reject) => {
writable.write(chunk, (err) => {
if (err != null) {
Expand All @@ -86,13 +106,13 @@ export class BlobClient {
ctx.info('No such key', { name })
return
}
if (i > 4) {
if (i >= 4) {
await new Promise<void>((resolve) => {
writable.end(resolve)
})
throw err
}
await new Promise<void>((resolve) => setTimeout(resolve, 10))
await new Promise<void>((resolve) => setTimeout(resolve, 100 * (i + 1)))
// retry
}
}
Expand Down
44 changes: 36 additions & 8 deletions server/backup/src/backup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ export async function backup (
timeout: 0,
skipDomains: [],
connectTimeout: 30000,
skipBlobContentTypes: ['video/', 'image/', 'audio/'],
blobDownloadLimit: 5,
skipBlobContentTypes: ['video/', 'audio/'],
blobDownloadLimit: 2,
keepSnapshots: 7 * 12
}
): Promise<BackupResult> {
Expand Down Expand Up @@ -349,7 +349,12 @@ export async function backup (
}
}

let retryCount = 0
const maxRetries = 5
while (true) {
if (canceled()) {
return { changed: 0, needRetrieveChunks: [] }
}
try {
const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(ctx, domain, idx))
if (domain === DOMAIN_BLOB) {
Expand All @@ -360,6 +365,7 @@ export async function backup (

idx = currentChunk.idx
ops++
retryCount = 0 // Reset retry count on success

let needRetrieve: RetriavableChunks = new Map()

Expand Down Expand Up @@ -446,13 +452,19 @@ export async function backup (
break
}
} catch (err: any) {
ctx.error('failed to load chunks', { error: err })
retryCount++
ctx.error('failed to load chunks', { error: err, retryCount, maxRetries })
if (idx !== undefined) {
await ctx.with('closeChunk', {}, async () => {
await connection.closeChunk(ctx, idx as number)
})
}
// Try again
if (retryCount >= maxRetries) {
ctx.error('Max retries exceeded in loadChangesFromServer', { domain, retryCount })
throw new Error(`Max retries (${maxRetries}) exceeded while loading chunks for domain ${domain}`)
}
// Try again with delay
await new Promise<void>((resolve) => setTimeout(resolve, 1000 * retryCount))
idx = undefined
processed = 0
}
Expand Down Expand Up @@ -596,6 +608,8 @@ export async function backup (
} catch (err) {}

let lastSize = 0
const chunkRetryCount = new Map<RetriavableChunks, number>()
const maxChunkRetries = 3

while (needRetrieveChunks.length > 0) {
if (canceled()) {
Expand Down Expand Up @@ -659,8 +673,22 @@ export async function backup (
}
ops++
} catch (err: any) {
ctx.error('error loading docs', { domain, err, workspace: workspaceId })
// Put back.
const currentRetry = (chunkRetryCount.get(needRetrieve) ?? 0) + 1
chunkRetryCount.set(needRetrieve, currentRetry)
ctx.error('error loading docs', {
domain,
err,
workspace: workspaceId,
retry: currentRetry,
maxRetries: maxChunkRetries
})
if (currentRetry >= maxChunkRetries) {
ctx.error('Max retries exceeded for chunk, skipping', { domain, chunkSize: needRetrieve.size })
// Skip this chunk after max retries
continue
}
// Put back with delay.
await new Promise<void>((resolve) => setTimeout(resolve, 1000 * currentRetry))
needRetrieveChunks.push(needRetrieve)
continue
}
Expand Down Expand Up @@ -763,7 +791,7 @@ export async function backup (
const descrJson = JSON.stringify(d)

if (blob.size > options.blobDownloadLimit * 1024 * 1024) {
ctx.info('skip blob download, limit excheed', {
ctx.info('skip blob download, limit exceeded', {
blob: blob._id,
provider: blob.provider,
size: Math.round(blob.size / (1024 * 1024)),
Expand Down Expand Up @@ -1185,7 +1213,7 @@ export async function backup (
if (!canceled() && domainChanges > 0) {
backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete
backupInfo.migrations.forcedFullCheck = forcedFullCheck
backupInfo.migrations.forcedCompaction = forcedCompact
backupInfo.migrations.forcedCompact = forcedCompact
backupInfo.dataSize = result.dataSize
backupInfo.blobsSize = result.blobsSize
backupInfo.backupSize = result.backupSize
Expand Down
4 changes: 2 additions & 2 deletions server/backup/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export interface BackupConfig {
}

class BackupWorker {
downloadLimit: number = 5
downloadLimit: number = 2
workspacesToBackup = new Map<WorkspaceUuid, WorkspaceInfoWithStatus>()
rateLimiter: RateLimiter

Expand Down Expand Up @@ -336,7 +336,7 @@ class BackupWorker {
connectTimeout: 5 * 60 * 1000, // 5 minutes to,
keepSnapshots: this.config.KeepSnapshots,
blobDownloadLimit: this.downloadLimit,
skipBlobContentTypes: ['video/', 'audio/', 'image/'],
skipBlobContentTypes: ['video/', 'audio/'],
fullVerify: this.fullCheck,
progress: (progress) => {
return notify?.(progress) ?? Promise.resolve()
Expand Down
15 changes: 14 additions & 1 deletion server/backup/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ export interface BackupSnapshot {
stIndex: number // Snapshot index
}

/**
 * Known migration keys for backup info.
 *
 * Narrows the previously loosely-typed `Record<string, boolean | string>`
 * `migrations` field on `BackupInfo` so each migration flag has an explicit,
 * documented type. All fields are optional: absence means the migration has
 * not been recorded for this backup yet.
 * @public
 */
export interface BackupMigrations {
  /** Flag indicating zero-size check has been performed */
  zeroCheckSize?: boolean
  /** Version of forced compaction migration */
  forcedCompact?: string
  /** Version of forced full check migration */
  forcedFullCheck?: string
}

/**
* @public
*/
Expand All @@ -88,7 +101,7 @@ export interface BackupInfo {
// A hash of current domain transactions, so we could skip all other checks if same.
domainHashes: Record<Domain, string>

migrations: Record<string, boolean | string>
migrations: BackupMigrations
dataSize?: number
blobsSize?: number
backupSize?: number
Expand Down
Loading