Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Multi-cluster support #816

Closed
wants to merge 27 commits into from

4 participants

@shino
Collaborator

[WIP] DO NOT MERGE

needs basho/stanchion#76

This PR is for early-stage review.
The branch may be force-pushed because of rebase/ammend/etc.


Currently all Riak CS's internal data types are stored into single
Riak cluster. The idea is to distribute the keys among clusters
according to the data types.

From client, it looks single riak cs system as it is currently.

Data Structure Change

Add clusterid entry into each bucket record, which indicates in
which cluster the object manifest is stored. Add (or Reuse)
clusterid entry into each object manifest, which indicates in
which cluster the blocks are stored.

This enables Riak CS to store object manifests and blocks into
separate clusters.

How to choose cluster to store blocks

When PUT request came from users, CS node gets the bucket record
and retrieves clusterid to store the object manifest.

(Beforehand, when bucket being created Riak CS chooses the cluster to
store object manifests by the same logic from choosing cluster to
store the blocks => open question?)

Then Riak CS puts the object manifest into the cluster indicated by
the clusterid. Inside the object manifest, there are another clusterid
to put blocks, decided by the following manner:

  • get estimated free space of each Riak cluster(*)
  • filter out the cluster which doesn't have enough free space to store more objects
  • probabilistically choose a Riak cluster depending on the amount of free space

Then kick a riak_cs_block_server.

(*) "free space of Riak cluster" can be defined in several ways.
One candidate is as follows:

  • For each Riak node, decide free percentage from disksup:get_disk_data() by using the partition containing paltform_data_dir.
  • For cluster's free percentage, take minimum of each node's free percentage. Taking minimum rather than average is for safety to avoid disk full.
This was referenced
@shino shino added the Enhancement label
shino added some commits
@shino shino WIP: isolate block cluster 281abf0
@shino shino Fix bug of GC connection configuration 91e9f60
@shino shino Shorten module name fa32591
@shino shino WIP: wighted random selection logic 4e21490
@shino shino WIP: Store free spaces and weights information to Riak bae1db6
@shino shino Add container ID to bucket record in moss.buckets b91d343
@shino shino WIP: Add container ID to bucket record in user record in moss.users bebcf4f
@shino shino WIP: Check-out another riakc in multi-cluster settings to manipulate …
…manifests
e02205e
@shino shino WIP: Store/fetch manifests to/from the cluster in bucket record 5d2509b
@shino shino Pass default riakc pid to block server
When one configures system with single block cluster and multiple
manifest containers (although it is not probable), block_server should
use default riakc.
8a29c0a
@shino shino Make Object ACL and MP upload multi-cluster-aware 3794744
@shino shino Make list objects multi-cluster-aware 6554339
@shino shino Add script to control allocation weights 107badf
@shino shino Fix bug of weight initialization 32cb3e8
@shino shino Use proper riakc connection in storage calculation 416e4f4
@shino shino Change the term "container" to "bag"
A bag represent a set of riak clusters which are related via MDC repl.
The reason of this change is because the term "container" is used
in Swift.
d586898
@shino shino Fix storage calcuation bug a7c033d
@shino shino Add a worker process to GET from / PUT to riak 8d56750
@shino shino Separate connection pools for list objects 651684c
@shino shino Use app environment to determine whether multi-bag is used 586641a
@shino shino Fix bugs in eunit and pulse test cases 27d538d
@shino shino Revert wrongly pushed config files
This reverts commit d586898 partially.
c5c759a
@shino shino Add cleanup of connection pool resource 3114797
@shino shino Add a riak_test test case for simple disjoint bag configuration
All new buckets are assigned bag-B to store manifests and
all new manifests are assigned bag-C to store blocks.

| bag   | manfiest |   block |
|-------+----------+---------|
| bag-A |  default | default |
| bag-B |      100 |     --- |
| bag-C |      --- |     100 |

Under this configuration, assert manifests and blocks are
stored to expected bags and whole contents throught S3 API.
de9e6cb
@shino shino Refactor: change (multi-)container or mc to bag ca22224
@shino shino Simplify PB connection information, just use IP:port 86af575
@shino shino Add a transition riak_test from default single bag to three bags 667a521
@kuenishi kuenishi commented on the diff
src/riak_cs_bag.erl
((43 lines not shown))
+
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").
+-include("riak_cs.hrl").
+-include("riak_cs_bag.hrl").
+
+-type allocate_type() :: manifest | block.
+-type pool_type() :: request_pool | bucket_list_pool.
+%% Use "bag ID" instead of "cluster ID".
+%% There are more than one clusters in case of MDC.
+-type bag_id() :: binary().
+-type pool_key() :: {pool_type(), bag_id()}.
+-type weight_info() :: #weight_info{}.
+
+-spec is_multi_bag_ebabled() -> boolean().
+is_multi_bag_ebabled() ->
@kuenishi Owner

typo!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag.erl
((6 lines not shown))
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% ---------------------------------------------------------------------
+
+%% @doc Support multi Riak clusters in single Riak CS system
@kuenishi Owner

It'd be better to have high level idea of 'bag', what it exactly means, difference from other concepts such as cluster id.

@kuenishi Owner
kuenishi added a note

like this:

%% - user records are in default bag
%% - bucket records are in default bag
%% - whole manifests in a bucket are in a same bag, whose bag id is indicated in the bucket data
%% - whole blocks of an object are in a same bag, whose bag id is indicated in the manifest
%%
%% - storage/access usage are in default buckets
@kuenishi Owner
kuenishi added a note

I also prefer there should be app.config example here; while it is a bit early to put there. So maybe here's the best place to put configuration example.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi self-assigned this
@kuenishi
Owner

The term clusterid changed to bag ?

@kuenishi kuenishi commented on the diff
src/riak_cs_bag_worker.erl
((142 lines not shown))
+handle_weight_info_list({ok, Obj}, State) ->
+ %% TODO: How to handle siblings
+ [Value | _] = riakc_obj:get_values(Obj),
+ Weights = binary_to_term(Value),
+ riak_cs_bag_server:new_weights(Weights),
+ {ok, Weights, State#state{failed_count = 0, weights = Weights}}.
+
+schedule(State) ->
+ Interval = refresh_interval_msec(),
+ Ref = erlang:send_after(Interval, self(), refresh_by_timer),
+ State#state{timer_ref = Ref}.
+
+json_to_weight_info_list({struct, JSON}) ->
+ json_to_weight_info_list(JSON, []).
+
+json_to_weight_info_list([], WeightInfoList) ->
@kuenishi Owner

With jsonx we could have created simple decode/1 function ...

@kuenishi Owner

Like this.

encoder() ->
    jsonx:encoder([{weight_info, record_info(fields, weight_info)}],
                  [{ignore, [null]}]).

to_json(#weight_info{}=W) ->
    E = encoder(),
    case E(W) of
        Bin when is_binary(Bin) -> Bin;
        Other -> error(Other)
    end.

Maybe we can write a simple record<->JSON mapper with lists:zip .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag.erl
((56 lines not shown))
+
+-spec is_multi_bag_ebabled() -> boolean().
+is_multi_bag_ebabled() ->
+ application:get_env(riak_cs, multi_bag) =/= undefined.
+
+%% Return pool specs from application configuration.
+%% This function assumes that it is called ONLY ONCE at initialization.
+%% TODO: return specs from ETS after initialization?
+-spec pool_specs(term()) -> [{atom(), {non_neg_integer(), non_neg_integer()}}].
+pool_specs(MasterPoolConfig) ->
+ init_ets(),
+ case application:get_env(riak_cs, multi_bag) of
+ undefined ->
+ [];
+ {ok, BagConfig} ->
+ register_pools(BagConfig, BagConfig, MasterPoolConfig, [])
@kuenishi Owner

I prefer this style for readability and testability:

 {ok, BagConfigs} ->
   PoolRecords = [bag_config_to_pool_record(BagConfigs, MasterPoolConfig)
                          || BagConfig <- BagConfigs ],
   true = ets:insert(?ETS_TAB, PoolRecords),
   PoolSpecs = [bag_config_to_pool_spec(BagConfig, MasterPoolConfig)
                          || BagConfig <- BagConfigs ]
 end.

bag_config_to_pool_record({BagIdStr, Address, Port}, MasterPoolConfig) ->
    blah blah blah
    #pool{ key = ... }.

bag_config_to_pool_spec({BagIdStr, Address, Port}, MasterPoolConfig) ->
    Name = list_to_atom(....),
             ...
    {Name, ...}.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi
Owner

Just want to make sure, is these correct? :

  • user data is in default bag
  • bucket data are in default bag
  • whole manifests in a bucket are in a same bag, whose bag id is indicated in the bucket data
  • whole blocks of an object are in a same bag, whose bag id is indicated in the manifest
@shino
Collaborator

@kuenishi Yes. All are correct.

@kuenishi
Owner
  • get_fsm path
  • put_fsm path
  • list objects path
  • bucket CRUD path
  • user CRUD path
  • storage/access calculation path
  • balancing system
  • stanchion review again
  • heartful hand-made tests
  • r_t review

I would be happy if a tiny documentation on basic design is written in wiki (or maybe TODO for me).

@shino
Collaborator

I have not mention TODOs, sorry.

Two items are not implemented yet:

  • GC (only collect objects in the default riak cluster only)
  • MDC proxy_get
@kuenishi kuenishi commented on the diff
src/riak_cs_storage.erl
((18 lines not shown))
{error, _} = E ->
_ = lager:error("failed to calculate usage of "
"bucket '~s' of user '~s'. Reason: ~p",
- [Bucket, User, E]),
- {Bucket, iolist_to_binary(io_lib:format("~p", [E]))}
+ [Name, User, E]),
+ {Name, iolist_to_binary(io_lib:format("~p", [E]))}
+ end.
+
+-spec sum_bucket_with_pool(pid(), cs_bucket()) -> term() | {error, term()}.
+sum_bucket_with_pool(DefaultRiakc, ?RCS_BUCKET{name=Name} = Bucket) ->
+ case riak_cs_bag:pool_name(request_pool, Bucket) of
+ undefined ->
+ sum_bucket(DefaultRiakc, Name);
+ PoolName ->
+ %% TODO: riak_cs_utils:with_riak_connection(PoolName, Fun) is useful?
@kuenishi Owner
kuenishi added a note

I don't think so.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag_server.erl
((52 lines not shown))
+allocate(Type) ->
+ gen_server:call(?SERVER, {allocate, Type}).
+
+new_weights(Weights) ->
+ gen_server:cast(?SERVER, {new_weights, Weights}).
+
+status() ->
+ gen_server:call(?SERVER, status).
+
+init([]) ->
+ {ok, #state{}}.
+
+handle_call({allocate, Type}, _From, #state{initialized = true} = State)
+ when Type =:= block orelse Type =:= manifest ->
+ Decision = case Type of
+ block ->
@kuenishi Owner
kuenishi added a note

weird indentation

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag_server.erl
((57 lines not shown))
+
+status() ->
+ gen_server:call(?SERVER, status).
+
+init([]) ->
+ {ok, #state{}}.
+
+handle_call({allocate, Type}, _From, #state{initialized = true} = State)
+ when Type =:= block orelse Type =:= manifest ->
+ Decision = case Type of
+ block ->
+ decide_bag(State#state.blocks);
+ manifest ->
+ decide_bag(State#state.manifests)
+ end,
+ case Decision of
@kuenishi Owner
kuenishi added a note

I prefer this style:

WeightInfo = case Type of
     block -> State#state.blocks;
     manifest -> State#state.manifests
   end,
 case decide_bag(WeightInfo) of
   ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag_server.erl
((103 lines not shown))
+
+%% Decide bag to allocate block/manifest randomly regarding weights
+%% bag weight cummulative-weight point (1..60)
+%% bag1 20 20 1..20
+%% bag2 10 30 21..30
+%% bag3 0 30 N/A
+%% bag4 30 60 31..60
+-spec decide_bag([{riak_cs_bag:pool_key(), riak_cs_bag:weight_info()}]) ->
+ {ok, riak_cs_bag:bag_id()} |
+ {error, no_bag}.
+decide_bag([]) ->
+ {error, no_bag};
+decide_bag(WeightInfoList) ->
+ %% TODO: SumOfWeights can be stored in state
+ SumOfWeights = lists:sum([Weight || #weight_info{weight = Weight} <- WeightInfoList]),
+ Point = random:uniform(SumOfWeights),
@kuenishi Owner
kuenishi added a note

Just and idea: what if we can make this deterministic ? Say, using crypto:bytes_to_integer(crypto:md5(term_to_binary({Bucket, Key, UUID}))) rem SumOfWeights would enable us reproduce specific behavior.

@kuenishi Owner
kuenishi added a note

Though #weight_info{} is not so deterministic ...

If possible, anything random we do should have an option to pass a random number seed in, so we can make deterministic tests.

@shino Collaborator
shino added a note

I prefer deterministic way. @kuenishi 's suggestion sounds nice.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag_server.erl
((98 lines not shown))
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% Decide bag to allocate block/manifest randomly regarding weights
+%% bag weight cummulative-weight point (1..60)
+%% bag1 20 20 1..20
+%% bag2 10 30 21..30
+%% bag3 0 30 N/A
+%% bag4 30 60 31..60
+-spec decide_bag([{riak_cs_bag:pool_key(), riak_cs_bag:weight_info()}]) ->
+ {ok, riak_cs_bag:bag_id()} |
+ {error, no_bag}.
+decide_bag([]) ->
@kuenishi Owner
kuenishi added a note

It's also not good to use three different words "decode", "allocate" and "assign" for the same purpose. It would be confusing for people who reads the code in future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag_worker.erl
((130 lines not shown))
+ riak_cs_utils:close_riak_connection(Riakc),
+ handle_weight_info_list(Result, State);
+ {error, _Reason} = E ->
+ handle_weight_info_list(E, State)
+ end.
+
+handle_weight_info_list({error, notfound}, State) ->
+ lager:debug("Bag weight information is not found"),
+ {ok, [], State#state{failed_count = 0}};
+handle_weight_info_list({error, Reason}, #state{failed_count = Count} = State) ->
+ lager:error("Retrieval of bag weight information failed. Reason: ~p", [Reason]),
+ {error, Reason, State#state{failed_count = Count + 1}};
+handle_weight_info_list({ok, Obj}, State) ->
+ %% TODO: How to handle siblings
+ [Value | _] = riakc_obj:get_values(Obj),
+ Weights = binary_to_term(Value),
@kuenishi Owner
kuenishi added a note

we have to use binary_to_term(Value, [safe]) and handle badarg.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@kuenishi kuenishi commented on the diff
src/riak_cs_bag_worker.erl
((128 lines not shown))
+ {ok, Riakc} ->
+ Result = riakc_pb_socket:get(Riakc, ?WEIGHT_BUCKET, ?WEIGHT_KEY),
+ riak_cs_utils:close_riak_connection(Riakc),
+ handle_weight_info_list(Result, State);
+ {error, _Reason} = E ->
+ handle_weight_info_list(E, State)
+ end.
+
+handle_weight_info_list({error, notfound}, State) ->
+ lager:debug("Bag weight information is not found"),
+ {ok, [], State#state{failed_count = 0}};
+handle_weight_info_list({error, Reason}, #state{failed_count = Count} = State) ->
+ lager:error("Retrieval of bag weight information failed. Reason: ~p", [Reason]),
+ {error, Reason, State#state{failed_count = Count + 1}};
+handle_weight_info_list({ok, Obj}, State) ->
+ %% TODO: How to handle siblings
@kuenishi Owner
kuenishi added a note

According to the weight data structure, I think we can use CRDT-style conflict resolution. Also, it would be much better if we have an interface to change weight info incrementally, because in usual operation cluster expansion are always incremental like adding nodes, removing nodes and not refreshing nodes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@slfritchie
Owner

Closing, pending re-review.

@slfritchie slfritchie closed this
@shino shino deleted the feature/multi-cluster-support branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 3, 2014
  1. @shino

    WIP: isolate block cluster

    shino authored
  2. @shino
  3. @shino

    Shorten module name

    shino authored
  4. @shino
  5. @shino
  6. @shino
  7. @shino
  8. @shino
  9. @shino
  10. @shino

    Pass default riakc pid to block server

    shino authored
    When one configures system with single block cluster and multiple
    manifest containers (although it is not probable), block_server should
    use default riakc.
  11. @shino
  12. @shino
  13. @shino
  14. @shino

    Fix bug of weight initialization

    shino authored
  15. @shino
  16. @shino

    Change the term "container" to "bag"

    shino authored
    A bag represent a set of riak clusters which are related via MDC repl.
    The reason of this change is because the term "container" is used
    in Swift.
  17. @shino

    Fix storage calcuation bug

    shino authored
Commits on Mar 5, 2014
  1. @shino
  2. @shino
Commits on Mar 11, 2014
  1. @shino
  2. @shino
Commits on Mar 12, 2014
  1. @shino

    Revert wrongly pushed config files

    shino authored
    This reverts commit d586898 partially.
Commits on Mar 18, 2014
  1. @shino
  2. @shino

    Add a riak_test test case for simple disjoint bag configuration

    shino authored
    All new buckets are assigned bag-B to store manifests and
    all new manifests are assigned bag-C to store blocks.
    
    | bag   | manfiest |   block |
    |-------+----------+---------|
    | bag-A |  default | default |
    | bag-B |      100 |     --- |
    | bag-C |      --- |     100 |
    
    Under this configuration, assert manifests and blocks are
    stored to expected bags and whole contents throught S3 API.
Commits on Mar 19, 2014
  1. @shino
  2. @shino
Commits on Mar 20, 2014
  1. @shino
Something went wrong with that request. Please try again.