Description
Apache Airflow version: 1.10.9 (apache-airflow[crypto,celery,postgres,hive,jdbc,mysql,ssh,slack]==1.10.9)
Python version: 3.7.6 (docker: python:3.7.6-slim-buster)
Kubernetes version (if you are using kubernetes) (use kubectl version):
Environment:
- Cloud provider or hardware configuration: Tested locally and on AWS
- OS (e.g. from /etc/os-release): Debian 10 (buster) and Ubuntu 18.04.4 LTS (Bionic Beaver)
- Kernel (e.g. uname -a): 5.7.8 and 4.15.0
- Install tools:
- Others:
What happened:
pool_slots appears to be fixed at 1.
If I set pool_slots higher than 1 on tasks in a 10-slot pool, Airflow still runs 10 of them at once.
What you expected to happen:
The sum of pool_slots across all tasks currently executing in a pool should not exceed that pool's total slot count.
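To illustrate the expected accounting, here is a standalone sketch (hypothetical, not Airflow code) of a slot-aware pool that admits tasks only while their combined pool_slots still fit:

```python
# Standalone sketch of slot-aware pool scheduling (hypothetical, not Airflow code).
def admit(tasks, pool_size):
    """Greedily admit tasks, in order, while their pool_slots still fit the pool."""
    used, admitted = 0, []
    for name, slots in tasks:
        if used + slots <= pool_size:
            used += slots
            admitted.append(name)
    return admitted, used

# Same task shape as the repro DAG below: one task per slot count from 2 to 14.
tasks = [(f'slots_{x}', x) for x in range(2, 15)]
admitted, used = admit(tasks, pool_size=10)
print(admitted, used)  # ['slots_2', 'slots_3', 'slots_4'] 9
```

With a 10-slot pool, only the first three tasks (2 + 3 + 4 = 9 slots) should be running at once, not 10 tasks.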
How to reproduce it:
- Start Airflow 1.10.9 (this may occur on other versions too, but was only tested on 1.10.9).
- Create the pool:
  airflow pool -s pool_slots_test 10 'pool_slots test pool'
- Enable and trigger this DAG:
import time
from datetime import timedelta

import dateutil.parser

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='__pool_slots_test',
    start_date=dateutil.parser.parse('2020-01-01T00:00:00Z'),
    schedule_interval=None,
    catchup=False,
    default_args={},
)

# One task per slot count from 2 to 14; each just sleeps so the pool stays busy.
for x in range(2, 15):
    PythonOperator(
        dag=dag,
        task_id=f'slots_{x}',
        execution_timeout=timedelta(minutes=5),
        op_args=[200],
        op_kwargs={},
        python_callable=lambda secs: time.sleep(secs),
        pool='pool_slots_test',
        pool_slots=x,
    )
After enabling and triggering the DAG, 10 tasks run in parallel, and the Admin ⇒ Pools summary shows 10 of 10 slots in use for the pool.
From Airflow's point of view, each task in the DAG appears to occupy exactly 1 slot; the pool_slots value is ignored.
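The observed behavior is consistent with the scheduler counting each running task as 1 slot instead of summing pool_slots (a guess at the accounting based on the symptoms, not a reading of the scheduler source):

```python
# pool_slots of the tasks observed running concurrently (e.g. slots_2 .. slots_11).
running = list(range(2, 12))

observed_used = len(running)  # each task counted as 1 slot -> matches the "10/10" UI
expected_used = sum(running)  # summing pool_slots -> 65, far beyond a 10-slot pool
print(observed_used, expected_used)  # 10 65
```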
Anything else we need to know:
airflow list_dags gives me a lot of warnings like so:
/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/sagemaker_base_operator.py:50: PendingDeprecationWarning: Invalid arguments were passed to SageMakerTuningOperator (task_id: xxxxx). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'pool_slots': 2}
super(SageMakerBaseOperator, self).__init__(*args, **kwargs)
Output of airflow version:
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.9
Output of python --version:
Python 3.7.6