How to Use botocore.response.StreamingBody as stdin PIPE #426

Closed
mslinn opened this issue Jan 2, 2016 · 39 comments

@mslinn

mslinn commented Jan 2, 2016

I want to pipe large video files from AWS S3 into Popen's stdin. This code runs as an AWS Lambda function, so these files won't fit in memory or on the local file system. Also, I don't want to copy these huge files anywhere; I just want to stream the input, process it on the fly, and stream the output. I've already got the processing and streaming output bits working. The problem is how to obtain an input stream as a Popen pipe.

I can access a file in an S3 bucket:

import boto3
s3 = boto3.resource('s3')
response = s3.Object(bucket_name=bucket, key=key).get()
body = response['Body']  

body is a botocore.response.StreamingBody. I intend to use body something like this:

from subprocess import Popen, PIPE
Popen(cmd, stdin=PIPE, stdout=PIPE).communicate(input=body)[0]

But of course body needs to be converted into a file-like object. The question is how?

@mslinn
Author

mslinn commented Jan 7, 2016

Can anyone help?

@JordonPhillips
Contributor

@mslinn Sorry for the delays!

StreamingBody is a file-like object, so any method that works against a file should work here as well. At the moment I'm not sure what the answer is, but it seems to be a question for the general Python community, so you'll probably have more luck on StackOverflow. This seems to be in the right direction.

I'm going to close this issue, but feel free to update here if you find a working solution.

@mslinn
Author

mslinn commented Jan 8, 2016

I wish it were so simple. I must have dumped at least 30 hours into this problem, and I've tried a variety of approaches. I've tried the AWS & Python communities on StackOverflow but got no response. I think this issue requires too much setup for a generic Python programmer to address.

I'm sure this will be an often-repeated question; I'm just the first to hit it. I think someone from AWS should take it up.

For my part, I've got a broken product in production as a result of this issue. I am willing to pay cash money for a fix.

@Scoots

Scoots commented Jan 18, 2016

I was working with JSON files specifically, so far from your large-video-file requirement. However, I was able to access response['Body']._raw_stream.data to get at the data in an I-should-not-access-this-member kind of way. I hope you can do the same; I don't know the specifics of how S3 works at the moment.

@JordonPhillips
Contributor

If communicate needs more methods than read, you can dump the information into a different class. This will necessitate buffering the whole file in memory, but it can be done, since the result of .read() is a byte array.
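
A minimal sketch of the buffering approach described above, assuming the whole object fits in memory: read the StreamingBody into bytes and wrap them in io.BytesIO to get a fully seekable file-like object.

import io

data = response['Body'].read()   # bytes; buffers the entire object in memory
file_like = io.BytesIO(data)     # supports read(), seek(), tell(), etc.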

@mslinn
Author

mslinn commented Jan 19, 2016

Buffering the entire file in memory is not an option. I am putting together some sample code that almost works; I will share it later today or tomorrow.

@mslinn
Author

mslinn commented Jan 19, 2016

A test project that works in a variety of environments is here.

@JordonPhillips
Contributor

If you can't fit the data in memory, you won't be able to use Popen.communicate because it buffers the whole file in memory anyway. Lambda does give you 512MB of ephemeral storage, so you could write to a temporary file and direct ffmpeg to use that.
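
A hedged sketch of the temp-file workaround suggested above; the bucket, key, and ffmpeg arguments below are placeholders. The idea is to stream the object into Lambda's /tmp storage, then point the subprocess at the file.

import subprocess
import boto3

s3 = boto3.client('s3')
tmp_path = '/tmp/input.mp4'

# Stream the S3 object straight into the temporary file.
with open(tmp_path, 'wb') as f:
    s3.download_fileobj('some-bucket', 'some-key', f)

# Hand the file path to the subprocess instead of piping via stdin.
subprocess.run(['ffmpeg', '-i', tmp_path, '/tmp/output.mp4'], check=True)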

@mslinn
Author

mslinn commented Jan 19, 2016

I did not use communicate in the sample code I posted half an hour ago; take a look.
BTW, videos can be larger than 512MB.

@royrusso

Not sure if this helps, but I struggled with a similar issue streaming from boto3/S3 to a Flask output stream. Some sample code that may help you:

s3_response = s3_client.get_object(Bucket=BUCKET, Key=FILENAME)

def generate(result):
    for chunk in iter(lambda: result['Body'].read(self.CHUNK_SIZE), b''):
        yield chunk

return Response(generate(s3_response), mimetype='application/zip',
                headers={'Content-Disposition': 'attachment;filename=' + FILENAME})

@mslinn
Author

mslinn commented Feb 16, 2016

Hmm, looks interesting, thanks!

@royrusso

I tested that it does indeed iterate per chunk size, but I have not profiled it - meaning I'm hoping StreamingBody really is a stream and it's not all consumed in memory.

@c0b

c0b commented Mar 29, 2016

Performance is bad, though.

Does Python have something like Node.js's srcStream.pipe(dst), or Go's io.Copy(dst, src)?

Node supports multi-I/O concurrency natively, and Go's io.Copy is typically driven by goroutines; in the Python world I have only found Werkzeug's IterIO as a wrapper you can write a stream to, which internally uses greenlet as a lightweight process model to simulate multi-I/O concurrency:

  1. http://werkzeug.pocoo.org/docs/dev/contrib/iterio/
  2. https://github.com/marianoguerra/tubes/blob/master/werkzeug/contrib/iterio.py
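
For the chunk-pumping part of the question above, a minimal sketch: Python's closest standard-library analogue to Go's io.Copy is shutil.copyfileobj, which pumps any readable file-like object (StreamingBody exposes read()) into any writable one in fixed-size chunks without buffering the whole payload. The bucket, key, and output path below are placeholders.

import shutil
import boto3

s3 = boto3.resource('s3')
body = s3.Object('some-bucket', 'some-key').get()['Body']

with open('/tmp/out.bin', 'wb') as dst:
    shutil.copyfileobj(body, dst, length=64 * 1024)  # 64 KiB per read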

@rossy62-zz

rossy62-zz commented Aug 17, 2016

Thanks! I'm looking at this same issue too. I'm trying to "stream" a StreamingBody S3 input file and copy it to an S3 output file. I want to do text-file processing on potentially LARGE files. I'm a newbie to Python and AWS, but this information is exactly what I was looking for.

@mslinn
Author

mslinn commented Aug 17, 2016

I spent a lot of time but never got this to work. If someone does, please show the juicy details.

@rossy62-zz

I'm resorting to buffered reads (4096 characters per read) at the moment, but I'm getting farther. I saw your GitHub code submission; if I find anything I'll share.


@ghost

ghost commented Sep 19, 2016

What happens if you do

from contextlib import closing
from subprocess import Popen, PIPE

with closing(response['Body']) as body_fileobj:
    p = Popen(cmd, stdin=body_fileobj, stdout=PIPE)
    (stdout, _) = p.communicate()
...

That should be equivalent to

stdout_var = `curl URL | somecmd` 

in bash

@uditbhatia

Hey, have you found any useful solution for this?

@npfp

npfp commented Jun 28, 2017

@JordonPhillips I'm confused by this sentence:

StreamingBody is a file-like object, so any method that works against a file should work here as well.

I looked at the code of StreamingBody, and it seems to me that it is really a wrapper around a class inheriting from io.IOBase, but only the read method of the raw stream is exposed, so it is not really a file-like object.

To me it would make a lot of sense to expose the io.IOBase interface in StreamingBody, as we could then wrap S3 objects in an io.BufferedReader or an io.TextIOWrapper. Is there any reason not to do so? Happy to make a pull request if it can help.
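
A minimal sketch of the wrapping described above, assuming it is acceptable to reach into the private _raw_stream attribute (an urllib3 HTTPResponse, which subclasses io.IOBase); since it is private, the attribute may change between botocore versions.

import io

raw = response['Body']._raw_stream                  # private; not a public API
reader = io.BufferedReader(raw)                     # buffered binary file-like object
text = io.TextIOWrapper(reader, encoding='utf-8')   # line-oriented text access
first_line = text.readline()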

@mubeta06

As pointed out above by @Scoots, the actual file-like object is found in the ._raw_stream attribute of the StreamingBody class. What are the ramifications of accessing it directly for downstream file-like consumers?

@wholeinsoul

@mslinn: Were you ever able to solve this?

Does download_fileobj() with TransferConfig(max_concurrency=1, max_io_queue=1) solve this problem?

Thanks.
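
For reference, a sketch of the call the question above refers to, with placeholder bucket, key, and output path: download_fileobj streams the object into any writable file-like target, while TransferConfig caps the transfer concurrency and I/O queue depth.

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client('s3')
config = TransferConfig(max_concurrency=1, max_io_queue=1)

with open('/tmp/output.bin', 'wb') as f:
    s3.download_fileobj('some-bucket', 'some-key', f, Config=config)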

@mslinn
Author

mslinn commented Jan 23, 2018 via email

@wholeinsoul

Thank you for responding.

I did encounter this library, which supports streaming large files to/from S3 and is written on top of boto: https://github.com/RaRe-Technologies/smart_open

@mslinn
Author

mslinn commented Jan 23, 2018

@bishtpradeep looks interesting and useful, thanks!

@delwaterman

@mslinn I think I found a possible solution (at least for Python 3) by looking at the objects in boto3. I had a similar problem using pickle with a particularly large object. Here's what I got working:

>>> obj
s3.Object(bucket_name='my-bucket', key='path/to/data/pickled-object.pkl')
>>> body = obj.get()["Body"]
>>> body
<botocore.response.StreamingBody object at 0x10e48d978>
>>> body._raw_stream
<botocore.vendored.requests.packages.urllib3.response.HTTPResponse object at 0x10e48d3c8>

So looking at the docs for this response here http://urllib3.readthedocs.io/en/latest/reference/#module-urllib3.response you can see:

class urllib3.response.HTTPResponse(body='', headers=None, status=0, version=0, reason=None, strict=0, preload_content=True, decode_content=True, original_response=None, pool=None, connection=None, retries=None, enforce_content_length=False, request_method=None)
Bases: io.IOBase

This is good news because Python offers some nice io classes for buffered reading (https://docs.python.org/3/library/io.html#io.BufferedReader). So, continuing from the code above:

>>> import io
>>> buff_reader = io.BufferedReader(body._raw_stream)
>>> buff_reader
<_io.BufferedReader>
>>> import pickle
>>> data_dict = pickle.load(buff_reader)
>>> data_dict
{'my-data': 'VERY LONG DATA'}

The BufferedReader is a true file-like object. @JordonPhillips, is this dangerous?

@nickovs

nickovs commented Mar 15, 2018

I had to do this recently. In the end the simplest solution I found was a loop that alternately read chunks of data from the source StreamingBody and wrote them to the stdin pipe, then read as much as possible from the stdout pipe and wrote the result to my output stream (which was an SSL connection). The problem with Popen.communicate() is that, as far as I can tell, it attempts to send all of the input to the process before reading back (and buffering) all of the output. This will lock up if the subprocess writes in a blocking manner and the output is so large that it fills up the output pipe's buffers before you've fed it all the input, and it will also fail if the output is too large to fit in memory even if you have succeeded in feeding in all of the input. You need to drain the output and pass it on down the line before feeding it more input.

@mslinn
Author

mslinn commented Mar 15, 2018

@nickovs Got any code you could share? Feel inspired to write an article about this?

@nickovs

nickovs commented Mar 15, 2018

I'll see if I can dig it out. I seem to recall that the key was to call fileno() on the output and use fcntl to set the stdout pipe to non-blocking reads, so that I could reliably read it until it was empty without getting stuck.

Note that an alternative option, if you have trouble getting non-blocking IO to work, is just to start a thread that reads from the StreamingBody and writes to the stdin pipe (and terminates when the StreamingBody runs dry or the far end of the pipe gets closed), and then have the main path of your code read the output from stdout and pass it down the line. In this case you need to take care to have a way to clean up the thread if the subprocess crashes.
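
A hedged sketch of the thread-based variant described above; cmd, body, and sink stand in for the subprocess command, the S3 StreamingBody, and the downstream output stream, none of which are defined here.

import shutil
import threading
from subprocess import Popen, PIPE

def feed(src, pipe):
    # Pump the StreamingBody into the subprocess's stdin, then close it so
    # the subprocess sees EOF; a BrokenPipeError here means the subprocess
    # exited early.
    try:
        shutil.copyfileobj(src, pipe, length=64 * 1024)
    finally:
        pipe.close()

proc = Popen(cmd, stdin=PIPE, stdout=PIPE)
feeder = threading.Thread(target=feed, args=(body, proc.stdin), daemon=True)
feeder.start()

# Drain stdout in the main thread and pass it down the line as it arrives.
for chunk in iter(lambda: proc.stdout.read(64 * 1024), b''):
    sink.write(chunk)

feeder.join()
proc.wait()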

@SerialDev

@mslinn Would streaming it through BytesIO be sufficient to emulate the file for piping through? This would also work, I think.

I'm currently using it like so:

    def from_bin_streaming(self, bucket_name, key_name):
        # Buffer the whole object into BytesIO; iter_chunks() preserves the
        # raw bytes (iter_lines() would drop the newline separators).
        out_buffer = io.BytesIO()
        body = self.get_s3_client().get_object(Bucket=bucket_name, Key=key_name)['Body']
        for chunk in body.iter_chunks():
            out_buffer.write(chunk)
        out_buffer.seek(0)
        return out_buffer

Of course you could yield chunks instead, or use seek to slice bytes too. I haven't tested it with Popen, but I feel like it could be useful.

@ianknowles

ianknowles commented Jun 14, 2019

This issue is still something you have to either read the boto3 source or go to Stack Overflow to resolve, which means the library needs work; this issue should not have been closed, but instead assigned as a feature request.

https://stackoverflow.com/questions/7624900/how-can-i-use-boto-to-stream-a-file-out-of-amazon-s3-to-rackspace-cloudfiles/40661459#40661459

@nicornk

nicornk commented Jun 19, 2019

I just ran into this issue.
My use case is to copy potentially large files with boto3 from one S3 bucket to another S3 bucket with different credentials.
The following snippet fails with 'ValueError: read of closed file', most probably due to the underlying issue described in urllib3/urllib3#1305:

import io
source_obj = source_s3.Object(bucket_name='bucket1', key='source_key')
target_s3.Object('bucket2', 'target_key').put(Body=io.BufferedReader(source_obj.get()['Body']._raw_stream))

Does anybody have a better idea how to efficiently copy objects from one bucket to the other without having to worry about tuning CHUNK_SIZE?

@eprochasson

eprochasson commented Jan 7, 2020

This is especially frustrating since apparently put_object cannot take a StreamingBody as input.

I'm trying to copy files between S3-compatible services:

# both `src_client` and `tgt_client` are valid, AWS S3 clients from different accounts, `key` is a valid key
obj = src_client.get_object(Bucket="bucket", Key=key)
tgt_client.put_object(Bucket="tgt_bucket", Key=key, Body=obj['Body'])

This responds with AttributeError: 'StreamingBody' object has no attribute 'tell'.

Using _raw_stream instead:

obj = src_client.get_object(Bucket="bucket", Key=key)
tgt_client.put_object(Bucket="tgt_bucket", Key=key, Body=obj['Body']._raw_stream)

fails with "UnsupportedOperation: seek"

@eprochasson

@nicornk the problem with urllib3 seems to have a workaround, but it still doesn't work anyway.
FYI, you can set .auto_close = False on the stream to prevent it being closed at the end of .read().

obj = src_client.get_object(Bucket="bucket", Key=key)

stream = obj['Body']._raw_stream
stream.auto_close = False

tgt_client.put_object(Bucket="sharethis_archive", Key=key, Body=io.BufferedReader(stream))

fails with "UnsupportedOperation: File or stream is not seekable."

@CMCDragonkai

I have a question about this...

If I pass around this streaming body object, does this mean that the HTTP connection isn't closed? Is it only closed once the streaming body object is garbage collected, or when the entire stream is read?

@Hiryus

Hiryus commented Jan 14, 2021

Hello there,
I'm trying to do exactly what @eprochasson and @nicornk are trying to do: copy a file from an S3 bucket in AWS account A to another S3 bucket in AWS account B.

I've read the whole discussion and the merged PRs, but I have not yet found a proper way to do it: put_object() does not directly accept a StreamingBody (nor its ._raw_stream), and I don't see any good solution to turn it into a readable stream with seek() capability.

Did anyone find a good solution ?

@l0b0

l0b0 commented Jun 13, 2022

@Hiryus Does upload_fileobj (example) work for you? It doesn't have the symmetry of get_object and put_object, but it does take a StreamingBody as its first argument.
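
A minimal sketch of that suggestion, assuming two named credential profiles (the profile, bucket, and key names are placeholders): upload_fileobj only needs a readable file-like object, so the StreamingBody returned by get_object can be passed straight through without buffering the whole file.

import boto3

# Assumed: one named profile per AWS account.
src = boto3.Session(profile_name='account-a').client('s3')
dst = boto3.Session(profile_name='account-b').client('s3')

body = src.get_object(Bucket='bucket-a', Key='some-key')['Body']
dst.upload_fileobj(body, 'bucket-b', 'some-key')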

@Hiryus

Hiryus commented Jun 13, 2022

To be honest, I don't remember much about this issue.
I'm not even sure we still use this code anywhere.
So, I cannot help you, sorry...

@louis-gale

louis-gale commented Nov 25, 2022

@eprochasson @Hiryus and for anyone discovering this thread going forward, the way to copy between S3 buckets is to use the copy() method, something like:

boto3.client('s3').copy(
    CopySource={"Bucket": source_bucket_name, "Key": source_object_key},
    Bucket=destination_bucket_name,
    Key=destination_object_key,
    ExtraArgs=extra_args)

If copying between accounts you just need to set up a trust relationship using policies.

The original question of streaming data from an S3 object is still a good one though, to which I haven't found an ideal solution.

@Alexander-Serov

Please re-open.
