# Prefect

## Installation

In [None]:
!pip install prefect
!pip install 'prefect[viz]

## Elements:

### Task(Functions):
Define Functions with Decorator
<br>
* tasks could have single or multiple function

<br>

توی پرفکت به فانکشن های پایتون Task می گیم ( بدون در نظر گرفتن اینکه اون فانکشن چقد کار انجام میده و ... ) 
این تسک هارو میشه کاملا برنامه زمانی ( Schedule  ) داد که بر خلاف WorkFlow قابلیت تکرار شدن و چندبار اجرا شدن و حتی هم زمان اجرا شدن این تسک ها وجود داره.
مثال:
بعضی وقتا برای اینکه به بیشتر از یه فانکشن نیاز داریم می تونیم کلاس تعریف کنیم و خود لایبرری تسک رو که از پرفکت Import کردیم رو زیر کلاس می کنیم و توی اون می تونیم دو تا تابع __init__ و run تعریف کنیم. مثال:



In [1]:

from prefect import task

@task
def say_hello(name):
    print(f"Hello, {name}!")
    
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")


In [2]:
from prefect import Task

class AddTask(Task):

    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default

    def run(self, x: int, y: int=None) -> int:
        if y is None:
            y = self.default
        return x + y

# initialize the task instance
add = AddTask(default=1)



### Flows and States
Creating Prefect Flow:

* Write a Task Which is basically where you define core logic
* Add a decorator over the task which defines the rules.
* write a Flow which is grouping and managing the tasks.

In [3]:
from prefect import Flow

with Flow("My first flow!") as flow:
    first_result = add(1, y=2)
    second_result = add(x=first_result, y=100)


to use graphical view

In [6]:
flow.visualize()
# flow.register(project_name="prefect_example")

AttributeError: module 'graphviz.backend' has no attribute 'ENCODING'

In [17]:
state = flow.run()

assert state.is_successful()

first_task_state = state.result[first_result]
assert first_task_state.is_successful()
assert first_task_state.result == 3 

second_task_state = state.result[second_result]
assert second_task_state.is_successful()
assert second_task_state.result == 103



[2022-02-20 16:49:27+0330] INFO - prefect.FlowRunner | Beginning Flow run for 'My first flow!'
[2022-02-20 16:49:27+0330] INFO - prefect.TaskRunner | Task 'AddTask': Starting task run...
[2022-02-20 16:49:27+0330] INFO - prefect.TaskRunner | Task 'AddTask': Finished task run for task with final state: 'Success'
[2022-02-20 16:49:27+0330] INFO - prefect.TaskRunner | Task 'AddTask': Starting task run...
[2022-02-20 16:49:27+0330] INFO - prefect.TaskRunner | Task 'AddTask': Finished task run for task with final state: 'Success'
[2022-02-20 16:49:27+0330] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


### Parameters

امکان دیگه ای که پرفکت بهمون میده اینه که میشه توی فلو از یه چیزی به نام  parameter استفاده کرد. این پارامتر در واقع تابع هایی ان که توی خود فلو می تونیم بنویسیم و به مقاصد مختلف ازشون استفاده کنیم.

نوشتن و روش استفاده ازشون:


In [29]:
from prefect import Parameter

with Flow("Say hi!") as flow:
    name = Parameter("name")
    x = say_hello(name)

state = flow.run(name="Marvin")
task_state = state.result[x]
assert task_state.result == 'Hello, Marvin!'
assert task_state.result == 'hello Marvin!' 


# ... [logs]
# "Hello, Marvin!"
# ... [logs]


[2022-02-20 17:19:14+0330] INFO - prefect.FlowRunner | Beginning Flow run for 'Say hi!'
[2022-02-20 17:19:14+0330] INFO - prefect.TaskRunner | Task 'name': Starting task run...
[2022-02-20 17:19:14+0330] INFO - prefect.TaskRunner | Task 'name': Finished task run for task with final state: 'Success'
[2022-02-20 17:19:14+0330] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
Hello, Marvin!
[2022-02-20 17:19:14+0330] ERROR - prefect.TaskRunner | Task 'say_hello': Exception encountered during task execution!
Traceback (most recent call last):
  File "C:\Users\Romina\.conda\envs\myclone\lib\site-packages\prefect\engine\task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "C:\Users\Romina\.conda\envs\myclone\lib\site-packages\prefect\utilities\executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "C:\Users\Romina\AppData\Local\Temp/ipykernel_24224/256774

AssertionError: 

In [20]:
flow = Flow("My imperative flow!")

# define some new tasks
name = Parameter("name")
second_add = add.copy()

# add our tasks to the flow
flow.add_task(add)
flow.add_task(second_add)
flow.add_task(say_hello)

# create non-data dependencies so that `say_hello` waits for `second_add` to finish.
say_hello.set_upstream(second_add, flow=flow)

# create data bindings
add.bind(x=1, y=2, flow=flow)
second_add.bind(x=add, y=100, flow=flow)
# say_hello.bind(person=name, flow=flow)


<Task: AddTask>