
feat(ds): implement raft-based replication #12361

Merged · 33 commits · Mar 20, 2024

Conversation

@keynslug (Contributor) commented Jan 21, 2024

Fixes EMQX-11756.

Summary

Uses Raft through https://github.com/rabbitmq/ra to replicate writes across nodes: each shard is now a Raft consensus group. Replicas are guaranteed not to diverge, so we're free to pick any replica to serve reads.

  • Total order of messages in each shard
  • Session replay tests work
  • All tests are green
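
As a rough sketch of the write path this implies (illustrative only: the cluster name, machine module, and helper names below are assumptions, not the actual module layout of this PR), each shard maps onto a `ra` cluster and writes are submitted as Raft commands:

```erlang
%% Hypothetical sketch: one ra consensus group per shard. Assumes the `default`
%% ra system is already started (the PR actually starts a ra system per DB).
start_shard(DB, Shard, ReplicaNodes) ->
    ClusterName = iolist_to_binary([atom_to_binary(DB), "_", Shard]),
    ServerName = binary_to_atom(ClusterName),
    Servers = [{ServerName, Node} || Node <- ReplicaNodes],
    Machine = {module, emqx_ds_replication_shard_machine, #{db => DB, shard => Shard}},
    {ok, _Started, _NotStarted} = ra:start_cluster(default, ClusterName, Machine, Servers),
    Servers.

store_batch(Servers, Batch) ->
    %% The batch goes through the shard's Raft log, so replicas cannot diverge
    %% and any of them can serve reads afterwards.
    {ok, Result, _Leader} = ra:process_command(hd(Servers), {store_batch, Batch}),
    Result.
```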

Roadmap

This is mostly out of scope for this PR (except maybe the first two items), but it's good to have written down.

  1. Measure performance impact.

    Read performance shouldn't really suffer. Things to explore: issue release_cursor less often (see the sketch after this list), make smarter assumptions about shard servers, and avoid wasting cycles on constructing names / IDs.

  2. Perform rest of shard-level operations through consensus.

    Namely: adding generations. At first look this seems important because of snapshotting. Initially we could make it work synchronously, so that adding a generation blocks message writes; then think about doing it asynchronously while preserving safety.

  3. Solve snapshotting story.

    We could relax strictness and take the most recent snapshot instead of taking it at the specified Raft index. Supposedly, this should be safe as long as writes are idempotent, so that repeating write operations already present in a snapshot will not accidentally cause write duplication. At first glance, the only thing needed is making add-generation part of the Raft log.

  4. Design good first start UX story.

    In other words: how operators are supposed to ensure that the cluster they want to deploy will start and allocate shards fairly between nodes. Currently a node in the cluster does not know how large the cluster is supposed to be, and could claim too many shards / replicas.

  5. Perform cluster-wide changes through consensus.

    Related to the above. Super important to unlock safe rebalancing. Quite tricky because of uncertainty about initial cluster membership.

  6. Implement rebalancing.

  7. Skip RocksDB WAL.

    This is a rather long-term goal. We already have a Raft log which serves basically the same purpose, so disabling RocksDB WAL and taking control over dumping memtables / releasing Raft log entries should help to avoid keeping two logs.
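
On the release_cursor note in item 1 above: a `ra` state machine controls log truncation by returning a `release_cursor` effect from its `apply/3` callback, so issuing it less often is a straightforward knob. A minimal sketch (the interval and the `do_store_batch/2` helper are made up for illustration):

```erlang
-define(RELEASE_EVERY, 1000).

%% Sketch of a ra_machine apply/3 clause that emits a release_cursor effect
%% only every ?RELEASE_EVERY applied entries instead of after every batch.
apply(#{index := RaftIdx}, {store_batch, Batch}, State0) ->
    State = do_store_batch(Batch, State0),
    Effects =
        case RaftIdx rem ?RELEASE_EVERY of
            0 -> [{release_cursor, RaftIdx, State}];
            _ -> []
        end,
    {State, ok, Effects}.
```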

PR Checklist

Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:

  • Added tests for the changes
  • Added property-based tests for code which performs user input validation
  • Changed lines covered in coverage report
  • Change log has been added to changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md files
  • For internal contributor: there is a jira ticket to track this change
  • Created PR to emqx-docs if documentation update is required, or link to a follow-up jira ticket
  • Schema changes are backward compatible

@@ -412,11 +410,10 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam
]) ->
binary().
make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
UniqueInteger = erlang:unique_integer([monotonic, positive]),
Contributor

Q: it's still possible to have duplicate timestamps at high concurrency/rates, so shouldn't we keep this random part?

Contributor Author

Since we want a total order of messages per stream, this PR just enforces a total order per shard by allocating unique pseudo-realtime timestamps to messages (admittedly, in a quite ugly way). Since we don't want timestamps to occasionally jump backwards during leader election, it's important to keep that clock as part of the Raft log / state; uniqueness (i.e. strict monotonicity) is then trivial to add on top of that.

With that we don't need this extra uniqueness, which, furthermore, is not really guaranteed to be unique across the shard's replica set.

Contributor

I'm wondering whether that could also be a problem, now from the opposite side. 😅

When moving session data from mnesia to DS, I took advantage of the fact that the message timestamps are respected by the storage layer to upsert keys. Now that emqx_ds_replication_layer:assign_timestamps enforces unique timestamps, it'll no longer be possible to upsert a key if one wants to do that in a particular DB. 🤔

Contributor Author

> Now that emqx_ds_replication_layer:assign_timestamps enforces unique timestamps, it'll no longer be possible

Conceptually, this needs to be enforced only for messages that need to be replayed in a strict order. If we need to accommodate other storage semantics, nothing stops us from expanding the API here, e.g. adding flags or properties to the store-batch command.

@@ -430,17 +453,23 @@ ensure_site() ->
-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
create_shards(DB, NShards, ReplicationFactor) ->
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
AllSites = sites(),
AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
Nodes = mria_mnesia:running_nodes(),
Contributor

Suggested change
- Nodes = mria_mnesia:running_nodes(),
+ Nodes = emqx:running_nodes(),

?

log_init_args => #{uid => UID}
}),
case Servers of
[LocalServer | _] ->
Contributor

Q: why do we trigger election if Servers/replica set has the local server at the head? Is it a way to make only one server trigger the election?

Contributor Author

Yes. Apparently ra servers do not start elections automatically and we need to poke them, hence this quick attempt to trigger the election only once. Needs some testing to verify that it works well enough.
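
For reference, a minimal sketch of that pattern, assuming `LocalServer` is bound earlier to this node's ra server id and `Servers` is the shard's replica set in a fixed order:

```erlang
%% Only the node whose server happens to be at the head of the replica set
%% pokes ra, so the election is triggered roughly once per shard.
case Servers of
    [LocalServer | _] ->
        ok = ra:trigger_election(LocalServer);
    [_ | _] ->
        ok
end
```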

%%

init(#{db := DB, shard := Shard}) ->
_ = erlang:put(emqx_ds_db_shard, {DB, Shard}),
Contributor

Q: is there a particular reason not to put this in the state map?

Contributor Author

My line of thinking is that State is essentially a product of applying Raft log entries, and it is periodically snapshotted and dumped to disk. This tuple feels odd as part of the state / snapshot; it's more like runtime-only information needed to locate the relevant Erlang processes. On the other hand, I agree that it feels hacky to put it in the process dictionary.

Member
@ieQu1 Jan 31, 2024

I wouldn't object too much to using the process dictionary, as long as the terms are static.
The process dictionary becomes rather sticky when it's used to store temporary data, because it doesn't get cleaned up on exception. That is not the case here.

Messages = assign_timestamps(Timestamp, MessagesIn),
Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, Options),
%% NOTE: Last assigned timestamp.
NLatest = Timestamp + length(Messages) - 1,
Contributor

Nit: maybe return NLatest from assign_timestamps.
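
A sketch of what that nit could look like, with assign_timestamps also returning the last assigned timestamp (hypothetical shape, not the code in this PR):

```erlang
-include_lib("emqx/include/emqx.hrl").

%% Assign strictly increasing timestamps starting at Timestamp and return the
%% last assigned one, so the caller no longer has to recompute NLatest.
assign_timestamps(Timestamp, Messages) ->
    {Assigned, NextTimestamp} =
        lists:mapfoldl(
            fun(Msg = #message{}, T) -> {Msg#message{timestamp = T}, T + 1} end,
            Timestamp,
            Messages
        ),
    {Assigned, NextTimestamp - 1}.
```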

@@ -94,9 +94,11 @@
-type next_result() :: next_result(iterator()).

%% Timestamp
%% Each message must have unique timestamp.
Member
@ieQu1 Jan 31, 2024

The storage layer introduces a "unique integer" of its own and keeps the original timestamp (admittedly, it was a temporary kludge). It looks like this PR shifts the responsibility of making the timestamps unique to the application.

Some thoughts on this:

  1. I think emqx_ds should consume and return unchanged messages, including the timestamp

  2. We shouldn't move the responsibility of making message timestamps unique to the API consumer (it will introduce a footgun for the API users); it's better if the replication layer takes care of it in a discreet manner.

Moreover, if the replication layer is able to create its own "internal" timestamps, we have more options for dealing with clock skew. For example, (in the future) we can order the messages according to the leader's clock, which will make things more predictable.

Contributor Author

> this PR shifts the responsibility of making the timestamps unique to the application

To the replication layer, actually.

> We shouldn't move the responsibility of making message timestamps unique to the API consumer

Agree. This PR doesn't move the responsibility, though: the replication layer modifies message timestamps to ensure uniqueness. The API consumer will definitely be surprised, but it's a PoC-grade hack at this point.

> we can order the messages according to the leader's clock

They are already ordered according to the leader's clock (to be more precise, the shard's consensus group tracks a strictly monotonically increasing clock).

@keynslug keynslug force-pushed the ft/EMQX-11756/emqx-ds-replication branch from c199c0b to 58bd42b on February 12, 2024 18:24
@keynslug keynslug force-pushed the ft/EMQX-11756/emqx-ds-replication branch from 0e832db to d6327fc on March 8, 2024 18:09
@@ -91,6 +93,31 @@ map(F, S) ->
end
end.

%% @doc Zip a list of streams into a stream producing lists of their respective values.
%% The resulting stream is as long as the shortest of the input streams.
-spec zip([stream(X)]) -> stream([X]).
Contributor

Nit: with this name, I'd expect the typespec to be something like zip([stream(_)]) -> stream(tuple()), similar to lists:zip, Enum.zip and others. But I see how this has the same "shape" as a stream of tuples. 🤔

Also, this function seems to be unused in this PR?

Contributor Author

I should probably rename it. Agree that it breaks expectations of what _zip_s are; however, I found that having it emit rows as lists is a bit more usable overall, especially since it's generic in the number of things being zipped (e.g. one can do zipwith/2 through erlang:apply/2).

> Also, this function seems to be unused in this PR?

Yeah, it was used in the previous iteration. Do you think it's better to drop it?
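
For the record, a small usage sketch of the zipwith-through-erlang:apply idea (the emqx_utils_stream module name is an assumption; adjust to wherever zip/1 and map/2 actually live):

```erlang
%% Combine two streams element-wise: zip/1 yields [X1, X2] rows, and
%% erlang:apply/2 feeds each row to a binary function.
zipwith(F, S1, S2) when is_function(F, 2) ->
    emqx_utils_stream:map(
        fun(Row) -> erlang:apply(F, Row) end,
        emqx_utils_stream:zip([S1, S2])
    ).

%% e.g. a stream of pairwise sums:
%% Sums = zipwith(fun erlang:'+'/2, StreamA, StreamB).
```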

Contributor

> Do you think it's better to drop it?

I think it's fine to leave it here, since it's tested.

@keynslug keynslug force-pushed the ft/EMQX-11756/emqx-ds-replication branch from d6327fc to 9242cf4 on March 12, 2024 17:43
@keynslug keynslug marked this pull request as ready for review March 12, 2024 19:07
@keynslug keynslug requested review from a team and lafirest as code owners March 12, 2024 19:07
@keynslug keynslug force-pushed the ft/EMQX-11756/emqx-ds-replication branch from 9242cf4 to f9df2a7 on March 12, 2024 19:47
@keynslug keynslug changed the title from "wip: implement raft-based replication" to "feat(ds): implement raft-based replication" on Mar 18, 2024
@@ -431,8 +380,8 @@ ensure_site() ->
{ok, [Site]} ->
ok;
_ ->
Site = crypto:strong_rand_bytes(8),
logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]),
Site = binary:encode_hex(crypto:strong_rand_bytes(4)),
Member

Why change from 8 to 4? Not a huge win in terms of storage, but probability of collision in a 20-node cluster in terms of birthday paradox goes up from ~1e-17 to ~1e-8.
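
For reference, those figures follow from the birthday-bound approximation with n = 20 sites and k random bits per site ID:

$$P_{\text{collision}} \approx \frac{n(n-1)}{2 \cdot 2^{k}}: \qquad \frac{190}{2^{64}} \approx 1.0\times 10^{-17} \quad\text{vs.}\quad \frac{190}{2^{32}} \approx 4.4\times 10^{-8}.$$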

undefined ->
ok = persistent_term:put(__X_Key, __X_Value = (EXPR)),
__X_Value;
__X_Value ->
Member
@ieQu1 Mar 18, 2024

Dangerous macro. If someone invokes it several times in the same function, they will get a badmatch. I would prefer it to be rewritten as a function that takes an argument, possibly created with the help of a macro.
Also, from reading it, I don't see why it should be a macro.
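
A sketch of the function-based alternative being suggested (names are illustrative, not from this PR):

```erlang
%% Memoize a lazily computed value in persistent_term, without the variable
%% rebinding pitfall of the macro: safe to call several times per function.
memoize(Key, ComputeFun) when is_function(ComputeFun, 0) ->
    case persistent_term:get(Key, undefined) of
        undefined ->
            Value = ComputeFun(),
            ok = persistent_term:put(Key, Value),
            Value;
        Value ->
            Value
    end.

%% A tiny macro can still keep call sites lazy:
%% -define(MEMOIZE(KEY, EXPR), memoize(KEY, fun() -> EXPR end)).
```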

}
),
{noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
end.
Contributor

Suggested change
end.
end;
handle_info(_Info, State) ->
{noreply, State}
end.

ok = start_egresses(DB, Shards),
ok = save_db_meta(DB, Shards),
ok = save_shards_meta(DB, Shards),
State#{shards => Shards, status := ready};
Contributor

Nit:

Suggested change
- State#{shards => Shards, status := ready};
+ {ok, State#{shards => Shards, status := ready}};

do_delete_next_v4/5
do_delete_next_v4/5,

%% FIXME
Contributor

Still relevant?

{ok, {SupFlags, Children}}.

start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
DataDir = filename:join([emqx:data_dir(), DB, dsrepl]),
Contributor

Suggested change
- DataDir = filename:join([emqx:data_dir(), DB, dsrepl]),
+ DataDir = filename:join([emqx_ds:base_dir(), DB, dsrepl]),

Comment on lines 587 to 600
-spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
server_state().
handle_update_config(S0 = #s{schema = Schema}, Since, Options) ->
Prototype = maps:get(storage, Options),
S = S0#s{schema = Schema#{prototype := Prototype}},
handle_add_generation(S, Since).
Contributor

Now this doesn't always return the server state: it may return {error, Reason}.

Since = emqx_message:timestamp_now(),
S = add_generation(S0, Since),
handle_call(#call_add_generation{since = Since}, _From, S0) ->
S = #s{} = handle_add_generation(S0, Since),
Contributor

May need to check for {error, _} here.
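
A sketch of what that check might look like, assuming the call replies ok on success (hypothetical; the actual reply shape may differ):

```erlang
handle_call(#call_add_generation{since = Since}, _From, S0) ->
    case handle_add_generation(S0, Since) of
        S = #s{} ->
            {reply, ok, S};
        {error, _} = Error ->
            %% Keep the previous state and surface the error to the caller.
            {reply, Error, S0}
    end;
```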

Since = emqx_message:timestamp_now(),
S = add_generation(S1, Since),
handle_call(#call_update_config{since = Since, options = Options}, _From, S0) ->
S = #s{} = handle_update_config(S0, Since, Options),
Contributor

May need to check for {error, _} here.

@@ -469,10 +439,11 @@ do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->

-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
do_add_generation_v2(DB) ->
MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
%% FIXME
Contributor

Still relevant?

Still very rough but mostly working.
This is needed to ensure total message order for a shard, and to
guarantee that no messages will be written "in the past", which
may break replay consistency.
Before this commit the most likely shard allocation outcome was
that all shards were allocated to just one node.
It doesn't feel right, but right now it is the easiest way to have it
in the scope of `apply/3`, because `init/1` is not actually invoked
for ra machines recovered from the existing log / snapshot.
That starts shard and egress processes only when shards are fully
allocated.
1. This avoids the need to deserialize the message to get the timestamp.
2. It also makes it possible to decouple the storage key timestamp from the
   message timestamp, which might be useful for replication purposes.
@keynslug keynslug force-pushed the ft/EMQX-11756/emqx-ds-replication branch 2 times, most recently from b3df14a to 5a46b82 on March 19, 2024 20:19
start_link(DB, Shard, Opts) ->
gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).

shard_servers(DB, Shard) ->
Member

Nit (can be done in a follow-up): it would be nice to have type specs for all the exported functions.

ieQu1 previously approved these changes Mar 19, 2024
keynslug and others added 15 commits March 20, 2024 13:20
Storage also becomes a bit more pure, depending on the upper layer to
provide the timestamps, which also makes it possible to handle more
operations idempotently.
In order to minimize chances of site id collision to practically zero.
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
Also update a few outdated typespecs. Also make error reasons easier
to comprehend.
Also annotate internal exports with comments according to their
intended use.
Before this commit, messages in the current batch would be retried as
part of the next batch. This could have led to message duplication, which is
probably not what the user wants by default.
@keynslug keynslug force-pushed the ft/EMQX-11756/emqx-ds-replication branch from 082d3f3 to e55e1dd on March 20, 2024 12:20
ieQu1 previously approved these changes Mar 20, 2024
@keynslug keynslug merged commit e10d43c into emqx:master Mar 20, 2024
167 checks passed
@emqxqa commented May 8, 2024

Test Execution
