Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

improve mcache:mget a lot.

  • Loading branch information...
commit dd09498cfe70336e440abc45c6139db8aa0a083d 1 parent e336c4a
@echou authored
View
89 lib/mcache/src/mcache.erl
@@ -1,44 +1,44 @@
-module(mcache).
-author('echou327@gmail.com').
-%-compile([inline, native, {hipe, o3}]).
+-compile([inline]). %, native, {hipe, o3}]).
-export([get_server/2, get/2, mget/2, set/5, set/4, delete/2]).
-define(SEP, ":").
-define(MGET_TIMEOUT, 500).
-define(FMT_RAW, 0).
-%-define(FMT_BJSON, 100).
-define(FMT_NATIVE, 101).
-define(FMT_JSON, 102).
-define(FMT_INT, 103).
+
get(Class, Key) ->
{Key1, Server, _DefaultExpiry} = get_server(Class, Key),
{_, Value} = mcache_client:mc_get(Server, Key1),
decode_value(Value).
mget(Class, [_|_]=Keys) ->
+ erlang:yield(),
{KeyDict, ServerDict} = lists:foldl(
fun(K, {KAcc,SAcc}) ->
{K1, Server, _DefaultExpiry} = get_server(Class, K),
RealKey = iolist_to_binary(K1), % must convert to binary because response key is a binary
- {orddict:store(RealKey, K, KAcc), dict:append(Server, RealKey, SAcc)}
- end, {orddict:new(), dict:new()}, Keys),
- dict:fold(fun(Server, Keys1, Acc) ->
- mcache_client:ab_mget(Server, Keys1),
- Acc
- end, nil, ServerDict),
- ValueDict = mget_receive(orddict:size(KeyDict), ?MGET_TIMEOUT, dict:new()),
- %ValueDict = dict:new(),
- Result = orddict:fold(fun(RealKey, Key, Acc) ->
- case dict:find(RealKey, ValueDict) of
- error -> Acc;
- {ok, undefined} -> Acc;
- {ok, Value} -> [{Key, decode_value(Value)}|Acc]
- end
- end, [], KeyDict),
- lists:reverse(Result).
+ {dict:append(RealKey, K, KAcc), dict:append(Server, RealKey, SAcc)}
+ end, {dict:new(), dict:new()}, Keys),
+ Ref = erlang:make_ref(),
+ dict:map(fun(Server, Ks) -> mcache_client:ab_mget(Server, Ref, Ks) end, ServerDict),
+ %lib:flush_receive(),
+ Results = mget_receive(dict:size(KeyDict), Ref, ?MGET_TIMEOUT*1000, []),
+ flat_foldl(
+ fun({RealKey, Val}, LAcc) ->
+ case dict:find(RealKey, KeyDict) of
+ {ok, Key} ->
+ [{Key, decode_value(Val)}|LAcc];
+ _ ->
+ LAcc
+ end
+ end, [], Results).
set(Class, Key, Value, Format, Expiry) ->
{Key1, Server, DefaultExpiry} = get_server(Class, Key),
@@ -57,37 +57,44 @@ delete(Class, Key) ->
% internal functions
+flat_foldl(_Fun, Acc, []) ->
+ Acc;
+flat_foldl(Fun, Acc, [H|T]) ->
+ Acc1 = case H of
+ [] -> Acc;
+ [_|_] -> flat_foldl(Fun, Acc, H);
+ _ -> Fun(H, Acc)
+ end,
+ flat_foldl(Fun, Acc1, T).
+
my_now() ->
erlang:now().
-mget_receive(0, _Timeout, D) ->
- D;
-mget_receive(_N, Timeout, D) when Timeout =< 0 ->
- D;
-mget_receive(N, Timeout, D) ->
+wrap_items(nil, L) ->
+ L;
+wrap_items(Items, L) ->
+ [Items|L].
+
+mget_receive(0, _Ref, _Timeout, L) ->
+ L;
+mget_receive(_N, _Ref, Timeout, L) when Timeout =< 0 ->
+ L;
+mget_receive(N, Ref, Timeout, L) ->
Now = my_now(),
+ TimeoutMillis = Timeout div 1000,
receive
Any ->
- case Any of
- {Ref, {mget, Items}} when is_reference(Ref) ->
- Now1 = my_now(),
- TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
- D1 = dict:merge(fun(_K,_V1,V2) -> V2 end, D, Items),
- mget_receive(N-dict:size(Items), TimeoutLeft, D1);
- {Ref, {Key, Value}}=Msg when is_reference(Ref) ->
- Now1 = my_now(),
- TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
- mget_receive(N-1, TimeoutLeft, dict:store(Key, Value, D));
- _ ->
- Now1 = my_now(),
- TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
- mget_receive(N, TimeoutLeft, D)
- end
- after Timeout ->
- D
+ Now1 = my_now(),
+ T1 = Timeout - timer:now_diff(Now1, Now),
+ {L1, N1} = case Any of
+ {Ref, {mget, NumKeys, Items}} -> {wrap_items(Items, L), N-NumKeys};
+ _ -> {L, N}
+ end,
+ mget_receive(N1, Ref, T1, L1)
+ after TimeoutMillis ->
+ L
end.
-
cast([H]) when H>255;is_atom(H) ->
cast(H);
cast([H|L]) when H>255;is_atom(H) ->
View
64 lib/mcache/src/mcache_client.erl
@@ -7,7 +7,7 @@
-export([start_link/1]).
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,code_change/3,terminate/2]).
--export([mc_get/2, ab_get/2, mc_mget/2, ab_mget/2, mc_set/5, ab_set/5, mc_delete/2, ab_delete/2]).
+-export([mc_get/2, ab_get/2, mc_mget/2, ab_mget/3, mc_set/5, ab_set/5, mc_delete/2, ab_delete/2]).
-include_lib("kernel/src/inet_int.hrl").
@@ -21,7 +21,7 @@
-include("mcache_binary_frame.hrl").
-record(pending, {from, time}).
--record(mget_pending, {from, time, items}).
+-record(mget_pending, {from, time, num_keys, items}).
start_link({Host, Port}) ->
gen_server:start_link(?MODULE, [{Host, Port}], []).
@@ -68,8 +68,7 @@ handle_call(_Req, _From, State) ->
send_wrapper({mget, Keys}=Req, From, #state{sock=Sock, seq=Seq, pendings=Pendings}=State) ->
case (catch send_req(Sock, Seq, Req)) of
true ->
- Items = lists:foldl(fun(K, Dict) -> dict:store(K, undefined, Dict) end, dict:new(), Keys),
- P = #mget_pending{from=From, time=erlang:now(), items=Items},
+ P = #mget_pending{from=From, time=erlang:now(), num_keys=length(Keys), items=nil},
{ok, State#state{seq=Seq+1, pendings=dict:store(Seq,P,Pendings)}};
_ ->
{not_sent, State}
@@ -88,6 +87,11 @@ send_wrapper(Req, From, #state{sock=Sock, seq=Seq, pendings=Pendings}=State) ->
handle_cast({mc, _Req}, #state{sock=not_connected}=State) ->
{noreply, State};
+handle_cast({mc_ab, {mget, From, Keys}}, State) ->
+ %error_logger:info_msg("handle_cast(mget, ~p, ~p)~n", [Pid, Keys]),
+ {_, State1} = send_wrapper({mget, Keys}, From, State),
+ {noreply, State1};
+
handle_cast({mc, Req}, #state{sock=Sock, seq=Seq}=State) ->
case (catch send_req(Sock, Seq, Req)) of
ok ->
@@ -147,14 +151,18 @@ socket_close(#state{sock=Sock}=State) when is_port(Sock) ->
catch gen_tcp:close(Sock),
socket_close(State#state{sock=not_connected}).
+prepend_list(Key, Value, nil) ->
+ [{Key,Value}];
+prepend_list(Key, Value, L) ->
+ [{Key,Value}|L].
+
% Handles mget sequences (getkq, getkq, ..., noop)
handle_one_resp(#resp{opcode=getkq, seq=Seq}=Resp, Pendings) ->
case dict:find(Seq, Pendings) of
{ok, #mget_pending{items=Items}=P} ->
case decode_resp(Resp) of
{ok, {Key, Value}} ->
- NewItems = dict:store(Key, Value, Items),
- dict:store(Seq, P#mget_pending{items=NewItems}, Pendings);
+ dict:store(Seq, P#mget_pending{items=prepend_list(Key,Value,Items)}, Pendings);
_ ->
Pendings
end;
@@ -164,8 +172,8 @@ handle_one_resp(#resp{opcode=getkq, seq=Seq}=Resp, Pendings) ->
handle_one_resp(#resp{opcode=noop,seq=Seq}, Pendings) ->
case dict:find(Seq, Pendings) of
- {ok, #mget_pending{from=From, items=Items}} ->
- gen_server:reply(From, {mget, Items});
+ {ok, #mget_pending{from=From, num_keys=NumKeys, items=Items}} ->
+ gen_server:reply(From, {mget, NumKeys, Items});
_ ->
ok
end,
@@ -187,17 +195,16 @@ handle_one_resp(#resp{seq=Seq}=Resp, Pendings) ->
dict:erase(Seq, Pendings).
flush_pendings(Pendings, Result) ->
- dict:fold(
- fun(_Seq, #pending{from=From}, Any) ->
- gen_server:reply(From, Result),
- Any;
- (_Seq, #mget_pending{from=From, items=Items}, Any) ->
+ dict:map(
+ fun(_Seq, #pending{from=From}) ->
+ gen_server:reply(From, Result),
+ ok;
+ (_Seq, #mget_pending{from=From, items=Items}) ->
gen_server:reply(From, {mget, Items}),
- Any;
- (_, _, Any) ->
- Any
+ ok;
+ (_, _) ->
+ ok
end,
- nil,
Pendings).
@@ -227,23 +234,6 @@ async_connect({A,B,C,D}=_Addr, Port, Opts, Time) ->
exit(badarg)
end.
-% don't use it any more
-do_parse_packet(<<16#81, Opcode, KeyLen:16, ExtraLen, DataType, Status:16, TotalBodyLen:32, Seq:32, CAS:64,
- Extra:ExtraLen/binary, Key:KeyLen/binary, Rest/binary>>, Acc) ->
- BodyLen = TotalBodyLen - ExtraLen - KeyLen,
- <<Body:BodyLen/binary, Rest1/binary>> = Rest,
- Resp= #resp{seq=Seq,
- opcode=mcache_proto:opcode(Opcode),
- status=mcache_proto:status(Status),
- data_type=DataType,
- cas=CAS,
- extra=Extra,
- key=Key,
- body=Body},
- do_parse_packet(Rest1, [Resp|Acc]);
-do_parse_packet(Data, Acc) ->
- {Data, lists:reverse(Acc)}.
-
do_send_req(Sock, [_|_]=Reqs) ->
erlang:port_command(Sock, [ mcache_binary_frame:encode(Req) || Req <- Reqs ]);
do_send_req(Sock, Req) ->
@@ -322,8 +312,10 @@ ab_get(Server, Key) ->
mc_mget(Server, Keys) ->
gen_server:call(get_client_pid(Server), {mc, {mget, Keys}}).
-ab_mget(Server, Keys) ->
- gen_server:call(get_client_pid(Server), {mc_ab, {mget, Keys}}).
+ab_mget(Server, Ref, Keys) ->
+ % gen_server:call(get_client_pid(Server), {mc_ab, {mget, Keys}}).
+ gen_server:cast(get_client_pid(Server), {mc_ab, {mget, {self(), Ref}, Keys}}).
+
mc_set(Server, Key, Value, Flags, Expiry) ->
gen_server:call(get_client_pid(Server), {mc, {set, Key, Value, Flags, Expiry}}).
View
9 lib/mcache/src/reloader.erl
@@ -102,7 +102,14 @@ doit(From, To) ->
io:format("Error reading ~s's file info: ~p~n",
[Filename, Reason]),
error
- end || {Module, Filename} <- code:all_loaded(), is_list(Filename)].
+ end || {Module, Filename} <- code:all_loaded(), is_custom_module(Filename)].
+
+is_custom_module(Filename) when is_atom(Filename) ->
+ false;
+is_custom_module([]) ->
+ false;
+is_custom_module(Filename) ->
+ lists:prefix(code:root_dir(), Filename) =:= false.
stamp() ->
erlang:localtime().
Please sign in to comment.
Something went wrong with that request. Please try again.