Skip to content

Commit

Permalink
[AUD-391] In content-node track stream, add a fallback to enable stre…
Browse files Browse the repository at this point in the history
…aming if track association didn't complete (#1284)

* Should be working minus redis cache

* add redis cache

* lint

* wrap redis cache in conditional

* Code review comments

* Code review comments
  • Loading branch information
dmanjunath committed Mar 9, 2021
1 parent f3fdb8b commit 86a5d86
Showing 1 changed file with 82 additions and 14 deletions.
96 changes: 82 additions & 14 deletions creator-node/src/routes/tracks.js
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ module.exports = function (app) {
**/
app.get('/tracks/stream/:encodedId', async (req, res, next) => {
const libs = req.app.get('audiusLibs')
const redisClient = req.app.get('redisClient')
const delegateOwnerWallet = config.get('delegateOwnerWallet')

const encodedId = req.params.encodedId
Expand All @@ -550,26 +551,93 @@ module.exports = function (app) {
return sendResponse(req, res, errorResponseBadRequest(`Invalid ID: ${encodedId}`))
}

const { blockchainId: blockchainIdFromTrack } = await models.Track.findOne({
attributes: ['blockchainId'],
where: { blockchainId },
order: [['clock', 'DESC']]
})

if (!blockchainIdFromTrack) {
return sendResponse(req, res, errorResponseBadRequest(`No track found for blockchainId ${blockchainId}`))
}

const { multihash } = await models.File.findOne({
let fileRecord = await models.File.findOne({
attributes: ['multihash'],
where: {
type: 'copy320',
trackBlockchainId: blockchainIdFromTrack
trackBlockchainId: blockchainId
},
order: [['clock', 'DESC']]
})

if (!multihash) {
if (!fileRecord) {
try {
// see if there's a fileRecord in redis so we can short circuit all this logic
let redisFileRecord = await redisClient.get(`streamFallback:::${blockchainId}`)
if (redisFileRecord) {
redisFileRecord = JSON.parse(redisFileRecord)
if (redisFileRecord && redisFileRecord.multihash) {
fileRecord = redisFileRecord
}
}
} catch (e) {
req.logger.error(`Error looking for stream fallback in redis`, e)
}
}

// if track didn't finish the upload process and was never associated, there may not be a trackBlockchainId for the File records,
// try to fall back to discovery to fetch the metadata multihash and see if you can deduce the copy320 file
if (!fileRecord) {
try {
let trackRecord = await libs.Track.getTracks(1, 0, [blockchainId])
if (!trackRecord || trackRecord.length === 0 || !trackRecord[0].hasOwnProperty('blocknumber')) {
return sendResponse(req, res, errorResponseServerError('Missing or malformatted track fetched from discovery node.'))
}

trackRecord = trackRecord[0]

// query the files table for a metadata multihash from discovery for a given track
// no need to add CNodeUserUUID to the filter because the track is associated with a user and that contains the
// user_id inside it which is unique to the user
const file = await models.File.findOne({ where: { multihash: trackRecord.metadata_multihash, type: 'metadata' } })
if (!file) {
return sendResponse(req, res, errorResponseServerError('Missing or malformatted track fetched from discovery node.'))
}

// make sure all track segments have the same sourceFile
const segments = trackRecord.track_segments.map(segment => segment.multihash)

let fileSegmentRecords = await models.File.findAll({
attributes: ['sourceFile'],
where: {
multihash: segments,
cnodeUserUUID: file.cnodeUserUUID
},
raw: true
})

// check that the number of files in the Files table for these segments for this user matches the number of segments from the metadata object
if (fileSegmentRecords.length !== trackRecord.track_segments.length) {
req.logger.warning(`Track stream content mismatch for blockchainId ${blockchainId} - number of segments don't match between local and discovery`)
}

// check that there's a single sourceFile that all File records share by getting an array of uniques
const uniqSourceFiles = fileSegmentRecords.map(record => record.sourceFile).filter((v, i, a) => a.indexOf(v) === i)

if (uniqSourceFiles.length !== 1) {
req.logger.warning(`Track stream content mismatch for blockchainId ${blockchainId} - there's not one sourceFile that matches all segments`)
}

// search for the copy320 record based on the sourceFile
fileRecord = await models.File.findOne({
attributes: ['multihash'],
where: {
type: 'copy320',
sourceFile: uniqSourceFiles[0]
},
raw: true
})

// cache the fileRecord in redis for an hour so we don't have to keep making requests to discovery
if (fileRecord) {
redisClient.set(`streamFallback:::${blockchainId}`, JSON.stringify(fileRecord), 'EX', 60 * 60)
}
} catch (e) {
req.logger.error(`Error falling back to reconstructing data from discovery to stream`, e)
}
}

if (!fileRecord || !fileRecord.multihash) {
return sendResponse(req, res, errorResponseBadRequest(`No file found for blockchainId ${blockchainId}`))
}

Expand All @@ -580,7 +648,7 @@ module.exports = function (app) {
libs.identityService.logTrackListen(blockchainId, delegateOwnerWallet, req.ip)
}

req.params.CID = multihash
req.params.CID = fileRecord.multihash
req.params.streamable = true
res.set('Content-Type', 'audio/mpeg')
next()
Expand Down

0 comments on commit 86a5d86

Please sign in to comment.