-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongo_database.py
34 lines (27 loc) · 1018 Bytes
/
mongo_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection
from config import settings
from monitor import Monitor
import asyncio
class MongoDBClient:
    """Async context manager that hands out a collection from a shared client.

    A single ``AsyncIOMotorClient`` is stored on the class and shared by all
    instances: ``connect()`` creates it and ``disconnect()`` closes it. Each
    instance only remembers a collection name; ``async with`` yields that
    collection from the client's default database.
    """

    # Shared client: ``None`` until ``connect()`` runs, and again after
    # ``disconnect()`` has closed it.
    client = None
    # Guards creation, teardown and lookup of the shared client.
    lock = asyncio.Lock()

    def __init__(self, collection: str):
        """Remember which collection this context manager should yield.

        Args:
            collection: Name of the collection to look up in the default
                database.
        """
        self.collection = collection

    async def __aenter__(self) -> "AsyncIOMotorCollection":
        """Return the configured collection from the default database.

        Raises:
            RuntimeError: If there is no shared client, i.e. ``connect()``
                has not been called or ``disconnect()`` has already run.
        """
        async with self.lock:
            if self.client is None:
                # Fail with an explicit message instead of an opaque
                # AttributeError on ``None``.
                raise RuntimeError(
                    "MongoDBClient is not connected; "
                    "call MongoDBClient.connect() first"
                )
            db = self.client.get_default_database()
            return db[self.collection]

    async def __aexit__(self, *args):
        """Do nothing: the shared client outlives individual ``async with`` blocks."""
        pass

    @classmethod
    async def connect(cls):
        """Create the shared client if there is none yet and log the event."""
        async with cls.lock:
            if cls.client is None:
                cls.client = AsyncIOMotorClient(settings.mongodb_conn_string)
                await Monitor.log("Подключение к Монге открыто")

    @classmethod
    async def disconnect(cls):
        """Close the shared client, if any, forget it, and log the event."""
        async with cls.lock:
            if cls.client is not None:
                cls.client.close()
                # Drop the closed client so a later connect() builds a fresh
                # one instead of silently keeping the closed instance.
                cls.client = None
                await Monitor.log("Подключение к Монге закрыто")