Improve sync-export logging + add export req timeout (#990)
SidSethi committed Oct 27, 2020
1 parent e0274b6 commit 4bed969
Showing 1 changed file with 31 additions and 15 deletions.
creator-node/src/routes/nodeSync.js: 46 changes (31 additions & 15 deletions)
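In short: the diff below threads the requesting node's source_endpoint and the per-wallet redis key through the sync/export log lines, and puts a 5-minute timeout on the /export request. As a sketch of how that timeout surfaces to the caller, assuming the axios behavior of this era (~v0.20), where a timed-out request rejects with code ECONNABORTED; the endpoint and wallet below are hypothetical:

    const axios = require('axios')

    // Sketch only: if /export takes longer than `timeout` ms, axios aborts the
    // request and rejects the promise, so the sync fails fast instead of
    // hanging on a stalled export.
    async function fetchExport () {
      try {
        const resp = await axios({
          method: 'get',
          baseURL: 'http://cn1.example.com', // hypothetical creator node
          url: '/export',
          params: { wallet_public_key: ['0x123'] }, // hypothetical wallet
          responseType: 'json',
          timeout: 300000 // 5m, the value this commit uses
        })
        return resp.data
      } catch (e) {
        if (e.code === 'ECONNABORTED') console.error('export request timed out')
        throw e
      }
    }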
@@ -24,9 +24,12 @@ module.exports = function (app) {
    * }
    */
   app.get('/export', handleResponse(async (req, res) => {
+    const start = Date.now()
+
     // TODO - allow for offsets in the /export
     const walletPublicKeys = req.query.wallet_public_key // array
     const dbOnlySync = (req.query.db_only_sync === true || req.query.db_only_sync === 'true')
+    const sourceEndpoint = req.query.source_endpoint || '' // string

     const MaxClock = 25000

@@ -104,7 +107,7 @@ module.exports = function (app) {
         // there's more data to pull
         if (cnodeUser.clock > MaxClock) {
           // since clockRecords are returned by clock ASC, clock val at last index is largest clock val
-          console.log('nodeSync.js#export - cnode user clock value is higher than MaxClock, resetting', clockRecords[clockRecords.length - 1].clock)
+          req.logger.info('nodeSync.js#export - cnode user clock value is higher than MaxClock, resetting', clockRecords[clockRecords.length - 1].clock)
           cnodeUser.clock = clockRecords[clockRecords.length - 1].clock
         }
       })
@@ -150,9 +153,10 @@ module.exports = function (app) {
         }
       }

+      req.logger.info('Successful export for wallets', walletPublicKeys, `to source endpoint ${sourceEndpoint} || route duration ${Date.now() - start} ms`)
       return successResponse({ cnodeUsers: cnodeUsersDict, ipfsIDObj })
     } catch (e) {
-      console.error('Error in /export', e)
+      req.logger.error('Error in /export for wallets', walletPublicKeys, `to source endpoint ${sourceEndpoint} || route duration ${Date.now() - start} ms ||`, e)
       await transaction.rollback()
       return errorResponseServerError(e.message)
     }
@@ -245,15 +249,26 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
   }

   try {
-    // Fetch data export from creatorNodeEndpoint for given walletPublicKeys.
+    // Fetch data export from creatorNodeEndpoint for given walletPublicKeys
+    const exportQueryParams = {
+      wallet_public_key: walletPublicKeys,
+      db_only_sync: dbOnlySync
+    }
+    if (config.get('creatorNodeEndpoint')) exportQueryParams.source_endpoint = config.get('creatorNodeEndpoint')
+
     const resp = await axios({
       method: 'get',
       baseURL: creatorNodeEndpoint,
       url: '/export',
-      params: { wallet_public_key: walletPublicKeys, db_only_sync: dbOnlySync },
-      responseType: 'json'
+      params: exportQueryParams,
+      responseType: 'json',
+      /** @notice - this request timeout is arbitrarily large for now until we find an appropriate value */
+      timeout: 300000 /* 5m = 300000ms */
     })
-    if (resp.status !== 200) throw new Error(resp.data['error'])
+    if (resp.status !== 200) {
+      req.logger.error(redisKey, `Failed to retrieve export from ${creatorNodeEndpoint} for wallets`, walletPublicKeys)
+      throw new Error(resp.data['error'])
+    }
     // TODO - explain patch
     if (!resp.data) {
       if (resp.request && resp.request.responseText) {
@@ -263,6 +278,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
     if (!resp.data.hasOwnProperty('cnodeUsers') || !resp.data.hasOwnProperty('ipfsIDObj') || !resp.data.ipfsIDObj.hasOwnProperty('addresses')) {
       throw new Error(`Malformed response from ${creatorNodeEndpoint}.`)
     }
+    req.logger.info(redisKey, `Successful export from ${creatorNodeEndpoint} for wallets`, walletPublicKeys)

     if (!dbOnlySync) {
       // Attempt to connect directly to target CNode's IPFS node.
@@ -294,7 +310,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
         // Spread + set uniq's the array
         userReplicaSet = [...new Set(userReplicaSet)]
       } catch (e) {
-        req.logger.error(`Couldn't get user's replica sets, can't use cnode gateways in saveFileForMultihash`)
+        req.logger.error(redisKey, `Couldn't get user's replica sets, can't use cnode gateways in saveFileForMultihash`)
       }
     }

@@ -325,7 +341,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
           fetched latestClockVal: ${fetchedLatestClockVal}, self latestClockVal: ${latestClockValue}`)
       } else if (latestClockValue === fetchedLatestClockVal) {
         // Already up to date, no sync necessary
-        req.logger.info(`User ${fetchedWalletPublicKey} already up to date! fetchedLatestClockVal=${fetchedLatestClockVal}, latestClockValue=${latestClockValue}`)
+        req.logger.info(redisKey, `User ${fetchedWalletPublicKey} already up to date! fetchedLatestClockVal=${fetchedLatestClockVal}, latestClockValue=${latestClockValue}`)
         // the transaction declared outside the try/catch needs to be closed. if we call the continue
         // and do not end the tx, it will never be closed
         transaction.rollback()
@@ -418,7 +434,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
       // Save all track files to disk in batches (to limit concurrent load)
       for (let i = 0; i < trackFiles.length; i += TrackSaveConcurrencyLimit) {
         const trackFilesSlice = trackFiles.slice(i, i + TrackSaveConcurrencyLimit)
-        req.logger.info(`TrackFiles saveFileForMultihash - processing trackFiles ${i} to ${i + TrackSaveConcurrencyLimit}...`)
+        req.logger.info(redisKey, `TrackFiles saveFileForMultihash - processing trackFiles ${i} to ${i + TrackSaveConcurrencyLimit}...`)
         await Promise.all(trackFilesSlice.map(
           trackFile => saveFileForMultihash(req, trackFile.multihash, trackFile.storagePath, userReplicaSet)
         ))
@@ -428,7 +444,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
       // Save all non-track files to disk in batches (to limit concurrent load)
       for (let i = 0; i < nonTrackFiles.length; i += NonTrackFileSaveConcurrencyLimit) {
         const nonTrackFilesSlice = nonTrackFiles.slice(i, i + NonTrackFileSaveConcurrencyLimit)
-        req.logger.info(`NonTrackFiles saveFileForMultihash - processing files ${i} to ${i + NonTrackFileSaveConcurrencyLimit}...`)
+        req.logger.info(redisKey, `NonTrackFiles saveFileForMultihash - processing files ${i} to ${i + NonTrackFileSaveConcurrencyLimit}...`)
         await Promise.all(nonTrackFilesSlice.map(
           nonTrackFile => {
             // Skip over directories since there's no actual content to sync
@@ -445,7 +461,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
           }
         ))
       }
-      req.logger.info('Saved all non-track files to disk.')
+      req.logger.info(redisKey, 'Saved all non-track files to disk.')
     }

     await models.File.bulkCreate(nonTrackFiles.map(file => ({
@@ -466,13 +482,13 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
       ...trackFile,
       cnodeUserUUID: fetchedCnodeUserUUID
     })), { transaction })
-    req.logger.info('saved all track files to db')
+    req.logger.info(redisKey, 'saved all track files to db')

     await models.AudiusUser.bulkCreate(fetchedCNodeUser.audiusUsers.map(audiusUser => ({
       ...audiusUser,
       cnodeUserUUID: fetchedCnodeUserUUID
     })), { transaction })
-    req.logger.info('saved all audiususer data to db')
+    req.logger.info(redisKey, 'saved all audiususer data to db')

     await transaction.commit()
     req.logger.info(redisKey, `Transaction successfully committed for cnodeUserUUID ${fetchedCnodeUserUUID}`)
@@ -487,7 +503,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
       }
     }
   } catch (e) {
-    req.logger.error('Sync Error', e)
+    req.logger.error(redisKey, 'Sync Error for wallets ', walletPublicKeys, `|| from endpoint ${creatorNodeEndpoint} ||`, e)
     errorObj = e
   } finally {
     // Release all redis locks
@@ -496,7 +512,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync
       await redisLock.removeLock(redisKey)
       delete (syncQueue[wallet])
     }
-    req.logger.info(`DURATION SYNC ${Date.now() - start}`)
+    req.logger.info(redisKey, `DURATION SYNC ${Date.now() - start}`)
   }

   return errorObj
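A note on the logging change above: every log call in the sync path now passes redisKey as its first argument, so all lines for one wallet's sync share a greppable prefix. A hypothetical wrapper (not part of this commit) capturing the same intent:

    // Hypothetical helper, not in this commit: bind the redis key once instead
    // of repeating it at every call site.
    function scopedSyncLogger (logger, redisKey) {
      return {
        info: (...args) => logger.info(redisKey, ...args),
        error: (...args) => logger.error(redisKey, ...args)
      }
    }

    // Usage sketch:
    // const log = scopedSyncLogger(req.logger, redisKey)
    // log.info(`DURATION SYNC ${Date.now() - start}`)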
