Skip to content

Commit

Permalink
Merge pull request #4971 from zeeshanakram3/colossus_accept_pending_d…
Browse files Browse the repository at this point in the history
…ata_objects_enhancements

[Colossus] Accept pending data objects enhancements
  • Loading branch information
mnaamani committed Nov 30, 2023
2 parents 25408b2 + 426a235 commit c38cef0
Show file tree
Hide file tree
Showing 15 changed files with 516 additions and 89 deletions.
6 changes: 6 additions & 0 deletions storage-node/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 3.9.0

- Added new `AcceptPendingObjectsService` that is responsible for periodically sending batch `accept_pending_data_objects` for all the pending data objects. The `POST /files` endpoint now no longer calls the `accept_pending_data_objects` extrinsic for individual uploads, instead, it registers all the pending objects with `AcceptPendingObjectsService`
- Updated the `/state/data` endpoint response headers to also return the data object status, i.e. `pending` or `accepted`
- **FIX**: Increase the default timeout value in the `extrinsicWrapper` function to match the transaction validity in the transaction pool

### 3.8.1

- Hotfix: Fix call stack size exceeded when handling a large number of initial objects to sync.
Expand Down
2 changes: 1 addition & 1 deletion storage-node/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "storage-node",
"description": "Joystream storage subsystem.",
"version": "3.8.1",
"version": "3.9.0",
"author": "Joystream contributors",
"bin": {
"storage-node": "./bin/run"
Expand Down
6 changes: 6 additions & 0 deletions storage-node/src/api-spec/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ components:
tempDownloads:
type: integer
format: int64
pendingDirSize:
type: integer
format: int64
pendingObjects:
type: integer
format: int64
VersionResponse:
type: object
required:
Expand Down
52 changes: 37 additions & 15 deletions storage-node/src/commands/server.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
import { flags } from '@oclif/command'
import { createApp } from '../services/webApi/app'
import ApiCommandBase from '../command-base/ApiCommandBase'
import logger, { initNewLogger, DatePatternByFrequency, Frequency } from '../services/logger'
import { loadDataObjectIdCache } from '../services/caching/localDataObjects'
import { ApiPromise } from '@polkadot/api'
import { performSync, TempDirName } from '../services/sync/synchronizer'
import sleep from 'sleep-promise'
import rimraf from 'rimraf'
import { KeyringPair } from '@polkadot/keyring/types'
import { Option } from '@polkadot/types-codec'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import fs from 'fs'
import _ from 'lodash'
import path from 'path'
import rimraf from 'rimraf'
import sleep from 'sleep-promise'
import { promisify } from 'util'
import ExitCodes from './../command-base/ExitCodes'
import fs from 'fs'
import { getStorageBucketIdsByWorkerId } from '../services/sync/storageObligations'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import { Option } from '@polkadot/types-codec'
import { QueryNodeApi } from '../services/queryNode/api'
import { KeyringPair } from '@polkadot/keyring/types'
import ApiCommandBase from '../command-base/ApiCommandBase'
import { customFlags } from '../command-base/CustomFlags'
import { loadDataObjectIdCache } from '../services/caching/localDataObjects'
import logger, { DatePatternByFrequency, Frequency, initNewLogger } from '../services/logger'
import { QueryNodeApi } from '../services/queryNode/api'
import { AcceptPendingObjectsService } from '../services/sync/acceptPendingObjects'
import { getStorageBucketIdsByWorkerId } from '../services/sync/storageObligations'
import { PendingDirName, TempDirName, performSync } from '../services/sync/synchronizer'
import { createApp } from '../services/webApi/app'
import ExitCodes from './../command-base/ExitCodes'
const fsPromises = fs.promises

/**
Expand Down Expand Up @@ -127,6 +128,11 @@ Supported values: warn, error, debug, info. Default:debug`,
default: 'daily',
required: false,
}),
maxBatchTxSize: flags.integer({
description: 'Maximum number of `accept_pending_data_objects` in a batch transactions.',
default: 10,
required: false,
}),
...ApiCommandBase.flags,
}

Expand Down Expand Up @@ -198,13 +204,27 @@ Supported values: warn, error, debug, info. Default:debug`,
await recreateTempDirectory(flags.uploads, TempDirName)

if (fs.existsSync(flags.uploads)) {
await loadDataObjectIdCache(flags.uploads, TempDirName)
await loadDataObjectIdCache(flags.uploads, TempDirName, PendingDirName)
}

if (flags.dev) {
await this.ensureDevelopmentChain()
}

const pendingDataObjectsDir = path.join(flags.uploads, PendingDirName)

const acceptPendingObjectsService = await AcceptPendingObjectsService.create(
api,
qnApi,
workerId,
flags.uploads,
pendingDataObjectsDir,
bucketKeyPairs,
writableBuckets,
flags.maxBatchTxSize,
6000 // Every block
)

// Don't run sync job if no buckets selected, to prevent purging
// any assets.
if (flags.sync && selectedBuckets.length) {
Expand Down Expand Up @@ -244,6 +264,8 @@ Supported values: warn, error, debug, info. Default:debug`,
maxFileSize,
uploadsDir: flags.uploads,
tempFileUploadingDir,
pendingDataObjectsDir,
acceptPendingObjectsService,
process: this.config,
enableUploadingAuth,
downloadBuckets: selectedBuckets,
Expand Down
13 changes: 10 additions & 3 deletions storage-node/src/services/caching/localDataObjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,20 @@ export async function getDataObjectIDs(): Promise<string[]> {
* @param uploadDir - uploading directory
* @param tempDirName - temp directory name
*/
export async function loadDataObjectIdCache(uploadDir: string, tempDirName: string): Promise<void> {
export async function loadDataObjectIdCache(
uploadDir: string,
tempDirName: string,
pendingDirName: string
): Promise<void> {
await lock.acquireAsync()

const localIds = await getLocalFileNames(uploadDir)
// Filter temporary directory name.
// Filter temporary & pending directory name.
const tempDirectoryName = path.parse(tempDirName).name
const ids = localIds.filter((dataObjectId) => dataObjectId !== tempDirectoryName)
const pendingDirectoryName = path.parse(pendingDirName).name
const ids = localIds.filter(
(dataObjectId) => dataObjectId !== tempDirectoryName && dataObjectId !== pendingDirectoryName
)

idCache = new Set(ids)

Expand Down
66 changes: 46 additions & 20 deletions storage-node/src/services/queryNode/api.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,51 @@
import {
ApolloClient,
NormalizedCacheObject,
DocumentNode,
HttpLink,
defaultDataIdFromObject,
InMemoryCache,
DocumentNode,
split,
NormalizedCacheObject,
defaultDataIdFromObject,
from,
split,
} from '@apollo/client'
import { onError } from '@apollo/client/link/error'
import { WebSocketLink } from '@apollo/client/link/ws'
import { getMainDefinition } from '@apollo/client/utilities'
import fetch from 'cross-fetch'
import stringify from 'fast-safe-stringify'
import ws from 'ws'
import logger from '../logger'
import {
DataObjectDetailsFragment,
DataObjectsWithBagAndBucketsFragment,
GetBagConnection,
GetBagConnectionQuery,
GetBagConnectionQueryVariables,
GetDataObjectConnection,
GetDataObjectConnectionQuery,
GetDataObjectConnectionQueryVariables,
GetDataObjectsByIds,
GetDataObjectsByIdsQuery,
GetDataObjectsByIdsQueryVariables,
GetStorageBucketDetails,
GetStorageBucketDetailsQuery,
GetStorageBucketDetailsByWorkerId,
GetStorageBucketDetailsByWorkerIdQuery,
GetStorageBucketDetailsByWorkerIdQueryVariables,
GetStorageBucketDetailsQuery,
GetStorageBucketDetailsQueryVariables,
StorageBucketDetailsFragment,
StorageBagDetailsFragment,
DataObjectDetailsFragment,
GetDataObjectConnectionQuery,
GetDataObjectConnectionQueryVariables,
GetDataObjectConnection,
StorageBucketIdsFragment,
GetStorageBucketsConnection,
GetStorageBucketsConnectionQuery,
GetStorageBucketsConnectionQueryVariables,
GetStorageBucketDetailsByWorkerId,
QueryNodeState,
QueryNodeStateFields,
QueryNodeStateFieldsFragment,
QueryNodeStateSubscription,
QueryNodeStateSubscriptionVariables,
QueryNodeState,
QueryNodeStateFields,
StorageBagDetailsFragment,
StorageBucketDetailsFragment,
StorageBucketIdsFragment,
} from './generated/queries'
import { Maybe, StorageBagWhereInput } from './generated/schema'
import { WebSocketLink } from '@apollo/client/link/ws'
import { getMainDefinition } from '@apollo/client/utilities'
import ws from 'ws'
import logger from '../logger'
import stringify from 'fast-safe-stringify'

/**
* Defines query paging limits.
Expand Down Expand Up @@ -314,6 +318,28 @@ export class QueryNodeApi {
return fullResult
}

/**
 * Returns data objects info by IDs.
 *
 * @param ids - query filter: data object IDs
 * @returns promise with a list of data objects (each with its bag and the
 *          bag's storage buckets).
 */
public async getDataObjectsByIds(ids: string[]): Promise<Array<DataObjectsWithBagAndBucketsFragment>> {
  const allIds = [...ids] // Copy to avoid modifying the original array
  const fullResult: DataObjectsWithBagAndBucketsFragment[] = []
  while (allIds.length) {
    // Keep the batch size in sync with the query `limit` so a single batch can
    // never request more records than one query is allowed to return
    // (a fixed batch of 1000 could silently truncate results if
    // MAX_RESULTS_PER_QUERY were smaller).
    const idsBatch = allIds.splice(0, MAX_RESULTS_PER_QUERY)
    fullResult.push(
      ...((await this.multipleEntitiesQuery<GetDataObjectsByIdsQuery, GetDataObjectsByIdsQueryVariables>(
        GetDataObjectsByIds,
        { limit: MAX_RESULTS_PER_QUERY, ids: idsBatch },
        'storageDataObjects'
      )) || [])
    )
  }

  return fullResult
}

/**
* Returns storage bucket IDs.
*
Expand Down
17 changes: 17 additions & 0 deletions storage-node/src/services/queryNode/queries/queries.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,23 @@ query getDataObjectConnection($bagIds: StorageBagWhereInput, $limit: Int, $curso
}
}

# Data object fields together with its containing bag and that bag's assigned
# storage buckets. Used by the batch accept-pending-objects flow to map each
# pending object back to a bag/bucket pair.
fragment DataObjectsWithBagAndBuckets on StorageDataObject {
  id
  isAccepted
  storageBag {
    id
    storageBuckets {
      id
    }
  }
}

# Fetches data objects (with bag and bucket info) whose IDs are in $ids.
# $limit caps the number of records returned per request; callers are expected
# to batch $ids accordingly.
query getDataObjectsByIds($ids: [ID!], $limit: Int) {
  storageDataObjects(where: { id_in: $ids }, limit: $limit) {
    ...DataObjectsWithBagAndBuckets
  }
}

fragment QueryNodeStateFields on ProcessorState {
chainHead
lastCompleteBlock
Expand Down
64 changes: 63 additions & 1 deletion storage-node/src/services/runtime/extrinsics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { PalletStorageBagIdType as BagId, PalletStorageDynamicBagType as Dynamic
import BN from 'bn.js'
import { timeout } from 'promise-timeout'
import logger from '../../services/logger'
import { parseBagId } from '../helpers/bagTypes'
import { AcceptPendingDataObjectsParams } from '../sync/acceptPendingObjects'
import { formatDispatchError, getEvent, getEvents, sendAndFollowNamedTx } from './api'

/**
Expand Down Expand Up @@ -129,6 +131,64 @@ export async function updateStorageBucketsForBags(
return [success, failedCalls]
}

/**
 * Accepts pending data objects by storage provider in a batch transaction.
 *
 * @remarks
 * It sends a `utility.forceBatch` extrinsic to the runtime — one batch per
 * transactor account, where each inner call is a
 * `storage.acceptPendingDataObjects` for a single bag.
 *
 * @param api - runtime API promise
 * @param workerId - runtime storage provider ID (worker ID)
 * @param acceptPendingDataObjectsParams - acceptPendingDataObjects extrinsic parameters
 * @returns promise with a list of data object IDs that failed to be accepted.
 */
export async function acceptPendingDataObjectsBatch(
  api: ApiPromise,
  workerId: number,
  acceptPendingDataObjectsParams: AcceptPendingDataObjectsParams[]
): Promise<string[]> {
  // IDs of data objects that could not be accepted
  const failedDataObjects: string[] = []

  const txsByTransactorAccount = acceptPendingDataObjectsParams.map(({ account, storageBucket }) => {
    const txs = storageBucket.bags.map((bag) =>
      api.tx.storage.acceptPendingDataObjects(
        workerId,
        storageBucket.id,
        parseBagId(bag.id),
        api.createType('BTreeSet<u64>', bag.dataObjects)
      )
    )

    return [account, txs, storageBucket.bags] as const
  })

  for (const [account, txs, bags] of txsByTransactorAccount) {
    const txBatch = api.tx.utility.forceBatch(txs)

    const success = await extrinsicWrapper(async () => {
      await sendAndFollowNamedTx(api, account, txBatch, (result) => {
        // `forceBatch` emits exactly one ItemCompleted/ItemFailed event per
        // inner call, in call order, so event index `i` maps to `bags[i]`.
        const events = getEvents(result, 'utility', ['ItemCompleted', 'ItemFailed'])
        events.forEach((e, i) => {
          if (e.method === 'ItemFailed') {
            // FIX: push the individual object IDs. The original spread the
            // comma-joined `dataObjects.toString()`, which pushed single
            // characters ("1", ",", "2", ...) instead of IDs.
            failedDataObjects.push(...bags[i].dataObjects.map((id) => id.toString()))
          }
        })
      })
    })

    if (!success) {
      // If the whole batch transaction failed, mark every data object of
      // every bag in this batch as failed.
      bags.forEach((bag) => {
        failedDataObjects.push(...bag.dataObjects.map((id) => id.toString()))
      })
    }
  }

  return failedDataObjects
}

/**
* Accepts pending data objects by storage provider.
*
Expand Down Expand Up @@ -263,7 +323,9 @@ export async function inviteStorageBucketOperator(
async function extrinsicWrapper(
extrinsic: () => Promise<void>,
throwErr = false,
timeoutMs = 25000 // 25s - default extrinsic timeout
// 5 mins - based on the default transactions validity of Substrate based chains with
// 6s block time: https://polkadot.js.org/docs/api/FAQ/#how-long-do-transactions-live
timeoutMs = 300_000
): Promise<boolean> {
try {
await timeout(extrinsic(), timeoutMs)
Expand Down
Loading

0 comments on commit c38cef0

Please sign in to comment.