Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ on:
branches: [ master ]

jobs:
build:
test:
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
matrix:
python-version: [3.8, 3.9]
python-version: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v2
with:
Expand All @@ -35,20 +35,35 @@ jobs:
- name: Testing
run: |
poetry run pytest
- name: Fix coverage.xml for Sonar
run: |
sed -i 's/\/home\/runner\/work\/connect-extension-runner\/connect-extension-runner\//\/github\/workspace\//g' coverage.xml
- name: SonarCloud
uses: SonarSource/sonarcloud-github-action@master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
- name: Wait sonar to process report
uses: jakejarvis/wait-action@master
with:
time: '60s'
- name: SonarQube Quality Gate check
uses: sonarsource/sonarqube-quality-gate-action@master
timeout-minutes: 5
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
sonar:
needs: [test]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up Python 3.10.0
uses: actions/setup-python@v2
with:
python-version: '3.10.0'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install poetry
poetry install
- name: Generate coverage report
run: |
poetry run pytest
- name: Fix coverage.xml for Sonar
run: |
sed -i 's/\/home\/runner\/work\/connect-extension-runner\/connect-extension-runner\//\/github\/workspace\//g' coverage.xml
- name: SonarCloud
uses: SonarSource/sonarcloud-github-action@master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
- name: SonarQube Quality Gate check
uses: sonarsource/sonarqube-quality-gate-action@master
timeout-minutes: 5
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
13 changes: 12 additions & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,24 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Docker meta
id: meta
uses: docker/metadata-action@v3
with:
images: cloudblueconnect/connect-extension-runner
tags: |
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
flavor: |
latest=false
- name: Build and push docker image
id: docker_build
uses: docker/build-push-action@v2
with:
push: true
build-args: |
RUNNER_VERSION=${{ steps.tag.outputs.result }}
tags: cloudblueconnect/connect-extension-runner:${{ steps.tag.outputs.result }},cloudblueconnect/connect-extension-runner:latest
tags: ${{ steps.meta.outputs.tags }}
- name: Docker image digest
run: echo ${{ steps.docker_build.outputs.digest }}
11 changes: 11 additions & 0 deletions connect/eaas/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,14 @@
MAX_RETRY_TIME_GENERIC_SECONDS = 15 * 60
MAX_RETRY_TIME_MAINTENANCE_SECONDS = 3 * 60 * 60
MAX_RETRY_DELAY_TIME_SECONDS = 5 * 60

DELAY_ON_CONNECT_EXCEPTION_SECONDS = 10

# English ordinal suffixes keyed by number. Only the irregular cases are
# listed: 1, 2 and 3 take 'st', 'nd' and 'rd', while 11, 12 and 13 are spelled
# out explicitly as 'th'. Numbers without an entry are expected to fall back
# to 'th' at the lookup site (via ``dict.get(..., 'th')``).
ORDINAL_SUFFIX = {
1: 'st',
2: 'nd',
3: 'rd',
11: 'th',
12: 'th',
13: 'th',
}
7 changes: 7 additions & 0 deletions connect/eaas/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from connect.eaas.constants import (
BACKGROUND_TASK_MAX_EXECUTION_TIME,
INTERACTIVE_TASK_MAX_EXECUTION_TIME,
ORDINAL_SUFFIX,
SCHEDULED_TASK_MAX_EXECUTION_TIME,
TASK_TYPE_EXT_METHOD_MAP,
)
Expand Down Expand Up @@ -106,3 +107,9 @@ def get_version():
return get_distribution('connect-extension-runner').version
except DistributionNotFound:
return '0.0.0'


def to_ordinal(val):
    """
    Return ``val`` followed by its English ordinal suffix.

    The suffix is chosen from the last digit (``1st``, ``22nd``, ``103rd``),
    except that numbers whose last two digits are 11, 12 or 13 always take
    ``th`` (``11th``, ``112th``, ``1013th``).

    :param val: Integer to render as an ordinal.
    :return: The number with its suffix appended, as a string.
    """
    magnitude = abs(val)
    # The "teens" exception applies to the last two digits, not only to the
    # literal values 11-13: looking at the last digit alone turned 111 into
    # '111st' and 212 into '212nd'.
    if 11 <= magnitude % 100 <= 13:
        return f"{val}th"
    return f"{val}{ORDINAL_SUFFIX.get(magnitude % 10, 'th')}"
181 changes: 85 additions & 96 deletions connect/eaas/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@
import logging
from asyncio.exceptions import TimeoutError

import backoff
import websockets
from websockets.exceptions import (
ConnectionClosedError,
ConnectionClosedOK,
InvalidStatusCode,
WebSocketException,
)
import backoff


from connect.eaas.config import ConfigHelper
from connect.eaas.constants import (
DELAY_ON_CONNECT_EXCEPTION_SECONDS,
MAX_RETRY_DELAY_TIME_SECONDS,
MAX_RETRY_TIME_GENERIC_SECONDS,
MAX_RETRY_TIME_MAINTENANCE_SECONDS,
Expand All @@ -37,6 +38,7 @@
StopBackoffError,
)
from connect.eaas.handler import ExtensionHandler
from connect.eaas.helpers import to_ordinal
from connect.eaas.managers import (
BackgroundTasksManager,
InteractiveTasksManager,
Expand All @@ -59,28 +61,6 @@ def _get_max_retry_delay_time():
return MAX_RETRY_DELAY_TIME_SECONDS


# English ordinal suffixes keyed by number. 1, 2 and 3 take 'st', 'nd' and
# 'rd'; 11, 12 and 13 are listed explicitly as 'th'. Lookups elsewhere in this
# module use ``.get(..., 'th')``, so any number without an entry gets 'th'.
_ORDINAL_DICT = {
1: 'st',
2: 'nd',
3: 'rd',
11: 'th',
12: 'th',
13: 'th',
}


def _on_communication_backoff(details):
    """
    Log a failed communication attempt and the upcoming back-off wait.

    Used as the ``on_backoff`` handler of the retry decorators in this module.

    :param details: Mapping that provides the ``tries``, ``wait`` and
                    ``elapsed`` values written to the log entry.
    """
    tries = details['tries']
    # The "teens" exception applies to the last two digits, so the 111th or
    # 212th attempt must read 'th'; checking only the last digit produced
    # '111st' / '212nd'.
    if 11 <= tries % 100 <= 13:
        ordinal_attempt = 'th'
    else:
        ordinal_attempt = _ORDINAL_DICT.get(tries % 10, 'th')
    logger.info(
        f'{tries}{ordinal_attempt} communication attempt failed, backing off waiting '
        f'{details["wait"]:.2f} seconds after next retry. Elapsed time: {details["elapsed"]:.2f}'
        ' seconds.',
    )


class Worker:
"""
The Worker is responsible to handle the websocket connection
Expand Down Expand Up @@ -126,18 +106,50 @@ def get_url(self):
url = f'{url}?running_tasks={self.background_manager.running_tasks}'
return f'{url}&running_scheduled_tasks={self.scheduled_manager.running_tasks}'

async def ensure_connection(self):
async def ensure_connection(self): # noqa: CCR001
"""
Ensure that a websocket connection is established.
"""
if self.ws is None or self.ws.closed:
url = self.get_url()
self.ws = await websockets.connect(
url,
extra_headers=self.config.get_headers(),
)
await (await self.ws.ping())
logger.info(f'Connected to {url}')
@backoff.on_exception(
backoff.expo,
CommunicationError,
max_time=_get_max_retry_time_generic,
max_value=_get_max_retry_delay_time,
on_backoff=self._backoff_log,
giveup=self._backoff_shutdown,
)
@backoff.on_exception(
backoff.expo,
MaintenanceError,
max_time=_get_max_retry_time_maintenance,
max_value=_get_max_retry_delay_time,
on_backoff=self._backoff_log,
giveup=self._backoff_shutdown,
)
async def _connect():
if self.ws is None or self.ws.closed:
try:
url = self.get_url()
self.ws = await websockets.connect(
url,
extra_headers=self.config.get_headers(),
)
await (await self.ws.ping())
logger.info(f'Connected to {url}')
except InvalidStatusCode as ic:
if ic.status_code == 502:
logger.warning('Maintenance in progress...')
raise MaintenanceError()
else:
logger.warning(
f'Received an unexpected status from server: {ic.status_code}...',
)
raise CommunicationError()
except Exception as e:
logger.warning(f'Received an unexpected exception: {e}...')
raise CommunicationError()

await _connect()

async def send(self, message):
"""
Expand Down Expand Up @@ -169,82 +181,51 @@ def get_capabilities(self):
),
)

async def communicate(self): # noqa: CCR001
@backoff.on_exception(
backoff.expo,
CommunicationError,
factor=10,
max_time=_get_max_retry_time_generic,
max_value=_get_max_retry_delay_time,
jitter=backoff.random_jitter,
on_backoff=_on_communication_backoff,
giveup=self._backoff_shutdown,
)
@backoff.on_exception(
backoff.expo,
MaintenanceError,
factor=10,
max_time=_get_max_retry_time_maintenance,
max_value=_get_max_retry_delay_time,
jitter=backoff.random_jitter,
on_backoff=_on_communication_backoff,
giveup=self._backoff_shutdown,
)
async def _do_communicate():
async def run(self): # noqa: CCR001
"""
Main loop for the websocket connection.
Once started, this worker will send the capabilities message to
the websocket server and start a loop to receive messages from the
websocket server.
"""
await self.run_event.wait()
while self.run_event.is_set():
try:
await self.ensure_connection()
await self.send(self.get_capabilities())
while self.run_event.is_set():
await self.ensure_connection()
message = await self.receive()
if not message:
continue
await self.process_message(message)
except ConnectionClosedError as e:
logger.warning(f'Connection closed with code {e.rcvd} from: {self.get_url()}')
raise CommunicationError()
except (ConnectionClosedOK, StopBackoffError):
break
except ConnectionClosedError:
logger.warning(
f'Disconnected from: {self.get_url()}'
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
)
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)
except InvalidStatusCode as ic:
if ic.status_code == 502:
logger.warning('InvalidStatusCode 502 raised. Maintenance in progress.')
raise MaintenanceError()
logger.warning(
'Maintenance in progress'
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
)
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)
else:
logger.warning(f'InvalidStatusCode {ic.status_code} raised.')
raise CommunicationError()
except WebSocketException as wse:
logger.warning(f'Unexpected websocket exception {wse}.')
raise CommunicationError()
logger.warning(
f'Received an unexpected status from server: {ic.status_code}'
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
)
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)
except Exception as e:
logger.warning(f'Unexpected error in communicate: {e}.')
raise CommunicationError()

await _do_communicate()

async def run(self): # noqa: CCR001
"""
Main loop for the websocket connection.
Once started, this worker will send the capabilities message to
the websocket server and start a loop to receive messages from the
websocket server.
"""
await self.run_event.wait()
while self.run_event.is_set():
try:
await self.communicate()
except ConnectionClosedOK:
self.run_event.clear()
except CommunicationError:
logger.error(
f'Max retries exceeded after {MAX_RETRY_TIME_GENERIC_SECONDS} seconds',
logger.exception(
f'Unexpected exception {e}'
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
)
self.run_event.clear()
except MaintenanceError:
logger.error(
f'Max retries exceeded after {MAX_RETRY_TIME_MAINTENANCE_SECONDS} '
'seconds',
)
self.run_event.clear()
except StopBackoffError:
pass
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)

if self.ws:
await self.ws.close()

Expand Down Expand Up @@ -381,3 +362,11 @@ def _backoff_shutdown(self, _):
if not self.run_event.is_set():
logger.info('Worker exiting, stop backoff loop')
raise StopBackoffError()

def _backoff_log(self, details):
    """
    Emit an info-level log entry describing a failed communication attempt.

    :param details: Mapping that provides the ``tries``, ``wait`` and
                    ``elapsed`` values reported in the message.
    """
    # Values are read in the same order the message mentions them.
    attempt = to_ordinal(details["tries"])
    wait_seconds = details["wait"]
    elapsed_seconds = details["elapsed"]
    message = (
        f'{attempt} communication attempt failed, backing off waiting '
        f'{wait_seconds:.2f} seconds after next retry. '
        f'Elapsed time: {elapsed_seconds:.2f}'
        ' seconds.'
    )
    logger.info(message)
Loading