-
Notifications
You must be signed in to change notification settings - Fork 1
/
session.py
61 lines (47 loc) · 1.4 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from typing import Any
import os
from contextvars import ContextVar
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import PendingRollbackError
import traceback
from .util import collect_engine_variables
request_id_ctx_var = ContextVar("request_id", default=None)
def get_request_id():
return request_id_ctx_var.get()
(
pool_size,
pool_max_overflow,
pool_recycle,
pool_use_lifo,
pool_pre_ping,
) = collect_engine_variables()
engine = create_engine(
os.getenv("POSTGRES"),
pool_size=pool_size,
max_overflow=pool_max_overflow,
pool_recycle=pool_recycle,
pool_use_lifo=pool_use_lifo,
pool_pre_ping=pool_pre_ping,
)
session = scoped_session(
sessionmaker(autocommit=False, autoflush=True, bind=engine),
scopefunc=get_request_id,
)
## uncomment following lines to enable db logging
""" import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
"""
def check_session_and_rollback():
try:
_ = session.connection()
except PendingRollbackError:
print("session issue detected, rollback initiated", flush=True)
print(traceback.format_exc(), flush=True)
while session.registry().in_transaction():
session.rollback()
def get_engine_dialect() -> Any:
if not engine:
return None
return engine.dialect