Skip to content

Commit

Permalink
async datadrop
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Mar 29, 2022
1 parent 8ef4a74 commit a7c875b
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import sys
import inspect
import binascii
from typing import List, Optional, Union
from typing import AsyncIterable, Dict, List, Optional, Union
from overrides import overrides

import numpy as np
Expand Down Expand Up @@ -278,15 +278,15 @@ def __init__(self, oid, uid, **kwargs):
# The DataIO instance we use in our write method. It's initialized to
# None because it's lazily initialized in the write method, since data
# might be written externally and not through this DROP
self._wio = None
self._wio: Optional[DataIO] = None

# DataIO objects used for reading.
# Instead of passing file objects or more complex data types in our
# open/read/close calls we use integers, mainly because Pyro doesn't
# handle file types and other classes (like StringIO) well, but also
# because it requires less transport.
# TODO: Make these threadsafe, no lock around them yet
self._rios = {}
self._rios: Dict[int, DataIO] = {}

# The execution mode.
# When set to DROP (the default) the graph execution will be driven by
Expand Down Expand Up @@ -993,8 +993,8 @@ def open(self, **kwargs):
)
)

io = self._getIO()
logger.debug("Opening drop %s" % (self.oid))
io = self._getIO()
io.open(OpenMode.OPEN_READ, **kwargs)

# Save the IO object in the dictionary and return its descriptor instead
Expand All @@ -1021,8 +1021,8 @@ def close(self, descriptor, **kwargs):

# Decrement counter and then actually close
self.decrRefCount()
io = self._rios.pop(descriptor)
io.close(**kwargs)
bio = self._rios.pop(descriptor)
bio.close(**kwargs)

def _closeWriters(self):
"""
Expand All @@ -1045,6 +1045,11 @@ def read(self, descriptor, count=4096, **kwargs):
io = self._rios[descriptor]
return io.read(count, **kwargs)

async def readStream(self, descriptor, **kwargs) -> AsyncIterable:
self._checkStateAndDescriptor(descriptor)
io = self._rios[descriptor]
return await io.readStream()

def _checkStateAndDescriptor(self, descriptor):
if self.status != DROPStates.COMPLETED:
raise Exception(
Expand Down Expand Up @@ -1140,6 +1145,9 @@ def write(self, data: Union[bytes, memoryview], **kwargs):

return nbytes

async def writeStream(self, stream: AsyncIterable, **kwargs):
raise NotImplementedError

def _updateChecksum(self, chunk):
# see __init__ for the initialization to None
if self._checksum is None:
Expand Down

0 comments on commit a7c875b

Please sign in to comment.