Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connect_bi_reporter/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Delay in seconds for schedule to process Upload task
SECONDS_DELAY = 120
# Backoff factor in seconds between Upload tasks creation
SECONDS_BACKOFF_FACTOR = 10
SECONDS_BACKOFF_FACTOR = 120
CREATE_UPLOADS_METHOD_NAME = 'create_uploads'
PROCESS_UPLOADS_METHOD_NAME = 'process_upload'
PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD = {
Expand Down
3 changes: 2 additions & 1 deletion connect_bi_reporter/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta
from logging import Logger
import enum
from logging import Logger
from typing import Any, Dict, Optional

from connect.client import ClientError, ConnectClient
Expand All @@ -23,6 +23,7 @@ class TriggerTypeEnum(str, enum.Enum):
class ResponseTypeEnum(str, enum.Enum):
SUCCESS = 'done'
ERROR = 'reschedule'
FAIL = 'fail'


class TriggerType:
Expand Down
3 changes: 2 additions & 1 deletion connect_bi_reporter/uploads/services.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
import copy

from sqlalchemy import util
from connect.client import ClientError
Expand Down Expand Up @@ -161,7 +162,7 @@ def create_uploads(db, client, logger, feeds):


def get_process_upload_task_payload(installation_id, upload_id, account_id):
payload = PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD
payload = copy.deepcopy(PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD)
payload.update({'name': f'Process Uploads - {account_id}'})
parameters = {
'installation_id': installation_id,
Expand Down
28 changes: 21 additions & 7 deletions connect_bi_reporter/uploads/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
from datetime import datetime
import time
from zipfile import ZipFile

from connect.client import ClientError
Expand All @@ -18,7 +19,7 @@
from connect_bi_reporter.uploads.models import Upload
from connect_bi_reporter.uploads.services import create_process_upload_tasks, create_uploads
from connect_bi_reporter.uploads.storage_utils import upload_file
from connect_bi_reporter.scheduler import Scheduler
from connect_bi_reporter.scheduler import ResponseTypeEnum, Scheduler


class UploadTaskApplicationMixin:
Expand Down Expand Up @@ -78,6 +79,7 @@ def process_upload(self, schedule):
if 'installation_id' not in schedule['parameter']:
return ScheduledExecutionResponse.fail(output='Parameter installation_id is missing.')

begin_time = time.monotonic()
instalation_client = self.get_installation_admin_client(
schedule['parameter']['installation_id'],
)
Expand All @@ -90,15 +92,16 @@ def process_upload(self, schedule):
if not upload:
return ScheduledExecutionResponse.fail(output=f'Invalid upload `{upload_id}`.')

if upload.status != 'pending':
if upload.status != Upload.STATUSES.pending:
return ScheduledExecutionResponse.fail(
output=f'Cannot process upload in status `{upload.status}`.',
)

upload.status = 'processing'
upload.status = Upload.STATUSES.processing
db.add(upload)
db.commit()

execution_method_result = ResponseTypeEnum.SUCCESS
try:
report_data = download_report(instalation_client, upload.report_id)

Expand All @@ -112,14 +115,25 @@ def process_upload(self, schedule):
)
upload.size = uploaded_file_props.get('size', 0)

upload.status = 'uploaded'
upload.status = Upload.STATUSES.uploaded
upload.name = file_name
db.add(upload)
db.commit()
return ScheduledExecutionResponse.done()
except Exception:
self.logger.exception(msg='Error processing upload')
upload.status = 'failed'
upload.status = Upload.STATUSES.failed
db.add(upload)
db.commit()
return ScheduledExecutionResponse.fail()
execution_method_result = ResponseTypeEnum.FAIL

took = time.monotonic() - begin_time
self.logger.info(
'Execution of `process_upload` task for Upload {0} finished (took "{1}"): '
'Upload status: `{2}`, Taks result: `{3}`.'.format(
upload.id,
took,
upload.status,
execution_method_result,
),
)
return getattr(ScheduledExecutionResponse, execution_method_result)()
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
testpaths = "tests"
addopts = "--cov=connect_bi_reporter --cov-report=term-missing --cov-report=html --cov-report=xml"
filterwarnings = [
"ignore::sqlalchemy.exc.SADeprecationWarning"
]

[tool.coverage.run]
relative_files = true
Expand Down
56 changes: 56 additions & 0 deletions tests/uploads/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta, timezone
import re
from unittest.mock import call

Expand All @@ -7,6 +8,7 @@
from connect.eaas.core.inject.models import Context
from sqlalchemy.exc import DBAPIError

from connect_bi_reporter.constants import SECONDS_BACKOFF_FACTOR, SECONDS_DELAY
from connect_bi_reporter.events import ConnectBiReporterEventsApplication


Expand All @@ -16,6 +18,8 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker,
logger,
config={},
)
p_time = mocker.patch('connect_bi_reporter.uploads.tasks.time')
p_time.monotonic.side_effect = [10, 12]
ext.get_installation_admin_client = lambda self: connect_client

with open('./tests/uploads/test-zip.zip', 'rb') as zf:
Expand All @@ -41,6 +45,10 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker,
assert re.match(feed.file_name + '_\\d{8} \\d{2}:\\d{2}:\\d{2}.csv', upload.name)
assert upload.size == 1024
assert upload.status == upload.STATUSES.uploaded
assert logger.method_calls[0].args[0] == (
f'Execution of `process_upload` task for Upload {upload.id} '
f'finished (took "2"): Upload status: `uploaded`, Taks result: `done`.'
)


def test_process_upload_report_download_failed(
Expand Down Expand Up @@ -229,6 +237,11 @@ def test_create_upload_schedule_task(
),
)
ext.get_installation_admin_client = lambda self: connect_client

_now = datetime(2024, 10, 15, 10, 0, 0, tzinfo=timezone.utc)
p_datetime = mocker.patch('connect_bi_reporter.uploads.services.datetime')
p_datetime.utcnow = lambda: _now

mocker.patch(
'connect_bi_reporter.uploads.tasks.get_extension_owner_client',
return_value=connect_client,
Expand All @@ -245,6 +258,9 @@ def test_create_upload_schedule_task(
'connect_bi_reporter.scheduler.create_schedule_task',
return_value=eaas_schedule_task,
)
p_get_task_payload = mocker.patch(
'connect_bi_reporter.scheduler.EaasScheduleTask.get_task_payload',
)
feed1 = feed_factory(
schedule_id=report_schedule['id'],
account_id=installation['owner']['id'],
Expand Down Expand Up @@ -274,6 +290,44 @@ def test_create_upload_schedule_task(
),
],
)
delay = SECONDS_DELAY
new_delay = SECONDS_DELAY + SECONDS_BACKOFF_FACTOR
p_get_task_payload.assert_has_calls(
[
call(
trigger_type='onetime',
trigger_data={
'date': (_now + timedelta(seconds=delay)).isoformat(),
},
method_payload={
'method': 'process_upload',
'description': 'This task will download the report from'
' connect and published it in the respective storage.',
'parameter': {
'installation_id': 'EIN-8436-7221-8308',
'upload_id': f'ULF-{feed1.id.split("-", 1)[-1]}-000',
},
'name': 'Process Uploads - PA-000-000',
},
),
call(
trigger_type='onetime',
trigger_data={
'date': (_now + timedelta(seconds=new_delay)).isoformat(),
},
method_payload={
'method': 'process_upload',
'description': 'This task will download the report from'
' connect and published it in the respective storage.',
'parameter': {
'installation_id': 'EIN-8436-7221-8308',
'upload_id': f'ULF-{feed2.id.split("-", 1)[-1]}-000',
},
'name': 'Process Uploads - PA-000-000',
},
),
],
)
for idx, zipped in enumerate(zip(uploads, [feed1, feed2])):
upload, feed = zipped
assert result.status == 'success'
Expand All @@ -298,6 +352,8 @@ def test_create_upload_schedule_task(
f' created for Upload `{uploads[1].id}`: '
f'Will process Report File `{report_file[1]["id"]}`'
)
assert delay == 120
assert new_delay == 240


def test_create_upload_schedule_task_no_feeds(
Expand Down
Loading