[参考記事](https://qiita.com/koji_mats/items/0533fbdeb9012a7e1494#appendix-prefect%E3%82%92k8s%E3%81%B8%E3%83%87%E3%83%97%E3%83%AD%E3%82%A4%E3%81%99%E3%82%8B%E6%89%8B%E9%A0%86)

# メモ

* flowやtaskにはユニークな名前を付ける必要がある(そうでないと警告発生)

# 1. Hello World!!

In [None]:
# ver1での書き方
# 2系で実施しているので、実行エラー
# from prefect import Flow, task

# @task
# def get_name():
#     return "world"

# @task
# def hello(who):
#     print("hello, {}!".format(who))

# with Flow("HelloWorld") as flow:
#     who = get_name()
#     hello_world = hello(who)

# flow.run()

In [3]:
# 記事の内容を2系で書き換え
from prefect import task, flow

@task(name="get name task")
def get_name():
    return("World")

@task(name="hello task")
def hello(who):
    print(f"Hello, {who}!")

@flow(name="greeting flow")
def greeting_flow():
    who = get_name()
    hello(who)

if __name__ == "__main__":
    greeting_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello, World!


# 2. パラメータ化

* flow実行時にパラメータを渡して、タスク内で使用する

In [4]:
# "name"という引数をflow側で用意して、taskに渡して実行する
from prefect import task, flow

@task(name="get name task")
def get_name(name):
    return name

@task(name="hello task")
def hello(who):
    print(f"Hello, {who}!")

@flow(name="greeting flow")
def greeting_flow(name):
    who = get_name(name)
    hello(who)

if __name__ == "__main__":
    greeting_flow("KOH")


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello, KOH!


# 3. 依存関係の定義

* task1 -> task2の順に実行したいようなケース

In [None]:
# def my_flow():
#     x = task_1.submit()
#     # task 2 will wait for task_1 to complete
#     # y = task_2.submit(wait_for=[x])

In [1]:
from prefect import task, flow

@task(name="hello task")
def task1():
    print("Hello")


@task(name="world task")
def task2():
    print("World!!")

@flow(name="hello world flow")
def hello_world():
    x = task1.submit()
    y = task2.submit(wait_for=[x])

if __name__ == "__main__":
    hello_world()

flowの実行完了


Hello


World!!


# 4. 動的Flow
* タスクのmapメソッドを使用して、実行時に上流タスクから渡された値に応じて動的にタスクを生成
  * mapの引数にリスト(を返すタスクの結果)を渡すと、```リストの各要素ごとにタスクを実行```
  * mapで実行されるタスクは可能であれば並列実行されるよう

In [23]:
from random import randint
from prefect import task, flow

@task
def random_list():
    return [i for i in range(randint(1, 4))]

@task
def squared(x):
    return x ** 2

@task
def sum_up(l):
    print(f"Squared results: {l}")
    print(f"Sum: {sum(l)}")


@flow(name="dynamic flow")
def dynamic_flow():
    l = random_list()
    squared_res = squared.map(l)
    sum_res = sum_up(squared_res)
    print(f"squared_res：{squared_res}")
    print(f"squared_resのタイプ：{type(squared_res)}")

if __name__ == "__main__":
    dynamic_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Squared results: [0, 1, 4, 9]
Sum: 14


squared_res：[PrefectFuture('squared-0'), PrefectFuture('squared-1'), PrefectFuture('squared-2'), PrefectFuture('squared-3')]
squared_resのタイプ：<class 'list'>


# 5. 条件分岐
* 1系では、タスク（case、merge、ifelse、switch）は、prefect.tasks.control_flowからインポートして使用していた。
* 2系では、Pythonネイティブのif/else文を使って条件ロジックを指定することができます。

In [None]:
# from random import randint
# from prefect import task, Flow
# from prefect.tasks.control_flow.conditional import ifelse

# @task
# def check_even():
#     return randint(0, 10) % 2 == 0

# @task
# def even():
#     print("It's Even!!")

# @task
# def odd():
#     print("It's Odd!!")

# with Flow("IfElseFlow") as flow:
#     cond = check_even()
#     result = ifelse(cond, even, odd)

# flow.run()


In [11]:
from random import randint
from prefect import task, flow

@task
def check_even():
    return randint(0, 10) % 2 == 0

@task
def even():
    print("It's Even!!")

@task
def odd():
    print("It's Odd!!")

@flow
def is_else_flow():
    cond = check_even.submit()
    print(f"cond: {cond}")
    print(f"cond.result(): {cond.result()}")
    if cond.result():
        even.submit(wait_for=[cond])
    else:
        odd.submit(wait_for=[cond])

if __name__ == "__main__":
    is_else_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


cond: PrefectFuture('check_even-0')


cond.result(): True


It's Even!!


In [None]:
from random import randint
from prefect import task, flow

@task
def check_even():
    return randint(0, 10) % 2 == 0

@task
def even():
    print("It's Even!!")

@task
def odd():
    print("It's Odd!!")

@flow
def is_else_flow():
    cond = check_even()
    if cond:
        even()
    else:
        odd()

if __name__ == "__main__":
    is_else_flow()

# 6. Flowの成功・失敗の条件
* 事前準備 -> 本処理 -> 後片付けのようなフローで、本処理の結果にかかわらず、後片付けをしたいケース

In [16]:
import random
from prefect import task, flow

@task
def do_something_important():
    bool_ = random.random() > 0.5
    print(f"Is the number > 0.5? {bool_}")
    if bool_:
        raise ValueError("Non-deterministic error has occured.")

@task
def fail():
    print("Failure")

@task
def succeed():
    print("Success")

@task
def always_run():
    print("Running regardless of upstream task's state")

@flow(log_prints=True)
def main_flow():
    a = do_something_important.submit()
    # equivalent of all_failed or any_failed trigger in Prefect 1.0:
    if a.wait() == "Failed":
        fail.submit()  # the task is skipped if the condition is not true
    # equivalent of all_succesful trigger in Prefect 1.0
    else:
        succeed.submit()  # the task is skipped on upstream failure

    # equivalent of always_run or all_finished trigger in Prefect 1.0:
    always_run.submit()

if __name__ == "__main__":
    main_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


ValueError: Non-deterministic error has occured.