Skip to content

Commit

Permalink
expose buffer protocol getter for data drops
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Nov 24, 2021
1 parent 7b3be1a commit b80c5b9
Showing 1 changed file with 23 additions and 1 deletion.
24 changes: 23 additions & 1 deletion daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
# MA 02111-1307 USA
#
from abc import abstractmethod, ABCMeta
from overrides import overrides
import io
import logging
import os
import urllib.parse

from typing import Optional
from typing import Optional, Union

from . import ngaslite
from .apps.plasmaflight import PlasmaFlightClient
Expand Down Expand Up @@ -127,6 +128,14 @@ def delete(self):
Deletes the data represented by this DataIO
"""

@abstractmethod
def buffer(self) -> Union[memoryview, bytes, bytearray, pyarrow.Buffer]:
"""
Gets a buffer protocol compatible object of the drop data.
This may be a zero-copy view of the data or a copy depending
on whether the drop stores data in cpu memory or not.
"""

@abstractmethod
def _open(self, **kwargs):
pass
Expand Down Expand Up @@ -210,6 +219,7 @@ class MemoryIO(DataIO):
A DataIO class that reads/write from/into the BytesIO object given at
construction time
"""
_desc: io.BytesIO

def __init__(self, buf: io.BytesIO, **kwargs):
self._buf = buf
Expand Down Expand Up @@ -245,8 +255,12 @@ def exists(self):
def delete(self):
self._buf.close()

def buffer(self):
return self._open().getbuffer()


class FileIO(DataIO):
_desc: io.BufferedReader
def __init__(self, filename, **kwargs):
super(FileIO, self).__init__()
self._fnm = filename
Expand Down Expand Up @@ -282,6 +296,10 @@ def exists(self):
def delete(self):
os.unlink(self._fnm)

@overrides
def buffer(self) -> bytes:
return self._desc.read(-1)


class NgasIO(DataIO):
"""
Expand Down Expand Up @@ -563,6 +581,10 @@ def exists(self):
def delete(self):
pass

def buffer(self):
[data] = self._desc.get_buffers([self._object_id])
return memoryview(data)


class PlasmaFlightIO(DataIO):
def __init__(
Expand Down

0 comments on commit b80c5b9

Please sign in to comment.