Conversation

@fabito (Contributor) commented May 10, 2019

Overrides io.RawIOBase.readall in filesystemio.DownloaderStream as proposed in BEAM-6027.
It improves download time by ~40x.
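
A minimal sketch of the loop this boils down to (illustration only; the actual change is in the PR diff, and the chunk size here is an assumption, mirroring the gcsio.DEFAULT_READ_BUFFER_SIZE used in the benchmark further down):

_CHUNK_SIZE = 16 * 1024 * 1024  # assumed large chunk size for illustration (e.g. gcsio.DEFAULT_READ_BUFFER_SIZE)

def chunked_readall(stream):
    """Read a raw stream to EOF in large chunks, then join once."""
    chunks = []
    while True:
        data = stream.read(_CHUNK_SIZE)
        if not data:
            break
        chunks.append(data)
    return b''.join(chunks)

Reading in large, explicit chunks and joining once avoids the many small fixed-size read() calls the stock io.RawIOBase.readall issues, each of which can reach the downloader separately.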

@fabito fabito marked this pull request as ready for review May 10, 2019 18:15
fabito added 2 commits May 11, 2019 10:09
@robertwb robertwb requested a review from udim May 21, 2019 12:58
@udim (Member) left a comment

Thank you for your contribution!
Very cool

@udim (Member) commented May 21, 2019

cc: @chamikaramj
Can you run some benchmarks and include your results here?

fabito added 4 commits May 22, 2019 22:43
@fabito (Contributor, Author) commented May 23, 2019

Hi @udim,

Using this snippet:

import tempfile
import timeit

from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import gcsio
from apache_beam.io.filesystemio import DownloaderStream


# https://issues.apache.org/jira/browse/BEAM-6027
def downloader_stream_readall(self):
    res = []
    while True:
        data = self.read(gcsio.DEFAULT_READ_BUFFER_SIZE)
        if not data:
            break
        res.append(data)
    return b''.join(res)


original_read_all = DownloaderStream.readall


if __name__ == '__main__':
    test_file = 'gs://cloud-samples-tests/vision/saigon.mp4'
    num_executions = 1

    def test_original():
        DownloaderStream.readall = original_read_all
        with FileSystems.open(test_file) as audio_file:
            with tempfile.NamedTemporaryFile(mode='w+b') as temp:
                temp.write(audio_file.read())

    def test_refactored():
        DownloaderStream.readall = downloader_stream_readall
        with FileSystems.open(test_file) as audio_file:
            with tempfile.NamedTemporaryFile(mode='w+b') as temp:
                temp.write(audio_file.read())

    print(timeit.timeit("test_original()", setup="from __main__ import test_original", number=num_executions))
    print(timeit.timeit("test_refactored()", setup="from __main__ import test_refactored", number=num_executions))

I got the following output:

120.99772120200214
1.0684915780002484

Hope that helps

@chamikaramj (Contributor) commented:
Thanks for the update. This looks great.

Seems like the file you used for your microbenchmark is about 4MB, which fits within the first chunk at the new buffer size. Can you try running a Beam pipeline with a larger input (say, 10GB) on Dataflow to confirm that there's no regression at large scale?

@fabito (Contributor, Author) commented May 23, 2019

Yes, I can. Any advice on what this pipeline should look like and how we can measure the reading performance?
Maybe something like this:

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | 'Create file pattern' >> beam.Create(['gs://bucket/10Gb.txt'])
            | 'Read 10Gb file' >> beam.io.ReadAllFromText()
            | 'Write file' >> beam.io.WriteToText('gs://bucket/10Gb_copy', file_name_suffix='.txt')
        )

@chamikaramj (Contributor) commented:
Yeah, that pipeline looks good. End-to-end execution time and the Total vCPU time shown in the Dataflow console should be good metrics to compare.

@fabito (Contributor, Author) commented May 23, 2019

I ran the same pipeline, first with the original code and then with the new readall implementation. I used a text file of ~8.81 GB. Apparently performance wasn't affected. See the console snapshots from both executions below:

Before the change:

[Dataflow console screenshot: beam_before]

After the change:

[Dataflow console screenshot: beam_after]

@chamikaramj (Contributor) left a comment

Thanks. LGTM.

Thanks for fixing this.

Added one comment. Also, please fix up your commits into a single commit for merging.

Review thread on the new DownloaderStream.readall:

def readall(self):
  """Read until EOF, using multiple read() calls."""
  res = []
  while True:

Contributor:

Where is this function used?

Probably remove it if unused.

Contributor:

Ah, actually it seems like you are overriding the function documented here: https://docs.python.org/3/library/io.html#io.IOBase

Contributor:

Sorry, I still have a question.

Does Beam call the readall() function anywhere? I couldn't find a usage. Beam textio, for example, invokes read(), not readall():
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L272

If it does, I'm not sure what will prevent us from reading a huge amount of data into memory and running into OOMs.
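
To make the memory concern concrete, a rough illustration (the path and chunk size below are placeholders): a sized read keeps memory bounded, while read() with no size falls through to readall() and materializes the whole object.

from apache_beam.io.filesystems import FileSystems

path = 'gs://bucket/large-object.txt'  # placeholder

# Bounded memory: only one chunk is held at a time (the pattern textio uses).
with FileSystems.open(path) as f:
    while True:
        chunk = f.read(64 * 1024)  # placeholder chunk size
        if not chunk:
            break
        # process(chunk) here

# Unbounded memory: read() with no size ends up in readall(), which returns
# the entire object as a single bytes value.
with FileSystems.open(path) as f:
    everything = f.read()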

Member:

I only found this usage in ReadableFile (relatively new), where we don't specify the size:

def open(self, mime_type='text/plain'):
  return filesystems.FileSystems.open(self.metadata.path)

def read(self):
  return self.open().read()

Contributor:

That makes sense. I think ReadableFile is intended for small files. But we should probably add a readall() method there as well and update read() to take a buffer size (not in this PR).

cc: @pabloem
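
For what it's worth, a hypothetical sketch of that follow-up (not part of this PR; names and signatures are assumed), passing a size through read() so callers can keep memory bounded:

from apache_beam.io import filesystems

class ReadableFile(object):
  """Sketch only; not the actual Beam class."""

  def __init__(self, metadata):
    self.metadata = metadata

  def open(self, mime_type='text/plain'):
    return filesystems.FileSystems.open(self.metadata.path)

  def read(self, size=-1):
    # Forward the requested size; size=-1 preserves the current read-everything behavior.
    return self.open().read(size)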

@chamikaramj (Contributor) commented:
Thanks. I'll squash and merge.

@chamikaramj chamikaramj merged commit 4cf2830 into apache:master May 23, 2019
@udim (Member) commented May 23, 2019

Adding a note so we don't forget to "run python postcommit" before merging.

@fabito fabito deleted the override-readall-for-faster-downloads branch June 22, 2019 21:42