-
Notifications
You must be signed in to change notification settings - Fork 254
/
utils.py
151 lines (110 loc) · 3.51 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from collections.abc import Coroutine
class _ContextManager(Coroutine):
__slots__ = ('_coro', '_obj')
def __init__(self, coro):
self._coro = coro
self._obj = None
def send(self, value):
return self._coro.send(value)
def throw(self, typ, val=None, tb=None):
if val is None:
return self._coro.throw(typ)
elif tb is None:
return self._coro.throw(typ, val)
else:
return self._coro.throw(typ, val, tb)
def close(self):
return self._coro.close()
@property
def gi_frame(self):
return self._coro.gi_frame
@property
def gi_running(self):
return self._coro.gi_running
@property
def gi_code(self):
return self._coro.gi_code
def __next__(self):
return self.send(None)
def __iter__(self):
return self._coro.__await__()
def __await__(self):
return self._coro.__await__()
async def __aenter__(self):
self._obj = await self._coro
return self._obj
async def __aexit__(self, exc_type, exc, tb):
await self._obj.close()
self._obj = None
class _ConnectionContextManager(_ContextManager):
async def __aexit__(self, exc_type, exc, tb):
if exc_type is not None:
self._obj.close()
else:
await self._obj.ensure_closed()
self._obj = None
class _PoolContextManager(_ContextManager):
async def __aexit__(self, exc_type, exc, tb):
self._obj.close()
await self._obj.wait_closed()
self._obj = None
class _SAConnectionContextManager(_ContextManager):
async def __aiter__(self):
result = await self._coro
return result
class _TransactionContextManager(_ContextManager):
async def __aexit__(self, exc_type, exc, tb):
if exc_type:
await self._obj.rollback()
else:
if self._obj.is_active:
await self._obj.commit()
self._obj = None
class _PoolAcquireContextManager(_ContextManager):
__slots__ = ('_coro', '_conn', '_pool')
def __init__(self, coro, pool):
self._coro = coro
self._conn = None
self._pool = pool
async def __aenter__(self):
self._conn = await self._coro
return self._conn
async def __aexit__(self, exc_type, exc, tb):
try:
await self._pool.release(self._conn)
finally:
self._pool = None
self._conn = None
class _PoolConnectionContextManager:
"""Context manager.
This enables the following idiom for acquiring and releasing a
connection around a block:
with (yield from pool) as conn:
cur = yield from conn.cursor()
while failing loudly when accidentally using:
with pool:
<block>
"""
__slots__ = ('_pool', '_conn')
def __init__(self, pool, conn):
self._pool = pool
self._conn = conn
def __enter__(self):
assert self._conn
return self._conn
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self._pool.release(self._conn)
finally:
self._pool = None
self._conn = None
async def __aenter__(self):
assert not self._conn
self._conn = await self._pool.acquire()
return self._conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
await self._pool.release(self._conn)
finally:
self._pool = None
self._conn = None