From 1c006ea24a054ccde7abecba11b377c7de984d56 Mon Sep 17 00:00:00 2001 From: Krzysztof Janecki Date: Wed, 25 May 2022 23:07:37 +0200 Subject: [PATCH 1/4] Add functionality of AMQP job executor --- functions/kubernetes/amqpCommand.js | 194 +++++++++++++++++++++ functions/kubernetes/amqpConnector.js | 44 +++++ functions/kubernetes/jobSynchronization.js | 46 +++++ functions/kubernetes/k8sCommand.js | 21 ++- functions/kubernetes/k8sJobSubmit.js | 41 +---- 5 files changed, 306 insertions(+), 40 deletions(-) create mode 100644 functions/kubernetes/amqpCommand.js create mode 100644 functions/kubernetes/amqpConnector.js create mode 100644 functions/kubernetes/jobSynchronization.js diff --git a/functions/kubernetes/amqpCommand.js b/functions/kubernetes/amqpCommand.js new file mode 100644 index 0000000..bdfbf4e --- /dev/null +++ b/functions/kubernetes/amqpCommand.js @@ -0,0 +1,194 @@ +// Runs a job as a Pod (Kubernetes Job) in a Kubernetes cluster + +const k8s = require('@kubernetes/client-node'); +var BufferManager = require('./buffer_manager.js').BufferManager; +var RestartCounter = require('./restart_counter.js').RestartCounter; +var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob; +var fs = require('fs'); + +let bufferManager = new BufferManager(); + +let backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0; +let restartCounter = new RestartCounter(backoffLimit); + +// Function k8sCommandGroup +// +// Inputs: +// - bufferItems - array containing objects with following properties: +// * ins +// * outs +// * context +// * cb +async function k8sCommandGroup(bufferItems) { + + // No action needed when buffer is empty + if (bufferItems.length == 0) { + return; + } + + let startTime = Date.now(); + console.log("k8sCommandGroup started, time:", startTime); + + // Function for rebuffering items + let restartFn = (bufferIndex) => { + let bufferItem = bufferItems[bufferIndex]; + let taskId = bufferItem.context.taskId; + try { + var partition = bufferItem.context.executor.partition; // in case 'executor' doesn't exist + } catch(error) { } + if (restartCounter.isRestartPossible(taskId)) { + let restartVal = restartCounter.increase(taskId); + console.log("Readding task", taskId, "to buffer (restartCount:", restartVal + ") ..."); + let itemName = bufferItem.context.name; + bufferManager.addItem(itemName, bufferItem, partition); + } + return; + } + + // Extract particular arrays from buffer items + let jobArr = []; + let taskIdArr = []; + let contextArr = []; + let cbArr = []; + for (let i=0; i k8sCommandGroup(items)); + +async function k8sCommand(ins, outs, context, cb) { + /** Buffer Manager configuration. */ + buffersConf = context.appConfig.jobAgglomerations; + let alreadyConfigured = bufferManager.isConfigured(); + if (alreadyConfigured == false && buffersConf != undefined) { + bufferManager.configure(buffersConf); + } + + /** Buffer item. */ + let item = { + "ins": ins, + "outs": outs, + "context": context, + "cb": cb + }; + + try { + var partition = context.executor.partition; // in case 'executor' doesn't exist + } catch(error) { } + bufferManager.addItem(context.name, item, partition); + + return; +} + +exports.k8sCommand = k8sCommand; diff --git a/functions/kubernetes/amqpConnector.js b/functions/kubernetes/amqpConnector.js new file mode 100644 index 0000000..61a4824 --- /dev/null +++ b/functions/kubernetes/amqpConnector.js @@ -0,0 +1,44 @@ +const amqplib = require('amqplib'), + createJobMessage = require('../../common/jobMessage').createJobMessage; +let channels = {}; +let conn = null; + +async function initialize(queue_name) { + + if (conn === null) { + conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60"); + } + let ch = await conn.createChannel() + await ch.assertQueue(queue_name, {durable: false, expires: 600000}); // TODO: implement dynamic queue creation & cleanup + channels[queue_name] = ch + +} + +async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) { + let context = contextArr[0]; + let namespace = process.env.HF_VAR_NAMESPACE || 'default' + let queue_name = namespace + "." + context['name'] + if (conn === null || !(queue_name in channels)) { + await initialize(queue_name) + } + let ch = channels[queue_name] + try { + + console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`) + let tasks = []; + + for (let i = 0; i < jobArr.length; i++) { + let job = jobArr[i]; + let taskId = taskIdArr[i]; + let jobMessage = createJobMessage(job.ins, job.outs, contextArr[i], taskId); + await context.sendMsgToJob(JSON.stringify(jobMessage), taskId) // TODO remove + tasks.push({"id": taskId, "message": jobMessage}); + } + + await ch.publish('', queue_name, Buffer.from(JSON.stringify({'tasks': tasks}))); + } catch (error) { + console.log(error) + } +} + +exports.enqueueJobs = enqueueJobs \ No newline at end of file diff --git a/functions/kubernetes/jobSynchronization.js b/functions/kubernetes/jobSynchronization.js new file mode 100644 index 0000000..fac139f --- /dev/null +++ b/functions/kubernetes/jobSynchronization.js @@ -0,0 +1,46 @@ + +async function synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn) { + + let context = contextArr[0]; + // 'awaitJob' -- wait for the job to finish, possibly restarting it + // Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined + var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0; + var restartPolicy = backoffLimit > 0 ? "OnFailure" : "Never"; + var restartCount = 0; + var awaitJob = async (taskId) => { + try { + var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite + } catch (err) { + console.error(err); + throw err; + } + let taskEnd = new Date().toISOString(); + console.log('Job ended with result:', jobResult, 'time:', taskEnd); + // job exit code + return parseInt(jobResult[1]); + } + + var awaitJobs = async (taskIdArr) => { + let awaitPromises = [] + for (var i = 0; i < taskIdArr.length; i++) { + awaitPromises.push(awaitJob(taskIdArr[i])); + } + return Promise.all(awaitPromises); + } + + let jobExitCodes = await awaitJobs(taskIdArr); + for (let i = 0; i < jobExitCodes.length; i++) { + let jobExitCode = jobExitCodes[i]; + let taskId = taskIdArr[i]; + if (jobExitCode !== 0) { + console.log("Job", taskId, "failed"); + restartFn(i); + // NOTE: job message is preserved, so we don't have to send it again. + } + } + + return jobExitCodes; + +} + +exports.synchronizeJobs = synchronizeJobs \ No newline at end of file diff --git a/functions/kubernetes/k8sCommand.js b/functions/kubernetes/k8sCommand.js index bdfbf4e..296c646 100644 --- a/functions/kubernetes/k8sCommand.js +++ b/functions/kubernetes/k8sCommand.js @@ -4,6 +4,8 @@ const k8s = require('@kubernetes/client-node'); var BufferManager = require('./buffer_manager.js').BufferManager; var RestartCounter = require('./restart_counter.js').RestartCounter; var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob; +var amqpEnqueueJobs = require('./amqpConnector.js').enqueueJobs; +var synchronizeJobs = require('./jobSynchronization').synchronizeJobs var fs = require('fs'); let bufferManager = new BufferManager(); @@ -19,6 +21,18 @@ let restartCounter = new RestartCounter(backoffLimit); // * outs // * context // * cb + +function getExecutorType(context) { + if ("workerpools" in context.appConfig) { + for (const taskType of context.appConfig.workerpools) { + if (taskType.name === context['name']) { + return "WORKER_POOL" + } + } + } + return "JOB" +} + async function k8sCommandGroup(bufferItems) { // No action needed when buffer is empty @@ -112,7 +126,12 @@ async function k8sCommandGroup(bufferItems) { let jobExitCodes = []; try { - jobExitCodes = await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn); + if (getExecutorType(context) === "WORKER_POOL") { + await amqpEnqueueJobs(jobArr, taskIdArr, contextArr, customParams) + } else { + await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn) + } + jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams); } catch (err) { console.log("Error when submitting job:", err); throw err; diff --git a/functions/kubernetes/k8sJobSubmit.js b/functions/kubernetes/k8sJobSubmit.js index 0827d3b..80eaf6e 100644 --- a/functions/kubernetes/k8sJobSubmit.js +++ b/functions/kubernetes/k8sJobSubmit.js @@ -58,6 +58,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) => var volumePath = '/work_dir'; var jobName = Math.random().toString(36).substring(7) + '-' + job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId; + var workingDirPath = context.workdir; // remove chars not allowd in Pod names jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase(); @@ -80,7 +81,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) => restartPolicy: restartPolicy, backoffLimit: backoffLimit, experimentId: context.hfId + ":" + context.appId, workflowName: context.wfname, taskName: job.name, - appId: context.appId + appId: context.appId, workingDirPath: workingDirPath } // Add/override custom parameters for the job @@ -193,44 +194,6 @@ var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams throw err; } - // 'awaitJob' -- wait for the job to finish, possibly restarting it - // Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined - var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0; - var restartPolicy = backoffLimit > 0 ? "OnFailure": "Never"; - var restartCount = 0; - var awaitJob = async(taskId) => { - try { - var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite - } catch (err) { - console.error(err); - throw err; - } - let taskEnd = new Date().toISOString(); - console.log('Job ended with result:', jobResult, 'time:', taskEnd); - var code = parseInt(jobResult[1]); // job exit code - return code; - } - - var awaitJobs = async(taskIdArr) => { - let awaitPromises = [] - for (var i=0; i Date: Wed, 25 May 2022 23:25:02 +0200 Subject: [PATCH 2/4] Delete unused files --- functions/kubernetes/amqpCommand.js | 194 ---------------------------- 1 file changed, 194 deletions(-) delete mode 100644 functions/kubernetes/amqpCommand.js diff --git a/functions/kubernetes/amqpCommand.js b/functions/kubernetes/amqpCommand.js deleted file mode 100644 index bdfbf4e..0000000 --- a/functions/kubernetes/amqpCommand.js +++ /dev/null @@ -1,194 +0,0 @@ -// Runs a job as a Pod (Kubernetes Job) in a Kubernetes cluster - -const k8s = require('@kubernetes/client-node'); -var BufferManager = require('./buffer_manager.js').BufferManager; -var RestartCounter = require('./restart_counter.js').RestartCounter; -var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob; -var fs = require('fs'); - -let bufferManager = new BufferManager(); - -let backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0; -let restartCounter = new RestartCounter(backoffLimit); - -// Function k8sCommandGroup -// -// Inputs: -// - bufferItems - array containing objects with following properties: -// * ins -// * outs -// * context -// * cb -async function k8sCommandGroup(bufferItems) { - - // No action needed when buffer is empty - if (bufferItems.length == 0) { - return; - } - - let startTime = Date.now(); - console.log("k8sCommandGroup started, time:", startTime); - - // Function for rebuffering items - let restartFn = (bufferIndex) => { - let bufferItem = bufferItems[bufferIndex]; - let taskId = bufferItem.context.taskId; - try { - var partition = bufferItem.context.executor.partition; // in case 'executor' doesn't exist - } catch(error) { } - if (restartCounter.isRestartPossible(taskId)) { - let restartVal = restartCounter.increase(taskId); - console.log("Readding task", taskId, "to buffer (restartCount:", restartVal + ") ..."); - let itemName = bufferItem.context.name; - bufferManager.addItem(itemName, bufferItem, partition); - } - return; - } - - // Extract particular arrays from buffer items - let jobArr = []; - let taskIdArr = []; - let contextArr = []; - let cbArr = []; - for (let i=0; i k8sCommandGroup(items)); - -async function k8sCommand(ins, outs, context, cb) { - /** Buffer Manager configuration. */ - buffersConf = context.appConfig.jobAgglomerations; - let alreadyConfigured = bufferManager.isConfigured(); - if (alreadyConfigured == false && buffersConf != undefined) { - bufferManager.configure(buffersConf); - } - - /** Buffer item. */ - let item = { - "ins": ins, - "outs": outs, - "context": context, - "cb": cb - }; - - try { - var partition = context.executor.partition; // in case 'executor' doesn't exist - } catch(error) { } - bufferManager.addItem(context.name, item, partition); - - return; -} - -exports.k8sCommand = k8sCommand; From 94adc51b7b6dee1cbcee835c598bd12695466e65 Mon Sep 17 00:00:00 2001 From: Krzysztof Janecki Date: Sun, 10 Jul 2022 21:15:18 +0200 Subject: [PATCH 3/4] Fix restartFn bug --- functions/kubernetes/amqpConnector.js | 2 +- functions/kubernetes/k8sCommand.js | 4 ++-- functions/kubernetes/k8sJobSubmit.js | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/functions/kubernetes/amqpConnector.js b/functions/kubernetes/amqpConnector.js index 61a4824..638817c 100644 --- a/functions/kubernetes/amqpConnector.js +++ b/functions/kubernetes/amqpConnector.js @@ -9,7 +9,7 @@ async function initialize(queue_name) { conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60"); } let ch = await conn.createChannel() - await ch.assertQueue(queue_name, {durable: false, expires: 600000}); // TODO: implement dynamic queue creation & cleanup + await ch.assertQueue(queue_name, {durable: false, expires: 6000000}); // TODO: implement dynamic queue creation & cleanup channels[queue_name] = ch } diff --git a/functions/kubernetes/k8sCommand.js b/functions/kubernetes/k8sCommand.js index 296c646..076ca15 100644 --- a/functions/kubernetes/k8sCommand.js +++ b/functions/kubernetes/k8sCommand.js @@ -129,9 +129,9 @@ async function k8sCommandGroup(bufferItems) { if (getExecutorType(context) === "WORKER_POOL") { await amqpEnqueueJobs(jobArr, taskIdArr, contextArr, customParams) } else { - await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn) + await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams) } - jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams); + jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn); } catch (err) { console.log("Error when submitting job:", err); throw err; diff --git a/functions/kubernetes/k8sJobSubmit.js b/functions/kubernetes/k8sJobSubmit.js index 80eaf6e..1a15aeb 100644 --- a/functions/kubernetes/k8sJobSubmit.js +++ b/functions/kubernetes/k8sJobSubmit.js @@ -105,7 +105,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) => // // // Returns: job exit code -var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn) => { +var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams) => { // Load definition of the the worker job pod // File 'job-template.yaml' should be provided externally during deployment From 6802960aa8e052c67bab35f07c17a837838acf73 Mon Sep 17 00:00:00 2001 From: Krzysztof Janecki Date: Wed, 7 Sep 2022 20:33:26 +0200 Subject: [PATCH 4/4] Change workerpools config name to executionModels, add override queue name mechanism --- functions/kubernetes/amqpConnector.js | 19 ++++++++++++++++--- functions/kubernetes/k8sCommand.js | 4 ++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/functions/kubernetes/amqpConnector.js b/functions/kubernetes/amqpConnector.js index 638817c..1516a3f 100644 --- a/functions/kubernetes/amqpConnector.js +++ b/functions/kubernetes/amqpConnector.js @@ -9,15 +9,28 @@ async function initialize(queue_name) { conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60"); } let ch = await conn.createChannel() - await ch.assertQueue(queue_name, {durable: false, expires: 6000000}); // TODO: implement dynamic queue creation & cleanup + await ch.assertQueue(queue_name, {durable: false, expires: 6000000}); channels[queue_name] = ch } +function getQueueName(context) { + if ("executionModels" in context.appConfig) { + for (const taskType of context.appConfig.executionModels) { + if (taskType.name === context['name']) { + if ("queue" in taskType) { + return taskType.queue; + } + } + } + } + let namespace = process.env.HF_VAR_NAMESPACE || 'default' + return namespace + "." + context['name'] +} + async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) { let context = contextArr[0]; - let namespace = process.env.HF_VAR_NAMESPACE || 'default' - let queue_name = namespace + "." + context['name'] + let queue_name = getQueueName(context) if (conn === null || !(queue_name in channels)) { await initialize(queue_name) } diff --git a/functions/kubernetes/k8sCommand.js b/functions/kubernetes/k8sCommand.js index 076ca15..8f4d6c7 100644 --- a/functions/kubernetes/k8sCommand.js +++ b/functions/kubernetes/k8sCommand.js @@ -23,8 +23,8 @@ let restartCounter = new RestartCounter(backoffLimit); // * cb function getExecutorType(context) { - if ("workerpools" in context.appConfig) { - for (const taskType of context.appConfig.workerpools) { + if ("executionModels" in context.appConfig) { + for (const taskType of context.appConfig.executionModels) { if (taskType.name === context['name']) { return "WORKER_POOL" }