diff --git a/connect/eaas/runner/managers/transformation.py b/connect/eaas/runner/managers/transformation.py index 1b5fe89..a7d84fd 100644 --- a/connect/eaas/runner/managers/transformation.py +++ b/connect/eaas/runner/managers/transformation.py @@ -1,4 +1,5 @@ import asyncio +import functools import inspect import json import logging @@ -56,9 +57,6 @@ from connect.eaas.runner.managers.base import ( TasksManagerBase, ) -from connect.eaas.runner.managers.utils import ( - ResultStore, -) logger = logging.getLogger(__name__) @@ -190,7 +188,6 @@ async def _fail_task(self, task_data, message): async def process_transformation(self, task_data, tfn_request, method): extension_logger = self.get_extension_logger(task_data) - semaphore = asyncio.Semaphore(TRANSFORMATION_TASK_MAX_PARALLEL_LINES) input_file = await asyncio.get_running_loop().run_in_executor( self.executor, self.download_excel, @@ -203,7 +200,7 @@ async def process_transformation(self, task_data, tfn_request, method): ) read_queue = asyncio.Queue(TRANSFORMATION_TASK_MAX_PARALLEL_LINES) - result_store = ResultStore() + write_queue = asyncio.Queue() loop = asyncio.get_event_loop() @@ -220,7 +217,7 @@ async def process_transformation(self, task_data, tfn_request, method): self.executor, self.write_excel, output_file.name, - result_store, + write_queue, tfn_request['stats']['rows']['total'], tfn_request['transformation']['columns']['output'], task_data, @@ -228,9 +225,8 @@ async def process_transformation(self, task_data, tfn_request, method): loop, ) processor_task = asyncio.create_task(self.process_rows( - semaphore, read_queue, - result_store, + write_queue, method, tfn_request, extension_logger, @@ -255,7 +251,7 @@ async def process_transformation(self, task_data, tfn_request, method): await client.conversations[task_data.input.object_id].messages.create( payload={ 'type': 'message', - 'text': f'Transformation request processing failed: {str(e)}', + 'text': f'Transformation request processing failed: {str(e) or "timed out"}', }, ) return TransformationResponse.fail(output=str(e)) @@ -322,10 +318,11 @@ def read_excel(self, tfn_request, filename, queue, logger, loop): style.number_format = col_value.number_format row_styles[lookup_columns[col_idx]] = style - asyncio.run_coroutine_threadsafe( + future = asyncio.run_coroutine_threadsafe( queue.put((idx, row_data, row_styles)), loop, ) + future.result() if idx % delta == 0 or idx == total_rows: logger.info( f'Input file read progress for {tfn_request["id"]}:' @@ -335,112 +332,82 @@ def read_excel(self, tfn_request, filename, queue, logger, loop): logger.info(f'Input file read complete for {tfn_request["id"]}') wb.close() - async def process_rows(self, semaphore, read_queue, result_store, method, tfn_request, logger): + async def process_rows(self, read_queue, result_store, method, tfn_request, logger): rows_processed = 0 tasks = [] total_rows = tfn_request['stats']['rows']['total'] delta = 1 if total_rows <= 10 else round(total_rows / 10) while rows_processed < total_rows: - await semaphore.acquire() row_idx, row, row_styles = await read_queue.get() - if inspect.iscoroutinefunction(method): - tasks.append( - asyncio.create_task( - asyncio.wait_for( - self.async_process_row( - semaphore, - method, - row_idx, - row, - row_styles, - result_store, - ), - self.config.get_timeout('row_transformation'), - ), - ), - ) - else: - loop = asyncio.get_running_loop() - tasks.append( - asyncio.create_task( - asyncio.wait_for( - loop.run_in_executor( - self.executor, - self.sync_process_row, - semaphore, - method, - row_idx, - row, - row_styles, - result_store, - loop, - ), - self.config.get_timeout('row_transformation'), - ), + tasks.append( + asyncio.create_task( + self.transform_row( + method, + row_idx, + row, + row_styles, ), - ) + ), + ) rows_processed += 1 + if rows_processed % TRANSFORMATION_TASK_MAX_PARALLEL_LINES == 0: + group = asyncio.gather(*tasks) + try: + results = await group + for result in results: + await result_store.put(result) + except Exception as e: + logger.exception('Error during applying transformations.') + group.cancel() + raise e + tasks = [] if rows_processed % delta == 0 or rows_processed == total_rows: logger.info( f'Starting transformation tasks for {tfn_request["id"]}:' f' {rows_processed}/{total_rows} started', ) + group = asyncio.gather(*tasks) try: - logger.debug('gathering transformation tasks...') - await asyncio.gather(*tasks) + results = await group + for result in results: + await result_store.put(result) except Exception as e: logger.exception('Error during applying transformations.') - for task in tasks: - task.cancel() + group.cancel() raise e - async def async_process_row(self, semaphore, method, row_idx, row, row_styles, result_store): + async def transform_row(self, method, row_idx, row, row_styles): try: if ROW_DELETED_MARKER in list(row.values()): - await result_store.put(row_idx, RowTransformationResponse.delete()) - return + # await result_store.put((row_idx, RowTransformationResponse.delete())) + return RowTransformationResponse.delete() kwargs = {} if 'row_styles' in inspect.signature(method).parameters: kwargs['row_styles'] = row_styles - response = await method(row, **kwargs) - if not isinstance(response, RowTransformationResponse): - raise RowTransformationError(f'invalid row tranformation response: {response}') - if response.status == ResultType.FAIL: - raise RowTransformationError(f'row transformation failed: {response.output}') - await result_store.put(row_idx, response) - except Exception as e: - raise RowTransformationError( - f'Error applying transformation function {method.__name__} ' - f'to row #{row_idx}: {str(e)}.', - ) from e - finally: - semaphore.release() - - def sync_process_row(self, semaphore, method, row_idx, row, row_styles, result_store, loop): - try: - if ROW_DELETED_MARKER in list(row.values()): - asyncio.run_coroutine_threadsafe( - result_store.put(row_idx, RowTransformationResponse.delete()), loop, + if inspect.iscoroutinefunction(method): + awaitable = method(row, **kwargs) + else: + loop = asyncio.get_running_loop() + awaitable = loop.run_in_executor( + self.executor, + functools.partial(method, row, **kwargs), ) - return - kwargs = {} - if 'row_styles' in inspect.signature(method).parameters: - kwargs['row_styles'] = row_styles - response = method(row, **kwargs) + response = await asyncio.wait_for( + awaitable, + timeout=self.config.get_timeout('row_transformation'), + ) if not isinstance(response, RowTransformationResponse): raise RowTransformationError(f'invalid row tranformation response: {response}') if response.status == ResultType.FAIL: raise RowTransformationError(f'row transformation failed: {response.output}') - asyncio.run_coroutine_threadsafe(result_store.put(row_idx, response), loop) + return response except Exception as e: raise RowTransformationError( f'Error applying transformation function {method.__name__} ' f'to row #{row_idx}: {str(e)}.', ) from e - finally: - semaphore.release() def write_excel( self, filename, result_store, total_rows, output_columns, task_data, logger, loop, @@ -467,9 +434,9 @@ def write_excel( REPORT_EVERY_ROW_MAX, ) - for idx in range(2, total_rows + 2): + for _ in range(2, total_rows + 2): future = asyncio.run_coroutine_threadsafe( - result_store.get(idx), + result_store.get(), loop, ) response = future.result( diff --git a/connect/eaas/runner/managers/utils.py b/connect/eaas/runner/managers/utils.py deleted file mode 100644 index 1f0b4d8..0000000 --- a/connect/eaas/runner/managers/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio - - -class ResultStore: - def __init__(self): - self.lock = asyncio.Lock() - self.futures = {} - - async def put(self, idx, data): - async with self.lock: - future = self.futures.setdefault(idx, asyncio.Future()) - future.set_result(data) - - async def get(self, idx): - async with self.lock: - future = self.futures.setdefault(idx, asyncio.Future()) - data = await future - async with self.lock: - del self.futures[idx] - return data diff --git a/poetry.lock b/poetry.lock index 85a3201..a7cc2c2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "anvil-uplink" @@ -19,24 +19,24 @@ ws4py = "*" [[package]] name = "anyio" -version = "3.7.1" +version = "4.0.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "anyio-3.7.1-py3-none-any.whl", hash = "sha256:91dee416e570e92c64041bd18b900d1d6fa78dff7048769ce5ac5ddad004fbb5"}, - {file = "anyio-3.7.1.tar.gz", hash = "sha256:44a3c9aba0f5defa43261a8b3efb97891f2bd7d804e0e1f56419befa1adfc780"}, + {file = "anyio-4.0.0-py3-none-any.whl", hash = "sha256:cfdb2b588b9fc25ede96d8db56ed50848b0b649dca3dd1df0b11f683bb9e0b5f"}, + {file = "anyio-4.0.0.tar.gz", hash = "sha256:f7ed51751b2c2add651e5747c891b47e26d2a21be5d32d9311dfe9692f3e5d7a"}, ] [package.dependencies] -exceptiongroup = {version = "*", markers = "python_version < \"3.11\""} +exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""} idna = ">=2.8" sniffio = ">=1.1" [package.extras] -doc = ["Sphinx", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme (>=1.2.2)", "sphinxcontrib-jquery"] -test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] -trio = ["trio (<0.22)"] +doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] +trio = ["trio (>=0.22)"] [[package]] name = "argparse" @@ -68,30 +68,30 @@ tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"] [[package]] name = "asttokens" -version = "2.2.1" +version = "2.4.0" description = "Annotate AST trees with source code positions" optional = false python-versions = "*" files = [ - {file = "asttokens-2.2.1-py2.py3-none-any.whl", hash = "sha256:6b0ac9e93fb0335014d382b8fa9b3afa7df546984258005da0b9e7095b3deb1c"}, - {file = "asttokens-2.2.1.tar.gz", hash = "sha256:4622110b2a6f30b77e1473affaa97e711bc2f07d3f10848420ff1898edbe94f3"}, + {file = "asttokens-2.4.0-py2.py3-none-any.whl", hash = "sha256:cf8fc9e61a86461aa9fb161a14a0841a03c405fa829ac6b202670b3495d2ce69"}, + {file = "asttokens-2.4.0.tar.gz", hash = "sha256:2e0171b991b2c959acc6c49318049236844a5da1d65ba2672c4880c1c894834e"}, ] [package.dependencies] -six = "*" +six = ">=1.12.0" [package.extras] test = ["astroid", "pytest"] [[package]] name = "async-timeout" -version = "4.0.2" +version = "4.0.3" description = "Timeout context manager for asyncio programs" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" files = [ - {file = "async-timeout-4.0.2.tar.gz", hash = "sha256:2163e1640ddb52b7a8c80d0a67a08587e5d245cc9c553a74a847056bc2976b15"}, - {file = "async_timeout-4.0.2-py3-none-any.whl", hash = "sha256:8ca1e4fcf50d07413d66d1a5e416e42cfdf5851c981d679a09851a6853383b3c"}, + {file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"}, + {file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"}, ] [[package]] @@ -258,13 +258,13 @@ test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"] [[package]] name = "connect-eaas-core" -version = "28.12" +version = "28.14" description = "Connect Eaas Core" optional = false python-versions = ">=3.8,<4" files = [ - {file = "connect_eaas_core-28.12-py3-none-any.whl", hash = "sha256:70e6b6a5612c503e28a5db6510b71bd1dde43ed4084776dc2dfb0f186f5e071f"}, - {file = "connect_eaas_core-28.12.tar.gz", hash = "sha256:0f3ac2f840bc183cb44cb357881798aecfb4980bc9c8633f8986de7183c04898"}, + {file = "connect_eaas_core-28.14-py3-none-any.whl", hash = "sha256:70fbe68d3fe0f96d8ccb52d02b3c09dd6cc77039da5a0f5ba7acdf2c7e53535b"}, + {file = "connect_eaas_core-28.14.tar.gz", hash = "sha256:664a3ca516820565b4bad59131089b01eba27199baca3ad98a1914c46b34428b"}, ] [package.dependencies] @@ -296,13 +296,13 @@ rich = ">=12.4.4,<13" [[package]] name = "connect-openapi-client" -version = "28.0" +version = "28.1" description = "Connect Python OpenAPI Client" optional = false python-versions = ">=3.8,<4" files = [ - {file = "connect_openapi_client-28.0-py3-none-any.whl", hash = "sha256:15cdfefd2fe18311e3840c1e8ab01835475ed2582110fcec06a965872a129554"}, - {file = "connect_openapi_client-28.0.tar.gz", hash = "sha256:ed82c3755860e2f5fd47ede8cc353dac1d3d91f699d65926a157937fdc8b4219"}, + {file = "connect_openapi_client-28.1-py3-none-any.whl", hash = "sha256:111cd62268d94fb40db244e9af8e631762f41092387b77aa6af5aeb1f67e711e"}, + {file = "connect_openapi_client-28.1.tar.gz", hash = "sha256:8a8bc2b8fda4f4c2476e4f20e42eea0b7be476e3d8d8a94badd97b560d0384cf"}, ] [package.dependencies] @@ -415,13 +415,13 @@ files = [ [[package]] name = "exceptiongroup" -version = "1.1.2" +version = "1.1.3" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" files = [ - {file = "exceptiongroup-1.1.2-py3-none-any.whl", hash = "sha256:e346e69d186172ca7cf029c8c1d16235aa0e04035e5750b4b95039e65204328f"}, - {file = "exceptiongroup-1.1.2.tar.gz", hash = "sha256:12c3e887d6485d16943a309616de20ae5582633e0a2eda17f4e10fd61c1e8af5"}, + {file = "exceptiongroup-1.1.3-py3-none-any.whl", hash = "sha256:343280667a4585d195ca1cf9cef84a4e178c4b6cf2274caef9859782b567d5e3"}, + {file = "exceptiongroup-1.1.3.tar.gz", hash = "sha256:097acd85d473d75af5bb98e41b61ff7fe35efe6675e4f9370ec6ec5126d160e9"}, ] [package.extras] @@ -570,13 +570,12 @@ flake8 = "*" [[package]] name = "flake8-isort" -version = "6.0.0" +version = "6.1.0" description = "flake8 plugin that integrates isort ." optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "flake8-isort-6.0.0.tar.gz", hash = "sha256:537f453a660d7e903f602ecfa36136b140de279df58d02eb1b6a0c84e83c528c"}, - {file = "flake8_isort-6.0.0-py3-none-any.whl", hash = "sha256:aa0cac02a62c7739e370ce6b9c31743edac904bae4b157274511fc8a19c75bbc"}, + {file = "flake8-isort-6.1.0.tar.gz", hash = "sha256:d4639343bac540194c59fb1618ac2c285b3e27609f353bef6f50904d40c1643e"}, ] [package.dependencies] @@ -650,6 +649,7 @@ files = [ {file = "greenlet-2.0.2-cp27-cp27m-win32.whl", hash = "sha256:6c3acb79b0bfd4fe733dff8bc62695283b57949ebcca05ae5c129eb606ff2d74"}, {file = "greenlet-2.0.2-cp27-cp27m-win_amd64.whl", hash = "sha256:283737e0da3f08bd637b5ad058507e578dd462db259f7f6e4c5c365ba4ee9343"}, {file = "greenlet-2.0.2-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:d27ec7509b9c18b6d73f2f5ede2622441de812e7b1a80bbd446cb0633bd3d5ae"}, + {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d967650d3f56af314b72df7089d96cda1083a7fc2da05b375d2bc48c82ab3f3c"}, {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:30bcf80dda7f15ac77ba5af2b961bdd9dbc77fd4ac6105cee85b0d0a5fcf74df"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26fbfce90728d82bc9e6c38ea4d038cba20b7faf8a0ca53a9c07b67318d46088"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9190f09060ea4debddd24665d6804b995a9c122ef5917ab26e1566dcc712ceeb"}, @@ -658,6 +658,7 @@ files = [ {file = "greenlet-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:76ae285c8104046b3a7f06b42f29c7b73f77683df18c49ab5af7983994c2dd91"}, {file = "greenlet-2.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:2d4686f195e32d36b4d7cf2d166857dbd0ee9f3d20ae349b6bf8afc8485b3645"}, {file = "greenlet-2.0.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c4302695ad8027363e96311df24ee28978162cdcdd2006476c43970b384a244c"}, + {file = "greenlet-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d4606a527e30548153be1a9f155f4e283d109ffba663a15856089fb55f933e47"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c48f54ef8e05f04d6eff74b8233f6063cb1ed960243eacc474ee73a2ea8573ca"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a1846f1b999e78e13837c93c778dcfc3365902cfb8d1bdb7dd73ead37059f0d0"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a06ad5312349fec0ab944664b01d26f8d1f05009566339ac6f63f56589bc1a2"}, @@ -687,6 +688,7 @@ files = [ {file = "greenlet-2.0.2-cp37-cp37m-win32.whl", hash = "sha256:3f6ea9bd35eb450837a3d80e77b517ea5bc56b4647f5502cd28de13675ee12f7"}, {file = "greenlet-2.0.2-cp37-cp37m-win_amd64.whl", hash = "sha256:7492e2b7bd7c9b9916388d9df23fa49d9b88ac0640db0a5b4ecc2b653bf451e3"}, {file = "greenlet-2.0.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:b864ba53912b6c3ab6bcb2beb19f19edd01a6bfcbdfe1f37ddd1778abfe75a30"}, + {file = "greenlet-2.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1087300cf9700bbf455b1b97e24db18f2f77b55302a68272c56209d5587c12d1"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:ba2956617f1c42598a308a84c6cf021a90ff3862eddafd20c3333d50f0edb45b"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc3a569657468b6f3fb60587e48356fe512c1754ca05a564f11366ac9e306526"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8eab883b3b2a38cc1e050819ef06a7e6344d4a990d24d45bc6f2cf959045a45b"}, @@ -695,6 +697,7 @@ files = [ {file = "greenlet-2.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b0ef99cdbe2b682b9ccbb964743a6aca37905fda5e0452e5ee239b1654d37f2a"}, {file = "greenlet-2.0.2-cp38-cp38-win32.whl", hash = "sha256:b80f600eddddce72320dbbc8e3784d16bd3fb7b517e82476d8da921f27d4b249"}, {file = "greenlet-2.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:4d2e11331fc0c02b6e84b0d28ece3a36e0548ee1a1ce9ddde03752d9b79bba40"}, + {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8512a0c38cfd4e66a858ddd1b17705587900dd760c6003998e9472b77b56d417"}, {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:88d9ab96491d38a5ab7c56dd7a3cc37d83336ecc564e4e8816dbed12e5aaefc8"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:561091a7be172ab497a3527602d467e2b3fbe75f9e783d8b8ce403fa414f71a6"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971ce5e14dc5e73715755d0ca2975ac88cfdaefcaab078a284fea6cfabf866df"}, @@ -1037,13 +1040,13 @@ files = [ [[package]] name = "pluggy" -version = "1.2.0" +version = "1.3.0" description = "plugin and hook calling mechanisms for python" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "pluggy-1.2.0-py3-none-any.whl", hash = "sha256:c2fd55a7d7a3863cba1a013e4e2414658b1d07b6bc57b3919e0c63c9abb99849"}, - {file = "pluggy-1.2.0.tar.gz", hash = "sha256:d12f0c4b579b15f5e054301bb226ee85eeeba08ffec228092f8defbaa3a4c4b3"}, + {file = "pluggy-1.3.0-py3-none-any.whl", hash = "sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7"}, + {file = "pluggy-1.3.0.tar.gz", hash = "sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12"}, ] [package.extras] @@ -1137,13 +1140,13 @@ files = [ [[package]] name = "pygments" -version = "2.15.1" +version = "2.16.1" description = "Pygments is a syntax highlighting package written in Python." optional = false python-versions = ">=3.7" files = [ - {file = "Pygments-2.15.1-py3-none-any.whl", hash = "sha256:db2db3deb4b4179f399a09054b023b6a586b76499d36965813c71aa8ed7b5fd1"}, - {file = "Pygments-2.15.1.tar.gz", hash = "sha256:8ace4d3c1dd481894b2005f560ead0f9f19ee64fe983366be1a21e171d12775c"}, + {file = "Pygments-2.16.1-py3-none-any.whl", hash = "sha256:13fc09fa63bc8d8671a6d247e1eb303c4b343eaee81d861f3404db2935653692"}, + {file = "Pygments-2.16.1.tar.gz", hash = "sha256:1daff0494820c69bc8941e407aa20f577374ee88364ee10a98fdbe0aece96e29"}, ] [package.extras] @@ -1151,13 +1154,13 @@ plugins = ["importlib-metadata"] [[package]] name = "pytest" -version = "7.4.0" +version = "7.4.2" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.7" files = [ - {file = "pytest-7.4.0-py3-none-any.whl", hash = "sha256:78bf16451a2eb8c7a2ea98e32dc119fd2aa758f1d5d66dbf0a59d69a3969df32"}, - {file = "pytest-7.4.0.tar.gz", hash = "sha256:b4bf8c45bd59934ed84001ad51e11b4ee40d40a1229d2c79f9c592b0a3f6bd8a"}, + {file = "pytest-7.4.2-py3-none-any.whl", hash = "sha256:1d881c6124e08ff0a1bb75ba3ec0bfd8b5354a01c194ddd5a0a870a48d99b002"}, + {file = "pytest-7.4.2.tar.gz", hash = "sha256:a766259cfab564a2ad52cb1aae1b881a75c3eb7e34ca3779697c23ed47c47069"}, ] [package.dependencies] @@ -1244,13 +1247,13 @@ dev = ["pre-commit", "pytest-asyncio", "tox"] [[package]] name = "pytest-randomly" -version = "3.13.0" +version = "3.15.0" description = "Pytest plugin to randomly order tests and control random.seed." optional = false python-versions = ">=3.8" files = [ - {file = "pytest_randomly-3.13.0-py3-none-any.whl", hash = "sha256:e78d898ef4066f89744e5075083aa7fb6f0de07ffd70ca9c4435cda590cf1eac"}, - {file = "pytest_randomly-3.13.0.tar.gz", hash = "sha256:079c78b94693189879fbd7304de4e147304f0811fa96249ea5619f2f1cd33df0"}, + {file = "pytest_randomly-3.15.0-py3-none-any.whl", hash = "sha256:0516f4344b29f4e9cdae8bce31c4aeebf59d0b9ef05927c33354ff3859eeeca6"}, + {file = "pytest_randomly-3.15.0.tar.gz", hash = "sha256:b908529648667ba5e54723088edd6f82252f540cc340d748d1fa985539687047"}, ] [package.dependencies] @@ -1273,20 +1276,20 @@ six = ">=1.5" [[package]] name = "python-socks" -version = "2.3.0" +version = "2.4.2" description = "Core proxy (SOCKS4, SOCKS5, HTTP tunneling) functionality for Python" optional = false python-versions = "*" files = [ - {file = "python-socks-2.3.0.tar.gz", hash = "sha256:2002f20f32366861bec90e6b520e4696671587b3dd56e476477e5a07ec49a094"}, - {file = "python_socks-2.3.0-py3-none-any.whl", hash = "sha256:eb385a33bcc3909401aa29fc0a277cdd0c2ce586a99b61be1ddd1a4939f23061"}, + {file = "python-socks-2.4.2.tar.gz", hash = "sha256:96c096c46470a52f088ff1a6227a5141116955c9081080b06d4462391467713f"}, + {file = "python_socks-2.4.2-py3-none-any.whl", hash = "sha256:e3a47fd8debf61695040465fd8de4099e18a83e43b5818ca0ff9289a1f0bd7f8"}, ] [package.dependencies] async-timeout = {version = ">=3.0.1", optional = true, markers = "extra == \"asyncio\""} [package.extras] -anyio = ["anyio (>=3.3.4)"] +anyio = ["anyio (>=3.3.4,<4.0.0)"] asyncio = ["async-timeout (>=3.0.1)"] curio = ["curio (>=1.4)"] trio = ["trio (>=0.16.0)"] @@ -1303,6 +1306,7 @@ files = [ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, + {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, @@ -1310,8 +1314,15 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, + {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, + {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, + {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, + {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, @@ -1328,6 +1339,7 @@ files = [ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, + {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, @@ -1335,6 +1347,7 @@ files = [ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, + {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, @@ -1363,20 +1376,20 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] [[package]] name = "responses" -version = "0.23.1" +version = "0.23.3" description = "A utility library for mocking out the `requests` Python library." optional = false python-versions = ">=3.7" files = [ - {file = "responses-0.23.1-py3-none-any.whl", hash = "sha256:8a3a5915713483bf353b6f4079ba8b2a29029d1d1090a503c70b0dc5d9d0c7bd"}, - {file = "responses-0.23.1.tar.gz", hash = "sha256:c4d9aa9fc888188f0c673eff79a8dadbe2e75b7fe879dc80a221a06e0a68138f"}, + {file = "responses-0.23.3-py3-none-any.whl", hash = "sha256:e6fbcf5d82172fecc0aa1860fd91e58cbfd96cee5e96da5b63fa6eb3caa10dd3"}, + {file = "responses-0.23.3.tar.gz", hash = "sha256:205029e1cb334c21cb4ec64fc7599be48b859a0fd381a42443cdd600bfe8b16a"}, ] [package.dependencies] pyyaml = "*" -requests = ">=2.22.0,<3.0" +requests = ">=2.30.0,<3.0" types-PyYAML = "*" -urllib3 = ">=1.25.10" +urllib3 = ">=1.25.10,<3.0" [package.extras] tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-requests"] @@ -1402,19 +1415,19 @@ jupyter = ["ipywidgets (>=7.5.1,<8.0.0)"] [[package]] name = "setuptools" -version = "68.0.0" +version = "68.2.2" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "setuptools-68.0.0-py3-none-any.whl", hash = "sha256:11e52c67415a381d10d6b462ced9cfb97066179f0e871399e006c4ab101fc85f"}, - {file = "setuptools-68.0.0.tar.gz", hash = "sha256:baf1fdb41c6da4cd2eae722e135500da913332ab3f2f5c7d33af9b492acb5235"}, + {file = "setuptools-68.2.2-py3-none-any.whl", hash = "sha256:b454a35605876da60632df1a60f736524eb73cc47bbc9f3f1ef1b644de74fd2a"}, + {file = "setuptools-68.2.2.tar.gz", hash = "sha256:4ac1475276d2f1c48684874089fefcd83bd7162ddaafb81fac866ba0db282a87"}, ] [package.extras] -docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"] -testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] -testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"] +testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] +testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.1)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] [[package]] name = "six" @@ -1486,7 +1499,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\")"} +greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"} [package.extras] aiomysql = ["aiomysql", "greenlet (!=0.4.17)"] @@ -1771,4 +1784,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.8,<4" -content-hash = "419c44e6f811bf6793e49de8e9cf7e30615ae626fb2343fa178e4524d11a549b" +content-hash = "55c426b5577eaec71da51d10dc89418040862ceda75645a4903d17103c6afbc3" diff --git a/pyproject.toml b/pyproject.toml index 91e4db7..a698b02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ cextrun = 'connect.eaas.runner.main:main' [tool.poetry.dependencies] python = ">=3.8,<4" websockets = "10.*" -connect-openapi-client = ">=28.0" +connect-openapi-client = ">=28.1" logzio-python-handler = "^3.1.1" backoff = "^2.2.1" connect-eaas-core = ">=28.12,<29" diff --git a/tests/conftest.py b/tests/conftest.py index 42d737d..06b4da5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -165,3 +165,23 @@ def _task_payload( def responses(): with sentry_responses.RequestsMock() as rsps: yield rsps + + +@pytest.fixture +def default_env(mocker, unused_port): + mocker.patch( + 'connect.eaas.runner.config.get_environment', + return_value={ + 'ws_address': f'127.0.0.1:{unused_port}', + 'api_address': f'127.0.0.1:{unused_port}', + 'api_key': 'SU-000:XXXX', + 'environment_id': 'ENV-000-0001', + 'instance_id': 'INS-000-0002', + 'background_task_max_execution_time': 300, + 'interactive_task_max_execution_time': 120, + 'scheduled_task_max_execution_time': 43200, + 'transformation_task_max_execution_time': 300, + 'transformation_write_queue_timeout': 600, + 'row_transformation_task_max_execution_time': 60, + }, + ) diff --git a/tests/managers/test_transformation.py b/tests/managers/test_transformation.py index a9e5baa..9560223 100644 --- a/tests/managers/test_transformation.py +++ b/tests/managers/test_transformation.py @@ -40,28 +40,18 @@ @pytest.mark.parametrize('task_type_prefix', ('billing', 'pricing')) +@pytest.mark.parametrize('max_parallel_lines', (1, 20)) @pytest.mark.flaky(max_runs=3, min_passes=1) @pytest.mark.asyncio async def test_submit( - mocker, tfn_settings_payload, responses, httpx_mock, unused_port, task_type_prefix, + mocker, default_env, tfn_settings_payload, responses, + httpx_mock, unused_port, task_type_prefix, + max_parallel_lines, ): mocker.patch( - 'connect.eaas.runner.config.get_environment', - return_value={ - 'ws_address': f'127.0.0.1:{unused_port}', - 'api_address': f'127.0.0.1:{unused_port}', - 'api_key': 'SU-000:XXXX', - 'environment_id': 'ENV-000-0001', - 'instance_id': 'INS-000-0002', - 'background_task_max_execution_time': 300, - 'interactive_task_max_execution_time': 120, - 'scheduled_task_max_execution_time': 43200, - 'transformation_task_max_execution_time': 300, - 'transformation_write_queue_timeout': 600, - 'row_transformation_task_max_execution_time': 60, - }, + 'connect.eaas.runner.managers.transformation.TRANSFORMATION_TASK_MAX_PARALLEL_LINES', + max_parallel_lines, ) - api_address = f'https://127.0.0.1:{unused_port}' api_url = f'{api_address}/public/v1' mocker.patch.object(ConfigHelper, 'get_api_url', return_value=api_url) @@ -191,7 +181,7 @@ def transform_it(self, row): ) await manager.submit(task) - await asyncio.sleep(1) + await asyncio.sleep(.5) task.output = TaskOutput( result=ResultType.SUCCESS, @@ -208,28 +198,18 @@ def transform_it(self, row): @pytest.mark.parametrize('task_type_prefix', ('billing', 'pricing')) +@pytest.mark.parametrize('max_parallel_lines', (1, 20)) @pytest.mark.flaky(max_runs=3, min_passes=1) @pytest.mark.asyncio async def test_submit_with_error_in_tfn_function( - mocker, tfn_settings_payload, responses, httpx_mock, unused_port, task_type_prefix, + mocker, default_env, tfn_settings_payload, responses, + httpx_mock, unused_port, task_type_prefix, + max_parallel_lines, ): mocker.patch( - 'connect.eaas.runner.config.get_environment', - return_value={ - 'ws_address': f'127.0.0.1:{unused_port}', - 'api_address': f'127.0.0.1:{unused_port}', - 'api_key': 'SU-000:XXXX', - 'environment_id': 'ENV-000-0001', - 'instance_id': 'INS-000-0002', - 'background_task_max_execution_time': 300, - 'interactive_task_max_execution_time': 120, - 'scheduled_task_max_execution_time': 43200, - 'transformation_task_max_execution_time': 300, - 'transformation_write_queue_timeout': 0.2, - 'row_transformation_task_max_execution_time': 60, - }, + 'connect.eaas.runner.managers.transformation.TRANSFORMATION_TASK_MAX_PARALLEL_LINES', + max_parallel_lines, ) - api_address = f'https://127.0.0.1:{unused_port}' api_url = f'{api_address}/public/v1' mocker.patch.object(ConfigHelper, 'get_api_url', return_value=api_url) @@ -329,13 +309,13 @@ async def transform_it(self, row): }) mocker.patch.object(TfnApp, 'load_application', return_value=MyExtension) - mocker.patch('connect.eaas.runner.managers.transformation.Workbook') mocked_time = mocker.patch('connect.eaas.runner.managers.transformation.time') - mocked_time.sleep = time.sleep mocked_time.monotonic.side_effect = (1.0, 2.0) handler = TfnApp(config) result_queue = asyncio.Queue() + mocker.patch.object(TransformationTasksManager, 'send_stat_update') + mocker.patch.object(TransformationTasksManager, 'write_excel') manager = TransformationTasksManager(config, handler, result_queue.put) task = Task( @@ -352,7 +332,7 @@ async def transform_it(self, row): ) await manager.submit(task) - await asyncio.sleep(1) + await asyncio.sleep(.5) task.output = TaskOutput( result=ResultType.FAIL, @@ -368,25 +348,9 @@ async def transform_it(self, row): @pytest.mark.flaky(max_runs=3, min_passes=1) @pytest.mark.asyncio async def test_submit_with_error_in_tfn_function_sync( - mocker, tfn_settings_payload, responses, httpx_mock, unused_port, task_type_prefix, + mocker, default_env, tfn_settings_payload, responses, + httpx_mock, unused_port, task_type_prefix, ): - mocker.patch( - 'connect.eaas.runner.config.get_environment', - return_value={ - 'ws_address': f'127.0.0.1:{unused_port}', - 'api_address': f'127.0.0.1:{unused_port}', - 'api_key': 'SU-000:XXXX', - 'environment_id': 'ENV-000-0001', - 'instance_id': 'INS-000-0002', - 'background_task_max_execution_time': 300, - 'interactive_task_max_execution_time': 120, - 'scheduled_task_max_execution_time': 43200, - 'transformation_task_max_execution_time': 300, - 'transformation_write_queue_timeout': 0.2, - 'row_transformation_task_max_execution_time': 60, - }, - ) - api_address = f'https://127.0.0.1:{unused_port}' api_url = f'{api_address}/public/v1' mocker.patch.object(ConfigHelper, 'get_api_url', return_value=api_url) @@ -486,9 +450,9 @@ def transform_it(self, row): }) mocker.patch.object(TfnApp, 'load_application', return_value=MyExtension) - mocker.patch('connect.eaas.runner.managers.transformation.Workbook') + mocker.patch.object(TransformationTasksManager, 'send_stat_update') + mocker.patch.object(TransformationTasksManager, 'write_excel') mocked_time = mocker.patch('connect.eaas.runner.managers.transformation.time') - mocked_time.sleep = time.sleep mocked_time.monotonic.side_effect = (1.0, 2.0) handler = TfnApp(config) @@ -509,7 +473,7 @@ def transform_it(self, row): ) await manager.submit(task) - await asyncio.sleep(1) + await asyncio.sleep(.5) task.output = TaskOutput( result=ResultType.FAIL, @@ -524,24 +488,8 @@ def transform_it(self, row): @pytest.mark.parametrize('task_type_prefix', ('billing', 'pricing')) @pytest.mark.asyncio async def test_build_response_exception( - mocker, task_payload, httpx_mock, unused_port, task_type_prefix, + mocker, default_env, task_payload, httpx_mock, unused_port, task_type_prefix, ): - mocker.patch( - 'connect.eaas.runner.config.get_environment', - return_value={ - 'ws_address': f'127.0.0.1:{unused_port}', - 'api_address': f'127.0.0.1:{unused_port}', - 'api_key': 'SU-000:XXXX', - 'environment_id': 'ENV-000-0001', - 'instance_id': 'INS-000-0002', - 'background_task_max_execution_time': 300, - 'interactive_task_max_execution_time': 120, - 'scheduled_task_max_execution_time': 43200, - 'transformation_task_max_execution_time': 300, - 'transformation_write_queue_timeout': 0.2, - 'row_transformation_task_max_execution_time': 60, - }, - ) api_address = f'https://127.0.0.1:{unused_port}' api_url = f'{api_address}/public/v1' httpx_mock.add_response( @@ -579,24 +527,8 @@ async def test_build_response_exception( @pytest.mark.parametrize('task_type_prefix', ('billing', 'pricing')) @pytest.mark.asyncio async def test_build_response_exception_fail_failing_trans_req( - mocker, task_payload, httpx_mock, unused_port, caplog, task_type_prefix, + mocker, default_env, task_payload, httpx_mock, unused_port, caplog, task_type_prefix, ): - mocker.patch( - 'connect.eaas.runner.config.get_environment', - return_value={ - 'ws_address': f'127.0.0.1:{unused_port}', - 'api_address': f'127.0.0.1:{unused_port}', - 'api_key': 'SU-000:XXXX', - 'environment_id': 'ENV-000-0001', - 'instance_id': 'INS-000-0002', - 'background_task_max_execution_time': 300, - 'interactive_task_max_execution_time': 120, - 'scheduled_task_max_execution_time': 43200, - 'transformation_task_max_execution_time': 300, - 'transformation_write_queue_timeout': 0.2, - 'row_transformation_task_max_execution_time': 60, - }, - ) api_address = f'https://127.0.0.1:{unused_port}' api_url = f'{api_address}/public/v1' httpx_mock.add_response( @@ -636,23 +568,8 @@ async def test_build_response_exception_fail_failing_trans_req( @pytest.mark.parametrize('task_type_prefix', ('billing', 'pricing')) @pytest.mark.asyncio async def test_send_skip_response( - mocker, task_payload, unused_port, tfn_settings_payload, httpx_mock, task_type_prefix, + mocker, default_env, unused_port, tfn_settings_payload, httpx_mock, task_type_prefix, ): - mocker.patch( - 'connect.eaas.runner.config.get_environment', - return_value={ - 'ws_address': f'127.0.0.1:{unused_port}', - 'api_address': f'127.0.0.1:{unused_port}', - 'api_key': 'SU-000:XXXX', - 'environment_id': 'ENV-000-0001', - 'instance_id': 'INS-000-0002', - 'background_task_max_execution_time': 300, - 'interactive_task_max_execution_time': 120, - 'scheduled_task_max_execution_time': 43200, - 'transformation_task_max_execution_time': 300, - }, - ) - api_url = f'https://127.0.0.1:{unused_port}/public/v1' mocker.patch.object(ConfigHelper, 'get_api_url', return_value=api_url) @@ -685,7 +602,6 @@ def transform_it(self, row): mocker.patch.object(TfnApp, 'load_application', return_value=MyExtension) mocked_time = mocker.patch('connect.eaas.runner.managers.transformation.time') - mocked_time.sleep = time.sleep mocked_time.monotonic.side_effect = (1.0, 2.0) handler = TfnApp(config) @@ -705,7 +621,7 @@ def transform_it(self, row): ) await manager.submit(task) - await asyncio.sleep(1) + await asyncio.sleep(.5) task.output = TaskOutput( result=ResultType.SKIP, @@ -718,48 +634,44 @@ def transform_it(self, row): @pytest.mark.asyncio -async def test_async_process_row_invalid_response(mocker): - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) +async def test_transform_row_invalid_response(mocker, default_env): + manager = TransformationTasksManager(ConfigHelper(), mocker.MagicMock(), mocker.MagicMock()) async def tfn(row): return 33 with pytest.raises(RowTransformationError) as cv: - await manager.async_process_row( - mocker.MagicMock(), + await manager.transform_row( tfn, 3, {'row': 'data'}, {}, - mocker.MagicMock(), ) assert str(cv.value).endswith('invalid row tranformation response: 33.') @pytest.mark.asyncio -async def test_async_process_row_fail_response(mocker): - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) +async def test_transform_row_fail_response(mocker, default_env): + manager = TransformationTasksManager(ConfigHelper(), mocker.MagicMock(), mocker.MagicMock()) async def tfn(row): return RowTransformationResponse.fail(output='Failed by me') with pytest.raises(RowTransformationError) as cv: - await manager.async_process_row( - mocker.MagicMock(), + await manager.transform_row( tfn, 3, {'row': 'data'}, {}, - mocker.MagicMock(), ) assert str(cv.value).endswith('row transformation failed: Failed by me.') @pytest.mark.asyncio -async def test_async_process_row_new_version(mocker): - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) +async def test_transform_row_new_version(mocker, default_env): + manager = TransformationTasksManager(ConfigHelper(), mocker.MagicMock(), mocker.MagicMock()) response = RowTransformationResponse.done( {'row': 'row'}, @@ -772,125 +684,29 @@ async def tfn(row, row_styles): result_store_mock = mocker.MagicMock() result_store_mock.put = mocker.AsyncMock() - await manager.async_process_row( - mocker.MagicMock(), - tfn, - 3, - {'row': 'data'}, - {'row': 'style'}, - result_store_mock, - ) - result_store_mock.put.assert_called_with(3, response) - - -def test_sync_process_row_invalid_response(mocker): - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) - - def tfn(row): - return 33 - - with pytest.raises(RowTransformationError) as cv: - manager.sync_process_row( - mocker.MagicMock(), - tfn, - 3, - {'row': 'data'}, - {}, - mocker.MagicMock(), - mocker.MagicMock(), - ) - - assert str(cv.value).endswith('invalid row tranformation response: 33.') - - -def test_sync_process_row_fail_response(mocker): - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) - - def tfn(row): - return RowTransformationResponse.fail(output='Failed by me') - - with pytest.raises(RowTransformationError) as cv: - manager.sync_process_row( - mocker.MagicMock(), - tfn, - 3, - {'row': 'data'}, - {}, - mocker.MagicMock(), - mocker.MagicMock(), - ) - - assert str(cv.value).endswith('row transformation failed: Failed by me.') - - -def test_sync_process_row_deleted_row(mocker): - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) - - tfn = mocker.MagicMock() - tfn.__name__ = 'my_func' - - result_store = mocker.AsyncMock() - - manager.sync_process_row( - mocker.MagicMock(), - tfn, - 3, - {'row': ROW_DELETED_MARKER}, - {}, - result_store, - mocker.MagicMock(), - ) - - assert result_store.put.mock_calls[0].args[1].status == ResultType.DELETE - tfn.assert_not_called() - - -def test_sync_process_row_new_version(mocker): - mocker.patch('asyncio.run_coroutine_threadsafe') - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) - - response = RowTransformationResponse.done( - {'row': 'row'}, - {'row': 'style'}, - ) - - def tfn(row, row_styles): - return response - - result_store_mock = mocker.MagicMock() - result_store_mock.put = mocker.MagicMock() - - manager.sync_process_row( - mocker.MagicMock(), + assert await manager.transform_row( tfn, 3, {'row': 'data'}, {'row': 'style'}, - result_store_mock, - mocker.MagicMock(), - ) - result_store_mock.put.assert_called_with(3, response) + ) == response @pytest.mark.asyncio -async def test_async_process_row_deleted_row(mocker): - manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) +async def test_transform_row_deleted_row(mocker, default_env): + manager = TransformationTasksManager(ConfigHelper(), mocker.MagicMock(), mocker.MagicMock()) tfn = mocker.AsyncMock() tfn.__name__ = 'my_func' - result_store = mocker.AsyncMock() - - await manager.async_process_row( - mocker.MagicMock(), + result = await manager.transform_row( tfn, 3, {'row': ROW_DELETED_MARKER}, {}, - result_store, ) - assert result_store.put.mock_calls[0].args[1].status == ResultType.DELETE + assert result.status == ResultType.DELETE tfn.assert_not_awaited()