Skip to content

Commit

Permalink
Part of the vclock implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Joachim committed May 13, 2013
1 parent bca3fac commit 1f4601c
Show file tree
Hide file tree
Showing 3 changed files with 389 additions and 10 deletions.
2 changes: 2 additions & 0 deletions include/edis.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@
type :: edis_db:item_type(),
encoding :: edis_db:item_encoding(),
value :: term(),
vclock = edis_vclock:fresh() :: edis_vclock:vclock(), %type defined in edis_vclock.erl
timestamp = edis_vclock:timestamp() :: integer(),
expire = infinity :: infinity | pos_integer()}).
86 changes: 76 additions & 10 deletions src/edis_db.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%%%-------------------------------------------------------------------
%%% @author Fernando Benavides <fernando.benavides@inakanetworks.com>
%%% @author Chad DePue <chad@inakanetworks.com>
%%% @author Joachim Nilsson <joachim@inakanetworks.com>
%%% @copyright (C) 2011 InakaLabs SRL
%%% @doc edis Database
%%% @todo It's currently delivering all operations to the leveldb instance, i.e. no in-memory management
Expand Down Expand Up @@ -216,8 +217,7 @@ handle_call(#edis_command{cmd = <<"MSET">>, args = KVs}, _From, State) ->
#edis_item{key = Key, encoding = raw,
type = string, value = Value}} || {Key, Value} <- KVs], State),
{reply, Reply, stamp([K || {K, _} <- KVs], write, State)};
handle_call(#edis_command{cmd = <<"
">>, args = KVs}, _From, State) ->
handle_call(#edis_command{cmd = <<"MSETNX">>, args = KVs}, _From, State) ->
{Reply, Action} =
case lists:any(
fun({Key, _}) ->
Expand Down Expand Up @@ -1616,11 +1616,56 @@ handle_call(X, _From, State) ->
handle_cast({db_write, Actions}, State) ->
(State#state.backend_mod):write(State#state.backend_ref, Actions),
{noreply, State};

handle_cast({db_put, Destination, EdisItem}, State) ->
(State#state.backend_mod):put(State#state.backend_ref, Destination, EdisItem),
EdisItemLocal = (State#state.backend_mod):get(State#state.backend_ref, EdisItem),

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

(State#state.backend_mod):get(…) may return not_found, which (with the corresponding BIG comment) should be treated as if the incoming vclock is MORE advanced than the local one

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

Also… (State#state.backend_mod):get(…) expects a key in its second argument, so you should call it with (State#state.backend_ref, Destination)

This comment has been minimized.

Copy link
@Joachim31415

Joachim31415 May 13, 2013

Both resolved.

case edis_vclock:descends(EdisItemLocal#edis_item.vclock, EdisItem#edis_item.vclock) of
true ->
%% The case of when the incoming EdisItem vclock is MORE advanced than the local one
db_put(Destination, EdisItem, State)

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

This should just call (State#state.backend_mod):put(…) and not notify anybody else

This comment has been minimized.

Copy link
@Joachim31415

Joachim31415 May 13, 2013

Resolved

end,

case edis_vclock:descends(EdisItem#edis_item.vclock, EdisItemLocal#edis_item.vclock) of
true ->
%% The case of when the incoming EdisItem vclock is LESS advanced than the local one
db_put(Destination, EdisItemLocal, State);
false ->
LocalTS = EdisItemLocal#edis_item.timestamp,
IncomingTS = EdisItem#edis_item.timestamp,
DescVClock = edis_vclock:merge([EdisItem#edis_item.vclock, EdisItemLocal#edis_item.vclock]),
case IncomingTS > LocalTS of
true ->
UpdatedVClock = EdisItem#edis_item{vclock = DescVClock},
db_put(Destination, UpdatedVClock, State);
false ->
UpdatedVClock = EdisItemLocal#edis_item{vclock = DescVClock},
db_put(Destination, UpdatedVClock, State)
end
end,
{noreply, State};
handle_cast({db_delete, Destination}, State) ->
(State#state.backend_mod):delete(State#state.backend_ref, Destination),

handle_cast({db_delete, Destination, EdisItem}, State) ->
EdisItemLocal = (State#state.backend_mod):get(State#state.backend_ref, EdisItem),

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

(State#state.backend_mod):get(…) may return not_found, which (with the corresponding BIG comment) should be treated as if the incoming vclock is MORE advanced than the local one

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

Also… (State#state.backend_mod):get(…) expects a key in its second argument, so you should call it with (State#state.backend_ref, Destination)

This comment has been minimized.

Copy link
@Joachim31415

Joachim31415 May 13, 2013

Both resolved.

case edis_vclock:descends(EdisItemLocal#edis_item.vclock, EdisItem#edis_item.vclock) of
true ->
%% The case of when the incoming EdisItem vclock is MORE advanced than the local one
db_delete(Destination, State)

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

This should just call (State#state.backend_mod):delete(…) and not notify anybody else

This comment has been minimized.

Copy link
@Joachim31415

Joachim31415 May 13, 2013

Resolved

end,

case edis_vclock:descends(EdisItem#edis_item.vclock, EdisItemLocal#edis_item.vclock) of
true ->
%% The case of when the incoming EdisItem vclock is LESS advanced than the local one
db_delete(Destination, State);
false ->
LocalTS = EdisItemLocal#edis_item.timestamp,
IncomingTS = EdisItem#edis_item.timestamp,
case IncomingTS > LocalTS of
true ->
db_delete(EdisItem#edis_item.key, State);
false ->
db_delete(EdisItemLocal#edis_item.key, State)
end
end,
{noreply, State}.

%% @hidden
Expand Down Expand Up @@ -1968,14 +2013,35 @@ db_write(Actions, State) ->
ok.

db_put(Destination, EdisItem, State) ->
IncrementedVClock = EdisItem#edis_item{
vclock = edis_vclock:increment(process(State#state.index), EdisItem#edis_item.vclock),
timestamp = edis_vclock:timestamp()},
(State#state.backend_mod):put(
State#state.backend_ref,
Destination,
EdisItem),
abcast = gen_server:abcast(nodes(), process(State#state.index), {db_put, Destination, EdisItem}),
IncrementedVClock),
abcast = gen_server:abcast(nodes(), process(State#state.index), {db_put, Destination, IncrementedVClock}),
ok.

db_delete(Destination, State) ->
(State#state.backend_mod):delete(State#state.backend_ref, Destination),
abcast = gen_server:abcast(nodes(), process(State#state.index), {db_delete, Destination}),
ok.
EdisItem = (State#state.backend_mod):get(State#state.backend_ref, Destination),
case EdisItem of
#edis_item{vclock = _} ->
NewEdisItem = EdisItem#edis_item{
vclock = edis_vclock:increment(process(State#state.index), EdisItem#edis_item.vclock),
timestamp = edis_vclock:timestamp()},
(State#state.backend_mod):delete(State#state.backend_ref, Destination),
abcast = gen_server:abcast(nodes(), process(State#state.index), {db_delete, Destination, NewEdisItem});
#edis_item{} ->

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

This case statement can never match

This comment has been minimized.

Copy link
@Joachim31415

Joachim31415 May 13, 2013

Resolved

%% It has to be resolved what to do in respect of vector clocks for this case
{error, bad_item_type};
not_found ->
not_found;

This comment has been minimized.

Copy link
@elbrujohalcon

elbrujohalcon May 13, 2013

Collaborator

Please add a comment stating that "we're assuming that all nodes agreed once that the item should be deleted"

This comment has been minimized.

Copy link
@Joachim31415

Joachim31415 May 13, 2013

Resolved

{error, Reason} ->
lager:error("~p~n", [Reason])
end,
ok.




Loading

0 comments on commit 1f4601c

Please sign in to comment.