From 990aed744eae67a04a67eda8f5d29f83521afacd Mon Sep 17 00:00:00 2001 From: Francesco Faraone Date: Fri, 4 Feb 2022 15:32:25 +0100 Subject: [PATCH] Fix backoff (apply only to connect) --- .github/workflows/build.yml | 53 ++++++---- .github/workflows/deploy.yml | 13 ++- connect/eaas/constants.py | 11 +++ connect/eaas/helpers.py | 7 ++ connect/eaas/worker.py | 181 ++++++++++++++++------------------- tests/test_worker.py | 127 +++++++++++++++++------- 6 files changed, 242 insertions(+), 150 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 83cc40e..5acd41b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,12 +10,12 @@ on: branches: [ master ] jobs: - build: + test: runs-on: ubuntu-latest timeout-minutes: 10 strategy: matrix: - python-version: [3.8, 3.9] + python-version: ["3.8", "3.9", "3.10"] steps: - uses: actions/checkout@v2 with: @@ -35,20 +35,35 @@ jobs: - name: Testing run: | poetry run pytest - - name: Fix coverage.xml for Sonar - run: | - sed -i 's/\/home\/runner\/work\/connect-extension-runner\/connect-extension-runner\//\/github\/workspace\//g' coverage.xml - - name: SonarCloud - uses: SonarSource/sonarcloud-github-action@master - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} - - name: Wait sonar to process report - uses: jakejarvis/wait-action@master - with: - time: '60s' - - name: SonarQube Quality Gate check - uses: sonarsource/sonarqube-quality-gate-action@master - timeout-minutes: 5 - env: - SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} \ No newline at end of file + sonar: + needs: [test] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Set up Python 3.10.0 + uses: actions/setup-python@v2 + with: + python-version: '3.10.0' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install poetry + poetry install + - name: Generate coverage report + run: | + poetry run pytest + - name: Fix coverage.xml for Sonar + run: | + sed -i 's/\/home\/runner\/work\/connect-extension-runner\/connect-extension-runner\//\/github\/workspace\//g' coverage.xml + - name: SonarCloud + uses: SonarSource/sonarcloud-github-action@master + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + - name: SonarQube Quality Gate check + uses: sonarsource/sonarqube-quality-gate-action@master + timeout-minutes: 5 + env: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 33f0c27..d880996 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -48,6 +48,17 @@ jobs: with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Docker meta + id: meta + uses: docker/metadata-action@v3 + with: + images: cloudblueconnect/connect-extension-runner + tags: | + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + flavor: | + latest=false - name: Build and push docker image id: docker_build uses: docker/build-push-action@v2 @@ -55,6 +66,6 @@ jobs: push: true build-args: | RUNNER_VERSION=${{ steps.tag.outputs.result }} - tags: cloudblueconnect/connect-extension-runner:${{ steps.tag.outputs.result }},cloudblueconnect/connect-extension-runner:latest + tags: ${{ steps.meta.outputs.tags }} - name: Docker image digest run: echo ${{ steps.docker_build.outputs.digest }} diff --git a/connect/eaas/constants.py b/connect/eaas/constants.py index 32f2091..a02100e 100644 --- a/connect/eaas/constants.py +++ b/connect/eaas/constants.py @@ -101,3 +101,14 @@ MAX_RETRY_TIME_GENERIC_SECONDS = 15 * 60 MAX_RETRY_TIME_MAINTENANCE_SECONDS = 3 * 60 * 60 MAX_RETRY_DELAY_TIME_SECONDS = 5 * 60 + +DELAY_ON_CONNECT_EXCEPTION_SECONDS = 10 + +ORDINAL_SUFFIX = { + 1: 'st', + 2: 'nd', + 3: 'rd', + 11: 'th', + 12: 'th', + 13: 'th', +} diff --git a/connect/eaas/helpers.py b/connect/eaas/helpers.py index eff4a0f..1f6567f 100644 --- a/connect/eaas/helpers.py +++ b/connect/eaas/helpers.py @@ -17,6 +17,7 @@ from connect.eaas.constants import ( BACKGROUND_TASK_MAX_EXECUTION_TIME, INTERACTIVE_TASK_MAX_EXECUTION_TIME, + ORDINAL_SUFFIX, SCHEDULED_TASK_MAX_EXECUTION_TIME, TASK_TYPE_EXT_METHOD_MAP, ) @@ -106,3 +107,9 @@ def get_version(): return get_distribution('connect-extension-runner').version except DistributionNotFound: return '0.0.0' + + +def to_ordinal(val): + if val > 14: + return f"{val}{ORDINAL_SUFFIX.get(int(str(val)[-1]), 'th')}" + return f"{val}{ORDINAL_SUFFIX.get(val, 'th')}" diff --git a/connect/eaas/worker.py b/connect/eaas/worker.py index 39c64b0..ef51ec3 100644 --- a/connect/eaas/worker.py +++ b/connect/eaas/worker.py @@ -9,17 +9,18 @@ import logging from asyncio.exceptions import TimeoutError +import backoff import websockets from websockets.exceptions import ( ConnectionClosedError, ConnectionClosedOK, InvalidStatusCode, - WebSocketException, ) -import backoff + from connect.eaas.config import ConfigHelper from connect.eaas.constants import ( + DELAY_ON_CONNECT_EXCEPTION_SECONDS, MAX_RETRY_DELAY_TIME_SECONDS, MAX_RETRY_TIME_GENERIC_SECONDS, MAX_RETRY_TIME_MAINTENANCE_SECONDS, @@ -37,6 +38,7 @@ StopBackoffError, ) from connect.eaas.handler import ExtensionHandler +from connect.eaas.helpers import to_ordinal from connect.eaas.managers import ( BackgroundTasksManager, InteractiveTasksManager, @@ -59,28 +61,6 @@ def _get_max_retry_delay_time(): return MAX_RETRY_DELAY_TIME_SECONDS -_ORDINAL_DICT = { - 1: 'st', - 2: 'nd', - 3: 'rd', - 11: 'th', - 12: 'th', - 13: 'th', -} - - -def _on_communication_backoff(details): - if details['tries'] > 14: - ordinal_attempt = _ORDINAL_DICT.get(int(str(details['tries'])[-1]), 'th') - else: - ordinal_attempt = _ORDINAL_DICT.get(details['tries'], 'th') - logger.info( - f'{details["tries"]}{ordinal_attempt} communication attempt failed, backing off waiting ' - f'{details["wait"]:.2f} seconds after next retry. Elapsed time: {details["elapsed"]:.2f}' - ' seconds.', - ) - - class Worker: """ The Worker is responsible to handle the websocket connection @@ -126,18 +106,50 @@ def get_url(self): url = f'{url}?running_tasks={self.background_manager.running_tasks}' return f'{url}&running_scheduled_tasks={self.scheduled_manager.running_tasks}' - async def ensure_connection(self): + async def ensure_connection(self): # noqa: CCR001 """ Ensure that a websocket connection is established. """ - if self.ws is None or self.ws.closed: - url = self.get_url() - self.ws = await websockets.connect( - url, - extra_headers=self.config.get_headers(), - ) - await (await self.ws.ping()) - logger.info(f'Connected to {url}') + @backoff.on_exception( + backoff.expo, + CommunicationError, + max_time=_get_max_retry_time_generic, + max_value=_get_max_retry_delay_time, + on_backoff=self._backoff_log, + giveup=self._backoff_shutdown, + ) + @backoff.on_exception( + backoff.expo, + MaintenanceError, + max_time=_get_max_retry_time_maintenance, + max_value=_get_max_retry_delay_time, + on_backoff=self._backoff_log, + giveup=self._backoff_shutdown, + ) + async def _connect(): + if self.ws is None or self.ws.closed: + try: + url = self.get_url() + self.ws = await websockets.connect( + url, + extra_headers=self.config.get_headers(), + ) + await (await self.ws.ping()) + logger.info(f'Connected to {url}') + except InvalidStatusCode as ic: + if ic.status_code == 502: + logger.warning('Maintenance in progress...') + raise MaintenanceError() + else: + logger.warning( + f'Received an unexpected status from server: {ic.status_code}...', + ) + raise CommunicationError() + except Exception as e: + logger.warning(f'Received an unexpected exception: {e}...') + raise CommunicationError() + + await _connect() async def send(self, message): """ @@ -169,82 +181,51 @@ def get_capabilities(self): ), ) - async def communicate(self): # noqa: CCR001 - @backoff.on_exception( - backoff.expo, - CommunicationError, - factor=10, - max_time=_get_max_retry_time_generic, - max_value=_get_max_retry_delay_time, - jitter=backoff.random_jitter, - on_backoff=_on_communication_backoff, - giveup=self._backoff_shutdown, - ) - @backoff.on_exception( - backoff.expo, - MaintenanceError, - factor=10, - max_time=_get_max_retry_time_maintenance, - max_value=_get_max_retry_delay_time, - jitter=backoff.random_jitter, - on_backoff=_on_communication_backoff, - giveup=self._backoff_shutdown, - ) - async def _do_communicate(): + async def run(self): # noqa: CCR001 + """ + Main loop for the websocket connection. + Once started, this worker will send the capabilities message to + the websocket server and start a loop to receive messages from the + websocket server. + """ + await self.run_event.wait() + while self.run_event.is_set(): try: await self.ensure_connection() await self.send(self.get_capabilities()) while self.run_event.is_set(): - await self.ensure_connection() message = await self.receive() if not message: continue await self.process_message(message) - except ConnectionClosedError as e: - logger.warning(f'Connection closed with code {e.rcvd} from: {self.get_url()}') - raise CommunicationError() + except (ConnectionClosedOK, StopBackoffError): + break + except ConnectionClosedError: + logger.warning( + f'Disconnected from: {self.get_url()}' + f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s', + ) + await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS) except InvalidStatusCode as ic: if ic.status_code == 502: - logger.warning('InvalidStatusCode 502 raised. Maintenance in progress.') - raise MaintenanceError() + logger.warning( + 'Maintenance in progress' + f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s', + ) + await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS) else: - logger.warning(f'InvalidStatusCode {ic.status_code} raised.') - raise CommunicationError() - except WebSocketException as wse: - logger.warning(f'Unexpected websocket exception {wse}.') - raise CommunicationError() + logger.warning( + f'Received an unexpected status from server: {ic.status_code}' + f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s', + ) + await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS) except Exception as e: - logger.warning(f'Unexpected error in communicate: {e}.') - raise CommunicationError() - - await _do_communicate() - - async def run(self): # noqa: CCR001 - """ - Main loop for the websocket connection. - Once started, this worker will send the capabilities message to - the websocket server and start a loop to receive messages from the - websocket server. - """ - await self.run_event.wait() - while self.run_event.is_set(): - try: - await self.communicate() - except ConnectionClosedOK: - self.run_event.clear() - except CommunicationError: - logger.error( - f'Max retries exceeded after {MAX_RETRY_TIME_GENERIC_SECONDS} seconds', + logger.exception( + f'Unexpected exception {e}' + f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s', ) - self.run_event.clear() - except MaintenanceError: - logger.error( - f'Max retries exceeded after {MAX_RETRY_TIME_MAINTENANCE_SECONDS} ' - 'seconds', - ) - self.run_event.clear() - except StopBackoffError: - pass + await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS) + if self.ws: await self.ws.close() @@ -381,3 +362,11 @@ def _backoff_shutdown(self, _): if not self.run_event.is_set(): logger.info('Worker exiting, stop backoff loop') raise StopBackoffError() + + def _backoff_log(self, details): + logger.info( + f'{to_ordinal(details["tries"])} communication attempt failed, backing off waiting ' + f'{details["wait"]:.2f} seconds after next retry. ' + f'Elapsed time: {details["elapsed"]:.2f}' + ' seconds.', + ) diff --git a/tests/test_worker.py b/tests/test_worker.py index d82ee36..ef545d3 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -16,8 +16,9 @@ TaskPayload, TaskType, ) +from connect.eaas.exceptions import CommunicationError, MaintenanceError, StopBackoffError from connect.eaas.extension import Extension, ProcessingResponse, ScheduledExecutionResponse -from connect.eaas.worker import _on_communication_backoff, Worker +from connect.eaas.worker import Worker from tests.utils import WSHandler @@ -605,6 +606,7 @@ def get_descriptor(cls): async def test_connection_closed_error(mocker, ws_server, unused_port, caplog): mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_GENERIC_SECONDS', 1) mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1) + mocker.patch('connect.eaas.worker.DELAY_ON_CONNECT_EXCEPTION_SECONDS', 0.1) mocker.patch('connect.eaas.handler.get_extension_class') mocker.patch('connect.eaas.handler.get_extension_type') mocker.patch( @@ -636,7 +638,7 @@ async def test_connection_closed_error(mocker, ws_server, unused_port, caplog): await task assert ( - f'Connection closed with code 1006 from: ws://127.0.0.1:{unused_port}' + f'Disconnected from: ws://127.0.0.1:{unused_port}' '/public/v1/devops/ws/ENV-000-0001/INS-000-0002' '?running_tasks=0&running_scheduled_tasks=0' ) in caplog.text @@ -644,6 +646,7 @@ async def test_connection_closed_error(mocker, ws_server, unused_port, caplog): @pytest.mark.asyncio async def test_connection_websocket_exception(mocker, ws_server, unused_port, caplog): + mocker.patch('connect.eaas.worker.DELAY_ON_CONNECT_EXCEPTION_SECONDS', 0.1) mocker.patch('connect.eaas.handler.get_extension_class') mocker.patch('connect.eaas.handler.get_extension_type') mocker.patch( @@ -667,18 +670,19 @@ async def test_connection_websocket_exception(mocker, ws_server, unused_port, ca async with ws_server(handler): worker = Worker(secure=False) - worker.send = mocker.AsyncMock(side_effect=WebSocketException()) + worker.send = mocker.AsyncMock(side_effect=WebSocketException('test error')) with caplog.at_level(logging.INFO): task = asyncio.create_task(worker.start()) await asyncio.sleep(.5) worker.stop() await task - assert 'Unexpected websocket exception' in caplog.text + assert 'Unexpected exception test error, try to reconnect in 0.1s' in caplog.text @pytest.mark.asyncio async def test_connection_maintenance(mocker, ws_server, unused_port, caplog): + mocker.patch('connect.eaas.worker.DELAY_ON_CONNECT_EXCEPTION_SECONDS', 0.1) mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_MAINTENANCE_SECONDS', 1) mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1) mocker.patch('connect.eaas.handler.get_extension_class') @@ -711,12 +715,12 @@ async def test_connection_maintenance(mocker, ws_server, unused_port, caplog): worker.stop() await task - assert 'InvalidStatusCode 502 raised. Maintenance in progress.' in caplog.text - assert 'Backing off ' in caplog.text + assert 'Maintenance in progress, try to reconnect in 0.1s' in caplog.text @pytest.mark.asyncio async def test_connection_internal_server_error(mocker, ws_server, unused_port, caplog): + mocker.patch('connect.eaas.worker.DELAY_ON_CONNECT_EXCEPTION_SECONDS', 0.1) mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_GENERIC_SECONDS', 1) mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1) mocker.patch('connect.eaas.handler.get_extension_class') @@ -749,8 +753,7 @@ async def test_connection_internal_server_error(mocker, ws_server, unused_port, worker.stop() await task - assert 'InvalidStatusCode 500 raised.' in caplog.text - assert 'Backing off ' in caplog.text + assert 'Received an unexpected status from server: 500' in caplog.text @pytest.mark.asyncio @@ -1073,51 +1076,107 @@ async def test_sender_ws_closed(mocker, config_payload, task_payload): (23, 'rd'), ), ) -def test__on_communication_backoff(caplog, tries, ordinal): +def test_backoff_log(mocker, caplog, tries, ordinal): + mocker.patch('connect.eaas.handler.get_extension_class') + mocker.patch('connect.eaas.handler.get_extension_type') details = {'tries': tries, 'elapsed': 2.2, 'wait': 1.1} expected = ( f'{tries}{ordinal} communication attempt failed, backing off waiting ' f'{details["wait"]:.2f} seconds after next retry. Elapsed time: {details["elapsed"]:.2f}' ' seconds.' ) + w = Worker() with caplog.at_level(logging.INFO): - _on_communication_backoff(details) + w._backoff_log(details) assert expected in caplog.records[0].message @pytest.mark.asyncio -async def test_connection_unexpected_error(mocker, ws_server, unused_port, caplog): +async def test_ensure_connection_maintenance(mocker, caplog): + mocker.patch('connect.eaas.handler.get_extension_class') + mocker.patch('connect.eaas.handler.get_extension_type') + mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_GENERIC_SECONDS', 1) + mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1) mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_MAINTENANCE_SECONDS', 1) + mocker.patch('connect.eaas.worker.websockets.connect', side_effect=InvalidStatusCode(502, None)) + + worker = Worker() + worker.run_event.set() + worker.get_url = lambda: 'ws://test' + + with pytest.raises(MaintenanceError): + with caplog.at_level(logging.INFO): + await worker.ensure_connection() + + assert '1st communication attempt failed, backing off waiting' in caplog.text + + +@pytest.mark.asyncio +@pytest.mark.parametrize('status', (400, 401, 403, 500, 501)) +async def test_ensure_connection_other_statuses(mocker, caplog, status): + mocker.patch('connect.eaas.handler.get_extension_class') + mocker.patch('connect.eaas.handler.get_extension_type') + mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_GENERIC_SECONDS', 1) mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1) + mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_MAINTENANCE_SECONDS', 1) + mocker.patch( + 'connect.eaas.worker.websockets.connect', + side_effect=InvalidStatusCode(status, None), + ) + + worker = Worker() + worker.run_event.set() + worker.get_url = lambda: 'ws://test' + + with pytest.raises(CommunicationError): + with caplog.at_level(logging.INFO): + await worker.ensure_connection() + + assert '1st communication attempt failed, backing off waiting' in caplog.text + + +@pytest.mark.asyncio +async def test_ensure_connection_generic_exception(mocker, caplog): mocker.patch('connect.eaas.handler.get_extension_class') mocker.patch('connect.eaas.handler.get_extension_type') + mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_GENERIC_SECONDS', 1) + mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1) + mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_MAINTENANCE_SECONDS', 1) mocker.patch( - 'connect.eaas.config.get_environment', - return_value={ - 'ws_address': f'127.0.0.1:{unused_port}', - 'api_address': f'127.0.0.1:{unused_port}', - 'api_key': 'SU-000:XXXX', - 'environment_id': 'ENV-000-0001', - 'instance_id': 'INS-000-0002', - 'background_task_max_execution_time': 300, - 'interactive_task_max_execution_time': 120, - 'scheduled_task_max_execution_time': 43200, - }, + 'connect.eaas.worker.websockets.connect', + side_effect=RuntimeError('generic error'), ) - handler = WSHandler( - '/public/v1/devops/ws/ENV-000-0001/INS-000-0002?running_tasks=0&running_scheduled_tasks=0', - None, - [], + + worker = Worker() + worker.run_event.set() + worker.get_url = lambda: 'ws://test' + + with pytest.raises(CommunicationError): + with caplog.at_level(logging.INFO): + await worker.ensure_connection() + + assert '1st communication attempt failed, backing off waiting' in caplog.text + + +@pytest.mark.asyncio +async def test_ensure_connection_exit_backoff(mocker, caplog): + mocker.patch('connect.eaas.handler.get_extension_class') + mocker.patch('connect.eaas.handler.get_extension_type') + mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_GENERIC_SECONDS', 600) + mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1) + mocker.patch( + 'connect.eaas.worker.websockets.connect', + side_effect=RuntimeError('generic error'), ) - async with ws_server(handler): - worker = Worker(secure=False) - worker.send = mocker.AsyncMock(side_effect=RuntimeError('generic error')) + worker = Worker() + worker.run_event.set() + worker.get_url = lambda: 'ws://test' + + with pytest.raises(StopBackoffError): with caplog.at_level(logging.INFO): - task = asyncio.create_task(worker.start()) - await asyncio.sleep(.5) - worker.stop() + task = asyncio.create_task(worker.ensure_connection()) + worker.run_event.clear() await task - assert 'Unexpected error in communicate: generic error' in caplog.text - assert 'Backing off ' in caplog.text + assert 'Worker exiting, stop backoff loop' in caplog.text