Skip to content

Commit

Permalink
Merge pull request #108 from JTSIV1/generalBackoff
Browse files Browse the repository at this point in the history
Generalized backoff functionality
  • Loading branch information
alexandermichels committed Mar 21, 2024
2 parents 5216a3e + f99c844 commit ee9c837
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 15 deletions.
26 changes: 26 additions & 0 deletions src/Helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,32 @@ var Helper = {
consoleEnd: "\x1b[0m",

consoleGreen: "\x1b[32m",

/**
*
* @param funcCall - The function that is run with backoff
* @param parameters - What the function is input as parameters (in the form of one array)
* @param printOnError - Printed with error when catch block reached
*/
async runCommandWithBackoff(funcCall: (...args: any[]) => {}, parameters: any[], printOnError: string | null) {
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
var wait = 0;
var end = false;

while (true && !end) {
if (wait > 100) {
throw new Error("The function was attempted too mant times unsuccessfully");
}
try {
await sleep(wait * 1000);
await funcCall(...parameters);
end = true;
} catch (e) {
console.error(printOnError + e.stack);
}
wait = wait == 0 ? 2 : wait * wait;
}
}
};

export default Helper;
38 changes: 25 additions & 13 deletions src/Supervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import connectionPool from "./connectors/ConnectionPool";
import * as events from "events";
import DB from "./DB";
import NodeSSH = require("node-ssh");
import Helper from "./Helper";

class Supervisor {
private db = new DB();
Expand Down Expand Up @@ -126,7 +127,7 @@ class Supervisor {

async createMaintainerWorker(job: Job) {
var self = this;

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
while (true) {
// get ssh connector from pool
var ssh: SSH;
Expand All @@ -136,21 +137,31 @@ class Supervisor {
ssh = connectionPool[job.id].ssh;
}

// connect ssh & run
try {
if (!ssh.connection.isConnected())
await ssh.connection.connect(ssh.config);
await ssh.connection.execCommand("echo"); // test connection
if (job.maintainerInstance.isInit) {
await job.maintainerInstance.maintain();
} else {
await job.maintainerInstance.init();
if (!ssh.connection.isConnected()) {
try {
// wraps command with backoff -> takes lambda function and array of inputs to execute command
await Helper.runCommandWithBackoff(async (ssh1: SSH) => {
if (!ssh1.connection.isConnected()) {
await ssh1.connection.connect(ssh1.config);
}
ssh1.connection.execCommand("echo");
}, [ssh], null);
} catch (e) {
console.log(`job [${job.id}]: Caught ${e}`)
self.emitter.registerEvents(
job,
"JOB_FAILED",
`job [${job.id}] failed because the HPC could not connect within the allotted time`
);
}
} catch (e) {
if (config.is_testing) console.error(e.stack);
continue;

}

if (job.maintainerInstance.isInit) {
await job.maintainerInstance.maintain();
} else {
await job.maintainerInstance.init();
}
// emit events & logs
var events = job.maintainerInstance.dumpEvents();
var logs = job.maintainerInstance.dumpLogs();
Expand Down Expand Up @@ -252,3 +263,4 @@ class Supervisor {
}

export default Supervisor;

11 changes: 9 additions & 2 deletions src/connectors/BaseConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as path from "path";
import connectionPool from "./ConnectionPool";
import { config, hpcConfigMap } from "../../configs/config";
import FileUtil from "../lib/FolderUtil";
import Helper from "../Helper";

class BaseConnector {
/**
Expand Down Expand Up @@ -169,7 +170,10 @@ class BaseConnector {
"SSH_SCP_DOWNLOAD",
`get file from ${from} to ${to}`
);
await this.ssh().connection.getFile(to, fromZipFilePath);
// wraps command with backoff -> takes lambda function and array of inputs to execute command
await Helper.runCommandWithBackoff.call(this, async (to1: string, zipPath: string) => {
await this.ssh().connection.getFile(to1, zipPath);
}, [to, fromZipFilePath], "Trying to download file again");
await this.rm(fromZipFilePath);
await FileUtil.putFileFromZip(to, toZipFilePath);
} catch (e) {
Expand All @@ -195,7 +199,10 @@ class BaseConnector {
"SSH_SCP_UPLOAD",
`put file from ${from} to ${to}`
);
await this.ssh().connection.putFile(from, to);
// wraps command with backoff -> takes lambda function and array of inputs to execute command
await Helper.runCommandWithBackoff.call(this, async (from1: string, to1: string) => {
await this.ssh().connection.putFile(from1, to1);
}, [from, to], "Trying again to transfer file");
} catch (e) {
const error =
`unable to put file from ${from} to ${to}: ` + e.toString();
Expand Down

0 comments on commit ee9c837

Please sign in to comment.