From 50f6ab20bf5e22b825185cbca6311289f1b50600 Mon Sep 17 00:00:00 2001 From: Jonathan Rios Date: Tue, 15 Oct 2024 15:47:29 +0200 Subject: [PATCH] LITE-30582 Increase backoff factor between tasks generation --- connect_bi_reporter/constants.py | 2 +- connect_bi_reporter/scheduler.py | 3 +- connect_bi_reporter/uploads/services.py | 3 +- connect_bi_reporter/uploads/tasks.py | 28 +++++++++---- pyproject.toml | 3 ++ tests/uploads/test_tasks.py | 56 +++++++++++++++++++++++++ 6 files changed, 85 insertions(+), 10 deletions(-) diff --git a/connect_bi_reporter/constants.py b/connect_bi_reporter/constants.py index dea49e2..c0c9406 100644 --- a/connect_bi_reporter/constants.py +++ b/connect_bi_reporter/constants.py @@ -9,7 +9,7 @@ # Delay in seconds for schedule to process Upload task SECONDS_DELAY = 120 # Backoff factor in seconds between Upload tasks creation -SECONDS_BACKOFF_FACTOR = 10 +SECONDS_BACKOFF_FACTOR = 120 CREATE_UPLOADS_METHOD_NAME = 'create_uploads' PROCESS_UPLOADS_METHOD_NAME = 'process_upload' PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD = { diff --git a/connect_bi_reporter/scheduler.py b/connect_bi_reporter/scheduler.py index 3630c27..76df7d5 100644 --- a/connect_bi_reporter/scheduler.py +++ b/connect_bi_reporter/scheduler.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta -from logging import Logger import enum +from logging import Logger from typing import Any, Dict, Optional from connect.client import ClientError, ConnectClient @@ -23,6 +23,7 @@ class TriggerTypeEnum(str, enum.Enum): class ResponseTypeEnum(str, enum.Enum): SUCCESS = 'done' ERROR = 'reschedule' + FAIL = 'fail' class TriggerType: diff --git a/connect_bi_reporter/uploads/services.py b/connect_bi_reporter/uploads/services.py index c896275..974c3e2 100644 --- a/connect_bi_reporter/uploads/services.py +++ b/connect_bi_reporter/uploads/services.py @@ -1,4 +1,5 @@ from datetime import datetime, timedelta +import copy from sqlalchemy import util from connect.client import ClientError @@ -161,7 
+162,7 @@ def create_uploads(db, client, logger, feeds): def get_process_upload_task_payload(installation_id, upload_id, account_id): - payload = PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD + payload = copy.deepcopy(PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD) payload.update({'name': f'Process Uploads - {account_id}'}) parameters = { 'installation_id': installation_id, diff --git a/connect_bi_reporter/uploads/tasks.py b/connect_bi_reporter/uploads/tasks.py index 394839d..2b7805d 100644 --- a/connect_bi_reporter/uploads/tasks.py +++ b/connect_bi_reporter/uploads/tasks.py @@ -1,5 +1,6 @@ import io from datetime import datetime +import time from zipfile import ZipFile from connect.client import ClientError @@ -18,7 +19,7 @@ from connect_bi_reporter.uploads.models import Upload from connect_bi_reporter.uploads.services import create_process_upload_tasks, create_uploads from connect_bi_reporter.uploads.storage_utils import upload_file -from connect_bi_reporter.scheduler import Scheduler +from connect_bi_reporter.scheduler import ResponseTypeEnum, Scheduler class UploadTaskApplicationMixin: @@ -78,6 +79,7 @@ def process_upload(self, schedule): if 'installation_id' not in schedule['parameter']: return ScheduledExecutionResponse.fail(output='Parameter installation_id is missing.') + begin_time = time.monotonic() instalation_client = self.get_installation_admin_client( schedule['parameter']['installation_id'], ) @@ -90,15 +92,16 @@ def process_upload(self, schedule): if not upload: return ScheduledExecutionResponse.fail(output=f'Invalid upload `{upload_id}`.') - if upload.status != 'pending': + if upload.status != Upload.STATUSES.pending: return ScheduledExecutionResponse.fail( output=f'Cannot process upload in status `{upload.status}`.', ) - upload.status = 'processing' + upload.status = Upload.STATUSES.processing db.add(upload) db.commit() + execution_method_result = ResponseTypeEnum.SUCCESS try: report_data = download_report(instalation_client, upload.report_id) @@ -112,14 
+115,25 @@ def process_upload(self, schedule): ) upload.size = uploaded_file_props.get('size', 0) - upload.status = 'uploaded' + upload.status = Upload.STATUSES.uploaded upload.name = file_name db.add(upload) db.commit() - return ScheduledExecutionResponse.done() except Exception: self.logger.exception(msg='Error processing upload') - upload.status = 'failed' + upload.status = Upload.STATUSES.failed db.add(upload) db.commit() - return ScheduledExecutionResponse.fail() + execution_method_result = ResponseTypeEnum.FAIL + + took = time.monotonic() - begin_time + self.logger.info( + 'Execution of `process_upload` task for Upload {0} finished (took "{1}"): ' + 'Upload status: `{2}`, Taks result: `{3}`.'.format( + upload.id, + took, + upload.status, + execution_method_result.value, + ), + ) + return getattr(ScheduledExecutionResponse, execution_method_result.value)() diff --git a/pyproject.toml b/pyproject.toml index 178073d..167337c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,9 @@ build-backend = "poetry.core.masonry.api" [tool.pytest.ini_options] testpaths = "tests" addopts = "--cov=connect_bi_reporter --cov-report=term-missing --cov-report=html --cov-report=xml" +filterwarnings = [ + "ignore::sqlalchemy.exc.SADeprecationWarning" +] [tool.coverage.run] relative_files = true diff --git a/tests/uploads/test_tasks.py b/tests/uploads/test_tasks.py index 2ae78cb..92fb59d 100644 --- a/tests/uploads/test_tasks.py +++ b/tests/uploads/test_tasks.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta, timezone import re from unittest.mock import call @@ -7,6 +8,7 @@ from connect.eaas.core.inject.models import Context from sqlalchemy.exc import DBAPIError +from connect_bi_reporter.constants import SECONDS_BACKOFF_FACTOR, SECONDS_DELAY from connect_bi_reporter.events import ConnectBiReporterEventsApplication @@ -16,6 +18,8 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker, logger, config={}, ) + p_time =
mocker.patch('connect_bi_reporter.uploads.tasks.time') + p_time.monotonic.side_effect = [10, 12] ext.get_installation_admin_client = lambda self: connect_client with open('./tests/uploads/test-zip.zip', 'rb') as zf: @@ -41,6 +45,10 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker, assert re.match(feed.file_name + '_\\d{8} \\d{2}:\\d{2}:\\d{2}.csv', upload.name) assert upload.size == 1024 assert upload.status == upload.STATUSES.uploaded + assert logger.method_calls[0].args[0] == ( + f'Execution of `process_upload` task for Upload {upload.id} ' + f'finished (took "2"): Upload status: `uploaded`, Taks result: `done`.' + ) def test_process_upload_report_download_failed( @@ -229,6 +237,11 @@ def test_create_upload_schedule_task( ), ) ext.get_installation_admin_client = lambda self: connect_client + + _now = datetime(2024, 10, 15, 10, 0, 0, tzinfo=timezone.utc) + p_datetime = mocker.patch('connect_bi_reporter.uploads.services.datetime') + p_datetime.utcnow = lambda: _now + mocker.patch( 'connect_bi_reporter.uploads.tasks.get_extension_owner_client', return_value=connect_client, @@ -245,6 +258,9 @@ def test_create_upload_schedule_task( 'connect_bi_reporter.scheduler.create_schedule_task', return_value=eaas_schedule_task, ) + p_get_task_payload = mocker.patch( + 'connect_bi_reporter.scheduler.EaasScheduleTask.get_task_payload', + ) feed1 = feed_factory( schedule_id=report_schedule['id'], account_id=installation['owner']['id'], @@ -274,6 +290,44 @@ def test_create_upload_schedule_task( ), ], ) + delay = SECONDS_DELAY + new_delay = SECONDS_DELAY + SECONDS_BACKOFF_FACTOR + p_get_task_payload.assert_has_calls( + [ + call( + trigger_type='onetime', + trigger_data={ + 'date': (_now + timedelta(seconds=delay)).isoformat(), + }, + method_payload={ + 'method': 'process_upload', + 'description': 'This task will download the report from' + ' connect and published it in the respective storage.', + 'parameter': { + 'installation_id': 
'EIN-8436-7221-8308', + 'upload_id': f'ULF-{feed1.id.split("-", 1)[-1]}-000', + }, + 'name': 'Process Uploads - PA-000-000', + }, + ), + call( + trigger_type='onetime', + trigger_data={ + 'date': (_now + timedelta(seconds=new_delay)).isoformat(), + }, + method_payload={ + 'method': 'process_upload', + 'description': 'This task will download the report from' + ' connect and published it in the respective storage.', + 'parameter': { + 'installation_id': 'EIN-8436-7221-8308', + 'upload_id': f'ULF-{feed2.id.split("-", 1)[-1]}-000', + }, + 'name': 'Process Uploads - PA-000-000', + }, + ), + ], + ) for idx, zipped in enumerate(zip(uploads, [feed1, feed2])): upload, feed = zipped assert result.status == 'success' @@ -298,6 +352,8 @@ def test_create_upload_schedule_task( f' created for Upload `{uploads[1].id}`: ' f'Will process Report File `{report_file[1]["id"]}`' ) + assert delay == 120 + assert new_delay == 240 def test_create_upload_schedule_task_no_feeds(