Skip to content

Commit

Permalink
Add support for the new wait-change API (#595)
Browse files Browse the repository at this point in the history
Corresponds to the new wait-change API endpoint in
canonical/pebble#63. This uses the new API if it's
present, otherwise falls back to the existing polling-every-100ms method.
  • Loading branch information
benhoyt committed Sep 2, 2021
1 parent c441f1a commit 3d0c364
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 19 deletions.
77 changes: 72 additions & 5 deletions ops/pebble.py
Expand Up @@ -865,18 +865,85 @@ def _services_action(
def wait_change(
self, change_id: ChangeID, timeout: float = 30.0, delay: float = 0.1,
) -> Change:
"""Poll change every delay seconds (up to timeout) for it to be ready."""
deadline = time.time() + timeout
"""Wait for the given change to be ready.
while time.time() < deadline:
If the Pebble server supports the /v1/changes/{id}/wait API endpoint,
use that to avoid polling, otherwise poll /v1/changes/{id} every delay
seconds.
Args:
change_id: Change ID of change to wait for.
timeout: Maximum time in seconds to wait for the change to be
ready. May be None, in which case wait_change never times out.
delay: If polling, this is the delay in seconds between attempts.
Returns:
The Change object being waited on.
Raises:
TimeoutError: If the maximum timeout is reached.
"""
try:
return self._wait_change_using_wait(change_id, timeout)
except NotImplementedError:
# Pebble server doesn't support wait endpoint, fall back to polling
return self._wait_change_using_polling(change_id, timeout, delay)

def _wait_change_using_wait(self, change_id, timeout):
"""Wait for a change to be ready using the wait-change API."""
deadline = time.time() + timeout if timeout is not None else None

# Hit the wait endpoint every Client.timeout-1 seconds to avoid long
# requests (the -1 is to ensure it wakes up before the socket timeout)
while True:
this_timeout = max(self.timeout - 1, 1) # minimum of 1 second
if timeout is not None:
time_remaining = deadline - time.time()
if time_remaining <= 0:
break
# Wait the lesser of the time remaining and Client.timeout-1
this_timeout = min(time_remaining, this_timeout)

try:
return self._wait_change(change_id, this_timeout)
except TimeoutError:
# Catch timeout from wait endpoint and loop to check deadline
pass

raise TimeoutError('timed out waiting for change {} ({} seconds)'.format(
change_id, timeout))

def _wait_change(self, change_id: ChangeID, timeout: float = None) -> Change:
"""Call the wait-change API endpoint directly."""
query = {}
if timeout is not None:
query['timeout'] = '{:.3f}s'.format(timeout)

try:
resp = self._request('GET', '/v1/changes/{}/wait'.format(change_id), query)
except APIError as e:
if e.code == 404:
raise NotImplementedError('server does not implement wait-change endpoint')
if e.code == 504:
raise TimeoutError('timed out waiting for change {} ({} seconds)'.format(
change_id, timeout))
raise

return Change.from_dict(resp['result'])

def _wait_change_using_polling(self, change_id, timeout, delay):
"""Wait for a change to be ready by polling the get-change API."""
deadline = time.time() + timeout if timeout is not None else None

while timeout is None or time.time() < deadline:
change = self.get_change(change_id)
if change.ready:
return change

time.sleep(delay)

raise TimeoutError(
'timed out waiting for change {} ({} seconds)'.format(change_id, timeout))
raise TimeoutError('timed out waiting for change {} ({} seconds)'.format(
change_id, timeout))

def add_layer(
self, label: str, layer: typing.Union[str, dict, Layer], *, combine: bool = False):
Expand Down
6 changes: 6 additions & 0 deletions test/pebble_cli.py
Expand Up @@ -98,6 +98,10 @@ def main():

p = subparsers.add_parser('system-info', help='show Pebble system information')

p = subparsers.add_parser('wait', help='wait for a change by ID')
p.add_argument('-t', '--timeout', type=float, help='timeout in seconds')
p.add_argument('change_id', help='ID of change to wait for')

p = subparsers.add_parser('warnings', help='show (filtered) warnings')
p.add_argument('--select', help='warning state to filter on, default %(default)s',
choices=[s.value for s in pebble.WarningState], default='all')
Expand Down Expand Up @@ -173,6 +177,8 @@ def main():
result = client.stop_services(args.service)
elif args.command == 'system-info':
result = client.get_system_info()
elif args.command == 'wait':
result = client.wait_change(args.change_id, timeout=args.timeout)
elif args.command == 'warnings':
result = client.get_warnings(select=pebble.WarningState(args.select))
else:
Expand Down
132 changes: 118 additions & 14 deletions test/test_pebble.py
Expand Up @@ -751,10 +751,16 @@ class MockClient(pebble.Client):
def __init__(self):
self.requests = []
self.responses = []
self.timeout = 5

def _request(self, method, path, query=None, body=None):
self.requests.append((method, path, query, body))
return self.responses.pop(0)
resp = self.responses.pop(0)
if isinstance(resp, Exception):
raise resp
if callable(resp):
resp = resp()
return resp

def _request_raw(self, method, path, query=None, headers=None, data=None):
self.requests.append((method, path, query, headers, data))
Expand Down Expand Up @@ -973,14 +979,6 @@ def _services_action_helper(self, action, api_func, services):
"type": "async"
})
change = self.build_mock_change_dict()
change['ready'] = False
self.client.responses.append({
"result": change,
"status": "OK",
"status-code": 200,
"type": "sync"
})
change = self.build_mock_change_dict()
change['ready'] = True
self.client.responses.append({
"result": change,
Expand All @@ -992,8 +990,7 @@ def _services_action_helper(self, action, api_func, services):
self.assertEqual(change_id, '70')
self.assertEqual(self.client.requests, [
('POST', '/v1/services', None, {'action': action, 'services': services}),
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
])

def _services_action_async_helper(self, action, api_func, services):
Expand Down Expand Up @@ -1079,11 +1076,115 @@ def test_change_error(self):

self.assertEqual(self.client.requests, [
('POST', '/v1/services', None, {'action': 'autostart', 'services': []}),
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
])

def test_wait_change_success(self, timeout=30.0):
change = self.build_mock_change_dict()
self.client.responses.append({
"result": change,
"status": "OK",
"status-code": 200,
"type": "sync"
})

response = self.client.wait_change('70', timeout=timeout)
self.assertEqual(response.id, '70')
self.assertTrue(response.ready)

self.assertEqual(self.client.requests, [
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
])

def test_wait_change_success_timeout_none(self):
self.test_wait_change_success(timeout=None)

def test_wait_change_success_multiple_calls(self):
mock_time = MockTime()
with unittest.mock.patch('ops.pebble.time', mock_time):
def timeout_response(n):
mock_time.sleep(n) # simulate passing of time due to wait_change call
raise pebble.APIError({}, 504, "Gateway Timeout", "timed out")
self.client.responses.append(lambda: timeout_response(4))

change = self.build_mock_change_dict()
self.client.responses.append({
"result": change,
"status": "OK",
"status-code": 200,
"type": "sync"
})

response = self.client.wait_change('70')
self.assertEqual(response.id, '70')
self.assertTrue(response.ready)

self.assertEqual(self.client.requests, [
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
])

self.assertEqual(mock_time.time(), 4)

def test_wait_change_success_polled(self, timeout=30.0):
mock_time = MockTime()
with unittest.mock.patch('ops.pebble.time', mock_time):
# Trigger polled mode
self.client.responses.append(pebble.APIError({}, 404, "Not Found", "not found"))

for i in range(3):
change = self.build_mock_change_dict()
change['ready'] = i == 2
self.client.responses.append({
"result": change,
"status": "OK",
"status-code": 200,
"type": "sync"
})

response = self.client.wait_change('70', timeout=timeout, delay=1)
self.assertEqual(response.id, '70')
self.assertTrue(response.ready)

self.assertEqual(self.client.requests, [
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70', None, None),
])

self.assertEqual(mock_time.time(), 2)

def test_wait_change_success_polled_timeout_none(self):
self.test_wait_change_success_polled(timeout=None)

def test_wait_change_timeout(self):
with unittest.mock.patch('ops.pebble.time', MockTime()):
mock_time = MockTime()
with unittest.mock.patch('ops.pebble.time', mock_time):
def timeout_response(n):
mock_time.sleep(n) # simulate passing of time due to wait_change call
raise pebble.APIError({}, 504, "Gateway Timeout", "timed out")
self.client.responses.append(lambda: timeout_response(4))
self.client.responses.append(lambda: timeout_response(2))

with self.assertRaises(pebble.TimeoutError) as cm:
self.client.wait_change('70', timeout=6)
self.assertIsInstance(cm.exception, pebble.Error)
self.assertIsInstance(cm.exception, TimeoutError)

self.assertEqual(self.client.requests, [
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
('GET', '/v1/changes/70/wait', {'timeout': '2.000s'}, None),
])

self.assertEqual(mock_time.time(), 6)

def test_wait_change_timeout_polled(self):
mock_time = MockTime()
with unittest.mock.patch('ops.pebble.time', mock_time):
# Trigger polled mode
self.client.responses.append(pebble.APIError({}, 404, "Not Found", "not found"))

change = self.build_mock_change_dict()
change['ready'] = False
for _ in range(3):
Expand All @@ -1100,11 +1201,14 @@ def test_wait_change_timeout(self):
self.assertIsInstance(cm.exception, TimeoutError)

self.assertEqual(self.client.requests, [
('GET', '/v1/changes/70/wait', {'timeout': '3.000s'}, None),
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70', None, None),
])

self.assertEqual(mock_time.time(), 3)

def test_wait_change_error(self):
change = self.build_mock_change_dict()
change['err'] = 'Some kind of service error'
Expand All @@ -1120,7 +1224,7 @@ def test_wait_change_error(self):
self.assertEqual(response.err, 'Some kind of service error')

self.assertEqual(self.client.requests, [
('GET', '/v1/changes/70', None, None),
('GET', '/v1/changes/70/wait', {'timeout': '4.000s'}, None),
])

def test_add_layer(self):
Expand Down

0 comments on commit 3d0c364

Please sign in to comment.