Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions functions/kubernetes/amqpConnector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const amqplib = require('amqplib'),
createJobMessage = require('../../common/jobMessage').createJobMessage;
let channels = {};
let conn = null;

async function initialize(queue_name) {

if (conn === null) {
conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60");
}
let ch = await conn.createChannel()
await ch.assertQueue(queue_name, {durable: false, expires: 6000000});
channels[queue_name] = ch

}

function getQueueName(context) {
if ("executionModels" in context.appConfig) {
for (const taskType of context.appConfig.executionModels) {
if (taskType.name === context['name']) {
if ("queue" in taskType) {
return taskType.queue;
}
}
}
}
let namespace = process.env.HF_VAR_NAMESPACE || 'default'
return namespace + "." + context['name']
}

async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) {
let context = contextArr[0];
let queue_name = getQueueName(context)
if (conn === null || !(queue_name in channels)) {
await initialize(queue_name)
}
let ch = channels[queue_name]
try {

console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`)
let tasks = [];

for (let i = 0; i < jobArr.length; i++) {
let job = jobArr[i];
let taskId = taskIdArr[i];
let jobMessage = createJobMessage(job.ins, job.outs, contextArr[i], taskId);
await context.sendMsgToJob(JSON.stringify(jobMessage), taskId) // TODO remove
tasks.push({"id": taskId, "message": jobMessage});
}

await ch.publish('', queue_name, Buffer.from(JSON.stringify({'tasks': tasks})));
} catch (error) {
console.log(error)
}
}

exports.enqueueJobs = enqueueJobs
46 changes: 46 additions & 0 deletions functions/kubernetes/jobSynchronization.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@

async function synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn) {

let context = contextArr[0];
// 'awaitJob' -- wait for the job to finish, possibly restarting it
// Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
var restartPolicy = backoffLimit > 0 ? "OnFailure" : "Never";
var restartCount = 0;
var awaitJob = async (taskId) => {
try {
var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
} catch (err) {
console.error(err);
throw err;
}
let taskEnd = new Date().toISOString();
console.log('Job ended with result:', jobResult, 'time:', taskEnd);
// job exit code
return parseInt(jobResult[1]);
}

var awaitJobs = async (taskIdArr) => {
let awaitPromises = []
for (var i = 0; i < taskIdArr.length; i++) {
awaitPromises.push(awaitJob(taskIdArr[i]));
}
return Promise.all(awaitPromises);
}

let jobExitCodes = await awaitJobs(taskIdArr);
for (let i = 0; i < jobExitCodes.length; i++) {
let jobExitCode = jobExitCodes[i];
let taskId = taskIdArr[i];
if (jobExitCode !== 0) {
console.log("Job", taskId, "failed");
restartFn(i);
// NOTE: job message is preserved, so we don't have to send it again.
}
}

return jobExitCodes;

}

exports.synchronizeJobs = synchronizeJobs
21 changes: 20 additions & 1 deletion functions/kubernetes/k8sCommand.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const k8s = require('@kubernetes/client-node');
var BufferManager = require('./buffer_manager.js').BufferManager;
var RestartCounter = require('./restart_counter.js').RestartCounter;
var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
var amqpEnqueueJobs = require('./amqpConnector.js').enqueueJobs;
var synchronizeJobs = require('./jobSynchronization').synchronizeJobs
var fs = require('fs');

let bufferManager = new BufferManager();
Expand All @@ -19,6 +21,18 @@ let restartCounter = new RestartCounter(backoffLimit);
// * outs
// * context
// * cb

function getExecutorType(context) {
if ("executionModels" in context.appConfig) {
for (const taskType of context.appConfig.executionModels) {
if (taskType.name === context['name']) {
return "WORKER_POOL"
}
}
}
return "JOB"
}

async function k8sCommandGroup(bufferItems) {

// No action needed when buffer is empty
Expand Down Expand Up @@ -112,7 +126,12 @@ async function k8sCommandGroup(bufferItems) {

let jobExitCodes = [];
try {
jobExitCodes = await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn);
if (getExecutorType(context) === "WORKER_POOL") {
await amqpEnqueueJobs(jobArr, taskIdArr, contextArr, customParams)
} else {
await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams)
}
jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn);
} catch (err) {
console.log("Error when submitting job:", err);
throw err;
Expand Down
43 changes: 3 additions & 40 deletions functions/kubernetes/k8sJobSubmit.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
var volumePath = '/work_dir';
var jobName = Math.random().toString(36).substring(7) + '-' +
job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId;
var workingDirPath = context.workdir;

// remove chars not allowd in Pod names
jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase();
Expand All @@ -80,7 +81,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
restartPolicy: restartPolicy, backoffLimit: backoffLimit,
experimentId: context.hfId + ":" + context.appId,
workflowName: context.wfname, taskName: job.name,
appId: context.appId
appId: context.appId, workingDirPath: workingDirPath
}

// Add/override custom parameters for the job
Expand All @@ -104,7 +105,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
//
//
// Returns: job exit code
var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn) => {
var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams) => {

// Load definition of the the worker job pod
// File 'job-template.yaml' should be provided externally during deployment
Expand Down Expand Up @@ -193,44 +194,6 @@ var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams
throw err;
}

// 'awaitJob' -- wait for the job to finish, possibly restarting it
// Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
var restartPolicy = backoffLimit > 0 ? "OnFailure": "Never";
var restartCount = 0;
var awaitJob = async(taskId) => {
try {
var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
} catch (err) {
console.error(err);
throw err;
}
let taskEnd = new Date().toISOString();
console.log('Job ended with result:', jobResult, 'time:', taskEnd);
var code = parseInt(jobResult[1]); // job exit code
return code;
}

var awaitJobs = async(taskIdArr) => {
let awaitPromises = []
for (var i=0; i<taskIdArr.length; i++) {
awaitPromises.push(awaitJob(taskIdArr[i]));
}
return Promise.all(awaitPromises);
}

let jobExitCodes = await awaitJobs(taskIdArr);
for (let i = 0; i < jobExitCodes.length; i++) {
let jobExitCode = jobExitCodes[i];
let taskId = taskIdArr[i];
if (jobExitCode != 0) {
console.log("Job", taskId, "failed");
restartFn(i);
// NOTE: job message is preserved, so we don't have to send it again.
}
}

return jobExitCodes;
}

exports.submitK8sJob = submitK8sJob;
Expand Down