Skip to content

Commit

Permalink
Start to fix content state on creator node (#895)
Browse files Browse the repository at this point in the history
* new route

* Recover missing files via file system v0 working

* this works...but not great

* fix weird depednency issue

* more stuff working

* working end to end

* use cached value for all cnodes

* comments and validations

* bug fixes

* comments

* move the fetchCIDInNetwork location

* lint

* fix dirCID case

* move back to fetching and caching cnodes in 30 minute intervals
  • Loading branch information
dmanjunath committed Oct 13, 2020
1 parent 86a67dc commit 1a0a0c1
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 22 deletions.
25 changes: 5 additions & 20 deletions creator-node/src/fileManager.js
Expand Up @@ -156,16 +156,8 @@ async function saveFileForMultihash (req, multihash, expectedStoragePath, gatewa
req.logger.debug(`saveFileForMultihash - retrieved file for multihash ${multihash} from local ipfs node`)

// Write file to disk.
const destinationStream = fs.createWriteStream(expectedStoragePath)
fileBL.pipe(destinationStream)
await new Promise((resolve, reject) => {
destinationStream.on('finish', () => {
fileFound = true
resolve()
})
destinationStream.on('error', err => { reject(err) })
fileBL.on('error', err => { destinationStream.end(); reject(err) })
})
await Utils.writeStreamToFileSystem(fileBL, expectedStoragePath)
fileFound = true
req.logger.info(`saveFileForMultihash - wrote file to ${expectedStoragePath}, obtained via ipfs get`)
} catch (e) {
req.logger.info(`saveFileForMultihash - Failed to retrieve file for multihash ${multihash} from IPFS ${e.message}`)
Expand Down Expand Up @@ -203,16 +195,9 @@ async function saveFileForMultihash (req, multihash, expectedStoragePath, gatewa
throw new Error(`Couldn't find files on other creator nodes`)
}

const destinationStream = fs.createWriteStream(expectedStoragePath)
response.data.pipe(destinationStream)
await new Promise((resolve, reject) => {
destinationStream.on('finish', () => {
fileFound = true
resolve()
})
destinationStream.on('error', err => { reject(err) })
response.data.on('error', err => { destinationStream.end(); reject(err) })
})
// Write file to disk
await Utils.writeStreamToFileSystem(response.data, expectedStoragePath)
fileFound = true

req.logger.info(`saveFileForMultihash - wrote file to ${expectedStoragePath}`)
} catch (e) {
Expand Down
87 changes: 85 additions & 2 deletions creator-node/src/routes/files.js
@@ -1,5 +1,6 @@
const Redis = require('ioredis')
const fs = require('fs')
const path = require('path')
var contentDisposition = require('content-disposition')

const { getRequestRange, formatContentRange } = require('../utils/requestRange')
Expand All @@ -12,18 +13,25 @@ const {
errorResponseServerError,
errorResponseNotFound,
errorResponseForbidden,
errorResponseRangeNotSatisfiable
errorResponseRangeNotSatisfiable,
errorResponseUnauthorized
} = require('../apiHelpers')
const { recoverWallet } = require('../apiSigning')

const models = require('../models')
const config = require('../config.js')
const redisClient = new Redis(config.get('redisPort'), config.get('redisHost'))
const { authMiddleware, syncLockMiddleware, triggerSecondarySyncs } = require('../middlewares')
const { getIPFSPeerId, ipfsSingleByteCat, ipfsStat } = require('../utils')
const { getIPFSPeerId, ipfsSingleByteCat, ipfsStat, getAllRegisteredCNodes, findCIDInNetwork } = require('../utils')
const ImageProcessingQueue = require('../ImageProcessingQueue')
const RehydrateIpfsQueue = require('../RehydrateIpfsQueue')
const DBManager = require('../dbManager')

// regex to validate storagePath format passed in for /file_lookup route
// this will either be of the format /file_storage/<cid> for a file or /file_storage/<cid1>/<cid2> for an dir image
// there are two named match groups, outer and inner. outer is for file or dirname for dir image. inner is only image cid in dir
const FILE_SYSTEM_REGEX = /\/file_storage\/(?<outer>Qm[a-zA-Z0-9]{44})\/?(?<inner>Qm[a-zA-Z0-9]{44})?/

/**
* Helper method to stream file from file system on creator node
* Serves partial content using range requests
Expand Down Expand Up @@ -127,6 +135,15 @@ const getCID = async (req, res) => {
return await streamFromFileSystem(req, res, queryResults.storagePath)
} catch (e) {
req.logger.info(`Failed to retrieve ${queryResults.storagePath} from FS`)

// ugly nested try/catch but don't want findCIDInNetwork to stop execution of the rest of the route
try {
const libs = req.app.get('audiusLibs')
// notice this is not await-ed
findCIDInNetwork(queryResults.storagePath, CID, req.logger, libs)
} catch (e) {
req.logger.error(`Error calling findCIDInNetwork for path ${queryResults.storagePath}`, e)
}
}

try {
Expand Down Expand Up @@ -220,6 +237,17 @@ const getDirCID = async (req, res) => {
return await streamFromFileSystem(req, res, queryResults.storagePath)
} catch (e) {
req.logger.info(`Failed to retrieve ${queryResults.storagePath} from FS`)

// ugly nested try/catch but don't want findCIDInNetwork to stop execution of the rest of the route
try {
// CID is the file CID, parse it from the storagePath
const CID = queryResults.storagePath.split('/').slice(-1).join('')
const libs = req.app.get('audiusLibs')
// notice this is not await-ed
findCIDInNetwork(queryResults.storagePath, CID, req.logger, libs)
} catch (e) {
req.logger.error(`Error calling findCIDInNetwork for path ${queryResults.storagePath}`, e)
}
}

try {
Expand Down Expand Up @@ -368,6 +396,61 @@ module.exports = function (app) {
* TODO: It seems like handleResponse does work with piped responses, as seen from the track/stream endpoint.
*/
app.get('/ipfs/:dirCID/:filename', getDirCID)

/**
* Serve file from FS given a storage path
* This is a cnode-cnode only route, not to be consumed by clients. It has auth restrictions to only
* allow calls from cnodes with delegateWallets registered on chain
* @dev No handleResponse around this route because it doesn't play well with our route handling abstractions,
* same as the /ipfs route
* @param req.query.filePath the fs path for the file. should be full path including leading /file_storage
* @param req.query.delegateWallet the wallet address that signed this request
* @param req.query.timestamp the timestamp when the request was made
* @param req.query.signature the hashed signature of the object {filePath, delegateWallet, timestamp}
*/
app.get('/file_lookup', async (req, res) => {
const { filePath, timestamp, signature } = req.query
let { delegateWallet } = req.query
delegateWallet = delegateWallet.toLowerCase()

// no filePath passed in
if (!filePath) return sendResponse(req, res, errorResponseBadRequest(`Invalid request, no path provided`))

// check that signature is correct and delegateWallet is registered on chain
const recoveredWallet = recoverWallet({ filePath, delegateWallet, timestamp }, signature).toLowerCase()
const libs = req.app.get('audiusLibs')
const creatorNodes = await getAllRegisteredCNodes(libs)
const foundDelegateWallet = creatorNodes.some(node => node.delegateOwnerWallet.toLowerCase() === recoveredWallet)
if ((recoveredWallet !== delegateWallet) || !foundDelegateWallet) {
return sendResponse(req, res, errorResponseUnauthorized(`Invalid wallet signature`))
}
const filePathNormalized = path.normalize(filePath)

// check that the regex works and verify it's not blacklisted
const match = FILE_SYSTEM_REGEX.exec(filePathNormalized)
if (!match) return sendResponse(req, res, errorResponseBadRequest(`Invalid filePathNormalized provided`))

const { outer, inner } = match.groups
if (await req.app.get('blacklistManager').CIDIsInBlacklist(outer)) {
return sendResponse(req, res, errorResponseForbidden(`CID ${outer} has been blacklisted by this node.`))
}
res.setHeader('Content-Disposition', contentDisposition(outer))

// inner will only be set for image dir CID
// if there's an inner CID, check if CID is blacklisted and set content disposition header
if (inner) {
if (await req.app.get('blacklistManager').CIDIsInBlacklist(inner)) {
return sendResponse(req, res, errorResponseForbidden(`CID ${inner} has been blacklisted by this node.`))
}
res.setHeader('Content-Disposition', contentDisposition(inner))
}

try {
return await streamFromFileSystem(req, res, filePathNormalized)
} catch (e) {
return sendResponse(req, res, errorResponseNotFound(`File with path not found`))
}
})
}

module.exports.getCID = getCID
109 changes: 109 additions & 0 deletions creator-node/src/utils.js
@@ -1,10 +1,16 @@
const { recoverPersonalSignature } = require('eth-sig-util')
const fs = require('fs')
const { BufferListStream } = require('bl')
const axios = require('axios')

const { logger: genericLogger } = require('./logging')
const models = require('./models')
const { ipfs, ipfsLatest } = require('./ipfsClient')
const redis = require('./redis')

const config = require('./config')
const BlacklistManager = require('./blacklistManager')
const { generateTimestampAndSignature } = require('./apiSigning')

class Utils {
static verifySignature (data, sig) {
Expand Down Expand Up @@ -196,6 +202,94 @@ const ipfsGet = (path, req, timeout = 1000) => new Promise(async (resolve, rejec
}
})

async function findCIDInNetwork (filePath, cid, logger, libs) {
const attemptedStateFix = await getIfAttemptedStateFix(filePath)
if (attemptedStateFix) return

// get list of creator nodes
const creatorNodes = await getAllRegisteredCNodes(libs)
if (!creatorNodes.length) return

// generate signature
const delegateWallet = config.get('delegateOwnerWallet').toLowerCase()
const { signature, timestamp } = generateTimestampAndSignature({ filePath, delegateWallet }, config.get('delegatePrivateKey'))
let node

for (let index = 0; index < creatorNodes.length; index++) {
node = creatorNodes[index]
try {
const resp = await axios({
method: 'get',
url: `${node.endpoint}/file_lookup`,
params: {
filePath,
timestamp,
delegateWallet,
signature
},
responseType: 'stream',
timeout: 1000
})
if (resp.data) {
await writeStreamToFileSystem(resp.data, filePath)

// verify that the file written matches the hash expected if added to ipfs
const content = fs.createReadStream(filePath)
for await (const result of ipfsLatest.add(content, { onlyHash: true, timeout: 2000 })) {
if (cid !== result.cid.toString()) {
await fs.unlink(filePath)
logger.error(`File contents don't match IPFS hash cid: ${cid} result: ${result.cid.toString()}`)
}
}

logger.info(`findCIDInNetwork - successfully fetched file ${filePath} from node ${node.endpoint}`)
break
}
} catch (e) {
// since this is a function running in the background intended to fix state, don't error
// and stop the flow of execution for functions that call it
continue
}
}
}

/**
* Get all creator nodes registered on chain from a cached redis value
*/
async function getAllRegisteredCNodes (libs) {
const cacheKey = 'all_registered_cnodes'

try {
const cnodesList = await redis.get(cacheKey)
if (cnodesList) {
return JSON.parse(cnodesList)
}

let creatorNodes = (await libs.ethContracts.ServiceProviderFactoryClient.getServiceProviderList('creator-node'))
creatorNodes = creatorNodes.filter(node => node.endpoint !== config.get('creatorNodeEndpoint'))
redis.set(cacheKey, JSON.stringify(creatorNodes), 'EX', 60 * 30) // cache this for 30 minutes
return creatorNodes
} catch (e) {
console.error('Error getting values in getAllRegisteredCNodes', e)
}
return []
}

/**
* Return if a fix has already been attempted in today for this filePath
* @param {String} filePath path of CID on the file system
*/
async function getIfAttemptedStateFix (filePath) {
// key is `attempted_fs_fixes:<today's date>`
// the date function just generates the ISOString and removes the timestamp component
const key = `attempted_fs_fixes:${new Date().toISOString().split('T')[0]}`
const firstTime = await redis.sadd(key, filePath)
await redis.expire(key, 60 * 60 * 24) // expire one day after final write

// if firstTime is 1, it's a new key. existing key returns 0
return !firstTime
}

async function rehydrateIpfsFromFsIfNecessary (multihash, storagePath, logContext, filename = null) {
const logger = genericLogger.child(logContext)

Expand Down Expand Up @@ -325,6 +419,18 @@ async function rehydrateIpfsDirFromFsIfNecessary (dirHash, logContext) {
}
}

async function writeStreamToFileSystem (stream, expectedStoragePath) {
const destinationStream = fs.createWriteStream(expectedStoragePath)
stream.pipe(destinationStream)
return new Promise((resolve, reject) => {
destinationStream.on('finish', () => {
resolve()
})
destinationStream.on('error', err => { reject(err) })
stream.on('error', err => { destinationStream.end(); reject(err) })
})
}

module.exports = Utils
module.exports.getFileUUIDForImageCID = getFileUUIDForImageCID
module.exports.getIPFSPeerId = getIPFSPeerId
Expand All @@ -334,3 +440,6 @@ module.exports.ipfsSingleByteCat = ipfsSingleByteCat
module.exports.ipfsCat = ipfsCat
module.exports.ipfsGet = ipfsGet
module.exports.ipfsStat = ipfsStat
module.exports.writeStreamToFileSystem = writeStreamToFileSystem
module.exports.getAllRegisteredCNodes = getAllRegisteredCNodes
module.exports.findCIDInNetwork = findCIDInNetwork

0 comments on commit 1a0a0c1

Please sign in to comment.