Skip to content

Commit

Permalink
ddfs master: Refactor ddfs_master to respect the opaqueness of gb_set…
Browse files Browse the repository at this point in the history
…s().

Basically, instead of using the false|gb_set() for tag_cache we now use
{boolean(), gb_set()} where the first item shows whether the cache is valid or
not.
See: http://erlang.org/pipermail/erlang-questions/2014-April/078647.html
  • Loading branch information
Shayan Pooya committed Apr 16, 2014
1 parent b8af313 commit b4e1d4d
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions master/src/ddfs/ddfs_master.erl
Expand Up @@ -36,9 +36,9 @@
-type node_info() :: {node(), {non_neg_integer(), non_neg_integer()}}.
-type gc_stats() :: none | gc_run_stats().

-record(state, {tags = gb_trees:empty() :: gb_tree(),
tag_cache = false :: false | gb_set(),
cache_refresher :: pid(),
-record(state, {tags = gb_trees:empty() :: gb_tree(),
tag_cache = {false, gb_sets:empty()} :: {boolean(), gb_set()},
cache_refresher :: pid(),

nodes = [] :: [node_info()],
write_blacklist = [] :: [node()],
Expand Down Expand Up @@ -264,7 +264,7 @@ handle_cast({update_gc_stats, Stats}, S) ->
{noreply, S#state{gc_stats = {Stats, now()}}};

handle_cast({update_tag_cache, TagCache}, S) ->
{noreply, S#state{tag_cache = TagCache}};
{noreply, S#state{tag_cache = {true, TagCache}}};

handle_cast(refresh_tag_cache, #state{cache_refresher = Refresher} = S) ->
Refresher ! refresh,
Expand Down Expand Up @@ -337,13 +337,15 @@ do_new_blob(Obj, K, Include, Exclude, BlackList, Nodes) ->
% Tag request: Start a new tag server if one doesn't exist already. Forward
% the request to the tag server.

-spec get_tag_pid(tagname(), gb_tree(), false | gb_set()) ->
-spec get_tag_pid(tagname(), gb_tree(), {boolean(), gb_set()}) ->
{pid(), gb_tree()}.
get_tag_pid(Tag, Tags, Cache) ->
get_tag_pid(Tag, Tags, {Valid, Cache}) ->
case gb_trees:lookup(Tag, Tags) of
none ->
NotFound = (Cache =/= false
andalso not gb_sets:is_element(Tag, Cache)),
NotFound = case Valid of
true -> not gb_sets:is_element(Tag, Cache);
false -> false
end,
{ok, Server} = ddfs_tag:start(Tag, NotFound),
erlang:monitor(process, Server),
{Server, gb_trees:insert(Tag, Server, Tags)};
Expand All @@ -353,18 +355,22 @@ get_tag_pid(Tag, Tags, Cache) ->

-spec do_tag_request(term(), tagname(), replyto(), state()) ->
state().
do_tag_request(M, Tag, From, #state{tags = Tags, tag_cache = Cache} = S) ->
{Pid, TagsN} = get_tag_pid(Tag, Tags, Cache),
do_tag_request(M, Tag, From, #state{tags = Tags, tag_cache = {Valid, Cache}} = S) ->
{Pid, TagsN} = get_tag_pid(Tag, Tags, {Valid, Cache}),
gen_server:cast(Pid, {M, From}),
S#state{tags = TagsN,
tag_cache = Cache =/= false andalso gb_sets:add(Tag, Cache)}.
case Valid of
true -> S#state{tags = TagsN, tag_cache = {true, gb_sets:add(Tag, Cache)}};
false -> S#state{tags = TagsN}
end.

-spec do_tag_notify(term(), tagname(), state()) -> state().
do_tag_notify(M, Tag, #state{tags = Tags, tag_cache = Cache} = S) ->
{Pid, TagsN} = get_tag_pid(Tag, Tags, Cache),
do_tag_notify(M, Tag, #state{tags = Tags, tag_cache = {Valid, Cache}} = S) ->
{Pid, TagsN} = get_tag_pid(Tag, Tags, {Valid, Cache}),
gen_server:cast(Pid, {notify, M}),
S#state{tags = TagsN,
tag_cache = Cache =/= false andalso gb_sets:add(Tag, Cache)}.
case Valid of
true -> S#state{tags = TagsN, tag_cache = {true, gb_sets:add(Tag, Cache)}};
false -> S#state{tags = TagsN}
end.

-spec do_update_nodes(nodes_update(), state()) -> state().
do_update_nodes(NewNodes, #state{nodes = Nodes, tags = Tags} = S) ->
Expand All @@ -388,7 +394,7 @@ do_update_nodes(NewNodes, #state{nodes = Nodes, tags = Tags} = S) ->
S#state{nodes = UpdatedNodes,
write_blacklist = WriteBlacklist,
read_blacklist = ReadBlacklist,
tag_cache = false,
tag_cache = {false, gb_sets:empty()},
tags = gb_trees:empty()};
true ->
S#state{write_blacklist = WriteBlacklist,
Expand Down

0 comments on commit b4e1d4d

Please sign in to comment.