test_adversity.py
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


"""Tests how asyncpg behaves in non-ideal conditions."""

import asyncio
import os
import platform
import unittest

from asyncpg import _testbase as tb


@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
@unittest.skipIf(
    platform.system() == 'Windows',
    'not compatible with ProactorEventLoop which is default in Python 3.8+')
class TestConnectionLoss(tb.ProxiedClusterTestCase):
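    # A connection that has lost network connectivity cannot complete a
    # graceful close; close() should honor its timeout instead of hanging.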
    @tb.with_timeout(30.0)
    async def test_connection_close_timeout(self):
        con = await self.connect()
        self.proxy.trigger_connectivity_loss()
        with self.assertRaises(asyncio.TimeoutError):
            await con.close(timeout=0.5)
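
    # While connectivity is down, acquire() should fail with TimeoutError;
    # once connectivity is restored, the pool should hand out connections
    # again without needing to be rebuilt.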
    @tb.with_timeout(30.0)
    async def test_pool_acquire_timeout(self):
        pool = await self.create_pool(
            database='postgres', min_size=2, max_size=2)
        try:
            self.proxy.trigger_connectivity_loss()
            for _ in range(2):
                with self.assertRaises(asyncio.TimeoutError):
                    async with pool.acquire(timeout=0.5):
                        pass
            self.proxy.restore_connectivity()
            async with pool.acquire(timeout=0.5):
                pass
        finally:
            self.proxy.restore_connectivity()
            pool.terminate()
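
    # If connectivity is lost while a connection is checked out, returning
    # it to the pool on async-with exit should time out rather than block
    # forever.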
    @tb.with_timeout(30.0)
    async def test_pool_release_timeout(self):
        pool = await self.create_pool(
            database='postgres', min_size=2, max_size=2)
        try:
            with self.assertRaises(asyncio.TimeoutError):
                async with pool.acquire(timeout=0.5):
                    self.proxy.trigger_connectivity_loss()
        finally:
            self.proxy.restore_connectivity()
            pool.terminate()
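
    # Run more workers than there are pooled connections, kill connectivity
    # mid-run, and verify that everything settles within the computed
    # worst-case runtime instead of deadlocking.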
    @tb.with_timeout(30.0)
    async def test_pool_handles_abrupt_connection_loss(self):
        pool_size = 3
        query_runtime = 0.5
        pool_timeout = cmd_timeout = 1.0
        concurrency = 9
        pool_concurrency = (concurrency - 1) // pool_size + 1

        # Worst expected runtime + 20% to account for other latencies.
        worst_runtime = (pool_timeout + cmd_timeout) * pool_concurrency * 1.2

        async def worker(pool):
            async with pool.acquire(timeout=pool_timeout) as con:
                await con.fetch('SELECT pg_sleep($1)', query_runtime)

        def kill_connectivity():
            self.proxy.trigger_connectivity_loss()

        new_pool = self.create_pool(
            database='postgres', min_size=pool_size, max_size=pool_size,
            timeout=cmd_timeout, command_timeout=cmd_timeout)

        with self.assertRunUnder(worst_runtime):
            pool = await new_pool
            try:
                workers = [worker(pool) for _ in range(concurrency)]
                self.loop.call_later(1, kill_connectivity)
                await asyncio.gather(
                    *workers, return_exceptions=True)
            finally:
                pool.terminate()