Skip to content

Commit

Permalink
Merge f0ecde2 into 811e55c
Browse files Browse the repository at this point in the history
  • Loading branch information
james-strauss-uwa committed Mar 8, 2022
2 parents 811e55c + f0ecde2 commit 70fbd8b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 209 deletions.
72 changes: 25 additions & 47 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1855,6 +1855,17 @@ def dataURL(self):
return "mem://%s/%d/%d" % (hostname, os.getpid(), id(self._buf))


##
# @brief SharedMemory
# @details Data stored in shared memory
# @par EAGLE_START
# @param category SharedMemory
# @param tag template
# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/
# \~English Estimated size of the data contained in this node
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/
# \~English Is this node the end of a group?
# @par EAGLE_END
class SharedMemoryDROP(DataDROP):
"""
A DROP that points to data stored in shared memory.
Expand Down Expand Up @@ -2118,16 +2129,17 @@ def exists(self):
# @brief Plasma
# @details An object in a Apache Arrow Plasma in-memory object store
# @par EAGLE_START
# @par category Plasma
# @param[in] param/data_volume Data volume/5/Float/readwrite/
# @param category Plasma
# @param tag template
# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/
# \~English Estimated size of the data contained in this node
# @param[in] param/group_end Group end/False/Boolean/readwrite/
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/
# \~English Is this node the end of a group?
# @param[in] param/plasma_path Plasma Path//String/readwrite/
# @param[in] cparam/plasma_path Plasma Path//String/readwrite/False//False/
# \~English Path to the local plasma store
# @param[in] param/object_id Object Id//String/readwrite/
# @param[in] cparam/object_id Object Id//String/readwrite/False//False/
# \~English PlasmaId of the object for all compute nodes
# @param[in] param/use_staging Use Staging/False/Boolean/readwrite/
# @param[in] cparam/use_staging Use Staging/False/Boolean/readwrite/False//False/
# \~English Enables writing to a dynamically resizeable staging buffer
# @par EAGLE_END
class PlasmaDROP(DataDROP):
Expand Down Expand Up @@ -2162,16 +2174,17 @@ def dataURL(self):
# @details An Apache Arrow Flight server providing distributed access
# to a Plasma in-memory object store
# @par EAGLE_START
# @par category Plasma
# @param[in] param/data_volume Data volume/5/Float/readwrite/
# @param category PlasmaFlight
# @param tag template
# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/
# \~English Estimated size of the data contained in this node
# @param[in] param/group_end Group end/False/Boolean/readwrite/
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/
# \~English Is this node the end of a group?
# @param[in] param/plasma_path Plasma Path//String/readwrite/
# @param[in] cparam/plasma_path Plasma Path//String/readwrite/False//False/
# \~English Path to the local plasma store
# @param[in] param/object_id Object Id//String/readwrite/
# @param[in] cparam/object_id Object Id//String/readwrite/False//False/
# \~English PlasmaId of the object for all compute nodes
# @param[in] param/flight_path Flight Path//String/readwrite/
# @param[in] cparam/flight_path Flight Path//String/readwrite/False//False/
# \~English IP and flight port of the drop owner
# @par EAGLE_END
class PlasmaFlightDROP(DataDROP):
Expand Down Expand Up @@ -2647,23 +2660,6 @@ def execute(self, _send_notifications=True):
self._notifyAppIsFinished()


##
# @brief Plasma
# @details An object in a Apache Arrow Plasma in-memory object store
# @par EAGLE_START
# @param category Plasma
# @param tag template
# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/
# \~English Estimated size of the data contained in this node
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/
# \~English Is this node the end of a group?
# @param[in] cparam/plasma_path Plasma Path//String/readwrite/False//False/
# \~English Path to the local plasma store
# @param[in] cparam/object_id Object Id//String/readwrite/False//False/
# \~English PlasmaId of the object for all compute nodes
# @param[in] cparam/use_staging Use Staging/False/Boolean/readwrite/False//False/
# \~English Enables writing to a dynamically resizeable staging buffer
# @par EAGLE_END
class PlasmaDROP(AbstractDROP):
"""
A DROP that points to data stored in a Plasma Store
Expand Down Expand Up @@ -2691,24 +2687,6 @@ def dataURL(self):
return "plasma://%s" % (binascii.hexlify(self.object_id).decode("ascii"))


##
# @brief PlasmaFlight
# @details An Apache Arrow Flight server providing distributed access
# to a Plasma in-memory object store
# @par EAGLE_START
# @param category PlasmaFlight
# @param tag template
# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/
# \~English Estimated size of the data contained in this node
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/
# \~English Is this node the end of a group?
# @param[in] cparam/plasma_path Plasma Path//String/readwrite/False//False/
# \~English Path to the local plasma store
# @param[in] cparam/object_id Object Id//String/readwrite/False//False/
# \~English PlasmaId of the object for all compute nodes
# @param[in] cparam/flight_path Flight Path//String/readwrite/False//False/
# \~English IP and flight port of the drop owner
# @par EAGLE_END
class PlasmaFlightDROP(AbstractDROP):
"""
A DROP that points to data stored in a Plasma Store
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/parset_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
# @param tag template
# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/Estimated size of the data contained in this node
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/Is this node the end of a group?
# @param[in] aparam/mode Parset mode/"YANDA"/String/readonly/False//False/To what standard DALiuGE should filter and serialize the parameters.
# @param[in] aparam/config_data ConfigData/""/String/readwrite/False//False/Additional configuration information to be mixed in with the initial data
# @param[in] cparam/mode Parset mode/"YANDA"/String/readonly/False//False/To what standard DALiuGE should filter and serialize the parameters.
# @param[in] cparam/config_data ConfigData/""/String/readwrite/False//False/Additional configuration information to be mixed in with the initial data
# @param[out] port/Config ConfigFile/File/The output configuration file
# @par EAGLE_END
class ParameterSetDROP(AbstractDROP):
Expand Down
189 changes: 29 additions & 160 deletions tools/xml2palette/xml2palette.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

KNOWN_PARAM_DATA_TYPES = ["String", "Integer", "Float", "Complex", "Boolean", "Select", "Password", "Json"]
KNOWN_CONSTRUCT_TYPES = ["Scatter", "Gather"]
KNOWN_DATA_CATEGORIES = ["File", "Memory", "SharedMemory", "NGAS", "ParameterSet", "S3", "Plasma", "PlasmaFlight"]


def get_options_from_command_line(argv):
Expand Down Expand Up @@ -150,164 +151,32 @@ def find_field_by_name(fields, name):
return None


def add_required_fields_for_category(text, fields, category):
def check_required_fields_for_category(text, fields, category):
if category in ["DynlibApp", "PythonApp", "Branch", "BashShellApp", "Mpi", "Docker"]:
add_field_if_missing(
text,
fields,
"execution_time",
"Execution time",
5,
"Estimated execution time",
"readwrite",
"Float",
False,
[],
False,
)
add_field_if_missing(
text,
fields,
"num_cpus",
"Num CPUs",
1,
"Number of cores used",
"readwrite",
"Integer",
False,
[],
False,
)
alert_if_missing(text, fields, "execution_time")
alert_if_missing(text, fields, "num_cpus")

if category in ["DynlibApp", "PythonApp", "Branch", "BashShellApp", "Docker"]:
add_field_if_missing(
text,
fields,
"group_start",
"Group start",
"false",
"Component is start of a group",
"readwrite",
"Boolean",
False,
[],
False,
)
alert_if_missing(text, fields, "group_start")

if category == "DynlibApp":
add_field_if_missing(text, fields, "libpath", "Library path", "", "", "readwrite", "String", False, [], False)
alert_if_missing(text, fields, "libpath")

if category in ["PythonApp", "Branch"]:
add_field_if_missing(
text,
fields,
"appclass",
"Appclass",
"dlg.apps.simple.SleepApp",
"Application class",
"readwrite",
"String",
False,
[],
False,
)
alert_if_missing(text, fields, "appclass")

if category in ["File", "Memory", "NGAS", "ParameterSet", "Plasma", "PlasmaFlight", "S3"]:
add_field_if_missing(
text,
fields,
"data_volume",
"Data volume",
5,
"Estimated size of the data contained in this node",
"readwrite",
"Integer",
False,
[],
False,
)
alert_if_missing(text, fields, "data_volume")

if category in ["File", "Memory", "NGAS", "ParameterSet", "Plasma", "PlasmaFlight", "S3", "Mpi"]:
add_field_if_missing(
text,
fields,
"group_end",
"Group end",
"false",
"Component is end of a group",
"readwrite",
"Boolean",
False,
[],
False,
)
alert_if_missing(text, fields, "group_end")

if category in ["BashShellApp", "Mpi", "Docker", "Singularity"]:
add_field_if_missing(
text,
fields,
"input_redirection",
"Input redirection",
"",
"The command line argument that specifies the input into this application",
"readwrite",
"String",
False,
[],
False,
)
add_field_if_missing(
text,
fields,
"output_redirection",
"Output redirection",
"",
"The command line argument that specifies the output from this application",
"readwrite",
"String",
False,
[],
False,
)
add_field_if_missing(
text,
fields,
"command_line_arguments",
"Command line arguments",
"",
"Additional command line arguments to be added to the command line to be executed",
"readwrite",
"String",
False,
[],
False,
)
add_field_if_missing(
text,
fields,
"paramValueSeparator",
"Param Value Separator",
" ",
"Separator character(s) between parameters on the command line",
"readwrite",
"String",
False,
[],
False,
)
add_field_if_missing(
text,
fields,
"argumentPrefix",
"Argument prefix",
"--",
"Prefix to each keyed argument on the command line",
"readwrite",
"String",
False,
[],
False,
)
alert_if_missing(text, fields, "input_redirection")
alert_if_missing(text, fields, "output_redirection")
alert_if_missing(text, fields, "command_line_arguments")
alert_if_missing(text, fields, "paramValueSeparator")
alert_if_missing(text, fields, "argumentPrefix")


def create_field(internal_name, name, value, description, access, type, precious, options, positional):
Expand All @@ -325,14 +194,9 @@ def create_field(internal_name, name, value, description, access, type, precious
}


def add_field_if_missing(text, fields, internal_name, name, value, description, access, type, precious, options, positional):
def alert_if_missing(text, fields, internal_name):
if find_field_by_name(fields, internal_name) is None:
logging.warning(
text + " component added missing " + internal_name + " cparam"
)
fields.append(
create_field(internal_name, name, value, description, access, type, precious, options, positional)
)
logging.warning(text + " component missing " + internal_name + " cparam")


def parse_key(key):
Expand Down Expand Up @@ -482,7 +346,6 @@ def create_palette_node_from_params(params):
category = ""
tag = ""
construct = ""
categoryType = ""
inputPorts = []
outputPorts = []
inputLocalPorts = []
Expand Down Expand Up @@ -578,6 +441,12 @@ def create_palette_node_from_params(params):
text + " aparam '" + name + "' has unknown type: " + type
)

# check that category if suitable for aparams
if category in KNOWN_DATA_CATEGORIES:
logging.warning(
text + " has aparam, which is not suitable for a " + category + " node"
)

# check that a param of type "Select" has some options specified,
# and check that every param with some options specified is of type "Select"
if type == "Select" and len(options) == 0:
Expand Down Expand Up @@ -662,8 +531,8 @@ def create_palette_node_from_params(params):
else:
logging.warning("Unknown port direction: " + direction)

# add extra fields that must be included for the category
add_required_fields_for_category(text, fields, category)
# check for presence of extra fields that must be included for each category
check_required_fields_for_category(text, fields, category)

# create and return the node
return ({
Expand Down Expand Up @@ -952,11 +821,11 @@ def create_construct_node(type, node):
logging.info("Adding component: " + node["text"])
nodes.append(node)

# if a construct is found, add to nodes
if data["construct"] != "":
logging.info("Adding component: " + data["construct"] + "/" + node["text"])
construct_node = create_construct_node(data["construct"], node)
nodes.append(construct_node)
# if a construct is found, add to nodes
if data["construct"] != "":
logging.info("Adding component: " + data["construct"] + "/" + node["text"])
construct_node = create_construct_node(data["construct"], node)
nodes.append(construct_node)

# check if gitrepo and version params were found and cache the values
for param in params:
Expand Down

0 comments on commit 70fbd8b

Please sign in to comment.