ExternalTaskSensor: An option to use the data_interval variables rather than execution_date #26429

Open
2 tasks done
HeshamMeneisi opened this issue Sep 16, 2022 · 5 comments
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow good first issue kind:feature Feature Requests

Comments

@HeshamMeneisi

Description

Since execution_date is deprecated, it makes sense that this sensor or a new version of it (for backward compatibility) uses the data_interval variables.

Use case/motivation

I want to make sure that the data interval this DAG is about to operate on is already covered by another DAG, for example that data has been loaded up to this DAG's data_interval_end.

I believe the sensor itself is simple to write. It uses data_interval_end rather than execution_date, and it doesn't look for one specific task: any external.data_interval_end > self.data_interval_end is acceptable.

This is equivalent to ExternalTaskSensor with execution_delta set to self.interval_start - closest_external_interval.interval_start, which can be calculated manually but will break if the external DAG's schedule is later changed.
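A minimal sketch of the coverage check described above, in plain Python with no Airflow imports. `Interval` and `is_covered` are hypothetical names used only for illustration; a real sensor would read the external DAG's runs from the metadata database rather than from a list. The sketch assumes `>=`, so that an external interval ending exactly at this run's data_interval_end counts as covering it; use `>` for the strict comparison.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable


@dataclass(frozen=True)
class Interval:
    """Hypothetical stand-in for a DAG run's data interval."""

    start: datetime
    end: datetime


def is_covered(own: Interval, external_runs: Iterable[Interval]) -> bool:
    """Return True if any external interval ends at or after own.end.

    No specific external run is required; any run that reaches far enough
    is accepted, so the check does not depend on the external schedule.
    """
    return any(ext.end >= own.end for ext in external_runs)


utc = timezone.utc

# An hourly run of this DAG, checked against daily runs of the external DAG.
own = Interval(
    datetime(2022, 9, 16, 10, tzinfo=utc),
    datetime(2022, 9, 16, 11, tzinfo=utc),
)
external = [
    Interval(datetime(2022, 9, 15, tzinfo=utc), datetime(2022, 9, 16, tzinfo=utc)),
    Interval(datetime(2022, 9, 16, tzinfo=utc), datetime(2022, 9, 17, tzinfo=utc)),
]

covered_with_both = is_covered(own, external)
covered_with_first_only = is_covered(own, external[:1])
```

Because the check compares interval ends directly, no execution_delta has to be recomputed when the external DAG's schedule changes.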

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@HeshamMeneisi HeshamMeneisi added the kind:feature Feature Requests label Sep 16, 2022
@uranusjr
Member

Makes sense. I believe there’s some discussion somewhere, but this write-up is more complete. I think we should allow using either data_interval_start or data_interval_end; technically data_interval_start can be different from execution_date.

To simplify implementation, perhaps it’s a good idea to split the logic into three sensors, one for each value? The three sensors would inherit from the same class that implements most of the logic, with each subclass applying it to a different value.
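A rough sketch of that split, in plain Python rather than real Airflow classes. All class names here are hypothetical, and the matching rule (an external run with an equal value) is an assumption made for illustration; the point is only that the base class holds the logic and each subclass picks the context value to compare.

```python
from datetime import datetime, timezone
from typing import Iterable, Mapping

RunContext = Mapping[str, datetime]


class ExternalDateSensorBase:
    """Hypothetical shared base: implements the matching logic once."""

    # Subclasses set this to the context key they compare on.
    context_key: str = ""

    def poke(self, context: RunContext, external_runs: Iterable[RunContext]) -> bool:
        target = context[self.context_key]
        return any(run[self.context_key] == target for run in external_runs)


class ExternalLogicalDateSensor(ExternalDateSensorBase):
    context_key = "logical_date"


class ExternalDataIntervalStartSensor(ExternalDateSensorBase):
    context_key = "data_interval_start"


class ExternalDataIntervalEndSensor(ExternalDateSensorBase):
    context_key = "data_interval_end"


utc = timezone.utc

# A run whose logical date differs from its data_interval_start.
own_run = {
    "logical_date": datetime(2022, 9, 16, 6, tzinfo=utc),
    "data_interval_start": datetime(2022, 9, 16, tzinfo=utc),
    "data_interval_end": datetime(2022, 9, 17, tzinfo=utc),
}
external_runs = [
    {
        "logical_date": datetime(2022, 9, 16, tzinfo=utc),
        "data_interval_start": datetime(2022, 9, 16, tzinfo=utc),
        "data_interval_end": datetime(2022, 9, 17, tzinfo=utc),
    }
]
```

With these values the logical-date sensor finds no match while the two data-interval sensors do, which is the case where the choice of value matters.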

@eladkal eladkal added the area:core-operators Operators, Sensors and hooks within Core Airflow label Sep 19, 2022
@qcha41

qcha41 commented Oct 16, 2022

Totally interested too! Has there been any development on this yet?

@potiuk
Member

potiuk commented Oct 24, 2022

I assigned you too, so if you want to help, @qcha41, talk directly to @HeshamMeneisi.

@HeshamMeneisi
Author

HeshamMeneisi commented Jan 24, 2023

I haven't gotten around to this specifically yet, but in light of the new Dataset logic, the following seems like a more intuitive solution for the use case above. What do you think? I can open a PR.

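import datetime
from typing import Optional, Sequence, Union

from airflow.datasets import Dataset
from airflow.exceptions import AirflowException
from airflow.models.dataset import DatasetModel
from airflow.sensors.base import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.session import provide_session


class DatasetNotFoundException(AirflowException):
    """Raised when the target Dataset has no row in the metadata database (defined here for the sketch)."""


class DatasetUpToDateSensor(BaseSensorOperator):
    """
    Wait until a specific Dataset covers the current data interval.

    :param dataset: The target Dataset
    :param covered_to: The timestamp the dataset should be covered to (updated at or after);
        defaults to data_interval_end (templated)
    """

    template_fields: Sequence[str] = ("covered_to",)

    def __init__(
        self,
        dataset: Dataset,
        covered_to: Optional[Union[str, datetime.datetime]] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.dataset = dataset
        self.covered_to = covered_to

    @provide_session
    def poke(self, context: Context, session=None) -> bool:
        # Look up the stored record for the target dataset by its URI.
        dataset: Optional[DatasetModel] = (
            session.query(DatasetModel)
            .filter(DatasetModel.uri == self.dataset.uri)
            .one_or_none()
        )

        if not dataset:
            raise DatasetNotFoundException(self.dataset)

        desired_covered_to = self.covered_to or context["data_interval_end"]
        # A templated covered_to is rendered to a string, so parse it before comparing.
        if isinstance(desired_covered_to, str):
            desired_covered_to = timezone.parse(desired_covered_to)

        return dataset.updated_at >= desired_covered_to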

In this case, covered_to can also be set to something like {{ logical_date + macros.timedelta(hours=1) }}, so it's quite flexible.
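One detail worth spelling out: unless native-object rendering is enabled on the DAG, a templated field is rendered to a string before poke runs, so the comparison needs a datetime parsed back out of it. Below is a minimal, Airflow-free sketch of that resolution step. `resolve_covered_to` and `is_up_to_date` are hypothetical helper names, and the sketch assumes the rendered value is an ISO 8601 timestamp with a UTC offset.

```python
from datetime import datetime, timezone
from typing import Optional, Union


def resolve_covered_to(
    covered_to: Optional[Union[str, datetime]],
    data_interval_end: datetime,
) -> datetime:
    """Pick the timestamp the dataset must be covered to."""
    if not covered_to:
        # Nothing supplied: fall back to the run's data_interval_end.
        return data_interval_end
    if isinstance(covered_to, str):
        # Rendered template output, assumed to be ISO 8601 with an offset.
        return datetime.fromisoformat(covered_to)
    return covered_to


def is_up_to_date(
    updated_at: datetime,
    covered_to: Optional[Union[str, datetime]],
    data_interval_end: datetime,
) -> bool:
    """Mirror the sensor's final comparison on plain datetimes."""
    return updated_at >= resolve_covered_to(covered_to, data_interval_end)


utc = timezone.utc
interval_end = datetime(2023, 1, 25, tzinfo=utc)
updated_at = datetime(2023, 1, 25, 0, 30, tzinfo=utc)

# Default target is data_interval_end, which the update already passes.
default_ok = is_up_to_date(updated_at, None, interval_end)
# A rendered template asking for one hour more is not yet satisfied.
later_ok = is_up_to_date(updated_at, "2023-01-25T01:00:00+00:00", interval_end)
```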

@potiuk
Member

potiuk commented Feb 19, 2023

Opening a PR is a good idea.

Development

No branches or pull requests

5 participants