
WIP: Add in the ability to use multiple queues in AwsBatchExecutor #9

Closed
wants to merge 5 commits

Conversation


@leonsmith leonsmith commented Mar 10, 2021

RE: #8

A bit of a work in progress, but it shows the small tweak we need to be able to submit to multiple Batch job queues.

  • Update documentation

@leonsmith leonsmith changed the title WIP: Add in the ability to use queues in AwsBatchExecutor WIP: Add in the ability to use multiple queues in AwsBatchExecutor Mar 10, 2021
@@ -6,7 +7,7 @@ python:
install:
- pip install apache-airflow boto3 pylint isort marshmallow
env:
- AIRFLOW__BATCH__REGION=us-west-1 AIRFLOW__BATCH__JOB_NAME=some-job-name AIRFLOW__BATCH__JOB_QUEUE=some-job-queue AIRFLOW__BATCH__JOB_DEFINITION=some-job-def AIRFLOW__ECS_FARGATE__REGION=us-west-1 AIRFLOW__ECS_FARGATE__CLUSTER=some-cluster AIRFLOW__ECS_FARGATE__CONTAINER_NAME=some-container-name AIRFLOW__ECS_FARGATE__TASK_DEFINITION=some-task-def AIRFLOW__ECS_FARGATE__LAUNCH_TYPE=FARGATE AIRFLOW__ECS_FARGATE__PLATFORM_VERSION=LATEST AIRFLOW__ECS_FARGATE__ASSIGN_PUBLIC_IP=DISABLED AIRFLOW__ECS_FARGATE__SECURITY_GROUPS=SG1,SG2 AIRFLOW__ECS_FARGATE__SUBNETS=SUB1,SUB2
- AIRFLOW__BATCH__REGION=us-west-1 AIRFLOW__BATCH__JOB_NAME=some-job-name AIRFLOW__OPERATORS__DEFAULT_QUEUE=some-job-queue AIRFLOW__BATCH__JOB_DEFINITION=some-job-def AIRFLOW__ECS_FARGATE__REGION=us-west-1 AIRFLOW__ECS_FARGATE__CLUSTER=some-cluster AIRFLOW__ECS_FARGATE__CONTAINER_NAME=some-container-name AIRFLOW__ECS_FARGATE__TASK_DEFINITION=some-task-def AIRFLOW__ECS_FARGATE__LAUNCH_TYPE=FARGATE AIRFLOW__ECS_FARGATE__PLATFORM_VERSION=LATEST AIRFLOW__ECS_FARGATE__ASSIGN_PUBLIC_IP=DISABLED AIRFLOW__ECS_FARGATE__SECURITY_GROUPS=SG1,SG2 AIRFLOW__ECS_FARGATE__SUBNETS=SUB1,SUB2
Owner

While this may work on Airflow 2.0, it will not on Airflow 1.0 :(

@@ -99,6 +99,27 @@ def sync(self):
task_key = self.active_workers.pop_by_id(job.job_id)
self.success(task_key)

def trigger_tasks(self, open_slots: int) -> None:
Owner

This should be fixed upstream to base Airflow. The major reasoning is that order_queued_tasks_by_priority is an Airflow 2.0 method. So this introduction would break backwards compatibility with Airflow 1.10.14 and lower.


# We know we will always have a queue here, but we keep the same signature
# as the BaseExecutor, so make the type system happy by asserting it's not None
assert queue is not None
Owner

Please delete the default value from the keyword arg instead of having this line. Both would result in an exception, but at least the reason for the failure would be clearer.
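The suggestion can be sketched in isolation like this (the `execute_async` names below are simplified stand-ins for the executor's real method, not its actual signature):

```python
from typing import List, Optional


def execute_async_with_default(command: List[str], queue: Optional[str] = None) -> None:
    # With a default, a missing queue only fails here, deep inside the
    # method, with an opaque AssertionError.
    assert queue is not None


def execute_async_required(command: List[str], queue: str) -> None:
    # With no default, Python raises a clear TypeError at the call site,
    # naming the missing argument.
    pass


try:
    execute_async_required(["airflow", "run"])  # queue omitted
except TypeError as exc:
    print(exc)  # names 'queue' as the missing required argument
```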

if executor_config and 'command' in executor_config:
raise ValueError('Executor Config should never override "command"')
job_id = self._submit_job(command, executor_config or {})

job_id = self._submit_job(command, executor_config or {}, queue=queue)
Owner

noice

"""
The command and executor config will be placed in the container-override section of the JSON request, before
calling Boto3's "run_task" function.
"""
submit_job_api = deepcopy(self.submit_job_kwargs)
submit_job_api['containerOverrides'].update(exec_config)
submit_job_api['containerOverrides']['command'] = cmd

submit_job_api['jobQueue'] = queue
Owner

double-noice
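The merge logic in that hunk can be exercised on its own. A minimal sketch (the function and template names here are illustrative, not the executor's actual attributes):

```python
from copy import deepcopy
from typing import Any, Dict, List


def build_submit_job_kwargs(template: Dict[str, Any], cmd: List[str],
                            exec_config: Dict[str, Any], queue: str) -> Dict[str, Any]:
    # deepcopy so per-task overrides never mutate the shared template
    submit_job_api = deepcopy(template)
    submit_job_api['containerOverrides'].update(exec_config)
    submit_job_api['containerOverrides']['command'] = cmd
    submit_job_api['jobQueue'] = queue
    return submit_job_api


template = {
    'jobName': 'some-job-name',
    'jobDefinition': 'some-job-def',
    'containerOverrides': {'vcpus': 1, 'memory': 512},
}
kwargs = build_submit_job_kwargs(template, ['airflow', 'run'], {'memory': 1024}, 'high-prio')
# kwargs carries the per-task queue and overrides; template is untouched
```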

self,
key: TaskInstanceKeyType,
command: CommandType,
queue: Optional[str] = None,
Owner

Another problem. Suppose you declare an operator with no queue kwarg (as most operators are created). The default queue is based on your airflow.cfg value for celery.default_queue.
That means that instead of using your batch.job_queue config, you'll end up using celery.default_queue, which is not reasonable. I see you're experimenting with operators.default_queue; I'm not quite familiar with this?
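One way this could be handled (a sketch only, not code from this PR; the parameter names are assumptions): treat the Celery default queue as "no queue specified" and fall back to the configured batch.job_queue value.

```python
from typing import Optional


def resolve_job_queue(queue: Optional[str],
                      celery_default_queue: str,
                      batch_job_queue: str) -> str:
    # Operators created without an explicit queue kwarg inherit
    # celery.default_queue, which means nothing to AWS Batch;
    # fall back to the configured Batch job queue in that case.
    if queue is None or queue == celery_default_queue:
        return batch_job_queue
    return queue


resolve_job_queue(None, 'default', 'some-job-queue')         # falls back
resolve_job_queue('default', 'default', 'some-job-queue')    # falls back
resolve_job_queue('high-prio', 'default', 'some-job-queue')  # explicit queue wins
```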

vcpus=1, # no fractional CPUs
memory=512
)
vcpus=1, # no fractional CPUs
Owner

yes, thank you for catching this

@aelzeiny
Owner

aelzeiny commented Mar 15, 2021

First off, I didn't realize that it was going to be this complicated from the outset. I think part of the reason is that queues are purely a Celery concept in the current version of Airflow, and even if that is fixed upstream, it'll still be a problem in older versions of Airflow (pre-2.0). If we decide to support queues, the code is going to look much less elegant, with conditionals on versions; which I'm not strongly opposed to -- form follows function.

I see that in Airflow 2.0 the queue is not being passed in, which is strange because that was not an issue in Airflow v1.10.14 and lower. Here's a screenshot from airflow v1.10.5

@aelzeiny
Owner

aelzeiny commented Mar 15, 2021

Or, hear me out, I think the larger issue here is that this plugin is extremely rigid with its configurability. I allow you to override containerOverrides but not jobQueue. What's the difference? Why does this plugin even care what you override? If a programmer wants to do something, well-designed code should just move out of the way.

So basically what's wrong with this? _submit_job will always call _submit_job_kwargs, and the programmer decides what the structure looks like. Infinite flexibility to do whatever you want when you implement your plugin?

class CustomBatchExecutor(AwsBatchExecutor):
    def _submit_job_kwargs(self, task_id: TaskInstanceKeyType, cmd: CommandType, queue: str, exec_config: ExecutorConfigType) -> dict:
        kwargs = super()._submit_job_kwargs(task_id, cmd, queue, exec_config)
        job_queue = kwargs.pop('job_queue', None) or kwargs.pop('jobQueue', None)
        if job_queue:
            kwargs['jobQueue'] = job_queue
        return kwargs

@leonsmith
Author

Cheers for the comments & your thoughts! I didn't realise you were still targeting 1.0.

More than happy to go with allowing submit_job_kwargs to be overridden!

@aelzeiny
Owner

Cool, I have a PR up, but the newest version of Airflow requires a certain version of SQLAlchemy, and Travis CI is fighting me on this one. I'll get it done though.

@leonsmith
Author

Closing in favour of #10

@leonsmith leonsmith closed this Mar 17, 2021