Skip to content

Commit

Permalink
initial import
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown committed Dec 3, 2009
0 parents commit caf969d
Show file tree
Hide file tree
Showing 10 changed files with 1,326 additions and 0 deletions.
12 changes: 12 additions & 0 deletions mcache/src/mcache.app.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
% vim:syn=erlang
{application, mcache,
[{description, "memcached client application"},
{vsn, "%VSN%"},
{modules, [%MODULES%]},
{registered, []},
{mod, {mcache_app, []}},
{env, []},
{applications, [kernel,stdlib]}
]
}.

193 changes: 193 additions & 0 deletions mcache/src/mcache.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
-module(mcache).
-author('echou327@gmail.com').

-compile([inline, native]).
-export([get_server/2, get/2, mget/1, mget/2, set/5, set/4, delete/2, mget2/2]).

-define(SEP, ":").
-define(MGET_TIMEOUT, 1000).
-define(FMT_RAW, 0).
-define(FMT_BJSON, 100). % not implemented yet.
-define(FMT_NATIVE, 101).
-define(FMT_JSON, 102).
-define(FMT_INT, 103).

get(Class, Key) ->
{Key1, Server, _DefaultExpiry} = get_server(Class, Key),
{_, Value} = mcache_client:mc_get(Server, Key1),
decode_value(Value).

mget(Class, [Key]) ->
case ?MODULE:get(Class, Key) of
undefined -> [];
Value -> [{Key, Value}] % already decoded in get()
end;
mget(Class, [_|_]=Keys) ->
KeyDict = lists:foldl(
fun(K, Acc) ->
{K1, Server, _DefaultExpiry} = get_server(Class, K),
RealKey = list_to_binary(K1), % must convert to binary because response key is a binary
mcache_client:ab_get(Server, RealKey),
orddict:store(RealKey, K, Acc)
end, orddict:new(), Keys),
ValueDict = mget_receive(length(Keys), ?MGET_TIMEOUT, dict:new()),
Result = orddict:fold(fun(RealKey, Key, Acc) ->
case dict:find(RealKey, ValueDict) of
error -> Acc;
{ok, undefined} -> Acc;
{ok, Value} -> [{Key, decode_value(Value)}|Acc]
end
end, [], KeyDict),
lists:reverse(Result).


mget2(Class, [_|_]=Keys) ->
{KeyDict, ServerDict} = lists:foldl(
fun(K, {KAcc,SAcc}) ->
{K1, Server, _DefaultExpiry} = get_server(Class, K),
RealKey = list_to_binary(K1), % must convert to binary because response key is a binary
{orddict:store(RealKey, K, KAcc), dict:append(Server, RealKey, SAcc)}
end, {orddict:new(), dict:new()}, Keys),
dict:fold(fun(Server, Keys1, Acc) ->
mcache_client:ab_mget(Server, Keys1),
Acc
end, nil, ServerDict),
ValueDict = mget_receive(length(Keys), ?MGET_TIMEOUT, dict:new()),
Result = orddict:fold(fun(RealKey, Key, Acc) ->
case dict:find(RealKey, ValueDict) of
error -> Acc;
{ok, undefined} -> Acc;
{ok, Value} -> [{Key, decode_value(Value)}|Acc]
end
end, [], KeyDict),
lists:reverse(Result).

mget([{Class, Key}]) ->
case ?MODULE:get(Class, Key) of
undefined -> [];
Value -> [{{Class, Key}, Value}] % already decoded in get()
end;
mget([{_Class, _Keys}|_] = KeyPairs) ->
KeyDict = lists:foldl(
fun({Class, K}=Key, Acc) ->
{K1, Server, _DefaultExpiry} = get_server(Class, K),
K2 = list_to_binary(K1),
mcache_client:ab_get(Server, K2),
orddict:store(K2, Key, Acc)
end, orddict:new(), KeyPairs),
ValueDict = mget_receive(length(KeyPairs), 1000, dict:new()),
Result = orddict:fold(fun(RealKey, Key, Acc) ->
case dict:find(RealKey, ValueDict) of
error -> Acc;
{ok, undefined} -> Acc;
{ok, Value} -> [{Key, decode_value(Value)}|Acc]
end
end, [], KeyDict),
lists:reverse(Result).

set(Class, Key, Value, Format, Expiry) ->
{Key1, Server, DefaultExpiry} = get_server(Class, Key),
{Value1, Flags} = encode_value(Value, Format),
Expiry1 = encode_expiry(Expiry, DefaultExpiry),
mcache_client:ab_set(Server, Key1, Value1, Flags, Expiry1),
ok.

set(Class, Key, Value, Format) ->
set(Class, Key, Value, Format, default).

delete(Class, Key) ->
{Key1, Server} = get_server(Class, Key),
mcache_client:ab_delete(Server, Key1),
ok.

% internal functions

mget_receive(0, _Timeout, D) ->
D;
mget_receive(_N, Timeout, D) when Timeout =< 0 ->
D;
mget_receive(N, Timeout, D) ->
Now = app_util:now(),
receive
{Ref, {mget, Items}} when is_reference(Ref) ->
Now1 = app_util:now(),
TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
D1 = dict:merge(fun(_K,_V1,V2) -> V2 end, D, Items),
mget_receive(N-dict:size(Items), TimeoutLeft, D1);
{Ref, {Key, Value}}=Msg when is_reference(Ref) ->
Now1 = app_util:now(),
TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
mget_receive(N-1, TimeoutLeft, dict:store(Key, Value, D));
Any ->
io:format("~p~n", [Any]),
Now1 = app_util:now(),
TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
mget_receive(N, TimeoutLeft, D)
after Timeout ->
D
end.


cast([H]) when H>255;is_atom(H) ->
cast(H);
cast([H|L]) when H>255;is_atom(H) ->
[cast(H),":"|cast(L)];
cast(V) when is_tuple(V) ->
cast(tuple_to_list(V));
cast(V) when is_list(V); is_binary(V) ->
V;
cast(V) when is_atom(V) ->
atom_to_list(V);
cast(V) when is_integer(V) ->
integer_to_list(V).

map_key(Class, Key) ->
[cast(Class), ":"|cast(Key)].

get_server(Class, Key) ->
Key1 = map_key(Class, Key),
{Pool, Expiry} = mcache_expires:expire(Class),
Server = mcache_continuum:find(Pool, mcache_util:hash(Key1, md5)),
{Key1, Server, Expiry}.


encode_value(Value, raw) ->
{Value, ?FMT_RAW};
encode_value(Value, native) ->
{term_to_binary(Value), ?FMT_NATIVE};
encode_value(Value, json) ->
{eep0018:encode(Value), ?FMT_JSON};
encode_value(Value, int) ->
{<<Value:32>>, ?FMT_INT}.

decode_value(undefined) ->
undefined;
decode_value(not_found) ->
undefined;
decode_value({Data, ?FMT_RAW}) ->
Data;
decode_value({Data, ?FMT_NATIVE}) ->
binary_to_term(Data);
decode_value({Data, ?FMT_JSON}) ->
eep0018:decode(Data);
decode_value({<<Int:32>>, ?FMT_INT}) ->
Int.

encode_expiry(default, DefaultExpiry) ->
encode_expiry1(DefaultExpiry);
encode_expiry(Expiry, _DefaultExpiry) ->
encode_expiry1(Expiry).

encode_expiry1(infinity) ->
0;
encode_expiry1({X, seconds}) ->
X;
encode_expiry1({X, minutes}) ->
X*60;
encode_expiry1({X, hours}) ->
X*3600;
encode_expiry1({X, days}) when X<30->
X*86400;
encode_expiry1(X) when is_integer(X) ->
X.

61 changes: 61 additions & 0 deletions mcache/src/mcache_app.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-module(mcache_app).
-author('echou327@gmail.com').

-behaviour(application).
-behaviour(supervisor).

-export([start/2, stop/1, config_change/3]).
-export([init/1]).


-export([restart/0, start/0, start_link/0, stop/0]).

% API

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

start() ->
application:start(mcache).

stop() ->
application:stop(mcache).

restart() ->
io:format("[MCACHE_APP] restart() ~n", []),
mcache_util:reload_config([mcache]),
?MODULE:stop(),
?MODULE:start().

% application callbacks

start(_Type, _Args) ->
?MODULE:start_link().

stop(_State) ->
ok.

config_change(_Changed, _New, _Removed) ->
ok.

% supervisor callback
init([]) ->
io:format("[MCACHE] Starting~n"),
Specs = specs([{mcache_client_sup, 1}, % must be 1
{mcache_config, 1}]), % must be 1
{ok, {{one_for_one, 10, 10}, Specs}}.

% supervisor local functions
specs(Specs) ->
lists:foldl(fun({Module, Count}, Acc) ->
Acc ++ app_util:sup_child_spec(Module, fun one_spec/2, Count)
end, [], Specs).

one_spec(mcache_config, Id) ->
PoolsConfig = app_util:get_app_env(pools, []),
ExpiresConfig = app_util:get_app_env(expires, []),
{Id, {mcache_config, start_link, [ {PoolsConfig, ExpiresConfig} ]}, permanent, 2000, worker, []};
one_spec(mcache_client_sup, Id) ->
{Id, {mcache_client_sup, start_link, []}, permanent, infinity, supervisor, []};
one_spec(Module, Id) ->
{Id, {Module, start_link, []}, permanent, 2000, worker, []}.
Loading

0 comments on commit caf969d

Please sign in to comment.