Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dev/tool/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,8 @@ export function devTool (
.command('move-files')
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
.option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
.action(async (cmd: { workspace: string, blobLimit: string }) => {
.option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
.action(async (cmd: { workspace: string, blobLimit: string, concurrency: string }) => {
const { mongodbUri } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
await withStorage(mongodbUri, async (adapter) => {
Expand All @@ -1010,7 +1011,10 @@ export function devTool (
}

const wsId = getWorkspaceId(workspace.workspace)
await moveFiles(toolCtx, wsId, exAdapter, parseInt(cmd.blobLimit))
await moveFiles(toolCtx, wsId, exAdapter, {
blobSizeLimitMb: parseInt(cmd.blobLimit),
concurrency: parseInt(cmd.concurrency)
})
}
} catch (err: any) {
console.error(err)
Expand Down
49 changes: 32 additions & 17 deletions dev/tool/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
// limitations under the License.
//

import { type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core'
import { type StorageAdapterEx } from '@hcengineering/server-core'
import { PassThrough } from 'stream'

export async function moveFiles (
ctx: MeasureContext,
workspaceId: WorkspaceId,
exAdapter: StorageAdapterEx,
blobSizeLimitMb: number
params: {
blobSizeLimitMb: number
concurrency: number
}
): Promise<void> {
if (exAdapter.adapters === undefined) return

Expand All @@ -35,7 +38,11 @@ export async function moveFiles (

for (const [name, adapter] of exAdapter.adapters.entries()) {
if (name === target) continue
console.log('moving from', name)
console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)

let time = Date.now()

const rateLimiter = new RateLimiter(params.concurrency)

const iterator = await adapter.listStream(ctx, workspaceId)
while (true) {
Expand All @@ -46,29 +53,37 @@ export async function moveFiles (
if (blob === undefined) continue
if (blob.provider === target) continue

if (blob.size > blobSizeLimitMb * 1024 * 1024) {
if (blob.size > params.blobSizeLimitMb * 1024 * 1024) {
console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024))
continue
}

try {
await retryOnFailure(
ctx,
5,
async () => {
await moveFile(ctx, exAdapter, workspaceId, blob)
},
50
)
} catch (err) {
console.error('failed to process blob', name, data._id, err)
}
await rateLimiter.exec(async () => {
try {
await retryOnFailure(
ctx,
5,
async () => {
await moveFile(ctx, exAdapter, workspaceId, blob)
},
50
)
} catch (err) {
console.error('failed to process blob', name, data._id, err)
}
})

count += 1
if (count % 100 === 0) {
console.log('...moved: ', count)
await rateLimiter.waitProcessing()
const duration = Date.now() - time
time = Date.now()
console.log('...moved: ', count, Math.round(duration / 1000))
}
}

await rateLimiter.waitProcessing()

await iterator.close()
}

Expand Down
61 changes: 28 additions & 33 deletions models/core/src/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
// limitations under the License.
//

import { saveCollaborativeDoc, takeCollaborativeDocSnapshot } from '@hcengineering/collaboration'
import { saveCollaborativeDoc } from '@hcengineering/collaboration'
import core, {
DOMAIN_BLOB,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_STATUS,
DOMAIN_TX,
MeasureMetricsContext,
RateLimiter,
collaborativeDocParse,
coreId,
generateId,
Expand Down Expand Up @@ -188,7 +189,10 @@ async function processMigrateContentFor (
storageAdapter: StorageAdapter,
iterator: MigrationIterator<Doc>
): Promise<void> {
const rateLimiter = new RateLimiter(10)

let processed = 0

while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
Expand All @@ -201,45 +205,36 @@ async function processMigrateContentFor (
const operations: { filter: MigrationDocumentQuery<Doc>, update: MigrateUpdate<Doc> }[] = []

for (const doc of docs) {
const update: MigrateUpdate<Doc> = {}
await rateLimiter.exec(async () => {
const update: MigrateUpdate<Doc> = {}

for (const attribute of attributes) {
const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
for (const attribute of attributes) {
const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)

const value = (doc as any)[attribute.name] as string
if (value != null && value.startsWith('{')) {
const { documentId } = collaborativeDocParse(collaborativeDoc)
const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId)
// only for documents not in storage
if (blob === undefined) {
const ydoc = markupToYDoc(value, attribute.name)
await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx)
await takeCollaborativeDocSnapshot(
storageAdapter,
client.workspaceId,
collaborativeDoc,
ydoc,
{
versionId: revisionId,
name: 'Migration to storage',
createdBy: core.account.System,
createdOn: Date.now()
},
ctx
)
}
const value = (doc as any)[attribute.name] as string
if (value != null && value.startsWith('{')) {
const { documentId } = collaborativeDocParse(collaborativeDoc)
const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId)
// only for documents not in storage
if (blob === undefined) {
const ydoc = markupToYDoc(value, attribute.name)
await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx)
}

update[attribute.name] = collaborativeDoc
} else if (value == null) {
update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
update[attribute.name] = collaborativeDoc
} else if (value == null) {
update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
}
}
}

if (Object.keys(update).length > 0) {
operations.push({ filter: { _id: doc._id }, update })
}
if (Object.keys(update).length > 0) {
operations.push({ filter: { _id: doc._id }, update })
}
})
}

await rateLimiter.waitProcessing()

if (operations.length > 0) {
await client.bulk(domain, operations)
}
Expand Down