Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseBranchOperator should push to xcom by default #13704

Closed
HaloKo4 opened this issue Jan 15, 2021 · 2 comments · Fixed by #13763
Closed

BaseBranchOperator should push to xcom by default #13704

HaloKo4 opened this issue Jan 15, 2021 · 2 comments · Fixed by #13763
Assignees

Comments

@HaloKo4
Copy link

HaloKo4 commented Jan 15, 2021

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version):
Not relevant

Environment:
Not relevant

What happened:

BranchPythonOperator performs xcom push by default since this is the behavior of PythonOperator.
However BaseBranchOperator doesn't do xcom push.

Note: It's impossible to push to xcom manually because the BaseBranchOperator has no return in it's execute method. So even when using do_xcom_push=True it won't help
https://github.com/apache/airflow/blob/master/airflow/operators/branch.py#L52

What you expected to happen:
BaseBranchOperator to do xcom push of the branch it choose to follow as the default or at least to support the parameter of do_xcom_push=True

@HaloKo4 HaloKo4 added the kind:bug This is a clearly a bug label Jan 15, 2021
@turbaszek turbaszek added kind:feature Feature Requests and removed kind:bug This is a clearly a bug labels Jan 15, 2021
@turbaszek
Copy link
Member

It's impossible to push to xcom manually

Strictly speaking it is possible because you can override the execute method in your custom branch operator.

Apart from that your suggestion make sense to me - I don't see any drawbacks to that. Would you mind opening a PR?

@ashmeet13
Copy link
Contributor

ashmeet13 commented Jan 17, 2021

Hi!
I would like to pick this issue up if it's open and not being worked upon.

From what I understand the execute function in BaseBranchOperator does not return the value calculated by choose_branch.

A simple change from -

    def execute(self, context: Dict):
        self.skip_all_except(context['ti'], self.choose_branch(context))

to

    def execute(self, context: Dict):
        branches_to_execute = self.choose_branch(context)
        self.skip_all_except(context['ti'], branches_to_execute)
        return branches_to_execute

should do the trick and do_xcom_push=True should be usable
New to Airflow so please do correct me if I am wrong!

Thank you.

ashmeet13 added a commit to ashmeet13/airflow that referenced this issue Jan 19, 2021
This change will BaseBranchOperator to do xcom push of the branch it choose to follow.
It will also add support to use the do_xcom_push parameter.

The added change returns the result received by running choose_branch().
kaxil pushed a commit that referenced this issue Jan 21, 2021
This change will BaseBranchOperator to do xcom push of the branch it choose to follow.
It will also add support to use the do_xcom_push parameter.

The added change returns the result received by running choose_branch().

Closes: #13704
kaxil pushed a commit that referenced this issue Jan 21, 2021
This change will BaseBranchOperator to do xcom push of the branch it choose to follow.
It will also add support to use the do_xcom_push parameter.

The added change returns the result received by running choose_branch().

Closes: #13704

(cherry picked from commit 3e25795)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants