-
Notifications
You must be signed in to change notification settings - Fork 431
Open
Description
- asyncpg version:
- PostgreSQL version:
- Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
the issue with a local PostgreSQL install?: - Python version:
- Platform:
- Do you use pgbouncer?:
- Did you install asyncpg with pip?:
- If you built asyncpg locally, which version of Cython did you use?:
- Can the issue be reproduced under both asyncio and
uvloop?:
Latest version of asyncpg
No PGBouncer
Python 3.8.2
Hi, I am using fast API but I have no idea how to maintain DB connection and pool. I am switching from Django ORM but still, it is hard for me to get a good grasp on this. Please guide me on how can I improve this and also add this to your documentation.
My questions:
How to maintain a constant database connection?
How to maintain a pool of connections?
How to use the same connection out of the pool for every query in the same request?
Is my naive solution correct? How can I improve this and make this production-ready?
db_connection.py
`import asyncpg
from configs.settings import settings
class Database:
def __init__(self):
self.user = settings.POSTGRES_USER
self.password = settings.POSTGRES_PASSWORD
self.host = settings.POSTGRES_SERVER
self.port = "5432"
self.database = settings.POSTGRES_DB
self._cursor = None
self._connection_pool = None
self.con = None
async def connect(self):
if not self._connection_pool:
try:
self._connection_pool = await asyncpg.create_pool(
min_size=1,
max_size=10,
command_timeout=60,
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
)
except Exception as e:
print(e)
async def fetch_rows(self, query: str):
print(query)
if not self._connection_pool:
print("shouldnt be here")
await self.connect()
else:
self.con = await self._connection_pool.acquire()
try:
result = await self.con.fetch(query)
print(result)
return result
except Exception as e:
print(e)
finally:
print("pool released")
await self._connection_pool.release(self.con)
`
db_session.py
`from .db_connection import Database
database_instance = Database()
`
main.py
`from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from routes import items, user
from utils.middleware import middleware
from configs import open_api_tags
from configs.settings import settings
from db.db_session import database_instance
app = FastAPI(
title=settings.PROJECT_NAME,
description=settings.PROJECT_DESCRIPTION,
version="0.0.1",
openapi_tags=open_api_tags.tags_metadata,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.BACKEND_CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.middleware("http")(middleware)
@app.on_event("startup")
async def startup():
await database_instance.connect()
app.include_router(user.router)
app.include_router(items.router, prefix="/items")
`
user.py
`from fastapi import APIRouter
from db.db_session import database_instance
router = APIRouter()
@router.get("/users/me", tags=["users"])
async def read_user_me():
result = await database_instance.fetch_rows("SELECT * from user")
print(result)
return {"username": "fakecurrentuser"}`
marcodelmoral and coma8765
Metadata
Metadata
Assignees
Labels
No labels