
Support running on a cluster of nodes #39

Closed
slashdotdash opened this issue Nov 29, 2016 · 19 comments
@slashdotdash
Member

slashdotdash commented Nov 29, 2016

Commanded is currently restricted to run on a single node only.

Using a library such as Swarm would help to run on a cluster of nodes.

Must consider how to deal with split-brain scenarios.

@edimoldovan

Any plans on this one?

@slashdotdash
Member Author

@edimoldovan I don't have immediate plans to work on this, but I want to provide support for clusters at some point. Feel free to provide any input you have.

My initial thoughts were to look at an implementation using @bitwalker's Swarm library.

@bitwalker

You'll have to make decisions about the various CAP tradeoffs. Swarm makes tradeoffs for the use case it was designed for, and others like Riak Core do the same, so depending on your use case you'll want to evaluate which of them fits best and then look at building an implementation on top of it. Swarm was designed to be a high-performance global process registry: it is eventually consistent, and it is willing to duplicate everything in both partitions to stay available during netsplits, then resolve conflicts when the partition is healed. That almost certainly means one side's changes will need to be dropped; how that's handled is up to you, but the default behaviour is that whichever node "owns" the key keeps its registration and the other data is dropped, so the behaviour is deterministic. If that level of consistency is acceptable then Swarm might be a good fit, otherwise Riak Core is where I'd look.
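
(For concreteness, here is a minimal sketch of how Swarm surfaces those netsplit decisions to a registered process. The module, state shape, and merge strategy are invented for illustration; only `Swarm.register_name/4` and the `{:swarm, ...}` handoff/conflict messages come from Swarm's documented behaviour, so treat this as a sketch rather than anything Commanded does today.)

```elixir
defmodule MyApp.Worker do
  @moduledoc "Illustrative Swarm-registered singleton process (hypothetical module)."
  use GenServer

  # Start the process somewhere in the cluster and register it under a
  # cluster-wide name. Swarm decides which node runs it.
  def register(id),
    do: Swarm.register_name({:worker, id}, __MODULE__, :start_link, [id])

  def start_link(id), do: GenServer.start_link(__MODULE__, id)

  def init(id), do: {:ok, %{id: id, items: []}}

  # When the cluster topology changes, Swarm may ask the process to hand off
  # its state so it can be restarted on another node.
  def handle_call({:swarm, :begin_handoff}, _from, state),
    do: {:reply, {:resume, state}, state}

  def handle_cast({:swarm, :end_handoff, handoff_state}, _state),
    do: {:noreply, handoff_state}

  # After a netsplit heals, both partitions may have their own copy. The copy
  # that is kept receives the other copy's state; merging it (or dropping it)
  # is up to you. This naive list merge is just an example.
  def handle_cast({:swarm, :resolve_conflict, other_state}, state),
    do: {:noreply, %{state | items: state.items ++ other_state.items}}

  # Swarm asks a process to shut down when its registration has moved or been
  # dropped in favour of another copy.
  def handle_info({:swarm, :die}, state),
    do: {:stop, :shutdown, state}
end
```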

@slashdotdash
Member Author

Thanks for the feedback @bitwalker. I was aware of Riak Core but hadn't considered it as an option, so will investigate further.

@aflatter

Just a thought: Is direct coordination between commanded nodes required to implement this? Hosting a commanded-based application on Heroku would probably not be possible if instances need to talk to each other.

@slashdotdash
Member Author

@aflatter Yes, it would be necessary for the nodes to communicate with each other to support running as a cluster.

@aflatter

@slashdotdash Can you expand a bit on the features that are part of "running as a cluster"? Would it be enough to add optimistic writes for event streams and a reliable message bus in the event store to allow multiple instances of an application?

@slashdotdash
Member Author

slashdotdash commented Jun 1, 2017

@aflatter Commanded, and the Event Store, make use of OTP processes to guarantee serialized concurrent access to event streams and aggregates. They are designed to run as singleton processes, one process per logical stream/aggregate.

For a cluster of nodes there needs to be a distributed process registry to locate the process and send it messages. This would require node-to-node communication.
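
(As an illustration of what that could look like, here is a minimal sketch assuming Swarm is used as the distributed registry: one GenServer per aggregate, registered cluster-wide through a `:via` tuple so any node can locate it and send it commands. The module and message shapes are hypothetical, not Commanded's actual implementation.)

```elixir
defmodule MyApp.Aggregate do
  @moduledoc "Hypothetical singleton aggregate process located via a distributed registry."
  use GenServer

  # One process per aggregate, registered cluster-wide using Swarm's `:via`
  # tuple support so there is a single instance across the cluster.
  def start_link(aggregate_uuid) do
    GenServer.start_link(__MODULE__, aggregate_uuid, name: via(aggregate_uuid))
  end

  # Any node can route a command to the aggregate; the registry resolves
  # which node currently hosts the single process.
  def execute(aggregate_uuid, command) do
    GenServer.call(via(aggregate_uuid), {:execute, command})
  end

  defp via(aggregate_uuid), do: {:via, :swarm, {:aggregate, aggregate_uuid}}

  def init(aggregate_uuid), do: {:ok, %{uuid: aggregate_uuid, version: 0}}

  # The single process serialises concurrent access: commands for this
  # aggregate are handled one at a time, wherever the process lives.
  def handle_call({:execute, _command}, _from, state) do
    {:reply, :ok, %{state | version: state.version + 1}}
  end
end
```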

@slashdotdash
Member Author

slashdotdash commented Aug 21, 2017

I've submitted a pull request to Swarm to support consistency during a network partition (#38). This ensures only a single instance of a process is running in the cluster, which is a requirement for distributing aggregate, event handler, and process manager processes.

@slashdotdash
Member Author

@edimoldovan This issue is now under active development.

@drozzy

drozzy commented Aug 22, 2017

Why... is this needed? EventStore itself cannot be clustered (at this time).

I'm just wondering if this is adding extra complexity, without actual use cases (sorry to be a downer).

@astery
Contributor

astery commented Aug 23, 2017

Ben is working on making the EventStore clusterable. I'm waiting for this to have an easier way to manage memory consumption.

@slashdotdash
Member Author

slashdotdash commented Aug 23, 2017

@drozzy I'm currently working on adding support to the Elixir Event Store, using Swarm for process distribution (#53). I plan to apply the same approach to Commanded. Greg's Event Store already supports running on a cluster. There have been a number of requests for this feature. The drivers are reliability (run multiple nodes so that your app continues running when a node crashes) and to support rolling deployments.

@slashdotdash slashdotdash self-assigned this Aug 23, 2017
@rosacris

I've been struggling with this for quite a while now. We are experimenting with event sourcing and we wanted to come up with an event store that could be scaled horizontally in the future. We tried to get around giving a total order to the events, but things get really complex.
Do you have any ideas on the design of a multi-master event store? The closest thing I can imagine is a distributed immutable log with persisted queues for subscriptions (similar to Kafka). A weaker alternative could be sharding aggregates (each aggregate's history on a single node), with subscriptions keeping a cursor for each node.

@slashdotdash
Member Author

@rosacris You might be interested in looking at how Eventuate deals with replication using causal consistency (vector clocks). An event-sourced system only needs to guarantee the ordering of events within a single stream (aggregate), so you could use that to shard on, as you mentioned.

You could use an aggregated stream (e.g. $all in Greg's Event Store) that is built up by merging events from all other streams to give a global ordering of events. This could be a real stream containing copied events, or just an index to the existing events, potentially using causal consistency to guarantee the ordering of cause-and-effect events. This means that concurrent events may be copied to the aggregated $all stream in any order, as long as they don't have any relationship, but events that are caused by another event must be appended afterwards. Once events are copied into the $all stream their order is permanent. Subscribers then have a single index to track for their position in the "stream of streams".
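
(A toy sketch of that idea, not EventStore code: merge per-stream events into a single `$all` index that preserves each stream's internal order while assigning a permanent global position for subscribers to track. All names and data shapes are invented.)

```elixir
defmodule AllStreamSketch do
  @moduledoc """
  Toy illustration: build a single "$all" ordering from per-stream events.
  Only per-stream (and causal) order matters; the interleaving between
  unrelated streams is arbitrary, but once assigned it is permanent.
  """

  # `streams` is a map of stream_uuid => events in stream order, e.g.
  # %{"stream-a" => [:a1, :a2], "stream-b" => [:b1]}
  def build(streams) do
    streams
    |> Enum.flat_map(fn {stream_uuid, events} ->
      events
      |> Enum.with_index(1)
      |> Enum.map(fn {event, stream_version} ->
        %{stream_uuid: stream_uuid, stream_version: stream_version, event: event}
      end)
    end)
    # Sorting by stream_version keeps each stream's events in their own order
    # while interleaving different streams arbitrarily. Any such interleaving
    # is a valid "$all" stream for unrelated events.
    |> Enum.sort_by(& &1.stream_version)
    # Assign the permanent global position subscribers use as their cursor.
    |> Enum.with_index(1)
    |> Enum.map(fn {entry, global_position} ->
      Map.put(entry, :global_position, global_position)
    end)
  end
end
```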

@ssboisen

ssboisen commented Sep 6, 2017

@rosacris We use Cassandra as an event store at Lix. Cassandra's lightweight transactions allow us to guarantee serialisability within one aggregate. Cassandra is very much horizontally scalable if that's your need. We use an in-house event-sourcing framework which we haven't yet had the time to clean up enough to make it ready for open source.

@rosacris

rosacris commented Sep 6, 2017

@slashdotdash thanks for the suggestions, that's what we ended up doing. The key observation here is that we only need a total order among the events of the same aggregate; the $all stream can be any serialization of the global partial order. Is it OK to say that if there needs to be a causal dependency between events of different aggregates, it should be enforced by a process manager? (Thinking of the process manager as a way to bound the set of valid event interleavings.)

@ssboisen I was not aware of lightweight transactions! We ditched Cassandra for the moment and used MySQL instead (mainly because that's what our legacy system uses). We are also in the middle of cleaning up our ES framework to open source it.
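
(Aside: a hedged sketch of the process-manager idea mentioned above, shaped after Commanded's process manager behaviour. The event and command names are invented, `Example.Router` is assumed to exist, and the exact `use` options differ between Commanded versions, so treat it as an illustration rather than working project code.)

```elixir
# Invented events and commands for the example.
defmodule Example.PaymentReceived, do: defstruct([:order_uuid])
defmodule Example.OrderShipped, do: defstruct([:order_uuid])
defmodule Example.ShipOrder, do: defstruct([:order_uuid])

defmodule Example.OrderFulfilment do
  # `Example.Router` is assumed to exist; options vary by Commanded version.
  use Commanded.ProcessManagers.ProcessManager,
    name: "Example.OrderFulfilment",
    router: Example.Router

  alias Example.{PaymentReceived, OrderShipped, ShipOrder}

  defstruct [:order_uuid]

  # Start an instance when the causing event arrives, stop once the effect
  # has happened.
  def interested?(%PaymentReceived{order_uuid: uuid}), do: {:start, uuid}
  def interested?(%OrderShipped{order_uuid: uuid}), do: {:stop, uuid}
  def interested?(_event), do: false

  # Only dispatch the dependent command after its cause has been observed,
  # which bounds the valid interleavings across the two aggregates.
  def handle(%__MODULE__{}, %PaymentReceived{order_uuid: uuid}) do
    %ShipOrder{order_uuid: uuid}
  end

  def apply(%__MODULE__{} = state, %PaymentReceived{order_uuid: uuid}) do
    %__MODULE__{state | order_uuid: uuid}
  end
end
```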

@rosacris

rosacris commented Sep 7, 2017

@ssboisen I was checking out Cassandra's lightweight transactions, but I am unable to see how they help in the case of an event store. How do you model the tables? One row per aggregate?

@ssboisen

ssboisen commented Sep 8, 2017

@rosacris LWT works inside a partition, so use one partition per aggregate and one row per batch. Something like `INSERT INTO keyspace.events (aggregate_id, batch_id, events) VALUES ('bla', 5, '[...]') IF NOT EXISTS`, where `aggregate_id` and `batch_id` form the primary key.
