How to use Starlettes Streaming response with FastAPI #8042
-
|
Description How can I use Starlettes streaming response with synchronous and async generators in fastapi, Its mentioned on the front page of the docs, but no example (that I can find is provided, other than websocket) Is it possible to this? |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
|
It is possible. You need to return an instance of @router.get(orders_export_download_uri,
responses={
404: responses.empty_response(404)
})
async def get_download(job_id: UUID,
hash: str,
*,
config: ConfigTree = Depends(get_config)) -> Response:
order_export_job = await order_export_dao.get(job_id)
if not order_export_job or order_export_job.hash != hash or not order_export_job.filename:
raise NotFoundError()
async def s3_stream(chunk_size: int = 4096):
try:
async with AWSClientProvider.s3() as s3:
s3_ob = await s3.get_object(
Bucket=config.get_string("aws.s3.exports.bucket"),
Key="{}_{}.csv".format(
order_export_job.job_id,
order_export_job.hash,
)
)
async with s3_ob["Body"] as stream:
chunk = await stream.read(chunk_size)
while chunk:
yield chunk
chunk = await stream.read(chunk_size)
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
raise NotFoundError()
else:
raise e
return StreamingResponse(
s3_stream(),
media_type="text/csv",
headers={
"Content-Disposition": "attachment;filename=export.csv"
}
) |
Beta Was this translation helpful? Give feedback.
-
|
There are now docs for |
Beta Was this translation helpful? Give feedback.
-
|
Assuming the original issue was solved, it will be automatically closed now. But feel free to add more comments or create new issues. |
Beta Was this translation helpful? Give feedback.
There are now docs for
StreamingResponse: https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse 🎉