## Ezmsg Introduction

If this is your first time using `ezmsg`, you're in the right place. This notebook will walk through the basics of creating a very simple `ezmsg` system.

`ezmsg` is ideal for creating modular processing pipelines whose steps can be arranged as a directed acyclic graph. This notebook builds a very simple graph that generates a sequence of integers, adds 1 to each one, and prints the result to standard output.
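
As a point of comparison, the same computation can be written with plain Python generators and no `ezmsg` at all. This sketch is only for intuition about the data flow. The function names are hypothetical, and it has none of the asynchronous message passing built in the rest of this notebook.

```python
from typing import Iterable, Iterator, List


def count(iterations: int) -> Iterator[int]:
    """Generate 0, 1, ..., iterations - 1."""
    for value in range(iterations):
        yield value


def add_one(values: Iterable[int]) -> Iterator[int]:
    """Add 1 to every value flowing through."""
    for value in values:
        yield value + 1


def run_pipeline(iterations: int) -> List[int]:
    """Chain the two steps and collect the results."""
    return list(add_one(count(iterations)))


# The final step of the graph: print each value.
for value in run_pipeline(5):
    print(value)
```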

![simple graph](simple-graph.png)

In `ezmsg` syntax, this graph would look like this:

![ezmsg graph](ezmsg-graph.png)

We will write an `ezmsg` `Unit` for each discrete step of our pipeline, and connect them together with a `System`.

In [70]:
import ezmsg.core as ez
from typing import AsyncGenerator

Create a message type to pass between the `Unit`s. The message type should inherit from `ez.Message`.

In [71]:
class CountMessage(ez.Message):
    value: int

We also need a way to tell the counting `Unit` how many numbers to generate. A class that inherits from `ez.Settings` holds this kind of configuration.

In [72]:
class CountSettings(ez.Settings):
    iterations: int
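
For intuition, both classes can be thought of as typed containers of fields, much like Python dataclasses. The stdlib-only sketch below mimics them with `dataclasses`. Whether `ezmsg` builds these classes the same way is an assumption here, and the `...Sketch` names are hypothetical stand-ins, not part of `ezmsg`.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass
class CountMessageSketch:
    """Hypothetical stand-in for a message carrying one integer."""
    value: int


# Freezing the settings is a choice made for this sketch only.
@dataclass(frozen=True)
class CountSettingsSketch:
    """Hypothetical stand-in for the counter's configuration."""
    iterations: int


msg = CountMessageSketch(value=3)
settings = CountSettingsSketch(iterations=20)

# Fields are set by keyword and read back as attributes.
print(msg.value, settings.iterations)

# A frozen dataclass rejects changes after construction.
try:
    settings.iterations = 5  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False
```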

Next, create a `Unit` that will generate a count. Every `Unit` should contain inputs and/or outputs and at least one function which subscribes to the inputs or publishes to the outputs.

For `Count`, we create an `OutputStream` and a publishing function that generates the numbers and yields a `CountMessage` for each one to the `OutputStream`.

In [73]:
class Count(ez.Unit):

    SETTINGS: CountSettings

    OUTPUT_COUNT = ez.OutputStream(CountMessage)

    @ez.publisher(OUTPUT_COUNT)
    async def count(self) -> AsyncGenerator:
        count = 0
        while count < self.SETTINGS.iterations:
            yield (self.OUTPUT_COUNT, CountMessage(
                value=count
            ))
            count = count + 1

        raise ez.NormalTermination

The `ez.NormalTermination` exception lets the system know that no further messages will be sent from the `Count` unit. The system will gracefully shut down once all messages have been passed through.
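
The idea of "signal that nothing more is coming, then let downstream drain what is left" can be sketched with a plain `asyncio.Queue`. This is not how `ezmsg` implements termination. The `_DONE` sentinel and the function names are hypothetical, chosen only to illustrate the pattern.

```python
import asyncio
from typing import List

# Hypothetical sentinel standing in for "no more messages will be sent".
_DONE = object()


async def producer(queue: asyncio.Queue, iterations: int) -> None:
    """Put 0..iterations-1 on the queue, then signal completion."""
    for value in range(iterations):
        await queue.put(value)
    await queue.put(_DONE)


async def consumer(queue: asyncio.Queue) -> List[int]:
    """Collect values until the completion signal arrives."""
    received: List[int] = []
    while True:
        item = await queue.get()
        if item is _DONE:
            return received
        received.append(item)


async def main(iterations: int) -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    # Run both sides concurrently; keep only the consumer's result.
    _, received = await asyncio.gather(
        producer(queue, iterations), consumer(queue)
    )
    return received


received = asyncio.run(main(3))
print(received)
```

Inside a notebook that already has a running event loop, replace `asyncio.run(main(3))` with `await main(3)`.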

The next `Unit` in the chain should accept a `CountMessage` from the first `Unit`, add 1 to its value, and yield a new `CountMessage`. To do this, we create a new `Unit` containing a single function that both subscribes and publishes. We will connect this `Unit` to `Count` later on, when we create a `System`.

The subscribing function is called whenever the `Unit` receives a message on the `InputStream` it subscribes to, in this case `INPUT_COUNT`.

In [74]:
class AddOne(ez.Unit):

    INPUT_COUNT = ez.InputStream(CountMessage)
    OUTPUT_PLUS_ONE = ez.OutputStream(CountMessage)

    @ez.subscriber(INPUT_COUNT)
    @ez.publisher(OUTPUT_PLUS_ONE)
    async def on_message(self, message: CountMessage) -> AsyncGenerator:
        yield (self.OUTPUT_PLUS_ONE, CountMessage(
            value=message.value + 1
        ))
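
The "called once per incoming message, may yield outgoing messages" behavior of a function like `AddOne.on_message` can be imitated with a plain async generator and a small dispatch loop. The `dispatch` helper and the string stream name are hypothetical. This is a sketch of the calling pattern, not of `ezmsg` internals.

```python
import asyncio
from typing import AsyncGenerator, Iterable, List, Tuple


async def on_message(value: int) -> AsyncGenerator[Tuple[str, int], None]:
    """Handle one incoming value and publish value + 1."""
    yield ("OUTPUT_PLUS_ONE", value + 1)


async def dispatch(inbox: Iterable[int]) -> List[int]:
    """Call the handler once per message and gather what it yields."""
    outbox: List[int] = []
    for message in inbox:
        async for _stream, result in on_message(message):
            outbox.append(result)
    return outbox


forwarded = asyncio.run(dispatch([0, 1, 2]))
print(forwarded)
```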

The last unit prints the value of every message it receives.

In [75]:
class PrintValue(ez.Unit):

    INPUT = ez.InputStream(CountMessage)

    @ez.subscriber(INPUT)
    async def on_message(self, message: CountMessage) -> None:
        print(message.value)

The last step before we have a fully functioning `ezmsg` pipeline is to supply values for the `Settings` we declared and to connect the units together. We do both in a `System`. Its `configure()` and `network()` methods are special: `configure()` applies settings to member units, and `network()` returns the connections between their streams.

In [76]:
class CountSystem(ez.System):

    # Define member units
    COUNT = Count()
    ADD_ONE = AddOne()
    PRINT = PrintValue()

    # Use the configure function to apply settings to member Units
    def configure(self) -> None:
        self.COUNT.apply_settings(
            CountSettings(iterations=20)
        )

    # Use the network function to connect inputs and outputs of Units
    def network(self) -> ez.NetworkDefinition:
        return (
            (self.COUNT.OUTPUT_COUNT, self.ADD_ONE.INPUT_COUNT),
            (self.ADD_ONE.OUTPUT_PLUS_ONE, self.PRINT.INPUT)
        )
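
Each tuple returned by `network()` is a directed edge from an output stream to an input stream. As a sanity check that these connections form a directed acyclic graph, the stdlib-only sketch below rebuilds the same edges with plain strings (hypothetical stand-ins for the stream objects) and sorts the units topologically with `graphlib`, which requires Python 3.9 or later. It does not use or inspect `ezmsg` itself.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set

# The same two connections as in network(), written as "UNIT.STREAM" strings.
edges = (
    ("COUNT.OUTPUT_COUNT", "ADD_ONE.INPUT_COUNT"),
    ("ADD_ONE.OUTPUT_PLUS_ONE", "PRINT.INPUT"),
)


def unit_of(stream: str) -> str:
    """Return the unit name from a 'UNIT.STREAM' string."""
    return stream.split(".")[0]


# Map each unit to the set of units that feed into it.
graph: Dict[str, Set[str]] = {}
for source, destination in edges:
    graph.setdefault(unit_of(destination), set()).add(unit_of(source))
    graph.setdefault(unit_of(source), set())

# static_order() lists every unit after all of its upstream units.
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['COUNT', 'ADD_ONE', 'PRINT']
```

If the edges contained a cycle, `TopologicalSorter` would raise `graphlib.CycleError` instead of returning an order.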


Finally, instantiate and run the system!

In [77]:
system = CountSystem()
ez.run_system(system)