Replies: 8 comments 6 replies
---
I managed to track down the cause using this parameter in my session, thanks to the team at SQLAlchemy: I had endpoints where I used `Depends` to create a DB session and passed it to a background task. When the endpoint returned, the session was closed; the background task then re-opened it, and failed to close it because the context manager was no longer valid. Please feel free to delete this issue, or leave it up if it may be helpful for others.

Example code before:

```python
@router.post(
    "/submit",
)
async def submit_order(
    user: CurrentUserDep,
    db: DBSessionDep,
    background_tasks: BackgroundTasks,
    order_request: List[OrderRequest] = Body(..., description="The order to submit."),
) -> List[OrderSubmissionV2]:
    order: List[OrderSubmissionV2] = await submit_order(
        db, order_request, version=2
    )
    # Refetch the last 10 minutes of orders to update the cache
    background_tasks.add_task(refresh_past_orders, db)
    return order  # Session is closed here, but the task still holds it


async def refresh_past_orders(db: AsyncSession):
    # Closed session is re-opened and its context is no longer managed
    await get_past_orders(
        db=db,
    )
    logger.info("Refreshed past orders cache for the last 10 minutes.")
```

Example code with fix:

```python
@router.post(
    "/submit",
)
async def submit_order(
    user: CurrentUserDep,
    db: DBSessionDep,
    background_tasks: BackgroundTasks,
    order_request: List[OrderRequest] = Body(..., description="The order to submit."),
) -> List[OrderSubmissionV2]:
    order: List[OrderSubmissionV2] = await submit_order(
        db, order_request, version=2
    )
    # Refetch the last 10 minutes of orders to update the cache
    background_tasks.add_task(refresh_past_orders)
    return order  # Session is closed; the task no longer depends on it


async def refresh_past_orders():
    # Use a new session owned by the background task itself
    async with session_manager.session() as db:
        await get_past_orders(
            db=db,
        )
    logger.info("Refreshed past orders cache for the last 10 minutes.")
```
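The `session_manager` used in the fix above isn't shown. A minimal sketch of what such a manager could look like (the name, structure, and sessionmaker injection are assumptions, not the author's actual code; in a real app the factory would be SQLAlchemy's `async_sessionmaker(engine)`):

```python
import contextlib


class DatabaseSessionManager:
    """Creates one fresh session per context entry and always closes it."""

    def __init__(self, sessionmaker):
        # In a real app: sessionmaker = async_sessionmaker(engine)
        self._sessionmaker = sessionmaker

    @contextlib.asynccontextmanager
    async def session(self):
        session = self._sessionmaker()
        try:
            yield session
        finally:
            # Guarantees the connection is returned to the pool,
            # even if the body raises.
            await session.close()
```

Because every `async with session_manager.session()` block owns its own session, a background task using this pattern no longer depends on the lifetime of the request-scoped session.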
---
For anybody that still has this problem: I wasn't using background tasks, but I was using a streaming response that queried the database inside its generator.

Before:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/")
async def main(db=DBSession):
    async def get_cars():
        cars = models.Car.get_many(db)  # Calling the db here will throw the error
        for car in cars:
            yield car
    return StreamingResponse(get_cars())
```

After:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/")
async def main(db=DBSession):
    cars = models.Car.get_many(db)  # Move the query outside the generator
    async def get_cars():
        for car in cars:
            yield car
    return StreamingResponse(get_cars())
```
---
For anyone coming here from a Google search: similar to @nachonavarro, we were seeing these issues, but with the sse-starlette package. As in the other solutions, the answer is to create the `AsyncSession` inside your async generator rather than passing in a route-scoped dependency. The fact that SQLAlchemy sessions are not thread-safe, and that background tasks and streaming responses appear to use anyio `TaskGroup`s, smells like a likely cause, but I'm not certain. In our case it also seemed to eventually lead to segmentation faults that could kill our server (fun!).

```python
import sqlalchemy as sa
from sse_starlette.sse import EventSourceResponse

# ❌ Bad: the generator uses the route-scoped session
@app.get("/")
async def bad(session: AsyncSessionDependency):
    async def generator():
        things = await session.scalars(sa.select(models.Thing))
        for thing in things:
            yield {"data": thing}
    return EventSourceResponse(generator())

# ✅ Good: the generator owns its own session
@app.get("/")
async def good():
    async def generator():
        async with AsyncSessionMaker() as session:
            things = await session.scalars(sa.select(models.Thing))
            for thing in things:
                yield {"data": thing}
    return EventSourceResponse(generator())
```
---
Hey guys! Wanted to get your thoughts on another approach I thought of: explicitly pass your dependency to the async function, so we ensure it's not closed until the function ends. In other words:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/")
async def main(db=DBSession):
    async def get_cars(session):
        cars = models.Car.get_many(session)  # No more errors?
        for car in cars:
            yield car
    return StreamingResponse(get_cars(db))
```

This has the benefit of avoiding the need to create another session. Would this work?
---
How can this be debugged, in terms of finding out which connections were not checked in and where they were created?
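One low-effort way to get that visibility (a general SQLAlchemy technique, not specific to this app) is to turn on pool-level logging, so every connection checkout and checkin shows up in the logs and un-returned connections stand out:

```python
import logging

# The "sqlalchemy.pool" logger emits every checkout/checkin/reset at
# DEBUG level, which makes connections that are never returned visible.
logging.basicConfig()
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)

# Equivalently, pass echo_pool="debug" when creating the engine.
# engine.pool.status() also returns a one-line summary of how many
# connections are currently checked in vs. checked out.
```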
---
Hello! Try `async_sessionmaker(close_resets_only=False)`. The reason is this check in SQLAlchemy's `Session._close_impl`:

```python
def _close_impl(self, invalidate: bool, is_reset: bool = False) -> None:
    if not is_reset and self._close_state is _SessionCloseState.ACTIVE:
        self._close_state = _SessionCloseState.CLOSED
```

With the default `close_resets_only=True`, `close()` only resets the session, so using it again afterwards does not raise; with `False`, the close is terminal and any later use fails immediately.
---
Hi! I just faced the same issue after changing my app to work with background tasks.
Yes, you should definitely add info on this error there, as well as the FastAPI-correct way to deal with it. Since the DB call is not the most time-consuming part of my route, I'm thinking of getting the data from the DB first and then sending the results to be processed in the background task, but it would be nice to know whether I could still inject the DB dependency in the route and pass it to the background task without losing the context.
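The fetch-first idea described above can be sketched as follows (all names are hypothetical and the DB call is simulated with plain data, to show only the shape of the pattern):

```python
import asyncio


async def fetch_orders(db):
    # Stand-in for the real query; in the app this runs while the
    # request-scoped session is still open.
    return [{"id": 1}, {"id": 2}]


async def process_results(orders):
    # Receives plain data, so it keeps no reference to the request's
    # session and can safely run after the response is sent.
    return [o["id"] for o in orders]


async def route_handler(db="request-scoped-session"):
    orders = await fetch_orders(db)
    # In the real route: background_tasks.add_task(process_results, orders)
    return await process_results(orders)
```

The session is only touched before the route returns; the background task works on the already-fetched results, so no session outlives its context manager.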
---
Hi all, I've been investigating this issue and wanted to share my findings. After some digging, I've concluded that the problem likely isn't a bug in FastAPI itself, but rather in how we manage the lifecycle of database sessions when using background tasks.

This doesn't immediately raise an error due to SQLAlchemy's default `close_resets_only=True` behaviour. To confirm this hypothesis, I ran a couple of tests.

**Change `close_resets_only` to `False`**

I set `close_resets_only=False` on the sessionmaker, so that reusing a closed session fails loudly instead of being silently reset.

**Create a custom `AsyncSession` to track the session**

I created a custom session class:

```python
import logging

from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger(__name__)


class TrackedAsyncSession(AsyncSession):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._is_closed = False
        logger.info(f"SESSION CREATED: {id(self)}")

    async def execute(self, *args, **kwargs):
        if self._is_closed:
            logger.warning(f"ATTEMPTING TO USE ALREADY CLOSED SESSION: {id(self)}")
        return await super().execute(*args, **kwargs)

    async def close(self):
        if self._is_closed:
            logger.warning(f"SESSION ALREADY CLOSED: {id(self)}")
        self._is_closed = True
        logger.info(f"SESSION CLOSED: {id(self)}")
        await super().close()
```

We can use it by passing `class_=TrackedAsyncSession` to the sessionmaker. Another thing that helped me understand the flow was to print the session's internal state.

A question that followed was, "Why don't we see a warning log for every task, and why is it hard to reproduce locally?" My assumption is that this is related to Python's garbage collector: the unclosed session is only detected and flagged for cleanup under specific, non-deterministic conditions, which would explain why the warnings are intermittent and harder to trigger in a local development environment. But again, I can't prove this; it's just my opinion on what's happening.

Based on these findings, in my view the correct architectural solution is for the background task to create and manage its own new database session, completely independent of the session from the originating HTTP request. I hope sharing my debugging process helps others who might run into this common pattern.
---
First Check
Commit to Help
Example Code
Description
Hi all,
I am encountering the following error on my production server, which I am not able to reproduce locally, and I am having a hard time tracking down the cause. Any guidance on how to troubleshoot it further, or on what may be causing it, would be greatly appreciated. I believe it may be related to using `AsyncSession`, as I recently updated from `Session` to `AsyncSession` (along with package version updates), and the error started occurring after that. I've checked with SQLAlchemy and they seem to think everything looks fine from their side, and that it's likely related to the behaviour of `Depends`:

> The garbage collector is trying to clean up non-checked-in connection `<AdaptedConnection <asyncpg.connection.Connection object at 0x3dfdd19ee4d0>>`, which will be terminated. Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling `close()` or by using appropriate context managers to manage their lifecycle.

I am running a FastAPI server (in a Docker image with Gunicorn/Uvicorn on GCP, with Cloud SQL Postgres), using asyncpg and SQLAlchemy as my database ORM, with the following versions:

I have a `DatabaseSessionManager` class that manages the creation and closing of sessions. I then use FastAPI dependency injection to create an injectable `DBSessionDep`.

This `DBSessionDep` is the only way I ever access the database, and it is injected into each endpoint. I am under the impression that it should always call the `close()` method on the session before the garbage collector (GC) runs, but that doesn't seem to be occurring.

I have also ensured that I am not manually closing the session outside of the `session` context manager, and that I have appropriate exception handling in place to gracefully handle any errors and allow the session to be closed.

If I've omitted any key details, please let me know. Apologies that I cannot easily provide a reproducible sample, as the error is very sporadic - I've tried logging the session and connection IDs to track down where exactly it is coming from, but didn't manage to find any pattern.
Any insights or suggestions on how to further diagnose and resolve this issue would be highly appreciated. Thank you in advance for your help!
Operating System
macOS
Operating System Details
No response
FastAPI Version
0.110.0
Pydantic Version
2.6.4
Python Version
3.11
Additional Context
No response