diff --git a/functions/kubernetes/amqpConnector.js b/functions/kubernetes/amqpConnector.js
new file mode 100644
index 0000000..1516a3f
--- /dev/null
+++ b/functions/kubernetes/amqpConnector.js
@@ -0,0 +1,66 @@
+// AMQP connector: instead of spawning one Kubernetes job per task, publishes
+// HyperFlow job messages to a per-task-type RabbitMQ queue that is consumed
+// by an external worker pool.
+const amqplib = require('amqplib'),
+      createJobMessage = require('../../common/jobMessage').createJobMessage;
+let channels = {};
+let conn = null;
+
+// Lazily open the shared AMQP connection and create a channel + queue for
+// 'queue_name'. NOTE(review): not guarded against concurrent first calls.
+async function initialize(queue_name) {
+
+  if (conn === null) {
+    // Heartbeat must go in the URL query string; amqplib's second argument
+    // is socket options, so a bare "heartbeat=60" string there is ignored.
+    conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}?heartbeat=60`);
+  }
+  let ch = await conn.createChannel()
+  await ch.assertQueue(queue_name, {durable: false, expires: 6000000});
+  channels[queue_name] = ch
+
+}
+
+// Queue name: the executionModel's explicit "queue" if configured,
+// otherwise "<namespace>.<task name>".
+function getQueueName(context) {
+  if ("executionModels" in context.appConfig) {
+    for (const taskType of context.appConfig.executionModels) {
+      if (taskType.name === context['name']) {
+        if ("queue" in taskType) {
+          return taskType.queue;
+        }
+      }
+    }
+  }
+  let namespace = process.env.HF_VAR_NAMESPACE || 'default'
+  return namespace + "." + context['name']
+}
+
+async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) {
+  let context = contextArr[0];
+  let queue_name = getQueueName(context)
+  if (conn === null || !(queue_name in channels)) {
+    await initialize(queue_name)
+  }
+  let ch = channels[queue_name]
+  try {
+
+    console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`)
+    let tasks = [];
+
+    for (let i = 0; i < jobArr.length; i++) {
+      let job = jobArr[i];
+      let taskId = taskIdArr[i];
+      let jobMessage = createJobMessage(job.ins, job.outs, contextArr[i], taskId);
+      await context.sendMsgToJob(JSON.stringify(jobMessage), taskId) // TODO remove
+      tasks.push({"id": taskId, "message": jobMessage});
+    }
+
+    await ch.publish('', queue_name, Buffer.from(JSON.stringify({'tasks': tasks})));
+  } catch (error) {
+    console.log(error)
+  }
+}
+
+exports.enqueueJobs = enqueueJobs
\ No newline at end of file
diff --git a/functions/kubernetes/jobSynchronization.js b/functions/kubernetes/jobSynchronization.js
new file mode 100644
index 0000000..fac139f
--- /dev/null
+++ b/functions/kubernetes/jobSynchronization.js
@@ -0,0 +1,49 @@
+
+// Awaits completion of all tasks in 'taskIdArr' (via context.jobResult) and
+// invokes restartFn(i) for every task that exited non-zero. Returns the
+// array of job exit codes.
+async function synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn) {
+
+  let context = contextArr[0];
+  // 'awaitJob' -- wait for the job to finish, possibly restarting it
+  // Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
+  var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
+  var restartPolicy = backoffLimit > 0 ? "OnFailure" : "Never";
+  var restartCount = 0; // NOTE(review): restartPolicy/restartCount are unused here
+  var awaitJob = async (taskId) => {
+    try {
+      var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
+    } catch (err) {
+      console.error(err);
+      throw err;
+    }
+    let taskEnd = new Date().toISOString();
+    console.log('Job ended with result:', jobResult, 'time:', taskEnd);
+    // job exit code
+    return parseInt(jobResult[1], 10);
+  }
+
+  var awaitJobs = async (taskIdArr) => {
+    let awaitPromises = []
+    for (var i = 0; i < taskIdArr.length; i++) {
+      awaitPromises.push(awaitJob(taskIdArr[i]));
+    }
+    return Promise.all(awaitPromises);
+  }
+
+  let jobExitCodes = await awaitJobs(taskIdArr);
+  for (let i = 0; i < jobExitCodes.length; i++) {
+    let jobExitCode = jobExitCodes[i];
+    let taskId = taskIdArr[i];
+    if (jobExitCode !== 0) {
+      console.log("Job", taskId, "failed");
+      restartFn(i);
+      // NOTE: job message is preserved, so we don't have to send it again.
+    }
+  }
+
+  return jobExitCodes;
+
+}
+
+exports.synchronizeJobs = synchronizeJobs
\ No newline at end of file
diff --git a/functions/kubernetes/k8sCommand.js b/functions/kubernetes/k8sCommand.js
index bdfbf4e..8f4d6c7 100644
--- a/functions/kubernetes/k8sCommand.js
+++ b/functions/kubernetes/k8sCommand.js
@@ -4,6 +4,8 @@ const k8s = require('@kubernetes/client-node');
 var BufferManager = require('./buffer_manager.js').BufferManager;
 var RestartCounter = require('./restart_counter.js').RestartCounter;
 var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
+var amqpEnqueueJobs = require('./amqpConnector.js').enqueueJobs;
+var synchronizeJobs = require('./jobSynchronization').synchronizeJobs
 var fs = require('fs');
 
 let bufferManager = new BufferManager();
@@ -19,6 +21,20 @@ let restartCounter = new RestartCounter(backoffLimit);
 // * outs
 // * context
 // * cb
+
+// "WORKER_POOL" if the task's name appears under appConfig.executionModels
+// (task is served by an AMQP worker pool), "JOB" otherwise.
+function getExecutorType(context) {
+  if ("executionModels" in context.appConfig) {
+    for (const taskType of context.appConfig.executionModels) {
+      if (taskType.name === context['name']) {
+        return "WORKER_POOL"
+      }
+    }
+  }
+  return "JOB"
+}
+
 async function k8sCommandGroup(bufferItems) {
 
   // No action needed when buffer is empty
@@ -112,7 +128,12 @@ async function k8sCommandGroup(bufferItems) {
 
   let jobExitCodes = [];
   try {
-    jobExitCodes = await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn);
+    if (getExecutorType(context) === "WORKER_POOL") {
+      await amqpEnqueueJobs(jobArr, taskIdArr, contextArr, customParams)
+    } else {
+      await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams)
+    }
+    jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn);
   } catch (err) {
     console.log("Error when submitting job:", err);
     throw err;
diff --git a/functions/kubernetes/k8sJobSubmit.js b/functions/kubernetes/k8sJobSubmit.js
index 0827d3b..1a15aeb 100644
--- a/functions/kubernetes/k8sJobSubmit.js
+++ b/functions/kubernetes/k8sJobSubmit.js
@@ -58,6 +58,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
 
   var volumePath = '/work_dir';
   var jobName = Math.random().toString(36).substring(7) + '-' + job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId;
+  var workingDirPath = context.workdir;
 
   // remove chars not allowd in Pod names
   jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase();
@@ -80,7 +81,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
     restartPolicy: restartPolicy, backoffLimit: backoffLimit,
     experimentId: context.hfId + ":" + context.appId,
     workflowName: context.wfname, taskName: job.name,
-    appId: context.appId
+    appId: context.appId, workingDirPath: workingDirPath
   }
 
   // Add/override custom parameters for the job
@@ -104,7 +105,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
 //
 //
 // Returns: job exit code
-var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn) => {
+var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams) => {
 
   // Load definition of the the worker job pod
   // File 'job-template.yaml' should be provided externally during deployment
@@ -193,44 +194,6 @@ var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams
     throw err;
   }
 
-  // 'awaitJob' -- wait for the job to finish, possibly restarting it
-  // Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
-  var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
-  var restartPolicy = backoffLimit > 0 ? "OnFailure": "Never";
-  var restartCount = 0;
-  var awaitJob = async(taskId) => {
-    try {
-      var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
-    } catch (err) {
-      console.error(err);
-      throw err;
-    }
-    let taskEnd = new Date().toISOString();
-    console.log('Job ended with result:', jobResult, 'time:', taskEnd);
-    var code = parseInt(jobResult[1]); // job exit code
-    return code;
-  }
-
-  var awaitJobs = async(taskIdArr) => {
-    let awaitPromises = []
-    for (var i=0; i