This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

BYOB ("Bring Your Own Buffer") read interface #160

Closed
sk1p opened this issue Apr 24, 2018 · 8 comments

@sk1p (Contributor) commented Apr 24, 2018

Currently, HDFile.read(...) involves both allocating and copying buffers. When reading locally with short-circuit reads enabled, this can become a bottleneck. Here is HDFile.read, annotated with the allocations and copies:

    def read(self, length=None):
        """ Read bytes from open file """
        if not _lib.hdfsFileIsOpenForRead(self._handle):
            raise IOError('File not read mode')
        buffers = []
        buffer_size = self.buff if self.buff != 0 else DEFAULT_READ_BUFFER_SIZE

        if length is None:
            out = 1
            while out:
                out = self.read(buffer_size)
                buffers.append(out)
        else:
            while length:
                bufsize = min(buffer_size, length)
                p = ctypes.create_string_buffer(bufsize)  # <-- allocation for each slice that is read
                ret = _lib.hdfsRead(
                    self._fs, self._handle, p, ctypes.c_int32(bufsize))
                if ret == 0:
                    break
                if ret > 0:
                    if ret < bufsize:
                        buffers.append(p.raw[:ret])   # <-- .raw creates a copy
                    elif ret == bufsize:
                        buffers.append(p.raw)  # <-- .raw again
                    length -= ret
                else:
                    raise IOError('Read file %s Failed:' % self.path, -ret)

        return b''.join(buffers)  # <-- this of course has to create a copy again
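
As an aside, the .raw copy is easy to demonstrate in isolation (a standalone snippet, independent of hdfs3):

    import ctypes

    buf = ctypes.create_string_buffer(b"abcd")
    a = buf.raw
    b = buf.raw
    print(a is b)  # False: every .raw access builds a fresh bytes object
    print(a == b)  # True: same contents, two separate allocations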

I suggest adding a way to specify the output buffer, without any additional copying or buffering. Here is a prototype implementation that, in one of my tests, speeds up reading large binary data by a factor of about 4:

        def byob_read(self, length, out):
            """
            Read ``length`` bytes from the file into the ``out`` buffer

            ``out`` needs to be a ctypes array, for example created
            with ``ctypes.create_string_buffer``, and must be at least ``length`` bytes long.
            """
            _lib = hdfs3.core._lib
            if not _lib.hdfsFileIsOpenForRead(self._handle):
                raise IOError('File not read mode')
            bufsize = length
            bufpos = 0

            while length:
                bufp = ctypes.byref(out, bufpos)
                ret = _lib.hdfsRead(
                    self._fs, self._handle, bufp, ctypes.c_int32(bufsize - bufpos))
                if ret == 0:  # EOF
                    break
                if ret > 0:
                    length -= ret
                    bufpos += ret
                else:
                    raise IOError('Read file %s Failed:' % self.path, -ret)
            return out
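
For illustration, usage would look something like this (assuming the method above has been attached to HDFile; hdfs is an open HDFileSystem, and the path is a placeholder):

    import ctypes

    length = 16 * 1024 * 1024
    out = ctypes.create_string_buffer(length)  # allocated once, reusable across reads
    with hdfs.open('/tmp/test.bin', 'rb') as f:
        f.byob_read(length, out)               # fills out in place, no extra copies
    view = memoryview(out)                     # zero-copy access for downstream code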

The final interface should probably be something like read(self, length=None, out=None) to support both modes of operation and not pollute the API namespace, and there should be some range checks to prevent overflows.
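
Such a range check might look like this (just a sketch; the helper name is hypothetical):

    import ctypes

    def check_out_buffer(out, length):
        # hypothetical guard: refuse reads that would overflow the caller's buffer
        if length > ctypes.sizeof(out):
            raise ValueError('out buffer too small: %d < %d'
                             % (ctypes.sizeof(out), length))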

Thoughts?

By the way, there are still copies happening inside libhdfs3 which, when patched out or worked around, give another nice speedup (in short: setting input.localread.default.buffersize=1 and patching out checksumming → reads go directly into the buffer given by the user). Where is libhdfs3 development happening these days? Is ContinuumIO/libhdfs3-downstream the right place to work on this?
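
For reference, the buffersize tweak should be settable without recompiling, via the pars dict that HDFileSystem accepts (host/port below are placeholders; the checksumming change still needs the source patch):

    import hdfs3

    # extra libhdfs3 configuration key/value pairs are passed through pars
    hdfs = hdfs3.HDFileSystem(host='namenode', port=8020,
                              pars={'input.localread.default.buffersize': '1'})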

@martindurant (Member) commented Apr 24, 2018

Interesting, thanks for providing this!
Is there any possible downside or failure case you can think of?

In fact, we always know the expected size of the buffer from the known file size, current location, and requested read length. Therefore, I would imagine an optional out= on the normal read() method and, if not given (out=None, the default), we create a buffer up-front. Perhaps we could maintain out=False for a while, retaining the previous behaviour - or just long enough to prove the benchmarking favours your approach. Other read-like methods such as cat and head should pass through the same kwarg.
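
Roughly, as a sketch (a standalone helper, with the attribute lookups replaced by parameters):

    def expected_read_size(file_size, pos, length=None):
        # the buffer size is known up-front from file size, position and request
        remaining = file_size - pos
        return remaining if length in (None, -1) else min(length, remaining)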

As far as libhdfs3 is concerned, yes, ContinuumIO/libhdfs3-downstream is the correct place. You will see that there has not been a whole lot of development, and there remains confusion over how to handle all the security possibilities. If you submit a PR there which improves performance, I'll be sure to merge and release it.

@sk1p (Contributor, Author) commented Apr 25, 2018

I created a simple self-contained benchmark: https://github.com/LiberTEM/LiberTEM/blob/master/benchmarks/hdfs/bench_buffering.py

On my laptop, this results in:

--- starting timer old style ---
--- stopping timer old style, delta=28.25197 ---
--- starting timer new style ---
--- stopping timer new style, delta=7.12839 ---
--- starting timer new style w/ realloc ---
--- stopping timer new style w/ realloc, delta=7.13468 ---

Or, with the mentioned libhdfs3 tweaks:

--- starting timer old style ---
--- stopping timer old style, delta=0.52935 ---
--- starting timer new style ---
--- stopping timer new style, delta=0.21415 ---
--- starting timer new style w/ realloc ---
--- stopping timer new style w/ realloc, delta=0.26298 ---

(Of course, reads are served from the filesystem cache.)

So, to put this in perspective: the reallocation cost is nowhere near as bad as the buffering and copying.

As to the downside: to communicate short reads (i.e. at the end of the file), the method needs to return the number of bytes read, not the buffer itself. Thinking about it, there is an alternative: it could return a sliced memoryview of the 'valid' part of the buffer, but that may not be obvious to the user, who could just keep using the whole buffer they passed in...
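
For illustration, the memoryview variant would be something like:

    buf = bytearray(8)            # the buffer the user passed in
    n = 5                         # suppose only 5 bytes were actually read
    valid = memoryview(buf)[:n]   # zero-copy view of just the valid prefix
    assert valid.nbytes == n and not valid.readonly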

So, new proposal: add a method read_into(self, length, out) for BYOB-style operations, and express the other methods (or just read?) in terms of read_into. There would still be at least one copy involved if the API is supposed to stay backward-compatible and always return bytes. Returning bytes, buffers, or even the number of bytes read depending on the out parameter may be confusing.

People who need the performance would need to use read_into(...) for now...

@martindurant (Member) commented:

Yes, agreed that readinto (no underscore!) is exactly the right method name: "Read up to len(b) bytes into b", where b is a bytearray or memoryview.

I propose read(length=None, buffer=None) where:

  • buffer=None (default) produces bytes objects. Note that, from what you say, it is faster to create a bytearray of size (self.size - self.loc) if length in [None, -1] else length up-front, fill it, and convert to bytes when returning
  • buffer=True: create the bytearray as above and return it without conversion to bytes
  • buffer=b: same as readinto, except raise an exception if len(b) doesn't fit the data.

@mrocklin (Member) commented Apr 25, 2018 via email

@martindurant (Member) commented:

Hah. I suppose we are best off keeping to the built-in file standard.

@sk1p (Contributor, Author) commented Apr 26, 2018

So this is what I came up with:

        def readinto(self, length, out):
            """
            Read up to ``length`` bytes from the file into the ``out`` buffer,
            which can be of any type that implements the buffer protocol (example: bytearray,
            memoryview, numpy array, ...).

            Returns the number of bytes read.
            """
            _lib = hdfs3.core._lib
            if not _lib.hdfsFileIsOpenForRead(self._handle):
                raise IOError('File not read mode')
            bufsize = length
            bufpos = 0

            # convert from buffer protocol to ctypes-compatible type
            out = memoryview(out)
            buf_for_ctypes = (ctypes.c_byte * out.nbytes).from_buffer(out)

            while length:
                bufp = ctypes.byref(buf_for_ctypes, bufpos)
                ret = _lib.hdfsRead(
                    self._fs, self._handle, bufp, ctypes.c_int32(bufsize - bufpos))
                if ret == 0:  # EOF
                    break
                if ret > 0:
                    length -= ret
                    bufpos += ret
                else:
                    raise IOError('Read file %s Failed:' % self.path, -ret)
            return bufpos

        @property
        def size(self):
            if self._size is None:
                self._size = self.info()['size']
            return self._size

        def read(self, length=None, buffer_=None):
            return_buffer = True
            max_read = self.size - self.tell()
            read_length = max_read if length in [None, -1] else length

            if buffer_ is None:
                return_buffer = False
            if buffer_ is None or buffer_ is False:
                buffer_ = memoryview(bytearray(read_length))
            else:
                buffer_ = memoryview(buffer_)
                if buffer_.nbytes < read_length:
                    raise IOError('buffer too small (%d < %d)' % (buffer_.nbytes, read_length))

            bytes_read = self.readinto(length=read_length, out=buffer_)

            if bytes_read < buffer_.nbytes:
                buffer_ = buffer_[:bytes_read]

            if return_buffer:
                return buffer_
            return buffer_.tobytes()
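
With this, reading straight into a numpy array works too, e.g. (the hdfs handle and path are placeholders):

    import numpy as np

    arr = np.empty(1024 * 1024, dtype=np.uint8)  # any writable buffer-protocol object
    with hdfs.open('/tmp/test.bin', 'rb') as f:
        n = f.readinto(arr.nbytes, arr)          # bytes land directly in the array
    arr = arr[:n]                                # keep only what was actually read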

Some notes:

  • I don't have a strong opinion on read_into vs readinto; I guess the variant with the underscore is the older style used in socket, and the one without is the newer style used in io?
  • Is caching the file size OK? I'm not so sure about the semantics of HDFS. Without the cache, significant time is spent requesting the file info.
  • buffer_ has a trailing underscore because it would otherwise shadow the built-in buffer.
  • The range check may be a bit aggressive: if the buffer is larger than max_read but smaller than length, it raises without any real danger of overflow. I guess read_length = min(max_read, read_length) could work?
  • The buffer is always converted to a memoryview; I didn't find a reliable way to determine its size otherwise.

Let me know if I should start a pull request for this, if it is easier to review.

@martindurant (Member) commented:

Yes, I think this is definitely along the right lines. My thoughts:

  • use readinto() - it's still called this in io
  • the size could be found and stored in the File's init method, which is what other FileSystem implementations do. I would call it size without the underscore; it may actually be useful to people
  • buffer_ is a bit untidy, how about out_buffer?
  • if buffer_.nbytes < min(read_length, max_read), agreed
  • if bytes_read < buffer_.nbytes: should also cause a logger warning, since it is not expected.
  • "the buffer is always converted" - no problem, this takes no time.

Some tests covering the different options would be nice.
I like that you can directly write into a numpy buffer.

@sk1p (Contributor, Author) commented May 2, 2018

I think I addressed all your notes in #162. Note that bytes_read < buffer_.nbytes can happen in a normal situation: if you manually pass in a large buffer while the current file position is near the end of the file.

I liked the Docker container for testing; it worked well, at least on my work laptop. My personal laptop didn't really enjoy the stress testing :)

sk1p closed this as completed May 4, 2018