RouteMaster is a .NET library for writing stateful workflows on top of a message bus. It exists to make the implementation of long running business processes easy in event driven systems. Find out more of the history at https://blog.mavnn.co.uk/routemaster-master-your-messaging-routes/
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
.paket
.vscode
Core
EasyNetQ
MemoryBus
MemoryStores
PostgreSQL
Tests.CSharp
Tests
.gitignore
.travis.yml
LICENSE.txt
README.html
README.org
RouteMaster.png
Settings.FSharpLint
build.sh
docker-compose.yml
email_sender.svg
integration.sh
paket.dependencies
paket.lock

README.org

RouteMaster

What is it?

RouteMaster is a .NET library for writing stateful workflows on top of a message bus. It exists to make the implementation of long running business processes easy in event driven systems.

Much of the thinking behind RouteMaster is inspired by the book Enterprise Integration Patterns, and it works best when:

**RouteMaster is currently at an Alpha state and is not ready for production usage**

Would you like to see faster progress on RouteMaster? Contact us@mavnn.co.uk to discuss sponsorship.

What does it look like?

Imagine a workflow for sending emails. It is triggered with an email address to send to and a template name to use.

digraph { 
  node [shape=box,style="filled",color="#aaffaa"]

  trigger [label="Trigger email send"]
  template [label="Request template"]
  getInfo [label="Request user info"]
  render [label="Render HTML"]
  send [label="Send Email"]

  trigger -> template -> getInfo -> render -> send
}

There are microservices already written to perform each of these individual functions (there is a template store, a user info store, etc.).

We use the following .NET types as the message types available in our overall system; these are effectively the contract by which we communicate with our services. Here they are defined as F# records to capture the fact that they are immutable value types, without needing to write additional equality checks:

// For the template store service

// Request
type GetTemplate =
    { cid : Guid
      templateId : string }

// Response
type Template =
    { cid : Guid
      template : string }

// For user info store

// Request, keyed on email address
type LookupUserInfo =
    { cid : Guid
      address : string }

// Response
type UserInfo =
    { cid : Guid
      name : string }

// HTML template renderer

// Request with template and "user info"
type RenderEmail =
    { cid : Guid
      template : string
      name : string }

// Response
type EmailRendered =
    { cid : Guid
      content : string }

// Email sending service

// Request
type SendEmail =
    { cid : Guid
      address : string
      content : string }

// Response
type EmailSent =
    { cid : Guid
      success : bool }

All of these types contain a cid field - a convention in our system for storing a Correlation Identifier. All of the applications within our system (or at least, all of the Message Translators exposing these messages) know to publish response messages including the same correlation identifier as received in the triggering message.

To run through this work flow, we’ll need a stateful process manager; we’ll need to retrieve the current template and user info, combine the two with the rendering service and then use the result (and our initial email address) to actually send our email.

Our state will need to look something like this:

type SendEmailProcessState =
    { ToAddress : string
      Template : string option
      Content : string option
      UserInfo : string option }

We’ll always have the ToAddress, which is the information given to start the process off, but all of the other information will be filled in as we go through.

Now we need to define each step in our workflow. Firstly, let’s create a function which takes our initial state and return an Async<StepResult> which:

  • Records the fact that we expect a Template message soon with a particular correlation ID
  • Requests that RouteMaster sends a GetTemplate message with the same correlation ID
let startSendEmailRoute ttl timeout receivedTemplate initialState =
    async {
        let getTemplate =
            { cid = Guid.NewGuid()
              templateId = "My template" }
        let cid = getTemplate.cid.ToString() |> CorrelationId
        return StepResult.pipeline ttl timeout getTemplate cid receivedTemplate
    }

But wait! That function takes four arguments - what are the other three?

tll (“time to live”) is a simple TimeSpan. To avoid issues with stale messages and unbounded backlogs, RouteMaster requires that all messages sent and all expected responses have a time limit. For a simple “pipeline” step like this (sends one message, expects one response) the time to live of the message and how long we’ll wait for the expected result are defined to be equal.

We cannot define the timeout and receivedTemplate steps within the function, as the steps to continue a workflow must be “registered” before being used. So for now we’ll leave them as function parameters to be passed in later.

Next, we’ll be receiving a Template message; we need a Step which knows how to extract the correlation ID from the message, and what logic to invoke when we receive one we’ve been expecting.

let receivedTemplate timeout receivedUserInfo =
    let extract (t : Template) =
        t.cid.ToString()
        |> CorrelationId
        |> Some
    let invoke (access : StateAccess<_>) (template : Template) =
        async {
            let state = access.Update (fun state -> { state with Template = Some template.template })
            match state with
            | Some { ToAddress = a } ->
                let lookupUserInfo =
                    { cid = Guid.NewGuid()
                      address = a }
                let cid = lookupUserInfo.cid.ToString() |> CorrelationId
                return StepResult.pipeline ttl timeout lookupUserInfo cid receivedUserInfo
            | _ ->
                printfn "Failed to retrieve state!"
                return StepResult.cancel
        }
    Step.create
        (StepName "template received")
        extract
        invoke

We’ll need a more steps to cover each of the stages of the process, and finally we’ll add a timeout step which will receive a TimeoutMessage if any step along the way times out. Let’s put those together:

let receivedUserInfo receivedEmailRendered timeout =
    let extract (u : UserInfo) =
        u.cid.ToString()
        |> CorrelationId
        |> Some
    let invoke (access : StateAccess<_>) (u : UserInfo) =
        async {
            let state = access.Update id
            match state with
            | Some { Template = Some t } ->
                let renderEmail =
                    { cid = Guid.NewGuid()
                      template = t
                      name = u.name }
                let cid = renderEmail.cid.ToString() |> CorrelationId
                return
                    StepResult.pipeline
                        ttl timeout renderEmail cid receivedEmailRendered
            | _ ->
                printfn "Failed to retrieve state!"
                return StepResult.cancel
        }
    Step.create
        (StepName "user info received")
        extract
        invoke

let receivedEmailRendered receivedEmailSent timeout =
    let extract (er : EmailRendered) =
        er.cid.ToString()
        |> CorrelationId
        |> Some
    let invoke (access : StateAccess<_>) (er : EmailRendered) =
        async {
            let state = access.Update id
            match state with
            | Some { ToAddress = a } ->
                let sendEmail =
                    { cid = Guid.NewGuid()
                      address = a
                      content = er.content }
                let cid = sendEmail.cid.ToString() |> CorrelationId
                return
                    StepResult.pipeline
                        ttl timeout sendEmail cid receivedEmailSent
            | _ ->
                printfn "Failed to retrieve state!"
                return StepResult.cancel
        }
    Step.create
        (StepName "an email was rendered")
        extract
        invoke

let receivedEmailSent =
    Step.create
        (StepName "anEmailWasSent")
        (fun (es : EmailSent) ->
            es.cid.ToString()
            |> CorrelationId
            |> Some)
        (fun access (_ : EmailSent) -> async {
            printfn "Yay! Record I'm done somewhere"
            printfn "The console sounds a great place!"
            return StepResult.cancel
        })

let receivedTimeout =
    Step.createTimeout (StepName "timeout") (fun _ _ -> async {
        printfn "I should probably tell someone this happened."
        printfn "But I'm only demo code."
        return StepResult.cancel
    })

Now we have all of the steps required to build our “Route”. To actually connect everything up (persistent storage, connect to the message bus, etc) we need to activate the route.

Activation is the process by which we register all of these lovely pieces of logic with the underlying infrastructure. And to help us remember to do so, our StepResult output from each step requires that any subsequent steps are registered.

How does this all work? Well, we call the RouteMaster.activate function with two arguments:

  • a configuration representing the underlying infrastructure
  • a builder function which will be called with a RouteBuilder

Out of all this, we need to return a function that knows how to take the initial state of the this route and kick off the first step. Luckily we already have that above - the = startSendEmailRoute= function.

So let’s get on with it; because earlier steps in the route require later steps to already be registered before they can be registered themselves, we end up building up the route in reverse order:

// default time to live
let ttl = TimeSpan.FromMinutes 5.

let buildFunc builder =
    let timeout =
        receivedTimeout
        |> Step.register builder
    let registeredEmailSent =
        Step.register builder receivedEmailSent
    let registeredEmailRendered =
        receivedEmailRendered registeredEmailSent timeout
        |> Step.register builder
    let registeredUserInfo =
        receivedUserInfo registeredEmailRendered timeout
        |> Step.register builder
    let registeredTemplate =
        receivedTemplate registeredUserInfo timeout
        |> Step.register builder
    (fun initial state -> startSendEmailRoute ttl timeout registeredTemplate)

use sendEmailRoute =
    RouteMaster.activate config buildFunc

After running this code, sendEmailRoute is now active and will start receiving messages from the message bus. Because expected messages are shared between RouteMaster nodes, it will start processing messages which have been expected by any node on the same infrastructure with matching route and step names (route name is part of the config object).

To initiate a new run of the route, we simply call startRoute with a initial starter state.

let nextPerson =
    { ToAddress = "bob@example.com"
      Template = None
      Content = None
      UserInfo = None }
RouteMaster.startRoute sendEmailRoute nextPerson

So there you have it - a simple (but stateful) pipeline workflow in RouteMaster. Things get more interesting from here!

What’s next?

  • C# friendly API
  • Documentation
  • Nicer abstractions over common patterns such as fork and join
  • Support for more State and Transport backends