Add more extensibility to AWS Batch #10

Merged · 6 commits · Apr 26, 2021
7 changes: 7 additions & 0 deletions .travis.yml
@@ -1,8 +1,15 @@
dist: xenial

> **Review comment:** I got tests running with just bumping the dist to focal.
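
The reviewer's suggestion would be a one-line swap at the top of `.travis.yml` (a sketch of the suggested change, not a change made in this PR):

```yaml
dist: focal
```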

language: python
python:
- "3.6" # current default Python on Travis CI
- "3.7"
+- "3.8"
+before_install:
+- sudo apt-get autoremove sqlite3
+- sudo apt-get install python-software-properties
+- sudo apt-add-repository -y ppa:linuxgndu/sqlite-nightly
+- sudo apt-get -y update
+- sudo apt-cache show sqlite3
install:
- pip install apache-airflow boto3 pylint isort marshmallow
env:
34 changes: 28 additions & 6 deletions airflow_aws_executors/batch_executor.py
@@ -124,17 +124,20 @@ def execute_async(self, key: TaskInstanceKeyType, command: CommandType, queue=None):
"""
if executor_config and 'command' in executor_config:
raise ValueError('Executor Config should never override "command"')
job_id = self._submit_job(command, executor_config or {})
job_id = self._submit_job(key, command, queue, executor_config or {})
self.active_workers.add_job(job_id, key)

def _submit_job(self, cmd: CommandType, exec_config: ExecutorConfigType) -> str:
def _submit_job(
self,
key: TaskInstanceKeyType,
cmd: CommandType, queue: str,
exec_config: ExecutorConfigType
) -> str:
"""
The command and executor config will be placed in the container-override section of the JSON request, before
calling Boto3's "run_task" function.
calling Boto3's "submit_job" function.
"""
submit_job_api = deepcopy(self.submit_job_kwargs)
submit_job_api['containerOverrides'].update(exec_config)
submit_job_api['containerOverrides']['command'] = cmd
submit_job_api = self._submit_job_kwargs(key, cmd, queue, exec_config)
boto_run_task = self.batch.submit_job(**submit_job_api)
try:
submit_job_response = BatchSubmitJobResponseSchema().load(boto_run_task)
@@ -148,6 +151,25 @@ def _submit_job(self, cmd: CommandType, exec_config: ExecutorConfigType) -> str:
            )
        return submit_job_response['job_id']

+    def _submit_job_kwargs(
+            self,
+            key: TaskInstanceKeyType,
+            cmd: CommandType,
+            queue: str, exec_config: ExecutorConfigType
+    ) -> dict:
+        """
+        This modifies the standard kwargs to be specific to this task by overriding the Airflow command and
+        updating the container overrides.
+
+        It is one last chance to modify Boto3's "submit_job" kwarg params before they get passed into the
+        Boto3 client. For the latest kwarg parameters:
+        .. seealso:: https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html
+        """
+        submit_job_api = deepcopy(self.submit_job_kwargs)
+        submit_job_api['containerOverrides'].update(exec_config)
+        submit_job_api['containerOverrides']['command'] = cmd
+        return submit_job_api
+
    def end(self, heartbeat_interval=10):
        """
        Waits for all currently running tasks to end, and doesn't launch any tasks
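
To make the new extension point concrete, here is a sketch of how `_submit_job_kwargs` composes the final Boto3 payload. The job name, queue, and definition are hypothetical stand-ins for whatever `submit_job_kwargs` the executor is actually configured with:

```python
# Assumed base config (self.submit_job_kwargs); all names here are made up
submit_job_kwargs = {
    'jobName': 'airflow-task',
    'jobQueue': 'my-job-queue',
    'jobDefinition': 'my-job-definition',
    'containerOverrides': {},
}

# Given cmd = ['airflow', 'run', 'my_dag', 'my_task', '2021-04-26'] and
# exec_config = {'vcpus': 2}, _submit_job_kwargs returns a deep copy whose
# containerOverrides are:
#   {'vcpus': 2, 'command': ['airflow', 'run', 'my_dag', 'my_task', '2021-04-26']}
```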
43 changes: 32 additions & 11 deletions readme.md
@@ -14,7 +14,7 @@ For `AWS ECS/Fargate`: [Getting Started with AWS ECS/Fargate ReadMe](getting_sta


## How Airflow Executors Work
-Every time Apache Airflow wants to run a task, the Scheduler generates a CLI command that needs to be executed **somewhere**.
+Every time Apache Airflow wants to run a task, the Scheduler generates a shell command that needs to be executed **somewhere**.
Under the hood this command will run Python code, and it looks something like this:
```bash
airflow run <DAG_ID> <TASK_ID> <EXECUTION_DATE>
```
@@ -61,28 +61,29 @@ The Celery Backend and worker queue also need attention and maintenance. I've tr
triggering CloudWatch Events, triggering capacity providers, triggering Application Autoscaling groups,
and it was a mess that I never got to work properly.

-#### The Case for AWS Fargate
+#### The Case for AWS Batch on AWS Fargate, and AWS Fargate
If you're on the Fargate executor it may take ~2.5 minutes for a task to pop up, but at least it's a constant O(1) time.
This way, the concept of tracking DAG Landing Times becomes unnecessary.
If you have more than 2000 concurrent tasks (which is a lot) then you can always contact AWS to provide an increase in this soft-limit.


## AWS Batch v AWS ECS v AWS Fargate?
+**I almost always recommend that you go the AWS Batch route**. Especially since, as of Dec 2020, AWS Batch supports Fargate deployments. So unless you need some very custom flexibility provided by ECS, or have a particular reason to use AWS Fargate directly, go with AWS Batch.

`AWS Batch` - Built on top of ECS, with additional features for batch-job management, including auto-scaling the servers in an ECS cluster up and down based on the jobs submitted to a queue. Generally easier to configure and set up than either other option.

`AWS Fargate` - A serverless container orchestration service; comparable to a proprietary AWS version of Kubernetes. Launching a Fargate Task is like saying "I want these containers to be launched somewhere in the cloud with X CPU and Y memory, and I don't care about the server". AWS Fargate is built on top of AWS ECS, and is easier to manage and maintain. However, it provides less flexibility.

`AWS ECS` - "Elastic Container Service" is a container orchestration service that uses a designated cluster of EC2 instances that you own, operate, and maintain.

-I almost always recommend that you go the AWS Batch or AWS Fargate route unless you need some very custom flexibility provided by ECS.

| | Batch | Fargate | ECS |
|-------------------|-------------------------------------------------------------------------------------|---------------------------------------------|---------------------------------------------------|
-| Start-up per task | Instant 3s if capacity is available; otherwise 2-3 minutes to launch a new server | 2-3 minutes per task; O(1) constant time | Instant 3s, or until capacity is available. |
-| Maintenance | You own, operate, and patch the servers | Serverless | You own, operate, and patch the servers |
+| Start-up per task | Combines both, depending on whether the job queue is Fargate (serverless) | 2-3 minutes per task; O(1) constant time | Instant 3s, or until capacity is available. |
+| Maintenance | You own, operate, and patch the servers OR serverless (as of Dec 2020) | Serverless | You own, operate, and patch the servers |
| Capacity | Autoscales to configurable Max vCPUs in the compute environment | ~2000 containers. See AWS Limits | Fixed. Not auto-scaling. |
-| Flexibility | High. Almost anything that you can do on an EC2 | Low. Can only do what AWS allows in Fargate | High. Almost anything that you can do on an EC2 |
-| Fractional CPUs? | No. Each task has 1 vCPU. | Yes. A task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. |
+| Flexibility | Combines both, depending on whether the job queue is Fargate (serverless) | Low. Can only do what AWS allows in Fargate | High. Almost anything that you can do on an EC2 |
+| Fractional CPUs? | Yes, as of Dec 2020 a task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. |


## Optional Container Requirements
@@ -98,10 +98,8 @@ task = PythonOperator(
    python_callable=lambda *args, **kwargs: print('hello world'),
    task_id='say_hello',
    executor_config=dict(
-        containerOverrides=dict(
-            vcpus=1,  # no fractional CPUs
-            memory=512
-        )
+        vcpus=1,
+        memory=512
    ),
    dag=dag
)
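
For reference, the executor merges these flattened `executor_config` keys straight into the Batch `containerOverrides` (see `_submit_job_kwargs` above), so the eventual Boto3 call looks roughly like the sketch below; the job name, queue, and definition are assumed values, not from this repo:

```python
import boto3

batch = boto3.client('batch')
batch.submit_job(
    jobName='airflow-task',             # assumed
    jobQueue='my-job-queue',            # assumed
    jobDefinition='my-job-definition',  # assumed
    containerOverrides={
        'vcpus': 1,
        'memory': 512,
        'command': ['airflow', 'run', '<DAG_ID>', '<TASK_ID>', '<EXECUTION_DATE>'],
    },
)
```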
@@ -220,6 +219,17 @@ CUSTOM_SUBMIT_JOB_KWARGS['retryStrategy'] = {'attempts': 3}
CUSTOM_SUBMIT_JOB_KWARGS['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60}  # 24 hours
```

"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"

```python
class CustomBatchExecutor(AwsBatchExecutor):
def _submit_job_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
submit_job_api = super()._submit_job_kwargs(task_id, cmd, queue, exec_config)
if queue == 'long_tasks_queue':
submit_job_api['retryStrategy'] = {'attempts': 3}
submit_job_api['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60 * 60}
return submit_job_api
```
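
For such a subclass to take effect, Airflow's `executor` setting has to point at it. With Airflow 2.x the setting accepts a full module path (the path below is hypothetical); older 1.10.x releases require routing a custom executor through the plugin mechanism instead:

```ini
[core]
executor = my_company.executors.CustomBatchExecutor
```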

#### AWS ECS/Fargate
In this example we will modify the default `run_task_kwargs`. Note, however, there is nothing that's stopping us
@@ -244,6 +254,17 @@ CUSTOM_RUN_TASK_KWARGS['overrides']['containerOverrides'][0]['environment'] = [
]
```

"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"

```python
class CustomFargateExecutor(AwsFargateExecutor):
def _run_task_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
run_task_api = super()._run_task_kwargs(task_id, cmd, queue, exec_config)
if queue == 'long_tasks_queue':
run_task_api['retryStrategy'] = {'attempts': 3}
run_task_api['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60 * 60}
return run_task_api
```

## Issues & Bugs
Please file a ticket in GitHub for issues. Be persistent and be polite.