diff --git a/src/req/operations/atomicOperations.ts b/src/req/operations/atomicOperations.ts index b9d88c652..2a16890b2 100644 --- a/src/req/operations/atomicOperations.ts +++ b/src/req/operations/atomicOperations.ts @@ -10,20 +10,90 @@ import {AtomicOperationForkEvent} from "./../atomicOperationsIPC"; import {SpawnRequestParams} from "./../JobIPC"; import {getReadableAndWritable} from "./../getAppPath"; +/** + * Class representing some operation which should be run atomically + * + * @export + * @abstract + * @class AtomicOperation + */ export abstract class AtomicOperation { + /** + * Those files which will generated by running the operation. They will be deleted on success OR failure + * + * @type {Array} + * @memberof AtomicOperation + */ public generatedArtifacts : Array; - public destinationArtifacts : Array; + /** + * Those files which will generated as final output by the operation. They will be deleted ONLY on failure + * + * @type {Array} + * @memberof AtomicOperation + */ + public destinationArtifacts : Array; + + + /** + * Those directories which will be generated by running the operation. They will be deleted on success OR failure + * + * @type {Array} + * @memberof AtomicOperation + */ public generatedArtifactsDirectories : Array; - public destinationArtifactsDirectories : Array; + + /** + * Those directories which will be generated as final output by the operation. They will be deleted ONLY on failure + * + * @type {Array} + * @memberof AtomicOperation + */ + public destinationArtifactsDirectories : Array; + + /** + * Holds data allowing the operation to manage its log + * + * @type {LogRecord} + * @memberof AtomicOperation + */ public logRecord : LogRecord; + + /** + * Whether the operation's log should be automatically closed upon failure + * + * @type {boolean} + * @memberof AtomicOperation + */ public closeLogOnFailure : boolean; + + + /** + * Whether the operation's log should be automatically closed on success + * + * @type {boolean} + * @memberof AtomicOperation + */ public closeLogOnSuccess : boolean; + + /** + * The name of the operation. Used in registering and invocation + * + * @type {string} + * @memberof AtomicOperation + */ public name : string; + + /** + * Flags indicating the current running status of the operation + * + * @type {CompletionFlags} + * @memberof AtomicOperation + */ public flags : CompletionFlags; public update : () => void; @@ -34,8 +104,22 @@ export abstract class AtomicOperation public totalSteps : number; public extraData : any; + + /** + * Indicates whether the current operation is currently executing + * + * @type {boolean} + * @memberof AtomicOperation + */ public running : boolean; + + /** + * Indicates whether the operation should override the scheduler and run regardless of its position in the queue + * + * @type {boolean} + * @memberof AtomicOperation + */ public ignoreScheduler : boolean; @@ -90,12 +174,25 @@ export abstract class AtomicOperation + /** + * Sets flags to indicate failure + * + * @param {CompletionFlags} flags + * @memberof AtomicOperation + */ public setFailure(flags : CompletionFlags) : void { flags.done = true; flags.success = false; flags.failure = true; } + + /** + * Sets flags to indicate success + * + * @param {CompletionFlags} flags + * @memberof AtomicOperation + */ public setSuccess(flags : CompletionFlags) : void { flags.done = true; @@ -105,11 +202,29 @@ export abstract class AtomicOperation + /** + * Method called by the scheduler when first invoking the operation + * + * @abstract + * @memberof AtomicOperation + */ public abstract run() : void; - public abstract setData(data : any) : void; - + /** + * Method called by the scheduler immediately before run is called + * + * @abstract + * @param {*} data + * @memberof AtomicOperation + */ + public abstract setData(data : any) : void; + /** + * Abort the operation an error message + * + * @param {string} msg + * @memberof AtomicOperation + */ public abortOperationWithMessage(msg : string) : void { this.setFailure(this.flags); @@ -117,12 +232,25 @@ export abstract class AtomicOperation this.update(); } + /** + * Write an object to the operation's log file + * + * @param {*} obj + * @memberof AtomicOperation + */ public logObject(obj : any) : void { logString(this.logRecord,JSON.stringify(obj)); } } +/** + * Provides integrated logging to operations running in forked processes + * + * @export + * @class ForkLogger + * @extends {AtomicOperation} + */ export class ForkLogger extends AtomicOperation { public constructor() @@ -133,6 +261,13 @@ export class ForkLogger extends AtomicOperation public run(){} } +/** + * Registers traps for unhandled errors in the current process. Logs exception details and stack traces + * using the provided logger + * @export + * @param {ForkLogger} [logger] + * @param {string} [progressMessage] + */ export function handleForkFailures(logger? : ForkLogger,progressMessage? : string) { let signalFailure = function(err : string){ @@ -168,6 +303,12 @@ export function handleForkFailures(logger? : ForkLogger,progressMessage? : strin }); } +/** + * Flags indicating the state of a running operation + * + * @export + * @class CompletionFlags + */ export class CompletionFlags { public done : boolean; @@ -180,6 +321,13 @@ export class CompletionFlags this.failure = false; } } + +/** + * Defines the shape of an operation registration + * + * @export + * @interface RegisteredAtomicOperation + */ export interface RegisteredAtomicOperation { name : string; @@ -191,11 +339,24 @@ export let operationsQueue : Array = new Array export let updates : EventEmitter = new EventEmitter(); +/** + * Destroy the current queue of operations + * + * @export + */ export function clearOperationsQueue() { operationsQueue = new Array(); } +/** + * Register a new operation of name opName + * + * @export + * @param {string} opName + * @param {typeof AtomicOperation} op + * @returns {void} + */ export function register(opName : string,op : typeof AtomicOperation) : void { for(let i = 0; i != registeredOperations.length; ++i) @@ -214,6 +375,12 @@ export function register(opName : string,op : typeof AtomicOperation) : void ); } +/** + * Delete all of an operation's generatedArtifacts + * + * @export + * @param {AtomicOperation} op + */ export function cleanGeneratedArtifacts(op : AtomicOperation) : void { for(let i = 0; i != op.generatedArtifacts.length; ++i) @@ -234,6 +401,12 @@ export function cleanGeneratedArtifacts(op : AtomicOperation) : void } } +/** + * Delete all of an operation's destinationArtifacts + * + * @export + * @param {AtomicOperation} op + */ export function cleanDestinationArtifacts(op : AtomicOperation) : void { for(let i = 0; i != op.destinationArtifacts.length; ++i) @@ -254,10 +427,27 @@ export function cleanDestinationArtifacts(op : AtomicOperation) : void } } export let onComplete : (op : AtomicOperation) => void = undefined; + +/** + * Register a function to be called upon completion of any operation + * + * @export + * @param {(op : AtomicOperation) => void} func + */ export function setOnComplete(func : (op : AtomicOperation) => void) : void { onComplete = func; } + +/** + * Add a new operation given by opName onto the queue. data will be passed to the operation + * prior to execution + * + * @export + * @param {string} opName + * @param {*} data + * @returns {void} + */ export function addOperation(opName : string,data : any) : void { for(let i = 0; i != registeredOperations.length; ++i) @@ -301,6 +491,13 @@ export function addOperation(opName : string,data : any) : void console.log("Could not add operation "+opName); } +/** + * Check the queue for spare capacity. Run up to maxRunning operations concurrently. + * This method should be called on a timer to continuously update the queue and run operations + * + * @export + * @param {number} maxRunning + */ export function runOperations(maxRunning : number) : void { for(let i = 0; i != operationsQueue.length; ++i) @@ -344,6 +541,13 @@ export function runOperations(maxRunning : number) : void } export let logRecordFile = getReadableAndWritable(`logs/logRecords`); + +/** + * Structure describing an operation log + * + * @export + * @class LogRecord + */ export class LogRecord { name : string = ""; @@ -355,16 +559,38 @@ export class LogRecord uuid : string = ""; } +/** + * Returns the directory a given log is stored in + * + * @export + * @param {LogRecord} logRecord + * @returns {string} + */ export function getLogDirectory(logRecord : LogRecord) : string { return getReadableAndWritable(`logs/${logRecord.uuid}`); } +/** + * Returns the give log's backing log file + * + * @export + * @param {LogRecord} logRecord + * @returns {string} + */ export function getLogFile(logRecord : LogRecord) : string { return getReadableAndWritable(`logs/${logRecord.uuid}/log`); } +/** + * Opens a new log and returns a LogRecord describing it + * + * @export + * @param {string} name + * @param {string} description + * @returns {LogRecord} + */ export function openLog(name : string,description : string) : LogRecord { let uuid = uuidv4(); @@ -381,6 +607,13 @@ export function openLog(name : string,description : string) : LogRecord return res; } +/** + * Closes a given log with status. Updates the endEpoch and runTime properties of logRecord + * + * @export + * @param {LogRecord} logRecord + * @param {string} status + */ export function closeLog(logRecord : LogRecord,status : string) : void { if(!logRecord || !logRecord.uuid) @@ -395,6 +628,12 @@ export function closeLog(logRecord : LogRecord,status : string) : void } +/** + * Write a given LogRecord to the global LogRecord so it can be retrievable later + * + * @export + * @param {LogRecord} record + */ export function recordLogRecord(record : LogRecord) : void { if(record === undefined) @@ -403,6 +642,13 @@ export function recordLogRecord(record : LogRecord) : void fs.appendFileSync(logRecordFile,JSON.stringify(record)+"\n"); } +/** + * Write a string to the given log + * + * @export + * @param {LogRecord} logRecord + * @param {string} data + */ export function logString(logRecord : LogRecord,data : string) : void { if(!logRecord || !logRecord.uuid) @@ -412,6 +658,13 @@ export function logString(logRecord : LogRecord,data : string) : void } +/** + * Returns a promise containing the last most recent logs written to the global LogRecord + * + * @export + * @param {number} last + * @returns {Promise>} + */ export async function getLogRecords(last : number) : Promise> { return new Promise>((resolve,reject) => {