Allow sharing task definitions amongst a worker process in order to conserve memory #219
Conversation
…t, the task is shared for the worker process and the worker process round robins between tasks.
coderabhigupta
left a comment
I am not sure this will work, because `get_task_definition_name` is called in 3 places: inside `__poll_task`, `__execute_task`, and `__update_task`. In `run_once`, we will end up operating on three different task definitions when multiple task definitions are configured, which will return erroneous results. Here is the aforementioned method:
```python
def run_once(self) -> None:
    task = self.__poll_task()
    if task != None and task.task_id != None:
        task_result = self.__execute_task(task)
        self.__update_task(task_result)
    self.__wait_for_polling_interval()
```
I appreciate the optimization attempt, but we need to implement something more robust here. Also, I am curious whether your tests checked for accuracy and whether you were able to confirm functional parity.
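To illustrate the concern, here is a minimal sketch (class and field names are hypothetical, modeled on the `compute_task_definition_name` diff quoted later in this thread) of why a round-robin getter called three times per `run_once` iteration would hand each call a different task definition:

```python
class RoundRobinWorker:
    """Hypothetical stand-in for a worker configured with a list of task names."""

    def __init__(self, task_definition_names):
        self.task_definition_names = task_definition_names
        self.next_task_index = 0

    def get_task_definition_name(self):
        # Every call advances the rotation, so __poll_task, __execute_task
        # and __update_task would each see a *different* task name.
        name = self.task_definition_names[self.next_task_index]
        self.next_task_index = (self.next_task_index + 1) % len(self.task_definition_names)
        return name


worker = RoundRobinWorker(["task_a", "task_b", "task_c"])
# One run_once iteration makes three calls -- and gets three different names:
print([worker.get_task_definition_name() for _ in range(3)])  # ['task_a', 'task_b', 'task_c']
```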
…rate on the same task definition
I think you are right -- I've pushed a refactor commit. I think that by leveraging `cached_property` and clearing it at the start of `run_once`, the name now stays consistent across those calls.
Actually -- I am trying to test this with the `cached_property` change and it's not quite working yet; let me give it another take.
OK -- with the latest commit I pushed, I was able to test a worker with 13 simple tasks; I triggered 3 of them, and they executed successfully within a short time.
coderabhigupta
left a comment
Results might be accurate now, but we will still end up referencing the wrong `task_definition_name` for logging and metrics. See `__execute_task` and `__update_task`; they both have `task_definition_name = self.worker.get_task_definition_name()`. I think we need to pass the `task_definition_name` from `run_once` and use it inside the two methods mentioned above.
```python
def compute_task_definition_name(self):
    if isinstance(self.task_definition_name, list):
        task_definition_name = self.task_definition_name[self.next_task_index]
        self.next_task_index = (self.next_task_index + 1) % len(self.task_definition_name)
```
We should probably check for emptiness so we don't run into a modulo by zero when `len(self.task_definition_name) == 0`.
If someone instantiates a worker with an empty task list, would it be better to raise an error during initialization?
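A guard along these lines would cover both suggestions -- failing fast at construction time rather than hitting a `ZeroDivisionError` from `% 0` during polling. This is a sketch only; the `Worker` class name and fields are assumed from the diff above, not the SDK's actual implementation:

```python
class Worker:
    """Hypothetical sketch of the constructor-time validation discussed above."""

    def __init__(self, task_definition_name):
        # Reject an empty task list up front instead of failing later
        # with a modulo-by-zero inside compute_task_definition_name.
        if isinstance(task_definition_name, list) and not task_definition_name:
            raise ValueError("task_definition_name must not be an empty list")
        self.task_definition_name = task_definition_name
        self.next_task_index = 0


try:
    Worker([])
except ValueError as exc:
    print(exc)  # task_definition_name must not be an empty list
```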
I think those only get executed from within `run_once`? That is why I added the caching, so it remains consistent for each iteration of the loop -- but maybe I am not seeing the issue yet.
Can you turn on debug and check the logs to confirm which `task_definition_name` is getting logged?
@coderabhigupta It wasn't quite working, as I found when writing a unit test, but I simplified how I was doing the caching and now it is working. The logs were consistent with this change, but integration-wise I am testing with a private worker and should not disclose what those logs look like.
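For readers following along, the cache-per-iteration idea can be sketched like this (a simplified illustration, not the PR's actual code -- the property and method names here are assumed): the cached value keeps all accesses within one `run_once` iteration consistent, and clearing the cache at the top of the next iteration advances the rotation.

```python
from functools import cached_property


class Worker:
    """Hypothetical sketch: round-robin name that is stable per run_once iteration."""

    def __init__(self, task_definition_names):
        self.task_definition_names = task_definition_names
        self.next_task_index = 0

    @cached_property
    def current_task_definition_name(self):
        # Computed once per iteration; repeated reads hit the cache.
        name = self.task_definition_names[self.next_task_index]
        self.next_task_index = (self.next_task_index + 1) % len(self.task_definition_names)
        return name

    def run_once(self):
        # Clear the cached value so this iteration advances the rotation;
        # within the iteration, every access returns the same name.
        self.__dict__.pop("current_task_definition_name", None)
        return self.current_task_definition_name


w = Worker(["a", "b"])
print(w.run_once())  # a
print(w.run_once())  # b
```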
```diff
 # Append task with the right shift operator `>>`
-def __rshift__(self, task: TaskInterface | List[TaskInterface] | List[List[TaskInterface]]) -> Self:
+def __rshift__(self, task: Union[TaskInterface, List[TaskInterface], List[List[TaskInterface]]]) -> Self:
```
pytest couldn't collect these tests because the type hint is wrong here.
Thanks for fixing this.
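For context on why the `Union` spelling matters here: the `X | Y` union syntax in annotations (PEP 604) is only evaluated successfully on Python 3.10+ unless `from __future__ import annotations` is in effect, so on older interpreters the module fails to import and pytest cannot collect it. A small sketch (the `TaskInterface` stand-in below is not the SDK class):

```python
from typing import List, Union


class TaskInterface:  # stand-in for the SDK class, for illustration only
    pass


# Union[...] evaluates fine on Python < 3.10, whereas a bare
# `TaskInterface | List[TaskInterface]` annotation would raise
# TypeError at class/function definition time on those versions.
def append_task(
    task: Union[TaskInterface, List[TaskInterface], List[List[TaskInterface]]]
) -> TaskInterface:
    return task if isinstance(task, TaskInterface) else TaskInterface()
```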
```diff
@@ -1,13 +1,11 @@
 from conductor.client.http.models.workflow_def import WorkflowDef
 from conductor.client.http.models.workflow_task import WorkflowTask
```
Also, FWIW, I think the integration tests failed on the last Actions run because I lack some required env variables in my fork settings.
@coderabhigupta Thanks for your approval -- is anything else needed from me to see this change make it into a release?
@matteius I will merge and push this as part of our next release, which should happen sometime next week.
Background: when invoking a Conductor worker that has many tasks, some of which have work to do only infrequently, while leveraging a Django monolith to use the ORM, the worker processes end up consuming a lot of memory. With this optimization we can group tasks into lists that are shared and round-robined within one worker process, thus conserving memory.
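The core idea -- one process rotating its polling target across several task definitions instead of dedicating a process (and a copy of the monolith's memory) to each -- can be sketched in isolation. The task names below are made up for illustration:

```python
from itertools import cycle

# One worker process round-robins across several infrequent task
# definitions instead of running a separate process for each one.
task_definitions = ["nightly_report", "cache_warmup", "cleanup_old_rows"]
rotation = cycle(task_definitions)


def next_poll_target():
    # Each run_once iteration polls the next task in the rotation.
    return next(rotation)


polled = [next_poll_target() for _ in range(5)]
print(polled)
# ['nightly_report', 'cache_warmup', 'cleanup_old_rows', 'nightly_report', 'cache_warmup']
```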
Running an example worker set of tasks on a large Django monolith today:

Running the same worker tasks all within one worker using this optimization:
