Skip to content

Commit

Permalink
Added backwards compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed May 7, 2023
1 parent b3716b5 commit bd322a3
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 29 deletions.
13 changes: 8 additions & 5 deletions daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class ThreadingWSGIServer(

class LoggingWSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
def log_message(self, fmt, *args):
logger.debug(fmt, *args)
pass
# logger.debug(fmt, *args)


class RestServerWSGIServer:
Expand Down Expand Up @@ -143,7 +144,9 @@ def _get_json(self, url):
def _post_form(self, url, content=None):
if content is not None:
content = urllib.parse.urlencode(content)
ret = self._POST(url, content, content_type="application/x-www-form-urlencoded")
ret = self._POST(
url, content, content_type="application/x-www-form-urlencoded"
)
return json.load(ret) if ret else None

def _post_json(self, url, content, compress=False):
Expand Down Expand Up @@ -177,10 +180,11 @@ def _DELETE(self, url):
return stream

def _request(self, url, method, content=None, headers={}):

# Do the HTTP stuff...
url = self.url_prefix + url
logger.debug("Sending %s request to %s:%d%s", method, self.host, self.port, url)
logger.debug(
"Sending %s request to %s:%d%s", method, self.host, self.port, url
)

if not common.portIsOpen(self.host, self.port, self.timeout):
raise RestClientException(
Expand All @@ -198,7 +202,6 @@ def _request(self, url, method, content=None, headers={}):

# Server errors are encoded in the body as json content
if self._resp.status != http.HTTPStatus.OK:

msg = "Error on remote %s@%s:%s%s (status %d): " % (
method,
self.host,
Expand Down
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ def parent(self, parent):
if self._parent and parent:
logger.warning(
"A parent is already set in %r, overwriting with new value",
self,
self.oid,
)
if parent:
prevParent = self._parent
Expand Down Expand Up @@ -927,7 +927,7 @@ def addConsumer(self, consumer, back=True):
# Add the reverse reference too automatically
if cuid in self._consumers_uids:
return
logger.debug("Adding new consumer %r to %r", consumer, self)
# logger.debug("Adding new consumer %r to %r", consumer.oid, self.oid)
self._consumers.append(consumer)

# Subscribe the consumer to events sent when this DROP moves to
Expand All @@ -941,7 +941,7 @@ def addConsumer(self, consumer, back=True):

# Automatic back-reference
if back and hasattr(consumer, "addInput"):
logger.debug("Adding back %r as input of %r", self, consumer)
logger.debug("Adding back %r as input of %r", self.oid, consumer)
consumer.addInput(self, False)

# Add reproducibility subscription
Expand Down
24 changes: 17 additions & 7 deletions daliuge-engine/dlg/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,35 @@ def __init__(self):
Union[str, object], list[EventHandler]
] = defaultdict(list)

def subscribe(self, listener: EventHandler, eventType: Optional[str] = None):
def subscribe(
self, listener: EventHandler, eventType: Optional[str] = None
):
"""
Subscribes `listener` to events fired by this object. If `eventType` is
not `None` then `listener` will only receive events of `eventType` that
originate from this object, otherwise it will receive all events.
"""
logger.debug(
"Adding listener to %r eventType=%s: %r", self, eventType, listener
)
# logger.debug(
# "Adding listener to %r eventType=%s: %r",
# self,
# eventType,
# listener,
# )
eventType = eventType or EventFirer.__ALL_EVENTS
self._listeners[eventType].append(listener)

def unsubscribe(self, listener: EventHandler, eventType: Optional[str] = None):
def unsubscribe(
self, listener: EventHandler, eventType: Optional[str] = None
):
"""
Unsubscribes `listener` from events fired by this object.
"""
logger.debug(
"Removing listener to %r eventType=%s: %r", self, eventType, listener
)
"Removing listener to %r eventType=%s: %r",
self.oid,
eventType,
listener.oid,
) if hasattr(listener, "oid") else None

eventType = eventType or EventFirer.__ALL_EVENTS
if listener in self._listeners[eventType]:
Expand Down
1 change: 0 additions & 1 deletion daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ def createGraphFromDropSpecList(dropSpecList, session=None):
if not droputils.getUpstreamObjects(drop):
roots.append(drop)
logger.info("%d graph roots found, bye-bye!", len(roots))
logger.debug("Graph spec: %s", drops.values())

return roots

Expand Down
48 changes: 35 additions & 13 deletions daliuge-translator/dlg/dropmake/lg_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,17 @@ def dropclass(self, default_value):
default_value = "dlg.apps.simple.SleepApp"
if self.jd["categoryType"] == CategoryType.DATA:
self.is_data = True
keys = ["dropclass", "Data class"]
keys = [
"dropclass",
"Data class",
"dataclass",
]
elif self.jd["categoryType"] == CategoryType.APPLICATION:
keys = [
"dropclass",
"Application Class",
"Application class",
"appclass",
]
self.is_app = True
elif self.jd["categoryType"] in [
Expand Down Expand Up @@ -558,9 +563,6 @@ def gather_width(self):
TODO: use OO style to replace all type-related statements!
"""
return None
# raise GraphException(
# "Non-Gather LGN {0} does not have gather_width".format(self.id)
# )

@property
def groupby_width(self):
Expand All @@ -579,11 +581,6 @@ def groupby_width(self):
return self._grpw
else:
return None
# raise GraphException(
# "Non-GroupBy LGN {0} does not have groupby_width".format(
# self.id
# )
# )

@property
def group_by_scatter_layers(self):
Expand Down Expand Up @@ -956,7 +953,12 @@ def _create_listener_drops(self, drop_spec):

def _create_app_drop(self, drop_spec):
# default generic component becomes "sleep and copy"
if "appclass" in self.jd:
self.dropclass = self.jd["appclass"]
app_class = self.dropclass
logger.debug("Creating app drop using class: %s", self.dropclass)
if self.dropclass is None or self.dropclass == "":
logger.debug("No dropclass found in: %s", self)
app_class = "dlg.apps.simple.SleepApp"
else:
app_class = self.dropclass
Expand Down Expand Up @@ -987,6 +989,29 @@ def _create_app_drop(self, drop_spec):
drop_spec.update(kwargs)
return drop_spec

def _create_data_drop(self, drop_spec):
# backwards compatibility
if "dataclass" in self.jd:
self.dropclass = self.jd["dataclass"]
if (
not hasattr(self, "dropclass")
or self.dropclass == "dlg.apps.simple.SleepApp"
):
if self.category == "File":
self.dropclass = "dlg.data.drops.file.FileDROP"
elif self.category == "Memory":
self.dropclass = "dlg.data.drops.memory.InMemoryDROP"
else:
raise TypeError("Unknown data class for drop: %s", self.jd)
logger.debug("Creating data drop using class: %s", self.dropclass)
kwargs = {}
kwargs["dropclass"] = self.dropclass
kwargs["weight"] = self.weight
if self.is_start_listener:
drop_spec = self._create_listener_drops(drop_spec)
drop_spec.update(kwargs)
return drop_spec

def make_single_drop(self, iid="0", **kwargs):
"""
make only one drop from a LG nodes
Expand All @@ -1013,10 +1038,7 @@ def make_single_drop(self, iid="0", **kwargs):
)
drop_spec.update(kwargs)
if self.is_data:
# almost nothing special for data drops
kwargs["weight"] = self.weight
if self.is_start_listener:
drop_spec = self._create_listener_drops(drop_spec)
drop_spec = self._create_data_drop(drop_spec)
elif self.is_app:
drop_spec = self._create_app_drop(drop_spec)
elif self.category == Categories.GROUP_BY:
Expand Down

0 comments on commit bd322a3

Please sign in to comment.