
Merge pull request #1610 from PrefectHQ/environment_callbacks

Introducing: Environment Callbacks
joshmeek committed Oct 9, 2019
2 parents 2f5bc42 + a97cf0e commit 2f3c034ec0cf9a1e92fc94a55c14d727e2fe7d35
@@ -6,7 +6,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Features

- None
- Environments now allow for optional `on_start` and `on_exit` callbacks - [#1610](https://github.com/PrefectHQ/prefect/pull/1610)

### Enhancements

@@ -0,0 +1,90 @@
# Execution

Executing your flows using Prefect Cloud is accomplished through two powerful abstractions: storage and environments. By combining Prefect's concepts of storage and environments, flows can be saved, shared, and executed across various supported platforms.

## Storage

[Storage](https://docs.prefect.io/api/unreleased/environments/storage.html) objects are pieces of functionality which define how and where a flow should be stored. Prefect currently has support for storage options ranging from ephemeral in-memory storage to Docker images which can be stored in registries.

::: tip Cloud Acceptable Storage
Currently the only supported Storage class in Prefect Cloud is [Docker storage](https://docs.prefect.io/api/unreleased/environments/storage.html#docker). This is because Prefect Cloud never retrieves the storage object itself; it only needs metadata describing the location of the image.
:::

### How Storage is Used

To attach storage to your flows simply provide your storage object at initialization:

```python
from prefect import Flow
from prefect.environments.storage import Docker
f = Flow("example-storage", storage=Docker(registry_url="prefecthq/storage-example"))
```

or assign it directly:

```python
from prefect import Flow
from prefect.environments.storage import Docker
f = Flow("example-storage")
f.storage = Docker(registry_url="prefecthq/storage-example")
```

When you deploy your flow to Prefect Cloud, the storage object attached to the flow is built. At this step the flow is serialized to bytes and placed inside the storage. For added convenience, `flow.deploy` accepts arbitrary keyword arguments, which are passed to the initialization method of your configured default storage class (`Docker` by default). Consequently, the following code will actually create a `Docker` object for you at deploy time and push that image to your specified registry:

```python
from prefect import Flow
f = Flow("example-easy-storage")
# all other init kwargs to `Docker` are accepted here
f.deploy("My First Project", registry_url="prefecthq/storage-example")
```

::: tip Pre-Build Storage
You can also build your storage separately from the `deploy` command and specify that it should not be built again at deploy time:

```python
from prefect import Flow
from prefect.environments.storage import Docker
f = Flow("example-storage")
f.storage = Docker(registry_url="prefecthq/storage-example")
# Pre-build storage
f.storage.build()
# Deploy but don't rebuild storage
f.deploy("My First Project", build=False)
```

:::

## Environments

While Storage objects provide a way to save and retrieve flows, [Environments](https://docs.prefect.io/api/unreleased/environments/execution.html) are a mechanism for specifying execution information about _how your Flow should be run_: which executor to use, and whether your Flow has any auxiliary infrastructure requirements. For example, if you want to run your flow on Kubernetes using an auto-scaling Dask cluster, you'll want to use an environment for that!

### How Environments are Used

By default, Prefect will attach a `RemoteEnvironment` with your local default executor to every Flow you create. To specify a different environment, simply provide it to your Flow at initialization:

```python
from prefect import Flow
from prefect.environments import RemoteEnvironment
f = Flow("example-env", environment=RemoteEnvironment(executor="prefect.engine.executors.LocalExecutor"))
```

or assign it directly:

```python
from prefect import Flow
from prefect.environments import RemoteEnvironment
f = Flow("example-env")
f.environment = RemoteEnvironment(executor="prefect.engine.executors.LocalExecutor")
```

### Setup & Execute

The two main environment functions are `setup` and `execute`. The `setup` function is responsible for creating or preparing any infrastructure requirements before the flow is executed, such as spinning up a Dask cluster or checking available platform resources. The `execute` function is responsible for telling the flow where and how it needs to run: for example, running the flow in process, as the [`RemoteEnvironment`](https://docs.prefect.io/api/unreleased/environments/execution.html#remoteenvironment) does, or registering a new Fargate task, as the [`FargateTaskEnvironment`](https://docs.prefect.io/api/unreleased/environments/execution.html#fargatetaskenvironment) does.
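
The `setup`/`execute` split can be illustrated with a small stand-in (the class names and method bodies below are hypothetical sketches for illustration, not Prefect's actual implementation):

```python
from typing import Iterable, List


class Environment:
    """Illustrative stand-in for Prefect's Environment base class."""

    def __init__(self, labels: Iterable[str] = None) -> None:
        self.labels = set(labels) if labels else set()

    def setup(self, storage: object) -> None:
        """Create or prep any infrastructure requirements before the flow runs."""
        pass  # no-op by default

    def execute(self, storage: object, flow_location: str) -> None:
        """Tell the flow where and how it needs to run."""
        raise NotImplementedError


class RecordingEnvironment(Environment):
    """Hypothetical environment that records its lifecycle for illustration."""

    def __init__(self, labels: Iterable[str] = None) -> None:
        super().__init__(labels=labels)
        self.events: List[str] = []

    def setup(self, storage: object) -> None:
        # a real environment might spin up a Dask cluster here
        self.events.append("setup")

    def execute(self, storage: object, flow_location: str) -> None:
        # a real environment might run the flow in process or register a Fargate task
        self.events.append("execute:" + flow_location)
```

An agent-driven run calls `setup` first and then `execute`, so any infrastructure is in place before the flow is told where and how to run.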

### Environment Callbacks

Each Prefect environment has two optional arguments, `on_start` and `on_exit`, which act as callbacks for custom functionality beyond infrastructure or flow related processes. Users can provide an `on_start` function, which executes in the main process before the flow starts, and an `on_exit` function, which executes after the flow finishes.
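
The intended ordering can be sketched with a stand-in class (illustrative only; the class and method names are hypothetical, but the `try`/`finally` placement mirrors this PR's diff, where `on_exit` fires even if the run raises):

```python
from typing import Callable


class CallbackEnvironment:
    """Illustrative stand-in showing where on_start and on_exit fire."""

    def __init__(self, on_start: Callable = None, on_exit: Callable = None) -> None:
        self.on_start = on_start
        self.on_exit = on_exit

    def run_flow(self) -> None:
        # Call on_start callback if specified
        if self.on_start:
            self.on_start()
        try:
            self._execute_flow()  # placeholder for the actual flow run
        finally:
            # Call on_exit callback if specified; runs even if the flow raised
            if self.on_exit:
                self.on_exit()

    def _execute_flow(self) -> None:
        print("flow running")
```

Because the exit hook sits in a `finally` block, cleanup or notification logic runs regardless of whether the flow run succeeded.
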
@@ -0,0 +1,44 @@
# PIN: Environment Callbacks

Date: October 1, 2019
Author: Josh Meek

# Status
Accepted

# Context
Prefect Environments currently support a `setup` and `execute` paradigm for creating an environment and using it to execute a flow (outlined in [PIN-3](https://docs.prefect.io/core/PINs/PIN-03-Agent-Environment.html#process-details)). These two primary functions are key for infrastructure-related processes that may arise during the execution of a flow. While they mainly deal with infrastructure creation, another request has arisen through use of the system: pre/post-processing of the flow execution itself at the environment level.

# Proposal
Implement `on_start` and `on_exit` callback options on environments, which users can use to provide _optional_ functions that execute before the flow is run and after the run has ended (or the job has exited). These functions give users a mechanism for creating custom hooks and notifications around their environment execution. The callbacks are meant as a tool for execution and are not intended to replace features such as state handlers.

The new Environment class signature would look like:
```python
Environment(
    labels: Iterable[str] = None,
    on_start: Callable = None,
    on_exit: Callable = None,
)
```

### Callback Examples
`on_start`
- Broadcast info to a Dask cluster (e.g. `register_worker_callbacks`)
- Collect available resources for worker allocation

`on_exit`
- Send a custom notification if a task became a zombie
- Collect metrics / logs from resources created by the environment
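
A minimal sketch of the resource-collection idea (the function name and returned fields here are hypothetical, not part of the proposal):

```python
import os


def collect_resources() -> dict:
    """Hypothetical on_start callback: record resources available to the run."""
    resources = {
        "cpu_count": os.cpu_count(),
        "pid": os.getpid(),
    }
    print("available resources:", resources)
    return resources
```

Such a function could then be passed as `Environment(on_start=collect_resources)`.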

# Consequences

Users will be able to specify `on_start` and `on_exit` callbacks for their environments. One new requirement is that users of these callbacks must be aware of where the callbacks execute, because it differs for each environment: while all environments still run a flow in a main process somewhere, they differ in where that main process happens.

For example, the `RemoteEnvironment`, when deployed on Kubernetes, runs the main process on the initial Prefect job, while the `KubernetesJobEnvironment` runs the main process on a separate custom job that spawns after the initial Prefect job.

A related consequence, also in the awareness category, is knowing which callbacks are actually useful for the chosen method of execution: for example, a custom Dask `on_start` callback is of little use if the flow is not using a Dask executor.

# Actions
Implement #1574

Decide whether or not the failure of a callback will be allowed to fail the main process of the Environment. For example, if the `on_start` callback fails does the flow execution still occur or is the failure logged as a failure for that particular flow run?

Add example callbacks which users could use out of the box.
@@ -2,21 +2,18 @@
Environments are JSON-serializable objects that fully describe how to run a flow. Serialization
schemas are contained in `prefect.serialization.environment.py`.
Different Environment objects correspond to different computation environments -- currently
the only allowed environment is a `CloudEnvironment`. This is subject to change in the very
near future.
Environments that are written on top of a type of infrastructure also define how to
set up and execute that environment. e.g. the `CloudEnvironment` is an environment which
runs a flow on Kubernetes using a Dask cluster.
Different Environment objects correspond to different computation environments. Environments
that are written on top of a type of infrastructure also define how to set up and execute
that environment. e.g. the `DaskKubernetesEnvironment` is an environment which
runs a flow on Kubernetes using the `dask-kubernetes` library.
Some of the information that the environment requires to run a flow -- such as the flow
itself -- may not be available when the Environment class is instantiated. Therefore, Environments
are accompanied by a Storage object to specify how and where the flow is stored. For example,
the `CloudEnvironment` requires the flow to be stored in a `Docker` storage object.
the `DaskKubernetesEnvironment` requires the flow to be stored in a `Docker` storage object.
"""

from typing import Any, Iterable
from typing import Any, Callable, Iterable

import prefect
from prefect.environments.storage import Storage
@@ -37,10 +34,19 @@ class Environment:
Args:
- labels (List[str], optional): a list of labels, which are arbitrary string identifiers used by Prefect
Agents when polling for work
- on_start (Callable, optional): a function callback which will be called before the flow begins to run
- on_exit (Callable, optional): a function callback which will be called after the flow finishes its run
"""

def __init__(self, labels: Iterable[str] = None) -> None:
def __init__(
self,
labels: Iterable[str] = None,
on_start: Callable = None,
on_exit: Callable = None,
) -> None:
self.labels = set(labels) if labels else set()
self.on_start = on_start
self.on_exit = on_exit
self.logger = logging.get_logger(type(self).__name__)

def __repr__(self) -> str:
@@ -2,7 +2,7 @@
import json
import uuid
from os import path
from typing import Any, List
from typing import Any, Callable, List

import cloudpickle
import yaml
@@ -54,6 +54,8 @@ class DaskKubernetesEnvironment(Environment):
`"docker-username"`, `"docker-password"`, and `"docker-email"`.
- labels (List[str], optional): a list of labels, which are arbitrary string identifiers used by Prefect
Agents when polling for work
- on_start (Callable, optional): a function callback which will be called before the flow begins to run
- on_exit (Callable, optional): a function callback which will be called after the flow finishes its run
- scheduler_spec_file (str, optional): Path to a scheduler spec YAML file
- worker_spec_file (str, optional): Path to a worker spec YAML file
"""
@@ -65,6 +67,8 @@ def __init__(
private_registry: bool = False,
docker_secret: str = None,
labels: List[str] = None,
on_start: Callable = None,
on_exit: Callable = None,
scheduler_spec_file: str = None,
worker_spec_file: str = None,
) -> None:
@@ -82,7 +86,7 @@ def __init__(
# Load specs from file if path given, store on object
self._scheduler_spec, self._worker_spec = self._load_specs_from_file()

super().__init__(labels=labels)
super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

def setup(self, storage: "Docker") -> None: # type: ignore
if self.private_registry:
@@ -227,6 +231,11 @@ def run_flow(self) -> None:
"""
Run the flow from specified flow_file_path location using a Dask executor
"""

# Call on_start callback if specified
if self.on_start:
self.on_start()

try:
from prefect.engine import get_default_flow_runner_class
from prefect.engine.executors import DaskExecutor
@@ -264,6 +273,10 @@ def run_flow(self) -> None:
"Unexpected error raised during flow run: {}".format(exc)
)
raise exc
finally:
# Call on_exit callback if specified
if self.on_exit:
self.on_exit()

################################
# Default YAML Spec Manipulation
@@ -1,5 +1,5 @@
import os
from typing import Any, List
from typing import Any, Callable, List

import cloudpickle

@@ -57,6 +57,8 @@ class FargateTaskEnvironment(Environment):
Defaults to the value set in the environment variable `REGION_NAME`.
- labels (List[str], optional): a list of labels, which are arbitrary string identifiers used by Prefect
Agents when polling for work
- on_start (Callable, optional): a function callback which will be called before the flow begins to run
- on_exit (Callable, optional): a function callback which will be called after the flow finishes its run
- **kwargs (dict, optional): additional keyword arguments to pass to boto3 for
`register_task_definition` and `run_task`
"""
@@ -67,6 +69,8 @@ def __init__( # type: ignore
aws_secret_access_key: str = None,
region_name: str = None,
labels: List[str] = None,
on_start: Callable = None,
on_exit: Callable = None,
**kwargs
) -> None:
# Not serialized, only stored on the object
@@ -79,7 +83,7 @@ def __init__( # type: ignore
# Parse accepted kwargs for definition and run
self.task_definition_kwargs, self.task_run_kwargs = self._parse_kwargs(kwargs)

super().__init__(labels=labels)
super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

def _parse_kwargs(self, user_kwargs: dict) -> tuple:
"""
@@ -251,6 +255,11 @@ def run_flow(self) -> None:
"""
Run the flow from specified flow_file_path location using the default executor
"""

# Call on_start callback if specified
if self.on_start:
self.on_start()

try:
from prefect.engine import (
get_default_flow_runner_class,
@@ -274,3 +283,7 @@ def run_flow(self) -> None:
"Unexpected error raised during flow run: {}".format(exc)
)
raise exc
finally:
# Call on_exit callback if specified
if self.on_exit:
self.on_exit()
@@ -1,6 +1,6 @@
import os
import uuid
from typing import Any, List
from typing import Any, Callable, List

import cloudpickle
import yaml
@@ -40,16 +40,24 @@ class KubernetesJobEnvironment(Environment):
- job_spec_file (str, optional): Path to a job spec YAML file
- labels (List[str], optional): a list of labels, which are arbitrary string identifiers used by Prefect
Agents when polling for work
- on_start (Callable, optional): a function callback which will be called before the flow begins to run
- on_exit (Callable, optional): a function callback which will be called after the flow finishes its run
"""

def __init__(self, job_spec_file: str = None, labels: List[str] = None) -> None:
def __init__(
self,
job_spec_file: str = None,
labels: List[str] = None,
on_start: Callable = None,
on_exit: Callable = None,
) -> None:
self.identifier_label = str(uuid.uuid4())
self.job_spec_file = os.path.abspath(job_spec_file) if job_spec_file else None

# Load specs from file if path given, store on object
self._job_spec = self._load_spec_from_file()

super().__init__(labels=labels)
super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

def execute( # type: ignore
self, storage: "Docker", flow_location: str, **kwargs: Any
@@ -110,6 +118,11 @@ def run_flow(self) -> None:
"""
Run the flow from specified flow_file_path location using the default executor
"""

# Call on_start callback if specified
if self.on_start:
self.on_start()

try:
from prefect.engine import (
get_default_flow_runner_class,
@@ -133,6 +146,10 @@ def run_flow(self) -> None:
"Unexpected error raised during flow run: {}".format(exc)
)
raise exc
finally:
# Call on_exit callback if specified
if self.on_exit:
self.on_exit()

###############################
# Custom YAML Spec Manipulation
