Skip to content

Commit

Permalink
Allow DAG to be sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Jan 16, 2022
1 parent 9c748f1 commit 99504fe
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 63 deletions.
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"js-yaml": "^4.0.0",
"junk": "^4.0.0",
"map-obj": "^5.0.0",
"memoize-one": "^6.0.0",
"moize": "^6.1.0",
"nvexeca": "^7.0.0",
"omit.js": "^2.0.2",
Expand Down
126 changes: 65 additions & 61 deletions src/config/normalize/dag/run.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import memoizeOne from 'memoize-one'
import pProps from 'p-props'

import { mapValues } from '../../../utils/map.js'

import { createDag, addDagEdge } from './structure.js'

// Run several async or async tasks in parallel while still allowing them to
// use each other's return values.
// Same as `runDag()` but allows methods to be async, or a mixed of sync|async.
export const runDagAsync = async function (tasks) {
const tasksReturns = runDag(tasks)
return await pProps(tasksReturns)
}

// Run several tasks in parallel while still allowing them to use each other's
// return values.
// This creates a DAG.
// - Unlike traditional DAGs which require each task to declaring its
// dependencies as a static array, tasks declare those in an imperative
Expand All @@ -17,92 +24,94 @@ import { createDag, addDagEdge } from './structure.js'
// The `tasks` object:
// - Is unordered
// - Keys are used by tasks to reference each other
// - Values are async functions which receive an object of promises as single
// argument
// - Values are functions which receive an object as single argument
// - The object keys are the other task keys
// - The object values are the other task return values
// - To use functions with more arguments, those must be bound or use
// lexical scoping
// The return value is a similar `tasks` object but with the functions replaced
// by their return value instead.
export const runDag = async function (tasks) {
// by their return values instead.
export const runDag = function (tasks) {
const tasksNames = Object.keys(tasks)
const dag = createDag(tasksNames)
const tasksPromises = mapValues(tasks, createTaskPromise)
const allTasksArgs = mapValues(
tasks,
getTasksArgs.bind(undefined, dag, Object.entries(tasksPromises)),
)
const tasksReturns = await pProps(tasks, (taskFunc, taskName) =>
runTask(taskFunc, tasksPromises[taskName], allTasksArgs[taskName]),
)
const boundTasks = getBoundTasks(tasks, tasksNames, dag)
const tasksReturns = mapValues(boundTasks, runBoundTaskFunc)
return tasksReturns
}

// Creates one promise to represent each task's state.
// `taskPromise` has a noop error handler to avoid uncaught promise rejection
// process errors in case a task fails but no other task follows it.
// Tasks which do follow it will still receive the rejection correctly.
const createTaskPromise = function () {
const taskPromise = createPromise()
// eslint-disable-next-line promise/prefer-await-to-then
taskPromise.promise.catch(noop)
return taskPromise
}

// Create a promise which state can be manually manipulated
/* eslint-disable fp/no-let, init-declarations, fp/no-mutation,
promise/avoid-new */
const createPromise = function () {
let resolveFunc
let rejectFunc
const promise = new Promise((resolve, reject) => {
resolveFunc = resolve
rejectFunc = reject
})
return { promise, resolve: resolveFunc, reject: rejectFunc }
// Since each boundTask references each other, we need to create a `boundTasks`
// reference first, bind each `taskFunc`, then update that reference.
const getBoundTasks = function (tasks, tasksNames, dag) {
const boundTasks = mapValues(tasks, noop)
const boundTasksValues = mapValues(tasks, (taskFunc, parentTaskName) =>
bindTaskFunc({
dag,
boundTasks,
tasksNames,
parentTaskName,
taskFunc,
}),
)
// eslint-disable-next-line fp/no-mutating-assign
return Object.assign(boundTasks, boundTasksValues)
}
/* eslint-enable fp/no-let, init-declarations, fp/no-mutation,
promise/avoid-new */

// eslint-disable-next-line no-empty-function
const noop = function () {}

// Each task receive a `tasksArgs` object which is a copy of `tasksPromises`
// except it is wrapped in getters.
// Each sets of getters keeps track of the `parentTaskName` it is associated
// with, which allows knowing which task references which.
// We use getters for convenience.
// eslint-disable-next-line max-params
const getTasksArgs = function (dag, tasksPromisesEntries, _, parentTaskName) {
// Each `taskFunc` is being bound with a `tasksArg` object.
// That object has methods to call the other `taskFuncs`.
// We need to memoize each `taskFunc` since references mean they would be
// called several times.
const bindTaskFunc = function ({
dag,
boundTasks,
tasksNames,
parentTaskName,
taskFunc,
}) {
const boundTaskFunc = memoizeOne(taskFunc)
const tasksArg = getTasksArg({
dag,
boundTasks,
tasksNames,
parentTaskName,
})
return boundTaskFunc.bind(undefined, tasksArg)
}

const getTasksArg = function ({ dag, boundTasks, tasksNames, parentTaskName }) {
const tasksArgs = {}

// eslint-disable-next-line fp/no-loops
for (const [childTaskName, taskPromise] of tasksPromisesEntries) {
for (const childTaskName of tasksNames) {
setTaskArgGetter({
dag,
tasksArgs,
parentTaskName,
childTaskName,
taskPromise,
boundTasks,
})
}

return tasksArgs
}

// We use getters to simplify how `tasksArg` is consumed.
const setTaskArgGetter = function ({
dag,
tasksArgs,
parentTaskName,
childTaskName,
taskPromise,
boundTasks,
}) {
// eslint-disable-next-line fp/no-mutating-methods
Object.defineProperty(tasksArgs, childTaskName, {
get: getTaskArg.bind(undefined, {
dag,
parentTaskName,
childTaskName,
taskPromise,
boundTasks,
}),
// `tasksArgs` should not be iterated since it would call the getter,
// including with object spreading.
Expand All @@ -112,24 +121,19 @@ const setTaskArgGetter = function ({
})
}

// Each `tasksArgs` method just forward to another `taskFunc` but it also
// validate against cycles.
// It does so by keeping track of the `parentTaskName` it is associated with.
const getTaskArg = function ({
dag,
parentTaskName,
childTaskName,
taskPromise,
boundTasks,
}) {
addDagEdge(dag, parentTaskName, childTaskName)
return taskPromise.promise
return boundTasks[childTaskName]()
}

// Run a single task function and resolve|reject its associated promise
const runTask = async function (taskFunc, { resolve, reject }, tasksArgs) {
try {
const taskReturn = await taskFunc(tasksArgs)
resolve(taskReturn)
return taskReturn
} catch (error) {
reject(error)
throw error
}
const runBoundTaskFunc = function (boundTaskFunc) {
return boundTaskFunc()
}
4 changes: 2 additions & 2 deletions src/config/normalize/main.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { mapValues } from '../../utils/map.js'

import { runNormalizer } from './check.js'
import { runDag } from './dag/run.js'
import { runDagAsync } from './dag/run.js'
import { CONFIG_PROPS } from './properties.js'

// Normalize configuration shape and do custom validation.
Expand All @@ -25,7 +25,7 @@ export const normalizeConfig = async function (config, command, configInfos) {
configInfos: configInfosA,
}),
)
const configProps = await runDag(configPropsFuncs)
const configProps = await runDagAsync(configPropsFuncs)
const configA = mergeConfigProps(configProps)
return configA
}
Expand Down

0 comments on commit 99504fe

Please sign in to comment.