Skip to content
Discussion options

You must be logged in to vote

Thanks to Doctor from discord I was able to finalize this:

from time import sleep
from typing import Any

from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def save_to_db(data: Any):
    print("Saving to the database:")
    print(data)


def fake_streamer(background_tasks: BackgroundTasks):
    chunks = []
    for i in range(3):
        sleep(1)
        chunks.append(i)
        yield f"Streaming: {i}\n"
    print("Streaming Done!", chunks)
    background_tasks.add_task(save_to_db, chunks)


@app.get("/")
async def main(background_tasks: BackgroundTasks):
    return StreamingResponse(fake_streamer(background_tasks))

We're still …

Replies: 1 comment 3 replies

Comment options

You must be logged in to vote
3 replies
@jd-solanki
Comment options

@jd-solanki
Comment options

Answer selected by YuriiMotov
@hamdan-27
Comment options

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Question or problem
3 participants