Skip to content

Commit

Permalink
ASI-679 Add more logs for phases during upload flow (#2138)
Browse files Browse the repository at this point in the history
* Add/reconfigure logging for file processing and transcoding

* Implemented Vicky's feedback

* Add logging to remaining files

* make logContext an optional parameter for logStatus and logError, don't use backticks for simple strings

* Remove word 'successfully', missed reordered command

* incorporate Vicky's feedback

* update e to error

* Update ffmpeg.js

* {error: e}

* {error: e}

* Clarify regex failure

* lint

* lint

* lint

* lint
  • Loading branch information
joaquincasares committed Dec 28, 2021
1 parent 32c4e90 commit 025f2f3
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 35 deletions.
32 changes: 20 additions & 12 deletions creator-node/src/FileProcessingQueue.js
Expand Up @@ -49,10 +49,10 @@ class FileProcessingQueue {
done(null, { response })
} catch (e) {
this.logError(
transcodeParams.logContext,
`Could not process taskType=${PROCESS_NAMES.transcode} uuid=${
transcodeParams.logContext.requestID
}: ${e.toString()}`
}: ${e.toString()}`,
transcodeParams.logContext
)
done(e.toString())
}
Expand All @@ -62,31 +62,39 @@ class FileProcessingQueue {
this.getFileProcessingQueueJobs = this.getFileProcessingQueueJobs.bind(this)
}

async logStatus(logContext, message) {
async logStatus(message, logContext = {}) {
const logger = genericLogger.child(logContext)
const { waiting, active, completed, failed, delayed } =
await this.queue.getJobCounts()
logger.info(
`FileProcessing Queue: ${message} || active: ${active}, waiting: ${waiting}, failed ${failed}, delayed: ${delayed}, completed: ${completed} `
`FileProcessingQueue: ${message} || active: ${active}, waiting: ${waiting}, failed ${failed}, delayed: ${delayed}, completed: ${completed} `
)
}

async logError(logContext, message) {
async logError(message, logContext = {}) {
const logger = genericLogger.child(logContext)
logger.error(`FileProcessingQueue error: ${message}`)
const { waiting, active, completed, failed, delayed } =
await this.queue.getJobCounts()
logger.error(
`FileProcessingQueue error: ${message} || active: ${active}, waiting: ${waiting}, failed ${failed}, delayed: ${delayed}, completed: ${completed}`
)
}

// TODO: Will make this job a background process
async addTranscodeTask(transcodeParams) {
const { logContext } = transcodeParams
this.logStatus(
logContext,
`Adding ${PROCESS_NAMES.transcode} task! uuid=${logContext.requestID}}`
`Adding ${PROCESS_NAMES.transcode} task! uuid=${logContext.requestID}}`,
logContext
)

const job = await this.queue.add(PROCESS_NAMES.transcode, {
transcodeParams
})
this.logStatus(
`Added ${PROCESS_NAMES.transcode} task, uuid=${logContext.requestID}`,
logContext
)

return job
}
Expand All @@ -96,22 +104,22 @@ class FileProcessingQueue {
const redisKey = constructProcessKey(taskType, uuid)

let state = { status: PROCESS_STATES.IN_PROGRESS }
this.logStatus(logContext, `Starting ${taskType}! uuid=${uuid}}`)
this.logStatus(`Starting ${taskType}, uuid=${uuid}`, logContext)
await redisClient.set(redisKey, JSON.stringify(state), 'EX', EXPIRATION)

let response
try {
response = await func({ logContext }, req)
state = { status: PROCESS_STATES.DONE, resp: response }
this.logStatus(logContext, `Successful ${taskType}! uuid=${uuid}}`)
this.logStatus(`Successful ${taskType}, uuid=${uuid}`, logContext)
await redisClient.set(redisKey, JSON.stringify(state), 'EX', EXPIRATION)
} catch (e) {
state = { status: PROCESS_STATES.FAILED, resp: e.message }
this.logError(
logContext,
`Error with ${taskType}. uuid=${uuid}} resp=${JSON.stringify(
e.message
)}`
)}`,
logContext
)
await redisClient.set(redisKey, JSON.stringify(state), 'EX', EXPIRATION)
throw e
Expand Down
68 changes: 55 additions & 13 deletions creator-node/src/TranscodingQueue.js
Expand Up @@ -30,6 +30,7 @@ class TranscodingQueue {
removeOnFail: true
}
})
this.logStatus('Initialized TranscodingQueue')

// NOTE: Specifying max concurrency here dictates the max concurrency for
// *any* process fn below
Expand All @@ -42,22 +43,22 @@ class TranscodingQueue {
const { fileDir, fileName, logContext } = job.data

try {
this.logStatus(logContext, `segmenting ${fileDir} ${fileName}`)
this.logStatus(`Segmenting ${fileDir} ${fileName}`, logContext)

const filePaths = await ffmpeg.segmentFile(fileDir, fileName, {
logContext
})
this.logStatus(
logContext,
`Successfully completed segment job ${fileDir} ${fileName} in duration ${
Date.now() - start
}ms`
}ms`,
logContext
)
done(null, { filePaths })
} catch (e) {
this.logStatus(
logContext,
`Segment Job Error ${e} in duration ${Date.now() - start}ms`
`Segment Job Error ${e} in duration ${Date.now() - start}ms`,
logContext
)
done(e)
}
Expand All @@ -73,42 +74,43 @@ class TranscodingQueue {

try {
this.logStatus(
logContext,
`transcoding to 320kbps ${fileDir} ${fileName}`
`transcoding to 320kbps ${fileDir} ${fileName}`,
logContext
)

const filePath = await ffmpeg.transcodeFileTo320(fileDir, fileName, {
logContext
})
this.logStatus(
logContext,
`Successfully completed Transcode320 job ${fileDir} ${fileName} in duration ${
Date.now() - start
}ms`
}ms`,
logContext
)
done(null, { filePath })
} catch (e) {
this.logStatus(
logContext,
`Transcode320 Job Error ${e} in duration ${Date.now() - start}`
`Transcode320 Job Error ${e} in duration ${Date.now() - start}`,
logContext
)
done(e)
}
}
)

this.logStatus = this.logStatus.bind(this)
this.logError = this.logError.bind(this)
this.segment = this.segment.bind(this)
this.transcode320 = this.transcode320.bind(this)
this.getTranscodeQueueJobs = this.getTranscodeQueueJobs.bind(this)
}

/**
* Logs a status message and includes current queue info
* Logs a successful status message and includes current queue info
* @param {object} logContext to create a logger.child(logContext) from
* @param {string} message
*/
async logStatus(logContext, message) {
async logStatus(message, logContext = {}) {
const logger = genericLogger.child(logContext)
const { waiting, active, completed, failed, delayed } =
await this.queue.getJobCounts()
Expand All @@ -117,19 +119,46 @@ class TranscodingQueue {
)
}

/**
* Logs an error status message and includes current queue info
* @param {object} logContext to create a logger.child(logContext) from
* @param {string} message
*/
async logError(message, logContext = {}) {
const logger = genericLogger.child(logContext)
const { waiting, active, completed, failed, delayed } =
await this.queue.getJobCounts()
logger.error(
`Transcoding error: ${message} || active: ${active}, waiting: ${waiting}, failed ${failed}, delayed: ${delayed}, completed: ${completed} `
)
}

/**
* Adds a task to the queue that segments up an audio file
* @param {string} fileDir
* @param {string} fileName
* @param {object} logContext to create a logger.child(logContext) from
*/
async segment(fileDir, fileName, { logContext }) {
this.logStatus(
`Adding job to segment queue, fileDir=${fileDir}, fileName=${fileName}`,
logContext
)
const job = await this.queue.add(PROCESS_NAMES.segment, {
fileDir,
fileName,
logContext
})
this.logStatus(
`Job added to segment queue, fileDir=${fileDir}, fileName=${fileName}`,
logContext
)

const result = await job.finished()
this.logStatus(
`Segment job successful, fileDir=${fileDir}, fileName=${fileName}`,
logContext
)
return result
}

Expand All @@ -140,12 +169,25 @@ class TranscodingQueue {
* @param {object} logContext to create a logger.child(logContext) from
*/
async transcode320(fileDir, fileName, { logContext }) {
this.logStatus(
`Adding job to transcode320 queue, fileDir=${fileDir}, fileName=${fileName}`,
logContext
)
const job = await this.queue.add(PROCESS_NAMES.transcode320, {
fileDir,
fileName,
logContext
})
this.logStatus(
`Job added to transcode320 queue, fileDir=${fileDir}, fileName=${fileName}`,
logContext
)

const result = await job.finished()
this.logStatus(
`Transcode320 job successful, fileDir=${fileDir}, fileName=${fileName}`,
logContext
)
return result
}

Expand Down
26 changes: 23 additions & 3 deletions creator-node/src/diskManager.js
@@ -1,6 +1,7 @@
const path = require('path')
const fs = require('fs')
const config = require('./config')
const { logger: genericLogger } = require('./logging')
const { CID } = require('ipfs-http-client-latest')

// regex to check if a directory or just a regular file
Expand Down Expand Up @@ -54,6 +55,7 @@ class DiskManager {
try {
CID.isCID(new CID(cid))
} catch (e) {
genericLogger.error(`CID invalid, cid=${cid}, error=${e.toString()}`)
throw new Error(
`Please pass in a valid cid to computeFilePath. Passed in ${cid} ${e.message}`
)
Expand Down Expand Up @@ -89,19 +91,29 @@ class DiskManager {
* @param {String} fileName file name
*/
static computeFilePathInDir(dirName, fileName) {
if (!dirName || !fileName)
if (!dirName || !fileName) {
genericLogger.error(
`Invalid dirName and/or fileName, dirName=${dirName}, fileName=${fileName}`
)
throw new Error('Must pass in valid dirName and fileName')
}

try {
CID.isCID(new CID(dirName))
CID.isCID(new CID(fileName))
} catch (e) {
genericLogger.error(
`CID invalid, dirName=${dirName}, fileName=${fileName}, error=${e.toString()}`
)
throw new Error(
`Please pass in a valid cid to computeFilePathInDir for dirName and fileName. Passed in dirName: ${dirName} fileName: ${fileName} ${e.message}`
)
}

const parentDirPath = this.computeFilePath(dirName)
return path.join(parentDirPath, fileName)
const absolutePath = path.join(parentDirPath, fileName)
genericLogger.info(`File path computed, absolutePath=${absolutePath}`)
return absolutePath
}

/**
Expand All @@ -114,6 +126,9 @@ class DiskManager {
// the mkdir recursive option is equivalent to `mkdir -p` and should created nested folders several levels deep
fs.mkdirSync(dirPath, { recursive: true })
} catch (e) {
genericLogger.error(
`Error making directory, dirName=${dirPath}, error=${e.toString()}`
)
throw new Error(`Error making directory at ${dirPath} - ${e.message}`)
}
}
Expand All @@ -127,7 +142,12 @@ class DiskManager {
*/
static extractCIDsFromFSPath(fsPath) {
const match = CID_DIRECTORY_REGEX.exec(fsPath)
if (!match || !match.groups) return null
if (!match || !match.groups) {
genericLogger.info(
`Input path does not match cid directory pattern, fsPath=${fsPath}`
)
return null
}

let ret = null
if (match && match.groups && match.groups.outer && match.groups.inner) {
Expand Down
14 changes: 10 additions & 4 deletions creator-node/src/ffmpeg.js
Expand Up @@ -11,8 +11,8 @@ const { logger: genericLogger } = require('./logging')
function segmentFile(fileDir, fileName, { logContext }) {
const logger = genericLogger.child(logContext)
return new Promise((resolve, reject) => {
logger.info(`Segmenting file ${fileName}...`)
const absolutePath = path.resolve(fileDir, fileName)
logger.info(`Segmenting file ${absolutePath}...`)

// https://ffmpeg.org/ffmpeg-formats.html#hls-2
const args = [
Expand All @@ -34,9 +34,10 @@ function segmentFile(fileDir, fileName, { logContext }) {
path.resolve(fileDir, 'segments', 'segment%05d.ts'),
// "-vn" flag required to allow track uploading with album art
// https://stackoverflow.com/questions/20193065/how-to-remove-id3-audio-tag-image-or-metadata-from-mp3-with-ffmpeg
'-vn',
'-vn', // skip inclusion of video, process only the audio file without "video"
path.resolve(fileDir, fileName.split('.')[0] + '.m3u8')
]
logger.info(`Spawning: ffmpeg ${args}`)
const proc = spawn(ffmpeg, args)

// capture output
Expand All @@ -48,6 +49,7 @@ function segmentFile(fileDir, fileName, { logContext }) {
proc.on('close', (code) => {
if (code === 0) {
const segmentFilePaths = fs.readdirSync(fileDir + '/segments')
logger.info(`Segmented file ${absolutePath}`)
resolve(segmentFilePaths)
} else {
logger.error('Error when processing file with ffmpeg')
Expand All @@ -62,9 +64,9 @@ function segmentFile(fileDir, fileName, { logContext }) {
function transcodeFileTo320(fileDir, fileName, { logContext }) {
const logger = genericLogger.child(logContext)
return new Promise((resolve, reject) => {
logger.info(`Transcoding file ${fileName}...`)
const sourcePath = path.resolve(fileDir, fileName)
const targetPath = path.resolve(fileDir, fileName.split('.')[0] + '-dl.mp3')
logger.info(`Transcoding file ${sourcePath}...`)

// Exit if dl-copy file already exists at target path.
if (fs.existsSync(targetPath)) {
Expand All @@ -82,9 +84,10 @@ function transcodeFileTo320(fileDir, fileName, { logContext }) {
'320k',
// "-vn" flag required to allow track uploading with album art
// https://stackoverflow.com/questions/20193065/how-to-remove-id3-audio-tag-image-or-metadata-from-mp3-with-ffmpeg
'-vn',
'-vn', // skip inclusion of video, process only the audio file without "video"
targetPath
]
logger.info(`Spawning: ffmpeg ${args}`)
const proc = spawn(ffmpeg, args)

// capture output
Expand All @@ -96,8 +99,11 @@ function transcodeFileTo320(fileDir, fileName, { logContext }) {
proc.on('close', (code) => {
if (code === 0) {
if (fs.existsSync(targetPath)) {
logger.info(`Transcoded file ${targetPath}`)
resolve(targetPath)
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
} else {
Expand Down
1 change: 1 addition & 0 deletions creator-node/src/ipfsClient.js
Expand Up @@ -17,6 +17,7 @@ const ipfsLatest = ipfsClientLatest({
})

async function logIpfsPeerIds() {
genericLogger.info(`Starting IPFS, ipfsAddr=${ipfsAddr}`)
const identity = await ipfs.id()
// Pretty print the JSON obj with no filter fn (e.g. filter by string or number) and spacing of size 2
genericLogger.info(
Expand Down

0 comments on commit 025f2f3

Please sign in to comment.