Skip to content

Commit

Permalink
connection leaks fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kalombos committed Apr 3, 2024
1 parent ecfc5ca commit 9e3dda8
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 127 deletions.
2 changes: 1 addition & 1 deletion load-testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ uvicorn app:app
Run yandex-tank:

```bash
docker run -v $(pwd):/var/loadtest --net host -it yandex/yandex-tank
docker run -v $(pwd):/var/loadtest --net host -it yandex/yandex-tank
```
110 changes: 77 additions & 33 deletions load-testing/app.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,56 @@
import peewee
from fastapi import FastAPI
import logging
import uvicorn
import random
from contextlib import asynccontextmanager

import peewee
import uvicorn
from aiopg.connection import Connection
from aiopg.pool import Pool
from fastapi import FastAPI

acquire = Pool.acquire
cursor = Connection.cursor


def new_acquire(self):
choice = random.randint(1, 5)
if choice == 5:
raise Exception("some network error") # network error imitation
return acquire(self)


def new_cursor(self):
choice = random.randint(1, 5)
if choice == 5:
raise Exception("some network error") # network error imitation
return cursor(self)

Connection.cursor = new_cursor
Pool.acquire = new_acquire

import peewee_async
from contextlib import asynccontextmanager
import functools


logging.basicConfig()
pg_db = peewee_async.PooledPostgresqlDatabase(None)
pg_db = peewee_async.PooledPostgresqlDatabase(
database='postgres',
user='postgres',
password='postgres',
host='localhost',
port=5432,
max_connections=3
)


class Manager(peewee_async.Manager):
"""Async models manager."""

database = peewee_async.PooledPostgresqlDatabase(
database='postgres',
user='postgres',
password='postgres',
host='localhost',
port=5432,
)
database = pg_db


manager = Manager()


def patch_manager(manager):
async def cursor(self, conn=None, *args, **kwargs):

choice = random.randint(1, 5)
if choice == 5:
raise Exception("some network error") # network error imitation

# actual code
in_transaction = conn is not None
if not conn:
conn = await self.acquire()
cursor = await conn.cursor(*args, **kwargs)
cursor.release = functools.partial(
self.release_cursor, cursor,
in_transaction=in_transaction)
return cursor

manager.database._async_conn_cls.cursor = cursor

def setup_logging():
logger = logging.getLogger("uvicorn.error")
handler = logging.FileHandler(filename="app.log", mode="w")
Expand All @@ -70,7 +75,6 @@ async def lifespan(app: FastAPI):
operation='TRUNCATE TABLE MySimplestModel;',
)
setup_logging()
patch_manager(manager)
yield
# Clean up the ML models and release the resources
await manager.close()
Expand All @@ -90,6 +94,46 @@ async def test():
return errors


async def nested_transaction():
async with manager.transaction():
await manager.execute(MySimplestModel.update(id=1))


async def nested_atomic():
async with manager.atomic():
await manager.execute(MySimplestModel.update(id=1))


@app.get("/transaction")
async def test():
try:
async with manager.transaction():
await manager.execute(MySimplestModel.update(id=1))
await nested_transaction()
except Exception as e:
errors.add(str(e))
raise
return errors


@app.get("/atomic")
async def test():
try:
async with manager.atomic():
await manager.execute(MySimplestModel.update(id=1))
await nested_atomic()
except Exception as e:
errors.add(str(e))
raise
return errors


@app.get("/recreate_pool")
async def test():
await manager.database.close_async()
await manager.database.connect_async()


if __name__ == "__main__":
uvicorn.run(
app,
Expand Down
2 changes: 1 addition & 1 deletion load-testing/load.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ phantom:
- /select
load_profile:
load_type: rps # schedule load by defining requests per second
schedule: const(50, 10m) # starting from 1rps growing linearly to 10rps during 10 minutes
schedule: const(100, 10m) # starting from 1rps growing linearly to 10rps during 10 minutes
console:
enabled: true # enable console output
telegraf:
Expand Down

0 comments on commit 9e3dda8

Please sign in to comment.