[improvement] Stream blocks #6

Closed
thomascoquet opened this issue Sep 15, 2021 · 3 comments

@thomascoquet
Contributor

Hello Thomas,

I would like to suggest an improvement over the current main interface KeyReaderAt. Taking a look at the handler implementations, it appears most of them use io.ReadFull(r.Body, p) to return the buffer to the adapter.

Taking advantage of that, I think we could also define a KeyStreamerAt interface:

type KeyStreamerAt interface {
	StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error)
}

It is to io.SectionReader what osio.KeyReaderAt is to io.ReaderAt.

The key idea is that, when a range of blocks is fetched, all of the block mutexes stay locked until the whole range has arrived, whereas we could release them progressively, as each block is read, to reduce contention with other reads.

Example:

  • GDAL needs range .BCDEF for first read,
  • GDAL needs range AB for second read.

Currently, the second read has to wait for the whole block range request of the first read to finish before it can be served. With the new implementation, the Adapter can return sooner for the second read, as soon as the blocks it needs have been streamed.

In my current implementation of the adapter, it looks something like this:

	// Stream only when every block of the range has to be fetched and the source supports it.
	if nToFetch == len(blocks) && a.canStream {
		r, err := a.srcStreamAt(key, rng.start*a.blockSize, (rng.end-rng.start+1)*a.blockSize)
		if err != nil {
			// The request failed: release the lock of every block in the range.
			for i := rng.start; i <= rng.end; i++ {
				blockID := a.blockKey(key, i)
				a.blmu.Unlock(blockID)
			}
			return nil, err
		}
		defer r.Close()
		for bid := int64(0); bid <= rng.end-rng.start; bid++ {
			blockID := a.blockKey(key, bid+rng.start)
			buf := make([]byte, a.blockSize)
			n, err := io.ReadFull(r, buf)
			// A short last block is treated as a regular end of stream.
			if err == io.ErrUnexpectedEOF {
				err = io.EOF
			}
			if err == nil || err == io.EOF {
				blocks[bid] = buf[:n]
				a.cache.Add(key, uint(rng.start+bid), blocks[bid])
			}
			if err != nil {
				// Release the current block and all the blocks not read yet.
				for i := rng.start + bid; i <= rng.end; i++ {
					a.blmu.Unlock(a.blockKey(key, i))
				}
				if err == io.EOF {
					break
				}
				return nil, err
			}
			// The block is cached: release it so that other reads can proceed.
			a.blmu.Unlock(blockID)
		}
		return blocks, nil
	}

It could remain backward compatible by simply checking whether the handler implements the interface:

func NewAdapter(reader KeyReaderAt, opts ...AdapterOption) (*Adapter, error) {
	_, canStream := reader.(KeyStreamerAt)
	bc := &Adapter{
		...
		canStream:       canStream,
	}
	...
}

Thomas

@thomascoquet thomascoquet changed the title [improvement] [improvement] Stream blocks Sep 15, 2021
@tbonfort
Member

Seems reasonable to me. This also has the potential to avoid allocating a byte slice of nBlocks*blockSize. Can you submit a PR for this?

@thomascoquet
Contributor Author

Will do this week!

@tbonfort
Member

closed by #7
