From d063dee39238760ea24c95ce06cf9b35d5a32f72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Fri, 27 Dec 2019 22:57:58 +0100 Subject: [PATCH] [AIRFLOW-6376] Extract order_queued_tasks_by_priority method --- airflow/executors/base_executor.py | 17 +++++++++++++---- airflow/executors/celery_executor.py | 13 +------------ 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 7978a6608f2c9..2eb9a65067c90 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -150,16 +150,25 @@ def heartbeat(self) -> None: self.log.debug("Calling the %s sync method", self.__class__) self.sync() - def trigger_tasks(self, open_slots: int) -> None: + def order_queued_tasks_by_priority(self) -> List[Tuple[TaskInstanceKeyType, QueuedTaskInstanceType]]: """ - Triggers tasks + Orders the queued tasks by priority. - :param open_slots: Number of open slots + :return: List of tuples from the queued_tasks according to the priority. """ - sorted_queue = sorted( + return sorted( [(k, v) for k, v in self.queued_tasks.items()], # pylint: disable=unnecessary-comprehension key=lambda x: x[1][1], reverse=True) + + def trigger_tasks(self, open_slots: int) -> None: + """ + Triggers tasks + + :param open_slots: Number of open slots + """ + sorted_queue = self.order_queued_tasks_by_priority() + for _ in range(min((open_slots, len(self.queued_tasks)))): key, (command, _, _, simple_ti) = sorted_queue.pop(0) self.queued_tasks.pop(key) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 1ce20d5aa865b..d59a83b65cfd2 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -30,7 +30,7 @@ from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.executors.base_executor import BaseExecutor, CommandType, QueuedTaskInstanceType +from airflow.executors.base_executor import BaseExecutor, CommandType from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType, TaskInstanceStateType from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string @@ -234,17 +234,6 @@ def trigger_tasks(self, open_slots: int) -> None: self.tasks[key] = result self.last_state[key] = celery_states.PENDING - def order_queued_tasks_by_priority(self) -> List[Tuple[TaskInstanceKeyType, QueuedTaskInstanceType]]: - """ - Orders the queued tasks by priority. - - :return: List of tuples from the queued_tasks according to the priority. - """ - return sorted( - [(k, v) for k, v in self.queued_tasks.items()], # pylint: disable=unnecessary-comprehension - key=lambda x: x[1][1], - reverse=True) - def sync(self) -> None: num_processes = min(len(self.tasks), self._sync_parallelism) if num_processes == 0: