Skip to content

Commit

Permalink
WIP: Store free spaces and weights information to Riak
Browse files Browse the repository at this point in the history
  • Loading branch information
shino committed Mar 3, 2014
1 parent 4e21490 commit bae1db6
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 47 deletions.
22 changes: 6 additions & 16 deletions src/riak_cs_mc.erl
Expand Up @@ -113,23 +113,13 @@ default_container_id (manifest) ->
%% return new manifest
-spec assign_container_id(pool_type(), lfs_manifest()) -> lfs_manifest().
assign_container_id(Type, ?MANIFEST{props = Props} = Manifest) ->
%% TODO: Which is better, ets:select or state in new gen_server?
%% Free space management will require gen_server?
%% After that, ETS may be nice for scalability for read.
MS = ets:fun2ms(fun(#pool{key = Key, type = TypeInRecord})
when TypeInRecord =:= Type ->
Key end),
Ids = [Id || {_, Id} <- ets:select(?ETS_TAB, MS)],
lager:log(warning, self(), "{Type, Ids}: ~p~n", [{Type, Ids}]),
%% FIXME: Must take into account free percentage of each container.
%% Current implementation is totally stub
case length(Ids) of
0 ->
case ets:first(?ETS_TAB) of
%% single container
'$end_of_table' ->
Manifest;
Length ->
random:seed(os:timestamp()),
ContainerId = lists:nth(random:uniform(Length), Ids),
lager:log(warning, self(), "ContainerId: ~p~n", [ContainerId]),
%% multiple containers
_Key ->
{ok, ContainerId} = riak_cs_mc_server:allocate(Type),
Manifest?MANIFEST{props = [{block_container, ContainerId} | Props]}
end.

Expand Down
205 changes: 174 additions & 31 deletions src/riak_cs_mc_server.erl
Expand Up @@ -25,6 +25,7 @@
-behavior(gen_server).

-export([start_link/0]).
-export([allocate/1, status/0, update/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

Expand All @@ -33,10 +34,16 @@
-include_lib("eunit/include/eunit.hrl").
-endif.

-define(SERVER, ?MODULE).
-define(SERVER, ?MODULE).
%% Riak's bucket and key to store usage information
-define(USAGE_BUCKET, <<"riak-cs-mc">>).
-define(USAGE_KEY, <<"usage">>).

-define(WEIGHT_MULTIPLIER, 1000).

%% FIXME make it more specific
-record(usage, {
container_id :: riak_cs_mc:container_id(),
weight :: non_neg_integer(),
free :: non_neg_integer(),
total :: non_neg_integer()
Expand All @@ -45,21 +52,61 @@
-record(state, {
interval = timer:minutes(5) :: non_neg_integer(),
blocks = [] :: [{riak_cs_mc:pool_key(), usage()}],
manifests = [] :: [{riak_cs_mc:pool_key(), usage()}]
manifests = [] :: [{riak_cs_mc:pool_key(), usage()}],
failed_count =0 :: non_neg_integer()
}).

%% Pushes placeholder usage data through update/1, then starts the
%% gen_server registered locally as ?SERVER.
start_link() ->
    %% FIXME: PUT dummy data
    DummyInfo = dummy_date_for_update(),
    update(DummyInfo),
    Name = {local, ?SERVER},
    gen_server:start_link(Name, ?MODULE, [], []).

%% Placeholder free-space report in the nested-proplist shape accepted
%% by update/1: two block containers and no manifest containers.
dummy_date_for_update() ->
    BlockA = [{container_id, <<"block-A">>}, {free, 100}, {total, 200}],
    BlockB = [{container_id, <<"block-B">>}, {free, 150}, {total, 220}],
    [{block, [BlockA, BlockB]},
     {manifest, []}].

%% Synchronously asks the server to choose a container for the given
%% pool type.
-spec allocate(riak_cs_mc:pool_type()) ->
          {ok, riak_cs_mc:container_id()} | {error, term()}.
allocate(Type) ->
    Request = {allocate, Type},
    gen_server:call(?SERVER, Request).

%% Synchronously sends the `status' request to the server and returns
%% its reply.
status() ->
    Request = status,
    gen_server:call(?SERVER, Request).

%% Calculates weights from a free-space report and stores them in Riak.
%%
%% FreeInfo is a proplist keyed by pool type; every value is a list of
%% per-container proplists. Example:
%%   [{block, [[{container_id, <<"block-A">>},
%%              {free, 100},
%%              {total, 200}],
%%             [{container_id, <<"block-B">>},
%%              {free, 150},
%%              {total, 220}]]},
%%    {manifest, [...]}]
update(FreeInfo) -> calc_weight_and_put(FreeInfo).

init([]) ->
random:seed(os:timestamp()),
%% FIXME
%% 1. Schedule retreival (in loop)
%% 2. Implement retreival and update functionality (use default connection pool)
{Blocks, Manifests} = retreive_usage_and_calc_weight(),
State = #state{blocks = Blocks, manifests = Manifests},
NewState = get_usage(#state{}),
schedule(),
{ok, State}.
{ok, NewState}.

handle_call({allocate, Type}, _From, State)
when Type =:= block orelse Type =:= manifests->
Expand All @@ -70,6 +117,8 @@ handle_call({allocate, Type}, _From, State)
decide_container(State#state.manifests)
end,
{reply, {ok, ContainerId}, State};
handle_call(status, _From, #state{blocks=Blocks, manifests=Manifests} = State) ->
{reply, {ok, [{blocks, Blocks}, {manifests, Manifests}]}, State};
handle_call(_Request, _From, State) ->
{reply, {error, unknown_request}, State}.

Expand Down Expand Up @@ -107,33 +156,127 @@ code_change(_OldVsn, State, _Extra) ->
decide_container(Usages) ->
%% TODO: if the sum must be a constant value, we can skip this summation.
%% FIXME: What to do if every usage has weight=0?
SumWeight = lists:sum([Weight || {_PoolKey, #usage{weight = Weight}} <- Usages]),
Point = random:uniform(SumWeight),
SumOfWeights = lists:sum([Weight || #usage{weight = Weight} <- Usages]),
Point = random:uniform(SumOfWeights),
decide_container(Point, Usages).

%% Always Point => 1 holds, usage with weight=0 never selected.
decide_container(Point, [{{_Type, ClusterId}, #usage{weight = Weight}} | _Usages])
decide_container(Point, [#usage{container_id = ContainerId, weight = Weight} | _Usages])
when Point =< Weight ->
ClusterId;
decide_container(Point, [{_PoolKey, #usage{weight = Weight}} | Usages]) ->
ContainerId;
decide_container(Point, [#usage{weight = Weight} | Usages]) ->
decide_container(Point - Weight, Usages).

%% Connect to default cluster and GET {riak-cs-mc, usage}, then recalculate weights.
retreive_usage_and_calc_weight() ->
%% FIXME : this is dummy data
Blocks = [
{{block, <<"block-A">>},
#usage{weight=30, free = 100, total = 200}},
{{block, <<"block-B">>},
#usage{weight=70, free = 150, total = 200}}
],
Manifests = [],
{Blocks, Manifests}.
%% Fetches the usage object from Riak and folds the outcome into State.
%% A connection failure is handed to handle_usage_info/2 in the same
%% way as a failed GET.
get_usage(State) ->
    Outcome =
        case riak_cs_utils:riak_connection() of
            {ok, Conn} ->
                GetResult = riakc_pb_socket:get(Conn, ?USAGE_BUCKET, ?USAGE_KEY),
                %% Release the connection before the result is processed
                riak_cs_utils:close_riak_connection(Conn),
                GetResult;
            {error, _Reason} = ConnError ->
                ConnError
        end,
    handle_usage_info(Outcome, State).

%% Folds the outcome of fetching the usage object into the server state.
%%
%% {error, Reason} - logs the failure and increments failed_count; the
%%                   blocks/manifests fields are left untouched.
%% {ok, Obj}       - decodes the first value of the object, loads it via
%%                   update_usage_state/2 and resets failed_count to 0.
handle_usage_info({error, Reason}, #state{failed_count = Count} = State) ->
    %% "~p" prints any term; the former "~@" is not a valid control
    %% sequence, so the reason never reached the log.
    lager:error("Retrieval of cluster usage information failed. Reason: ~p", [Reason]),
    State#state{failed_count = Count + 1};
handle_usage_info({ok, Obj}, State) ->
    %% TODO: Should blocks and manifests fields be cleared here?
    %% TODO: How to handle siblings?
    %% Only the first value is used; any siblings are ignored for now.
    [Value | _] = riakc_obj:get_values(Obj),
    update_usage_state(binary_to_term(Value), State#state{failed_count = 0}).

%% Walks the decoded usage term, a list of {Type, Usages} pairs, and
%% stores the converted #usage{} records in the state field for each
%% type (`block' -> blocks, `manifest' -> manifests).
update_usage_state([{Type, Usages} | Remaining], State0) ->
    Records = [usage_list_to_record(Props, #usage{}) || Props <- Usages],
    State1 = case Type of
                 block ->
                     State0#state{blocks = Records};
                 manifest ->
                     State0#state{manifests = Records}
             end,
    update_usage_state(Remaining, State1);
update_usage_state([], State) ->
    State.

%% Copies the known properties (container_id, weight, free, total) from
%% a proplist into a #usage{} record, left to right, so a later
%% duplicate overrides an earlier one.
usage_list_to_record(Props, Rec0) ->
    SetField =
        fun({container_id, C}, Rec) -> Rec#usage{container_id = C};
           ({weight, W}, Rec)       -> Rec#usage{weight = W};
           ({free, F}, Rec)         -> Rec#usage{free = F};
           ({total, T}, Rec)        -> Rec#usage{total = T};
           %% Ignore unknown props
           (_Unknown, Rec)          -> Rec
        end,
    lists:foldl(SetField, Rec0, Props).

%% Stub for scheduling the periodic usage retrieval; for now it only
%% returns a marker atom.
schedule() ->
    %% TODO: GET to riak should be in async.
    Marker = 'NOT_IMPLEMENTED_YET',
    Marker.

%% Derives weights from FreeInfo, then opens a Riak connection and
%% hands both to put_new_weight/2. A connection error is returned
%% unchanged.
calc_weight_and_put(FreeInfo) ->
    NewWeights = calc_weight(FreeInfo, []),
    case riak_cs_utils:riak_connection() of
        {error, _Reason} = ConnError ->
            ConnError;
        {ok, Conn} ->
            put_new_weight(Conn, NewWeights)
    end.

%% Adds a weight to every container of every pool type. Entries are
%% prepended to Acc, so the result lists the types in reverse input order.
calc_weight(FreeInfo, Acc0) ->
    lists:foldl(
      fun({Type, FreeInfoPerType}, Acc) ->
              WithWeights = update_weight(FreeInfoPerType, []),
              [{Type, WithWeights} | Acc]
      end,
      Acc0,
      FreeInfo).

%% Prefixes each container proplist with its {weight, W} entry.
%% Containers are prepended to the accumulator, so they come out in
%% reverse input order.
update_weight(Containers, Updated0) ->
    lists:foldl(
      fun(ContainerInfo, Acc) ->
              Weighted = [{weight, calc_weight(ContainerInfo)} | ContainerInfo],
              [Weighted | Acc]
      end,
      Updated0,
      Containers).

%% Computes the allocation weight of one container from its `free' and
%% `total' properties.
%%
%% The threshold comes from the riak_cs application environment key
%% `free_ratio_threashold' (a percentage, default 20). The key keeps its
%% original spelling because it is read at runtime.
%%
%% Returns 0 when total is 0 or when free/total is at or below the
%% threshold; otherwise trunc((free/total - threshold) * ?WEIGHT_MULTIPLIER).
%% Crashes with badmatch if either `free' or `total' is missing.
calc_weight(ContainerInfo) ->
    Threshold = riak_cs_config:get_env(riak_cs, free_ratio_threashold, 20) / 100,
    {free, F} = lists:keyfind(free, 1, ContainerInfo),
    {total, T} = lists:keyfind(total, 1, ContainerInfo),
    ratio_to_weight(F, T, Threshold).

%% Maps free/total to a weight; a zero-capacity container gets weight 0
%% instead of raising badarith on the division.
ratio_to_weight(_Free, Total, _Threshold) when Total == 0 ->
    0;
ratio_to_weight(Free, Total, Threshold) ->
    case Free / Total of
        TooSmallFreeSpace when TooSmallFreeSpace =< Threshold ->
            0;
        FreeRatio ->
            trunc((FreeRatio - Threshold) * ?WEIGHT_MULTIPLIER)
    end.

%% Fetches the current usage object, or creates a fresh one when the
%% key is not found, and passes it to update_value/3 together with the
%% new weights. Any other GET error is forwarded as is.
put_new_weight(Riakc, Weights) ->
    Fetched =
        case riakc_pb_socket:get(Riakc, ?USAGE_BUCKET, ?USAGE_KEY) of
            {ok, _Obj} = Found ->
                Found;
            {error, notfound} ->
                {ok, riakc_obj:new(?USAGE_BUCKET, ?USAGE_KEY)};
            {error, _Other} = Failure ->
                Failure
        end,
    update_value(Riakc, Weights, Fetched).

%% Writes Weights into the usage object and releases the connection.
%%
%% The third argument is the outcome of fetching (or creating) the object:
%%   {error, Reason} - the fetch failed: close the connection, log, and
%%                     return the error.
%%   {ok, Obj}       - replace the value with term_to_binary(Weights),
%%                     reset the metadata to an empty dict, and PUT it.
%%
%% Returns ok | {error, Reason}. The connection is closed before
%% returning on both paths.
update_value(Riakc, _Weights, {error, Reason} = Error) ->
    riak_cs_utils:close_riak_connection(Riakc),
    %% "~p" replaces the invalid "~@" control sequence
    lager:error("Retrieval of cluster usage information failed. Reason: ~p", [Reason]),
    Error;
update_value(Riakc, Weights, {ok, Obj}) ->
    NewObj = riakc_obj:update_value(
               riakc_obj:update_metadata(Obj, dict:new()),
               term_to_binary(Weights)),
    PutRes = riakc_pb_socket:put(Riakc, NewObj),
    riak_cs_utils:close_riak_connection(Riakc),
    case PutRes of
        ok ->
            ok;
        {error, Reason} = Error ->
            lager:error("Update of cluster usage information failed. Reason: ~p", [Reason]),
            %% Propagate the failure; previously the logger's return value
            %% was returned here, which callers could mistake for success.
            Error
    end.

%% ===================================================================
%% EUnit tests
%% ===================================================================
Expand All @@ -143,10 +286,10 @@ decide_container_test() ->
Usages = dummy_usages(),
ListOfPointAndContainerId = [
%% <<"block-Z*">> are never selected
{1, <<"block-A">>},
{10, <<"block-A">>},
{30, <<"block-A">>},
{31, <<"block-B">>},
{ 1, <<"block-A">>},
{ 10, <<"block-A">>},
{ 30, <<"block-A">>},
{ 31, <<"block-B">>},
{100, <<"block-B">>},
{101, <<"block-C">>},
{110, <<"block-C">>},
Expand All @@ -156,13 +299,13 @@ decide_container_test() ->

dummy_usages() ->
[
{{block, <<"block-Z1">>}, #usage{weight=0}},
{{block, <<"block-Z2">>}, #usage{weight=0}},
{{block, <<"block-A">>}, #usage{weight=30}},
{{block, <<"block-B">>}, #usage{weight=70}},
{{block, <<"block-Z3">>}, #usage{weight=0}},
{{block, <<"block-C">>}, #usage{weight=20}},
{{block, <<"block-Z4">>}, #usage{weight=0}}
#usage{container=<<"block-Z1">>, weight= 0},
#usage{container=<<"block-Z2">>, weight= 0},
#usage{container=<<"block-A">>, weight=30},
#usage{container=<<"block-B">>, weight=70},
#usage{container=<<"block-Z3">>, weight= 0},
#usage{container=<<"block-C">>, weight=20},
#usage{container=<<"block-Z4">>, weight= 0}
].

-endif.
3 changes: 3 additions & 0 deletions src/riak_cs_sup.erl
Expand Up @@ -90,6 +90,8 @@ process_specs() ->
PutFsmSup = {riak_cs_put_fsm_sup,
{riak_cs_put_fsm_sup, start_link, []},
permanent, 5000, worker, dynamic},
MC = {riak_cs_mc_server, {riak_cs_mc_server, start_link, []},
permanent, 5000, worker, [riak_cs_mc_server]},
DiagsSup = {riak_cs_diags, {riak_cs_diags, start_link, []},
permanent, 5000, worker, dynamic},
[Archiver,
Expand All @@ -100,6 +102,7 @@ process_specs() ->
DeleteFsmSup,
GetFsmSup,
PutFsmSup,
MC,
DiagsSup].

-spec get_option_val({atom(), term()} | atom()) -> {atom(), term()}.
Expand Down

0 comments on commit bae1db6

Please sign in to comment.