Skip to content

Commit

Permalink
Abcast functions implemented in edis_db for support of inter-node dat…
Browse files Browse the repository at this point in the history
…abase functionality
  • Loading branch information
Joachim committed Mar 27, 2013
1 parent 0a81d71 commit df10d2c
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/edis_command_runner.erl
Expand Up @@ -104,7 +104,7 @@ handle_cast({run, Cmd, Args}, State) ->
lager:warning("Invalid password.~n", []),
tcp_err(<<"invalid password">>, State#state{authenticated = false});
_:Error ->
lager:error("Error in db ~p: ~p~n", [State#state.db_index, Error]),
lager:error("Error in db ~p: ~p~nStack: ~p", [State#state.db_index, Error, erlang:get_stacktrace()]),
tcp_err(parse_error(Cmd, Error), State)
end.

Expand Down
155 changes: 91 additions & 64 deletions src/edis_db.erl
Expand Up @@ -212,14 +212,12 @@ handle_call(#edis_command{cmd = <<"MGET">>, args = Keys}, _From, State) ->
end, {ok, []}, Keys),
{reply, Reply, stamp(Keys, read, State)};
handle_call(#edis_command{cmd = <<"MSET">>, args = KVs}, _From, State) ->
Reply =
(State#state.backend_mod):write(
State#state.backend_ref,
[{put, Key,
#edis_item{key = Key, encoding = raw,
type = string, value = Value}} || {Key, Value} <- KVs]),
Reply = db_write([{put, Key,
#edis_item{key = Key, encoding = raw,
type = string, value = Value}} || {Key, Value} <- KVs], State),
{reply, Reply, stamp([K || {K, _} <- KVs], write, State)};
handle_call(#edis_command{cmd = <<"MSETNX">>, args = KVs}, _From, State) ->
handle_call(#edis_command{cmd = <<"
">>, args = KVs}, _From, State) ->
{Reply, Action} =
case lists:any(
fun({Key, _}) ->
Expand All @@ -228,11 +226,9 @@ handle_call(#edis_command{cmd = <<"MSETNX">>, args = KVs}, _From, State) ->
true ->
{{ok, 0}, read};
false ->
ok = (State#state.backend_mod):write(
State#state.backend_ref,
[{put, Key,
#edis_item{key = Key, encoding = raw,
type = string, value = Value}} || {Key, Value} <- KVs]),
ok = db_write([{put, Key,
#edis_item{key = Key, encoding = raw,
type = string, value = Value}} || {Key, Value} <- KVs], State),
{{ok, 1}, write}
end,
{reply, Reply, stamp([K || {K, _} <- KVs], Action, State)};
Expand All @@ -257,11 +253,12 @@ handle_call(#edis_command{cmd = <<"SETBIT">>, args = [Key, Offset, Bit]}, _From,
{reply, Reply, stamp(Key, write, State)};
handle_call(#edis_command{cmd = <<"SETEX">>, args = [Key, Seconds, Value]}, _From, State) ->
Reply =
(State#state.backend_mod):put(
State#state.backend_ref, Key,
db_put(
Key,
#edis_item{key = Key, type = string, encoding = raw,
expire = edis_util:now() + Seconds,
value = Value}),
value = Value},
State),
{reply, Reply, stamp(Key, write, State)};
handle_call(#edis_command{cmd = <<"SETNX">>, args = [Key, Value]}, From, State) ->
handle_call(#edis_command{cmd = <<"MSETNX">>, args = [{Key, Value}]}, From, State);
Expand Down Expand Up @@ -294,7 +291,7 @@ handle_call(#edis_command{cmd = <<"DEL">>, args = Keys}, _From, State) ->
DeleteActions =
[{delete, Key} || Key <- Keys, exists_item(State#state.backend_mod, State#state.backend_ref, Key)],
Reply =
case (State#state.backend_mod):write(State#state.backend_ref, DeleteActions) of
case db_write(DeleteActions, State) of
ok -> {ok, length(DeleteActions)};
{error, Reason} -> {error, Reason}
end,
Expand All @@ -315,7 +312,7 @@ handle_call(#edis_command{cmd = <<"EXPIREAT">>, args = [Key, Timestamp]}, _From,
Now when Timestamp =< Now -> %% It's a delete (it already expired)
case exists_item(State#state.backend_mod, State#state.backend_ref, Key) of
true ->
case (State#state.backend_mod):delete(State#state.backend_ref, Key) of
case db_delete(Key, State) of
ok -> {ok, true};
{error, Reason} -> {error, Reason}
end;
Expand Down Expand Up @@ -374,7 +371,7 @@ handle_call(#edis_command{cmd = <<"MOVE">>, args = [Key, NewDb]}, _From, State)
Item ->
try run(process(NewDb), #edis_command{cmd = <<"-INTERNAL-RECV">>, args = [Item]}) of
ok ->
case (State#state.backend_mod):delete(State#state.backend_ref, Key) of
case db_delete(Key, State) of
ok ->
{{ok, true}, write};
{error, Reason} ->
Expand All @@ -391,8 +388,10 @@ handle_call(#edis_command{cmd = <<"-INTERNAL-RECV">>, args = [Item]}, _From, Sta
{Reply, Action} =
case exists_item(State#state.backend_mod, State#state.backend_ref, Item#edis_item.key) of
true -> {{error, found}, read};
false -> {(State#state.backend_mod):put(
State#state.backend_ref, Item#edis_item.key, Item), write}
false -> {db_put(
Item#edis_item.key,
Item,
State), write}
end,
{reply, Reply, stamp(Item#edis_item.key, Action, State)};
handle_call(#edis_command{cmd = <<"OBJECT REFCOUNT">>, args = [Key]}, _From, State) ->
Expand Down Expand Up @@ -464,9 +463,7 @@ handle_call(#edis_command{cmd = <<"RENAME">>, args = [Key, NewKey]}, _From, Stat
{error, Reason} ->
{error, Reason};
Item ->
(State#state.backend_mod):write(State#state.backend_ref,
[{delete, Key},
{put, NewKey, Item#edis_item{key = NewKey}}])
db_write([{delete, Key},{put, NewKey, Item#edis_item{key = NewKey}}], State)
end,
{reply, Reply, stamp([Key, NewKey], write, State)};
handle_call(#edis_command{cmd = <<"RENAMENX">>, args = [Key, NewKey]}, _From, State) ->
Expand All @@ -481,9 +478,7 @@ handle_call(#edis_command{cmd = <<"RENAMENX">>, args = [Key, NewKey]}, _From, St
true ->
{{ok, false}, read};
false ->
ok = (State#state.backend_mod):write(State#state.backend_ref,
[{delete, Key},
{put, NewKey, Item#edis_item{key = NewKey}}]),
ok = db_write([{delete, Key},{put, NewKey, Item#edis_item{key = NewKey}}], State),
{{ok, true}, write}
end
end,
Expand Down Expand Up @@ -522,9 +517,10 @@ handle_call(C = #edis_command{cmd = <<"SORT">>, args = [Key, Options = #edis_sor
{reply, {ok, []}, NewState};
{reply, {ok, Sorted}, NewState} ->
Reply =
case (State#state.backend_mod):put(
NewState#state.backend_ref, Destination,
#edis_item{key = Destination, type = list, encoding = linkedlist, value = Sorted}) of
case db_put(
Destination,
#edis_item{key = Destination, type = list, encoding = linkedlist, value = Sorted},
State) of
ok -> {ok, erlang:length(Sorted)};
{error, Reason} -> {error, Reason}
end,
Expand All @@ -544,7 +540,7 @@ handle_call(#edis_command{cmd = <<"HDEL">>, args = [Key | Fields]}, _From, State
Item#edis_item{value = NewDict}}
end) of
{ok, {Deleted, 0}} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Key),
_ = db_delete(Key, State),
{{ok, Deleted}, write};
{ok, {Deleted, _}} ->
{{ok, Deleted}, write};
Expand Down Expand Up @@ -798,7 +794,7 @@ handle_call(#edis_command{cmd = <<"LPOP">>, args = [Key]}, _From, State) ->
end
end, {keep, undefined}) of
{ok, {delete, Value}} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Key),
_ = db_delete(Key, State),
{ok, Value};
{ok, {keep, Value}} ->
{ok, Value};
Expand Down Expand Up @@ -945,7 +941,7 @@ handle_call(#edis_command{cmd = <<"LTRIM">>, args = [Key, Start, Stop]}, _From,
end, ok) of
{ok, ok} -> ok;
{error, empty} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Key),
_ = db_delete(Key, State),
ok;
{error, Reason} -> {error, Reason}
end,
Expand All @@ -965,7 +961,7 @@ handle_call(#edis_command{cmd = <<"RPOP">>, args = [Key]}, _From, State) ->
end
end, {keep, undefined}) of
{ok, {delete, Value}} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Key),
_ = db_delete(Key, State),
{ok, Value};
{ok, {keep, Value}} ->
{ok, Value};
Expand Down Expand Up @@ -1006,8 +1002,7 @@ handle_call(#edis_command{cmd = <<"RPOPLPUSH">>, args = [Source, Destination]},
not_found -> {put, Destination, #edis_item{key = Destination, type = list, encoding = hashtable, value = edis_lists:from_list([Value])}};
DestinationItem -> {put, Destination, DestinationItem#edis_item{value = edis_lists:push(Value,DestinationItem#edis_item.value)}}
end,
case (State#state.backend_mod):write(State#state.backend_ref,
[SourceAction,DestinationAction]) of
case db_write([SourceAction,DestinationAction], State) of
ok -> {ok, Value};
{error, Reason} -> {error, Reason}
end
Expand Down Expand Up @@ -1090,15 +1085,15 @@ handle_call(#edis_command{cmd = <<"SDIFF">>, args = [Key | Keys]}, _From, State)
handle_call(#edis_command{cmd = <<"SDIFFSTORE">>, args = [Destination | Keys]}, From, State) ->
case handle_call(#edis_command{cmd = <<"SDIFF">>, args = Keys}, From, State) of
{reply, {ok, []}, NewState} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Destination),
_ = db_delete(Destination, State),
{reply, {ok, 0}, stamp(Keys, read, stamp(Destination, write, NewState))};
{reply, {ok, Members}, NewState} ->
Value = gb_sets:from_list(Members),
Reply =
case (State#state.backend_mod):put(
State#state.backend_ref,
case db_put(
Destination,
#edis_item{key = Destination, type = set, encoding = hashtable, value = Value}) of
#edis_item{key = Destination, type = set, encoding = hashtable, value = Value},
State) of
ok -> {ok, gb_sets:size(Value)};
{error, Reason} -> {error, Reason}
end,
Expand All @@ -1123,15 +1118,15 @@ handle_call(#edis_command{cmd = <<"SINTER">>, args = Keys}, _From, State) ->
handle_call(#edis_command{cmd = <<"SINTERSTORE">>, args = [Destination | Keys]}, From, State) ->
case handle_call(#edis_command{cmd = <<"SINTER">>, args = Keys}, From, State) of
{reply, {ok, []}, NewState} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Destination),
_ = db_delete(Destination, State),
{reply, {ok, 0}, stamp(Keys, read, stamp(Destination, write, NewState))};
{reply, {ok, Members}, NewState} ->
Value = gb_sets:from_list(Members),
Reply =
case (State#state.backend_mod):put(
State#state.backend_ref,
case db_put(
Destination,
#edis_item{key = Destination, type = set, encoding = hashtable, value = Value}) of
#edis_item{key = Destination, type = set, encoding = hashtable, value = Value},
State) of
ok -> {ok, gb_sets:size(Value)};
{error, Reason} -> {error, Reason}
end,
Expand Down Expand Up @@ -1180,8 +1175,7 @@ handle_call(#edis_command{cmd = <<"SMOVE">>, args = [Source, Destination, Member
end
end
end,
case (State#state.backend_mod):write(State#state.backend_ref,
[SourceAction,DestinationAction]) of
case db_write([SourceAction,DestinationAction], State) of
ok -> {ok,true};
{error, Reason} -> {error, Reason}
end
Expand All @@ -1201,7 +1195,7 @@ handle_call(#edis_command{cmd = <<"SPOP">>, args = [Key]}, _From, State) ->
end
end, undefined) of
{ok, {delete, Member}} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Key),
_ = db_delete(Key, State),
{ok, Member};
OtherReply ->
OtherReply
Expand Down Expand Up @@ -1240,7 +1234,7 @@ handle_call(#edis_command{cmd = <<"SREM">>, args = [Key | Members]}, _From, Stat
end
end, 0) of
{ok, {delete, Count}} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Key),
_ = db_delete(Key, State),
{ok, Count};
OtherReply ->
OtherReply
Expand All @@ -1263,15 +1257,15 @@ handle_call(#edis_command{cmd = <<"SUNION">>, args = Keys}, _From, State) ->
handle_call(#edis_command{cmd = <<"SUNIONSTORE">>, args = [Destination | Keys]}, From, State) ->
case handle_call(#edis_command{cmd = <<"SUNION">>, args = Keys}, From, State) of
{reply, {ok, []}, NewState} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Destination),
_ = db_delete(Destination, State),
{reply, {ok, 0}, stamp(Keys, read, stamp(Destination, write, NewState))};
{reply, {ok, Members}, NewState} ->
Value = gb_sets:from_list(Members),
Reply =
case (State#state.backend_mod):put(
State#state.backend_ref,
case db_put(
Destination,
#edis_item{key = Destination, type = set, encoding = hashtable, value = Value}) of
#edis_item{key = Destination, type = set, encoding = hashtable, value = Value},
State) of
ok -> {ok, gb_sets:size(Value)};
{error, Reason} -> {error, Reason}
end,
Expand Down Expand Up @@ -1339,20 +1333,20 @@ handle_call(#edis_command{cmd = <<"ZINTERSTORE">>, args = [Destination, Weighted
ZSet ->
case zsets:size(ZSet) of
0 ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Destination),
_ = db_delete(Destination, State),
{ok, 0};
Size ->
case (State#state.backend_mod):put(
State#state.backend_ref,
case db_put(
Destination,
#edis_item{key = Destination, type = zset, encoding = skiplist, value = ZSet}) of
#edis_item{key = Destination, type = zset, encoding = skiplist, value = ZSet},
State) of
ok -> {ok, Size};
{error, Reason} -> {error, Reason}
end
end
catch
_:empty ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Destination),
_ = db_delete(Destination, State),
{ok, 0};
_:Error ->
lager:error("~p~n", [Error]),
Expand Down Expand Up @@ -1432,7 +1426,7 @@ handle_call(#edis_command{cmd = <<"ZREM">>, args = [Key | Members]}, _From, Stat
end
end, 0) of
{ok, {delete, Count}} ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Key),
_ = db_delete(Key, State),
{ok, Count};
OtherReply ->
OtherReply
Expand Down Expand Up @@ -1541,20 +1535,20 @@ handle_call(#edis_command{cmd = <<"ZUNIONSTORE">>, args = [Destination, Weighted
ZSet ->
case zsets:size(ZSet) of
0 ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Destination),
_ = db_delete(Destination, State),
{ok, 0};
Size ->
case (State#state.backend_mod):put(
State#state.backend_ref,
case db_put(
Destination,
#edis_item{key = Destination, type = zset, encoding = skiplist, value = ZSet}) of
#edis_item{key = Destination, type = zset, encoding = skiplist, value = ZSet},
State) of
ok -> {ok, Size};
{error, Reason} -> {error, Reason}
end
end
catch
_:empty ->
_ = (State#state.backend_mod):delete(State#state.backend_ref, Destination),
_ = db_delete(Destination, State),
{ok, 0};
_:Error ->
lager:error("~p~n", [Error]),
Expand Down Expand Up @@ -1618,8 +1612,16 @@ handle_call(X, _From, State) ->
{stop, {unexpected_request, X}, {unexpected_request, X}, State}.

%% @hidden
-spec handle_cast(X, state()) -> {stop, {unexpected_request, X}, state()}.
handle_cast(X, State) -> {stop, {unexpected_request, X}, State}.
-spec handle_cast({write|put|delete, term()}, state()) -> {noreply, state()}.
handle_cast({db_write, Actions}, State) ->
(State#state.backend_mod):write(State#state.backend_ref, Actions),
{noreply, State};
handle_cast({db_put, Destination, EdisItem}, State) ->
(State#state.backend_mod):put(State#state.backend_ref, Destination, EdisItem),
{noreply, State};
handle_cast({db_delete, Destination}, State) ->
(State#state.backend_mod):delete(State#state.backend_ref, Destination),
{noreply, State}.

%% @hidden
-spec handle_info(term(), state()) -> {noreply, state(), hibernate}.
Expand Down Expand Up @@ -1957,3 +1959,28 @@ retrieve(Mod, Ref, Element, Pattern) ->
not_found -> undefined;
{error, _Reason} -> undefined
end.



db_write(Actions, State) ->
(State#state.backend_mod):write(State#state.backend_ref, Actions),
abcast = gen_server:abcast(nodes(), process(State#state.index), {db_write, Actions}),
ok.

db_put(Destination, EdisItem, State) ->
(State#state.backend_mod):put(
State#state.backend_ref,
Destination,
EdisItem),
abcast = gen_server:abcast(nodes(), process(State#state.index), {db_put, Destination, EdisItem}),
ok.

db_delete(Destination, State) ->
(State#state.backend_mod):delete(State#state.backend_ref, Destination),
abcast = gen_server:abcast(nodes(), process(State#state.index), {db_delete, Destination}),
ok.

%%lager:notice("Nodelist: ~n~n\t\t~p Result: ~p~n~n", [Nodes, Result]).



1 comment on commit df10d2c

@Joachim31415
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OBSERVATIONS FROM MANUAL TESTS:

CHECK - get/set works, as db_write is called in these -> db_write works
CHECK - setex works, as db_put is called in this -> db_put works. TTL seems to work as well, i suppose there is some minor discrepancy tho
CHECK - move works, as db_delete is called in this -> db_delete works

ERROR - Functions using sets, hashes and lists and are not using any of my db_* functions and do therefore not get mirrored in databases at other nodes

Please sign in to comment.