From 441d92e88bdf1203f1f7d1e7a4ab186184f9dedc Mon Sep 17 00:00:00 2001 From: Olivier Tardieu Date: Tue, 31 Oct 2017 20:50:59 -0400 Subject: [PATCH] Support new openwhisk notification for action completion --- conductor.js | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/conductor.js b/conductor.js index b9e1e1b..bd341f3 100644 --- a/conductor.js +++ b/conductor.js @@ -117,7 +117,10 @@ const main = (() => { const session = params.$sessionId || process.env.__OW_ACTIVATION_ID // initialize openwhisk instance - if (!wsk) wsk = openwhisk({ ignore_certs: true }) + if (!wsk) { + wsk = openwhisk({ ignore_certs: true }) + if (!notify) wsk.actions.qs_options.invoke = ['blocking', 'notify', 'cause'] + } // redis keys const apiKey = process.env.__OW_API_KEY.substring(0, process.env.__OW_API_KEY.indexOf(':')) @@ -298,14 +301,14 @@ const main = (() => { break case 'Task': if (typeof json.Action === 'string') { // invoke user action + const invocation = notify ? { name: json.Action, params, blocking: true } : { name: json.Action, params, notify: process.env.__OW_ACTION_NAME, cause: session } return persist(fsm, state, stack) - .then(() => wsk.actions.invoke({ name: json.Action, params, blocking: notify }) - .catch(error => error.error && error.error.response ? error.error : badRequest(`Failed to invoke action ${json.Action}: ${encodeError(error).error}`)) // catch error reponses - .then(activation => db.rpushxAsync(sessionTraceKey, activation.activationId) - .then(() => activation.response ? activation : new Promise(resolve => poll(activation.activationId, resolve))) // poll if timeout - .then(activation => { - if (notify) return wsk.actions.invoke({ name: process.env.__OW_ACTION_NAME, params: { $activationId: activation.activationId, $sessionId: session, $result: activation.response.result } }) - }).then(() => blocking ? getSessionResult() : { $session: session }))) + .then(() => wsk.actions.invoke(invocation) + .catch(error => error.error && error.error.response ? error.error : badRequest(`Failed to invoke action ${json.Action}: ${encodeError(error).error}`))) // catch error reponses + .then(activation => db.rpushxAsync(sessionTraceKey, activation.activationId) + .then(() => activation.response || !notify ? activation : new Promise(resolve => poll(activation.activationId, resolve)))) // poll if timeout + .then(activation => notify && wsk.actions.invoke({ name: process.env.__OW_ACTION_NAME, params: { $activationId: activation.activationId, $sessionId: session, $result: activation.response.result } })) + .then(() => blocking ? getSessionResult() : { $session: session }) } else if (typeof json.Value !== 'undefined') { // value params = JSON.parse(JSON.stringify(json.Value)) inspect()