
High Level Redesign Proposal

Raised for discussion Dec 2018, by Tim Watson

Proposal - tl;dr

The essence of this proposal is that I think we should re-architect Cloud Haskell to be less beholden to implementing Erlang for GHC, and make it more of a platform for building a wide range of concurrent, distributed applications, using various different styles and architectural approaches.

At its core, the technical side of this proposal is that we should create a core for fault-tolerant, concurrent, distributed programming which doesn't force you into the actor model - based on managed/supervised, typed channels and a configurable scheduler strategy - and then provide an actor implementation on top of that. Some further motivating ideas are presented here

I have carefully considered the fact that, whilst some industrial users have moved away from Cloud Haskell (viz Tweag, Well Typed, and others), we still have commercial users of Cloud Haskell with production code at stake - I'm waving at you right now, adjoint.io. I am hopeful that any re-engineering will only make things better for existing users, and attract more users and contributors in the future.

Key Ideas

The key points of this proposal follow. I have separated them into the bits that should concern current users, and the bits that (hopefully!) will excite future prospective users.

Maintain the status quo for distributed-process, distributed-process-client-server, distributed-process-supervisor

  • The current 0.x.y epoch will be maintained, and continue to receive bug fixes and minor improvements as necessary
  • The current 0.x.y epoch should not hit End Of Service Life until we have reached a beta-quality implementation of whatever replaces it
  • Improvements discovered whilst engineering any replacement will be backported to the current 0.x.y epoch where possible

Keep what works

  • The network-transport abstraction is a good fit for inter-node communication, and should be kept as the primary interface between distributed-haskell and whatever underlying communications fabric users want to employ
  • The current implementation ties applications into a specific serialisation layer, which is less than ideal - try to make the choice of serialisation framework pluggable by defining an API for this (a sketch of such an API follows this list)
  • I would like to look at cborg, for example, which might subsume the direct use of binary
  • This might necessitate changes to the static/closure supporting libraries
  • Speaking of which, the current distributed-static library works well, but we could collapse distributed-closure into it
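
To make the idea of a pluggable serialisation layer concrete, here is a minimal sketch of the kind of API I have in mind. None of these names (Serializer, CanSerialize, BinarySerializer) exist today - this is purely illustrative, and a real design would also have to account for fingerprints and static values:

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE TypeFamilies    #-}
module SerializerSketch where

import           Data.Binary          (Binary)
import qualified Data.Binary          as Binary
import qualified Data.ByteString.Lazy as LBS
import           Data.Kind            (Constraint)

-- A pluggable serialisation backend: the rest of the stack programs against
-- this class, and each backend (binary, cereal, cborg, ...) declares the
-- constraint it imposes on payload types.
class Serializer s where
  type CanSerialize s a :: Constraint
  encodeWith :: CanSerialize s a => s -> a -> LBS.ByteString
  decodeWith :: CanSerialize s a => s -> LBS.ByteString -> Either String a

-- A backend built on the binary package - roughly what we are tied to today.
data BinarySerializer = BinarySerializer

instance Serializer BinarySerializer where
  type CanSerialize BinarySerializer a = Binary a
  encodeWith _ = Binary.encode
  decodeWith _ bs =
    case Binary.decodeOrFail bs of
      Left  (_, _, err) -> Left err
      Right (_, _, a)   -> Right a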

Break up the dependencies between layers

  • Apply a rather vigorous separation of concerns, and allow for pluggability of each layer
  • I would like to independently be able to choose whether or not Actors fit my problem space, or whether I'd rather use some other concurrency model (reactive streams, for example)
  • I would like to be able to choose whether or not I need the distribution mechanism - an actor implementation (or concurrent streams, or whatever) may be useful without remote nodes being involved
  • Users should not have to rely on code they do not use - e.g. Supervisor forces users to define their {re}start specifications using Closure (Process ()), even if their child processes are never hosted remotely.
  • As per the above, it should be possible to choose a concurrency layer that is local and combine/compose this with remote layers, without losing functionality. Thus I should be able to choose to utilise actors and supervision, without having to take on the pain of remoting unless it's something I want to use
  • You should be able to implement your own node behaviour, for example, so that use of The Actor Model is a choice you make when building your node - hopefully different behaviours will emerge as people contribute; I can certainly think of several options that differ from the current approach

Make typed channels central

  • Re-design Cloud Haskell around the notion of typed channels as its core construct
  • Make typed channels an extensible and composable construct, such that we can layer them to form what today would be a Process holding multiple ReceivePort or SendPort handles
  • Make it possible to use typed channels without succumbing to the actor model, such that it's possible to pass a handle around and write to, or read from, it anywhere that you can run an STM transaction (see the sketch after this list) - technically that's possible today for reads, but with numerous caveats
  • Differentiate between local and remote channels at the type level, deliberately sacrificing location transparency, but gaining many advantages for in-process communication, as well as opening up extensibility for local channels to gain new features
  • Allow channel implementations to choose their own method of error reporting, whether Either a b, synchronous exceptions, or asynchronous exceptions (in the case of channels bound to a thread, which we will need in order to maintain the Erlang-style actor implementation) - note that being bound to a thread/Process is an option for this channel type rather than a constraint, and is currently a hidden invariant!
  • Create a typed channel implementation that maps directly to the current Process mailbox, type bound to Message, and life cycle bound to the creating thread
  • The above typed channel maps the current CQueue implementation to its read side, allowing us to continue supporting selective receive using pattern matching, so the current distributed-process implementation of Erlang-in-Haskell can continue to exist and maintain its current API
  • Allow for composing typed channels into a pipeline, by connecting outputs to inputs
  • Have an API that supports taking this notion of pipes/conduits/whatever and applying concurrency to the connections between pipes in different ways
  • Ensure this composability of typed channels allows for combining and modifying send/receive behaviour - it should be possible to make a channel that is size-bounded, or that blocks senders when full, for example
  • Ensure that in the case of a remote channel, the lightweight connection underlying these channels (which maps to a process) can be monitored, and that the channels themselves can be monitored - this is one of the most complex issues the proposal will need to address
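
As a rough illustration of the channel construct described above - a handle you can read or write anywhere you can run an STM transaction, with bounded and composable variants - here is a minimal local-only sketch. All of the names (LocalChan, newBoundedChan, connect) are hypothetical; a real design would add the type-level local/remote distinction, monitoring, and lifecycle management discussed above:

module ChannelSketch where

import Control.Concurrent.STM

-- A local typed channel: usable from any STM transaction, with no
-- Serialisable constraint and no Process monad required. A remote channel
-- would be a distinct type, deliberately sacrificing location transparency.
data LocalChan a = LocalChan
  { writeChan :: a -> STM ()
  , readChan  :: STM a
  }

-- An unbounded channel backed by a TQueue.
newLocalChan :: IO (LocalChan a)
newLocalChan = do
  q <- newTQueueIO
  return LocalChan { writeChan = writeTQueue q, readChan = readTQueue q }

-- A size-bounded channel that blocks senders when full, backed by a TBQueue.
newBoundedChan :: Int -> IO (LocalChan a)
newBoundedChan n = do
  q <- newTBQueueIO (fromIntegral n)
  return LocalChan { writeChan = writeTBQueue q, readChan = readTBQueue q }

-- Compose channels into a pipeline by connecting an output to an input,
-- applying a pure transformation along the way.
connect :: (a -> b) -> LocalChan a -> LocalChan b -> STM ()
connect f from to = readChan from >>= writeChan to . f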

Do the awesome things that {meta, monad}-par and Haxl do, in a distributed setting

This kind of covers itself...

Motivation

There are several motivating factors behind redesigning Cloud Haskell, some of them technical and some social/commercial. This document examines the reasons why I feel we should consider a redesign, and outlines my proposals.

Social Drivers

The world has moved on since the project first gained popularity, and the user base remains quite small. I think there are a number of reasons for this...

  • The whole premise of CH is a bit mixed up - Erlang in Haskell. Or Distributed Haskell. Which is it?
  • The Erlang-in-Haskell thing has made the APIs very un-Haskell-ish. Having the ability to do selective receive is very cool, mind you! I personally think that the Erlang-style APIs should be optional, whilst the core should be built to feel more familiarly Haskell-ish. I think we've almost got to relatively normal-looking code with distributed-process-fsm, but it's still a way off when it comes to combining processes. The same thing applies to supervision trees - you /really/ want a DSL for that.
  • Lots of things from Erlang are legacies we could do without - for example, the process registry - or could've been provided as a pluggable API so you can choose your poison (e.g. Akka makes all actors addressable via a URI and arranges child processes as sub-resources instead of using a flat namespace, and also makes supervision a built-in behaviour of all actors whilst allowing you to customise the inherited supervising behaviour for spawned children)
  • The actor model requires learning a new way of thinking about and dealing with concurrency, which is overhead unless you are really sold on the idea - despite this, Akka enjoys as much success as Erlang among JVM aficionados, perhaps even more
  • The actor model is not a perfect fit for every problem, yet Cloud Haskell forces your square peg into the round hole anyway (and distribution is useful without the actor model - see HdpH, for example)
  • The Cloud Haskell implementation of the actor model forces you to deal with the remote case even when writing intra-node local code, which is awkward and performs poorly (compared to hand-crafted code that passes data to another thread via a FIFO queue using STM, for example)
  • There are numerous constraints in place to facilitate sending functions around the network, which is in fact something of an edge case
  • Creating closures from remote-able functions that take arguments, and registering them in the remote tables, is still pretty ugly without Template Haskell (static pointers could solve this, but they bring a different set of technical overheads - see the small example below)
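
For illustration, the StaticPointers extension already lets us name a top-level function without Template Haskell or an explicit remote table, though this tiny example deliberately ignores the harder problem of closures that capture arguments:

{-# LANGUAGE StaticPointers #-}
module StaticSketch where

import GHC.StaticPtr (deRefStaticPtr)

addOne :: Int -> Int
addOne = (+ 1)

main :: IO ()
main = print (deRefStaticPtr (static addOne) 41)  -- prints 42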

Technical Drivers

Fluid GHC ecosystem surrounding distributed programming

The ongoing development of compiler + runtime capabilities for passing static values, as well as that of community maintained libraries handling serialisation (viz binary vs. cereal vs. cborg) makes for a rapidly moving landscape. Having the ability to plug in a serialisation layer would make it far easier to experiment with different options, and would give us a useful place to isolate general features such as caching deserialised values, logging failed deserialisation activities (e.g. calls to unclosure or deRefStatic that have failed), and so on.

Currently the choice is made at the distributed-process level, where we are bound to rank1dynamic, distributed-static, and binary.

Performance, Scalability, and Safety

There are numerous areas where Cloud Haskell suffers considerably under the current implementation. These include (but are probably not limited to):

Networking Complexities

There are some very tricky technical issues in this space. See for example, here, here, here, here, and here.

  • High likelihood of space leaks if you want ordering guarantees (since every interaction between two processes requires us to hold open a lightweight connection in order to do this)
  • Blocking behaviour in the transport layer can bring the whole system to a halt (!)
  • Heartbeats would make the CH layer consistently simpler to reason about in the remote case, but it's difficult to know at which layer to introduce them
  • There is no simple way to apply back-pressure to senders in the current architecture

This last point is perhaps a bit contentious. It may be desirable for some classes of application to apply TCP back-pressure, for example if we want to build an API for reactive streams (which I very much want to write, and which Haskell would be superbly suited to in so many ways). This is relatively easy to handle using Akka Streams, and the concurrent performance benchmarks of libraries like streamly tell me that Haskell should be super efficient here - not to mention the fantastic throughput the folks at Facebook have got out of GHC for post/wall spam filtering - but the architecture of Cloud Haskell nodes precludes adjusting the way a system behaves to allow for use cases such as these.

Another awkwardness of the way Cloud Haskell's networking implementation is directly tied into the node controller is that we cannot use much of Cloud Haskell's own feature set to manage the distribution protocol. For example, consider this discussion on asynchronous APIs in network-transport-tcp. Some discussion of Erlang's management of inter-node connections makes it clear that it keeps a supervision tree of processes around to manage connections, and therein lies a major difference between the architecture of a Cloud Haskell node and that of an Erlang one. In Cloud Haskell, the networking and distribution are a core component of the node and its control plane, whilst in Erlang, the actor system is the core component, and the distribution mechanism is built on top of it and leverages it (e.g. a process managing each inter-node connection, supervision, etc). This goes back to my thought that we might want an actor system without distribution - we equally might want distribution without actors, but actors might well be a great way to implement that distribution protocol.

Also, having spent quite some time looking at the networking layers from cardano-sl, I think we can use network-transport more efficiently, but more on this later.

Bottlenecks and disasters in the local node control plane

The node controller can block on network writes, in places like this. In addition, the node controller can become a bottleneck even for local sends when it becomes overloaded, which doesn't take much. It is, after all, just one forkIO thread, and isn't getting preferential treatment from the runtime/scheduler.

Another major issue is that the NC unnecessarily deals with message decoding. In one instance (for local sends) it calls unsafeCoerce, which might fail quite spectacularly. Elsewhere it decodes/deserialises data - which could take a long time for a large message, and exposes a critical infrastructure thread to potential failure - and it does something similar again when decoding a message bound to a typed channel.

The point of all this is that we might want to move decoding out of the node controller's path of execution. The semantic arguments about whether we should accept a message that purports, based on its fingerprint, to be of type a and isn't, are certainly interesting. An attacker could easily craft such a payload. We might want to blacklist that connection, or even the endpoint, at that point, but it would be nice to do so without our entire node coming down. Moreover, that failure would currently crash the node's internal control plane, or the network listener, but not the processes spawned by it, leaving actors running with no idea that they're not only disconnected from the outside world, but that their own execution environment has silently broken.

Other issues would also be easier to solve if the node controller didn't conflate responsibilities in this way. It would be easier to manage caching small byte strings, keeping a cache of deserialised messages, handling decoding errors, and so on.
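
One way to picture that separation: the node controller only ever routes opaque payloads, and decoding happens in the receiving process. The sketch below is hypothetical - RawMessage, decodeAtReceiver and the fingerprint handling are illustrative, not the current implementation:

module DecodeSketch where

import           Data.Binary          (Binary, decodeOrFail)
import qualified Data.ByteString.Lazy as LBS

-- What the node controller routes: an opaque payload plus the fingerprint it
-- claims to carry. No decoding happens on the control plane thread.
data RawMessage = RawMessage
  { rawFingerprint :: LBS.ByteString
  , rawPayload     :: LBS.ByteString
  }

-- Decoding happens in the receiving process, so a malformed (or hostile)
-- payload fails only that receive; the failure can then be reported, or the
-- peer blacklisted, without taking down the node's control plane.
decodeAtReceiver :: Binary a => RawMessage -> Either String a
decodeAtReceiver (RawMessage _fingerprint payload) =
  case decodeOrFail payload of
    Left  (_, _, err) -> Left err
    Right (_, _, a)   -> Right a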

Awkwardness of adopting Erlang's type insensitive approach to messages in Haskell

Cloud Haskell code looks, well, odd, from a Haskell programmer's perspective. The insensitivity to types at the core of Erlang's receive and pattern matching approach makes Cloud Haskell programs feel oddly out of place alongside other libraries. In particular, it is awkward (at best) to write Cloud Haskell code that composes well.

Once the handling of types, rules for invalid message types, mailbox handling semantics, and other aspects of writing a Plain Old Process were packaged up by layering distributed-process-client-server on top of distributed-process, it was relatively easy to write a more Haskell-ish API for building actors, in distributed-process-fsm. This does allow for the actor definition to look quite reasonable:

switchFsm :: Step State StateData
switchFsm = startState
         ^. ((event :: Event ButtonPush)
              ~> (  (On  ~@ (set_ (+1) >> enter Off)) -- on => off => on is possible with |> here...
                 .| (Off ~@ (set_ (+1) >> enter On))
                 ) |> (reply currentState))
         .| ((event :: Event Stop)
              ~> (  ((== ExitNormal) ~? (\_ -> timeout (seconds 3) Reset))
                    {- let's verify that we can't override a normal shutdown sequence... -}
                 .| ((== ExitShutdown) ~? (\_ -> timeout (seconds 3) Reset))
                 .| ((const True) ~? stop)
                 ))
         .| ((event :: Event Check) ~> reply stateData)
         .| (event :: Event Reset)
              ~> (allState $ \Reset -> put initCount >> enter Off)

-- or for example
genFSM :: SendPort () -> Step State ()
genFSM sp = initState Off ()
       ^. ( (whenStateIs Off)
            |> ((event :: Event ()) ~> (always $ \() -> postpone))
          )
       .| ( (((pevent 100) :: Event State) ~> (always $ \state -> enter state))
         .| ((event :: Event ()) ~> (always $ \() -> (lift $ sendChan sp ()) >> resume))
          )
       .| ( (event :: Event String)
             ~> ( (Off ~@ putBack)
               .| (On  ~@ (nextEvent ()))
                )
          )

It is still very awkward to wire together processes in this fashion however. What happens when we want to build a conduit style streaming program, or build an API that is more akin to Edward Kmett's machines?

I am not proposing that we drop the ability to send arbitrary data to other processes, simply that this ought not to be an underlying principle for Cloud Haskell.

Towards a better Haskell in the Cloud...?

It seems to me that a far more Haskell-ish approach would be to favour typed channels as the default mechanism for communicating with other Haskell threads. This is consistent with the concurrency and stm packages, and simply adds a remote capability to the APIs that we're already familiar with. I will discuss this design principle in more depth in my proposal, but the key points are:

  • We should be well typed by default, making typed channels the default communication primitive
  • Typed channels should be extensible, for example it should be possible to write a two-way/request-reply channel (see the sketch after this list)
  • Typed channels should have their own semantics which are verifiable and enforced through strict invariants
  • Typed channels should have behaviour which, in the local case, is no less deterministic than using STM
  • Typed channels should have an API which is amenable to composition, both for reading and writing
  • Typed channels should allow you to re-implement their read/write strategies yourself if you want
  • A lot of optimisations become possible when typed channels are the default (caching, GC of unused channels, allowing the network stack to apply back-pressure to a single sender, size bounding, etc)
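
To make the two-way/request-reply point concrete, here is a hypothetical sketch of such a channel built from ordinary STM primitives. Each request travels with its own private reply slot, so replies cannot be misrouted; CallChan, call and serveOne are illustrative names only:

module CallChanSketch where

import Control.Concurrent.STM

-- A request-reply channel: requests travel with a private reply slot.
newtype CallChan req res = CallChan (TQueue (req, TMVar res))

newCallChan :: IO (CallChan req res)
newCallChan = CallChan <$> newTQueueIO

-- Client side: enqueue a request and wait for the reply.
call :: CallChan req res -> req -> IO res
call (CallChan q) req = do
  slot <- newEmptyTMVarIO
  atomically (writeTQueue q (req, slot))
  atomically (takeTMVar slot)

-- Server side: handle a single request and post the reply.
serveOne :: CallChan req res -> (req -> IO res) -> IO ()
serveOne (CallChan q) handler = do
  (req, slot) <- atomically (readTQueue q)
  res <- handler req
  atomically (putTMVar slot res)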

How this would affect Cloud Haskell is quite fundamental:

  • The Actor Model would be a layer on top of this capability
  • A baseline implementation of the actor model might look nothing like Erlang
  • Building Erlang-in-Haskell (aka the current distributed-process) should be an exercise in layering on top of the baseline, extending the behaviour of typed channels to allow for type Mailbox = TypeChannel Message, and allowing CQueue to be attached as an implementation behaviour of the Mailbox abstraction (a rough sketch of this layering follows this list)
  • Building something that looked more like Akka would be plausible, layered on top of the baseline
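
A hypothetical sketch of that mailbox layering, using a plain in-memory stash in place of CQueue and a deliberately simplified Message type, just to show that selective receive is something a mailbox layer can add on top of an ordinary typed channel:

module MailboxSketch where

import Control.Concurrent.STM
import Data.IORef

-- Stand-in for the real dynamically typed Message.
newtype Message = Message String

-- The Erlang-style mailbox as a typed channel of Message plus a stash of
-- messages that earlier selective receives skipped over (the role CQueue
-- plays today).
data Mailbox = Mailbox
  { mboxChan  :: TQueue Message
  , mboxStash :: IORef [Message]
  }

-- Selective receive: return the first message matching the predicate,
-- keeping everything that does not match for later receives, in order.
selectiveReceive :: Mailbox -> (Message -> Bool) -> IO Message
selectiveReceive mbox match = go
  where
    go = do
      stashed <- readIORef (mboxStash mbox)
      case break match stashed of
        (skipped, found : rest) -> do
          writeIORef (mboxStash mbox) (skipped ++ rest)
          return found
        (_, []) -> do
          m <- atomically (readTQueue (mboxChan mbox))
          if match m
            then return m
            else modifyIORef (mboxStash mbox) (++ [m]) >> go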

All of this would also require making the local node control plane pluggable and potentially extensible. I see that as highly desirable, and we will discuss it in more detail later on.

I would envisage distributed-process evolving into the API for Erlang-in-Haskell, with other APIs layered beneath (so a baseline actor model, which provides the core node behaviour built on network-transport, and extensibility to inject behaviour into the node control plane, and configure its concurrency, etc), and alongside it (such as an optional epmd-backend that actually implements Erlang's distribution approach using heartbeats and node-name <=> ip-port mapping). You could write the epmd-backend today, but there are reasons why I'm reluctant to delve into that given the current state of affairs...

Complexity and difficulty in consistent and reliable testing

  • Proper Integration testing requires nodes running in separate OS processes
  • Difficult to use a proper model based testing approach
  • Invariants cannot easily be specified statically (using types)

I would like to use semi-formal verification for Cloud Haskell, supplemented with integration testing that utilises multiple real nodes (i.e. separate OS processes, better still, separate machines), and obviously unit testing. I want to test the Cloud Haskell code this way, but also offer this as a framework for testing programs built on top of Cloud Haskell.

Doing this properly will mean making the points at which control flow departs from the calling thread pluggable. Current approaches seem to involve writing your code in a different monad, as exemplified by quickcheck-state-machine-distributed for example, or having to expose low-level concurrency primitives (as in deja-fu, for example), which won't easily allow for testing code much higher up the stack.

Making the control flow plane pluggable would allow us to easily inject a deterministic scheduler, such that we can use quickcheck to verify against a model as per quickcheck-state-machine-distributed. Just as the concurrency package provides a context in the form of MonadConc, which Deja Fu takes advantage of (providing ConcT as a convenience to allow for testing all kinds of concurrent code), we can provide a similar abstraction.
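
For example, a hypothetical typeclass could mark the points where control flow departs from the calling thread, so a production node uses the real runtime while a test node substitutes a deterministic scheduler. MonadNode and its methods are illustrative only, not an existing API:

{-# LANGUAGE TypeFamilies #-}
module SchedulerSketch where

import Control.Concurrent (ThreadId, forkIO, yield)

-- The operations at which control leaves the calling thread. A test
-- instance can record and replay these decisions deterministically,
-- in the spirit of MonadConc/ConcT from the concurrency and dejafu packages.
class Monad m => MonadNode m where
  type NodeThreadId m
  spawnThread :: m () -> m (NodeThreadId m)
  yieldThread :: m ()

-- The production instance simply delegates to the real runtime.
instance MonadNode IO where
  type NodeThreadId IO = ThreadId
  spawnThread = forkIO
  yieldThread = yield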

This leads us nicely on to...

The desire for different concurrency semantics, and difficulty utilising Cloud Haskell where architectures also employ different approaches to concurrency & distribution

As we've discussed already, the actor model is just one approach to concurrency and distribution, and Erlang's approach to that is just one option. There are others such as Distributed Parallel Haskell (HdpH), monad-par, meta-par, haxl, and so on.

Many people sit their Cloud Haskell code alongside code running in warp/wai/etc, and the options for communicating between processes and non-actor code are somewhat lacking. Making typed channels a cornerstone would alleviate this somewhat, insomuch as we might allow for an API which hides the spawning of processes and gives back a communication port/handle that is usable in IO land. This is far from impossible in the current Cloud Haskell - see the example below, which uses callSTM and handleCallExternal from distributed-process-client-server - but I think we can go much, much further and do much better than this, taking inspiration from {monad, meta}-par and Haxl while we're at it...

data StmServer = StmServer { serverPid  :: ProcessId
                           , writerChan :: TQueue String
                           , readerChan :: TQueue String
                           }

instance Resolvable StmServer where
  resolve = return . Just . serverPid

instance Killable StmServer where
  killProc StmServer{..} = kill serverPid
  exitProc StmServer{..} = exit serverPid

echoStm :: StmServer -> String -> Process (Either ExitReason String)
echoStm StmServer{..} = callSTM serverPid
                                (writeTQueue writerChan)
                                (readTQueue  readerChan)

launchEchoServer :: Process StmServer
launchEchoServer = do
  (inQ, replyQ) <- liftIO $ do
    cIn <- newTQueueIO
    cOut <- newTQueueIO
    return (cIn, cOut)

  let procDef = statelessProcess {
                  apiHandlers = [
                    handleCallExternal
                      (readTQueue inQ)
                      (writeTQueue replyQ)
                      (\st (msg :: String) -> reply msg st)
                  ]
                }

  pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
  return $ StmServer pid inQ replyQ

testExternalCall :: TestResult Bool -> Process ()
testExternalCall result = do
  let txt = "hello stm-call foo"
  srv <- launchEchoServer
  echoStm srv txt >>= stash result . (== Right txt)
  killProc srv "done"

This still requires us to be in the Process monad, since we need to avoid blocking indefinitely if the server process dies before replying, and thus a monitor is required. This means having to wrap the code in runProcess, which in turn requires synchronising on an MVar. Needless to say, even if you write your own code to capture the output of a process running elsewhere, you're immediately exposed to the issues of lock-based concurrency in order to coordinate with Cloud Haskell code from the outside world.

Making channels a core API construct could alleviate this altogether, since we could bake the error-signalling telemetry into the channel implementation, giving, for example, a waiting STM transaction the option to yield an Either a b for any request-reply call style interaction. Indeed, perhaps the choice between signalling failure with an asynchronous exception, replying with an error code, or turning a reply error code into a synchronous exception is a design decision best left to the consumer. Moreover, this choice of policy could still be left to the user, but based instead on the channel implementation chosen.
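
As a hypothetical sketch of what that could look like from the caller's side: the channel implementation, not the caller, completes a reply slot with either a value or an error, so a plain STM transaction can observe server failure without setting up a monitor of its own (CallError, ReplySlot and the function names are illustrative):

module ReplySketch where

import Control.Concurrent.STM

data CallError = ServerDied | CallTimedOut
  deriving Show

-- A reply slot the channel completes with either a result or an error.
type ReplySlot a = TMVar (Either CallError a)

-- The caller waits in plain STM, outside the Process monad entirely.
awaitReply :: ReplySlot a -> STM (Either CallError a)
awaitReply = takeTMVar

-- The channel implementation fills the slot if the serving process dies
-- before replying; the caller never has to set up its own monitor.
failCall :: ReplySlot a -> CallError -> STM ()
failCall slot err = putTMVar slot (Left err)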

We ought to be removing the need for dealing with locks and synchronising threads, since that is an oft-quoted benefit of using the actor model. Furthermore, the better we shield users from those issues, and the more we fortify and bug-fix our library code, the more valuable it will be to users.

Not all actor implementations want Erlang's semantics. Akka, for example, has several notable differences: for one, it properly differentiates between the guarantees that are available for local versus remote interactions. Erlang is not the first language/platform to fall foul of the myths surrounding location transparency. Akka makes it clear that guarantees which hold due to the Java memory model are different from guarantees that hold based on some distributed coordination mechanism.

Again, reimagining our user-facing APIs around channels as the core concept could help resolve this issue. Channel semantics could be allowed to vary, and channels that only work for local communications could be defined, which would not only offer different semantics but would also be free from the restriction of only passing Serialisable data.

None of this need change either the semantics of the current distributed-process implementation, nor its API.