From b80c5b91b57a65967911fda8fc1c863bb35e11ad Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Thu, 25 Nov 2021 01:08:22 +0800 Subject: [PATCH] expose buffer protocol getter for data drops --- daliuge-engine/dlg/io.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index 6fc174172..08c235b47 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -20,12 +20,13 @@ # MA 02111-1307 USA # from abc import abstractmethod, ABCMeta +from overrides import overrides import io import logging import os import urllib.parse -from typing import Optional +from typing import Optional, Union from . import ngaslite from .apps.plasmaflight import PlasmaFlightClient @@ -127,6 +128,14 @@ def delete(self): Deletes the data represented by this DataIO """ + @abstractmethod + def buffer(self) -> Union[memoryview, bytes, bytearray, pyarrow.Buffer]: + """ + Gets a buffer protocol compatible object of the drop data. + This may be a zero-copy view of the data or a copy depending + on whether the drop stores data in cpu memory or not. + """ + @abstractmethod def _open(self, **kwargs): pass @@ -210,6 +219,7 @@ class MemoryIO(DataIO): A DataIO class that reads/write from/into the BytesIO object given at construction time """ + _desc: io.BytesIO def __init__(self, buf: io.BytesIO, **kwargs): self._buf = buf @@ -245,8 +255,12 @@ def exists(self): def delete(self): self._buf.close() + def buffer(self): + return self._open().getbuffer() + class FileIO(DataIO): + _desc: io.BufferedReader def __init__(self, filename, **kwargs): super(FileIO, self).__init__() self._fnm = filename @@ -282,6 +296,10 @@ def exists(self): def delete(self): os.unlink(self._fnm) + @overrides + def buffer(self) -> bytes: + return self._desc.read(-1) + class NgasIO(DataIO): """ @@ -563,6 +581,10 @@ def exists(self): def delete(self): pass + def buffer(self): + [data] = self._desc.get_buffers([self._object_id]) + return memoryview(data) + class PlasmaFlightIO(DataIO): def __init__(