In [1]:
from prefect import flow, task,get_run_logger

In [4]:


@task
def print_hello(name):
    """Task: print the greeting ``Hello {name}!`` to standard output."""
    message = f"Hello {name}!"
    print(message)


@flow(name="Hello Flow")
def hello_world(name="world"):
    """Flow: delegate the greeting for ``name`` to the ``print_hello`` task."""
    print_hello(name)


# First run relies on the default name; the second overrides it.
hello_world()
hello_world("Mundo")


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello world!


Hello Mundo!


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [5]:
@task
def print_hello(name):
    """Print the greeting ``Hello {name}!`` to standard output."""
    print(f"Hello {name}!")

@flow(name="Hello Flow")
def hello_world(name="world"):
    """Greet ``name``, bracketing the task run with explicit log records.

    Logging here goes through ``get_run_logger()`` rather than ``log_prints``
    (which this decorator does not set), so the start message names the
    mechanism actually in use.
    """
    logger = get_run_logger()
    logger.info("Iniciando el flujo con get_run_logger")
    print_hello(name)
    logger.info("Terminando el flujo")


hello_world()
hello_world("Mundo")


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello world!


Hello Mundo!


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [6]:


@task
def print_hello(name):
    """Print the greeting ``Hello {name}!`` via a plain ``print`` call."""
    greeting = f"Hello {name}!"
    print(greeting)

@flow(name="Hello Flow", log_prints=True)
def hello_world(name="world"):
    """Greet ``name`` between a start banner and an end banner.

    ``log_prints=True`` asks Prefect to forward ``print`` output to the run
    logger, so no explicit ``get_run_logger()`` call is needed here.
    """
    start_banner = "Iniciando el flujo con la palabra clave log_prints=True"
    end_banner = "Terminando el flujo"
    print(start_banner)
    print_hello(name)
    print(end_banner)


hello_world()
hello_world("Mundo")


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]


# Logging vs Artifacts

The example above is for educational purposes. In general, it is better to use Prefect artifacts for storing metrics and output. Logs are best for tracking progress and debugging errors.

In [7]:
from prefect import flow, task
from prefect.artifacts import create_link_artifact

@task
def my_first_task():
    """Create a link artifact (key ``irregular-data``) for the highly variable dataset."""
    artifact_fields = dict(
        key="irregular-data",
        link="https://nyc3.digitaloceanspaces.com/my-bucket-name/highly_variable_data.csv",
        description="## Highly variable data",
    )
    create_link_artifact(**artifact_fields)

@task
def my_second_task():
    """Create a link artifact for the low-prediction-accuracy dataset.

    NOTE(review): this reuses the key ``irregular-data`` from ``my_first_task``;
    presumably intended to record a newer version of the same keyed artifact —
    confirm.
    """
    artifact_fields = dict(
        key="irregular-data",
        link="https://nyc3.digitaloceanspaces.com/my-bucket-name/low_pred_data.csv",
        description="# Low prediction accuracy",
    )
    create_link_artifact(**artifact_fields)

@flow
def my_flow():
    """Run the two link-artifact tasks, first then second."""
    for artifact_step in (my_first_task, my_second_task):
        artifact_step()


my_flow()

[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [8]:
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact

@task
def markdown_task():
    """Render the quarterly sales report as Markdown and publish it as an artifact.

    Only the North America revenue is interpolated (formatted with thousands
    separators); every other figure is literal text in the template.
    """
    na_revenue = 500_000
    report_md = f"""# Sales Report

## Summary

In the past quarter, our company saw a significant increase in sales, with a total revenue of $1,000,000. 
This represents a 20% increase over the same period last year.

## Sales by Region

| Region        | Revenue |
|:--------------|-------:|
| North America | ${na_revenue:,} |
| Europe        | $250,000 |
| Asia          | $150,000 |
| South America | $75,000 |
| Africa        | $25,000 |

## Top Products

1. Product A - $300,000 in revenue
2. Product B - $200,000 in revenue
3. Product C - $150,000 in revenue

## Conclusion

Overall, these results are very encouraging and demonstrate the success of our sales team in increasing revenue 
across all regions. However, we still have room for improvement and should focus on further increasing sales in 
the coming quarter.
"""
    create_markdown_artifact(
        markdown=report_md,
        key="gtm-report",
        description="Quarterly Sales Report",
    )

@flow
def my_flow():
    """Produce the Markdown sales-report artifact via ``markdown_task``."""
    markdown_task()


my_flow()


 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

# Retries

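So far our script works, but unexpected errors may occur in the future. For example, an external API such as GitHub's may be temporarily unavailable or rate limited. Retries make our flow more resilient. Let's add retry functionality to our example. In the cell below, a random mean above 50 stands in for such a transient failure, and the flow is retried up to three times, five seconds apart: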

#https://docs.prefect.io/latest/tutorial/flows/

In [10]:
import numpy as np

In [16]:
np.random.seed(42)

# Ten rounds: draw ten integers in [0, 100) and print their mean.
for _ in range(10):
    sample = np.random.randint(0, 100, 10)
    print(sample.mean())

62.4
43.8
46.4
60.0
41.6
41.3
54.2
33.8
56.6
65.3


In [18]:
np.random.seed(42)


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_mean(repo_name: str = "PrefectHQ/prefect"):
    """Draw ten random integers and fail the run when their mean exceeds 50.

    The seed is set once, outside the flow, so each retry (up to 3, spaced
    5 seconds apart) draws a fresh sample and may succeed. ``repo_name`` is
    accepted but not used.
    """
    print("Iniciando un flujo de trabajo")
    sample_mean = np.random.randint(0, 100, 10).mean()
    if sample_mean > 50:
        raise ValueError("El número es mayor que 50")
    print(f"El número es {sample_mean}")


get_repo_mean()



 `@flow(name='my_unique_name', ...)`


# What is a task?

A task is any Python function decorated with `@task` and called within a flow. You can think of a flow as a recipe for connecting a known sequence of tasks together. Tasks, and the dependencies between them, are displayed in the flow run graph, enabling you to break down a complex flow into something you can observe, understand and control at a more granular level.
When a function becomes a task, it can be executed concurrently and its return value can be cached.

https://docs.prefect.io/latest/tutorial/tasks/

In [29]:
np.random.seed(42)


@task()
def validate_mean(num: float):
    """Print a notice when ``num`` exceeds 10, then return a new random mean.

    NOTE(review): the comparison uses the caller's ``num``, which is then
    discarded; the return value is the mean of ten fresh random integers in
    [0, 100). If the goal is to validate the generated mean, the comparison
    should use that value instead — confirm intent.
    """
    if num > 10:
        print("valor mayor que 10")
    return np.random.randint(0, 100, 10).mean()


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_mean(repo_name: str = "PrefectHQ/prefect"):
    """Run ``validate_mean`` twice and print the second result.

    The first call is made only for its side effects (a task run and one
    random draw); only the second call's mean is reported. ``repo_name`` is
    accepted but not used.
    """
    print("Iniciando un flujo de trabajo")
    validate_mean(5)
    num = validate_mean(10)
    print(f"El número es {num}")


get_repo_mean()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', artifact_type='result', artifact_description='Result of type `float64` persisted to: `C:\\Users\\Mich2020\\.prefect\\storage\\a46e9ded00ea475f94f84906cb79b577`', serializer_type='pickle', storage_block_id=UUID('ac4a20fe-820c-443b-9f2d-1f46dbaea089'), storage_key='a46e9ded00ea475f94f84906cb79b577')),
 Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', artifact_type='result', artifact_description='Result of type `float64` persisted to: `C:\\Users\\Mich2020\\.prefect\\storage\\021987e193e6491fb79ecc7510887c56`', serializer_type='pickle', storage_block_id=UUID('ac4a20fe-820c-443b-9f2d-1f46dbaea089'), storage_key='021987e193e6491fb79ecc7510887c56'))]

In [26]:
np.random.seed(42)


@task()
def validate_mean(num: float):
    """Return ``num`` unchanged.

    Raises:
        ValueError: if ``num`` is greater than 50.
    """
    if num > 50:
        raise ValueError("El número es mayor que 50")
    return num


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_mean(repo_name: str = "PrefectHQ/prefect"):
    """Draw a random mean and validate it in a task, tolerating a validation failure.

    A ``ValueError`` from ``validate_mean`` is caught here, so the flow
    function does not raise for that case and the final message is printed
    either way. ``repo_name`` is accepted but not used.
    """
    print("Iniciando un flujo de trabajo")
    num = np.random.randint(0, 100, 10).mean()

    try:
        validate_mean(num)
    except ValueError:
        print("Oh no! The task failed.")

    print(f"El número es {num}")


get_repo_mean()



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', artifact_type='result', artifact_description='Result of type `float64` persisted to: `C:\\Users\\Mich2020\\.prefect\\storage\\6590e19ceb734c3c8b631cc5f17cea6e`', serializer_type='pickle', storage_block_id=UUID('b5f07b34-ce77-4add-9705-7e4ee7889680'), storage_key='6590e19ceb734c3c8b631cc5f17cea6e'))]

# States

States are rich objects that contain information about the status of a particular task run or flow run. While you don't need to know the details of states to use Prefect, you can give your workflows superpowers by taking advantage of them.

https://docs.prefect.io/latest/concepts/states/


Return Data

By default, running a task will return data:

In [32]:
@task
def add_one(x):
    """Return ``x + 1``."""
    return x + 1


@flow(log_prints=True)
def my_flow():
    """Call ``add_one`` directly; a plain task call hands back its return value."""
    print("Iniciando  my_flow")

    value = add_one(1)  # direct task call -> plain int
    print(f"Resultado de add_one: {value}")

    print("fin de my_flow")


my_flow()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'))]

The same rule applies to a subflow: calling it returns its data directly.

In [33]:
@flow
def subflow():
    """Return the constant 42."""
    return 42


@flow
def my_flow():
    """Call ``subflow``; like a task call, it hands back the plain value (unused here)."""
    _answer = subflow()


my_flow()


 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'))]

Return Prefect State

To return a State instead, add return_state=True as a parameter of your task call.

In [52]:
@task
def add_one(x):
    """Print and return ``x + 1``."""
    print(f"Suma de x + 1: {x + 1}")
    return x + 1


@flow(log_prints=True)
def my_flow():
    """Call ``add_one`` with ``return_state=True`` and unwrap the returned State."""
    print("Iniciando  my_flow")

    run_state = add_one(1, return_state=True)
    value = run_state.result()  # unwrap the State to get the int

    print(f"El estado es {run_state}")
    print("-" * 50)
    print(f"Resultado de add_one: {value}")
    print("fin de my_flow")


my_flow()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', artifact_type='result', artifact_description='Result of type `int` persisted to: `C:\\Users\\Mich2020\\.prefect\\storage\\5059a420912b4c708d4f1776355e1b6d`', serializer_type='pickle', storage_block_id=UUID('f3033bb1-cd5d-4e96-9a9f-6ce2c44180fb'), storage_key='5059a420912b4c708d4f1776355e1b6d'))]

Return a PrefectFuture

To get a PrefectFuture, add .submit() to your task call.

In [51]:
@task
def add_one(x):
    """Print and return ``x + 1``."""
    print(f"Suma de x + 1: {x + 1}")
    return x + 1


@flow(log_prints=True)
def my_flow():
    """Submit ``add_one`` to get a PrefectFuture, then block on its result."""
    print("Iniciando  my_flow")

    pending = add_one.submit(1)  # PrefectFuture
    value = pending.result()  # waits for the task, yields the int

    # NOTE(review): this prints the future object itself, despite the "estado" label.
    print(f"El estado es {pending}")
    print("-" * 50)
    print(f"Resultado de add_one: {value}")
    print("fin de my_flow")


my_flow()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'))]