In [2]:
# !pip install prefect

## Develop

### Flows

- Là đối tượng Prefect cơ bản nhất
- Các flows là Prefect abstraction mà có thể tương tác, hiển thị, và chạy mà không cùng các thành phần nào khác của Prefect engine.
- Một flow bao hàm một workflow logic, cho phép người dùng tương tác và biết trạng thái các workflows 

In [6]:
from prefect import flow, task

@task
def print_hello(name):
    print(f"Hello: {name}")

@flow(name="hello flow", description="Hello bow")
def hello_world(name="world"):
    """hello bow"""
    print_hello(name)



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


In [7]:
# print_hello("Bow")
hello_world("bow")

Hello: bow


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

#### Naming flows 

In [9]:
import datetime
from prefect import flow

def generate_flow_run_name():
    date = datetime.datetime.utcnow()

    return f"{date:%A}-is-a-nice-day"

@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str):
    pass

# creates a flow run called 'Thursday-is-a-nice-day'
my_flow(name="marvin")



 `@flow(name='my_unique_name', ...)`


If you need access to information about the flow, use the prefect.runtime module

In [10]:
from prefect import flow
from prefect.runtime import flow_run

def generate_flow_run_name():
    flow_name = flow_run.flow_name

    parameters = flow_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-with-{name}-and-{limit}"

@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str, limit: int = 100):
    pass

# creates a flow run called 'my-flow-with-marvin-and-100'
my_flow(name="marvin")



 `@flow(name='my_unique_name', ...)`


#### Tách logic flow thành các tasks

In [11]:
from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Hello Flow")
def hello_world(name="world"):
    message = print_hello(name)

hello_world("Marvin")


Hello Marvin!


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`'))]

#### Subflow

In [12]:
from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Subflow")
def my_subflow(msg):
    print(f"Subflow says: {msg}")

@flow(name="Hello Flow")
def hello_world(name="world"):
    message = print_hello(name)
    my_subflow(message)

hello_world("Marvin")



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello Marvin!


Subflow says: Hello Marvin!


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

#### Parameters

In [14]:
from prefect import flow
from pydantic import BaseModel

class Model(BaseModel):
    a: int
    b: float
    c: str

@flow
def model_validator(model: Model):
    print(model)


Ví dụ: để tự động chuyển đổi thứ gì đó thành ngày giờ:"

In [15]:
from prefect import flow
from datetime import datetime

@flow
def what_day_is_it(date: datetime = None):
    if date is None:
        date = datetime.utcnow()
    print(f"It was {date.strftime('%A')} on {date.isoformat()}")

what_day_is_it("2021-01-01T02:00:19.180906")
# It was Friday on 2021-01-01T02:00:19.180906


It was Friday on 2021-01-01T02:00:19.180906


#### Final state determination

- If an exception is raised directly in the flow function, the flow run is marked as failed.
- If the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and subflows within it.
- If any task run or subflow run failed, then the final flow run state is marked as FAILED.
- If any task run was cancelled, then the final flow run state is marked as CANCELLED.
- If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state.
- If the flow run returns any other object, then it is marked as completed.


#### Raise an exception

In [3]:
from prefect import flow

@flow
def always_fails_flow():
    raise ValueError("This flow immediately fails")

always_fails_flow()



 `@flow(name='my_unique_name', ...)`


ValueError: This flow immediately fails

#### Return None

In [4]:
from prefect import flow, task

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"

@flow
def always_fails_flow():
    always_fails_task.submit().result(raise_on_failure=False)
    always_succeeds_task()

always_fails_flow()



 `@flow(name='my_unique_name', ...)`


I'm fail safe!


ValueError: I fail successfully

#### Return a future

In [5]:
from prefect import flow, task

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"

@flow
def always_succeeds_flow():
    x = always_fails_task.submit().result(raise_on_failure=False)
    y = always_succeeds_task.submit(wait_for=[x])
    return y

always_succeeds_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`


I'm fail safe!


Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`'))

In [6]:
from prefect import task, flow

@task
def always_fails_task():
    raise ValueError("I am bad task")

@task
def always_succeeds_task():
    return "foo"

@flow
def always_succeeds_flow():
    return "bar"

@flow
def always_fails_flow():
    x = always_fails_task()
    y = always_succeeds_task()
    z = always_succeeds_flow()
    return x, y, z

always_fails_flow()



 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


ValueError: I am bad task

In [7]:
from prefect import task, flow
from prefect.server.schemas.states import Completed, Failed

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"

@flow
def always_succeeds_flow():
    x = always_fails_task.submit()
    y = always_succeeds_task.submit()
    if y.result() == "success":
        return Completed(message="I am happy with this result")
    else:
        return Failed(message="How did this happen!?")

always_succeeds_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


I'm fail safe!


#### Return an object

In [8]:
from prefect import task, flow

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@flow
def always_succeeds_flow():
    always_fails_task().submit()
    return "foo"

always_succeeds_flow()



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


ValueError: I fail successfully

#### Pause a flow run

In [12]:
from prefect import task, flow, pause_flow_run, resume_flow_run

@task
async def marvin_setup():
    return "a raft of ducks walk into a bar..."
@task
async def marvin_punchline():
    return "it's a wonder none of them ducked!"
@flow
async def inspiring_joke():
    await marvin_setup()
    await pause_flow_run(timeout=600)  # pauses for 10 minutes
    await marvin_punchline()

await inspiring_joke()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`'))]

### Tasks

Prefect không cho phép kích hoạt task chạy từ các task khác. 

Nếu bạn muốn gọi trực tiếp chức năng của task, bạn có thể sử dụng task.fn()

In [13]:
from prefect import flow, task

@task
def my_first_task(msg):
    print(f"Hello, {msg}")

@task
def my_second_task(msg):
    my_first_task.fn(msg)

@flow
def my_flow():
    my_second_task("Trillian")
    
my_flow()


Hello, Trillian


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [14]:
@task(name="hello-task", 
    description="This task says hello.")
def my_task():
    print("Hello, I'm a task")


In [16]:
import datetime
from prefect import flow, task

@task(name="My Example Task", 
    description="An example task for a tutorial.",
    task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
    pass

@flow
def my_flow():
    # creates a run with a name like "hello-marvin-on-Thursday"
    my_task(name="marvin", date=datetime.datetime.utcnow())

my_flow()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [18]:
import datetime
from prefect import flow, task

def generate_task_name():
    date = datetime.datetime.utcnow()
    return f"{date:%A}-is-a-lovely-day"

@task(name="My Example Task",
    description="An example task for a tutorial.",
    task_run_name=generate_task_name)
def my_task(name):
    pass

@flow
def my_flow():
    # creates a run with a name like "Thursday-is-a-lovely-day"
    my_task(name="marvin")
    
my_flow()



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [21]:
# Nếu bạn cần truy cập thông tin về nhiệm vụ, hãy sử dụng prefect.runtimemô-đun. Ví dụ:
from prefect import flow
from prefect.runtime import flow_run, task_run

def generate_task_name():
    flow_name = flow_run.flow_name
    task_name = task_run.task_name

    parameters = task_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-{task_name}-with-{name}-and-{limit}"

@task(name="my-example-task",
      description="An example task for a tutorial.",
      task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
    pass

@flow
def my_flow(name: str):
    # creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
    my_task(name)

my_flow("thai")


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [22]:
@task(name="hello-task", tags=["test"])
def my_task():
    print("Hello, I'm a task")



 `@task(name='my_unique_name', ...)`


In [23]:
from prefect import flow, task
from prefect import tags

@task
def my_task():
    print("Hello, I'm a task")

@flow
def my_flow():
    with tags("test"):
        my_task()



 `@flow(name='my_unique_name', ...)`


### Results

In [24]:
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    task_result = my_task()
    return task_result + 1

result = my_flow()
assert result == 2



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


In [25]:
result

2

In [28]:
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    # state = my_task(return_state=True)
    # return state.result() + 1
    num = my_task()
    return num +1
state = my_flow(return_state=True)


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


In [32]:
state.result()

UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')

### Artifacts

In [33]:
# Link artifactes
from prefect import flow, task
from prefect.artifacts import create_link_artifact

@task
def my_first_task():
        create_link_artifact(
            key="irregular-data",
            link="https://nyc3.digitaloceanspaces.com/my-bucket-name/highly_variable_data.csv",
            description="## Highly variable data",
        )

@task
def my_second_task():
        create_link_artifact(
            key="irregular-data",
            link="https://nyc3.digitaloceanspaces.com/my-bucket-name/low_pred_data.csv",
            description="# Low prediction accuracy",
        )

@flow
def my_flow():
    my_first_task()
    my_second_task()

if __name__ == "__main__":
    my_flow()



 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


In [34]:
from prefect import flow
from prefect.artifacts import create_link_artifact

@flow
def my_flow():
    create_link_artifact(
        key="my-important-link",
        link="https://www.prefect.io/",
        link_text="Prefect",
    )

my_flow()


 `@flow(name='my_unique_name', ...)`


In [35]:
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact

@task
def markdown_task():
    na_revenue = 500000
    markdown_report = f"""# Sales Report

## Summary

In the past quarter, our company saw a significant increase in sales, with a total revenue of $1,000,000. 
This represents a 20% increase over the same period last year.

## Sales by Region

| Region        | Revenue |
|:--------------|-------:|
| North America | ${na_revenue:,} |
| Europe        | $250,000 |
| Asia          | $150,000 |
| South America | $75,000 |
| Africa        | $25,000 |

## Top Products

1. Product A - $300,000 in revenue
2. Product B - $200,000 in revenue
3. Product C - $150,000 in revenue

## Conclusion

Overall, these results are very encouraging and demonstrate the success of our sales team in increasing revenue 
across all regions. However, we still have room for improvement and should focus on further increasing sales in 
the coming quarter.
"""
    create_markdown_artifact(
        key="gtm-report",
        markdown=markdown_report,
        description="Quarterly Sales Report",
    )

@flow()
def my_flow():
    markdown_task()

my_flow()



 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [38]:
from prefect.artifacts import create_table_artifact

def my_fn():
    highest_churn_possibility = [
       {'customer_id':'12345', 'name': 'John Smith', 'churn_probability': 0.85 }, 
       {'customer_id':'56789', 'name': 'Jane Jones', 'churn_probability': 0.65 } 
    ]

    create_table_artifact(
        key="personalized-reachout",
        table=highest_churn_possibility,
        description= "# Marvin, please reach out to these customers today!"
    )

my_fn()


  create_table_artifact(
