Skip to content

Commit

Permalink
added raw output, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Apr 12, 2024
1 parent b9fbf6c commit e96198c
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 16 deletions.
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ def write_results(self, result):
o.write(repr(r).encode("utf-8"))
elif self.output_parser is DropParser.NPY:
drop_loaders.save_npy(o, r)
elif self.output_parser is DropParser.RAW:
o.write(r)
else:
ValueError(self.output_parser.__repr__())

Expand Down
12 changes: 4 additions & 8 deletions daliuge-engine/dlg/data/drops/data_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,13 @@ def read(self, descriptor, count=65536, **kwargs):
def _checkStateAndDescriptor(self, descriptor):
if self.status != DROPStates.COMPLETED:
raise Exception(
"%r is in state %s (!=COMPLETED), cannot be read"
% (self, self.status)
"%r is in state %s (!=COMPLETED), cannot be read" % (self, self.status)
)
if descriptor is None:
raise ValueError("Illegal empty descriptor given")
if descriptor not in self._rios:
raise Exception(
"Illegal descriptor %d given, remember to open() first"
% (descriptor)
"Illegal descriptor %d given, remember to open() first" % (descriptor)
)

def isBeingRead(self):
Expand All @@ -200,10 +198,8 @@ def write(self, data: Union[bytes, memoryview], **kwargs):
if self.status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception("No more writing expected")

if not isinstance(data, (bytes, memoryview)):
raise Exception(
"Data type not of binary type: %s", type(data).__name__
)
if not isinstance(data, (bytes, memoryview, str)):
raise Exception("Data type not of binary type: ", type(data).__name__)

# We lazily initialize our writing IO instance because the data of this
# DROP might not be written through this DROP
Expand Down
10 changes: 2 additions & 8 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
Utility methods and classes to be used when interacting with DROPs
"""

import base64
import collections
import io
import time
import logging
import pickle
import re
import threading
import traceback
Expand Down Expand Up @@ -139,9 +137,7 @@ def allDropContents(drop, bufsize=65536) -> bytes:
return buf.getvalue()


def copyDropContents(
source: "DataDROP", target: "DataDROP", bufsize: int = 65536
):
def copyDropContents(source: "DataDROP", target: "DataDROP", bufsize: int = 65536):
"""
Manually copies data from one DROP into another, in bufsize steps
"""
Expand All @@ -151,9 +147,7 @@ def copyDropContents(
logger.debug("Read %d bytes from %s", len(buf), repr(source))
st = time.time()
ssize = source.size if source.size is not None else -1
logger.debug(
"Source size: %s; Source checksum: %s", ssize, source.checksum
)
logger.debug("Source size: %s; Source checksum: %s", ssize, source.checksum)
tot_w = 0
ofl = True
# target._expectedSize = ssize
Expand Down

0 comments on commit e96198c

Please sign in to comment.