Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions mini-a-subtask.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var SubtaskManager = function(parentArgs, opts) {
this._workerFailures = {}
this._workerLastHeartbeat = {}
this._lastWorkerSelectionError = __
this.workerMaxFailures = _$(opts.workerMaxFailures, "opts.workerMaxFailures").isNumber().default(5)
this.workerProbeRetries = _$(opts.workerProbeRetries, "opts.workerProbeRetries").isNumber().default(3)
this.workerProbeRetryDelayMs = _$(opts.workerProbeRetryDelayMs, "opts.workerProbeRetryDelayMs").isNumber().default(250)
this.workerReviveCooldownMs = _$(opts.workerReviveCooldownMs, "opts.workerReviveCooldownMs").isNumber().default(15000)
Expand All @@ -60,6 +61,8 @@ var SubtaskManager = function(parentArgs, opts) {
maxDepthUsed: 0
}

this._running = true

if (this.remoteDelegation) {
this._refreshWorkerProfiles()
}
Expand All @@ -68,6 +71,17 @@ var SubtaskManager = function(parentArgs, opts) {
this._startWatchdog()
}

/**
* <odoc>
* <key>SubtaskManager.destroy()</key>
* Signals the watchdog thread to stop running.
* Call this when the SubtaskManager is no longer needed to free the background thread.
* </odoc>
*/
SubtaskManager.prototype.destroy = function() {
this._running = false
}

SubtaskManager.prototype._normalizeWorkers = function(workers) {
var list = []
var parsed = workers
Expand Down Expand Up @@ -113,14 +127,6 @@ SubtaskManager.prototype._buildChildArgs = function(subtask) {
return mergedArgs
}

SubtaskManager.prototype._nextWorker = function() {
var healthyWorkers = this._getHealthyWorkers()
if (healthyWorkers.length === 0) return __
var idx = this._workerCursor % healthyWorkers.length
this._workerCursor++
return healthyWorkers[idx]
}

SubtaskManager.prototype._refreshWorkerProfiles = function() {
var parent = this

Expand Down Expand Up @@ -370,7 +376,7 @@ SubtaskManager.prototype._recordWorkerFailure = function(workerUrl, error) {
if (!isString(workerUrl) || workerUrl.length === 0) return
var failures = _$(this._workerFailures[workerUrl], "this._workerFailures[workerUrl]").isNumber().default(0) + 1
this._workerFailures[workerUrl] = failures
if (failures >= this.defaultMaxAttempts) {
if (failures >= this.workerMaxFailures) {
var reason = "Repeated transport/runtime failures"
if (isString(error) && error.length > 0) reason += ": " + error
this._markWorkerDead(workerUrl, reason)
Expand Down Expand Up @@ -630,7 +636,7 @@ SubtaskManager.prototype._failOrRetrySubtask = function(subtask, prefix, error)
}

if (this.remoteDelegation && isString(subtask.workerUrl) && subtask.workerUrl.length > 0) {
this._markWorkerDead(subtask.workerUrl, "Subtask exhausted max attempts (" + subtask.maxAttempts + ")")
this._recordWorkerFailure(subtask.workerUrl, "Subtask exhausted max attempts (" + subtask.maxAttempts + ")")
}

subtask.status = "failed"
Expand Down Expand Up @@ -1101,9 +1107,11 @@ SubtaskManager.prototype.waitForAll = function(subtaskIds, timeoutMs) {

timeoutMs = _$(timeoutMs, "timeoutMs").isNumber().default(300000)
var results = []
var startAll = new Date().getTime()

for (var i = 0; i < subtaskIds.length; i++) {
var remainingTime = timeoutMs - (results.length > 0 ? 0 : 0)
var elapsed = new Date().getTime() - startAll
var remainingTime = Math.max(1, timeoutMs - elapsed)
results.push(this.waitFor(subtaskIds[i], remainingTime))
}

Expand Down Expand Up @@ -1199,13 +1207,12 @@ SubtaskManager.prototype.getMetrics = function() {
*/
SubtaskManager.prototype._processQueue = function() {
while (this.runningCount < this.maxConcurrent && this.pendingQueue.length > 0) {
var nextId = this.pendingQueue[0]
var nextId = this.pendingQueue.shift()
var subtask = this.subtasks[nextId]
if (!isDef(subtask) || subtask.status !== "pending") continue
try {
this.start(nextId)
} catch(e) {
// If start fails, remove from queue and mark as failed
this.pendingQueue.shift()
var subtask = this.subtasks[nextId]
if (isDef(subtask)) {
subtask.status = "failed"
subtask.error = "Failed to start: " + (isDef(e) && isString(e.message) ? e.message : stringify(e, __, ""))
Expand All @@ -1226,7 +1233,7 @@ SubtaskManager.prototype._startWatchdog = function() {
var parent = this

$doV(function() {
while (true) {
while (parent._running) {
try {
var now = new Date().getTime()
var subtaskIds = Object.keys(parent.subtasks)
Expand Down
216 changes: 92 additions & 124 deletions mini-a-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ todo:
}
if (isArray(global.__worker_ipallow) && global.__worker_ipallow.length > 0) {
var remoteAddr = request.header["x-forwarded-for"] || request.remoteAddress;
var allowed = global.__worker_ipallow.some(function(pattern) { return remoteAddr.indexOf(pattern) === 0; });
var allowed = global.__worker_ipallow.some(function(pattern) { return global.__worker_cidrMatch(remoteAddr, pattern); });
if (!allowed) {
return ow.server.httpd.reply(stringify({ error: "Forbidden" }, __, ""), 403, ow.server.httpd.mimes.JSON);
}
Expand All @@ -143,67 +143,11 @@ todo:
try { postData = jsonParse(request.files.postData); } catch(jsonErr) {
return ow.server.httpd.reply(stringify({ error: "Invalid JSON in request body" }, __, ""), 400, ow.server.httpd.mimes.JSON);
}
var wargs = global.__worker_args;
var goalTrimmed = isString(postData.goal) ? postData.goal.trim() : "";
if (goalTrimmed.length === 0) {
return ow.server.httpd.reply(stringify({ error: "Missing required field: goal" }, __, ""), 400, ow.server.httpd.mimes.JSON);
}
if (goalTrimmed.length > 10000) {
return ow.server.httpd.reply(stringify({ error: "Goal exceeds maximum length of 10000 characters" }, __, ""), 400, ow.server.httpd.mimes.JSON);
}
var taskArgs = postData.args || {};
var allowedRemoteKeys = {
goal: true, format: true, raw: true, chatbotmode: true, useplanning: true,
updatefreq: true, updateinterval: true, forceupdates: true, planlog: true,
planmode: true, planformat: true, convertplan: true, maxsteps: true
};
var mergedArgs = merge({}, wargs);
Object.keys(taskArgs).forEach(function(key) {
if (allowedRemoteKeys[key] === true) { mergedArgs[key] = taskArgs[key]; }
});
mergedArgs.goal = goalTrimmed;
var timeout = wargs.defaulttimeout;
if (isDef(postData.timeout)) {
var parsedTimeout = Number(postData.timeout);
if (!isNaN(parsedTimeout) && parsedTimeout > 0) {
timeout = parsedTimeout * 1000;
}
}
if (timeout > wargs.maxtimeout) timeout = wargs.maxtimeout;
var taskId = genUUID();
var now = new Date().getTime();
if (isUnDef(global.__worker_taskManager)) {
global.__worker_taskManager = new SubtaskManager(mergedArgs, {
maxConcurrent: wargs.maxconcurrent,
defaultDeadlineMs: wargs.defaulttimeout,
defaultMaxAttempts: wargs.delegationmaxretries,
maxDepth: wargs.delegationmaxdepth,
interactionFn: function(event, message) {
var resolvedTaskId = global.__worker_resolveTaskIdFromMessage(message);
if (isString(resolvedTaskId) && resolvedTaskId.length > 0) {
global.__worker_appendTaskEvent(resolvedTaskId, event, message);
}
global.__worker_logTaskEvent(resolvedTaskId, event, message);
},
currentDepth: 0
});
var result = global.__worker_submitTask(postData, global.__worker_args);
if (isMap(result.error)) {
return ow.server.httpd.reply(stringify(result.error, __, ""), result.code, ow.server.httpd.mimes.JSON);
}
var subtaskId = global.__worker_taskManager.submit(mergedArgs.goal, mergedArgs, {
deadlineMs: timeout,
metadata: postData.metadata || {}
});
global.__worker_tasks[taskId] = {
taskId: taskId, subtaskId: subtaskId, status: "queued", goal: mergedArgs.goal,
args: mergedArgs, createdAt: now, startedAt: __, completedAt: __,
events: [], metadata: postData.metadata || {}
};
global.__worker_subtaskToTaskId[subtaskId] = taskId;
global.__worker_subtaskShortToTaskId[subtaskId.substring(0, 8)] = taskId;
global.__worker_logTaskEvent(taskId, "info", "[subtask:" + subtaskId.substring(0, 8) + "] Task accepted by worker");
global.__worker_taskManager.start(subtaskId);
global.__worker_tasks[taskId].status = "running";
global.__worker_tasks[taskId].startedAt = new Date().getTime();
return ow.server.httpd.reply(stringify({ taskId: taskId, status: "queued", createdAt: now }, __, ""), 202, ow.server.httpd.mimes.JSON);
return ow.server.httpd.reply(stringify({ taskId: result.taskId, status: global.__worker_tasks[result.taskId].status, createdAt: result.createdAt }, __, ""), 202, ow.server.httpd.mimes.JSON);
} catch(e) {
logErr(e);
return ow.server.httpd.reply(stringify({ error: String(e) }, __, ""), 500, ow.server.httpd.mimes.JSON);
Expand Down Expand Up @@ -243,68 +187,9 @@ todo:
}
};

var requestBody = { files: { postData: stringify(taskPayload, __, "") } };
var _request = request;
request = requestBody;
var submission = __;
try {
submission = (function() {
var postData;
postData = jsonParse(request.files.postData);
var wargs = global.__worker_args;
var goalTrimmed = isString(postData.goal) ? postData.goal.trim() : "";
var taskArgs = postData.args || {};
var allowedRemoteKeys = {
goal: true, format: true, raw: true, chatbotmode: true, useplanning: true,
updatefreq: true, updateinterval: true, forceupdates: true, planlog: true,
planmode: true, planformat: true, convertplan: true, maxsteps: true
};
var mergedArgs = merge({}, wargs);
Object.keys(taskArgs).forEach(function(key) {
if (allowedRemoteKeys[key] === true) { mergedArgs[key] = taskArgs[key]; }
});
mergedArgs.goal = goalTrimmed;
var timeout = wargs.defaulttimeout;
if (isDef(postData.timeout)) {
var parsedTimeout = Number(postData.timeout);
if (!isNaN(parsedTimeout) && parsedTimeout > 0) timeout = parsedTimeout * 1000;
}
if (timeout > wargs.maxtimeout) timeout = wargs.maxtimeout;
var taskId = genUUID();
var now = new Date().getTime();
if (isUnDef(global.__worker_taskManager)) {
global.__worker_taskManager = new SubtaskManager(mergedArgs, {
maxConcurrent: wargs.maxconcurrent,
defaultDeadlineMs: wargs.defaulttimeout,
defaultMaxAttempts: wargs.delegationmaxretries,
maxDepth: wargs.delegationmaxdepth,
interactionFn: function(event, message) {
var resolvedTaskId = global.__worker_resolveTaskIdFromMessage(message);
if (isString(resolvedTaskId) && resolvedTaskId.length > 0) global.__worker_appendTaskEvent(resolvedTaskId, event, message);
global.__worker_logTaskEvent(resolvedTaskId, event, message);
},
currentDepth: 0
});
}
var subtaskId = global.__worker_taskManager.submit(mergedArgs.goal, mergedArgs, {
deadlineMs: timeout,
metadata: postData.metadata || {}
});
global.__worker_tasks[taskId] = {
taskId: taskId, subtaskId: subtaskId, status: "queued", goal: mergedArgs.goal,
args: mergedArgs, createdAt: now, startedAt: __, completedAt: __,
events: [], metadata: postData.metadata || {}
};
global.__worker_subtaskToTaskId[subtaskId] = taskId;
global.__worker_subtaskShortToTaskId[subtaskId.substring(0, 8)] = taskId;
global.__worker_logTaskEvent(taskId, "info", "[subtask:" + subtaskId.substring(0, 8) + "] Task accepted by worker");
global.__worker_taskManager.start(subtaskId);
global.__worker_tasks[taskId].status = "running";
global.__worker_tasks[taskId].startedAt = new Date().getTime();
return { taskId: taskId, createdAt: now };
})();
} finally {
request = _request;
var submission = global.__worker_submitTask(taskPayload, global.__worker_args);
if (isMap(submission.error)) {
return ow.server.httpd.reply(stringify(submission.error, __, ""), submission.code, ow.server.httpd.mimes.JSON);
}

var task = global.__worker_tasks[submission.taskId];
Expand Down Expand Up @@ -605,7 +490,6 @@ jobs:
// Initialize global state
global.__worker_args = args
global.__worker_tasks = {}
global.__worker_taskManager = __
global.__worker_startTime = java.lang.System.currentTimeMillis()
global.__worker_subtaskToTaskId = {}
global.__worker_subtaskShortToTaskId = {}
Expand Down Expand Up @@ -644,6 +528,90 @@ jobs:
return __
}

global.__worker_cidrMatch = function(ip, cidr) {
if (cidr.indexOf("/") < 0) return ip === cidr
var parts = cidr.split("/")
var cidrIp = parts[0]
var prefixLen = parseInt(parts[1], 10)
if (isNaN(prefixLen) || prefixLen < 0 || prefixLen > 32) return false
var ipToInt = function(addr) {
var segs = addr.split(".")
if (segs.length !== 4) return NaN
var result = 0
for (var i = 0; i < 4; i++) {
var octet = parseInt(segs[i], 10)
if (isNaN(octet) || octet < 0 || octet > 255) return NaN
result = (result << 8) | octet
}
return result >>> 0
}
var ipNum = ipToInt(ip)
var cidrNum = ipToInt(cidrIp)
if (isNaN(ipNum) || isNaN(cidrNum)) return false
var mask = prefixLen === 0 ? 0 : (0xFFFFFFFF << (32 - prefixLen)) >>> 0
return (ipNum & mask) === (cidrNum & mask)
}

global.__worker_submitTask = function(postData, wargs) {
var goalTrimmed = isString(postData.goal) ? postData.goal.trim() : ""
if (goalTrimmed.length === 0) {
return { error: { error: "Missing required field: goal" }, code: 400 }
}
if (goalTrimmed.length > 10000) {
return { error: { error: "Goal exceeds maximum length of 10000 characters" }, code: 400 }
}
var taskArgs = postData.args || {}
var allowedRemoteKeys = {
goal: true, format: true, raw: true, chatbotmode: true, useplanning: true,
updatefreq: true, updateinterval: true, forceupdates: true, planlog: true,
planmode: true, planformat: true, convertplan: true, maxsteps: true
}
var mergedArgs = merge({}, wargs)
Object.keys(taskArgs).forEach(function(key) {
if (allowedRemoteKeys[key] === true) { mergedArgs[key] = taskArgs[key] }
})
mergedArgs.goal = goalTrimmed
var timeout = wargs.defaulttimeout
if (isDef(postData.timeout)) {
var parsedTimeout = Number(postData.timeout)
if (!isNaN(parsedTimeout) && parsedTimeout > 0) timeout = parsedTimeout * 1000
}
if (timeout > wargs.maxtimeout) timeout = wargs.maxtimeout
var taskId = genUUID()
var now = new Date().getTime()
var subtaskId = global.__worker_taskManager.submit(mergedArgs.goal, mergedArgs, {
deadlineMs: timeout,
metadata: postData.metadata || {}
})
global.__worker_tasks[taskId] = {
taskId: taskId, subtaskId: subtaskId, status: "queued", goal: mergedArgs.goal,
args: mergedArgs, createdAt: now, startedAt: __, completedAt: __,
events: [], metadata: postData.metadata || {}
}
global.__worker_subtaskToTaskId[subtaskId] = taskId
global.__worker_subtaskShortToTaskId[subtaskId.substring(0, 8)] = taskId
global.__worker_logTaskEvent(taskId, "info", "[subtask:" + subtaskId.substring(0, 8) + "] Task accepted by worker")
global.__worker_taskManager.start(subtaskId)
global.__worker_tasks[taskId].status = "running"
global.__worker_tasks[taskId].startedAt = new Date().getTime()
return { taskId: taskId, createdAt: now }
}

global.__worker_taskManager = new SubtaskManager(args, {
maxConcurrent: args.maxconcurrent,
defaultDeadlineMs: args.defaulttimeout,
defaultMaxAttempts: args.delegationmaxretries,
maxDepth: args.delegationmaxdepth,
interactionFn: function(event, message) {
var resolvedTaskId = global.__worker_resolveTaskIdFromMessage(message)
if (isString(resolvedTaskId) && resolvedTaskId.length > 0) {
global.__worker_appendTaskEvent(resolvedTaskId, event, message)
}
global.__worker_logTaskEvent(resolvedTaskId, event, message)
},
currentDepth: 0
})

log("Mini-A Worker API starting on port " + args.onport)
log("Max concurrent tasks: " + args.maxconcurrent)
log("Default timeout: " + args.defaulttimeout + "ms")
Expand Down