[core]
load_examples = False
colored_console_log = False
executor = SequentialExecutor
logging_level = notset
[webserver]
enable_proxy_fix = True
expose_config = True
rbac = True
[scheduler]
scheduler_heartbeat_sec = 5
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow
statsd_host = airflow-statsd
run_duration = 41460
processor_poll_interval = 5
dag_dir_list_interval = 1
[kubernetes]
namespace = airflow
airflow_configmap = airflow-airflow-config
airflow_local_settings_configmap = airflow-airflow-config
worker_container_repository = bdiregistry.com/bio-workflow
worker_container_tag = latest
worker_container_image_pull_policy = IfNotPresent
worker_service_account_name = airflow-worker-serviceaccount
image_pull_secrets = airflow-registry
dags_in_image = True
delete_worker_pods = False
[kubernetes_secrets]
AIRFLOW__CORE__SQL_ALCHEMY_CONN = airflow-airflow-metadata=connection
AIRFLOW__CORE__FERNET_KEY = airflow-fernet-key=fernet-key
[kubernetes_labels]
tier = airflow
component = worker
release = airflow
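
A note on the config above: entries under [kubernetes_secrets] use the form ENV_VAR = secret-name=secret-key, so the metadata connection string and the Fernet key are injected into pods from existing Kubernetes Secrets, and any AIRFLOW__SECTION__KEY environment variable set by the chart takes precedence over this file. A minimal way to confirm which values the scheduler actually sees is to read them back through Airflow's own config loader (this sketch assumes shell access to the scheduler pod):

# run with python3 inside the scheduler pod, where the ConfigMap is mounted
from airflow.configuration import conf

# these should echo the values from the ConfigMap above; if they do not,
# an environment variable or another config file is taking precedence
print(conf.get('core', 'executor'))
print(conf.get('kubernetes', 'worker_container_repository'))
print(conf.get('kubernetes', 'namespace'))

And my DAG: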
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.kubernetes.pod import Port
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['tuandn8@vinbdi.org'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(dag_id="Whole_Genome_Pipeline",
          default_args=default_args,
          schedule_interval=timedelta(minutes=3))
dataset_volume_mount = VolumeMount(
    name="input",
    mount_path="/input",
    sub_path=None,
    read_only=True
)
dataset_volume = Volume(name='input',  # must match the VolumeMount name "input"
                        configs={
                            "hostPath": {
                                # "path": "/home/admin.toandd1/demo-pipelines/mash-pipeline/input",
                                "path": "/home/tuan/pipelines/mash-pipelines/dataset",
                                "type": "Directory"
                            }
                        })
reference_volume_mount = VolumeMount(
    name="reference",
    mount_path="/reference",
    sub_path=None,
    read_only=True
)

reference_volume = Volume(name='reference',
                          configs={
                              "hostPath": {
                                  # "path": "/home/admin.toandd1/demo-pipelines/mash-pipeline/reference",
                                  "path": "/home/tuan/pipelines/mash-pipelines/Ref",
                                  "type": "Directory"
                              }
                          })
output_volume_mount = VolumeMount(
    name="output",
    mount_path="/output",
    sub_path=None,
    read_only=False
)

output_volume = Volume(name="output",
                       configs={
                           "hostPath": {
                               # "path": "/home/admin.toandd1/demo-pipelines/mash-pipeline/output",
                               "path": "/home/tuan/pipelines/mash-pipelines/Result",
                               "type": "Directory"
                           }
                       })
alignment = KubernetesPodOperator(
    namespace="workflow",
    image="bdiregistry.com/bio-workflow",
    cmds=["bash", "-c"],
    # paired-end alignment to GRCh38; R1/R2 are passed as two separate FASTQs
    arguments=["bwa mem -M -Y -t 16 -K 100000000 /reference/Homo_sapiens_assembly38.fasta /input/mother_R1.fq.gz /input/mother_R2.fq.gz -o /output/mother.sam"],
    volumes=[dataset_volume, reference_volume, output_volume],
    volume_mounts=[dataset_volume_mount,
                   reference_volume_mount, output_volume_mount],
    # pod names must be valid DNS-1123 labels (lowercase letters, digits, '-')
    name="alignment-refgenome",
    task_id="alignment-task",
    is_delete_operator_pod=True,
    hostnetwork=False,
    dag=dag)
fixmate = KubernetesPodOperator(
    namespace="workflow",
    image="bdiregistry.com/bio-workflow",
    cmds=["bash", "-c"],
    arguments=[
        "samtools fixmate -O bam /output/mother.sam /output/mother.lanefixed.bam"],
    volumes=[dataset_volume, reference_volume, output_volume],
    volume_mounts=[dataset_volume_mount,
                   reference_volume_mount, output_volume_mount],
    name="fixmate",
    task_id="fixmate-task",
    is_delete_operator_pod=True,
    hostnetwork=False,
    dag=dag)
alignment >> fixmate
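
A note on the DAG above: datetime.utcnow() as start_date is a known Airflow anti-pattern, because every parse of the DAG file produces a new start date and the scheduler may never consider a run due. The documented pattern is a fixed date in the past; a minimal sketch (the specific date is an arbitrary placeholder):

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    # a fixed start_date in the past gives the scheduler a stable anchor
    'start_date': datetime(2020, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}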

Apache Airflow version: apache/airflow:1.10.10.1-alpha2-python3.6
Kubernetes version (if you are using kubernetes) (use kubectl version): 1.17.3
Environment: Rancher 2.4.2 on Ubuntu 20.04

What happened:
I installed Airflow from the chart in the repository, with the config and the DAG shown above. Nothing is ever queued on the scheduler.

What you expected to happen:
Each task should run in its own pod.
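
For anyone reproducing this: a quick way to check whether any worker pods are being created while a run is due is to list the pods in both namespaces involved. A sketch using the kubernetes Python client (assuming credentials are available wherever it runs):

from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() when run inside the cluster
v1 = client.CoreV1Api()

# executor worker pods land in the [kubernetes] namespace ("airflow" above);
# KubernetesPodOperator pods go to the namespace passed to the operator ("workflow")
for ns in ("airflow", "workflow"):
    for pod in v1.list_namespaced_pod(namespace=ns).items:
        print(ns, pod.metadata.name, pod.status.phase)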
