# A Simple Workflow with Prefect

## Install required Python packages

In [None]:
!pip install prefect numpy

## Import packages

In [15]:
import random
import numpy as np
from time import sleep

from prefect import task, flow

## Define a set of functions as tasks

Adding the @task decorator is all it takes.

In [16]:
@task
def get_number(min=1, max=100) -> int:
    """Return a random number n, wit min<=n<max"""
    sleep(1) # simulate longer processing
    n = random.randint(min,max)
    print (f"random number: {n}")
    return n

In [17]:
@task
def add(n1:int, n2: int) -> int:
    """Return sum of two numbers"""
    sleep(1.5) # simulate longer processing
    s = n1 + n2
    print (f"sum: {s}")
    return s

In [18]:
@task
def multiply(n1: int, n2: int) -> int:
    """Return product of two numbers"""
    sleep(1) # simulate longer processing
    p = n1 * n2
    print (f"product: {p}")
    return p

In [19]:
@task
def mean(*args: int) -> float:
    """Return the mean of provided numbers"""
    sleep(3) # simulate longer processing
    m = np.mean(args)
    print (f"mean: {m}")
    return m

## Define the data pipeline as a flow

Adding the @flow decorator does the trick.

In [22]:
@flow
def analysis():
    """Return results of some arbitrary calculations"""
    n1 = get_number.submit()
    n2 = get_number.submit()
    temp_sum = add.submit(n1, n2)
    n3 = get_number.submit()
    r = multiply.submit(n3, temp_sum)
    m = mean.submit(n1,n2,n3)
    return r, m

# Run the data pipeline 

Local execution is triggered by invoking the function with the @flow decorator.

In the output you will see the logs of the Prefect execution engine, including a URL to the Prefect Server that observes flow and provides visualization of all task dependencies, execution states and logs.

In [23]:
r, m = analysis()
print (f"arbitrary calc: r={r}, mean={m}")

random number: 76


random number: 56random number: 15



sum: 91


product: 5096


mean: 49.0


arbitrary calc: r=Completed(), mean=Completed()
