Airflow: about subDAGs, branching and XCom
Real-life DAGs (the Python scripts that define workflows in Airflow) commonly need tasks that communicate with each other, sub-DAGs and DAG branches. Airflow provides native support for all of these needs, although they aren't well documented.
This wiki page shows some usage examples of these 3 features:
subDAG: to keep the complexity of the DAG definition low, a DAG can be decomposed into several subDAGs. A subDAG can be set upstream or downstream of another subDAG or of a task, exactly like a task. A subDAG can also be used to run multiple tasks in parallel.
Branching: the BranchPythonOperator can be used to implement a task that chooses which branch to run based on a condition.
xcom: the Airflow inter-task communication system. Two tasks, t1 and t2, where t2 is downstream of t1 (so t1 is executed first), can share messages through the Airflow database. Airflow provides a mechanism to push (save to the db) and pull (retrieve from the db) those messages that abstracts the db access.
We are going to experiment with all 3 features using:
A plugin implementing some ad-hoc Operators for this exercise: see poc_plugin.py (poc stands for proof of concept), which contains:
- an operator called Pusher, which is only responsible for pushing messages through xcom
- two operators called PullerFromDAG and PullerFromSubDAG, which are responsible for pulling messages through xcom. There are 2 different pull operators in order to highlight some issues in the xcom communication model.
A DAG file used to compose different DAG layouts: see poc_dag.py
A file that creates a subDAG, called subdag_factory.py (more explanation later)
The next 2 sections focus only on DAG composition; the 3rd section analyzes the outcome of the different DAGs created.
SubDAGs allow DAG developers to better organize complex DAG definitions. It is important that:
- the main DAG sees and manages all the SubDAGs as normal tasks
- the Airflow admin GUI lists only the main DAG in the main DAGs list; it is then possible to "zoom in" to the SubDAG in the Graph View section of the GUI.
To keep these properties valid, the SubDAG must be
- created by a factory method
- associated with the main DAG using the built-in SubDagOperator
So let's create the factory method that composes the subDAG. The subdag_factory.py file contains the function pushers_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    from airflow.models import DAG

    # Pusher is the operator defined in poc_plugin.py


    def pushers_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
        # The subDAG id has the form '<parent>.<child>'
        dag = DAG(
            '%s.%s' % (parent_dag_name, child_dag_name),
            schedule_interval=schedule_interval,
            start_date=start_date,
        )

        push1 = Pusher(a_msg='message number 1', task_id='task_push1', dag=dag)
        push2 = Pusher(a_msg='message number 2', task_id='task_push2', dag=dag)
        push3 = Pusher(a_msg='message number 3', task_id='task_push3', dag=dag)
        push4 = Pusher(a_msg='message number 4', task_id='task_push4', dag=dag)
        push5 = Pusher(a_msg='message number 5', task_id='task_push5', dag=dag)

        # Chain the pushers: each one runs after the previous one
        push1 >> push2 >> push3 >> push4 >> push5

        return dag
and poc_dag.py contains the definition of a SubDagOperator whose subdag parameter receives the DAG returned by that function:
    sub_dag = SubDagOperator(
        subdag=pushers_sub_dag('xcom_canvas_dag', 'xcom_canvas_subdag', date, interval),
        task_id='xcom_canvas_subdag',
        dag=main_dag,
    )
sub_dag is a task created by the SubDagOperator, and it can be attached to the main DAG like a normal task.
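The factory builds the subDAG id as '%s.%s' for a reason: as far as I know, SubDagOperator expects the subDAG's dag_id to be the parent dag_id and the operator's task_id joined by a dot, and refuses anything else. The naming rule can be sketched in plain Python, without Airflow. The helper names subdag_id and check_subdag_id are hypothetical and exist only for this illustration.

```python
def subdag_id(parent_dag_name, child_dag_name):
    """Build the dag_id that the factory gives to the subDAG."""
    return '%s.%s' % (parent_dag_name, child_dag_name)


def check_subdag_id(parent_dag_id, task_id, dag_id):
    """Return True when dag_id follows the '<parent>.<task_id>' convention."""
    return dag_id == subdag_id(parent_dag_id, task_id)


# Names taken from the SubDagOperator definition in poc_dag.py.
full_id = subdag_id('xcom_canvas_dag', 'xcom_canvas_subdag')
print(full_id)
print(check_subdag_id('xcom_canvas_dag', 'xcom_canvas_subdag', full_id))
```

This is why the child name passed to the factory and the task_id passed to the SubDagOperator are the same string.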
Let's see how the Airflow Graph View shows this DAG:
Let's pretend for now that we have only the poc_canvas_subdag and the puller_task in our DAG.
The poc_canvas_subdag task is colored dark grey, which means that it is a subDAG.
If we had not used the factory method pushers_sub_dag to create it and had just defined it as a normal DAG, we would have ended up with another DAG listed on the homepage.
Clicking on the SubDAG opens the internal Graph View of the DAG,
which is basically a chain: task 5 is downstream of task 4, which is downstream of task 3, etc.
If we comment out the following line in subdag_factory.py:
    [...]
    push4 = Pusher(a_msg='message number 4', task_id='task_push4', dag=dag)
    push5 = Pusher(a_msg='message number 5', task_id='task_push5', dag=dag)

    # I don't want to specify any dependency among the tasks
    #push1 >> push2 >> push3 >> push4 >> push5

    return dag
we end up with a parallel SubDAG: there is no dependency among the tasks.
That means that the tasks in the SubDAG can be executed in any order, but all of them run before the puller_task, which is a downstream dependency of our SubDAG.
The parallel SubDAG is exactly what we need in order to implement the operator described in this issue
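The difference between the chained and the parallel layout can be illustrated without Airflow, using the standard library's graphlib module (Python 3.9+). This is only a toy model of the scheduling order, not how Airflow schedules tasks: it flattens the SubDAG into the main graph, and the helper ready_batches is a hypothetical name introduced for this sketch.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

PUSHERS = ['task_push%d' % i for i in range(1, 6)]


def ready_batches(graph):
    """Return the successive groups of tasks that may run at the same time.

    graph maps each task to the set of tasks it has to wait for.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Chained layout: each pusher waits for the previous one.
chained = {PUSHERS[i]: {PUSHERS[i - 1]} for i in range(1, 5)}
chained['puller_task'] = {PUSHERS[-1]}

# Parallel layout: no edges among pushers, the puller waits for all of them.
parallel = {'puller_task': set(PUSHERS)}

print(ready_batches(chained))
print(ready_batches(parallel))
```

In the chained layout every batch contains a single task, while in the parallel layout the five pushers form one batch and the puller runs only after all of them.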
Let's have a look at the main DAG again.
The first task is called branching because, as the name suggests, it can generate 2 different branches depending on a condition.
To create a branching task, the BranchPythonOperator must be used. It is very similar to the PythonOperator, but its python_callable (here a lambda function) must return the task_id of the branch to execute.
    [...]
    options = ['task_push1', 'poc_canvas_dag']
    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=lambda: random.choice(options),
        dag=main_dag)
    [...]
In this example the callable randomly chooses which of the downstream tasks to execute.
Then the DAG can be composed as if we simply wanted to fork it:
    [...]
    branching >> single_pusher >> pull
    branching >> sub_dag >> pull
    [...]
But since the last node before the fork is a branching task, only one of the 2 branches will be executed.
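A random choice is handy for a demo, but a real branch usually depends on some condition. Below is a minimal sketch of a deterministic callable. The weekday rule is made up for illustration, and the sketch assumes that the callable receives the Airflow context as keyword arguments (provide_context=True in Airflow 1.x) and that the context contains 'execution_date'. The returned task ids are the ones from the options list.

```python
from datetime import datetime


def choose_branch(**context):
    """Return the task_id of the branch to follow.

    Assumption: Airflow passes the context as keyword arguments and it
    contains 'execution_date'. The weekday rule is purely illustrative.
    """
    if context['execution_date'].weekday() < 5:  # Monday to Friday
        return 'task_push1'
    return 'poc_canvas_dag'  # Saturday and Sunday


# Hypothetical wiring, same shape as the branching task in poc_dag.py:
# branching = BranchPythonOperator(
#     task_id='branching',
#     python_callable=choose_branch,
#     provide_context=True,
#     dag=main_dag)

print(choose_branch(execution_date=datetime(2024, 1, 3)))  # a Wednesday
print(choose_branch(execution_date=datetime(2024, 1, 6)))  # a Saturday
```

Because the callable is a plain function, it can be tested on its own before being plugged into the DAG.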
A DAG is composed of tasks, so the structure of a DAG is defined by how the tasks are connected to each other. Setting a downstream or upstream connection between tasks only determines the order in which the tasks are executed.
In order to allow tasks to communicate they can:
- share information explicitly using a shared storage (file system, S3, etc.), which is a poor fit if 2 tasks only want to share text or numeric parameters.
- use the Airflow built-in XCom feature.
XCom allows Airflow tasks of the same DAG to send and receive messages. Since the Airflow workers can be spread across different machines, an in-memory implementation of XCom wouldn't make sense. XCom messages are stored in the Airflow database, and the Operator developer can use high-level functions to send and receive messages without explicitly connecting to the database.
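To make the idea concrete, here is a toy model of a database-backed message store, written with the standard library's sqlite3 module. It is not Airflow's implementation or schema: the class ToyXCom, the table layout and the dag id 'toy_dag' are all assumptions made for this sketch. It only shows how push and pull can hide the SQL from the caller, and how keeping every row while returning the newest one behaves.

```python
import sqlite3


class ToyXCom:
    """Toy stand-in for XCom: messages live in a database table."""

    def __init__(self):
        self.db = sqlite3.connect(':memory:')
        self.db.execute(
            'CREATE TABLE xcom (id INTEGER PRIMARY KEY AUTOINCREMENT, '
            'dag_id TEXT, task_id TEXT, key TEXT, value TEXT)')

    def push(self, dag_id, task_id, key, value):
        """Save a message; the caller never writes SQL."""
        self.db.execute(
            'INSERT INTO xcom (dag_id, task_id, key, value) '
            'VALUES (?, ?, ?, ?)',
            (dag_id, task_id, key, value))
        self.db.commit()

    def pull(self, dag_id, task_id, key):
        """Return the most recent message for dag/task/key, or None."""
        row = self.db.execute(
            'SELECT value FROM xcom '
            'WHERE dag_id = ? AND task_id = ? AND key = ? '
            'ORDER BY id DESC LIMIT 1',
            (dag_id, task_id, key)).fetchone()
        return row[0] if row else None


store = ToyXCom()
store.push('toy_dag', 'task_push1', 'the_message', 'message number 1')
store.push('toy_dag', 'task_push1', 'the_message', 'message number 1 (rerun)')
print(store.pull('toy_dag', 'task_push1', 'the_message'))
print(store.pull('toy_dag', 'task_push2', 'the_message'))
```

Both rows for task_push1 stay in the table, and pull returns only the latest one. A task that never pushed yields None.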
Let's take a look at the execute function of our Pusher operator:
    def execute(self, context):
        log.info("message to push: '%s'", self.msg)
        # The running task instance is available in the Airflow context
        task_instance = context['task_instance']
        task_instance.xcom_push(key="the_message", value=self.msg)
First it retrieves the information about itself from the Airflow context with context['task_instance'], then it sends a message by calling the xcom_push function of the task_instance object. The parameters of xcom_push are straightforward: the message value (here stored in self.msg) and a key for the message.
Nothing bad on the push side, but let's see what happens when we want to retrieve a message. Let's look inside the execute function of the PullerFromSubDAG operator that we used as the final step of the poc_dag.
    def execute(self, context):
        task_instance = context['task_instance']
        # The pushers live in the SubDAG, so its dag_id is passed explicitly
        for idx in [1, 2, 3, 4, 5]:
            msg = task_instance.xcom_pull(
                task_ids='task_push' + str(idx),
                key='the_message',
                dag_id='poc_canvas_dag.poc_canvas_subdag')
            log.info("the pulled message is: '%s'", msg)
Here are the bad things:
- We are iterating explicitly over the previous tasks contained in the SubDAG, which means that we have to know the structure of the whole DAG and the names of its tasks when we want to pull messages from certain tasks. Heuristics like "retrieve all the messages from the previous n tasks" do not seem possible to implement.
- Note that we also had to explicitly pass the name of the SubDAG, since we want to pull from tasks contained there. The dag_id param is not required if the 2 communicating tasks belong to the same "DAG level".
- All the messages are stored in the DB and they remain there forever. Airflow automatically selects the most recent one when more than one is found for the same key/task_id pair.
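The explicit loop can be shortened a little. As far as I know, xcom_pull also accepts an iterable of task ids and then returns a tuple of values, one per task id. This does not remove the need to know the task names, it only collapses the loop into one call. The sketch below is written against a stub so it runs without Airflow: StubTaskInstance and pull_from_pushers are hypothetical names, and the stub only mimics the behaviour described here.

```python
class StubTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for this demo only."""

    def __init__(self, messages):
        self.messages = messages  # {(dag_id, task_id, key): value}

    def xcom_pull(self, task_ids=None, dag_id=None, key=None):
        # Assumed behaviour: one value for a single task id,
        # a tuple of values for an iterable of task ids.
        if isinstance(task_ids, str):
            return self.messages.get((dag_id, task_ids, key))
        return tuple(self.messages.get((dag_id, t, key)) for t in task_ids)


def pull_from_pushers(task_instance, count, dag_id):
    """Pull 'the_message' from task_push1..task_push<count> in one call."""
    task_ids = ['task_push%d' % i for i in range(1, count + 1)]
    return task_instance.xcom_pull(
        task_ids=task_ids, key='the_message', dag_id=dag_id)


SUBDAG_ID = 'poc_canvas_dag.poc_canvas_subdag'
stub = StubTaskInstance({
    (SUBDAG_ID, 'task_push%d' % i, 'the_message'): 'message number %d' % i
    for i in range(1, 6)
})

for msg in pull_from_pushers(stub, 5, SUBDAG_ID):
    print("the pulled message is: '%s'" % msg)
```

Inside a real operator, the task_instance taken from the context would be passed in place of the stub.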