# 初めてのMaize
簡単な例として2つの文字列を受け取って結合するというフローを構築しましょう

In [None]:
from maize.core.interface import Parameter, Output, MultiInput
from maize.core.node import Node
from maize.core.workflow import Workflow

### Nodeを定義する
文字列を受け取るノード(Example)と文字列を結合し出力するノード(ConcatAndPrint)を定義しましょう

- Example
  - パラメータとして文字を受け取ります。デフォルトはHelloです。
  - アウトプットとして受け取ったデータをそのまま次のノードに渡します。
- ConcatAndPrint
  - ConcatAndPrintは名前の通り、複数の入力を受取り、それをループで回し . を区切りにしてジョイン、最後にLoggerのにConcatした文字を渡します。
 

In [None]:
class Example(Node):
    data: Parameter[str] = Parameter(default="Hello")
    out: Output[str] = Output()

    def run(self) -> Node:
        self.out.send(self.data.value)

class ConcatAndPrint(Node):
    inp: MultiInput[str] = MultiInput()

    def run(self):
        result = " ".join(inp.receive() for inp in self.inp)
        print("###########################")
        print(f"{result}")
        print("###########################")
        self.logger.info("Received: '%s'", result)

### WorlFlowを定義する
- 最初にWorkflowオブジェクトを作成します。
- 続いてワークフローにノードを追加します。Exampleノードは動作が異なるものを2つ足します。
- さらにConcatAndPrintノードを追加します。
- 最後に各ノードを結合させます。
- フローに問題がないかcheckメソッドで確認します（問題がある場合はエラーが出ます）

In [None]:
flow = Workflow(name='hello')
ex1 = flow.add(Example, name='ex1', parameters=dict(data='nice to meet you!'))
ex2 = flow.add(Example, name='ex2', parameters=dict(data='maize'))
concat = flow.add(ConcatAndPrint)
flow.connect(ex1.out, concat.inp)
flow.connect(ex2.out, concat.inp)
flow.check()

###　作成したワークフローを可視化してみましょう
- visualizeメソッドによりワークフローはグラフで表示されます。

In [None]:
flow.visualize()

### 実際に実行します


In [None]:
flow.execute()

##　FBPの大まかな流れ

#### step 1 workflow objectを作りNodeを追加する

- levelはロギングの冗長性を指定し（pythonのloggingモジュールを参照）、 cleanup_tempは実行中に作成されたディレクトリをクリーンアップすべきかどうかを指定し、 default_channel_sizeは一度にノード間チャネルに置くことができるアイテムの数を決定し、 logfileはSTDOUTではなくファイルにログを書き込むことを許可します。
```python
from pathlib import Path
flow = Workflow(
   name="Example",
   level="debug",
   cleanup_temp=False,
   default_channel_size=5,
   logfile=Path("out.log")
)
```

- 作成したWorkflowにはNodeを適宜追加できます。
```python
node = flow.add(Example)
node2 = flow.add(Example, name="other_example")
```
- また様々なオプションを渡すことも可能です。
```python
other = flow.add(
   OtherExample,
   name="other",
   parameters=dict(value=42),
   loop=True,
   fail_ok=True,
   n_attempts=3,
   max_loops=5
)
```

#### step2　ノードをつなげます
```python
flow.connect(node.output, other.input)
```

#### step3　実行
```python
flow.execute()
```

# Nodeを作ってみる-1
- カスタムノードを作る場合はNodeを継承して作成します。
- 以下の例では、入力としてStringを受取り、出力として受け取ったデータの値を返すという挙動を取ります。

In [None]:
class Example(Node):
    out: Output[str] = Output()
    data: Parameter[str] = Parameter(default="hello")

    def run(self) -> None:
        self.out.send(self.data.value)

- File IOのノードを見てみます。
- このノードではPathを受取り、ファイルの有無を確認し、Pathを次の処理に渡すという流れを担います。
 
```python
P = TypeVar("P", bound=Path)
class LoadFile(Node, Generic[P]):
    """Provides a file specified as a parameter on an output."""

    file: FileParameter[P] = FileParameter()
    """Path to the input file"""

    out: Output[P] = Output(mode="copy")
    """File output"""

    def run(self) -> None:
        path = self.file.filepath
        if not path.exists():
            raise FileNotFoundError(f"File at path {path.as_posix()} not found")

        self.out.send(path)
```


# Nodeを作ってみる-2
- カスタムノードではファイルの受け渡しもできます。
- 中間ファイルを出力しそれを次に渡すと行った処理を考える際に便利です。
- パラメータの型としてpathlib.Pathを指定しましょう。
- これはMaize-contribのSmilesのファイルを読むクラスのコードです。

```python
from typing import Annotated, Any, Callable, Iterable, List, Literal, TypeVar
from maize.core.node import Node
from maize.core.interface import Input, Output, Parameter, FileParameter, Suffix, Flag, MultiInput

class LoadSmiles(Node):
    """Load SMILES codes from a ``.smi`` file."""

    path: FileParameter[Annotated[Path, Suffix("smi")]] = FileParameter()
    """SMILES file input"""

    out: Output[list[str]] = Output()
    """SMILES output"""

    sample: Parameter[int] = Parameter(optional=True)
    """Take a sample of SMILES"""

    def run(self) -> None:
        with self.path.filepath.open() as file:
            smiles = [smi.strip("\n") for smi in file.readlines()]
            if self.sample.is_set:
                smiles = random.choices(smiles, k=self.sample.value)
            self.out.send(smiles)
```

- 上の例ではSmilesファイルを読むことに特化していましが、同じ要領でテキストファイルを読み込みそれをプリントするWFを書いてみましょう。
- データは../dataにあるのをデフォルトで利用していますが適宜変えてもらって構いません。

In [None]:
from typing import Annotated, Any, Callable, Iterable, List, Literal, TypeVar
from maize.core.node import Node
from maize.core.interface import Input, Output, Parameter, FileParameter, Suffix, Flag, MultiInput
from pathlib import Path

class LoadText(Node):
    path: FileParameter[Path] = FileParameter()
    out: Output[str] = Output()
    def run(self) -> None:
        with self.path.filepath.open() as file:
            text_data = "\n\n"+"".join([line for line in file.readlines()])+"\n\n"
            self.out.send(text_data)

class PrintTxt(Node):
    inp: Input[str] = Input()

    def run(self):
        result = self.inp.receive()
        self.logger.info("Received: '%s'", result)

In [None]:
flow = Workflow(
   name="file_handling",
   #level="debug",
   cleanup_temp=False,
   #logfile=Path("out.log")
)

In [None]:
filepath = Path("../data/loadfile_demo.txt")
load = flow.add(LoadText, parameters=dict(path=filepath))
printer = flow.add(PrintTxt)

In [None]:
flow.connect(load.out, printer.inp)

In [None]:
flow.check()
flow.visualize()

In [None]:
flow.execute()

# 条件により分岐するワークフロー
- ノードの処理の結果によって分岐させたいことも可能です。
- 下の例ではLoadDataのパラメータ（Dataが10未満か、否か）によって処理を分岐させています。

In [None]:
from maize.core.node import Node
from maize.core.interface import Input, Output
from maize.core.workflow import Workflow
from maize.steps.io import LoadData, LogResult
from maize.steps.plumbing import Merge

class Condition(Node):
    inp: Input[int] = Input()
    out_a: Output[int] = Output()
    out_b: Output[int] = Output()

    def run(self) -> None:
        data = self.inp.receive()
        if data < 10:
            self.out_a.send(data)
        else:
            self.out_b.send(data)

In [None]:
flow = Workflow(name="cycle")
load = flow.add(LoadData[int], parameters={"data": 17})
cond = flow.add(Condition)
out_a = flow.add(LogResult, name="out_a")
out_b = flow.add(LogResult, name="out_b")
flow.connect_all(
    (load.out, cond.inp),
    (cond.out_a, out_a.inp),
    (cond.out_b, out_b.inp)
)

In [None]:
flow.check()

In [None]:
flow.visualize()

In [None]:
flow.execute()

# 繰り返しを含むワークフロー
- Dealyは指定された秒数time.sleepによって処理を止めます
- Mergeは複数の入力を受取り、一つのポートからそれらを出力します。

In [None]:
from maize.core.graph import Graph
from maize.core.interface import Parameter, Input, Output
from maize.core.node import Node
from maize.core.workflow import Workflow

from maize.steps.plumbing import Delay, Merge
from maize.steps.io import Return
from typing import TypeVar, Generic
T = TypeVar("T")

class A(Node):
    out: Output[int] = Output()
    send_val: Parameter[int] = Parameter()

    def run(self) -> None:
        self.out.send(self.send_val.value)


class B(Node):
    inp: Input[int] = Input()
    out: Output[int] = Output()
    final: Output[int] = Output()

    def run(self) -> None:
        val = self.inp.receive()
        if val > 48:
            self.logger.debug("%s stopping", self.name)
            self.final.send(val)
            return
        print(val+2)
        self.out.send(val + 2)

class SubGraph(Graph):
    
    def build(self) -> None:
        a = self.add(A, parameters=dict(send_val=36))
        d = self.add(Delay[int], parameters=dict(delay=1))
        self.connect(a.out, d.inp)
        #map_portを使いサブグラフの出力ポートを作成　普通のノードはself.out.send(xxx) etc
        self.out = self.map_port(d.out)
        self.val = self.combine_parameters(a.send_val, name="val")


flow = Workflow(name="test")
sg = flow.add(SubGraph)
b = flow.add(B, loop=True)
merge = flow.add(Merge[int])
ret = flow.add(Return[int])
flow.connect(sg.out, merge.inp)
flow.connect(merge.out, b.inp)
flow.connect(b.out, merge.inp)
flow.connect(b.final, ret.inp)
flow.combine_parameters(sg.val, name="val")
flow.check()
flow.visualize()

In [None]:
flow.execute()