New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ds): add message GC #12338
feat(ds): add message GC #12338
Conversation
5388966
to
384eab8
Compare
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
Outdated
Show resolved
Hide resolved
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
Outdated
Show resolved
Hide resolved
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
Outdated
Show resolved
Hide resolved
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
Outdated
Show resolved
Hide resolved
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
Outdated
Show resolved
Hide resolved
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
Outdated
Show resolved
Hide resolved
|
||
start_gc() -> | ||
ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB), | ||
AllGens = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not directly related to the GC, but adjacent: we want to have the ability to preserve the LTS between the generations. Let's brainstorm how it can be done.
Quick and dirty solution: the storage layer backend can query the metadata CF, and copy the data from its predecessor, if it has the same module. But maybe someone has a less hacky solution.
Preferably, it should be decided before the list_generations
API is committed, since it might require some passage of the data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like this inheritance should be part of add_generation
, indeed. 🤔
Since the need for inheritance is storage layer dependent (i.e.: LTS only), and since add_generation
already simply creates a new gen with whatever is configured at the moment, it seems like this check for "if it's same module, inherit this" should be done there and not in the GC (by itself being responsible for this "baton passing").
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another dirty solution: we may keep LTS trie out of generation data and never GC it. Just apply thresholds from the current gen when inserting and clean it up on demand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That'll still require some coordination, because in that case the next generation should use the same column family to preserve the metadata. Also, it might introduce a bit of complexity when the impl module changes: if we go from LTS to some other module, then the old trie column family should be dropped if we keep it around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another dirty solution: we may keep LTS trie out of generation data and never GC it. Just apply thresholds from the current gen when inserting and clean it up on demand.
This is an interesting idea, actually... However, this means LTS stops being part of the storage layer and becomes an entity of its own.
- On one side, it makes sense since since topics are global.
- On the other hand, not all storage backends need LTS trie.
- If LTS trie becomes a separate entity, we'd need some management interfaces (and a replication path?) for it.
%% See the License for the specific language governing permissions and | ||
%% limitations under the License. | ||
%%-------------------------------------------------------------------- | ||
-module(emqx_ds_proto_v3). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Wouldn't one expect that emqx_ds_proto_v3
proxies RPC calls not to emqx_ds_replication_layer
but to emqx_ds
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't initiate this series of modules myself, but I think the reasoning here is that this is the proto module for the emqx_ds
application. See emqx_bridge_proto_v1
for example: it calls 2 modules other than emqx_bridge
.
I guess for emqx
application we made a finer proto module breakdown due to the increased complexity of that application.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was my mistake not to call this module emqx_ds_replication_layer_proto_...
from the very beginning, because technically this BPAPI is specific to the builtin replication layer indeed. Not sure if we can change this easily now, though.
%% An opaque term identifying a generation. Each implementation will possibly add | ||
%% information to this term to match its inner structure (e.g.: by embedding the shard id, | ||
%% in the case of `emqx_ds_replication_layer'). | ||
-opaque generation_id() :: term(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generation is a concept from the storage layer that becomes RankY at the emqx_ds
layer. It's best not to redefine this name.
Could this type simply instead be a pair of {RankX, RankY}
? (I called it stream_rank()
on my branch)
For the builtin backend, at least, stream_rank()
is guaranteed to be unique.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem in perhaps using the existing emqx_ds:stream_rank()
type for it.
But then it might introduce the requirement on other implementation that values of this type should also be unique, besides the ordering properties, right?
We need the value to directly map to an unique generation so that we may drop it later, so can we say stream_rank()
is indeed an unique identifier for the generation? I thought that a generation could hold several streams inside it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example: let's say we have a single generation using LTS, with an empty trie. After the first few messages are stored/iterated over, one or more streams already appear, and after it learns new structure, further streams appear, all in the same generation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we already expose the concept of generation in emqx_ds:add_generation
API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need the value to directly map to an unique generation so that we may drop it later, so can we say stream_rank() is indeed an unique identifier for the generation? I thought that a generation could hold several streams inside it.
True. When I was writing the remark, I forgot to mention that stream_rank()
is not a good name, but semantically it's the entity that most closely matches the definition of the unit that the garbage collector operates on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this type to generation_rank()
... How about that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's stick with it for now. Renaming types is possible without breaking BPAPI, so we don't have to commit to any particular name.
e39c4aa
to
ba691e5
Compare
bbb699e
to
7ee6b79
Compare
trie_restore_existing(Trie, Dump). | ||
|
||
-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie(). | ||
trie_restore_existing(Trie, Dump) -> | ||
lists:foreach( | ||
fun({{StateFrom, Token}, StateTo}) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this function must ignore all but wildcard routes. Otherwise it will copy the trie elements created during the learning; we want to drop them to optimize the number of streams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. But is there a way to know if a given trie dump entry is part of a larger wildcard route without reconstructing the whole trie? i.e.: just looking at the Token
?
e.g.: For the filter t/+/a
, we still need to insert the static t
and a
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True. I mean the algorithm should, of course, copy the full paths, but only as long as there is at least one wildcard in it. I think we can postpone the implementation for later, though.
Please add a changelog entry. |
92f85c7
to
528dfb6
Compare
Nodes = lists:usort(lists:map(fun(Shard) -> node_of_shard(DB, Shard) end, Shards)), | ||
Results = emqx_ds_proto_v3:list_generations_with_lifetimes(Nodes, DB), | ||
%% TODO: Handle errors? | ||
Gens = [Gen || {ok, Gen} <- Results], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to print a warning here.
lists:foreach( | ||
fun(ShardId) -> | ||
emqx_ds_storage_layer:add_generation({DB, ShardId}) | ||
end, | ||
MyShards | ||
). | ||
|
||
-spec do_list_generations_with_lifetimes_v3(emqx_ds:db()) -> ok | {error, _}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Can we pass the shard as an argument and do aggregation on the caller side? It feels like a more fine-grained API.
|
||
-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}. | ||
drop_generation(DB, {Shard, GenId}) -> | ||
Node = node_of_shard(DB, Shard), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good for now, but maybe add a TODO that explains that this operation must be propagated, in one way or another, to the entire replica set.
528dfb6
to
7035b4c
Compare
Fixes https://emqx.atlassian.net/browse/EMQX-9748 (rocksdb part only)
Release version: v/e5.6
Summary
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update