-
Notifications
You must be signed in to change notification settings - Fork 58
/
test_connection.py
199 lines (151 loc) · 5.1 KB
/
test_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import asyncio
import gc
from unittest import mock
import pytest
import pyodbc
import aioodbc
def test_connect(loop, conn):
    """Check the initial state of the connection supplied by ``conn``.

    The connection must reference the test loop, have autocommit off,
    report a timeout of zero and not be closed.
    """
    bound_loop = conn.loop
    assert bound_loop is loop
    # Expected defaults: autocommit disabled and a zero timeout.
    assert not conn.autocommit
    assert conn.timeout == 0
    # Nothing has closed the connection at this point.
    assert not conn.closed
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_connect_hook(connection_maker):
    """Verify the ``after_created`` hook is handed the wrapped connection."""
    seen = {'raw': None}

    async def remember(raw):
        # Record whatever object the hook is invoked with.
        seen['raw'] = raw

    connection = await connection_maker(after_created=remember)
    # The wrapper's private ``_conn`` must equal the object the hook saw;
    # if the hook never ran the stored value is still None.
    assert connection._conn == seen['raw']
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_basic_cursor(conn):
    """Run a trivial query through an explicitly created cursor."""
    cur = await conn.cursor()
    await cur.execute('SELECT 10;')
    row = await cur.fetchone()
    # The query yields a single-column row.
    (value,) = row
    assert value == 10
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_default_loop(loop, dsn):
    """Connect without passing ``loop`` and expect the current loop is used.

    The test loop is installed as the current event loop first, then the
    connection's private ``_loop`` attribute is compared against it.
    """
    asyncio.set_event_loop(loop)
    conn = await aioodbc.connect(dsn=dsn)
    try:
        assert conn._loop is loop
    finally:
        # Close even when the assertion fails so the connection is not
        # leaked into the tests that run afterwards.
        await conn.close()
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_close_twice(conn):
    """Closing an already closed connection must not raise."""
    # Two consecutive close calls; the second one hits a closed connection.
    for _ in range(2):
        await conn.close()
    assert conn.closed
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_execute(conn):
    """Run a query via the connection-level ``execute`` and then close."""
    cursor = await conn.execute('SELECT 10;')
    row = await cursor.fetchone()
    (value,) = row
    # Close before asserting, so both the result and the closed flag
    # are checked afterwards.
    await conn.close()
    assert value == 10
    assert conn.closed
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_getinfo(conn):
    """Fetch ``SQL_CREATE_TABLE`` info and compare it with accepted values."""
    # Accepted results, keyed by the label each value carries in this
    # test (presumably one per supported database backend).
    accepted = {'pg': 14057, 'sqlite': 1793, 'mysql': 3093}
    data = await conn.getinfo(pyodbc.SQL_CREATE_TABLE)
    assert data in tuple(accepted.values())
@pytest.mark.parametrize('db', ['mysql'])
@pytest.mark.asyncio
async def test_output_conversion(conn, table):
    """Install an output converter, check it is applied, then clear it."""

    def convert(value):
        # Add an X at the beginning and at the end of the value, using
        # a str or bytes marker to match the type that was received.
        marker = 'X' if isinstance(value, str) else b'X'
        return marker + value + marker

    await conn.add_output_converter(pyodbc.SQL_VARCHAR, convert)
    cursor = await conn.cursor()

    await cursor.execute("INSERT INTO t1 VALUES (3, '123.45')")
    await cursor.execute("SELECT v FROM t1 WHERE n=3;")
    (converted,) = await cursor.fetchone()
    assert converted in (b'X123.45X', 'X123.45X')

    # Now clear the conversions and query again: there should be no Xs
    # around the value this time.
    await conn.clear_output_converters()
    await cursor.execute("SELECT v FROM t1")
    (plain,) = await cursor.fetchone()
    assert plain == '123.45'
    await cursor.close()
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_autocommit(loop, connection_maker):
    """A connection made with ``autocommit=True`` must report autocommit on."""
    conn = await connection_maker(autocommit=True)
    # Compare against True explicitly. Writing ``assert x, True`` would only
    # use ``True`` as the failure message, not as the expected value.
    assert conn.autocommit is True
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_rollback(conn):
    """An uncommitted insert must no longer be visible after ``rollback``."""
    assert not conn.autocommit

    cursor = await conn.cursor()
    # Create the table and commit it before the insert that is rolled back.
    await cursor.execute("CREATE TABLE t1(n INT, v VARCHAR(10));")
    await conn.commit()

    # Before the rollback the inserted row can be read back.
    await cursor.execute("INSERT INTO t1 VALUES (1, '123.45');")
    await cursor.execute("SELECT v FROM t1")
    (inserted,) = await cursor.fetchone()
    assert inserted == '123.45'

    # After the rollback the same query returns no row at all.
    await conn.rollback()
    await cursor.execute("SELECT v FROM t1;")
    row = await cursor.fetchone()
    assert row is None

    # Remove the table again and close the connection.
    await cursor.execute("DROP TABLE t1;")
    await conn.commit()
    await conn.close()
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_custom_executor(loop, dsn, executor):
    """Connect with an explicit executor and run a query through it.

    Checks that the connection stores the executor it was given, that a
    simple query returns the expected value, and that the connection
    reports itself closed after ``close``.
    """
    conn = await aioodbc.connect(dsn=dsn, executor=executor, loop=loop)
    try:
        assert conn._executor is executor
        cur = await conn.execute('SELECT 10;')
        (resp,) = await cur.fetchone()
    finally:
        # Always close, so a failed assertion or query does not leak the
        # connection into the tests that run afterwards.
        await conn.close()
    assert resp == 10
    assert conn.closed
@pytest.mark.asyncio
async def test_dataSources(loop, executor):
    """``aioodbc.dataSources`` must resolve to a dictionary."""
    sources = await aioodbc.dataSources(loop, executor)
    assert isinstance(sources, dict)
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_connection_simple_with(loop, conn):
    """Leaving an ``async with conn`` block must close the connection."""
    assert not conn.closed
    async with conn:
        # Intentionally empty: only the effect of exiting is checked.
        pass
    assert conn.closed
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test_connect_context_manager(loop, dsn):
    """Use ``aioodbc.connect`` directly as an async context manager."""
    connecting = aioodbc.connect(dsn=dsn, loop=loop, echo=True)
    async with connecting as conn:
        assert not conn.closed
        assert conn.echo

        cursor = await conn.execute('SELECT 10;')
        # The cursor is expected to report the echo flag as well.
        assert cursor.echo
        (value,) = await cursor.fetchone()
        assert value == 10
        await cursor.close()
    # Exiting the block must have closed the connection.
    assert conn.closed
@pytest.mark.skip(reason='disabled: body was bypassed by an early return')
@pytest.mark.parametrize('db', pytest.db_list)
@pytest.mark.asyncio
async def test___del__(loop, dsn, recwarn, executor):
    """Check what happens when an unclosed connection is garbage collected.

    The body expects a ``ResourceWarning`` to be recorded and the loop's
    exception handler to be called with an 'Unclosed connection' message.

    NOTE(review): this test used to start with a bare ``return``, so it
    always passed without running any check. It is now reported as
    skipped instead; the reason it was disabled is not recorded here, so
    confirm the body still passes before removing the skip marker.
    """
    conn = await aioodbc.connect(dsn=dsn, loop=loop, executor=executor)
    exc_handler = mock.Mock()
    loop.set_exception_handler(exc_handler)

    # Drop the only reference and force a collection.
    del conn
    gc.collect()
    w = recwarn.pop()
    assert issubclass(w.category, ResourceWarning)

    msg = {'connection': mock.ANY,  # conn was deleted
           'message': 'Unclosed connection'}
    # A source traceback entry is only expected when the loop is in
    # debug mode.
    if loop.get_debug():
        msg['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(loop, msg)