Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 50 additions & 83 deletions connect/eaas/runner/managers/transformation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import functools
import inspect
import json
import logging
Expand Down Expand Up @@ -56,9 +57,6 @@
from connect.eaas.runner.managers.base import (
TasksManagerBase,
)
from connect.eaas.runner.managers.utils import (
ResultStore,
)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -190,7 +188,6 @@ async def _fail_task(self, task_data, message):

async def process_transformation(self, task_data, tfn_request, method):
extension_logger = self.get_extension_logger(task_data)
semaphore = asyncio.Semaphore(TRANSFORMATION_TASK_MAX_PARALLEL_LINES)
input_file = await asyncio.get_running_loop().run_in_executor(
self.executor,
self.download_excel,
Expand All @@ -203,7 +200,7 @@ async def process_transformation(self, task_data, tfn_request, method):
)

read_queue = asyncio.Queue(TRANSFORMATION_TASK_MAX_PARALLEL_LINES)
result_store = ResultStore()
write_queue = asyncio.Queue()

loop = asyncio.get_event_loop()

Expand All @@ -220,17 +217,16 @@ async def process_transformation(self, task_data, tfn_request, method):
self.executor,
self.write_excel,
output_file.name,
result_store,
write_queue,
tfn_request['stats']['rows']['total'],
tfn_request['transformation']['columns']['output'],
task_data,
extension_logger,
loop,
)
processor_task = asyncio.create_task(self.process_rows(
semaphore,
read_queue,
result_store,
write_queue,
method,
tfn_request,
extension_logger,
Expand All @@ -255,7 +251,7 @@ async def process_transformation(self, task_data, tfn_request, method):
await client.conversations[task_data.input.object_id].messages.create(
payload={
'type': 'message',
'text': f'Transformation request processing failed: {str(e)}',
'text': f'Transformation request processing failed: {str(e) or "timed out"}',
},
)
return TransformationResponse.fail(output=str(e))
Expand Down Expand Up @@ -322,10 +318,11 @@ def read_excel(self, tfn_request, filename, queue, logger, loop):
style.number_format = col_value.number_format
row_styles[lookup_columns[col_idx]] = style

asyncio.run_coroutine_threadsafe(
future = asyncio.run_coroutine_threadsafe(
queue.put((idx, row_data, row_styles)),
loop,
)
future.result()
if idx % delta == 0 or idx == total_rows:
logger.info(
f'Input file read progress for {tfn_request["id"]}:'
Expand All @@ -335,112 +332,82 @@ def read_excel(self, tfn_request, filename, queue, logger, loop):
logger.info(f'Input file read complete for {tfn_request["id"]}')
wb.close()

async def process_rows(self, semaphore, read_queue, result_store, method, tfn_request, logger):
async def process_rows(self, read_queue, result_store, method, tfn_request, logger):
rows_processed = 0
tasks = []
total_rows = tfn_request['stats']['rows']['total']
delta = 1 if total_rows <= 10 else round(total_rows / 10)
while rows_processed < total_rows:
await semaphore.acquire()
row_idx, row, row_styles = await read_queue.get()
if inspect.iscoroutinefunction(method):
tasks.append(
asyncio.create_task(
asyncio.wait_for(
self.async_process_row(
semaphore,
method,
row_idx,
row,
row_styles,
result_store,
),
self.config.get_timeout('row_transformation'),
),
),
)
else:
loop = asyncio.get_running_loop()
tasks.append(
asyncio.create_task(
asyncio.wait_for(
loop.run_in_executor(
self.executor,
self.sync_process_row,
semaphore,
method,
row_idx,
row,
row_styles,
result_store,
loop,
),
self.config.get_timeout('row_transformation'),
),
tasks.append(
asyncio.create_task(
self.transform_row(
method,
row_idx,
row,
row_styles,
),
)
),
)

rows_processed += 1
if rows_processed % TRANSFORMATION_TASK_MAX_PARALLEL_LINES == 0:
group = asyncio.gather(*tasks)
try:
results = await group
for result in results:
await result_store.put(result)
except Exception as e:
logger.exception('Error during applying transformations.')
group.cancel()
raise e
tasks = []
if rows_processed % delta == 0 or rows_processed == total_rows:
logger.info(
f'Starting transformation tasks for {tfn_request["id"]}:'
f' {rows_processed}/{total_rows} started',
)

group = asyncio.gather(*tasks)
try:
logger.debug('gathering transformation tasks...')
await asyncio.gather(*tasks)
results = await group
for result in results:
await result_store.put(result)
except Exception as e:
logger.exception('Error during applying transformations.')
for task in tasks:
task.cancel()
group.cancel()
raise e

async def async_process_row(self, semaphore, method, row_idx, row, row_styles, result_store):
async def transform_row(self, method, row_idx, row, row_styles):
try:
if ROW_DELETED_MARKER in list(row.values()):
await result_store.put(row_idx, RowTransformationResponse.delete())
return
# await result_store.put((row_idx, RowTransformationResponse.delete()))
return RowTransformationResponse.delete()
kwargs = {}
if 'row_styles' in inspect.signature(method).parameters:
kwargs['row_styles'] = row_styles
response = await method(row, **kwargs)
if not isinstance(response, RowTransformationResponse):
raise RowTransformationError(f'invalid row tranformation response: {response}')
if response.status == ResultType.FAIL:
raise RowTransformationError(f'row transformation failed: {response.output}')
await result_store.put(row_idx, response)
except Exception as e:
raise RowTransformationError(
f'Error applying transformation function {method.__name__} '
f'to row #{row_idx}: {str(e)}.',
) from e
finally:
semaphore.release()

def sync_process_row(self, semaphore, method, row_idx, row, row_styles, result_store, loop):
try:
if ROW_DELETED_MARKER in list(row.values()):
asyncio.run_coroutine_threadsafe(
result_store.put(row_idx, RowTransformationResponse.delete()), loop,
if inspect.iscoroutinefunction(method):
awaitable = method(row, **kwargs)
else:
loop = asyncio.get_running_loop()
awaitable = loop.run_in_executor(
self.executor,
functools.partial(method, row, **kwargs),
)
return
kwargs = {}
if 'row_styles' in inspect.signature(method).parameters:
kwargs['row_styles'] = row_styles
response = method(row, **kwargs)
response = await asyncio.wait_for(
awaitable,
timeout=self.config.get_timeout('row_transformation'),
)
if not isinstance(response, RowTransformationResponse):
raise RowTransformationError(f'invalid row tranformation response: {response}')
if response.status == ResultType.FAIL:
raise RowTransformationError(f'row transformation failed: {response.output}')
asyncio.run_coroutine_threadsafe(result_store.put(row_idx, response), loop)
return response
except Exception as e:
raise RowTransformationError(
f'Error applying transformation function {method.__name__} '
f'to row #{row_idx}: {str(e)}.',
) from e
finally:
semaphore.release()

def write_excel(
self, filename, result_store, total_rows, output_columns, task_data, logger, loop,
Expand All @@ -467,9 +434,9 @@ def write_excel(
REPORT_EVERY_ROW_MAX,
)

for idx in range(2, total_rows + 2):
for _ in range(2, total_rows + 2):
future = asyncio.run_coroutine_threadsafe(
result_store.get(idx),
result_store.get(),
loop,
)
response = future.result(
Expand Down
20 changes: 0 additions & 20 deletions connect/eaas/runner/managers/utils.py

This file was deleted.

Loading