Skip to content

Commit

Permalink
Merge pull request #5014 from zeeshanakram3/colossus_sync_rework
Browse files Browse the repository at this point in the history
[Colossus] Sync rework
  • Loading branch information
mnaamani committed Dec 28, 2023
2 parents c4789f9 + 3c02240 commit 78f3e9a
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 284 deletions.
11 changes: 1 addition & 10 deletions storage-node/src/commands/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,16 +388,7 @@ async function runSyncWithInterval(
while (true) {
try {
logger.info(`Resume syncing....`)
await performSync(
api,
buckets,
syncWorkersNumber,
syncWorkersTimeout,
qnApi,
uploadsDirectory,
tempDirectory,
hostId
)
await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
await sleep(sleepInterval)
} catch (err) {
Expand Down
1 change: 0 additions & 1 deletion storage-node/src/commands/util/fetch-bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export default class FetchBucket extends Command {

try {
await performSync(
undefined,
[bucketId],
flags.syncWorkersNumber,
flags.syncWorkersTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ query getBagConnection($bucketIds: [ID!], $limit: Int, $cursor: String) {

fragment DataObjectByBagIdsDetails on StorageDataObject {
id
ipfsHash
storageBagId
}

Expand Down
63 changes: 0 additions & 63 deletions storage-node/src/services/sync/remoteStorageData.ts

This file was deleted.

6 changes: 6 additions & 0 deletions storage-node/src/services/sync/storageObligations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ type DataObject = {
* Assigned bag ID
*/
bagId: string

/**
* Data Object hash
*/
ipfsHash: string
}

/**
Expand Down Expand Up @@ -110,6 +115,7 @@ export async function getStorageObligationsFromRuntime(
dataObjects: assignedDataObjects.map((dataObject) => ({
id: dataObject.id,
bagId: dataObject.storageBagId,
ipfsHash: dataObject.ipfsHash,
})),
}

Expand Down
103 changes: 34 additions & 69 deletions storage-node/src/services/sync/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { ApiPromise } from '@polkadot/api'
import _ from 'lodash'
import { getDataObjectIDs } from '../../services/caching/localDataObjects'
import logger from '../../services/logger'
import { QueryNodeApi } from '../queryNode/api'
import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations'
import { DownloadFileTask, PrepareDownloadFileTask, SyncTask } from './tasks'
import { TaskProcessorSpawner, TaskSink, WorkingStack } from './workingProcess'
import { DownloadFileTask } from './tasks'
import { TaskProcessorSpawner, WorkingStack } from './workingProcess'

/**
* Temporary directory name for data uploading.
Expand All @@ -31,50 +30,43 @@ export const PendingDirName = 'pending'
* @param qnApi - Query Node API
* @param uploadDirectory - local directory to get file names from
* @param tempDirectory - local directory for temporary data uploading
* @param operatorUrl - (optional) defines the data source URL. If not set
* @param selectedOperatorUrl - (optional) defines the data source URL. If not set
* the source URL is resolved for each data object separately using the Query
* Node information about the storage providers.
*/
export async function performSync(
api: ApiPromise | undefined,
buckets: string[],
asyncWorkersNumber: number,
asyncWorkersTimeout: number,
qnApi: QueryNodeApi,
uploadDirectory: string,
tempDirectory: string,
hostId: string,
operatorUrl?: string
selectedOperatorUrl?: string
): Promise<void> {
logger.info('Started syncing...')
const [model, files] = await Promise.all([getStorageObligationsFromRuntime(qnApi, buckets), getDataObjectIDs()])

const requiredIds = model.dataObjects.map((obj) => obj.id)
const required = model.dataObjects

const added = _.difference(requiredIds, files)
const removed = _.difference(files, requiredIds)
const added = _.differenceWith(required, files, (required, file) => required.id === file)
const removed = _.differenceWith(files, required, (file, required) => file === required.id)

logger.debug(`Sync - new objects: ${added.length}`)
logger.debug(`Sync - obsolete objects: ${removed.length}`)

const workingStack = new WorkingStack()

let addedTasks: SyncTask[]
if (operatorUrl === undefined) {
addedTasks = await getPrepareDownloadTasks(
api,
model,
buckets,
added,
uploadDirectory,
tempDirectory,
workingStack,
asyncWorkersTimeout,
hostId
)
} else {
addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory, asyncWorkersTimeout, hostId)
}
const addedTasks = await getDownloadTasks(
model,
buckets,
added,
uploadDirectory,
tempDirectory,
asyncWorkersTimeout,
hostId,
selectedOperatorUrl
)

logger.debug(`Sync - started processing...`)

Expand All @@ -87,28 +79,28 @@ export async function performSync(
}

/**
* Creates the download preparation tasks.
* Creates the download tasks.
*
* @param api - Runtime API promise
* @param ownBuckets - list of bucket ids operated by this node
* @param dataObligations - defines the current data obligations for the node
* @param ownBuckets - list of bucket ids operated by this node
* @param added - data objects (id + ipfsHash) to download
* @param uploadDirectory - local directory for data uploading
* @param tempDirectory - local directory for temporary data uploading
* @param taskSink - a destination for the newly created tasks
* @param asyncWorkersTimeout - downloading asset timeout
* @param hostId - Random host UUID assigned to each node during bootstrap
* @param selectedOperatorUrl - operator URL selected for syncing objects
*/
async function getPrepareDownloadTasks(
api: ApiPromise | undefined,
async function getDownloadTasks(
dataObligations: DataObligations,
ownBuckets: string[],
addedIds: string[],
added: DataObligations['dataObjects'],
uploadDirectory: string,
tempDirectory: string,
taskSink: TaskSink,
asyncWorkersTimeout: number,
hostId: string
): Promise<PrepareDownloadFileTask[]> {
hostId: string,
selectedOperatorUrl?: string
): Promise<DownloadFileTask[]> {
const bagIdByDataObjectId = new Map()
for (const entry of dataObligations.dataObjects) {
bagIdByDataObjectId.set(entry.id, entry.bagId)
Expand Down Expand Up @@ -148,53 +140,26 @@ async function getPrepareDownloadTasks(
bagOperatorsUrlsById.set(entry.id, operatorUrls)
}

const tasks = addedIds.map((id) => {
const tasks = added.map((dataObject) => {
let operatorUrls: string[] = [] // can be empty after look up
let bagId = null
if (bagIdByDataObjectId.has(id)) {
bagId = bagIdByDataObjectId.get(id)
if (bagIdByDataObjectId.has(dataObject.id)) {
bagId = bagIdByDataObjectId.get(dataObject.id)
if (bagOperatorsUrlsById.has(bagId)) {
operatorUrls = bagOperatorsUrlsById.get(bagId)
}
}

return new PrepareDownloadFileTask(
operatorUrls,
hostId,
bagId,
id,
return new DownloadFileTask(
selectedOperatorUrl ? [selectedOperatorUrl] : operatorUrls,
dataObject.id,
dataObject.ipfsHash,
uploadDirectory,
tempDirectory,
taskSink,
asyncWorkersTimeout,
api
hostId
)
})

return tasks
}

/**
* Creates the download file tasks.
*
* @param operatorUrl - defines the data source URL.
* @param addedIds - data object IDs to download
* @param uploadDirectory - local directory for data uploading
* @param tempDirectory - local directory for temporary data uploading
* @param downloadTimeout - asset downloading timeout (in minutes)
*/
async function getDownloadTasks(
operatorUrl: string,
addedIds: string[],
uploadDirectory: string,
tempDirectory: string,
downloadTimeout: number,
hostId: string
): Promise<DownloadFileTask[]> {
const addedTasks = addedIds.map(
(fileName) =>
new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory, downloadTimeout, hostId)
)

return addedTasks
}
Loading

0 comments on commit 78f3e9a

Please sign in to comment.