Skip to content

Commit

Permalink
Merge pull request #233 from ICRAR/liu-365
Browse files Browse the repository at this point in the history
Liu 365
  • Loading branch information
awicenec committed May 31, 2023
2 parents abaa374 + 51d2e5f commit fcce6f3
Show file tree
Hide file tree
Showing 14 changed files with 122 additions and 92 deletions.
4 changes: 2 additions & 2 deletions daliuge-engine/build_engine.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# script builds the daliuge-engine docker container either with a tag referring to the current
# branch name or with a release tag depending whether this is a development or deployment
# version.
Expand Down Expand Up @@ -46,7 +46,7 @@ case "$1" in
docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock dslim/docker-slim build --include-shell \
--include-path /etc --include-path /usr/local/lib --include-path /usr/local/bin --include-path /usr/lib/python3.8 \
--include-path /usr/lib/python3 --include-path /dlg --include-path /daliuge --publish-exposed-ports=true \
--http-probe-exec start_local_managers.sh --http-probe=true --tag=icrar/daliuge-engine:${VCS_TAG}\
--http-probe=true --tag=icrar/daliuge-engine:${VCS_TAG}\
icrar/daliuge-engine.big:${VCS_TAG} \
;;
*)
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ def deploySession(self, sessionId, completedDrops=[]):
"UIDs for completed drops not found: %r", not_found
)
logger.info(
"Moving Drops to COMPLETED right away: %r", completedDrops
"Moving graph root Drops to COMPLETED right away: %r",
completedDrops,
)
completed_by_host = group_by_node(completedDrops, self._graph)
self.replicate(
Expand Down
21 changes: 15 additions & 6 deletions daliuge-engine/dlg/manager/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ def fwrapper(*args, **kwargs):
bottle.response.headers[
"Access-Control-Allow-Headers"
] = "Origin, Accept, Content-Type, Content-Encoding, X-Requested-With, X-CSRF-Token"
jres = (
json.dumps(res) if res else json.dumps({"Status": "Success"})
)
logger.debug(
"Bottle sending back result: %s", jres[: min(len(jres), 80)]
)
return json.dumps(res)
except Exception as e:
logger.exception("Error while fulfilling request")
Expand Down Expand Up @@ -220,7 +226,9 @@ def initializeSpecifics(self, app):

@daliuge_aware
def submit_methods(self):
return {"methods": [DeploymentMethods.BROWSER]}
return {
"methods": [DeploymentMethods.BROWSER, DeploymentMethods.SERVER]
}

def _stop_manager(self):
self.dm.shutdown()
Expand Down Expand Up @@ -313,8 +321,8 @@ def deploySession(self, sessionId):
completedDrops = []
if "completed" in bottle.request.forms:
completedDrops = bottle.request.forms["completed"].split(",")
self.dm.deploySession(sessionId, completedDrops=completedDrops)
return {}
return self.dm.deploySession(sessionId, completedDrops=completedDrops)
# return {"Status": "Success"}

@daliuge_aware
def cancelSession(self, sessionId):
Expand All @@ -336,7 +344,7 @@ def getGraphStatus(self, sessionId):
@daliuge_aware
def addGraphParts(self, sessionId):
# WARNING: TODO: Somehow, the content_type can be overwritten to 'text/plain'
logger.debug(bottle.request.content_type)
logger.debug("Graph content type: %s", bottle.request.content_type)
if (
"application/json" not in bottle.request.content_type
and "text/plain" not in bottle.request.content_type
Expand All @@ -346,15 +354,16 @@ def addGraphParts(self, sessionId):

# We also accept gzipped content
hdrs = bottle.request.headers
logger.debug("Graph hdr: %s", {k: v for k, v in hdrs.items()})
if hdrs.get("Content-Encoding", None) == "gzip":
json_content = utils.ZlibUncompressedStream(bottle.request.body)
else:
json_content = bottle.request.body

graph_parts = bottle.json_loads(json_content.read())

self.dm.addGraphSpec(sessionId, graph_parts)
return {"graph_parts": graph_parts}
return self.dm.addGraphSpec(sessionId, graph_parts)
# return {"graph_parts": graph_parts}

# ===========================================================================
# non-REST methods
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/etc/init-scripts/dlg-dim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
### BEGIN INIT INFO
# Provides: dlg-dim
# Required-Start: $all
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/etc/init-scripts/dlg-nm
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
### BEGIN INIT INFO
# Provides: dlg-nm
# Required-Start: $all
Expand Down
7 changes: 5 additions & 2 deletions daliuge-engine/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,17 @@ def run(self):
lp = sysconfig.get_path("stdlib")
with open(PTH_FILE, "w") as f:
f.write("{0}/dist-packages".format(lp))
install.copy_file(self, PTH_FILE, os.path.join(self.install_lib, PTH_FILE))
install.copy_file(
self, PTH_FILE, os.path.join(self.install_lib, PTH_FILE)
)


# Core requirements of DALiuGE
# Keep alpha-sorted PLEASE!
install_requires = [
"wheel", # need to get wheel first...
"bottle",
"urllib3<1.27,>=1.25.4",
"boto3",
"configobj",
"crc32c",
Expand Down Expand Up @@ -159,7 +162,7 @@ def run(self):
setup(
name="daliuge-engine",
version=get_version_info()[0],
description=u"Data Activated \uF9CA (flow) Graph Engine - Execution Engine",
description="Data Activated \uF9CA (flow) Graph Engine - Execution Engine",
long_description="""
The element of the DALiuGE system executing the workflows. This replaces
the former 'runtime' package (up to version 1.0). For more information
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/manager/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,4 @@ def test_reprostatus_get(self):
def test_submit_method(self):
c = NodeManagerClient(hostname)
response = c.get_submission_method()
self.assertEqual({"methods": [DeploymentMethods.BROWSER]}, response)
self.assertEqual({"methods": [DeploymentMethods.BROWSER, DeploymentMethods.SERVER]}, response)
2 changes: 1 addition & 1 deletion daliuge-translator/build_translator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ case "$1" in
echo ">>>>> docker-slim output <<<<<<<<<"
docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock dslim/docker-slim build --include-shell \
--include-path /usr/local/lib --include-path /usr/local/bin --include-path /daliuge --include-path /dlg \
--http-probe=true --tag=icrar/daliuge-translator:${VCS_TAG} icrar/daliuge-translator.big:${VCS_TAG}
--http-probe=false --tag=icrar/daliuge-translator:${VCS_TAG} icrar/daliuge-translator.big:${VCS_TAG}
;;
*)
echo "Usage: build_translator.sh <dep|dev|slim>"
Expand Down
4 changes: 2 additions & 2 deletions daliuge-translator/dlg/dropmake/lg.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def _link_drops(self, slgn, tlgn, src_drop, tgt_drop, llink):
elif s_type in ["Application", "Control"]:
sname = slgn._getPortName("outputPorts")
tname = tlgn._getPortName("inputPorts")
logger.debug("Found port names: IN: %s, OUT: %s", sname, tname)
# logger.debug("Found port names: IN: %s, OUT: %s", sname, tname)
sdrop.addOutput(tdrop, name=sname)
tdrop.addProducer(sdrop, name=tname)
if Categories.BASH_SHELL_APP == s_type:
Expand All @@ -499,7 +499,7 @@ def _link_drops(self, slgn, tlgn, src_drop, tgt_drop, llink):
# could be multiple ports, need to identify
portId = llink["toPort"] if "toPort" in llink else None
tname = tlgn._getPortName("inputPorts", portId=portId)
logger.debug("Found port names: IN: %s, OUT: %s", sname, tname)
# logger.debug("Found port names: IN: %s, OUT: %s", sname, tname)
# logger.debug(
# ">>> link from %s to %s (%s) (%s)",
# sname,
Expand Down
5 changes: 4 additions & 1 deletion daliuge-translator/dlg/dropmake/pg_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ def unroll(lg, oid_prefix=None, zerorun=False, app=None):
if app:
logger.info("Replacing apps with %s", app)
for dropspec in drop_list:
if "dropclass" in dropspec:
if (
"dropclass" in dropspec
and dropspec["categoryType"] == "Application"
):
dropspec["dropclass"] = app
dropspec["sleep_time"] = (
dropspec["execution_time"]
Expand Down
13 changes: 10 additions & 3 deletions daliuge-translator/dlg/dropmake/pgt.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,17 @@ def to_pg_spec(
where the daliuge system is started up afer submission (e.g. SLURM)
"""
logger.debug(
"tpl_nodes_len: %s, node_list: %s", tpl_nodes_len, node_list
"# worker nodes: %s, node_list(incl. DIM): %s",
tpl_nodes_len,
node_list,
)
if tpl_nodes_len > 0: # generate pg_spec template
if (
len(node_list) == 0 and tpl_nodes_len > 0
): # generate pg_spec template
node_list = range(tpl_nodes_len) # create a fake list for now
tpl_fl = True
else:
tpl_fl = False

if 0 == self._num_parts_done:
raise GPGTException("The graph has not been partitioned yet")
Expand Down Expand Up @@ -314,7 +321,7 @@ def to_pg_spec(
# values = dict(zip(values,range(len(values)))) # dict with new values
# lm = {k:values[v] for (k, v) in lm.items()} # replace old values with new

if tpl_nodes_len:
if tpl_fl:
nm_list = [
"#%s" % x for x in range(nm_len)
] # so that nm_list[i] == '#i'
Expand Down
82 changes: 38 additions & 44 deletions daliuge-translator/dlg/dropmake/web/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,14 @@ async function checkUrlStatus(url) {
async function checkUrlSubmissionMethods(url) {
return new Promise((resolve) => {
$.ajax({
credentials: 'include',
url: url,
type: 'GET',
success: function (response) {
resolve(response)
},
error: function (jqXHR, textStatus, errorThrown) {
resolve({"methods": []})
resolve({ "methods": [] })
},
timeout: 2000
});
Expand Down Expand Up @@ -332,12 +333,12 @@ function saveSettings() {
let deployMethod;
if (!errorFillingOut) {
deployMethod =
{
name: $(this).find(".deployMethodName").val(),
url: $(this).find(".deployMethodUrl").val(),
deployMethod: $(this).find(".deployMethodMethod option:selected").val(),
active: $(this).find(".deployMethodActive").val()
}
{
name: $(this).find(".deployMethodName").val(),
url: $(this).find(".deployMethodUrl").val(),
deployMethod: $(this).find(".deployMethodMethod option:selected").val(),
active: $(this).find(".deployMethodActive").val()
}
console.debug($(this).find(".deployMethodMethod option:selected").val())
deployMethodsArray.push(deployMethod)
}
Expand Down Expand Up @@ -386,7 +387,7 @@ function buildDeployMethodEntry(method, selected) {

function fillOutSettings() {
let deployMethodsArray;
//get setting values from local storage
//get setting values from local storage
const manager_url = window.localStorage.getItem("manager_url");
$("#settingsModalErrorMessage").html('')

Expand Down Expand Up @@ -509,7 +510,7 @@ function makePNG() {
asArray[i] = data.charCodeAt(i);
}

const blob = new Blob([asArray.buffer], {type: "image/png"});
const blob = new Blob([asArray.buffer], { type: "image/png" });
saveAs(blob, pgtName + "_Template.png");
});
}
Expand Down Expand Up @@ -577,19 +578,20 @@ async function directRestDeploy() {

// sessionId must be unique or the request will fail
const lgName = pgtName.substring(0, pgtName.lastIndexOf("_pgt.graph"));
const sessionId = lgName + "-" + Date.now();
const dateId = new Date();
const sessionId = lgName + "-" + dateId.toISOString().replace(/\:/gi, "-");
console.debug("sessionId:", sessionId);

const nodes_url = manager_url + "/api/nodes";

const nodes = await fetch(nodes_url, {
method: 'GET',
mode: request_mode
// mode: request_mode
})
.then(handleFetchErrors)
.then(response => response.json())
.catch(function (error) {
showMessageModal(`Error ${error}\nGetting nodes unsuccessful`);
showMessageModal(`Error ${error}! Getting nodes unsuccessful`);
})

const pgt_url = "/gen_pg?tpl_nodes_len=" + nodes.length.toString() + "&pgt_id=" + pgtName;
Expand All @@ -613,9 +615,10 @@ async function directRestDeploy() {

// request pg_spec from translator
const pg_spec_url = "/gen_pg_spec";
console.debug("pg_spec_request", pg_spec_request_data);
const pg_spec_response = await fetch(pg_spec_url, {
method: 'POST',
mode: request_mode,
mode: 'cors',
headers: {
'Content-Type': 'application/json',
},
Expand All @@ -626,13 +629,14 @@ async function directRestDeploy() {
.catch(function (error) {
showMessageModal('Error', error + "\nGetting pg_spec unsuccessful: Unable to continue!");
});
const session_data = {"sessionId": sessionId};
const session_data = { "sessionId": sessionId };
console.debug("SessionId:", sessionId);
const create_session_url = manager_url + "/api/sessions";
const create_session = await fetch(create_session_url, {
credentials: 'include',
cache: 'no-cache',
method: 'POST',
mode: request_mode,
// mode: request_mode,
referrerPolicy: 'no-referrer',
headers: {
'Content-Type': 'application/json',
Expand All @@ -646,52 +650,39 @@ async function directRestDeploy() {
});
console.debug("create session response", create_session);
// gzip the pg_spec
const buf = fflate.strToU8(JSON.stringify(pg_spec_response.pg_spec));
const buf = fflate.strToU8(JSON.stringify(JSON.parse(pg_spec_response).pg_spec));
const compressed_pg_spec = fflate.zlibSync(buf);
console.debug("compressed_pg_spec", compressed_pg_spec);
// console.debug("compressed_pg_spec", compressed_pg_spec);
// console.debug("pg_spec", compressed_pg_spec);

// append graph to session on engine
const append_graph_url = manager_url + "/api/sessions/" + sessionId + "/graph/append";
const append_graph = await fetch(append_graph_url, {
credentials: 'include',
method: 'POST',
mode: request_mode,
headers: {
'Content-Type': 'application/json',
'Content-Encoding': 'gzip'
},
mode: "no-cors",
referrerPolicy: 'origin',
//body: JSON.stringify(pg_spec_response.pg_spec)
body: new Blob([compressed_pg_spec], {type: 'application/json'})
// body: new Blob([buf])
// body: JSON.stringify(pg_spec_response.pg_spec)
// body: new Blob(compressed_pg_spec, { type: 'application/json' })
body: new Blob([buf], { type: 'application/json' })
})
.then(handleFetchErrors)
.then(response => response.json())
.catch(function (error) {
showMessageModal('Error', error + "\nUnable to continue!");
});
console.debug("append graph response", append_graph);
// deploy graph
// NOTE: URLSearchParams here turns the object into a x-www-form-urlencoded form
const deploy_graph_url = manager_url + "/api/sessions/" + sessionId + "/deploy";
const deploy_graph = await fetch(deploy_graph_url, {
credentials: 'include',
// credentials: 'include',
method: 'POST',
mode: request_mode,
mode: "no-cors",
body: new URLSearchParams({
'completed': pg_spec_response.root_uids,
'completed': JSON.parse(pg_spec_response).root_uids,
})
})
.then(handleFetchErrors)
.then(response => response.json())
.catch(function (error) {
showMessageModal('Error', error + "\nUnable to continue!");
});
//showMessageModal("Chart deployed" , "Check the dashboard of your k8s cluster for status updates.");
const mgr_url = manager_url + "/session?sessionId=" + sessionId;
window.open(mgr_url, '_blank').focus();
}

function jsonEscape(str) {
return str.replace(/\n/g, "\\\\n").replace(/\r/g, "\\\\r").replace(/\t/g, "\\\\t");
}

async function restDeploy() {
// fetch manager host and port from local storage
murl = window.localStorage.getItem("manager_url");
Expand All @@ -706,7 +697,7 @@ async function restDeploy() {
let manager_url = new URL(murl);
console.info("In REST Deploy")

const request_mode = "cors";
const request_mode = "no-cors";
manager_url = manager_url.toString();
if (manager_url.endsWith('/')) {
manager_url = manager_url.substring(0, manager_url.length - 1);
Expand Down Expand Up @@ -734,10 +725,13 @@ async function restDeploy() {
.catch(function (error) {
showMessageModal('Error', error + "\nGetting PGT unsuccessful: Unable to continue!");
});
pgt = JSON.parse(pgt);
if (typeof pgt == "String") {
pgt = JSON.parse(jsonEscape(toString(pgt)));
}
// This is for a deferred start of daliuge, e.g. on SLURM
console.debug("sending request to ", create_slurm_url);
var body = [pgtName, pgt]; // we send the name in the body with the pgt
console.debug("Submission PGT:", JSON.stringify(body));
await fetch(create_slurm_url, {
method: 'POST',
credentials: 'include',
Expand Down
Loading

0 comments on commit fcce6f3

Please sign in to comment.