In [1]:
# Build a flow that contains no tasks and run it; the final state of the
# run is the value displayed for this cell.
from prefect import Flow

f = Flow(name='empty')
f.run()

[2019-06-16 07:24:54,008] INFO - prefect.FlowRunner | Beginning Flow run for 'empty'
[2019-06-16 07:24:54,016] INFO - prefect.FlowRunner | Starting flow run.
[2019-06-16 07:24:54,020] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


<Success: "All reference tasks succeeded.">

In [6]:
from prefect import task, Parameter, Flow


@task
def return_param(p):
    """Report the received value on stdout and hand it back unchanged.

    Args:
        p: value to echo; it is rendered with ``%3d``, so it has to be
            something that integer formatting accepts.

    Returns:
        The very same object that was passed in.
    """
    message = 'Got %3d' % p
    print(message)
    return p


# Wire one parameter (default 42) into the echo task.
with Flow(name="parameter-example") as flow:
    p = Parameter(name="p", default=42)
    result = return_param(p)


# No override supplied: the parameter falls back to its default, 42.
flow.run()
# Override the parameter with 99; this run's final state is the cell output.
flow.run(p=99)

[2019-06-16 07:34:12,591] INFO - prefect.FlowRunner | Beginning Flow run for 'parameter-example'
[2019-06-16 07:34:12,595] INFO - prefect.FlowRunner | Starting flow run.
[2019-06-16 07:34:12,603] INFO - prefect.TaskRunner | Task 'p': Starting task run...
[2019-06-16 07:34:12,607] INFO - prefect.TaskRunner | Task 'p': finished task run for task with final state: 'Success'
[2019-06-16 07:34:12,607] INFO - prefect.TaskRunner | Task 'return_param': Starting task run...
[2019-06-16 07:34:12,611] INFO - prefect.TaskRunner | Task 'return_param': finished task run for task with final state: 'Success'
[2019-06-16 07:34:12,615] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2019-06-16 07:34:12,615] INFO - prefect.FlowRunner | Beginning Flow run for 'parameter-example'
[2019-06-16 07:34:12,619] INFO - prefect.FlowRunner | Starting flow run.
[2019-06-16 07:34:12,627] INFO - prefect.TaskRunner | Task 'p': Starting task run...
[2019-06-16 07:34:12,631] INFO - prefect.Ta

Got  42
Got  99


<Success: "All reference tasks succeeded.">

In [7]:
from prefect import task, Flow


@task
def create_list():
    """Return a freshly built list holding the integers 1, 1, 2, 3."""
    values = [1, 1, 2, 3]
    return values
                    
@task                
def add_one(x):
    """Return ``x`` increased by one."""
    incremented = x + 1
    return incremented

@task
def get_sum(x):
    """Return the total of all elements in the iterable ``x`` (built-in ``sum``)."""
    total = sum(x)
    return total
                    
# Map ``add_one`` over the list twice, then feed the mapped result to ``get_sum``.
with Flow(name="simple-map") as f:
    # ``create_list`` is handed over as a task object, not called here.
    plus_one = add_one.map(create_list)
    # Second map consumes the output of the first map.
    plus_two = add_one.map(plus_one)
    # Ordinary (non-mapped) call on the mapped result.
    result = get_sum(plus_two)

f.run()

[2019-06-16 07:34:29,200] INFO - prefect.FlowRunner | Beginning Flow run for 'simple-map'
[2019-06-16 07:34:29,204] INFO - prefect.FlowRunner | Starting flow run.
[2019-06-16 07:34:29,216] INFO - prefect.TaskRunner | Task 'create_list': Starting task run...
[2019-06-16 07:34:29,220] INFO - prefect.TaskRunner | Task 'create_list': finished task run for task with final state: 'Success'
[2019-06-16 07:34:29,220] INFO - prefect.TaskRunner | Task 'add_one': Starting task run...
[2019-06-16 07:34:29,224] INFO - prefect.TaskRunner | Task 'add_one[2]': Starting task run...
[2019-06-16 07:34:29,228] INFO - prefect.TaskRunner | Task 'add_one[2]': finished task run for task with final state: 'Success'
[2019-06-16 07:34:29,232] INFO - prefect.TaskRunner | Task 'add_one[0]': Starting task run...
[2019-06-16 07:34:29,232] INFO - prefect.TaskRunner | Task 'add_one[0]': finished task run for task with final state: 'Success'
[2019-06-16 07:34:29,236] INFO - prefect.TaskRunner | Task 'add_one[3]': Start

<Success: "All reference tasks succeeded.">

In [14]:
from prefect import task

@task
def say_hello(person: str) -> None:
    """Print a one-line greeting addressed to ``person`` on stdout."""
    greeting = "Hello, {}!".format(person)
    print(greeting)

# Bare expression: the notebook displays whatever ``say_hello`` is bound to.
say_hello

<Task: say_hello>

In [15]:
@task
def add(x, y=1):
    """Return ``x + y``; ``y`` is 1 when the caller leaves it out."""
    total = x + y
    return total

# Bare expression: the notebook displays whatever ``add`` is bound to.
add

<Task: add>

In [17]:
from prefect import Task

class AddTask(Task):
    """Task that adds two numbers, with a configurable fallback for the second.

    Args:
        default: value substituted for ``y`` when ``run`` receives none.
        *args: forwarded unchanged to ``Task.__init__``.
        **kwargs: forwarded unchanged to ``Task.__init__``.
    """

    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default

    def run(self, x: int, y: int = None) -> int:
        """Return ``x + y``, using ``self.default`` in place of a ``None`` ``y``."""
        addend = self.default if y is None else y
        return x + addend


# Create the task object with 1 as the fallback addend.
# NOTE: this rebinds ``add``, replacing the function-based ``add`` task
# defined in an earlier cell.
add = AddTask(default=1)

# Bare expression: the notebook displays the task's repr.
add

<Task: AddTask>

In [18]:
from prefect import Flow

# Call ``add`` twice inside the flow context; the second call takes the
# result handle of the first call as its ``x`` input.
with Flow(name="My first flow!") as flow:
    first_result = add(x=1, y=2)
    second_result = add(x=first_result, y=100)

In [19]:
# Execute the flow and keep its final state for inspection.
state = flow.run()
assert state.is_successful()

# ``state.result`` is indexed by the task handles created while building the flow.
task_states = state.result

# First addition: 1 + 2
first_task_state = task_states[first_result]
assert first_task_state.is_successful()
assert first_task_state.result == 3

# Second addition: (1 + 2) + 100
second_task_state = task_states[second_result]
assert second_task_state.is_successful()
assert second_task_state.result == 103

[2019-06-16 07:45:38,620] INFO - prefect.FlowRunner | Beginning Flow run for 'My first flow!'
[2019-06-16 07:45:38,624] INFO - prefect.FlowRunner | Starting flow run.
[2019-06-16 07:45:38,636] INFO - prefect.TaskRunner | Task '2': Starting task run...
[2019-06-16 07:45:38,640] INFO - prefect.TaskRunner | Task '2': finished task run for task with final state: 'Success'
[2019-06-16 07:45:38,644] INFO - prefect.TaskRunner | Task '1': Starting task run...
[2019-06-16 07:45:38,648] INFO - prefect.TaskRunner | Task '1': finished task run for task with final state: 'Success'
[2019-06-16 07:45:38,648] INFO - prefect.TaskRunner | Task 'AddTask': Starting task run...
[2019-06-16 07:45:38,652] INFO - prefect.TaskRunner | Task 'AddTask': finished task run for task with final state: 'Success'
[2019-06-16 07:45:38,656] INFO - prefect.TaskRunner | Task '100': Starting task run...
[2019-06-16 07:45:38,660] INFO - prefect.TaskRunner | Task '100': finished task run for task with final state: 'Success'
[

In [20]:
from prefect import Parameter

# The parameter is declared without a default; its value is provided when
# the flow is run. NOTE: this rebinds the notebook-level ``flow`` name.
with Flow(name="Say hi!") as flow:
    name = Parameter(name="name")
    say_hello(person=name)

In [21]:
# Run the flow, supplying the "name" parameter as a keyword argument;
# the returned final state is this cell's displayed value.
flow.run(name="Marvin")

[2019-06-16 07:48:52,855] INFO - prefect.FlowRunner | Beginning Flow run for 'Say hi!'
[2019-06-16 07:48:52,859] INFO - prefect.FlowRunner | Starting flow run.
[2019-06-16 07:48:52,871] INFO - prefect.TaskRunner | Task 'name': Starting task run...
[2019-06-16 07:48:52,875] INFO - prefect.TaskRunner | Task 'name': finished task run for task with final state: 'Success'
[2019-06-16 07:48:52,875] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2019-06-16 07:48:52,879] INFO - prefect.TaskRunner | Task 'say_hello': finished task run for task with final state: 'Success'
[2019-06-16 07:48:52,883] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


Hello, Marvin!


<Success: "All reference tasks succeeded.">

In [22]:
# Imperative construction: start from an empty flow and attach everything
# through explicit method calls instead of a ``with`` block.
flow = Flow(name="My imperative flow!")

# Tasks that take part: a fresh parameter and a copy of ``add`` so the
# addition can appear twice in the same flow.
name = Parameter(name="name")
second_add = add.copy()

# Register the tasks with the flow.
flow.add_task(add)
flow.add_task(second_add)
flow.add_task(say_hello)

# State-only dependency (no data passed): `say_hello` waits for `second_add`.
say_hello.set_upstream(second_add, flow=flow)

# Data dependencies: bind each task's inputs within this flow.
add.bind(x=1, y=2, flow=flow)
second_add.bind(x=add, y=100, flow=flow)
# Kept last on purpose: its return value is what the cell displays.
say_hello.bind(person=name, flow=flow)

<Task: say_hello>

In [23]:
# Run the imperatively built flow, supplying the "name" parameter as a
# keyword argument; the returned final state is this cell's displayed value.
flow.run(name="Marvin")

[2019-06-16 07:50:57,371] INFO - prefect.FlowRunner | Beginning Flow run for 'My imperative flow!'
[2019-06-16 07:50:57,371] INFO - prefect.FlowRunner | Starting flow run.
[2019-06-16 07:50:57,387] INFO - prefect.TaskRunner | Task '2': Starting task run...
[2019-06-16 07:50:57,391] INFO - prefect.TaskRunner | Task '2': finished task run for task with final state: 'Success'
[2019-06-16 07:50:57,395] INFO - prefect.TaskRunner | Task '1': Starting task run...
[2019-06-16 07:50:57,399] INFO - prefect.TaskRunner | Task '1': finished task run for task with final state: 'Success'
[2019-06-16 07:50:57,399] INFO - prefect.TaskRunner | Task 'AddTask': Starting task run...
[2019-06-16 07:50:57,407] INFO - prefect.TaskRunner | Task 'AddTask': finished task run for task with final state: 'Success'
[2019-06-16 07:50:57,407] INFO - prefect.TaskRunner | Task '100': Starting task run...
[2019-06-16 07:50:57,411] INFO - prefect.TaskRunner | Task '100': finished task run for task with final state: 'Succe

Hello, Marvin!


<Success: "All reference tasks succeeded.">