[FEATURE] TaskDependencySensor improvements #101

Open · 4 tasks done
maxim-mityutko opened this issue Mar 11, 2024 · 0 comments
Labels
enhancement New feature or request

Comments

maxim-mityutko (Contributor) commented Mar 11, 2024

Is your feature request related to a problem? Please describe.

  1. A consistent approach is required across all sensors for determining the base time used to decide whether the upstream criteria are fulfilled. This approach should make sensor executions reproducible, whether the `brickflow_start_time` / `brickflow_start_date` custom parameters are set manually or the job is scheduled and the execution date is derived from the CRON expression. Currently, `TaskDependencySensor` uses `datetime.now()` (the reproducibility problem is illustrated after this list):

     ```python
     execution_window_tz = (datetime.now() + execution_delta).strftime(
         "%Y-%m-%dT%H:%M:%SZ"
     )
     ```

  2. From the sensor-usage perspective, the user does not care whether the upstream DAG failed; what matters is the ability to trigger downstream tasks once the upstream dependency is fulfilled, even if it was delayed by the need to restart a failed DAG. This means that failing the task and the workflow is not a desired scenario for this operator:

     ```python
     if status == "failed":
         log.error(
             f"Upstream dag {external_dag_id} failed at {external_task_id} task "
         )
         raise Exception("Upstream Dag Failed")
     ```

  3. Along the same lines as (2), failing the sensor when the upstream execution is not found is not a desirable flow:

     ```python
     if response.status_code == 401:
         raise Exception(
             f"No Runs found for {external_dag_id} dag after {execution_window_tz}, Please check upstream dag"
         )
     ```

     The argument that the execution is always created by Airflow, even if it has not yet started, is not valid: if the DAG parameter `depends_on_past` is used, new executions won't be created unless the older ones are successful.
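
To illustrate (1): a minimal, self-contained sketch of why a wall-clock base is not reproducible. The timestamps and the look-back `execution_delta` below are made up for the example:

```python
from datetime import datetime, timedelta

execution_delta = timedelta(hours=-2)  # assumed look-back, as in the snippet in (1)

# The same logical run evaluated at 09:00 and retried at 11:30 produces two
# different windows, so each attempt may match different upstream executions.
for now in (datetime(2024, 3, 11, 9, 0), datetime(2024, 3, 11, 11, 30)):
    execution_window_tz = (now + execution_delta).strftime("%Y-%m-%dT%H:%M:%SZ")
    print(execution_window_tz)
# -> 2024-03-11T07:00:00Z
# -> 2024-03-11T09:30:00Z
```

Basing the window on `context["execution_date"]` or `brickflow_start_time` pins it to the logical run, so retries evaluate exactly the same window.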

Cloud Information

  - [x] AWS
  - [x] Azure
  - [x] GCP
  - [x] Other

Describe the solution you'd like

  1. Use `context["execution_date"]`, which is available to Airflow operators, or `brickflow_start_time` from the Brickflow context.
  2. Continue poking the upstream instead of failing.
  3. Log that the upstream execution was not found and continue poking.
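
A rough sketch of how the three changes could fit together in the sensor's `poke` method. This is an assumption-laden illustration, not the actual Brickflow implementation: `get_execution_stats` and the attribute names are hypothetical placeholders.

```python
from datetime import datetime
from typing import Optional


def poke(self, context: dict) -> bool:  # method-body sketch, not standalone
    # (1) Base the window on the logical execution date (or on
    # brickflow_start_time) instead of datetime.now(), so reruns of
    # the same logical run evaluate the same window.
    execution_date: datetime = context["execution_date"]
    execution_window_tz = (execution_date + self.execution_delta).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )

    # Hypothetical helper: returns the upstream task state, or None
    # when no matching execution exists yet.
    status: Optional[str] = self.get_execution_stats(execution_window_tz)

    # (3) No execution found yet (e.g. held back by depends_on_past):
    # log and keep poking instead of raising.
    if status is None:
        self.log.info(
            "No runs found for %s after %s; continuing to poke",
            self.external_dag_id,
            execution_window_tz,
        )
        return False

    # (2) Upstream failed: it may be restarted, so keep poking
    # instead of failing the whole workflow.
    if status == "failed":
        self.log.warning(
            "Upstream dag %s failed at task %s; continuing to poke",
            self.external_dag_id,
            self.external_task_id,
        )
        return False

    return status == "success"
```

Returning `False` keeps the standard sensor retry semantics, so a hard deadline can still be enforced through the sensor's timeout rather than an immediate failure.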

Describe alternatives you've considered

Additional context
