How to run a DAG in distributed Airflow #14713

@super017

I deployed Airflow 1.10.14 with one master and three workers. I want several workers to cooperate on completing a single DAG.
My code:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
import time
import math

default_args = {
    'owner': 'airflow',
    'retries': 3,
    'depends_on_past': False,
    'start_date': days_ago(2),
    'provide_context': True,  # pass the task context to the callables as **kwargs
}
dag = DAG(
    'xhcyeh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
)


def function1(**kwargs):
    # Add 0..x-1 to an initial value of 2, then share the result through XCom.
    total = 2
    #x = kwargs['dag_run'].conf['x']
    #x = kwargs.get("dag_run").conf.get("x")
    x = 5
    for i in range(x):
        total += i
    print(total)
    with open('/home/lis/airflow/dags/x.txt', 'w') as f:
        f.write(str(total))
    kwargs['task_instance'].xcom_push(key='sea1', value=str(total))
    return total


def function2(**kwargs):
    # Pull the value pushed by function1 and square it.
    total = kwargs['task_instance'].xcom_pull(key='sea1', task_ids='function1')
    squared = math.pow(int(total), 2)
    with open('/home/ls/airflow/dags/y.txt', 'w') as f:
        f.write(str(squared))
    return squared


# Each task is pinned to its own Celery queue.
run_this = PythonOperator(
    task_id='function1',
    python_callable=function1,
    queue='cdh6-2-node93',
    dag=dag,
)

t2 = PythonOperator(
    task_id='function2',
    python_callable=function2,
    queue='cdh6-2-node94',
    dag=dag,
)

run_this >> t2
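
To separate logic errors from deployment problems, the two callables can be exercised without Airflow. The following is only a sketch under stated assumptions: `FakeTaskInstance` is a hypothetical stand-in for Airflow's task instance that keeps XCom values in a plain dict, and the file writes from the DAG above are left out so the check runs anywhere.

```python
import math


class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow task instance (not an Airflow class)."""

    def __init__(self, store):
        self.store = store  # plain dict playing the role of the XCom table

    def xcom_push(self, key, value):
        self.store[key] = value

    def xcom_pull(self, key, task_ids=None):
        # task_ids is accepted to mirror the call in the DAG but is ignored here.
        return self.store.get(key)


def function1(**kwargs):
    # Same arithmetic as the DAG callable, minus the file write.
    total = 2
    for i in range(5):
        total += i
    kwargs['task_instance'].xcom_push(key='sea1', value=str(total))
    return total


def function2(**kwargs):
    # Same arithmetic as the DAG callable, minus the file write.
    total = kwargs['task_instance'].xcom_pull(key='sea1', task_ids='function1')
    return math.pow(int(total), 2)


shared = {}
result1 = function1(task_instance=FakeTaskInstance(shared))
result2 = function2(task_instance=FakeTaskInstance(shared))
print(result1, result2)  # prints: 12 144.0
```

If this check passes, the callables themselves are probably fine, and the executor, queue, and worker configuration become the more likely place to look.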
I want to test two workers cooperating on a simple calculation. What happens is that the first function finishes, but the second function stays in the scheduled state. The Flower log shows that Celery failed. There is also no record of the passed value in the XCom table of the backend database. Is there something wrong with my code? What changes should I make?
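
One thing worth ruling out, given that each task names its own `queue`: with the Celery executor, a task sent to a named queue is only executed by a worker that listens on that queue (in Airflow 1.10 a worker's queues are chosen with the `-q` option of `airflow worker`). The toy model below is not Airflow code and does not claim to be the cause of the failure described here; it only illustrates that routing rule. The worker names and their subscriptions are hypothetical.

```python
def split_by_listeners(task_queues, worker_queues):
    """Return (tasks some worker can pick up, tasks no worker listens for)."""
    listened = set()
    for queues in worker_queues.values():
        listened.update(queues)
    picked = {task for task, queue in task_queues.items() if queue in listened}
    waiting = set(task_queues) - picked
    return picked, waiting


# Queue names taken from the DAG in this issue.
task_queues = {
    'function1': 'cdh6-2-node93',
    'function2': 'cdh6-2-node94',
}

# Hypothetical subscriptions: nobody listens on 'cdh6-2-node94'.
worker_queues = {
    'worker-a': ['cdh6-2-node93'],
    'worker-b': ['default'],
}

picked, waiting = split_by_listeners(task_queues, worker_queues)
print(sorted(picked), sorted(waiting))  # prints: ['function1'] ['function2']
```

Comparing the queue names in the DAG with the queues each worker was actually started on would confirm or exclude this possibility.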


    Labels: kind:bug
