Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add run_if param to TaskSettings #53

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions brickflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def get_bundles_project_env() -> str:
TaskSettings,
TaskResponse,
BrickflowTriggerRule,
TaskRunCondition,
BrickflowTaskEnvVars,
StorageBasedTaskLibrary,
JarTaskLibrary,
Expand Down Expand Up @@ -301,6 +302,7 @@ def get_bundles_project_env() -> str:
"TaskSettings",
"TaskResponse",
"BrickflowTriggerRule",
"TaskRunCondition",
"BrickflowTaskEnvVars",
"StorageBasedTaskLibrary",
"JarTaskLibrary",
Expand Down
12 changes: 12 additions & 0 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ class TaskType(Enum):
NOTEBOOK_TASK = "notebook_task"


class TaskRunCondition(Enum):
grigoriy835 marked this conversation as resolved.
Show resolved Hide resolved
ALL_SUCCESS = "ALL_SUCCESS"
AT_LEAST_ONE_SUCCESS = "AT_LEAST_ONE_SUCCESS"
NONE_FAILED = "NONE_FAILED"
ALL_DONE = "ALL_DONE"
AT_LEAST_ONE_FAILED = "AT_LEAST_ONE_FAILED"
ALL_FAILED = "ALL_FAILED"


@dataclass(frozen=True)
class TaskLibrary:
@staticmethod
Expand Down Expand Up @@ -240,6 +249,7 @@ class TaskSettings:
max_retries: Optional[int] = None
min_retry_interval_millis: Optional[int] = None
retry_on_timeout: Optional[bool] = None
run_if: Optional[TaskRunCondition] = None

def merge(self, other: Optional["TaskSettings"]) -> "TaskSettings":
# overrides top level values
Expand All @@ -252,6 +262,7 @@ def merge(self, other: Optional["TaskSettings"]) -> "TaskSettings":
other.max_retries or self.max_retries,
other.min_retry_interval_millis or self.min_retry_interval_millis,
other.retry_on_timeout or self.retry_on_timeout,
other.run_if or self.run_if,
)

def to_tf_dict(
Expand Down Expand Up @@ -280,6 +291,7 @@ def to_tf_dict(
"max_retries": self.max_retries,
"min_retry_interval_millis": self.min_retry_interval_millis,
"retry_on_timeout": self.retry_on_timeout,
**({"run_if": self.run_if.value} if self.run_if else {}),
}


Expand Down
24 changes: 24 additions & 0 deletions docs/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,30 @@ def all_success_task():
1. NONE_FAILED - use this if you want to trigger the task irrespective of the upstream tasks success or failure state
2. ALL_SUCCESS - use this if you want to trigger the task only if all the upstream tasks are all having success state


### Tasks conditional run

Adding condition for task running based on result of parent tasks

```python title="task_conditional_run"
from brickflow import Workflow, TaskRunCondition, TaskSettings
wf = Workflow(...)

@wf.task(
task_settings=TaskSettings(run_if=TaskRunCondition.AT_LEAST_ONE_FAILED)
)
def none_failed_task():
pass
```

This option is determining whether the task is run once its dependencies have been completed. Available options:
1. `ALL_SUCCESS`: All dependencies have executed and succeeded
2. `AT_LEAST_ONE_SUCCESS`: At least one dependency has succeeded
3. `NONE_FAILED`: None of the dependencies have failed and at least one was executed
4. `ALL_DONE`: All dependencies completed and at least one was executed
5. `AT_LEAST_ONE_FAILED`: At least one dependency failed
6. `ALL_FAILED`: ALl dependencies have failed

### Airflow Operators

We have adopted/extended certain airflow operators that might be needed to run as a task in databricks workflows.
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: dev
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: some/path/to/root/test_databricks_bundle.py
source: GIT
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: dev
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: GIT
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,39 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: brickflows==0.1.0
repo: null
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: dev
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: GIT
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: local
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: WORKSPACE
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: local
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: WORKSPACE
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
10 changes: 10 additions & 0 deletions tests/codegen/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
TaskResponse,
DLTPipeline,
NotebookTask,
TaskSettings,
TaskRunCondition,
)
from brickflow.engine.workflow import Workflow, WorkflowPermissions, User

Expand Down Expand Up @@ -95,6 +97,14 @@ def task_function_4():
return "hello world"


@wf.task(
depends_on="task_function_4",
task_settings=TaskSettings(run_if=TaskRunCondition.AT_LEAST_ONE_FAILED),
)
def task_function_5():
return "hello world"


@wf.task(
task_type=TaskType.CUSTOM_PYTHON_TASK,
trigger_rule=BrickflowTriggerRule.NONE_FAILED,
Expand Down