Skip to content

Commit

Permalink
Merge pull request #259 from ICRAR/LIU-375_SupportInputNamedPortsForFile
Browse files Browse the repository at this point in the history
LIU-375: Support input Named Ports for FileDROPS
  • Loading branch information
myxie committed Jul 8, 2024
2 parents 2c4049f + 42539f3 commit 779cde4
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
69 changes: 69 additions & 0 deletions daliuge-engine/dlg/data/drops/data_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,75 @@ def checksumType(self, value):
)
self._checksumType = value

def _map_input_ports_to_params(self):
"""
Map the input ports that are on the drop to the its parameters
This method performs the following steps:
- Iterate through each producer, finding the portname and value of the
port associated with that producers
- Iterate through the input port names of _this_ drop, and match the UID
and name to the producer map, and then getting the value.
- Finally, match the value of the named input drop with a DROP parameter (
if it exists).
It is expected that this method be used within the child DataDrop class that is
inheriting this method; see the FileDrop class implemenetation for an example
use case.
"""

try:
dropInputPorts = self.parameters['producers']
except KeyError:
logging.debug("No producers available for drop: %s", self.uid)
return

producerPortValueMap = {} # Map Producer UIDs to a portname
finalDropPortMap = {} # Final mapping of named port to value stored in producer

for p in self.producers:
params = p.parameters['outputs']
for param in params:
try:
key = list(param.keys())[0]
except AttributeError:
logging.debug("Producer %s does not have named ports", p.uid)
continue
portName = param[key]
portValue = ""
producerUid = p.uid
if portName in p.parameters:
portValue = p.parameters[param[key]]
# TODO This currently only allows 1 UID -> Portname/Value
# Investigate UID -> [Portname1:Value1, Portnam2:value2,..,]
producerPortValueMap[producerUid] = {"portname": portName,
"value": portValue}

for port in dropInputPorts:
try:
port.items()
except AttributeError:
logging.debug("Producer %s does not have named ports", p.uid)
continue
for uid, portname in port.items():
try:
print(uid, portname)
tmp = producerPortValueMap[uid]
if tmp['portname'] == portname:
finalDropPortMap[portname] = tmp['value']
except KeyError:
print("Not available")

for portname in finalDropPortMap:
if portname in self.parameters:
self.parameters[portname] = finalDropPortMap[portname]

self._updatedPorts = True


@abstractmethod
def getIO(self) -> DataIO:
"""
Expand Down
12 changes: 12 additions & 0 deletions daliuge-engine/dlg/data/drops/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# MA 02111-1307 USA
#
import errno
import logging
import os
import re

Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self, *args, **kwargs):
if kwargs["persist"] and "lifespan" not in kwargs:
kwargs["expireAfterUse"] = False
self.is_dir = False
self._updatedPorts = False
super().__init__(*args, **kwargs)

def sanitize_paths(self, filepath: str) -> Union[None, str]:
Expand Down Expand Up @@ -109,6 +111,10 @@ def initialize(self, **kwargs):
"""
FileDROP-specific initialization.
"""

self._setupFilePaths()

def _setupFilePaths(self):
filepath = self.parameters.get("filepath", None)
dirname = None
filename = None
Expand Down Expand Up @@ -157,6 +163,12 @@ def initialize(self, **kwargs):
self._wio = None

def getIO(self):

# We need to update named_ports now we have runtime information
if not self._updatedPorts:
self._map_input_ports_to_params()
self._setupFilePaths()

return FileIO(self._path)

def delete(self):
Expand Down

0 comments on commit 779cde4

Please sign in to comment.