Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

integrate memcached_drv into mcache_app and use mcache2.erl to wrap it

  • Loading branch information...
commit 12fc8ad1ff419d69da0ce861211df3ad2a833c14 1 parent 35806af
@echou authored
View
2  lib/mcache/Makefile
@@ -1,5 +1,5 @@
#export ERL_COMPILE_FLAGS += -I $(ERL_TOP)/lib/backend/include
-SUB_DIRECTORIES = src
+SUB_DIRECTORIES = src c_src
include $(ERL_TOP)/make/subdir.mk
View
37 lib/mcache/c_src/Makefile
@@ -0,0 +1,37 @@
+include $(ERL_TOP)/make/otp.mk
+
+OBJ_DIR = ../obj
+LIB_DIR = $(PRIV_DIR)/lib
+
+ERLENV = $(shell env escript $(ERL_TOP)/make/erlenv.escript)
+ERL_ROOT_DIR = $(word 1,$(ERLENV))
+ERL_EI_DIR = $(word 2,$(ERLENV))
+
+CC = g++
+LD = g++
+CFLAGS = -g -O2 -D_REENTRANT -DUSE_THREADS -D_GNU_SOURCE -fPIC
+LDFLAGS = -shared
+
+ALL_CFLAGS = -I . -I $(ERL_ROOT_DIR)/usr/include -I $(ERL_EI_DIR)/include
+LDLIBS = -L $(ERL_EI_DIR)/lib -L $(ERL_ROOT_DIR)/usr/lib -lei -lerl_interface -lmemcached
+
+DRIVER_OBJS = $(OBJ_DIR)/memcached_drv.o
+
+DYN_DRIVER = $(LIB_DIR)/memcached_drv.so
+
+debug opt: $(OBJ_DIR) $(LIB_DIR) $(DYN_DRIVER)
+
+$(OBJ_DIR):
+ -@mkdir -p $(OBJ_DIR)
+
+$(LIB_DIR):
+ -@mkdir -p $(LIB_DIR)
+
+$(OBJ_DIR)/%.o: %.cpp termdata.hpp
+ $(CC) -c -o $@ $(CFLAGS) $(ALL_CFLAGS) $<
+
+$(LIB_DIR)/memcached_drv.so: $(DRIVER_OBJS) Makefile
+ $(LD) $(LDFLAGS) -o $@ $(DRIVER_OBJS) $(LDLIBS)
+
+clean:
+ -@rm -rf $(LIB_DIR) $(OBJ_DIR)
View
5 lib/mcache/c_src/memcached_drv.cpp
@@ -10,7 +10,7 @@ using namespace std;
#define CMD_SET_SERVERS 0
#define CMD_SET 1
#define CMD_MGET 2
-#define CMD_SET_ASYNC 3
+#define CMD_MGET_BY_CLASS 3
class Cache
{
@@ -37,6 +37,7 @@ class Cache
memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_SORT_HOSTS, 1);
}
private:
@@ -164,11 +165,9 @@ class Driver
free_list.push_back(result);
td.open_tuple();
td.add_buf(result->key, result->key_length);
- td.open_tuple();
td.add_buf(memcached_string_value(&(result->value)), memcached_string_length(&(result->value)));
td.add_uint(result->flags);
td.close_tuple();
- td.close_tuple();
}
td.close_list();
td.output(m_port, true);
View
110 lib/mcache/src/mcache2.erl
@@ -0,0 +1,110 @@
+%
+% wraps memcached_drv
+%
+
+-module(mcache2).
+-author('echou327@gmail.com').
+
+-export([mget/2, set/5, set/4]).
+
+-define(SEP, ":").
+-define(MGET_TIMEOUT, 500).
+-define(FMT_RAW, 0).
+-define(FMT_NATIVE, 101).
+-define(FMT_JSON, 102).
+-define(FMT_INT, 103).
+
+mget(Class, [_|_]=Keys) ->
+ {Pool, _Expiry} = mcache_expires:expire(Class),
+ RealKeys = lists:map(fun(K) ->
+ K1 = map_key(Class, K),
+ erlang:put({'$mcache2_mget$', K1}, K),
+ K1
+ end, Keys),
+ try
+ {mc_async, 0, {ok, Values}} = memcached_drv:mget(Pool, 0, RealKeys),
+ lists:foldl(fun({Key, Val, Flag}, Acc) ->
+ case erlang:get({'$mcache2_mget$', Key}) of
+ undefined ->
+ Acc;
+ K ->
+ [{K, decode_value({Val, Flag})}|Acc]
+ end
+ end, [], Values)
+ after
+ lists:foreach(fun({{'$mcache2_mget$',_}=K,V}) ->
+ erlang:erase(K)
+ end, erlang:get())
+ end.
+
+set(Class, Key, Value, Format, Expiry) ->
+ {Pool, DefaultExpiry} = mcache_expires:expire(Class),
+ {Value1, Flags} = encode_value(Value, Format),
+ Expiry1 = encode_expiry(Expiry, DefaultExpiry),
+ {mc_async, 0, {ok}} = memcached_drv:set(Pool, 0, map_key(Class, Key), Value1, Flags, Expiry1),
+ ok.
+
+set(Class, Key, Value, Format) ->
+ set(Class, Key, Value, Format, default).
+
+
+% internal functions
+my_now() ->
+ erlang:now().
+
+cast([H]) when H>255;is_atom(H) ->
+ cast(H);
+cast([H|L]) when H>255;is_atom(H) ->
+ [cast(H),":"|cast(L)];
+cast(V) when is_tuple(V) ->
+ cast(tuple_to_list(V));
+cast(V) when is_list(V); is_binary(V) ->
+ V;
+cast(V) when is_atom(V) ->
+ atom_to_list(V);
+cast(V) when is_integer(V) ->
+ integer_to_list(V).
+
+map_key(Class, Key) ->
+ iolist_to_binary([cast(Class), ":", cast(Key)]).
+
+encode_value(Value, raw) ->
+ {Value, ?FMT_RAW};
+encode_value(Value, native) ->
+ {term_to_binary(Value), ?FMT_NATIVE};
+encode_value(Value, json) ->
+ {eep0018:encode(Value), ?FMT_JSON};
+encode_value(Value, int) ->
+ {<<Value:32>>, ?FMT_INT}.
+
+decode_value(undefined) ->
+ undefined;
+decode_value(not_found) ->
+ undefined;
+decode_value({Data, ?FMT_RAW}) ->
+ Data;
+decode_value({Data, ?FMT_NATIVE}) ->
+ binary_to_term(Data);
+decode_value({Data, ?FMT_JSON}) ->
+ eep0018:decode(Data);
+decode_value({<<Int:32>>, ?FMT_INT}) ->
+ Int.
+
+encode_expiry(default, DefaultExpiry) ->
+ encode_expiry1(DefaultExpiry);
+encode_expiry(Expiry, _DefaultExpiry) ->
+ encode_expiry1(Expiry).
+
+encode_expiry1(infinity) ->
+ 0;
+encode_expiry1({X, seconds}) ->
+ X;
+encode_expiry1({X, minutes}) ->
+ X*60;
+encode_expiry1({X, hours}) ->
+ X*3600;
+encode_expiry1({X, days}) when X<30->
+ X*86400;
+encode_expiry1(X) when is_integer(X) ->
+ X.
+
View
6 lib/mcache/src/mcache_app.erl
@@ -42,7 +42,9 @@ config_change(_Changed, _New, _Removed) ->
init([]) ->
io:format("[MCACHE] Starting~n"),
Specs = specs([{mcache_client_sup, 1}, % must be 1
- {mcache_config, 1}]), % must be 1
+ {mcache_config, 1},
+ {mcache_memcached_drv_sup, 1}
+ ]), % must be 1
{ok, {{one_for_one, 10, 10}, Specs}}.
% supervisor local functions
@@ -57,5 +59,7 @@ one_spec(mcache_config, Id) ->
{Id, {mcache_config, start_link, [ {PoolsConfig, ExpiresConfig} ]}, permanent, 2000, worker, []};
one_spec(mcache_client_sup, Id) ->
{Id, {mcache_client_sup, start_link, []}, permanent, infinity, supervisor, []};
+one_spec(mcache_memcached_drv_sup, Id) ->
+ {Id, {mcache_memcached_drv_sup, start_link, []}, permanent, infinity, supervisor, []};
one_spec(Module, Id) ->
{Id, {Module, start_link, []}, permanent, 2000, worker, []}.
View
28 lib/mcache/src/mcache_memcached_drv_sup.erl
@@ -0,0 +1,28 @@
+-module(mcache_memcached_drv_sup).
+-author('echou327@gmail.com').
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([init/1]).
+
+% API
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+% supervisor callback
+init([]) ->
+ io:format("[~s] Starting~n", [?MODULE]),
+ Pools = mcache_util:get_app_env(pools, []),
+ Specs = lists:map(fun(Pool) ->
+ Name = proplists:get_value(name, Pool),
+ Servers = proplists:get_value(servers, Pool),
+ S1 = string:join([Addr||{Addr, _}<-Servers], ","),
+ { {memcached_drv, Name},
+ {memcached_drv, start_link, [Name, S1]},
+ permanent, 2000, worker, []
+ }
+ end, Pools),
+ {ok, {{one_for_one, 10, 10}, Specs}}.
+
View
4 lib/mcache/src/mcache_test.erl
@@ -25,9 +25,9 @@ test_set(P, R) ->
stresstest:start("test_set", P, R, F).
-test_mget(P, R) ->
+test_mget(P, R, M) ->
F = fun(I) ->
- mcache:mget(mcache.test, [<<"key:", V:32>> || V <- lists:seq(I, I + 20)])
+ M:mget(mcache.test, [<<"key:", V:32>> || V <- lists:seq(I, I + 20)])
end,
stresstest:start("test_mget", P, R, F).
View
37 lib/mcache/src/memcached_drv.erl
@@ -7,7 +7,7 @@
% API
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
--export([set_servers/2, set_async/2]).
+-export([set_servers/2]).
-export([mget/2, mget/3, set/5, set/6]).
-record(state, {pool, servers}).
@@ -20,7 +20,7 @@
-define(CMD_SET_SERVERS, 0).
-define(CMD_SET, 1).
-define(CMD_MGET, 2).
--define(CMD_SET_ASYNC, 3).
+-define(CMD_MGET_BY_CLASS, 3).
start_link(PoolName, Servers) ->
gen_server:start_link(?MODULE, [PoolName, Servers], []).
@@ -55,13 +55,6 @@ init([PoolName, Servers]) ->
handle_call(_, _, State) ->
{noreply, State}.
-on_off(on) -> 1;
-on_off(_) -> 0.
-
-handle_cast({set_async, OnOff}, State) ->
- [ port_control(Port, ?CMD_SET_ASYNC, [on_off(OnOff)]) || Port <- get_all_driver_ports(State#state.pool) ],
- {noreply, State};
-
handle_cast({set_servers, Servers}, State) ->
[ port_control(Port, ?CMD_SET_SERVERS, [Servers, 0]) || Port <- get_all_driver_ports(State#state.pool) ],
{noreply, State#state{servers=Servers}};
@@ -148,15 +141,6 @@ set_servers(Pool, Servers) ->
false
end.
-set_async(Pool, OnOff) ->
- case pg2:get_local_members({?MODULE, Pool}) of
- [_|_]=Pids ->
- [gen_server:cast(Pid, {set_async, OnOff})||Pid<-Pids],
- ok;
- _ ->
- false
- end.
-
mget(Pool, Keys) -> mget(Pool, 0, Keys).
mget(Pool, Seq, [_|_]=Keys) ->
@@ -169,6 +153,23 @@ mget(Pool, Seq, [_|_]=Keys) ->
Data1 = lists:reverse(Data),
send_command(Port, [?CMD_MGET|Data1]).
+class_to_iolist(Class) when is_atom(Class) ->
+ atom_to_list(Class);
+class_to_iolist(Class) when is_list(Class); is_binary(Class) ->
+ Class.
+
+mget_by_class(Pool, Seq, Class, [_|_]=Keys) ->
+ Port = get_driver_port(Pool),
+ NumKeys = length(Keys),
+ Class1 = class_to_iolist(Class),
+ ClassLen = iolist_size(Class1),
+ Data = lists:foldl(fun(K,A) ->
+ KLen = iolist_size(K),
+ [K, <<KLen:32>>|A]
+ end, [<<Seq:32, NumKeys:32, ClassLen:32>>, Class1], Keys),
+ Data1 = lists:reverse(Data),
+ send_command(Port, [?CMD_MGET_BY_CLASS|Data1]).
+
set(Pool, Key, Value, Flags, Expires) -> set(Pool, 0, Key, Value, Flags, Expires).
set(Pool, Seq, Key, Value, Flags, Expires) ->
Please sign in to comment.
Something went wrong with that request. Please try again.