Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CON-402 CN Remove all synchronous disk ops #3908

Merged
merged 10 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
const enqueueSync = async (params) => {
const { serviceRegistry } = params
// eslint-disable-next-line node/no-sync
await serviceRegistry.syncQueue.enqueueSync(params)
}

Expand All @@ -13,6 +14,7 @@ const enqueueSync = async (params) => {
*/
const processManualImmediateSync = async (params) => {
const { serviceRegistry } = params
// eslint-disable-next-line node/no-sync
await serviceRegistry.syncImmediateQueue.processManualImmediateSync(params)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const axios = require('axios')
const fs = require('fs')
const fsExtra = require('fs-extra')
const fs = require('fs-extra')
const FormData = require('form-data')
const _ = require('lodash')

Expand Down Expand Up @@ -380,7 +379,7 @@ class TrackTranscodeHandoffManager {
* @returns formData object passed in axios to send a transcode and segment request
*/
static async createFormData(pathToFile) {
const fileExists = await fsExtra.pathExists(pathToFile)
const fileExists = await fs.pathExists(pathToFile)
if (!fileExists) {
throw new Error(`File does not exist at path=${pathToFile}`)
}
Expand Down
2 changes: 1 addition & 1 deletion creator-node/src/config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const axios = require('axios')
const convict = require('convict')
const fs = require('fs')
const fs = require('fs-extra')
const path = require('path')
const os = require('os')
const _ = require('lodash')
Expand Down
18 changes: 9 additions & 9 deletions creator-node/src/diskManager.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const path = require('path')
const fs = require('fs')
const fs = require('fs-extra')
const config = require('./config')
const { logger: genericLogger } = require('./logging')
const CID = require('cids')
Expand Down Expand Up @@ -27,14 +27,14 @@ class DiskManager {
* is we should be able to delete the contents of this folder without scanning through other folders with the
* naming scheme.
*/
static getTmpTrackUploadArtifactsPath() {
static async getTmpTrackUploadArtifactsPath() {
const dirPath = path.join(
config.get('storagePath'),
'files',
'tmp_track_artifacts'
)
if (!TMP_TRACK_ARTIFACTS_CREATED) {
this.ensureDirPathExists(dirPath)
await this.ensureDirPathExists(dirPath)
TMP_TRACK_ARTIFACTS_CREATED = true
}
return dirPath
Expand All @@ -51,7 +51,7 @@ class DiskManager {
* eg QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6 will be eg /file_storage/muU/QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6
* @param {String} cid file system destination, either filename or directory
*/
static computeFilePath(cid, ensureDirPathExists = true) {
static async computeFilePath(cid, ensureDirPathExists = true) {
try {
CID.isCID(new CID(cid))
} catch (e) {
Expand All @@ -76,7 +76,7 @@ class DiskManager {

// create the subdirectories in parentDirHash if they don't exist
if (ensureDirPathExists) {
this.ensureDirPathExists(parentDirPath)
await this.ensureDirPathExists(parentDirPath)
}

return path.join(parentDirPath, cid)
Expand Down Expand Up @@ -115,7 +115,7 @@ class DiskManager {
* @param {String} dirName directory name
* @param {String} fileName file name
*/
static computeFilePathInDir(dirName, fileName) {
static async computeFilePathInDir(dirName, fileName) {
if (!dirName || !fileName) {
genericLogger.error(
`Invalid dirName and/or fileName, dirName=${dirName}, fileName=${fileName}`
Expand All @@ -135,7 +135,7 @@ class DiskManager {
)
}

const parentDirPath = this.computeFilePath(dirName)
const parentDirPath = await this.computeFilePath(dirName)
const absolutePath = path.join(parentDirPath, fileName)
genericLogger.info(`File path computed, absolutePath=${absolutePath}`)
return absolutePath
Expand All @@ -146,10 +146,10 @@ class DiskManager {
* If it does exist, it will not overwrite, effectively a no-op
* @param {*} dirPath fs directory path to create if it does not exist
*/
static ensureDirPathExists(dirPath) {
static async ensureDirPathExists(dirPath) {
try {
// the mkdir recursive option is equivalent to `mkdir -p` and should created nested folders several levels deep
fs.mkdirSync(dirPath, { recursive: true })
await fs.mkdir(dirPath, { recursive: true })
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
} catch (e) {
genericLogger.error(
`Error making directory, dirName=${dirPath}, error=${e.toString()}`
Expand Down
32 changes: 16 additions & 16 deletions creator-node/src/diskManager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ describe('Test DiskManager', function () {
/**
* getTmpTrackUploadArtifactsPath
*/
it('Should pass if storagePath is correctly set', function () {
it('Should pass if storagePath is correctly set', async function () {
const tmpTrackArtifactPath = path.join(
DiskManager.getConfigStoragePath(),
'files',
'tmp_track_artifacts'
)
assert.deepStrictEqual(
tmpTrackArtifactPath,
DiskManager.getTmpTrackUploadArtifactsPath()
await DiskManager.getTmpTrackUploadArtifactsPath()
)
})

/**
* computeFilePath
*/
it('Should pass if computeFilePath returns the correct path', function () {
const fullPath = DiskManager.computeFilePath(
it('Should pass if computeFilePath returns the correct path', async function () {
const fullPath = await DiskManager.computeFilePath(
'QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6'
)
const validPath = path.join(
Expand All @@ -50,29 +50,29 @@ describe('Test DiskManager', function () {
assert.deepStrictEqual(fullPath, validPath)
})

it('Should fail if fileName is not passed into computeFilePath', function () {
it('Should fail if fileName is not passed into computeFilePath', async function () {
try {
DiskManager.computeFilePath()
await DiskManager.computeFilePath()
} catch (e) {
assert.ok(
e.message.includes('Please pass in a valid cid to computeFilePath')
)
}
})

it(`Should fail if fileName doesn't contain the appropriate amount of characters`, function () {
it(`Should fail if fileName doesn't contain the appropriate amount of characters`, async function () {
try {
DiskManager.computeFilePath('asd')
await DiskManager.computeFilePath('asd')
} catch (e) {
assert.ok(
e.message.includes('Please pass in a valid cid to computeFilePath')
)
}
})

it(`Should fail if fileName contains a slash`, function () {
it(`Should fail if fileName contains a slash`, async function () {
try {
DiskManager.computeFilePath('/file_storage/asdf')
await DiskManager.computeFilePath('/file_storage/asdf')
} catch (e) {
assert.ok(
e.message.includes('Please pass in a valid cid to computeFilePath')
Expand All @@ -83,8 +83,8 @@ describe('Test DiskManager', function () {
/**
* computeFilePathInDir
*/
it('Should pass if computeFilePathInDir returns the correct path', function () {
const fullPath = DiskManager.computeFilePathInDir(
it('Should pass if computeFilePathInDir returns the correct path', async function () {
const fullPath = await DiskManager.computeFilePathInDir(
'QmRSvU8NtadxPPrP4M72wUPBiTqykqziWDuGr6q2arsYW4',
'QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6'
)
Expand All @@ -98,17 +98,17 @@ describe('Test DiskManager', function () {
assert.deepStrictEqual(fullPath, validPath)
})

it('Should fail if dirName and fileName are not passed into computeFilePathInDir', function () {
it('Should fail if dirName and fileName are not passed into computeFilePathInDir', async function () {
try {
DiskManager.computeFilePathInDir()
await DiskManager.computeFilePathInDir()
} catch (e) {
assert.ok(e.message.includes('Must pass in valid dirName and fileName'))
}
})

it('Should fail if dirName or fileName are not a CID passed into computeFilePathInDir', function () {
it('Should fail if dirName or fileName are not a CID passed into computeFilePathInDir', async function () {
try {
DiskManager.computeFilePathInDir(
await DiskManager.computeFilePathInDir(
'Qmdirhash',
'QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6'
)
Expand Down
82 changes: 45 additions & 37 deletions creator-node/src/ffmpeg.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const config = require('./config')
const fs = require('fs')
const fs = require('fs-extra')
const path = require('path')
const ffmpeg = require('ffmpeg-static').path
const spawn = require('child_process').spawn
Expand All @@ -13,7 +13,7 @@ const { logger: genericLogger } = require('./logging')
* @param {string} params.fileDir the directory of the uploaded track artifact
* @param {string} params.fileName the uploaded track artifact filename
* @param {Object} params.logContext the log context used to instantiate a logger
* @returns {Object} response in the structure
* @returns {Promise<Object>} response in the structure
{
segments: {
fileNames: segmentFileNames {string[]}: the segment file names only,
Expand All @@ -24,6 +24,7 @@ const { logger: genericLogger } = require('./logging')
*/
function segmentFile(fileDir, fileName, { logContext }) {
const logger = genericLogger.child(logContext)

return new Promise((resolve, reject) => {
const absolutePath = path.resolve(fileDir, fileName)
logger.info(`Segmenting file ${absolutePath}...`)
Expand Down Expand Up @@ -63,24 +64,27 @@ function segmentFile(fileDir, fileName, { logContext }) {
proc.stderr.on('data', (data) => (stderr += data.toString()))

proc.on('close', (code) => {
if (code === 0) {
const segmentFileNames = fs.readdirSync(fileDir + '/segments')
const segmentFilePaths = segmentFileNames.map((filename) =>
path.resolve(fileDir, 'segments', filename)
)
async function asyncFn() {
if (code === 0) {
const segmentFileNames = await fs.readdir(fileDir + '/segments')
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
const segmentFilePaths = segmentFileNames.map((filename) =>
path.resolve(fileDir, 'segments', filename)
)

resolve({
segments: {
fileNames: segmentFileNames,
filePaths: segmentFilePaths
},
m3u8FilePath
})
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
resolve({
segments: {
fileNames: segmentFileNames,
filePaths: segmentFilePaths
},
m3u8FilePath
})
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
}
asyncFn()
})
})
}
Expand All @@ -92,21 +96,22 @@ function segmentFile(fileDir, fileName, { logContext }) {
* @param {string} params.fileDir the directory of the uploaded track artifact
* @param {string} params.fileName the uploaded track artifact filename
* @param {Object} params.logContext the log context used to instantiate a logger
* @returns {string} the path to the transcode
* @returns {Promise<string>} the path to the transcode
*/
function transcodeFileTo320(fileDir, fileName, { logContext }) {
async function transcodeFileTo320(fileDir, fileName, { logContext }) {
const logger = genericLogger.child(logContext)
return new Promise((resolve, reject) => {
const sourcePath = path.resolve(fileDir, fileName)
const targetPath = path.resolve(fileDir, fileName.split('.')[0] + '-dl.mp3')
logger.info(`Transcoding file ${sourcePath}...`)

// Exit if dl-copy file already exists at target path.
if (fs.existsSync(targetPath)) {
logger.info(`Downloadable copy already exists at ${targetPath}.`)
resolve(targetPath)
}
const sourcePath = path.resolve(fileDir, fileName)
const targetPath = path.resolve(fileDir, fileName.split('.')[0] + '-dl.mp3')
logger.info(`Transcoding file ${sourcePath}...`)

// Exit if dl-copy file already exists at target path.
if (await fs.pathExists(targetPath)) {
logger.info(`Downloadable copy already exists at ${targetPath}.`)
return targetPath
}

return new Promise((resolve, reject) => {
// https://ffmpeg.org/ffmpeg-formats.html#hls-2
const args = [
'-i',
Expand All @@ -130,20 +135,23 @@ function transcodeFileTo320(fileDir, fileName, { logContext }) {
proc.stderr.on('data', (data) => (stderr += data.toString()))

proc.on('close', (code) => {
if (code === 0) {
if (fs.existsSync(targetPath)) {
logger.info(`Transcoded file ${targetPath}`)
resolve(targetPath)
async function asyncFn() {
if (code === 0) {
if (await fs.pathExists(targetPath)) {
logger.info(`Transcoded file ${targetPath}`)
resolve(targetPath)
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
asyncFn()
})
})
}
Expand Down