diff --git a/README.md b/README.md index c183848..f25b90f 100644 --- a/README.md +++ b/README.md @@ -5,25 +5,26 @@ Composer is a new programming model from [IBM Research](https://ibm.biz/serverless-research) for composing [IBM Cloud Functions](https://ibm.biz/openwhisk), built on [Apache -OpenWhisk](https://github.com/apache/incubator-openwhisk). Composer -extends Functions and sequences with more powerful control flow and -automatic state management. With it, developers can build even more +OpenWhisk](https://github.com/apache/incubator-openwhisk). +With composer, developers can build even more serverless applications including using it for IoT, with workflow orchestration, conversation services, and devops automation, to name a few examples. -Composer helps you express cloud-native apps that are serverless by -construction: scale automatically, and pay as you go and not for idle -time. Programming compositions for IBM Cloud Functions is done via the -[functions shell](https://github.com/ibm-functions/shell), which +Composer extends Functions and sequences with more powerful control +flow and automatic state management. +Composer helps express cloud-native apps that are serverless by +construction: scale automatically, pay as you go and not for idle time. + +The [IBM Cloud functions shell](https://github.com/ibm-functions/shell) offers a CLI and graphical interface for fast, incremental, iterative, and local development of serverless apps. Some additional highlights of the shell include: -* Edit your code and program using your favorite text editor, rather than using a drag-n-drop UI -* Validate your compositions with readily accessible visualizations, without switching tools or using a browser -* Deploy and invoke compositions using familiar CLI commands -* Debug your invocations with either familiar CLI commands or readily accessible visualizations +* Edit your code and program using your favorite text editor, rather than using a drag-n-drop UI. +* Validate compositions with readily accessible visualizations, without switching tools or using a browser. +* Deploy and invoke compositions using familiar CLI commands. +* Debug invocations with either familiar CLI commands or readily accessible visualizations. Composer and shell are currently available as IBM Research previews. We are excited about both and are looking forward to what @@ -39,11 +40,9 @@ you to [join us on slack](http://ibm.biz/composer-users). This repository includes: - * [tutorial](docs) for getting started with Composer in the [docs](docs) folder, - * [composer](composer.js) node.js module to author compositions using JavaScript, - * [conductor](conductor.js) action code to orchestrate the execution of compositions, - * [manager](manager.js) node.js module to query the state of compositions, - * [test-harness](test-harness.js) helper module for testing composer, - * [redis-promise](redis-promise.js) helper module that implements a promisified redis client for node.js, + * a [composer](composer.js) node.js module to author compositions using JavaScript, + * a [compose](bin/compose) shell script for deploying compositions, + * a [tutorial](docs/README.md), + * a [reference manual](docs/COMPOSER.md), * example compositions in the [samples](samples) folder, - * unit tests in the [test](test) folder. + * tests in the [test](test) folder. diff --git a/bin/compose b/bin/compose new file mode 100755 index 0000000..bfe723c --- /dev/null +++ b/bin/compose @@ -0,0 +1,27 @@ +#!/usr/bin/env node + +'use strict' + +const fs = require('fs') +const vm = require('vm') +const minimist = require('minimist') +const composer = require('../composer') + +const argv = minimist(process.argv.slice(2), { string: ['apihost', 'auth', 'deploy'], boolean: 'insecure', alias: { auth: 'u', insecure: 'i' } }) + +if (argv._.length !== 1) { + console.error('Usage: compose [--deploy ] [--apihost ] [--auth ] [--insecure]') + return +} + +const filename = argv._[0] +const source = fs.readFileSync(filename, { encoding: 'utf8' }) +const composition = filename.slice(filename.lastIndexOf('.')) === '.js' ? vm.runInNewContext(source, { composer, require, console, process }) : composer.deserialize(JSON.parse(source)) +if (argv.deploy) { + const options = { ignore_certs: argv.insecure } + if (argv.apihost) options.apihost = argv.apihost + if (argv.auth) options.api_key = argv.auth + composer.openwhisk(options).compositions.deploy(composition,argv.deploy).catch(console.error) +} else { + console.log(JSON.stringify(composition, null, 4)) +} diff --git a/composer.js b/composer.js index 80bd59d..70e7e16 100644 --- a/composer.js +++ b/composer.js @@ -1,5 +1,5 @@ /* - * Copyright 2017 IBM Corporation + * Copyright 2017-2018 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,269 +16,504 @@ 'use strict' -const clone = require('clone') -const util = require('util') +// composer module + const fs = require('fs') +const os = require('os') +const path = require('path') +const util = require('util') +const uglify = require('uglify-es') class ComposerError extends Error { - constructor(message, cause) { - super(message) - const index = this.stack.indexOf('\n') - this.stack = this.stack.substring(0, index) + '\nCause: ' + util.inspect(cause) + this.stack.substring(index) + constructor(message, argument) { + super(message + (typeof argument !== 'undefined' ? '\nArgument: ' + util.inspect(argument) : '')) } } -function chain(front, back) { - front.States.push(...back.States) - front.Exit.Next = back.Entry - front.Exit = back.Exit - return front +/** + * Validates options and converts to JSON + */ +function validate(options) { + if (options == null) return + if (typeof options !== 'object' || Array.isArray(options)) throw new ComposerError('Invalid options', options) + options = JSON.stringify(options) + if (options === '{}') return + return JSON.parse(options) } -function push(id) { - const Entry = { Type: 'Push', id } - return { Entry, States: [Entry], Exit: Entry } +/** + * Encodes a composition as an action by injecting the conductor code + */ +function encode({ name, action }) { + if (action.exec.kind !== 'composition') return { name, action } + const code = `${conductor}(${JSON.stringify(action.exec.composition)})\n` // invoke conductor on composition + return { name, action: { exec: { kind: 'nodejs:default', code }, annotations: [{ key: 'conductor', value: action.exec.composition }] } } } -function pop(id) { - const Entry = { Type: 'Pop', id } - return { Entry, States: [Entry], Exit: Entry } +/** + * Parses a (possibly fully qualified) resource name and validates it. If it's not a fully qualified name, + * then attempts to qualify it. + * + * Examples string to namespace, [package/]action name + * foo => /_/foo + * pkg/foo => /_/pkg/foo + * /ns/foo => /ns/foo + * /ns/pkg/foo => /ns/pkg/foo + */ +function parseActionName(name) { + if (typeof name !== 'string' || name.trim().length == 0) throw new ComposerError('Name is not specified') + name = name.trim() + let delimiter = '/' + let parts = name.split(delimiter) + let n = parts.length + let leadingSlash = name[0] == delimiter + // no more than /ns/p/a + if (n < 1 || n > 4 || (leadingSlash && n == 2) || (!leadingSlash && n == 4)) throw new ComposerError('Name is not valid') + // skip leading slash, all parts must be non empty (could tighten this check to match EntityName regex) + parts.forEach(function (part, i) { if (i > 0 && part.trim().length == 0) throw new ComposerError('Name is not valid') }) + let newName = parts.join(delimiter) + if (leadingSlash) return newName + else if (n < 3) return `${delimiter}_${delimiter}${newName}` + else return `${delimiter}${newName}` } -function begin(id, symbol, value) { - const Entry = { Type: 'Let', Symbol: symbol, Value: value, id } - return { Entry, States: [Entry], Exit: Entry } -} +class Composition { + constructor(composition, options, actions = []) { + // collect actions defined in nested composition + Object.keys(composition).forEach(key => { + if (composition[key] instanceof Composition) { + // TODO: check for duplicate entries + actions.push(...composition[key].actions || []) + composition[key] = composition[key].composition + } + }) + if (actions.length > 0) this.actions = actions + options = validate(options) + if (typeof options !== 'undefined') composition = Object.assign({ options }, composition) + // flatten composition array + this.composition = Array.isArray(composition) ? [].concat(...composition) : [composition] + } + + /** Names the composition and returns a composition which invokes the named composition */ + named(name) { + if (arguments.length > 1) throw new ComposerError('Too many arguments') + if (typeof name !== 'string') throw new ComposerError('Invalid argument', name) + name = parseActionName(name) + if (this.actions && this.actions.findIndex(action => action.name === name) !== -1) throw new ComposerError('Duplicate action name', name) + const actions = (this.actions || []).concat({ name, action: { exec: { kind: 'composition', composition: this.composition } } }) + return new Composition({ type: 'action', name }, null, actions) + } -function end(id) { - const Entry = { Type: 'End', id } - return { Entry, States: [Entry], Exit: Entry } + /** Encodes all compositions as actions by injecting the conductor code in them */ + encode(name) { + if (arguments.length > 1) throw new ComposerError('Too many arguments') + if (typeof name !== 'undefined' && typeof name !== 'string') throw new ComposerError('Invalid argument', name) + const obj = typeof name === 'string' ? this.named(name) : this + if (obj.composition.length !== 1 || obj.composition[0].type !== 'action') throw new ComposerError('Cannot encode anonymous composition') + return new Composition(obj.composition, null, obj.actions.map(encode)) + } } -const isObject = obj => typeof (obj) === 'object' && obj !== null && !Array.isArray(obj) +class Compositions { + constructor(wsk) { + this.actions = wsk.actions + } -class Composer { - task(obj, options) { - if (options != null && options.output) return this.assign(options.output, obj, options.input) - if (options != null && options.merge) return this.sequence(this.retain(obj), ({ params, result }) => Object.assign({}, params, result)) - const id = {} - let Entry - if (obj == null) { // identity function (must throw errors if any) - Entry = { Type: 'Task', Helper: 'null', Function: 'params => params', id } - } else if (typeof obj === 'object' && typeof obj.Entry === 'object' && Array.isArray(obj.States) && typeof obj.Exit === 'object') { // an action composition - return clone(obj) - } else if (typeof obj === 'object' && typeof obj.Entry === 'string' && typeof obj.States === 'object' && typeof obj.Exit === 'string') { // a compiled composition - return this.decompile(obj) - } else if (typeof obj === 'function') { // function - Entry = { Type: 'Task', Function: obj.toString(), id } - } else if (typeof obj === 'string') { // action - Entry = { Type: 'Task', Action: obj, id } - } else if (typeof obj === 'object' && typeof obj.Helper !== 'undefined' && typeof obj.Function === 'string') { //helper function - Entry = { Type: 'Task', Function: obj.Function, Helper: obj.Helper, id } - } else { // error - throw new ComposerError('Invalid composition argument', obj) - } - return { Entry, States: [Entry], Exit: Entry } + deploy(composition, name) { + if (arguments.length > 2) throw new ComposerError('Too many arguments') + if (!(composition instanceof Composition)) throw new ComposerError('Invalid argument', composition) + const obj = composition.encode(name) + return obj.actions.reduce((promise, action) => promise.then(() => this.actions.delete(action).catch(() => { })) + .then(() => this.actions.update(action)), Promise.resolve()) + .then(() => composition) } +} + +class Composer { + openwhisk(options) { + // try to extract apihost and key first from whisk property file file and then from process.env + let apihost + let api_key + + try { + const wskpropsPath = process.env.WSK_CONFIG_FILE || path.join(os.homedir(), '.wskprops') + const lines = fs.readFileSync(wskpropsPath, { encoding: 'utf8' }).split('\n') + + for (let line of lines) { + let parts = line.trim().split('=') + if (parts.length === 2) { + if (parts[0] === 'APIHOST') { + apihost = parts[1] + } else if (parts[0] === 'AUTH') { + api_key = parts[1] + } + } + } + } catch (error) { } + + if (process.env.__OW_API_HOST) apihost = process.env.__OW_API_HOST + if (process.env.__OW_API_KEY) api_key = process.env.__OW_API_KEY - sequence() { - if (arguments.length == 0) return this.task() - return Array.prototype.map.call(arguments, x => this.task(x), this).reduce(chain) + const wsk = require('openwhisk')(Object.assign({ apihost, api_key }, options)) + wsk.compositions = new Compositions(wsk) + return wsk } seq() { return this.sequence(...arguments) } - if(test, consequent, alternate) { - if (test == null || consequent == null) throw new ComposerError('Missing arguments in composition', arguments) - const id = {} - test = chain(push(id), this.task(test)) - consequent = this.task(consequent) - alternate = this.task(alternate) - const Exit = { Type: 'Pass', id } - const choice = { Type: 'Choice', Then: consequent.Entry, Else: alternate.Entry, id } - test.States.push(choice) - test.States.push(...consequent.States) - test.States.push(...alternate.States) - test.Exit.Next = choice - consequent.Exit.Next = Exit - alternate.Exit.Next = Exit - test.States.push(Exit) - test.Exit = Exit - return test + value() { + return this.literal(...arguments) } - while(test, body) { - if (test == null || body == null) throw new ComposerError('Missing arguments in composition', arguments) - const id = {} - test = chain(push(id), this.task(test)) - body = this.task(body) - const Exit = { Type: 'Pass', id } - const choice = { Type: 'Choice', Then: body.Entry, Else: Exit, id } - test.States.push(choice) - test.States.push(...body.States) - test.Exit.Next = choice - body.Exit.Next = test.Entry - test.States.push(Exit) - test.Exit = Exit - return test + /** Takes a serialized Composition and returns a Composition instance */ + deserialize({ composition, actions }) { + return new Composition(composition, null, actions) } - try(body, handler) { - if (body == null || handler == null) throw new ComposerError('Missing arguments in composition', arguments) - const id = {} - body = this.task(body) - handler = this.task(handler) - const Exit = { Type: 'Pass', id } - const Entry = { Type: 'Try', Next: body.Entry, Handler: handler.Entry, id } - const pop = { Type: 'Catch', Next: Exit, id } - const States = [Entry] - States.push(...body.States, pop, ...handler.States, Exit) - body.Exit.Next = pop - handler.Exit.Next = Exit - return { Entry, States, Exit } + task(obj) { + if (arguments.length > 1) throw new ComposerError('Too many arguments') + if (obj == null) return this.seq() + if (obj instanceof Composition) return obj + if (typeof obj === 'function') return this.function(obj) + if (typeof obj === 'string') return this.action(obj) + throw new ComposerError('Invalid argument', obj) } - retain(body, flag = false) { - if (body == null) throw new ComposerError('Missing arguments in composition', arguments) - if (typeof flag !== 'boolean') throw new ComposerError('Invalid retain flag', flag) - - const id = {} - if (!flag) return chain(push(id), chain(this.task(body), pop(id))) - - let helperFunc_1 = { 'Helper': 'retain_1', 'Function': 'params => ({params})' } - let helperFunc_3 = { 'Helper': 'retain_3', 'Function': 'params => ({params})' } - let helperFunc_2 = { 'Helper': 'retain_2', 'Function': 'params => ({ params: params.params, result: params.result.params })' } - - return this.sequence( - this.retain( - this.try( - this.sequence( - body, - helperFunc_1 - ), - helperFunc_3 - ) - ), - helperFunc_2 - ) + sequence() { // varargs, no options + return new Composition(Array.prototype.map.call(arguments, obj => this.task(obj), this)) } - assign(dest, body, source, flag = false) { - if (dest == null || body == null) throw new ComposerError('Missing arguments in composition', arguments) - if (typeof flag !== 'boolean') throw new ComposerError('Invalid assign flag', flag) - - let helperFunc_1 = { 'Helper': 'assign_1', 'Function': 'params => params[source]' }; - let helperFunc_2 = { 'Helper': 'assign_2', 'Function': 'params => { params.params[dest] = params.result; return params.params }' }; - - const t = source ? this.let('source', source, this.retain(this.sequence(helperFunc_1, body), flag)) : this.retain(body, flag) - return this.let('dest', dest, t, helperFunc_2) + if(test, consequent, alternate, options) { + if (arguments.length > 4) throw new ComposerError('Too many arguments') + return new Composition({ type: 'if', test: this.task(test), consequent: this.task(consequent), alternate: this.task(alternate) }, options) } - let(arg1, arg2) { - if (arg1 == null) throw new ComposerError('Missing arguments in composition', arguments) - if (typeof arg1 === 'string') { - const id = {} - return chain(begin(id, arg1, arg2), chain(this.sequence(...Array.prototype.slice.call(arguments, 2)), end(id))) - } else if (isObject(arg1)) { - const enter = [] - const exit = [] - for (const name in arg1) { - const id = {} - enter.push(begin(id, name, arg1[name])) - exit.unshift(end(id)) - } - if (enter.length == 0) return this.sequence(...Array.prototype.slice.call(arguments, 1)) - return chain(enter.reduce(chain), chain(this.sequence(...Array.prototype.slice.call(arguments, 1)), exit.reduce(chain))) - } else { - throw new ComposerError('Invalid first let argument', arg1) - } + while(test, body, options) { + if (arguments.length > 3) throw new ComposerError('Too many arguments') + return new Composition({ type: 'while', test: this.task(test), body: this.task(body) }, options) } - retry(count, body) { - if (body == null) throw new ComposerError('Missing arguments in composition', arguments) - if (typeof count !== 'number') throw new ComposerError('Invalid retry count', count) + dowhile(body, test, options) { + if (arguments.length > 3) throw new ComposerError('Too many arguments') + return new Composition({ type: 'dowhile', test: this.task(test), body: this.task(body) }, options) + } - let helperFunc_1 = { 'Helper': 'retry_1', 'Function': "params => typeof params.result.error !== 'undefined' && count-- > 0" } - let helperFunc_2 = { 'Helper': 'retry_2', 'Function': 'params => params.params' } - let helperFunc_3 = { 'Helper': 'retry_3', 'Function': 'params => params.result' } + try(body, handler, options) { + if (arguments.length > 3) throw new ComposerError('Too many arguments') + return new Composition({ type: 'try', body: this.task(body), handler: this.task(handler) }, options) + } - return this.let('count', count, - this.retain(body, true), - this.while( - helperFunc_1, - this.sequence(helperFunc_2, this.retain(body, true))), - helperFunc_3) + finally(body, finalizer, options) { + if (arguments.length > 3) throw new ComposerError('Too many arguments') + return new Composition({ type: 'finally', body: this.task(body), finalizer: this.task(finalizer) }, options) } - repeat(count, body) { - if (body == null) throw new ComposerError('Missing arguments in composition', arguments) - if (typeof count !== 'number') throw new ComposerError('Invalid repeat count', count) + let(declarations) { // varargs, no options + if (typeof declarations !== 'object' || declarations === null) throw new ComposerError('Invalid argument', declarations) + return new Composition({ type: 'let', declarations, body: this.seq(...Array.prototype.slice.call(arguments, 1)) }) + } - let helperFunc_1 = { 'Helper': 'repeat_1', 'Function': '() => count-- > 0' } - return this.let('count', count, this.while(helperFunc_1, body)) + literal(value, options) { + if (arguments.length > 2) throw new ComposerError('Too many arguments') + if (typeof value === 'function') throw new ComposerError('Invalid argument', value) + return new Composition({ type: 'literal', value: typeof value === 'undefined' ? {} : value }, options) } - value(json) { - const id = {} - if (typeof json === 'function') throw new ComposerError('Value cannot be a function', json.toString()) - const Entry = { Type: 'Task', Value: typeof json === 'undefined' ? {} : json, id } - return { Entry, States: [Entry], Exit: Entry } + function(fun, options) { + if (arguments.length > 2) throw new ComposerError('Too many arguments') + if (typeof fun === 'function') { + fun = `${fun}` + if (fun.indexOf('[native code]') !== -1) throw new ComposerError('Cannot capture native function', fun) + } + if (typeof fun === 'string') { + fun = { kind: 'nodejs:default', code: fun } + } + if (typeof fun !== 'object' || fun === null) throw new ComposerError('Invalid argument', fun) + return new Composition({ type: 'function', exec: fun }, options) } - compile(obj, filename) { - if (typeof obj !== 'object' || typeof obj.Entry !== 'object' || !Array.isArray(obj.States) || typeof obj.Exit !== 'object') { - throw new ComposerError('Invalid argument to compile', obj) + action(name, options) { + if (arguments.length > 2) throw new ComposerError('Too many arguments') + name = parseActionName(name) // throws ComposerError if name is not valid + let exec + if (options && Array.isArray(options.sequence)) { // native sequence + const components = options.sequence.map(a => a.indexOf('/') == -1 ? `/_/${a}` : a) + exec = { kind: 'sequence', components } + delete options.sequence } - obj = clone(obj) - const States = {} - let Entry - let Exit - let Count = 0 - obj.States.forEach(state => { - if (typeof state.id.id === 'undefined') state.id.id = Count++ - }) - obj.States.forEach(state => { - const id = (state.Type === 'Task' ? state.Action && 'action' || state.Function && 'function' || state.Value && 'value' : state.Type.toLowerCase()) + '_' + state.id.id - States[id] = state - state.id = id - if (state === obj.Entry) Entry = id - if (state === obj.Exit) Exit = id - }) - obj.States.forEach(state => { - if (state.Next) state.Next = state.Next.id - if (state.Then) state.Then = state.Then.id - if (state.Else) state.Else = state.Else.id - if (state.Handler) state.Handler = state.Handler.id - }) - obj.States.forEach(state => { - delete state.id - }) - const app = { Entry, States, Exit } - if (filename) fs.writeFileSync(filename, JSON.stringify(app, null, 4), { encoding: 'utf8' }) - return app + if (options && typeof options.filename === 'string') { // read action code from file + options.action = fs.readFileSync(options.filename, { encoding: 'utf8' }) + delete options.filename + } + if (options && typeof options.action === 'function') { + options.action = `${options.action}` + if (options.action.indexOf('[native code]') !== -1) throw new ComposerError('Cannot capture native function', options.action) + } + if (options && typeof options.action === 'string') { + options.action = { kind: 'nodejs:default', code: options.action } + } + if (options && typeof options.action === 'object' && options.action !== null) { + exec = options.action + delete options.action + } + return new Composition({ type: 'action', name }, options, exec ? [{ name, action: { exec } }] : []) } - decompile(obj) { - if (typeof obj !== 'object' || typeof obj.Entry !== 'string' || typeof obj.States !== 'object' || typeof obj.Exit !== 'string') { - throw new ComposerError('Invalid argument to decompile', obj) + retain(body, options) { + if (arguments.length > 2) throw new ComposerError('Too many arguments') + if (options && typeof options.filter === 'function') { + // return { params: filter(params), result: body(params) } + const filter = options.filter + delete options.filter + options.field = 'result' + return this.seq(this.retain(filter), this.retain(this.finally(this.function(({ params }) => params, { helper: 'retain_3' }), body), options)) } - obj = clone(obj) - const States = [] - const ids = [] - for (const name in obj.States) { - const state = obj.States[name] - if (state.Next) state.Next = obj.States[state.Next] - if (state.Then) state.Then = obj.States[state.Then] - if (state.Else) state.Else = obj.States[state.Else] - if (state.Handler) state.Handler = obj.States[state.Handler] - const id = parseInt(name.substring(name.lastIndexOf('_') + 1)) - state.id = ids[id] = typeof ids[id] !== 'undefined' ? ids[id] : {} - States.push(state) + if (options && typeof options.catch === 'boolean' && options.catch) { + // return { params, result: body(params) } even if result is an error + delete options.catch + return this.seq( + this.retain(this.finally(body, this.function(result => ({ result }), { helper: 'retain_1' })), options), + this.function(({ params, result }) => ({ params, result: result.result }), { helper: 'retain_2' })) } - return { Entry: obj.States[obj.Entry], States, Exit: obj.States[obj.Exit] } + if (options && typeof options.field !== 'undefined' && typeof options.field !== 'string') throw new ComposerError('Invalid options', options) + // return new Composition({ params, result: body(params) } if no error, otherwise body(params) + return new Composition({ type: 'retain', body: this.task(body) }, options) + } + + repeat(count) { // varargs, no options + if (typeof count !== 'number') throw new ComposerError('Invalid argument', count) + return this.let({ count }, this.while(this.function(() => count-- > 0, { helper: 'repeat_1' }), this.seq(...Array.prototype.slice.call(arguments, 1)))) + } + + retry(count) { // varargs, no options + if (typeof count !== 'number') throw new ComposerError('Invalid argument', count) + const attempt = this.retain(this.seq(...Array.prototype.slice.call(arguments, 1)), { catch: true }) + return this.let({ count }, + this.function(params => ({ params }), { helper: 'retry_1' }), + this.dowhile( + this.finally(this.function(({ params }) => params, { helper: 'retry_2' }), attempt), + this.function(({ result }) => typeof result.error !== 'undefined' && count-- > 0, { helper: 'retry_3' })), + this.function(({ result }) => result, { helper: 'retry_4' })) } } module.exports = new Composer() + +// conductor action + +const conductor = `const __eval__ = main => eval(main)\nconst main = (${uglify.minify(`${init}`).code})` + +function init(composition) { + function chain(front, back) { + front.slice(-1)[0].next = 1 + front.push(...back) + return front + } + + function compile(json, path = '') { + if (Array.isArray(json)) { + if (json.length === 0) return [{ type: 'pass', path }] + return json.map((json, index) => compile(json, path + '[' + index + ']')).reduce(chain) + } + const options = json.options || {} + switch (json.type) { + case 'action': + return [{ type: 'action', name: json.name, path }] + case 'function': + return [{ type: 'function', exec: json.exec, path }] + case 'literal': + return [{ type: 'literal', value: json.value, path }] + case 'finally': + var body = compile(json.body, path + '.body') + const finalizer = compile(json.finalizer, path + '.finalizer') + var fsm = [[{ type: 'try', path }], body, [{ type: 'exit', path }], finalizer].reduce(chain) + fsm[0].catch = fsm.length - finalizer.length + return fsm + case 'let': + var body = compile(json.body, path + '.body') + return [[{ type: 'let', let: json.declarations, path }], body, [{ type: 'exit', path }]].reduce(chain) + case 'retain': + var body = compile(json.body, path + '.body') + var fsm = [[{ type: 'push', path }], body, [{ type: 'pop', collect: true, path }]].reduce(chain) + if (options.field) fsm[0].field = options.field + return fsm + case 'try': + var body = compile(json.body, path + '.body') + const handler = chain(compile(json.handler, path + '.handler'), [{ type: 'pass', path }]) + var fsm = [[{ type: 'try', path }], body].reduce(chain) + fsm[0].catch = fsm.length + fsm.slice(-1)[0].next = handler.length + fsm.push(...handler) + return fsm + case 'if': + var consequent = compile(json.consequent, path + '.consequent') + var alternate = chain(compile(json.alternate, path + '.alternate'), [{ type: 'pass', path }]) + if (!options.nosave) consequent = chain([{ type: 'pop', path }], consequent) + if (!options.nosave) alternate = chain([{ type: 'pop', path }], alternate) + var fsm = chain(compile(json.test, path + '.test'), [{ type: 'choice', then: 1, else: consequent.length + 1, path }]) + if (!options.nosave) fsm = chain([{ type: 'push', path }], fsm) + consequent.slice(-1)[0].next = alternate.length + fsm.push(...consequent) + fsm.push(...alternate) + return fsm + case 'while': + var consequent = compile(json.body, path + '.body') + var alternate = [{ type: 'pass', path }] + if (!options.nosave) consequent = chain([{ type: 'pop', path }], consequent) + if (!options.nosave) alternate = chain([{ type: 'pop', path }], alternate) + var fsm = chain(compile(json.test, path + '.test'), [{ type: 'choice', then: 1, else: consequent.length + 1, path }]) + if (!options.nosave) fsm = chain([{ type: 'push', path }], fsm) + consequent.slice(-1)[0].next = 1 - fsm.length - consequent.length + fsm.push(...consequent) + fsm.push(...alternate) + return fsm + case 'dowhile': + var test = compile(json.test, path + '.test') + if (!options.nosave) test = chain([{ type: 'push', path }], test) + var fsm = [compile(json.body, path + '.body'), test, [{ type: 'choice', then: 1, else: 2, path }]].reduce(chain) + if (options.nosave) { + fsm.slice(-1)[0].then = 1 - fsm.length + fsm.slice(-1)[0].else = 1 + } else { + fsm.push({ type: 'pop', path }) + fsm.slice(-1)[0].next = 1 - fsm.length + } + var alternate = [{ type: 'pass', path }] + if (!options.nosave) alternate = chain([{ type: 'pop', path }], alternate) + fsm.push(...alternate) + return fsm + } + } + + const fsm = compile(composition) + + const isObject = obj => typeof obj === 'object' && obj !== null && !Array.isArray(obj) + + // encode error object + const encodeError = error => ({ + code: typeof error.code === 'number' && error.code || 500, + error: (typeof error.error === 'string' && error.error) || error.message || (typeof error === 'string' && error) || 'An internal error occurred' + }) + + // error status codes + const badRequest = error => Promise.reject({ code: 400, error }) + const internalError = error => Promise.reject(encodeError(error)) + + return params => Promise.resolve().then(() => invoke(params)).catch(internalError) + + // do invocation + function invoke(params) { + // initial state and stack + let state = 0 + let stack = [] + + // restore state and stack when resuming + if (typeof params.$resume !== 'undefined') { + if (!isObject(params.$resume)) return badRequest('The type of optional $resume parameter must be object') + state = params.$resume.state + stack = params.$resume.stack + if (typeof state !== 'undefined' && typeof state !== 'number') return badRequest('The type of optional $resume.state parameter must be number') + if (!Array.isArray(stack)) return badRequest('The type of $resume.stack must be an array') + delete params.$resume + inspect() // handle error objects when resuming + } + + // wrap params if not a dictionary, branch to error handler if error + function inspect() { + if (!isObject(params)) params = { value: params } + if (typeof params.error !== 'undefined') { + params = { error: params.error } // discard all fields but the error field + state = undefined // abort unless there is a handler in the stack + while (stack.length > 0) { + if (typeof (state = stack.shift().catch) === 'number') break + } + } + } + + // run function f on current stack + function run(f) { + // update value of topmost matching symbol on stack if any + function set(symbol, value) { + const element = stack.find(element => typeof element.let !== 'undefined' && typeof element.let[symbol] !== 'undefined') + if (typeof element !== 'undefined') element.let[symbol] = JSON.parse(JSON.stringify(value)) + } + + // collapse stack for invocation + const env = stack.reduceRight((acc, cur) => typeof cur.let === 'object' ? Object.assign(acc, cur.let) : acc, {}) + let main = '(function main(){try{' + for (const name in env) main += `var ${name}=arguments[1]['${name}'];` + main += `return eval((${f}))(arguments[0])}finally{` + for (const name in env) main += `arguments[1]['${name}']=${name};` + main += '}})' + try { + return __eval__(main)(params, env) + } finally { + for (const name in env) set(name, env[name]) + } + } + + while (true) { + // final state, return composition result + if (typeof state === 'undefined') { + console.log(`Entering final state`) + console.log(JSON.stringify(params)) + if (params.error) return params; else return { params } + } + + // process one state + const json = fsm[state] // json definition for current state + console.log(`Entering state ${state} at path fsm${json.path}`) + const current = state + state = typeof json.next === 'undefined' ? undefined : current + json.next // default next state + switch (json.type) { + case 'choice': + state = current + (params.value ? json.then : json.else) + break + case 'try': + stack.unshift({ catch: current + json.catch }) + break + case 'let': + stack.unshift({ let: JSON.parse(JSON.stringify(json.let)) }) + break + case 'exit': + if (stack.length === 0) return internalError(`State ${current} attempted to pop from an empty stack`) + stack.shift() + break + case 'push': + stack.unshift(JSON.parse(JSON.stringify({ params: json.field ? params[json.field] : params }))) + break + case 'pop': + if (stack.length === 0) return internalError(`State ${current} attempted to pop from an empty stack`) + params = json.collect ? { params: stack.shift().params, result: params } : stack.shift().params + break + case 'action': + return { action: json.name, params, state: { $resume: { state, stack } } } // invoke continuation + break + case 'literal': + params = JSON.parse(JSON.stringify(json.value)) + inspect() + break + case 'function': + let result + try { + result = run(json.exec.code) + } catch (error) { + console.error(error) + result = { error: `An exception was caught at state ${current} (see log for details)` } + } + if (typeof result === 'function') result = { error: `State ${current} evaluated to a function` } + // if a function has only side effects and no return value, return params + params = JSON.parse(JSON.stringify(typeof result === 'undefined' ? params : result)) + inspect() + break + case 'pass': + inspect() + break + default: + return internalError(`State ${current} has an unknown type`) + } + } + } +} diff --git a/conductor.js b/conductor.js deleted file mode 100644 index f8be541..0000000 --- a/conductor.js +++ /dev/null @@ -1,352 +0,0 @@ -/* - * Copyright 2017 IBM Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Run a composition - -'use strict' - -// evaluate main exposing the fields of __env__ as variables -function __eval__(__env__, main) { - main = `(${main})` - let __eval__ = '__eval__=undefined;params=>{try{' - for (const name in __env__) { - __eval__ += `var ${name}=__env__['${name}'];` - } - __eval__ += 'return eval(main)(params)}finally{' - for (const name in __env__) { - __eval__ += `__env__['${name}']=${name};` - } - __eval__ += '}}' - return eval(__eval__) -} - -// keep outer namespace clean -const main = (() => { - const openwhisk = require('openwhisk') - const request = require('request-promise') - const redis = require('redis') - - // inline redis-promise to keep action code in a single file - redis.createAsyncClient = function () { - const client = redis.createClient(...arguments) - const noop = () => { } - let handler = noop - client.on('error', error => handler(error)) - require('redis-commands').list.forEach(f => client[`${f}Async`] = function () { - let failed = false - return new Promise((resolve, reject) => { - handler = error => { - handler = noop - failed = true - reject(error) - } - client[f](...arguments, (error, result) => { - handler = noop - return error ? reject(error) : resolve(result) - }) - }).catch(error => { - if (failed) client.end(true) - return Promise.reject(error) - }) - }) - return client - } - - let wsk // cached openwhisk instance - let db // cached redis instance - - // encode error object - const encodeError = error => ({ - code: typeof error.code === 'number' && error.code || 500, - error: (typeof error.error === 'string' && error.error) || error.message || (typeof error === 'string' && error) || 'An internal error occurred' - }) - - // error status codes - const ok = () => ({ message: 'OK' }) - const badRequest = error => Promise.reject({ code: 400, error }) - const notFound = error => Promise.reject({ code: 404, error }) - const gone = error => Promise.reject({ code: 410, error }) - const internalError = error => Promise.reject(encodeError(error)) - - const isObject = obj => typeof (obj) === 'object' && obj !== null && !Array.isArray(obj) - - // catch all exceptions and rejected promises - return params => { - try { - return invoke(params).catch(internalError) - } catch (error) { - return internalError(error) - } - } - - function poll(activationId, resolve) { // poll for activation record (1s interval) - return wsk.activations.get(activationId).then(resolve, () => setTimeout(() => poll(activationId, resolve), 1000)) - } - - // do invocation - function invoke(params) { - // check parameters - if (!isObject(params.$config)) return badRequest('Missing $config parameter of type object') - if (typeof params.$config.redis !== 'string') return badRequest('Missing $config.redis parameter of type string') - if (typeof params.$config.notify !== 'undefined' && typeof params.$config.notify !== 'boolean') return badRequest('Type of $config.notify parameter must be Boolean') - if (typeof params.$config.expiration !== 'undefined' && typeof params.$config.expiration !== 'number') return badRequest('Type of $config.expiration parameter must be number') - if (typeof params.$activationId !== 'undefined' && typeof params.$activationId !== 'number' && typeof params.$activationId !== 'string') return badRequest('Type of $activationId parameter must be number or string') - if (typeof params.$sessionId !== 'undefined' && typeof params.$sessionId !== 'number' && typeof params.$sessionId !== 'string') return badRequest('Type of $sessionId parameter must be number or string') - if (typeof params.$invoke !== 'undefined' && !isObject(params.$invoke)) return badRequest('Type of $invoke parameter must be object') - if (typeof params.$blocking !== 'undefined' && typeof params.$blocking !== 'boolean') return badRequest('Type of $blocking parameter must be Boolean') - if (typeof params.$invoke === 'undefined' && typeof params.$sessionId === 'undefined') return badRequest('Missing $invoke or $sessionId parameter') - - // configuration - const notify = params.$config.notify - const expiration = params.$config.expiration || (86400 * 7) - const resuming = typeof params.$sessionId !== 'undefined' - const blocking = params.__ow_method || params.$blocking - const session = params.$sessionId || process.env.__OW_ACTIVATION_ID - - // initialize openwhisk instance - if (!wsk) { - wsk = openwhisk({ ignore_certs: true }) - if (!notify) wsk.actions.qs_options.invoke = ['blocking', 'notify', 'cause'] - } - - // redis keys - const apiKey = process.env.__OW_API_KEY.substring(0, process.env.__OW_API_KEY.indexOf(':')) - const sessionStateKey = `${apiKey}:session:live:${session}` - const sessionResultKey = `${apiKey}:session:done:${session}` - const sessionTraceKey = `${apiKey}:list:${session}` - const sessionsKey = `${apiKey}:all` - - // initialize redis instance - // TODO: check that redis config has not changed since last invocation - if (!db) db = redis.createAsyncClient(params.$config.redis) - - // retrieve session state from redis - function getSessionState() { - return db.lindexAsync(sessionStateKey, -1).then(result => { - if (typeof result !== 'string') return notFound(`Cannot find live session ${session}`) - const obj = JSON.parse(result) - if (!isObject(obj)) return internalError(`State of live session ${session} is not a JSON object`) - return obj - }) - } - // retrieve session result from redis - function getSessionResult() { - return db.brpoplpushAsync(sessionResultKey, sessionResultKey, 30).then(result => { - if (typeof result !== 'string') return { $session: session } // timeout - const obj = JSON.parse(result) - if (!isObject(obj)) throw `Result of session ${session} is not a JSON object` - return obj - }) - } - - // resume suspended session - function resume() { - params = params.$result - return db.rpushxAsync(sessionTraceKey, process.env.__OW_ACTIVATION_ID) - .then(() => getSessionState()) // obtain live session state - .then(result => { - if (!isObject(result.$fsm)) return badRequest(`State of session ${session} is not well formed`) - params.$invoke = result.$fsm - params.$state = result.$state - params.$stack = result.$stack - params.$callee = result.$callee - }) - } - - // start new session - function start() { - return db.rpushAsync(sessionTraceKey, process.env.__OW_ACTIVATION_ID) - .then(() => db.expireAsync(sessionTraceKey, expiration)) - .then(() => db.zaddAsync(sessionsKey, process.env.__OW_DEADLINE * 2, session)) - .then(() => db.lpushAsync(sessionStateKey, JSON.stringify({}))) - .then(() => db.ltrimAsync(sessionStateKey, -1, -1)) - .then(() => db.expireAsync(sessionStateKey, expiration)) - } - - // persist session state to redis - function persist($fsm, $state, $stack, $callee) { - // ensure using set-if-exists that the session has not been killed - return db.lsetAsync(sessionStateKey, -1, JSON.stringify({ $fsm, $state, $stack, $callee })) - .catch(() => gone(`Session ${session} has been killed`)) - } - - // record session result to redis - function record(result) { - return db.lsetAsync(sessionStateKey, -1, JSON.stringify(result)) - .then(() => db.rpoplpushAsync(sessionStateKey, sessionResultKey)) - .then(() => db.delAsync(sessionStateKey)) - .then(() => db.expireAsync(sessionResultKey, expiration)) - .then(() => db.zincrbyAsync(sessionsKey, 1, session)) - .catch(() => gone(`Session ${session} has been killed`)) - } - - // retrieve session state if resuming or initialize session state if not, step, push error in step to db if any - return (resuming ? resume() : start()).then(() => Promise.resolve().then(step).catch(error => record(encodeError(error)).then(() => Promise.reject(error)))) - - // one step of execution - function step() { - const fsm = params.$invoke // the action composition to run - if (typeof fsm.Entry !== 'string') return badRequest('The composition has no Entry field of type string') - if (!isObject(fsm.States)) return badRequest('The composition has no States field of type object') - if (typeof fsm.Exit !== 'string') return badRequest('The composition has no Exit field of type string') - - let state = resuming ? params.$state : (params.$state || fsm.Entry) - const stack = params.$stack || [] - const callee = params.$callee - - // wrap params if not a JSON object, branch to error handler if error - function inspect() { - if (!isObject(params) || Array.isArray(params) || params === null) { - params = { value: params } - } - if (typeof params.error !== 'undefined') { - params = { error: params.error } // discard all fields but the error field - state = undefined // abort unless there is a handler in the stack - while (stack.length > 0) { - if (state = stack.shift().catch) break - } - } - } - - // handle error objects when resuming - if (resuming) inspect() - - // delete $ params - delete params.$config - delete params.$activationId - delete params.$invoke - delete params.$sessionId - delete params.$state - delete params.$stack - delete params.$callee - delete params.$blocking - - // run function f on current stack - function run(f) { - function set(symbol, value) { - const element = stack.find(element => typeof element.let !== 'undefined' && typeof element.let[symbol] !== 'undefined') - if (typeof element !== 'undefined') element.let[symbol] = JSON.parse(JSON.stringify(value)) - } - - const env = stack.reduceRight((acc, cur) => typeof cur.let === 'object' ? Object.assign(acc, cur.let) : acc, {}) - const result = __eval__(env, f)(params) - for (const name in env) { - set(name, env[name]) - } - return result - } - - while (true) { - // final state - if (!state) { - console.log(`Entering final state`) - console.log(JSON.stringify(params)) - return record(params).then(() => { - if (callee) { - return wsk.actions.invoke({ name: callee.name, params: { $sessionId: callee.session, $result: params } }) - .catch(error => badRequest(`Failed to return to callee: ${encodeError(error).error}`)) - } - }).then(() => blocking ? params : ({ $session: session })) - } - - console.log(`Entering ${state}`) - - if (!isObject(fsm.States[state])) return badRequest(`The composition has no state named ${state}`) - const json = fsm.States[state] // json for current state - if (json.Type !== 'Choice' && typeof json.Next !== 'string' && state !== fsm.Exit) return badRequest(`The state named ${state} has no Next field`) - const current = state // current state - state = json.Next // default next state - - switch (json.Type) { - case 'Choice': - if (typeof json.Then !== 'string') return badRequest(`The state named ${current} of type Choice has no Then field`) - if (typeof json.Else !== 'string') return badRequest(`The state named ${current} of type Choice has no Else field`) - state = params.value === true ? json.Then : json.Else - if (stack.length === 0) return badRequest(`The state named ${current} of type Choice attempted to pop from an empty stack`) - const top = stack.shift() - if (typeof top.params !== 'object') return badRequest(`The state named ${current} of type Choice popped an unexpected stack element`) - params = top.params - break - case 'Try': - if (typeof json.Handler !== 'string') return badRequest(`The state named ${current} of type Try has no Handler field`) - stack.unshift({ catch: json.Handler }) // register handler - break - case 'Catch': - if (stack.length === 0) return badRequest(`The state named ${current} of type Catch attempted to pop from an empty stack`) - if (typeof stack.shift().catch !== 'string') return badRequest(`The state named ${current} of type Catch popped an unexpected stack element`) - break - case 'Push': - stack.unshift({ params: JSON.parse(JSON.stringify(params)) }) - break - case 'Pop': - if (stack.length === 0) return badRequest(`The state named ${current} of type Pop attempted to pop from an empty stack`) - const tip = stack.shift() - if (typeof tip.params !== 'object') return badRequest(`The state named ${current} of type Pop popped an unexpected stack element`) - params = { result: params, params: tip.params } // combine current params with persisted params popped from stack - break - case 'Let': - stack.unshift({ let: {} }) - if (typeof json.Symbol !== 'string') return badRequest(`The state named ${current} of type Let has no Symbol field`) - if (typeof json.Value === 'undefined') return badRequest(`The state named ${current} of type Let has no Value field`) - stack[0].let[json.Symbol] = JSON.parse(JSON.stringify(json.Value)) - break - case 'End': - if (stack.length === 0) return badRequest(`The state named ${current} of type End attempted to pop from an empty stack`) - if (typeof stack.shift().let !== 'object') return badRequest(`The state named ${current} of type End popped an unexpected stack element`) - break - case 'Task': - if (typeof json.Action === 'string' && json.Action.substr(json.Action.length - 4) === '.app') { // invoke app - params.$callee = { name: process.env.__OW_ACTION_NAME, session } - return persist(fsm, state, stack, callee) - .then(() => wsk.actions.invoke({ name: json.Action, params }) - .catch(error => badRequest(`Failed to invoke app ${json.Action}: ${encodeError(error).error}`))) - .then(activation => db.rpushxAsync(sessionTraceKey, activation.activationId)) - .then(() => blocking ? getSessionResult() : { $session: session }) - } else if (typeof json.Action === 'string') { // invoke user action - const invocation = notify ? { name: json.Action, params, blocking: true } : { name: json.Action, params, notify: process.env.__OW_ACTION_NAME, cause: session } - return persist(fsm, state, stack, callee) - .then(() => wsk.actions.invoke(invocation) - .catch(error => error.error && error.error.response ? error.error : badRequest(`Failed to invoke action ${json.Action}: ${encodeError(error).error}`))) // catch error reponses - .then(activation => db.rpushxAsync(sessionTraceKey, activation.activationId) - .then(() => activation.response || !notify ? activation : new Promise(resolve => poll(activation.activationId, resolve)))) // poll if timeout - .then(activation => notify && wsk.actions.invoke({ name: process.env.__OW_ACTION_NAME, params: { $activationId: activation.activationId, $sessionId: session, $result: activation.response.result } })) - .then(() => blocking ? getSessionResult() : { $session: session }) - } else if (typeof json.Value !== 'undefined') { // value - params = JSON.parse(JSON.stringify(json.Value)) - inspect() - } else if (typeof json.Function === 'string') { // function - let result - try { - result = run(json.Function) - } catch (error) { - console.error(error) - result = { error: 'An error has occurred: ' + error } - } - params = typeof result === 'undefined' ? {} : JSON.parse(JSON.stringify(result)) - inspect() - } else { - return badRequest(`The kind field of the state named ${current} of type Task is missing`) - } - break - case 'Pass': - break - default: - return badRequest(`The state named ${current} has an unknown type`) - } - } - } - } -})() diff --git a/docs/COMPOSER.md b/docs/COMPOSER.md index e951f82..8bc86a9 100644 --- a/docs/COMPOSER.md +++ b/docs/COMPOSER.md @@ -1,185 +1,260 @@ -# JavaScript Composer API +# Composer -The [composer](../composer.js) module makes it possible to compose actions programmatically. The module is typically used as illustrated in [samples/demo.js](../samples/demo.js): +The [`composer`](../composer.js) Node.js module makes it possible define action [compositions](#compositions) using [combinators](#combinators). -```javascript -const composer = require('@ibm-functions/composer') - -// author action composition -const app = composer.if('authenticate', /* then */ 'welcome', /* else */ 'login') +## Installation -// compile action composition -composer.compile(app, 'demo.json') +To install the `composer` module use the Node Package Manager: +``` +npm -g install @ibm-functions/composer ``` +We recommend to install the module globally (with `-g` option) so the `compose` command is added to the path. Otherwise, it can be found in the `bin` folder of the module installation. -When using the programming shell `fsh` however, there is no need to instantiate the module or invoke compile explicitly. The `demo.js` file can be shortened to: +## Example +A composition is typically defined by means of a Javascript file as illustrated in [samples/demo.js](../samples/demo.js): ```javascript -composer.if('authenticate', /* then */ 'welcome', /* else */ 'login') +composer.if('authenticate', /* then */ 'success', /* else */ 'failure') ``` +This example composition composes three actions named `authenticate`, `success`, and `failure` using the `composer.if` combinator. -and used as follows: - -```bash -$ fsh app create demo demo.js +To deploy this composition use the `compose` command: ``` +compose demo.js --deploy demo +``` +This command creates an action named `demo` that implements the composition. -This example program composes actions `authenticate`, `welcome`, and `login` using the `composer.if` composition method. The `composer.compile` method produces a JSON object encoding the composition and optionally writes the object to a file. The compiled composition is shown in [samples/demo.json](../samples/demo.json). - -To deploy and run this composition and others, please refer to [README.md](README.md). - -## Compositions and Compiled Compositions - -All composition methods return a _composition_ object. Composition objects must be _compiled_ using the `composer.compile` method before export or invocation. Compiled compositions objects are JSON objects that obey the specification described in [FORMAT.md](FORMAT.md). They can be converted to and from strings using the `JSON.stringify` and `JSON.parse` methods. In contrast, the format of composition objects is not specified and may change in the future. +Assuming the composed actions are already deployed, this composition may be invoked like any action, for instance using the OpenWhisk CLI: +``` +wsk action invoke demo -r -p password passw0rd +``` +``` +{ + message: "Failure" +} +``` +An invocation of a composition creates a series of activation records: +``` +wsk action invoke demo -p password passw0rd +``` +``` +ok: invoked /_/demo with id 4f91f9ed0d874aaa91f9ed0d87baaa07 +``` +``` +wsk activation list +``` +``` +activations +fd89b99a90a1462a89b99a90a1d62a8e demo +eaec119273d94087ac119273d90087d0 failure +3624ad829d4044afa4ad829d40e4af60 demo +a1f58ade9b1e4c26b58ade9b1e4c2614 authenticate +3624ad829d4044afa4ad829d40e4af60 demo +4f91f9ed0d874aaa91f9ed0d87baaa07 demo +``` +The entry with the earliest start time (`4f91f9ed0d874aaa91f9ed0d87baaa07`) summarizes the invocation of the composition while other entries record later activations caused by the composition invocation. There is one entry for each invocation of a composed action (`a1f58ade9b1e4c26b58ade9b1e4c2614` and `eaec119273d94087ac119273d90087d0`). The remaining entries record the beginning and end of the composition as well as the transitions between the composed actions. -### composer.compile(_composition_[, _filename_]) +Compositions are implemented by means of OpenWhisk conductor actions. The documentation of [conductor actions](https://github.com/apache/incubator-openwhisk/blob/master/docs/conductors.md) discusses activation records in greater details. -`composer.compile(composition, filename)` compiles the composition object to its JSON representation and writes the JSON object to the file `filename` if specified. It returns the compiled composition object. +## Compositions -## Tasks +The `compose` command when not invoked with the `--deploy` option returns the composition encoded as a JSON dictionary: +``` +compose demo.js +``` +``` +{ + "composition": [ + { + "type": "if", + "test": [ + { + "type": "action", + "name": "/_/authenticate" + } + ], + "consequent": [ + { + "type": "action", + "name": "/_/success" + } + ], + "alternate": [ + { + "type": "action", + "name": "/_/failure" + } + ] + } + ] +} +``` +The JSON format is documented in [FORMAT.md](FORMAT.md). -Composition methods compose _tasks_. A task is one of the following: +A JSON-encoded composition may be deployed using the `compose` command: +``` +compose demo.js > demo.json +compose demo.json --deploy demo +``` -| Type | Description | Examples | -| --:| --- | --- | -| _function task_ | a JavaScript function expression | `params => params` or `function (params) { return params }` | -| _action task_ | an OpenWhisk action | `'/whisk.system/utils/echo'` | -| _composition_ | a composition | `composer.retry(3, 'connect')` | -| _compiled composition_ | a compiled composition | `composer.compile(composer.retry(3, 'connect'))` | +## Parameter Objects and Error Objects -Function expressions occurring in action compositions cannot capture any part of their declaration environment. They may however access and mutate variables in an environment consisting of the variables declared by the [composer.let](#composerletname-value-task_1-task_2-) composition method as discussed below. +A composition, like any action, accepts a JSON dictionary (the _input parameter object_) and produces a JSON dictionary (the _output parameter object_). An output parameter object with an `error` field is an _error object_. A composition _fails_ if it produces an error object. -Actions are specified by name. Fully qualified names and aliases are supported following the [usual conventions](https://github.com/apache/incubator-openwhisk/blob/master/docs/reference.md). +By convention, an error object returned by a composition is stripped from all fields except from the `error` field. This behavior is consistent with the OpenWhisk action semantics, e.g., the action with code `function main() { return { error: 'KO', message: 'OK' } }` outputs `{ error: 'KO' }`. -A function task applies the function to the [parameter object](#parameter-objects-and-error-objects). An action task the invokes the action with the given name on the parameter object. A composition task or compiled composition task applies the composition to the parameter object. +## Combinators -## Parameter Objects and Error Objects +The `composer` module offers a number of combinators to define compositions: -A task is a function that consumes a JSON dictionary (the _input parameter object_) and produces a JSON dictionary (the _output parameter object_). An output parameter object of a task with an `error` field is an _error object_. A task _fails_ if it produces an error object. +| Combinator | Description | Example | +| --:| --- | --- | +| [`action`](#composeractionname) | action | `composer.action('echo')` | +| [`function`](#composerfunctionfun) | function | `composer.function(({ x, y }) => ({ product: x * y }))` | +| [`literal` or `value`](#composerliteralvalue) | constant value | `composer.literal({ message: 'Hello, World!' })` | +| [`sequence` or `seq`](#composersequencecomposition_1-composition_2) | sequence | `composer.sequence('foo', 'bar')` | +| [`let`](#composerlet-name-value-composition_1-composition_2) | variable declarations | `composer.let({ count: 3, message: 'hello' }, ...)` | +| [`if`](#composerifcondition-consequent-alternate) | conditional | `composer.if('authenticate', 'success', 'failure')` | +| [`while`](#composerwhilecondition-composition) | loop | `composer.while('notEnough', 'doMore')` | +| [`dowhile`](#TODO) | loop at least once | `composer.dowhile('fetchData', 'needMoreData')` | +| [`repeat`](#composerrepeatcount-composition) | counted loop | `composer.repeat(3, 'hello')` | +| [`try`](#composertrycomposition-handler) | error handling | `composer.try('divideByN', 'NaN')` | +| [`finally`](#TODO) | finalization | `composer.finally('tryThis', 'doThatAlways')` | +| [`retry`](#composerretrycount-composition) | error recovery | `composer.retry(3, 'connect')` | +| [`retain`](#composerretaincomposition) | persistence | `composer.retain('validateInput')` | + +The `action`, `function`, and `literal` combinators and their synonymous construct compositions respectively from actions, functions, and constant values. The other combinators combine existing compositions to produce new compositions. + +Where a composition is expected, the following shorthands are permitted: + - `name` of type `string` stands for `composer.action(name)`, + - `fun` of type `function` stands for `composer.function(fun)`, + - `null` stands for the empty sequence `composer.sequence()`. + +### composer.action(_name_) + +`composer.action(name)` is a composition with a single action named _name_. It invokes the action named _name_ on the input parameter object for the composition and returns the output parameter object of this action invocation. + +The action _name_ may specify the namespace and/or package containing the action following the usual OpenWhisk grammar. If no namespace is specified, the default namespace is assumed. If no package is specified, the default package is assumed. + +Examples: +``` +composer.action('hello') +composer.action('myPackage/myAction') +composer.action('/whisk.system/utils/echo') +``` -Values returned by constant and function tasks are converted to JSON using `JSON.stringify` followed by `JSON.parse`. Values other than JSON dictionaries are replaced with a dictionary with a unique `value` field with the converted value. -For instance, the task `42` outputs the JSON dictionary `{ value: 42 }`. +### composer.function(_fun_) -By convention, an error object returned by a task is stripped from all fields except from the `error` field. For instance, the task `() => ({ error: 'KO', message: 'OK' })` outputs the JSON dictionary `{ error: 'KO' }`. This is to be consistent with the OpenWhisk action semantics, e.g., the action with code `function main() { return { error: 'KO', message: 'OK' } }` outputs `{ error: 'KO' }`. +`composer.function(fun)` is a composition with a single Javascript function _fun_. It applies the specified function to the input parameter object for the composition. -## Composition Methods -The following composition methods are currently supported: + - If the function returns a value of type `function`, the composition returns an error object `{ error }`. + - If the function throws an exception, the composition returns an error object `{ error }`. + - If the function returns a value of type other than function, the value is first converted to a JSON value using `JSON.stringify` followed by `JSON.parse`. If the resulting JSON value is not a JSON dictionary, the JSON value is then wrapped into a `{ value }` dictionary. The composition returns the final JSON dictionary. + - If the function does not return a value and does not throw an exception, the composition returns the input parameter object for the composition converted to a JSON dictionary using `JSON.stringify` followed by `JSON.parse`. -| Composition | Description | Example | -| --:| --- | --- | -| [`task`](#composertasktask-options) | single task | `composer.task('sayHi', { input: 'userInfo' })` | -| [`value`](#composervaluejson) | constant value | `composer.value({ message: 'Hello World!' })` | -| [`sequence`](#composersequencetask_1-task_2-) | sequence | `composer.sequence('getLocation', 'getWeatherForLocation')` | -| [`let`](#composerletname-value-task_1-task_2-) | variables | `composer.let('n', 42, ...)` | -| [`if`](#composerifcondition-consequent-alternate) | conditional | `composer.if('authenticate', /* then */ 'welcome', /* else */ 'login')` | -| [`while`](#composerwhilecondition-task) | loop | `composer.while('needMoreData', 'fetchMoreData')` | -| [`try`](#composertrytask-handler) | error handling | `composer.try('DivideByN', /* catch */ 'NaN')` | -| [`repeat`](#composerrepeatcount-task) | repetition | `composer.repeat(42, 'sayHi')` | -| [`retry`](#composerretrycount-task) | error recovery | `composer.retry(3, 'connect')` | -| [`retain`](#composerretaintask-flag) | parameter retention | `composer.retain('validateInput')` | +Examples: +``` +composer.function(params => ({ message: 'Hello ' + params.name })) +composer.function(function (params) { return { error: 'error' } }) -### composer.task(_task_[, _options_]) +function product({ x, y }) { return { product: x * y } } +composer.function(product) +``` -`composer.task(task, options)` is a composition with a single task _task_. The optional _options_ parameter may alter the task behavior as follows: +#### Environment capture - * If _options.merge_ evaluates to true the output parameter object for the composition is obtained by merging the output parameter object for the task into the input parameter object (unless the task produces an error object). +Functions intended for compositions cannot capture any part of their environment. They may however access and mutate variables in an environment consisting of the variables declared by the [composer.let](#composerletname-value-composition_1-composition_2-) combinator discussed below. - For instance, the composition `composer.task(42, { merge: true })` invoked on the input parameter object `{ value: 0, message: 'OK' }` outputs `{ value: 42, message: 'OK' }`. - - * Alternatively if _options.output_ is defined the output parameter object for the composition is obtained by assigning the output parameter object for the task to the _options.output_ field of the input parameter object for the composition. Additionally, if _options.input_ is defined the input parameter for the task is only the value of the field _options.input_ of the input parameter object for the composition as opposed to the full input parameter object. +The following is not legal: +``` +let name = 'Dave' +composer.function(params => ({ message: 'Hello ' + name })) +``` - For instance, the composition `composer.task(({n}) => ({ n: n+1 }), { input: 'in', output: 'out' })` invoked on the input parameter object `{ in: { n: 42 } }` outputs `{ in: { n: 42 }, out: { n: 43 } }`. - - If the value of the _options.input_ field is not a JSON dictionary is it replaced with a dictionary with a unique field `value` with the field's value. +The following is legal: +``` +composer.let({ name: 'Dave' }, composer.function(params => ({ message: 'Hello ' + name }))) +``` -### composer.value(_json_) +### composer.literal(_value_) -`composer.value(json)` outputs _json_ if it is a JSON dictionary. If _json_ is not a JSON dictionary is it replaced with a dictionary with a unique field `value` with value _json_. +`composer.literal(value)` outputs a constant JSON dictionary. This dictionary is obtained by first converting the _value_ argument to JSON using `JSON.stringify` followed by `JSON.parse`. If the resulting JSON value is not a JSON dictionary, the JSON value is then wrapped into a `{ value }` dictionary. -The _json_ value may be computed at composition time. For instance, the following composition captures the composition time: +The _value_ argument may be computed at composition time. For instance, the following composition captures the date of the composition: ```javascript -composer.value(Date()) +composer.literal(Date()) ``` -### composer.sequence(_task\_1_, _task\_2_, ...) +### composer.sequence(_composition\_1_, _composition\_2_...) -`composer.sequence(task_1, task_2, ...)` runs a sequence of tasks (possibly empty). +`composer.sequence(composition_1, composition_2, ...)` chains a series of compositions (possibly empty). -The input parameter object for the composition is the input parameter object of the first task in the sequence. The output parameter object of one task in the sequence is the input parameter object for the next task in the sequence. The output parameter object of the last task in the sequence is the output parameter object for the composition. +The input parameter object for the composition is the input parameter object of the first composition in the sequence. The output parameter object of one composition in the sequence is the input parameter object for the next composition in the sequence. The output parameter object of the last composition in the sequence is the output parameter object for the composition. -If one of the tasks fails, the remainder of the sequence is not executed. The output parameter object for the composition is the error object produced by the failed task. +If one of the compositions fails, the remainder of the sequence is not executed. The output parameter object for the composition is the error object produced by the failed composition. -An empty sequence behaves as a sequence with a single function task `params => params`. The output parameter object for the empty sequence is its input parameter object unless it is an error object, in which case, as usual, the error object only contains the `error` field of the input parameter object. +An empty sequence behaves as a sequence with a single function `params => params`. The output parameter object for the empty sequence is its input parameter object unless it is an error object, in which case, as usual, the error object only contains the `error` field of the input parameter object. -### composer.let(_name_, _value_, _task\_1_, _task\_2_, ...) +### composer.let({ _name_: _value_ }, _composition\_1_, _composition\_2_, ...) -`composer.let(name, value, task_1_, _task_2_, ...)` declares a new variable with name _name_ and initial value _value_ and runs a sequence of tasks in the scope of this definition. +`composer.let({ name: value }, composition_1_, _composition_2_, ...)` declares a new variable with name _name_ and initial value _value_ and runs a sequence of compositions in the scope of this definition. Variables declared with `composer.let` may be accessed and mutated by functions __running__ as part of the following sequence (irrespective of their place of definition). In other words, name resolution is [dynamic](https://en.wikipedia.org/wiki/Name_resolution_(programming_languages)#Static_versus_dynamic). If a variable declaration is nested inside a declaration of a variable with the same name, the innermost declaration masks the earlier declarations. -For example, the following composition invokes task `task` repeatedly `n` times. +For example, the following composition invokes composition `composition` repeatedly `n` times. ```javascript -composer.let('i', n, composer.while(() => i-- > 0, task)) +composer.let({ i: n }, composer.while(() => i-- > 0, composition)) ``` -Observe the first argument to the `let` composition is the quoted variable name `'i'`, whereas occurrences of the variable in the `let` body are not quoted. -We recommend expanding `let` compositions as follows to avoid confusing code editors: +Variables declared with `composer.let` are not visible to invoked actions. However, they may be passed as parameters to actions as for instance in: ```javascript -let i = 'i'; composer.let(i, n, composer.while(() => i-- > 0, task)) -``` - -Variables declared with `composer.let` are not visible to invoked actions. However, they may be passed as parameters to actions as for instance: - -```javascript -let n = 'n'; composer.let(n, 42, () => ({n}), '/whisk.system/utils/echo', (params) => { n = params.n }) +composer.let({ n: 42 }, () => ({ n }), '/whisk.system/utils/echo', params => { n = params.n }) ``` In this example, the variable `n` is exposed to the invoked action as a field of the input parameter object. Moreover, the value of the `n` field of the output parameter object is assigned back to variable `n`. ### composer.if(_condition_, _consequent_[, _alternate_]) -`composer.if(condition, consequent, alternate)` runs either the _consequent_ task if the _condition_ evaluates to true or the _alternate_ task if not. The _condition_ task and _consequent_ task or _alternate_ task are all invoked on the input parameter object for the composition. The output parameter object of the _condition_ task is discarded. +`composer.if(condition, consequent, alternate)` runs either the _consequent_ composition if the _condition_ evaluates to true or the _alternate_ composition if not. The _condition_ composition and _consequent_ composition or _alternate_ composition are all invoked on the input parameter object for the composition. The output parameter object of the _condition_ composition is discarded. -A _condition_ task evaluates to true if and only if it produces a JSON dictionary with a field `value` with the value `true` (i.e., JSON's `true` value). Other fields are ignored. Because JSON values other than dictionaries are implicitly lifted to dictionaries with a `value` field, _condition_ may be a Javascript function returning a Boolean value. +A _condition_ composition evaluates to true if and only if it produces a JSON dictionary with a field `value` with value `true`. Other fields are ignored. Because JSON values other than dictionaries are implicitly lifted to dictionaries with a `value` field, _condition_ may be a Javascript function returning a Boolean value. -An expression such as `params.n > 0` is not a valid condition (or in general a valid task). One should write instead: `(params) => params.n > 0`. +An expression such as `params.n > 0` is not a valid condition (or in general a valid composition). One should write instead `params => params.n > 0`. -The _alternate_ task may be omitted. +The _alternate_ composition may be omitted. If _condition_ fails, neither branch is executed. -For examples, the following composition divides parameter `n` by two if it is even. +For example, the following composition divides parameter `n` by two if `n` is even. ```javascript -composer.if(({n}) => n % 2 == 0, ({n}) => ({ n: n / 2 })) +composer.if(params => params.n % 2 == 0, params => { params.n /= 2 }) ``` -### composer.while(_condition_, _task_) - -`composer.while(condition, task)` runs _task_ repeatedly while _condition_ evaluates to true. The _condition_ task is evaluated before any execution of _task_. See [composer.if](#composerifcondition-consequent-alternate) for a discussion of conditions. +### composer.while(_condition_, _composition_) -A failure of _condition_ or _task_ interrupts the execution. +`composer.while(condition, composition)` runs _composition_ repeatedly while _condition_ evaluates to true. The _condition_ composition is evaluated before any execution of _composition_. See [composer.if](#composerifcondition-consequent-alternate) for a discussion of conditions. -### composer.try(_task_, _handler_) +A failure of _condition_ or _composition_ interrupts the execution. The composition returns the error object from the failed component. -`composer.try(task, handler)` runs _task_ with error handler _handler_. +### composer.repeat(_count_, _composition_) -A _task_ failure triggers the execution of _handler_ with the error object as its input parameter object. +`composer.repeat(count, composition)` runs _composition_ _count_ times. It is equivalent to a sequence with _count_ _composition_(s). -### composer.repeat(_count_, _task_) +### composer.try(_composition_, _handler_) -`composer.repeat(count, task)` runs _task_ _count_ times. It is equivalent to a sequence with _count_ _task_(s). +`composer.try(composition, handler)` runs _composition_ with error handler _handler_. -### composer.retry(_count_, _task_) +A _composition_ failure triggers the execution of _handler_ with the error object as its input parameter object. -`composer.retry(count, task)` runs _task_ and retries _task_ up to _count_ times if it fails. The output parameter object for the composition is either the output parameter object of the successful _task_ invocation or the error object produced by the last _task_ invocation. +### composer.retry(_count_, _composition_) -### composer.retain(_task_[, _flag_]) +`composer.retry(count, composition)` runs _composition_ and retries _composition_ up to _count_ times if it fails. The output parameter object for the composition is either the output parameter object of the successful _composition_ invocation or the error object produced by the last _composition_ invocation. -`composer.retain(task[, flag])` runs _task_ on the input parameter object producing an object with two fields `params` and `result` such that `params` is the input parameter object of the composition and `result` is the output parameter object of _task_. +### composer.retain(_composition_) -If _task_ fails and _flag_ is true, then the output parameter object for the composition is the combination of the input parameters with the error object. If _task_ fails and _flag_ is false or absent, then the output parameter object for the composition is only the error object. +`composer.retain(composition)` runs _composition_ on the input parameter object producing an object with two fields `params` and `result` such that `params` is the input parameter object of the composition and `result` is the output parameter object of _composition_. diff --git a/manager.js b/manager.js deleted file mode 100644 index d4627b1..0000000 --- a/manager.js +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2017 IBM Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict' - -const redis = require('./redis-promise') - -let apikey -let args -let expiration -const addLivePrefix = session => `${apikey}:session:live:${session}` -const addDonePrefix = session => `${apikey}:session:done:${session}` -const addListPrefix = session => `${apikey}:list:${session}` -const sessionsKey = () => `${apikey}:all` - -const isObject = obj => typeof obj === 'object' && obj !== null && !Array.isArray(obj) - -class Client { - constructor(config) { - apikey = config.key || config - expiration = config.expiration || 86400 * 7 - args = Array.prototype.slice.call(arguments, 1) - this.client = redis.createAsyncClient(...args) - } - - register(session) { - return this.client.lpushAsync(addLivePrefix(session), JSON.stringify({})) - .then(() => this.client.ltrimAsync(addLivePrefix(session), -1, -1)) - .then(() => this.client.expireAsync(addLivePrefix(session), expiration)) - .then(() => this.client.existsAsync(addDonePrefix(session))) - .then(n => n && this.client.delAsync(addLivePrefix(session))) - } - - // timeout in seconds, set block to true to block even if session does not exists - get(session, timeout, block) { - return this.client.lindexAsync(addDonePrefix(session), 0).then(result => { - if (typeof result === 'string' || typeof timeout === 'undefined') return result // got a result or not willing to wait - return this.client.existsAsync(addLivePrefix(session), addDonePrefix(session)).then(n => { - if (!block && n === 0) throw `Cannot find session ${session}` // not willing to wait for session to appear - }).then(() => { - let other = redis.createAsyncClient(...args) // use separate client for blocking read - return other.brpoplpushAsync(addDonePrefix(session), addDonePrefix(session), timeout).then(result => other.quitAsync().then(() => result)) - }) - }).then(result => { // parse result - if (typeof result !== 'string') throw `Cannot find result of session ${session}` - const obj = JSON.parse(result) - if (!isObject(obj)) throw `Result of session ${session} is not a JSON object` - return obj - }) - } - - kill(session) { - return this.client.delAsync(addLivePrefix(session)).then(count => { - if (count === 1) return this.client.delAsync(addListPrefix(session)).then(() => this.client.zremAsync(sessionsKey(), session)).then(() => `OK`) - throw `Cannot find live session ${session}` - }) - } - - purge(session) { - return this.client.delAsync(addLivePrefix(session), addDonePrefix(session), addListPrefix(session)).then(count => { - if (count !== 0) return this.client.zremAsync(sessionsKey(), session).then(() => `OK`) - throw `Cannot find session ${session}` - }) - } - - trace(session) { - return this.client.lrangeAsync(addListPrefix(session), 0, -1).then(trace => { - if (trace.length > 0) return { trace } - throw `Cannot find trace for session ${session}` - }) - } - - flush() { - return this.client.keysAsync(`${apikey}:*`).then(keys => keys.length > 0 ? this.client.delAsync(keys) : 0) - } - - last({ limit = 30, skip = 0 } = {}) { - limit = Math.max(1, Math.min(200, limit)) // default limit is 30, max limit is 200 - return this.client.zrevrangeAsync(sessionsKey(), 0, 0, 'WITHSCORES').then(result => - result.length ? this.client.zremrangebyscoreAsync(sessionsKey(), '-inf', parseInt(result[1]) - expiration * 2000) - .then(() => skip === 0 && limit === 1 ? result : this.client.zrevrangebyscoreAsync(sessionsKey(), 'inf', '-inf', 'WITHSCORES', 'LIMIT', skip, limit)) - .then(result => result.reduce(function (dst, session, index, src) { - if (index % 2 === 0) { - const time = parseInt(src[index + 1]) - dst.push({ session, time: (time - time % 2) / 2, live: !(time & 1) }) - } - return dst - }, [])) : []) - } - - list(options) { - return this.last(options).then(list => { - const live = [] - const done = [] - list.forEach(entry => (entry.live ? live : done).push(entry.session)) - return { live: live, done, next: 0 } - }) - } - - quit() { - return this.client.quitAsync() - } -} - -module.exports = function () { return new Client(...arguments) } diff --git a/package.json b/package.json index a3a8b3e..2418aaa 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,9 @@ "scripts": { "test": "mocha" }, + "bin": { + "compose": "./bin/compose" + }, "repository": { "type": "git", "url": "https://github.com/ibm-functions/composer.git" @@ -20,12 +23,12 @@ "openwhisk" ], "dependencies": { - "redis": "^2.8.0", - "clone": "^2.1.1" + "minimist": "^1.2.0", + "openwhisk": "^3.11.0", + "uglify-es": "^3.3.9" }, "devDependencies": { - "mocha": "^3.5.0", - "openwhisk": "git://github.com/starpit/openwhisk-client-js.git#add_client_timeout" + "mocha": "^3.5.0" }, "author": { "name": "Olivier Tardieu", diff --git a/redis-promise.js b/redis-promise.js deleted file mode 100644 index 4919bd3..0000000 --- a/redis-promise.js +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2017 IBM Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict' - -const redis = require('redis') - -redis.createAsyncClient = function () { - const client = redis.createClient(...arguments) - const noop = () => { } - let handler = noop - client.on('error', error => handler(error)) - require('redis-commands').list.forEach(f => client[`${f}Async`] = function () { - let failed = false - return new Promise((resolve, reject) => { - handler = error => { - handler = noop - failed = true - reject(error) - } - client[f](...arguments, (error, result) => { - handler = noop - return error ? reject(error) : resolve(result) - }) - }).catch(error => { - if (failed) client.end(true) - return Promise.reject(error) - }) - }) - return client -} - -module.exports = redis diff --git a/samples/demo.js b/samples/demo.js index 6edf5b7..82ef154 100644 --- a/samples/demo.js +++ b/samples/demo.js @@ -14,12 +14,4 @@ * limitations under the License. */ -'use strict' - -const composer = require('@ibm-functions/composer') - -// author action composition -const app = composer.if('authenticate', /* then */ 'welcome', /* else */ 'login') - -// compile action composition -composer.compile(app, 'demo.json') +composer.if('authenticate', /* then */ 'success', /* else */ 'failure') diff --git a/samples/demo.json b/samples/demo.json index 897be7e..6ea877e 100644 --- a/samples/demo.json +++ b/samples/demo.json @@ -1,33 +1,25 @@ { - "Entry": "push_0", - "States": { - "push_0": { - "Type": "Push", - "Next": "action_1" - }, - "action_1": { - "Type": "Task", - "Action": "authenticate", - "Next": "choice_0" - }, - "choice_0": { - "Type": "Choice", - "Then": "action_2", - "Else": "action_3" - }, - "action_2": { - "Type": "Task", - "Action": "welcome", - "Next": "pass_0" - }, - "action_3": { - "Type": "Task", - "Action": "login", - "Next": "pass_0" - }, - "pass_0": { - "Type": "Pass" + "composition": [ + { + "type": "if", + "test": [ + { + "type": "action", + "name": "/_/authenticate" + } + ], + "consequent": [ + { + "type": "action", + "name": "/_/success" + } + ], + "alternate": [ + { + "type": "action", + "name": "/_/failure" + } + ] } - }, - "Exit": "pass_0" -} \ No newline at end of file + ] +} diff --git a/test-harness.js b/test-harness.js deleted file mode 100644 index fb99e81..0000000 --- a/test-harness.js +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2017 IBM Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict' - -const fs = require('fs') -const path = require('path') -const os = require('os') -const openwhisk = require('openwhisk') -const manager = require('./manager') -let conductor - -class Client { - constructor(options = {}) { - let apihost = process.env.__OW_API_HOST - let api_key = process.env.__OW_API_KEY - let ignore_certs = true - let redis = process.env.REDIS - - try { - const wskpropsPath = process.env.WSK_CONFIG_FILE || path.join(os.homedir(), '.wskprops') - const lines = fs.readFileSync(wskpropsPath).toString('utf8').split('\n') - - for (let line of lines) { - let parts = line.trim().split('=') - if (parts.length === 2) { - if (parts[0] === 'APIHOST') { - apihost = parts[1] - } else if (parts[0] === 'AUTH') { - api_key = parts[1] - } else if (parts[0] === 'REDIS') { - redis = parts[1] - } - } - } - ignore_certs = apihost.indexOf('bluemix') == -1 - } catch (error) { } - - this.wsk = openwhisk(Object.assign({ apihost, api_key, ignore_certs }, options)) - - const action_body = this.wsk.actions.action_body - this.wsk.actions.action_body = (options) => { - const body = action_body(options) - if (options.limits) { - body.limits = options.limits - } - return body - } - - this.mgr = manager(api_key.substring(0, api_key.indexOf(':')), options.redis || redis) - - const name = 'conductor' - const action = require('fs').readFileSync(require.resolve('./conductor'), { encoding: 'utf8' }) - const params = { $config: { redis: redis, notify: true } } - conductor = { name, action, params, limits: { timeout: 300000 } } - } - - deploy() { - return this.wsk.actions.update(conductor) - } -} - -module.exports = function () { return new Client(...arguments) } diff --git a/test/test.js b/test/test.js index 76e867d..f779406 100644 --- a/test/test.js +++ b/test/test.js @@ -1,147 +1,260 @@ const assert = require('assert') const composer = require('../composer') -const harness = require('../test-harness')() -const wsk = harness.wsk -const mgr = harness.mgr -const run = params => wsk.actions.invoke({ name: 'conductor', params, blocking: true }) -const invoke = (task, params, $blocking = true) => run(Object.assign({ $invoke: composer.compile(task), $blocking }, params)) +const name = 'TestAction' +const wsk = composer.openwhisk({ ignore_certs: process.env.IGNORE_CERTS && process.env.IGNORE_CERTS !== 'false' && process.env.IGNORE_CERTS !== '0' }) -let activationId +// deploy action +const define = action => wsk.actions.delete(action.name).catch(() => { }).then(() => wsk.actions.create(action)) + +// deploy and invoke composition +const invoke = (task, params = {}, blocking = true) => wsk.compositions.deploy(task, name).then(() => wsk.actions.invoke({ name, params, blocking })) describe('composer', function () { this.timeout(60000) - before('deploy conductor and sample actions', function () { - return harness.deploy() - .then(() => wsk.actions.update({ name: 'DivideByTwo', action: 'function main({n}) { return { n: n / 2 } }' })) - .then(() => wsk.actions.update({ name: 'TripleAndIncrement', action: 'function main({n}) { return { n: n * 3 + 1 } }' })) - .then(() => wsk.actions.update({ name: 'isNotOne', action: 'function main({n}) { return { value: n != 1 } }' })) - .then(() => wsk.actions.update({ name: 'isEven', action: 'function main({n}) { return { value: n % 2 == 0 } }' })) + before('deploy test actions', function () { + return define({ name: 'echo', action: 'const main = x=>x' }) + .then(() => define({ name: 'DivideByTwo', action: 'function main({n}) { return { n: n / 2 } }' })) + .then(() => define({ name: 'TripleAndIncrement', action: 'function main({n}) { return { n: n * 3 + 1 } }' })) + .then(() => define({ name: 'isNotOne', action: 'function main({n}) { return { value: n != 1 } }' })) + .then(() => define({ name: 'isEven', action: 'function main({n}) { return { value: n % 2 == 0 } }' })) }) - it('flush', function () { - return mgr.flush() - }) + describe('blocking invocations', function () { + describe('actions', function () { + it('action must return true', function () { + return invoke(composer.action('isNotOne'), { n: 0 }).then(activation => assert.deepEqual(activation.response.result, { value: true })) + }) - it('history must be clean', function () { - return mgr.list().then(result => assert.ok(Array.isArray(result.live) && Array.isArray(result.done) && typeof result.next === 'number' - && result.live.length === 0 && result.done.length === 0 && result.next === 0)) - }) + it('action must return false', function () { + return invoke(composer.action('isNotOne'), { n: 1 }).then(activation => assert.deepEqual(activation.response.result, { value: false })) + }) - describe('first composition', function () { - it('identity task must return input object', function () { - return invoke(composer.task(), { foo: 'bar' }).then(activation => { - activationId = activation.activationId - return assert.deepEqual(activation.response.result, { foo: 'bar' }) + it('action name must parse to fully qualified', function () { + let combos = [ + { n: '', s: false, e: 'Name is not specified' }, + { n: ' ', s: false, e: 'Name is not specified' }, + { n: '/', s: false, e: 'Name is not valid' }, + { n: '//', s: false, e: 'Name is not valid' }, + { n: '/a', s: false, e: 'Name is not valid' }, + { n: '/a/b/c/d', s: false, e: 'Name is not valid' }, + { n: '/a/b/c/d/', s: false, e: 'Name is not valid' }, + { n: 'a/b/c/d', s: false, e: 'Name is not valid' }, + { n: '/a/ /b', s: false, e: 'Name is not valid' }, + { n: 'a', e: false, s: '/_/a' }, + { n: 'a/b', e: false, s: '/_/a/b' }, + { n: 'a/b/c', e: false, s: '/a/b/c' }, + { n: '/a/b', e: false, s: '/a/b' }, + { n: '/a/b/c', e: false, s: '/a/b/c' } + ] + combos.forEach(({ n, s, e }) => { + if (s) { + // good cases + assert.ok(composer.action(n).composition[0].name, s) + } else { + // error cases + try { + composer.action(n) + assert.fail() + } catch (error) { + assert.ok(error.message == e) + } + } + }) }) - }) - it('check history', function () { - return mgr.list().then(result => assert.ok(result.live.length === 0 && result.done.length === 1 && result.done[0] === activationId && result.next === 0)) - }) + it('invalid options', function () { + try { + invoke(composer.function('foo', 'bar')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) - it('check trace', function () { - return mgr.trace(activationId).then(result => assert.ok(result.trace.length === 1 && result.trace[0] === activationId)) - }) - }) + it('invalid argument', function () { + try { + invoke(composer.function(42)) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid argument')) + } + }) - describe('invalid conductor invocations', function () { - it('missing both $sessionId and $invoke must fail with 400', function () { - return run({}).then(() => assert.fail(), activation => assert.equal(activation.error.response.result.error.code, 400)) + it('too many arguments', function () { + try { + invoke(composer.function('foo', {}, 'bar')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) }) - }) - describe('nonexistent session', function () { - it('resume nonexistent session must fail with 404 (and not record session result)', function () { - return run({ $sessionId: 'foo', $invoke: composer.task(), params: {} }).then(() => assert.fail(), activation => assert.equal(activation.error.response.result.error.code, 404)) - }) + describe('literals', function () { + it('true', function () { + return invoke(composer.literal(true)).then(activation => assert.deepEqual(activation.response.result, { value: true })) + }) - it('get nonexistent session must throw', function () { - return mgr.get('foo').then(() => assert.fail(), result => assert.equal(result, 'Cannot find result of session foo')) - }) + it('42', function () { + return invoke(composer.literal(42)).then(activation => assert.deepEqual(activation.response.result, { value: 42 })) + }) + + it('invalid options', function () { + try { + invoke(composer.literal('foo', 'bar')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) - it('kill nonexistent session must throw', function () { - return mgr.kill('foo').then(() => assert.fail(), result => assert.equal(result, 'Cannot find live session foo')) + it('invalid argument', function () { + try { + invoke(composer.literal(invoke)) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid argument')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.literal('foo', {}, 'bar')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) }) - it('purge nonexistent session must throw', function () { - return mgr.purge('foo').then(() => assert.fail(), result => assert.equal(result, 'Cannot find session foo')) + describe('functions', function () { + it('function must return true', function () { + return invoke(composer.function(({ n }) => n % 2 === 0), { n: 4 }).then(activation => assert.deepEqual(activation.response.result, { value: true })) + }) + + it('function must return false', function () { + return invoke(composer.function(function ({ n }) { return n % 2 === 0 }), { n: 3 }).then(activation => assert.deepEqual(activation.response.result, { value: false })) + }) + + it('function must fail', function () { + return invoke(composer.function(() => n)).then(() => assert.fail(), activation => assert.ok(activation.error.response.result.error.startsWith('An exception was caught'))) + }) + + it('function must throw', function () { + return invoke(composer.function(() => ({ error: 'foo', n: 42 }))).then(() => assert.fail(), activation => assert.deepEqual(activation.error.response.result, { error: 'foo' })) + }) + + it('function must mutate params', function () { + return invoke(composer.function(params => { params.foo = 'foo' }), { bar: 42 }).then(activation => assert.deepEqual(activation.response.result, { foo: 'foo', bar: 42 })) + }) + + it('function as string', function () { + return invoke(composer.function('({ n }) => n % 2 === 0'), { n: 4 }).then(activation => assert.deepEqual(activation.response.result, { value: true })) + }) + + it('invalid options', function () { + try { + invoke(composer.function(() => n, 'foo')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) + + it('invalid argument', function () { + try { + invoke(composer.function(42)) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid argument')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.function(() => n, {}, () => { })) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) }) - it('trace nonexistent session must throw', function () { - return mgr.trace('foo').then(() => assert.fail(), result => assert.equal(result, 'Cannot find trace for session foo')) + describe('deserialize', function () { + it('should deserialize a serialized composition', function () { + const json = { + "composition": [{ + "type": "action", + "name": "echo" + }, { + "type": "action", + "name": "echo" + }] + } + return invoke(composer.deserialize(json), { message: 'hi' }).then(activation => assert.deepEqual(activation.response.result, { message: 'hi' })) + }) }) - }) - describe('blocking invocations', function () { describe('tasks', function () { - describe('actions', function () { + describe('action tasks', function () { it('action must return true', function () { return invoke(composer.task('isNotOne'), { n: 0 }).then(activation => assert.deepEqual(activation.response.result, { value: true })) }) - - it('action must return false', function () { - return invoke(composer.task('isNotOne'), { n: 1 }).then(activation => assert.deepEqual(activation.response.result, { value: false })) - }) }) - describe('functions', function () { + describe('function tasks', function () { it('function must return true', function () { return invoke(composer.task(({ n }) => n % 2 === 0), { n: 4 }).then(activation => assert.deepEqual(activation.response.result, { value: true })) }) - - it('function must return false', function () { - return invoke(composer.task(function ({ n }) { return n % 2 === 0 }), { n: 3 }).then(activation => assert.deepEqual(activation.response.result, { value: false })) - }) - - it('function must fail', function () { - return invoke(composer.task(() => n)).then(() => assert.fail(), activation => assert.equal(activation.error.response.result.error, 'An error has occurred: ReferenceError: n is not defined')) - }) }) - describe('values', function () { - it('true', function () { - return invoke(composer.value(true)).then(activation => assert.deepEqual(activation.response.result, { value: true })) + describe('null task', function () { + it('null task must return input', function () { + return invoke(composer.task(), { foo: 'bar' }).then(activation => assert.deepEqual(activation.response.result, { foo: 'bar' })) }) - it('42', function () { - return invoke(composer.value(42)).then(activation => assert.deepEqual(activation.response.result, { value: 42 })) + it('null task must fail on error input', function () { + return invoke(composer.task(), { error: 'bar' }).then(() => assert.fail(), activation => assert.deepEqual(activation.error.response.result, { error: 'bar' })) }) }) - describe('invalid', function () { - it('false must throw', function () { + describe('invalid tasks', function () { + it('a Boolean is not a valid task', function () { try { invoke(composer.task(false)) assert.fail() } catch (error) { - assert.equal(error, 'Error: Invalid composition argument') + assert.ok(error.message.startsWith('Invalid argument')) } }) - it('42 must throw', function () { + it('a number is not a valid task', function () { try { invoke(composer.task(42)) assert.fail() } catch (error) { - assert.equal(error, 'Error: Invalid composition argument') + assert.ok(error.message.startsWith('Invalid argument')) } }) - it('{ foo: \'bar\' } must throw', function () { + it('a dictionary is not a valid task', function () { try { invoke(composer.task({ foo: 'bar' })) assert.fail() } catch (error) { - assert.equal(error, 'Error: Invalid composition argument') + assert.ok(error.message.startsWith('Invalid argument')) } }) }) - describe('pass', function () { - it('pass must return input object', function () { - return invoke(composer.task(), { foo: 'bar' }).then(activation => assert.deepEqual(activation.response.result, { foo: 'bar' })) - }) + it('too many arguments', function () { + try { + invoke(composer.task('foo', 'bar')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } }) }) @@ -169,87 +282,336 @@ describe('composer', function () { }) describe('if', function () { - it('then branch', function () { + it('condition = true', function () { return invoke(composer.if('isEven', 'DivideByTwo', 'TripleAndIncrement'), { n: 4 }) .then(activation => assert.deepEqual(activation.response.result, { n: 2 })) }) - it('else branch', function () { + it('condition = false', function () { return invoke(composer.if('isEven', 'DivideByTwo', 'TripleAndIncrement'), { n: 3 }) .then(activation => assert.deepEqual(activation.response.result, { n: 10 })) }) - }) + it('condition = true, then branch only', function () { + return invoke(composer.if('isEven', 'DivideByTwo'), { n: 4 }) + .then(activation => assert.deepEqual(activation.response.result, { n: 2 })) + }) + + it('condition = false, then branch only', function () { + return invoke(composer.if('isEven', 'DivideByTwo'), { n: 3 }) + .then(activation => assert.deepEqual(activation.response.result, { n: 3 })) + }) + + it('condition = true, nosave option', function () { + return invoke(composer.if('isEven', params => { params.then = true }, params => { params.else = true }, { nosave: true }), { n: 2 }) + .then(activation => assert.deepEqual(activation.response.result, { value: true, then: true })) + }) + + it('condition = false, nosave option', function () { + return invoke(composer.if('isEven', params => { params.then = true }, params => { params.else = true }, { nosave: true }), { n: 3 }) + .then(activation => assert.deepEqual(activation.response.result, { value: false, else: true })) + }) + + it('invalid options', function () { + try { + invoke(composer.if('isEven', 'DivideByTwo', 'TripleAndIncrement', 'TripleAndIncrement')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.if('isEven', 'DivideByTwo', 'TripleAndIncrement', {}, 'TripleAndIncrement')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) + }) describe('while', function () { - it('test 1', function () { + it('a few iterations', function () { return invoke(composer.while('isNotOne', ({ n }) => ({ n: n - 1 })), { n: 4 }) .then(activation => assert.deepEqual(activation.response.result, { n: 1 })) }) - it('test 2', function () { + it('no iteration', function () { return invoke(composer.while(() => false, ({ n }) => ({ n: n - 1 })), { n: 1 }) .then(activation => assert.deepEqual(activation.response.result, { n: 1 })) }) - }) - describe('retain', function () { - it('test 1', function () { - return invoke(composer.retain('TripleAndIncrement'), { n: 3 }) - .then(activation => assert.deepEqual(activation.response.result, { params: { n: 3 }, result: { n: 10 } })) + it('nosave option', function () { + return invoke(composer.while(({ n }) => ({ n, value: n !== 1 }), ({ n }) => ({ n: n - 1 }), { nosave: true }), { n: 4 }) + .then(activation => assert.deepEqual(activation.response.result, { value: false, n: 1 })) + }) + + it('invalid options', function () { + try { + invoke(composer.while('isNotOne', ({ n }) => ({ n: n - 1 }), ({ n }) => ({ n: n - 1 })), { n: 4 }) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.while('isNotOne', ({ n }) => ({ n: n - 1 }), {}, ({ n }) => ({ n: n - 1 })), { n: 4 }) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } }) }) - describe('repeat', function () { - it('test 1', function () { - return invoke(composer.repeat(3, 'DivideByTwo'), { n: 8 }) + describe('dowhile', function () { + it('a few iterations', function () { + return invoke(composer.dowhile(({ n }) => ({ n: n - 1 }), 'isNotOne'), { n: 4 }) .then(activation => assert.deepEqual(activation.response.result, { n: 1 })) }) + + it('one iteration', function () { + return invoke(composer.dowhile(({ n }) => ({ n: n - 1 }), () => false), { n: 1 }) + .then(activation => assert.deepEqual(activation.response.result, { n: 0 })) + }) + + it('nosave option', function () { + return invoke(composer.dowhile(({ n }) => ({ n: n - 1 }), ({ n }) => ({ n, value: n !== 1 }), { nosave: true }), { n: 4 }) + .then(activation => assert.deepEqual(activation.response.result, { value: false, n: 1 })) + }) + + it('invalid options', function () { + try { + invoke(composer.dowhile(({ n }) => ({ n: n - 1 }), 'isNotOne', ({ n }) => ({ n: n - 1 })), { n: 4 }) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.dowhile(({ n }) => ({ n: n - 1 }), 'isNotOne', {}, ({ n }) => ({ n: n - 1 })), { n: 4 }) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) }) describe('try', function () { - it('test 1', function () { + it('no error', function () { return invoke(composer.try(() => true, error => ({ message: error.error }))) .then(activation => assert.deepEqual(activation.response.result, { value: true })) }) - it('test 2', function () { + it('error', function () { return invoke(composer.try(() => ({ error: 'foo' }), error => ({ message: error.error }))) .then(activation => assert.deepEqual(activation.response.result, { message: 'foo' })) }) + + it('try must throw', function () { + return invoke(composer.try(composer.try(), error => ({ message: error.error })), { error: 'foo' }) + .then(activation => assert.deepEqual(activation.response.result, { message: 'foo' })) + }) + + it('while must throw', function () { + return invoke(composer.try(composer.while(composer.literal(false)), error => ({ message: error.error })), { error: 'foo' }) + .then(activation => assert.deepEqual(activation.response.result, { message: 'foo' })) + }) + + it('if must throw', function () { + return invoke(composer.try(composer.if(composer.literal(false)), error => ({ message: error.error })), { error: 'foo' }) + .then(activation => assert.deepEqual(activation.response.result, { message: 'foo' })) + }) + + it('invalid options', function () { + try { + invoke(composer.try('isNotOne', 'isNotOne', 'isNotOne')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.try('isNotOne', 'isNotOne', {}, 'isNotOne')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) + }) + + describe('finally', function () { + it('no error', function () { + return invoke(composer.finally(() => true, params => ({ params }))) + .then(activation => assert.deepEqual(activation.response.result, { params: { value: true } })) + }) + + it('error', function () { + return invoke(composer.finally(() => ({ error: 'foo' }), params => ({ params }))) + .then(activation => assert.deepEqual(activation.response.result, { params: { error: 'foo' } })) + }) + + it('invalid options', function () { + try { + invoke(composer.finally('isNotOne', 'isNotOne', 'isNotOne')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.finally('isNotOne', 'isNotOne', {}, 'isNotOne')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) }) describe('let', function () { it('one variable', function () { - return invoke(composer.let('x', 42, () => x)) + return invoke(composer.let({ x: 42 }, () => x)) .then(activation => assert.deepEqual(activation.response.result, { value: 42 })) }) it('masking', function () { - return invoke(composer.let('x', 42, composer.let('x', 69, () => x))) + return invoke(composer.let({ x: 42 }, composer.let({ x: 69 }, () => x))) .then(activation => assert.deepEqual(activation.response.result, { value: 69 })) }) it('two variables', function () { - return invoke(composer.let('x', 42, composer.let('y', 69, () => x + y))) + return invoke(composer.let({ x: 42 }, composer.let({ y: 69 }, () => x + y))) + .then(activation => assert.deepEqual(activation.response.result, { value: 111 })) + }) + + it('two variables combined', function () { + return invoke(composer.let({ x: 42, y: 69 }, () => x + y)) .then(activation => assert.deepEqual(activation.response.result, { value: 111 })) }) it('scoping', function () { - return invoke(composer.let('x', 42, composer.let('x', 69, () => x), ({ value }) => value + x)) + return invoke(composer.let({ x: 42 }, composer.let({ x: 69 }, () => x), ({ value }) => value + x)) .then(activation => assert.deepEqual(activation.response.result, { value: 111 })) }) + + it('invalid argument', function () { + try { + invoke(composer.let(invoke)) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid argument')) + } + }) + }) + + describe('retain', function () { + it('base case', function () { + return invoke(composer.retain('TripleAndIncrement'), { n: 3 }) + .then(activation => assert.deepEqual(activation.response.result, { params: { n: 3 }, result: { n: 10 } })) + }) + + it('throw error', function () { + return invoke(composer.retain(() => ({ error: 'foo' })), { n: 3 }) + .then(() => assert.fail(), activation => assert.deepEqual(activation.error.response.result, { error: 'foo' })) + }) + + it('catch error', function () { + return invoke(composer.retain(() => ({ error: 'foo' }), { catch: true }), { n: 3 }) + .then(activation => assert.deepEqual(activation.response.result, { params: { n: 3 }, result: { error: 'foo' } })) + }) + + it('select field', function () { + return invoke(composer.retain('TripleAndIncrement', { field: 'p' }), { n: 3, p: 4 }) + .then(activation => assert.deepEqual(activation.response.result, { params: 4, result: { n: 10 } })) + }) + + it('select field, throw error', function () { + return invoke(composer.retain(() => ({ error: 'foo' }), { field: 'p' }), { n: 3, p: 4 }) + .then(() => assert.fail(), activation => assert.deepEqual(activation.error.response.result, { error: 'foo' })) + }) + + it('select field, catch error', function () { + return invoke(composer.retain(() => ({ error: 'foo' }), { field: 'p', catch: true }), { n: 3, p: 4 }) + .then(activation => assert.deepEqual(activation.response.result, { params: 4, result: { error: 'foo' } })) + }) + + it('filter function', function () { + return invoke(composer.retain('TripleAndIncrement', { filter: ({ n }) => ({ n: -n }) }), { n: 3 }) + .then(activation => assert.deepEqual(activation.response.result, { params: { n: -3 }, result: { n: 10 } })) + }) + + it('filter function, throw error', function () { + return invoke(composer.retain(() => ({ error: 'foo' }), { filter: ({ n }) => ({ n: -n }) }), { n: 3 }) + .then(() => assert.fail(), activation => assert.deepEqual(activation.error.response.result, { error: 'foo' })) + }) + + it('filter function, catch error', function () { + return invoke(composer.retain(() => ({ error: 'foo' }), { filter: ({ n }) => ({ n: -n }), catch: true }), { n: 3 }) + .then(activation => assert.deepEqual(activation.response.result, { params: { n: - 3 }, result: { error: 'foo' } })) + }) + + it('invalid options', function () { + try { + invoke(composer.retain('isNotOne', 'isNotOne')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid options')) + } + }) + + it('too many arguments', function () { + try { + invoke(composer.retain('isNotOne', {}, 'isNotOne')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Too many arguments')) + } + }) + }) + + describe('repeat', function () { + it('a few iterations', function () { + return invoke(composer.repeat(3, 'DivideByTwo'), { n: 8 }) + .then(activation => assert.deepEqual(activation.response.result, { n: 1 })) + }) + + it('invalid argument', function () { + try { + invoke(composer.repeat('foo')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid argument')) + } + }) }) describe('retry', function () { - it('test 1', function () { - return invoke(composer.let('x', 2, composer.retry(2, () => x-- > 0 ? { error: 'foo' } : 42))) + it('success', function () { + return invoke(composer.let({ x: 2 }, composer.retry(2, () => x-- > 0 ? { error: 'foo' } : 42))) .then(activation => assert.deepEqual(activation.response.result, { value: 42 })) }) - it('test 2', function () { - return invoke(composer.let('x', 2, composer.retry(1, () => x-- > 0 ? { error: 'foo' } : 42))) + it('failure', function () { + return invoke(composer.let({ x: 2 }, composer.retry(1, () => x-- > 0 ? { error: 'foo' } : 42))) .then(() => assert.fail(), activation => assert.deepEqual(activation.error.response.result.error, 'foo')) + + }) + + it('invalid argument', function () { + try { + invoke(composer.retry('foo')) + assert.fail() + } catch (error) { + assert.ok(error.message.startsWith('Invalid argument')) + } }) }) }) @@ -263,55 +625,4 @@ describe('composer', function () { }) }) }) - - describe('non-blocking invocations', function () { - it('simple app must return session id', function () { - return invoke(composer.task(() => 42), {}, false).then(activation => assert.ok(activation.response.result.$session)) - }) - - it('complex app must return session id', function () { - return invoke(composer.task('DivideByTwo'), {}, false).then(activation => assert.ok(activation.response.result.$session)) - }) - - it('get after execution must succeed', function () { - return invoke(composer.task('DivideByTwo'), { n: 42 }, false) - .then(activation => new Promise(resolve => setTimeout(() => resolve(activation), 3000))) - .then(activation => mgr.get(activation.response.result.$session)) - .then(result => assert.deepEqual(result, { n: 21 })) - }) - - it('get during execution must fail', function () { - let session - return invoke(composer.while('isNotOne', ({ n }) => ({ n: n - 1 })), { n: 10 }, false) - .then(activation => mgr.get(session = activation.response.result.$session)) - .then(() => assert.fail(), result => assert.equal(result, `Cannot find result of session ${session}`)) - }) - - it('kill after execution must fail', function () { - let session - return invoke(composer.task('DivideByTwo'), { n: 42 }, false) - .then(activation => new Promise(resolve => setTimeout(() => resolve(activation), 3000))) - .then(activation => mgr.kill(session = activation.response.result.$session)) - .then(() => assert.fail(), result => assert.equal(result, `Cannot find live session ${session}`)) - }) - - it('kill during execution must succeed', function () { - return invoke(composer.while('isNotOne', ({ n }) => ({ n: n - 1 })), { n: 10 }, false) - .then(activation => mgr.kill(activation.response.result.$session)) - .then(result => assert.deepEqual(result, 'OK')) - }) - - it('purge after execution must succeed', function () { - return invoke(composer.task('DivideByTwo'), { n: 42 }, false) - .then(activation => new Promise(resolve => setTimeout(() => resolve(activation), 3000))) - .then(activation => mgr.purge(activation.response.result.$session)) - .then(result => assert.deepEqual(result, 'OK')) - }) - - it('purge during execution must succeed', function () { - return invoke(composer.while('isNotOne', ({ n }) => ({ n: n - 1 })), { n: 10 }, false) - .then(activation => mgr.purge(activation.response.result.$session)) - .then(result => assert.deepEqual(result, 'OK')) - }) - }) })