Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

blah

  • Loading branch information...
commit ab58015b2ea12c538b693d68f7cd438b4c928350 1 parent 3148239
root authored
View
12 .gitignore
@@ -1,6 +1,6 @@
-*.dump
-ebin
-priv
-lib/*/obj
-lib/*/ebin
-lib/*/priv
+*.dump
+ebin
+priv
+lib/*/obj
+lib/*/ebin
+lib/*/priv
View
48 Makefile
@@ -1,24 +1,24 @@
-MAKE = make --no-print-directory
-
-export ERL_TOP = $(shell pwd)
-export EBIN = $(ERL_TOP)/ebin
-export PRIV_DIR = $(ERL_TOP)/priv
-#export DEBUG = 1
-
-OUT_DIR = $(ERL_TOP)/out
-
-.PHONY: dialyzer clean release
-
-all: libs
-
-libs:
- @cd lib && $(MAKE) opt
-
-clean:
- @cd lib && $(MAKE) clean
-
-dialyzer:
- dialyzer $(DIALYZERFLAGS)
-
-$(OUT_DIR):
- -@mkdir -p $(OUT_DIR)
+MAKE = make --no-print-directory
+
+export ERL_TOP = $(shell pwd)
+export EBIN = $(ERL_TOP)/ebin
+export PRIV_DIR = $(ERL_TOP)/priv
+#export DEBUG = 1
+
+OUT_DIR = $(ERL_TOP)/out
+
+.PHONY: dialyzer clean release
+
+all: libs
+
+libs:
+ @cd lib && $(MAKE) opt
+
+clean:
+ @cd lib && $(MAKE) clean
+
+dialyzer:
+ dialyzer $(DIALYZERFLAGS)
+
+$(OUT_DIR):
+ -@mkdir -p $(OUT_DIR)
View
164 README.markdown
@@ -1,83 +1,83 @@
-## Description
-
-mcache is an erlang memcached client application. It utilizes many new features to
-improve performance such as NIF (only from R13B03 on), dynamic compiling
-modules.
-
-## Start/Stop/Configuration
-
-mcache is an OTP application. You may start or stop it as following:
-<pre>
-application:start(mcache)
-application:stop(mcache).
-</pre>
-
-It requires the following configuration (in `-config <ConfigFile>` or `sys.config`)
-<pre>
-{mcache,
- [
- {pools,[
- [{name, generic}, % Pool name is "generic"
- {servers,
- [
- { {1,0,0,1}, 11211, 256 }, % Servers definition. IP address should be in {A,B,C,D} format
- { {1,0,0,2}, 11211, 256 }
- ]}
- ]
- ]}
- ]
-}
-</pre>
-
-## Usage
-
-1. Get a single key.
- <pre>
- mcache:get(Class, Key).
- </pre>
- For example: `mcache:get(my.friends, foobar)` gets the key `"my.friends:foobar"`
-
- Which memcached server is selected? The following steps go:
- 1. Get expiry config from `Class`. Default is `{generic, 300}` (i.e. `{PoolName, ExpireSeconds}`)
- 1. Get the server continuum from the pool name. (in ketama's consistent hashing algorithm)
- 1. Calc the server from Key's MD5 hash value according the above continuum.
-
- **Return values**:
- - `undefined`, if key not found.
- - `Value`, any other values.
-
-
-1. Get multiple keys.
- <pre>
- mcache:mget(Class, [Key|_]).
- </pre>
-
- Note: it gets all the keys with the same `Class`.
-
-1. Set a key and value.
- <pre>
- mcache:set(Class, Key, Value, Format, Expiry)
- </pre>
-
- **Class** is any atom or iolist.
-
- **Key** can be any iolist.
-
- **Format** can be the following atoms:
- - `raw`, an iolist.
- - `native`, any Erlang term (uses `term_to_binary()`)
- - `json`, convert to json string (using an enhanced version of EEP0018)
- - `int`, data in `<<Int:32>>` format.
-
- **Expiry** can be as following:
- - `default`, uses ExpireConfig
- - `infinity`, no expiration
- - `{X, seconds}`, or minutes, hours, days, etc.
- - `Integer`, any numeric seconds.
-
- This argument can be ignored. `default` is used in this case.
-
- **Return**
- * A list of {Key, Value} pairs. The key doesn't contain `Class` part.
- * Missing keys **won't** show in the list.
+## Description
+
+mcache is an erlang memcached client application. It utilizes many new features to
+improve performance such as NIF (only from R13B03 on), dynamic compiling
+modules.
+
+## Start/Stop/Configuration
+
+mcache is an OTP application. You may start or stop it as following:
+<pre>
+application:start(mcache)
+application:stop(mcache).
+</pre>
+
+It requires the following configuration (in `-config <ConfigFile>` or `sys.config`)
+<pre>
+{mcache,
+ [
+ {pools,[
+ [{name, generic}, % Pool name is "generic"
+ {servers,
+ [
+ { {1,0,0,1}, 11211, 256 }, % Servers definition. IP address should be in {A,B,C,D} format
+ { {1,0,0,2}, 11211, 256 }
+ ]}
+ ]
+ ]}
+ ]
+}
+</pre>
+
+## Usage
+
+1. Get a single key.
+ <pre>
+ mcache:get(Class, Key).
+ </pre>
+ For example: `mcache:get(my.friends, foobar)` gets the key `"my.friends:foobar"`
+
+ Which memcached server is selected? The following steps go:
+ 1. Get expiry config from `Class`. Default is `{generic, 300}` (i.e. `{PoolName, ExpireSeconds}`)
+ 1. Get the server continuum from the pool name. (in ketama's consistent hashing algorithm)
+ 1. Calc the server from Key's MD5 hash value according the above continuum.
+
+ **Return values**:
+ - `undefined`, if key not found.
+ - `Value`, any other values.
+
+
+1. Get multiple keys.
+ <pre>
+ mcache:mget(Class, [Key|_]).
+ </pre>
+
+ Note: it gets all the keys with the same `Class`.
+
+1. Set a key and value.
+ <pre>
+ mcache:set(Class, Key, Value, Format, Expiry)
+ </pre>
+
+ **Class** is any atom or iolist.
+
+ **Key** can be any iolist.
+
+ **Format** can be the following atoms:
+ - `raw`, an iolist.
+ - `native`, any Erlang term (uses `term_to_binary()`)
+ - `json`, convert to json string (using an enhanced version of EEP0018)
+ - `int`, data in `<<Int:32>>` format.
+
+ **Expiry** can be as following:
+ - `default`, uses ExpireConfig
+ - `infinity`, no expiration
+ - `{X, seconds}`, or minutes, hours, days, etc.
+ - `Integer`, any numeric seconds.
+
+ This argument can be ignored. `default` is used in this case.
+
+ **Return**
+ * A list of {Key, Value} pairs. The key doesn't contain `Class` part.
+ * Missing keys **won't** show in the list.
* If all keys are missing, an empty list ([]) is returned.
View
164 README.txt
@@ -1,83 +1,83 @@
-## Description
-
-mcache is an erlang memcached client application. It utilizes many new features to
-improve performance such as NIF (only from R13B03 on), dynamic compiling
-modules.
-
-## Start/Stop/Configuration
-
-mcache is an OTP application. You may start or stop it as following:
-<pre>
-application:start(mcache)
-application:stop(mcache).
-</pre>
-
-It requires the following configuration (in `-config <ConfigFile>` or `sys.config`)
-<pre>
-{mcache,
- [
- {pools,[
- [{name, generic}, % Pool name is "generic"
- {servers,
- [
- { {1,0,0,1}, 11211, 256 }, % Servers definition. IP address should be in {A,B,C,D} format
- { {1,0,0,2}, 11211, 256 }
- ]}
- ]
- ]}
- ]
-}
-</pre>
-
-## Usage
-
-1. Get a single key.
- <pre>
- mcache:get(Class, Key).
- </pre>
- For example: `mcache:get(my.friends, foobar)` gets the key `"my.friends:foobar"`
-
- Which memcached server is selected? The following steps go:
- 1. Get expiry config from `Class`. Default is `{generic, 300}` (i.e. `{PoolName, ExpireSeconds}`)
- 1. Get the server continuum from the pool name. (in ketama's consistent hashing algorithm)
- 1. Calc the server from Key's MD5 hash value according the above continuum.
-
- **Return values**:
- - `undefined`, if key not found.
- - `Value`, any other values.
-
-
-1. Get multiple keys.
- <pre>
- mcache:mget(Class, [Key|_]).
- </pre>
-
- Note: it gets all the keys with the same `Class`.
-
-1. Set a key and value.
- <pre>
- mcache:set(Class, Key, Value, Format, Expiry)
- </pre>
-
- **Class** is any atom or iolist.
-
- **Key** can be any iolist.
-
- **Format** can be the following atoms:
- - `raw`, an iolist.
- - `native`, any Erlang term (uses `term_to_binary()`)
- - `json`, convert to json string (using an enhanced version of EEP0018)
- - `int`, data in `<<Int:32>>` format.
-
- **Expiry** can be as following:
- - `default`, uses ExpireConfig
- - `infinity`, no expiration
- - `{X, seconds}`, or minutes, hours, days, etc.
- - `Integer`, any numeric seconds.
-
- This argument can be ignored. `default` is used in this case.
-
- **Return**
- * A list of {Key, Value} pairs. The key doesn't contain `Class` part.
- * Missing keys **won't** show in the list.
+## Description
+
+mcache is an erlang memcached client application. It utilizes many new features to
+improve performance such as NIF (only from R13B03 on), dynamic compiling
+modules.
+
+## Start/Stop/Configuration
+
+mcache is an OTP application. You may start or stop it as following:
+<pre>
+application:start(mcache)
+application:stop(mcache).
+</pre>
+
+It requires the following configuration (in `-config <ConfigFile>` or `sys.config`)
+<pre>
+{mcache,
+ [
+ {pools,[
+ [{name, generic}, % Pool name is "generic"
+ {servers,
+ [
+ { {1,0,0,1}, 11211, 256 }, % Servers definition. IP address should be in {A,B,C,D} format
+ { {1,0,0,2}, 11211, 256 }
+ ]}
+ ]
+ ]}
+ ]
+}
+</pre>
+
+## Usage
+
+1. Get a single key.
+ <pre>
+ mcache:get(Class, Key).
+ </pre>
+ For example: `mcache:get(my.friends, foobar)` gets the key `"my.friends:foobar"`
+
+ Which memcached server is selected? The following steps go:
+ 1. Get expiry config from `Class`. Default is `{generic, 300}` (i.e. `{PoolName, ExpireSeconds}`)
+ 1. Get the server continuum from the pool name. (in ketama's consistent hashing algorithm)
+ 1. Calc the server from Key's MD5 hash value according the above continuum.
+
+ **Return values**:
+ - `undefined`, if key not found.
+ - `Value`, any other values.
+
+
+1. Get multiple keys.
+ <pre>
+ mcache:mget(Class, [Key|_]).
+ </pre>
+
+ Note: it gets all the keys with the same `Class`.
+
+1. Set a key and value.
+ <pre>
+ mcache:set(Class, Key, Value, Format, Expiry)
+ </pre>
+
+ **Class** is any atom or iolist.
+
+ **Key** can be any iolist.
+
+ **Format** can be the following atoms:
+ - `raw`, an iolist.
+ - `native`, any Erlang term (uses `term_to_binary()`)
+ - `json`, convert to json string (using an enhanced version of EEP0018)
+ - `int`, data in `<<Int:32>>` format.
+
+ **Expiry** can be as following:
+ - `default`, uses ExpireConfig
+ - `infinity`, no expiration
+ - `{X, seconds}`, or minutes, hours, days, etc.
+ - `Integer`, any numeric seconds.
+
+ This argument can be ignored. `default` is used in this case.
+
+ **Return**
+ * A list of {Key, Value} pairs. The key doesn't contain `Class` part.
+ * Missing keys **won't** show in the list.
* If all keys are missing, an empty list ([]) is returned.
View
6 lib/Makefile
@@ -1,3 +1,3 @@
-include $(ERL_TOP)/make/subdir.mk
-
-SUB_DIRECTORIES = eep0018 mcache
+include $(ERL_TOP)/make/subdir.mk
+
+SUB_DIRECTORIES = eep0018 mcache
View
16 lib/eep0018/README
@@ -1,9 +1,9 @@
-Based on http://github.com/davisp/eep0018
-
-Improvements:
-1. UTF-8 strings are encoded into \uXXXX notations. (see c_src/yajl_encode.c)
-2. The following formats are encoded into maps: (see c_src/encode_json.c)
- * {struct, [{K,V},...]}
- * [{K,V}, ...]
- * {[{K,V}, ...]}
+Based on http://github.com/davisp/eep0018
+
+Improvements:
+1. UTF-8 strings are encoded into \uXXXX notations. (see c_src/yajl_encode.c)
+2. The following formats are encoded into maps: (see c_src/encode_json.c)
+ * {struct, [{K,V},...]}
+ * [{K,V}, ...]
+ * {[{K,V}, ...]}
3. eep0018 is a linked-in driver following Driver Efficiency Guide.
View
8 lib/eep0018/src/Makefile
@@ -1,4 +1,4 @@
-ERL_FILES = gen_linkedin_driver.erl \
- eep0018.erl
-
-include $(ERL_TOP)/make/app.mk
+ERL_FILES = gen_linkedin_driver.erl \
+ eep0018.erl
+
+include $(ERL_TOP)/make/app.mk
View
74 lib/eep0018/src/eep0018.erl
@@ -1,37 +1,37 @@
--module(eep0018).
--author('echou327@gmail.com').
-
--behaviour(gen_linkedin_driver).
-
--export([start/0, start_link/0, encode/1, decode/1]).
--export([driver_info/0]).
-
-% callback
-driver_info() ->
- {eep0018_drv, eep0018_port, eep0018_table}.
-
-start() ->
- gen_linkedin_driver:start(?MODULE).
-
-start_link() ->
- io:format("[DRIVER] start ~p~n", [?MODULE]),
- gen_linkedin_driver:start_link(?MODULE).
-
-
-%%%% API %%%%
-
-encode(Term) ->
- gen_linkedin_driver:control(?MODULE, 0, term_to_binary(Term)).
-
-decode(Json) when is_binary(Json) ->
- case gen_linkedin_driver:control(?MODULE, 1, <<Json/binary, 0:8>>) of
- [] ->
- receive {json, Decoded} -> Decoded end;
- Error ->
- io:format("====== JSON ERROR ============~n", []),
- throw({invalid_json, binary_to_list(Error)})
- end;
-
-decode(Json) when is_list(Json) ->
- decode(list_to_binary(Json)).
-
+-module(eep0018).
+-author('echou327@gmail.com').
+
+-behaviour(gen_linkedin_driver).
+
+-export([start/0, start_link/0, encode/1, decode/1]).
+-export([driver_info/0]).
+
+% callback
+driver_info() ->
+ {eep0018_drv, eep0018_port, eep0018_table}.
+
+start() ->
+ gen_linkedin_driver:start(?MODULE).
+
+start_link() ->
+ io:format("[DRIVER] start ~p~n", [?MODULE]),
+ gen_linkedin_driver:start_link(?MODULE).
+
+
+%%%% API %%%%
+
+encode(Term) ->
+ gen_linkedin_driver:control(?MODULE, 0, term_to_binary(Term)).
+
+decode(Json) when is_binary(Json) ->
+ case gen_linkedin_driver:control(?MODULE, 1, <<Json/binary, 0:8>>) of
+ [] ->
+ receive {json, Decoded} -> Decoded end;
+ Error ->
+ io:format("====== JSON ERROR ============~n", []),
+ throw({invalid_json, binary_to_list(Error)})
+ end;
+
+decode(Json) when is_list(Json) ->
+ decode(list_to_binary(Json)).
+
View
10 lib/mcache/Makefile
@@ -1,5 +1,5 @@
-#export ERL_COMPILE_FLAGS += -I $(ERL_TOP)/lib/backend/include
-
-SUB_DIRECTORIES = src
-
-include $(ERL_TOP)/make/subdir.mk
+#export ERL_COMPILE_FLAGS += -I $(ERL_TOP)/lib/backend/include
+
+SUB_DIRECTORIES = src
+
+include $(ERL_TOP)/make/subdir.mk
View
12 lib/mcache/src/Makefile
@@ -1,6 +1,6 @@
-#NATIVE=1
-
-APP_NAME=mcache
-#ERL_COMPILE_FLAGS += -I ../include
-
-include $(ERL_TOP)/make/app.mk
+#NATIVE=1
+
+APP_NAME=mcache
+#ERL_COMPILE_FLAGS += -I ../include
+
+include $(ERL_TOP)/make/app.mk
View
82 lib/mcache/src/mcache.app.src
@@ -1,41 +1,41 @@
-% vim:syn=erlang
-{application, mcache,
- [ {description, "memcached client application"},
- {vsn, "%VSN%"},
- {modules, [ %MODULES% ]},
- {registered, []},
- {mod, {mcache_app, []}},
- {applications, [kernel,stdlib]},
- {env,
- [ {pools, [ [ {name, generic},
- {connection_count, 10},
- {servers,
- [
- {"1.0.0.1", 256},
- {"1.0.0.2", 256},
- {"1.0.0.3", 256}
- ]
- }
- ],
- [ {name, foobar},
- {connection_count, 20},
- {servers,
- [
- {"1.0.0.1", 256},
- {"1.1.0.1", 512},
- {"1.1.0.2", 256}
- ]
- }
- ]
- ]
- },
- {expires,
- [ {example.foo, {generic, 300}},
- {example.bar, {generic, {10, hours}}}
- ]
- }
- ]
- }
- ]
-}.
-
+% vim:syn=erlang
+{application, mcache,
+ [ {description, "memcached client application"},
+ {vsn, "%VSN%"},
+ {modules, [ %MODULES% ]},
+ {registered, []},
+ {mod, {mcache_app, []}},
+ {applications, [kernel,stdlib]},
+ {env,
+ [ {pools, [ [ {name, generic},
+ {connection_count, 10},
+ {servers,
+ [
+ {"1.0.0.1", 256},
+ {"1.0.0.2", 256},
+ {"1.0.0.3", 256}
+ ]
+ }
+ ],
+ [ {name, foobar},
+ {connection_count, 20},
+ {servers,
+ [
+ {"1.0.0.1", 256},
+ {"1.1.0.1", 512},
+ {"1.1.0.2", 256}
+ ]
+ }
+ ]
+ ]
+ },
+ {expires,
+ [ {example.foo, {generic, 300}},
+ {example.bar, {generic, {10, hours}}}
+ ]
+ }
+ ]
+ }
+ ]
+}.
+
View
386 lib/mcache/src/mcache.erl
@@ -1,193 +1,193 @@
--module(mcache).
--author('echou327@gmail.com').
-
--compile([inline, native]).
--export([get_server/2, get/2, mget/1, mget/2, set/5, set/4, delete/2, mget2/2]).
-
--define(SEP, ":").
--define(MGET_TIMEOUT, 1000).
--define(FMT_RAW, 0).
--define(FMT_BJSON, 100). % not implemented yet.
--define(FMT_NATIVE, 101).
--define(FMT_JSON, 102).
--define(FMT_INT, 103).
-
-get(Class, Key) ->
- {Key1, Server, _DefaultExpiry} = get_server(Class, Key),
- {_, Value} = mcache_client:mc_get(Server, Key1),
- decode_value(Value).
-
-mget(Class, [Key]) ->
- case ?MODULE:get(Class, Key) of
- undefined -> [];
- Value -> [{Key, Value}] % already decoded in get()
- end;
-mget(Class, [_|_]=Keys) ->
- KeyDict = lists:foldl(
- fun(K, Acc) ->
- {K1, Server, _DefaultExpiry} = get_server(Class, K),
- RealKey = list_to_binary(K1), % must convert to binary because response key is a binary
- mcache_client:ab_get(Server, RealKey),
- orddict:store(RealKey, K, Acc)
- end, orddict:new(), Keys),
- ValueDict = mget_receive(length(Keys), ?MGET_TIMEOUT, dict:new()),
- Result = orddict:fold(fun(RealKey, Key, Acc) ->
- case dict:find(RealKey, ValueDict) of
- error -> Acc;
- {ok, undefined} -> Acc;
- {ok, Value} -> [{Key, decode_value(Value)}|Acc]
- end
- end, [], KeyDict),
- lists:reverse(Result).
-
-
-mget2(Class, [_|_]=Keys) ->
- {KeyDict, ServerDict} = lists:foldl(
- fun(K, {KAcc,SAcc}) ->
- {K1, Server, _DefaultExpiry} = get_server(Class, K),
- RealKey = list_to_binary(K1), % must convert to binary because response key is a binary
- {orddict:store(RealKey, K, KAcc), dict:append(Server, RealKey, SAcc)}
- end, {orddict:new(), dict:new()}, Keys),
- dict:fold(fun(Server, Keys1, Acc) ->
- mcache_client:ab_mget(Server, Keys1),
- Acc
- end, nil, ServerDict),
- ValueDict = mget_receive(length(Keys), ?MGET_TIMEOUT, dict:new()),
- Result = orddict:fold(fun(RealKey, Key, Acc) ->
- case dict:find(RealKey, ValueDict) of
- error -> Acc;
- {ok, undefined} -> Acc;
- {ok, Value} -> [{Key, decode_value(Value)}|Acc]
- end
- end, [], KeyDict),
- lists:reverse(Result).
-
-mget([{Class, Key}]) ->
- case ?MODULE:get(Class, Key) of
- undefined -> [];
- Value -> [{{Class, Key}, Value}] % already decoded in get()
- end;
-mget([{_Class, _Keys}|_] = KeyPairs) ->
- KeyDict = lists:foldl(
- fun({Class, K}=Key, Acc) ->
- {K1, Server, _DefaultExpiry} = get_server(Class, K),
- K2 = list_to_binary(K1),
- mcache_client:ab_get(Server, K2),
- orddict:store(K2, Key, Acc)
- end, orddict:new(), KeyPairs),
- ValueDict = mget_receive(length(KeyPairs), 1000, dict:new()),
- Result = orddict:fold(fun(RealKey, Key, Acc) ->
- case dict:find(RealKey, ValueDict) of
- error -> Acc;
- {ok, undefined} -> Acc;
- {ok, Value} -> [{Key, decode_value(Value)}|Acc]
- end
- end, [], KeyDict),
- lists:reverse(Result).
-
-set(Class, Key, Value, Format, Expiry) ->
- {Key1, Server, DefaultExpiry} = get_server(Class, Key),
- {Value1, Flags} = encode_value(Value, Format),
- Expiry1 = encode_expiry(Expiry, DefaultExpiry),
- mcache_client:ab_set(Server, Key1, Value1, Flags, Expiry1),
- ok.
-
-set(Class, Key, Value, Format) ->
- set(Class, Key, Value, Format, default).
-
-delete(Class, Key) ->
- {Key1, Server} = get_server(Class, Key),
- mcache_client:ab_delete(Server, Key1),
- ok.
-
-% internal functions
-
-mget_receive(0, _Timeout, D) ->
- D;
-mget_receive(_N, Timeout, D) when Timeout =< 0 ->
- D;
-mget_receive(N, Timeout, D) ->
- Now = app_util:now(),
- receive
- {Ref, {mget, Items}} when is_reference(Ref) ->
- Now1 = app_util:now(),
- TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
- D1 = dict:merge(fun(_K,_V1,V2) -> V2 end, D, Items),
- mget_receive(N-dict:size(Items), TimeoutLeft, D1);
- {Ref, {Key, Value}}=Msg when is_reference(Ref) ->
- Now1 = app_util:now(),
- TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
- mget_receive(N-1, TimeoutLeft, dict:store(Key, Value, D));
- Any ->
- io:format("~p~n", [Any]),
- Now1 = app_util:now(),
- TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
- mget_receive(N, TimeoutLeft, D)
- after Timeout ->
- D
- end.
-
-
-cast([H]) when H>255;is_atom(H) ->
- cast(H);
-cast([H|L]) when H>255;is_atom(H) ->
- [cast(H),":"|cast(L)];
-cast(V) when is_tuple(V) ->
- cast(tuple_to_list(V));
-cast(V) when is_list(V); is_binary(V) ->
- V;
-cast(V) when is_atom(V) ->
- atom_to_list(V);
-cast(V) when is_integer(V) ->
- integer_to_list(V).
-
-map_key(Class, Key) ->
- [cast(Class), ":"|cast(Key)].
-
-get_server(Class, Key) ->
- Key1 = map_key(Class, Key),
- {Pool, Expiry} = mcache_expires:expire(Class),
- Server = mcache_continuum:find(Pool, mcache_util:hash(Key1, md5)),
- {Key1, Server, Expiry}.
-
-
-encode_value(Value, raw) ->
- {Value, ?FMT_RAW};
-encode_value(Value, native) ->
- {term_to_binary(Value), ?FMT_NATIVE};
-encode_value(Value, json) ->
- {eep0018:encode(Value), ?FMT_JSON};
-encode_value(Value, int) ->
- {<<Value:32>>, ?FMT_INT}.
-
-decode_value(undefined) ->
- undefined;
-decode_value(not_found) ->
- undefined;
-decode_value({Data, ?FMT_RAW}) ->
- Data;
-decode_value({Data, ?FMT_NATIVE}) ->
- binary_to_term(Data);
-decode_value({Data, ?FMT_JSON}) ->
- eep0018:decode(Data);
-decode_value({<<Int:32>>, ?FMT_INT}) ->
- Int.
-
-encode_expiry(default, DefaultExpiry) ->
- encode_expiry1(DefaultExpiry);
-encode_expiry(Expiry, _DefaultExpiry) ->
- encode_expiry1(Expiry).
-
-encode_expiry1(infinity) ->
- 0;
-encode_expiry1({X, seconds}) ->
- X;
-encode_expiry1({X, minutes}) ->
- X*60;
-encode_expiry1({X, hours}) ->
- X*3600;
-encode_expiry1({X, days}) when X<30->
- X*86400;
-encode_expiry1(X) when is_integer(X) ->
- X.
-
+-module(mcache).
+-author('echou327@gmail.com').
+
+-compile([inline, native]).
+-export([get_server/2, get/2, mget/1, mget/2, set/5, set/4, delete/2, mget2/2]).
+
+-define(SEP, ":").
+-define(MGET_TIMEOUT, 1000).
+-define(FMT_RAW, 0).
+-define(FMT_BJSON, 100). % not implemented yet.
+-define(FMT_NATIVE, 101).
+-define(FMT_JSON, 102).
+-define(FMT_INT, 103).
+
+get(Class, Key) ->
+ {Key1, Server, _DefaultExpiry} = get_server(Class, Key),
+ {_, Value} = mcache_client:mc_get(Server, Key1),
+ decode_value(Value).
+
+mget(Class, [Key]) ->
+ case ?MODULE:get(Class, Key) of
+ undefined -> [];
+ Value -> [{Key, Value}] % already decoded in get()
+ end;
+mget(Class, [_|_]=Keys) ->
+ KeyDict = lists:foldl(
+ fun(K, Acc) ->
+ {K1, Server, _DefaultExpiry} = get_server(Class, K),
+ RealKey = list_to_binary(K1), % must convert to binary because response key is a binary
+ mcache_client:ab_get(Server, RealKey),
+ orddict:store(RealKey, K, Acc)
+ end, orddict:new(), Keys),
+ ValueDict = mget_receive(length(Keys), ?MGET_TIMEOUT, dict:new()),
+ Result = orddict:fold(fun(RealKey, Key, Acc) ->
+ case dict:find(RealKey, ValueDict) of
+ error -> Acc;
+ {ok, undefined} -> Acc;
+ {ok, Value} -> [{Key, decode_value(Value)}|Acc]
+ end
+ end, [], KeyDict),
+ lists:reverse(Result).
+
+
+mget2(Class, [_|_]=Keys) ->
+ {KeyDict, ServerDict} = lists:foldl(
+ fun(K, {KAcc,SAcc}) ->
+ {K1, Server, _DefaultExpiry} = get_server(Class, K),
+ RealKey = list_to_binary(K1), % must convert to binary because response key is a binary
+ {orddict:store(RealKey, K, KAcc), dict:append(Server, RealKey, SAcc)}
+ end, {orddict:new(), dict:new()}, Keys),
+ dict:fold(fun(Server, Keys1, Acc) ->
+ mcache_client:ab_mget(Server, Keys1),
+ Acc
+ end, nil, ServerDict),
+ ValueDict = mget_receive(length(Keys), ?MGET_TIMEOUT, dict:new()),
+ Result = orddict:fold(fun(RealKey, Key, Acc) ->
+ case dict:find(RealKey, ValueDict) of
+ error -> Acc;
+ {ok, undefined} -> Acc;
+ {ok, Value} -> [{Key, decode_value(Value)}|Acc]
+ end
+ end, [], KeyDict),
+ lists:reverse(Result).
+
+mget([{Class, Key}]) ->
+ case ?MODULE:get(Class, Key) of
+ undefined -> [];
+ Value -> [{{Class, Key}, Value}] % already decoded in get()
+ end;
+mget([{_Class, _Keys}|_] = KeyPairs) ->
+ KeyDict = lists:foldl(
+ fun({Class, K}=Key, Acc) ->
+ {K1, Server, _DefaultExpiry} = get_server(Class, K),
+ K2 = list_to_binary(K1),
+ mcache_client:ab_get(Server, K2),
+ orddict:store(K2, Key, Acc)
+ end, orddict:new(), KeyPairs),
+ ValueDict = mget_receive(length(KeyPairs), 1000, dict:new()),
+ Result = orddict:fold(fun(RealKey, Key, Acc) ->
+ case dict:find(RealKey, ValueDict) of
+ error -> Acc;
+ {ok, undefined} -> Acc;
+ {ok, Value} -> [{Key, decode_value(Value)}|Acc]
+ end
+ end, [], KeyDict),
+ lists:reverse(Result).
+
+set(Class, Key, Value, Format, Expiry) ->
+ {Key1, Server, DefaultExpiry} = get_server(Class, Key),
+ {Value1, Flags} = encode_value(Value, Format),
+ Expiry1 = encode_expiry(Expiry, DefaultExpiry),
+ mcache_client:ab_set(Server, Key1, Value1, Flags, Expiry1),
+ ok.
+
+set(Class, Key, Value, Format) ->
+ set(Class, Key, Value, Format, default).
+
+delete(Class, Key) ->
+ {Key1, Server} = get_server(Class, Key),
+ mcache_client:ab_delete(Server, Key1),
+ ok.
+
+% internal functions
+
+mget_receive(0, _Timeout, D) ->
+ D;
+mget_receive(_N, Timeout, D) when Timeout =< 0 ->
+ D;
+mget_receive(N, Timeout, D) ->
+ Now = app_util:now(),
+ receive
+ {Ref, {mget, Items}} when is_reference(Ref) ->
+ Now1 = app_util:now(),
+ TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
+ D1 = dict:merge(fun(_K,_V1,V2) -> V2 end, D, Items),
+ mget_receive(N-dict:size(Items), TimeoutLeft, D1);
+ {Ref, {Key, Value}}=Msg when is_reference(Ref) ->
+ Now1 = app_util:now(),
+ TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
+ mget_receive(N-1, TimeoutLeft, dict:store(Key, Value, D));
+ Any ->
+ io:format("~p~n", [Any]),
+ Now1 = app_util:now(),
+ TimeoutLeft = round(Timeout - timer:now_diff(Now1, Now) / 1000),
+ mget_receive(N, TimeoutLeft, D)
+ after Timeout ->
+ D
+ end.
+
+
+cast([H]) when H>255;is_atom(H) ->
+ cast(H);
+cast([H|L]) when H>255;is_atom(H) ->
+ [cast(H),":"|cast(L)];
+cast(V) when is_tuple(V) ->
+ cast(tuple_to_list(V));
+cast(V) when is_list(V); is_binary(V) ->
+ V;
+cast(V) when is_atom(V) ->
+ atom_to_list(V);
+cast(V) when is_integer(V) ->
+ integer_to_list(V).
+
+map_key(Class, Key) ->
+ [cast(Class), ":"|cast(Key)].
+
+get_server(Class, Key) ->
+ Key1 = map_key(Class, Key),
+ {Pool, Expiry} = mcache_expires:expire(Class),
+ Server = mcache_continuum:find(Pool, mcache_util:hash(Key1, md5)),
+ {Key1, Server, Expiry}.
+
+
+encode_value(Value, raw) ->
+ {Value, ?FMT_RAW};
+encode_value(Value, native) ->
+ {term_to_binary(Value), ?FMT_NATIVE};
+encode_value(Value, json) ->
+ {eep0018:encode(Value), ?FMT_JSON};
+encode_value(Value, int) ->
+ {<<Value:32>>, ?FMT_INT}.
+
+decode_value(undefined) ->
+ undefined;
+decode_value(not_found) ->
+ undefined;
+decode_value({Data, ?FMT_RAW}) ->
+ Data;
+decode_value({Data, ?FMT_NATIVE}) ->
+ binary_to_term(Data);
+decode_value({Data, ?FMT_JSON}) ->
+ eep0018:decode(Data);
+decode_value({<<Int:32>>, ?FMT_INT}) ->
+ Int.
+
+encode_expiry(default, DefaultExpiry) ->
+ encode_expiry1(DefaultExpiry);
+encode_expiry(Expiry, _DefaultExpiry) ->
+ encode_expiry1(Expiry).
+
+encode_expiry1(infinity) ->
+ 0;
+encode_expiry1({X, seconds}) ->
+ X;
+encode_expiry1({X, minutes}) ->
+ X*60;
+encode_expiry1({X, hours}) ->
+ X*3600;
+encode_expiry1({X, days}) when X<30->
+ X*86400;
+encode_expiry1(X) when is_integer(X) ->
+ X.
+
View
122 lib/mcache/src/mcache_app.erl
@@ -1,61 +1,61 @@
--module(mcache_app).
--author('echou327@gmail.com').
-
--behaviour(application).
--behaviour(supervisor).
-
--export([start/2, stop/1, config_change/3]).
--export([init/1]).
-
-
--export([restart/0, start/0, start_link/0, stop/0]).
-
-% API
-
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-start() ->
- application:start(mcache).
-
-stop() ->
- application:stop(mcache).
-
-restart() ->
- io:format("[MCACHE_APP] restart() ~n", []),
- mcache_util:reload_config([mcache]),
- ?MODULE:stop(),
- ?MODULE:start().
-
-% application callbacks
-
-start(_Type, _Args) ->
- ?MODULE:start_link().
-
-stop(_State) ->
- ok.
-
-config_change(_Changed, _New, _Removed) ->
- ok.
-
-% supervisor callback
-init([]) ->
- io:format("[MCACHE] Starting~n"),
- Specs = specs([{mcache_client_sup, 1}, % must be 1
- {mcache_config, 1}]), % must be 1
- {ok, {{one_for_one, 10, 10}, Specs}}.
-
-% supervisor local functions
-specs(Specs) ->
- lists:foldl(fun({Module, Count}, Acc) ->
- Acc ++ mcache_util:sup_child_spec(Module, fun one_spec/2, Count)
- end, [], Specs).
-
-one_spec(mcache_config, Id) ->
- PoolsConfig = mcache_util:get_app_env(pools, []),
- ExpiresConfig = mcache_util:get_app_env(expires, []),
- {Id, {mcache_config, start_link, [ {PoolsConfig, ExpiresConfig} ]}, permanent, 2000, worker, []};
-one_spec(mcache_client_sup, Id) ->
- {Id, {mcache_client_sup, start_link, []}, permanent, infinity, supervisor, []};
-one_spec(Module, Id) ->
- {Id, {Module, start_link, []}, permanent, 2000, worker, []}.
+-module(mcache_app).
+-author('echou327@gmail.com').
+
+-behaviour(application).
+-behaviour(supervisor).
+
+-export([start/2, stop/1, config_change/3]).
+-export([init/1]).
+
+
+-export([restart/0, start/0, start_link/0, stop/0]).
+
+% API
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start() ->
+ application:start(mcache).
+
+stop() ->
+ application:stop(mcache).
+
+restart() ->
+ io:format("[MCACHE_APP] restart() ~n", []),
+ mcache_util:reload_config([mcache]),
+ ?MODULE:stop(),
+ ?MODULE:start().
+
+% application callbacks
+
+start(_Type, _Args) ->
+ ?MODULE:start_link().
+
+stop(_State) ->
+ ok.
+
+config_change(_Changed, _New, _Removed) ->
+ ok.
+
+% supervisor callback
+init([]) ->
+ io:format("[MCACHE] Starting~n"),
+ Specs = specs([{mcache_client_sup, 1}, % must be 1
+ {mcache_config, 1}]), % must be 1
+ {ok, {{one_for_one, 10, 10}, Specs}}.
+
+% supervisor local functions
+specs(Specs) ->
+ lists:foldl(fun({Module, Count}, Acc) ->
+ Acc ++ mcache_util:sup_child_spec(Module, fun one_spec/2, Count)
+ end, [], Specs).
+
+one_spec(mcache_config, Id) ->
+ PoolsConfig = mcache_util:get_app_env(pools, []),
+ ExpiresConfig = mcache_util:get_app_env(expires, []),
+ {Id, {mcache_config, start_link, [ {PoolsConfig, ExpiresConfig} ]}, permanent, 2000, worker, []};
+one_spec(mcache_client_sup, Id) ->
+ {Id, {mcache_client_sup, start_link, []}, permanent, infinity, supervisor, []};
+one_spec(Module, Id) ->
+ {Id, {Module, start_link, []}, permanent, 2000, worker, []}.
View
748 lib/mcache/src/mcache_client.erl
@@ -1,374 +1,374 @@
--module(mcache_client).
--author('echou327@gmail.com').
-
-%-compile([inline, native, {hipe,o3}]).
-
--behaviour(gen_server).
-
--export([start_link/1]).
--export([init/1,handle_call/3,handle_cast/2,handle_info/2,code_change/3,terminate/2]).
--export([mc_get/2, ab_get/2, mc_mget/2, ab_mget/2, mc_set/5, ab_set/5, mc_delete/2, ab_delete/2]).
-
--include_lib("kernel/src/inet_int.hrl").
-
--define(PG2_GROUP_TAG, ?MODULE).
--define(SOCK_OPTS, [binary, {active, true}, {delay_send, false}, {nodelay, true}, {packet, raw}]).
--define(CONNECT_TIMEOUT, 1000).
--define(RECONNECT_AFTER, 2000).
-
--record(state, {addr, seq=0, buffer, pendings, sock=not_connected, connecting}).
-
--record(req, {opcode=get, data_type=0, cas=0, extra= <<>>, key= <<>>, body= <<>>}).
--record(resp, {seq, status, opcode, data_type=0, cas=0, extra= <<>>, key= <<>>, body= <<>>}).
-
--record(pending, {from, time}).
--record(mget_pending, {from, time, items}).
-
-start_link({Host, Port}) ->
- gen_server:start_link(?MODULE, [{Host, Port}], []).
-
-init([{Host, Port}=Server]) ->
- process_flag(trap_exit, true),
-
- pg2:create({?PG2_GROUP_TAG, Server}),
- pg2:join({?PG2_GROUP_TAG, Server}, self()),
-
- {ok, Sock, Ref} = async_connect(Host, Port, ?SOCK_OPTS, ?CONNECT_TIMEOUT),
-
- {ok, #state{sock=not_connected,
- addr={Host, Port},
- seq=0,
- buffer= <<>>,
- pendings=dict:new(),
- connecting={Sock, Ref}}}.
-
-% HANDLE_CALL
-
-handle_call({mc, _Req}, _From, #state{sock=not_connected}=State) ->
- {reply, {error, not_connected}, State};
-
-handle_call({mc, Req}, From, State) ->
- case send_wrapper(Req, From, State) of
- {ok, NewState} ->
- {noreply, NewState};
- {Any, NewState} ->
- {reply, {error, Any}, NewState}
- end;
-
-handle_call({mc_ab, Req}, From, State) ->
- case send_wrapper(Req, From, State) of
- {ok, NewState} ->
- {reply, pending, NewState};
- {Any, NewState} ->
- {reply, {error, Any}, NewState}
- end;
-
-handle_call(_Req, _From, State) ->
- {noreply, State}.
-
-send_wrapper({mget, Keys}=Req, From, #state{sock=Sock, seq=Seq, pendings=Pendings}=State) ->
- case (catch send_req(Sock, Seq, Req)) of
- true ->
- Items = lists:foldl(fun(K, Dict) -> dict:store(K, undefined, Dict) end, dict:new(), Keys),
- P = #mget_pending{from=From, time=erlang:now(), items=Items},
- {ok, State#state{seq=Seq+1, pendings=dict:store(Seq,P,Pendings)}};
- _ ->
- {not_sent, State}
- end;
-send_wrapper(Req, From, #state{sock=Sock, seq=Seq, pendings=Pendings}=State) ->
- case (catch send_req(Sock, Seq, Req)) of
- true ->
- P = #pending{from=From, time=erlang:now()},
- {ok, State#state{seq=Seq+1, pendings=dict:store(Seq,P,Pendings)}};
- _ ->
- {not_sent, State}
- end.
-
-% HANDLE_CAST
-
-handle_cast({mc, _Req}, #state{sock=not_connected}=State) ->
- {noreply, State};
-
-handle_cast({mc, Req}, #state{sock=Sock, seq=Seq}=State) ->
- case (catch send_req(Sock, Seq, Req)) of
- ok ->
- {noreply, State#state{seq=Seq+1}};
- _ ->
- {noreply, State}
- end;
-
-handle_cast(_Req, State) ->
- {noreply, State}.
-
-% HANDLE_INFO
-
-handle_info({inet_async, Sock, Ref, Status}, #state{connecting={Sock, Ref}}=State) ->
- %error_logger:info_msg("inet_async: ~p, ~p, ~p~n", [Sock, Ref, Status]),
- case Status of
- ok ->
- {noreply, State#state{sock=Sock, connecting=undefined}};
- _ ->
- {noreply, socket_close(State), hibernate}
- end;
-
-handle_info({tcp_closed, Sock}, #state{sock=Sock}=State) ->
- {noreply, socket_close(State), hibernate};
-
-handle_info(reconnect, #state{addr={Host, Port}}=State) ->
- {ok, Sock, Ref} = async_connect(Host, Port, ?SOCK_OPTS, ?CONNECT_TIMEOUT),
- {noreply, State#state{sock=not_connected, connecting={Sock, Ref}}, hibernate};
-
-handle_info({tcp, Sock, Data}, #state{sock=Sock,buffer=Buf,pendings=Pendings}=State) ->
- {NewBuf, Resps} = do_parse_packet(<<Buf/binary,Data/binary>>, []),
- case Resps of
- [] ->
- {noreply, State#state{buffer=NewBuf}};
- [_|_] ->
- NewPendings = lists:foldl(fun handle_one_resp/2, Pendings, Resps),
- {noreply, State#state{buffer=NewBuf,pendings=NewPendings}}
- end;
-
-handle_info(_Msg, State) ->
- %error_logger:info_msg("handle_info: ~p~n", [Msg]),
- {noreply, State}.
-
-% MISC CALLBACKS
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-socket_close(#state{sock=not_connected, pendings=Pendings}=State) ->
- flush_pendings(Pendings, {error, closed}),
- erlang:send_after(?RECONNECT_AFTER, self(), reconnect),
- State#state{connecting=undefined, pendings=dict:new(), buffer= <<>>};
-socket_close(#state{sock=Sock}=State) when is_port(Sock) ->
- catch gen_tcp:close(Sock),
- socket_close(State#state{sock=not_connected}).
-
-% Handles mget sequences (getkq, getkq, ..., noop)
-handle_one_resp(#resp{opcode=getkq, seq=Seq}=Resp, Pendings) ->
- case dict:find(Seq, Pendings) of
- {ok, #mget_pending{items=Items}=P} ->
- case parse_resp(Resp) of
- {ok, {Key, Value}} ->
- NewItems = dict:store(Key, Value, Items),
- dict:store(Seq, P#mget_pending{items=NewItems}, Pendings);
- _ ->
- Pendings
- end;
- _ -> % normal pendings
- Pendings
- end;
-
-handle_one_resp(#resp{opcode=noop,seq=Seq}, Pendings) ->
- case dict:find(Seq, Pendings) of
- {ok, #mget_pending{from=From, items=Items}} ->
- gen_server:reply(From, {mget, Items});
- _ ->
- ok
- end,
- dict:erase(Seq, Pendings);
-
-
-handle_one_resp(#resp{seq=Seq}=Resp, Pendings) ->
- case dict:find(Seq, Pendings) of
- {ok, #pending{from=From}} ->
- Result = case (catch parse_resp(Resp)) of
- {ok, Any} -> Any;
- {'EXIT', Reason} -> {error, Reason};
- Status -> Status
- end,
- gen_server:reply(From, Result);
- _ ->
- ok
- end,
- dict:erase(Seq, Pendings).
-
-flush_pendings(Pendings, Result) ->
- dict:fold(
- fun(_Seq, #pending{from=From}, Any) ->
- gen_server:reply(From, Result),
- Any;
- (_Seq, #mget_pending{from=From, items=Items}, Any) ->
- gen_server:reply(From, {mget, Items}),
- Any;
- (_, _, Any) ->
- Any
- end,
- nil,
- Pendings).
-
-
-% internal apis
-
-get_client_pid(Server) ->
- case pg2:get_closest_pid({?PG2_GROUP_TAG, Server}) of
- {error, {no_process, _Reason}} ->
- exit(no_process);
- Pid when is_pid(Pid) ->
- %error_logger:info_msg("get_client_pid ~p ~p~n", [Server, Pid]),
- Pid
- end.
-
-async_connect({A,B,C,D}=_Addr, Port, Opts, Time) ->
- case inet:connect_options(Opts, inet) of
- {error, Reason} ->
- exit(Reason);
- {ok, #connect_opts{fd=Fd, ifaddr=BAddr={_,_,_,_}, port=BPort, opts=SockOpts}} ->
- case inet:open(Fd,BAddr,BPort,SockOpts,tcp,inet,?MODULE) of
- {ok, S} ->
- prim_inet:async_connect(S, {A,B,C,D}, Port, Time);
- Error ->
- Error
- end;
- {ok, _} ->
- exit(badarg)
- end.
-
-do_parse_packet(<<16#81, Opcode, KeyLen:16, ExtraLen, DataType, Status:16, TotalBodyLen:32, Seq:32, CAS:64,
- Extra:ExtraLen/binary, Key:KeyLen/binary, Rest/binary>>, Acc) ->
- BodyLen = TotalBodyLen - ExtraLen - KeyLen,
- <<Body:BodyLen/binary, Rest1/binary>> = Rest,
- Resp= #resp{seq=Seq,
- opcode=mcache_proto:opcode(Opcode),
- status=mcache_proto:status(Status),
- data_type=DataType,
- cas=CAS,
- extra=Extra,
- key=Key,
- body=Body},
- do_parse_packet(Rest1, [Resp|Acc]);
-do_parse_packet(Data, Acc) ->
- {Data, lists:reverse(Acc)}.
-
-send_req(Sock, Seq, {mget, Keys}) ->
- Reqs = [ #req{opcode=getkq, key=K} || K <- Keys ],
- do_send_req(Sock, Seq, lists:reverse([#req{opcode=noop}|Reqs]));
-
-send_req(Sock, Seq, noop) ->
- do_send_req(Sock, Seq, #req{opcode=noop});
-
-send_req(Sock, Seq, version) ->
- do_send_req(Sock, Seq, #req{opcode=version});
-
-send_req(Sock, Seq, {delete, Key}) ->
- do_send_req(Sock, Seq, #req{opcode=delete, key=Key});
-
-send_req(Sock, Seq, {flush, Expiry}) ->
- do_send_req(Sock, Seq, #req{opcode=flush, extra= <<Expiry:32>>});
-send_req(Sock, Seq, flush) ->
- do_send_req(Sock, Seq, #req{opcode=flush});
-
-send_req(Sock, Seq, {getk, Key}) ->
- do_send_req(Sock, Seq, #req{opcode=getk, key=Key});
-
-send_req(Sock, Seq, {Op, Key, Value, Flags, Expiry}) when Op=:=set;Op=:=add;Op=:=replace ->
- do_send_req(Sock, Seq, #req{opcode=Op, key=Key, body=Value, extra= <<Flags:32, Expiry:32>>});
-
-send_req(Sock, Seq, {Op, Key, Delta, Initial, Expiry}) when Op=:=incr;Op=:=decr ->
- do_send_req(Sock, Seq, #req{opcode=Op, key=Key, extra= <<Delta:64, Initial:64, Expiry:32>>}).
-
-
-
-parse_resp(#resp{opcode=noop}) ->
- {ok, noop};
-
-% not used !!
-parse_resp(#resp{opcode=get, status=Status, extra=Extra, body=Value}) ->
- case Status of
- ok ->
- <<Flags:32>> = Extra,
- {ok, {Value, Flags}};
- _ -> {ok, undefined}
- end;
-
-parse_resp(#resp{opcode=getk, status=Status, extra=Extra, key=Key, body=Value}) ->
- case Status of
- ok ->
- <<Flags:32>> = Extra,
- {ok, {Key, {Value, Flags}}};
- _ ->
- {ok, {Key, undefined}}
- end;
-
-parse_resp(#resp{opcode=getkq, status=Status, extra=Extra, key=Key, body=Value}) ->
- case Status of
- ok ->
- <<Flags:32>> = Extra,
- {ok, {Key, {Value, Flags}}};
- _ ->
- {ok, {Key, undefined}}
- end;
-
-parse_resp(#resp{opcode=incr, status=ok, body= <<Value:64>>}) ->
- {ok, Value};
-
-parse_resp(#resp{opcode=decr, status=ok, body= <<Value:64>>}) ->
- {ok, Value};
-
-parse_resp(#resp{opcode=version, status=ok, body=Body}) ->
- {ok, binary_to_list(Body)};
-
-parse_resp(#resp{opcode=_, status=Status}) ->
- Status.
-
-
-gen_one_req(Seq, #req{opcode=Opcode,
- data_type=DataType,
- cas=CAS,
- extra=Extra,
- key=Key,
- body=Body}=_Req) ->
- KeyLen = iolist_size(Key),
- BodyLen = iolist_size(Body),
- ExtraLen = iolist_size(Extra),
- TotalBodyLen = KeyLen + BodyLen + ExtraLen,
- [16#80, % MAGIC
- mcache_proto:opcode(Opcode), % Opcode
- <<KeyLen:16, % Key length
- ExtraLen:8, % Extra length
- DataType:8, % Data type
- 0:16, % Reserved
- TotalBodyLen:32, % Total body length
- Seq:32, % Opaque (as Seq)
- CAS:64>>, % CAS
- Extra,
- Key,
- Body].
-
-do_send_req(Sock, Seq, [_|_]=Reqs) ->
- erlang:port_command(Sock, [ gen_one_req(Seq, Req) || Req <- Reqs ]);
-do_send_req(Sock, Seq, Req) ->
- erlang:port_command(Sock, gen_one_req(Seq, Req)).
-
-
-% =======================================================
-% PUBLIC API
-% =======================================================
-
-mc_get(Server, Key) ->
- gen_server:call(get_client_pid(Server), {mc, {getk, Key}}).
-
-ab_get(Server, Key) ->
- gen_server:call(get_client_pid(Server), {mc_ab, {getk, Key}}).
-
-mc_mget(Server, Keys) ->
- gen_server:call(get_client_pid(Server), {mc, {mget, Keys}}).
-
-ab_mget(Server, Keys) ->
- gen_server:call(get_client_pid(Server), {mc_ab, {mget, Keys}}).
-
-mc_set(Server, Key, Value, Flags, Expiry) ->
- gen_server:call(get_client_pid(Server), {mc, {set, Key, Value, Flags, Expiry}}).
-
-ab_set(Server, Key, Value, Flags, Expiry) ->
- gen_server:cast(get_client_pid(Server), {mc, {set, Key, Value, Flags, Expiry}}).
-
-mc_delete(Server, Key) ->
- gen_server:call(get_client_pid(Server), {mc, {delete, Key}}).
-
-ab_delete(Server, Key) ->
- gen_server:cast(get_client_pid(Server), {mc, {delete, Key}}).
+-module(mcache_client).
+-author('echou327@gmail.com').
+
+%-compile([inline, native, {hipe,o3}]).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1,handle_call/3,handle_cast/2,handle_info/2,code_change/3,terminate/2]).
+-export([mc_get/2, ab_get/2, mc_mget/2, ab_mget/2, mc_set/5, ab_set/5, mc_delete/2, ab_delete/2]).
+
+-include_lib("kernel/src/inet_int.hrl").
+
+-define(PG2_GROUP_TAG, ?MODULE).
+-define(SOCK_OPTS, [binary, {active, true}, {delay_send, false}, {nodelay, true}, {packet, raw}]).
+-define(CONNECT_TIMEOUT, 1000).
+-define(RECONNECT_AFTER, 2000).
+
+-record(state, {addr, seq=0, buffer, pendings, sock=not_connected, connecting}).
+
+-record(req, {opcode=get, data_type=0, cas=0, extra= <<>>, key= <<>>, body= <<>>}).
+-record(resp, {seq, status, opcode, data_type=0, cas=0, extra= <<>>, key= <<>>, body= <<>>}).
+
+-record(pending, {from, time}).
+-record(mget_pending, {from, time, items}).
+
+start_link({Host, Port}) ->
+ gen_server:start_link(?MODULE, [{Host, Port}], []).
+
+init([{Host, Port}=Server]) ->
+ process_flag(trap_exit, true),
+
+ pg2:create({?PG2_GROUP_TAG, Server}),
+ pg2:join({?PG2_GROUP_TAG, Server}, self()),
+
+ {ok, Sock, Ref} = async_connect(Host, Port, ?SOCK_OPTS, ?CONNECT_TIMEOUT),
+
+ {ok, #state{sock=not_connected,
+ addr={Host, Port},
+ seq=0,
+ buffer= <<>>,
+ pendings=dict:new(),
+ connecting={Sock, Ref}}}.
+
+% HANDLE_CALL
+
+handle_call({mc, _Req}, _From, #state{sock=not_connected}=State) ->
+ {reply, {error, not_connected}, State};
+
+handle_call({mc, Req}, From, State) ->
+ case send_wrapper(Req, From, State) of
+ {ok, NewState} ->
+ {noreply, NewState};
+ {Any, NewState} ->
+ {reply, {error, Any}, NewState}
+ end;
+
+handle_call({mc_ab, Req}, From, State) ->
+ case send_wrapper(Req, From, State) of
+ {ok, NewState} ->
+ {reply, pending, NewState};
+ {Any, NewState} ->
+ {reply, {error, Any}, NewState}
+ end;
+
+handle_call(_Req, _From, State) ->
+ {noreply, State}.
+
+send_wrapper({mget, Keys}=Req, From, #state{sock=Sock, seq=Seq, pendings=Pendings}=State) ->
+ case (catch send_req(Sock, Seq, Req)) of
+ true ->
+ Items = lists:foldl(fun(K, Dict) -> dict:store(K, undefined, Dict) end, dict:new(), Keys),
+ P = #mget_pending{from=From, time=erlang:now(), items=Items},
+ {ok, State#state{seq=Seq+1, pendings=dict:store(Seq,P,Pendings)}};
+ _ ->
+ {not_sent, State}
+ end;
+send_wrapper(Req, From, #state{sock=Sock, seq=Seq, pendings=Pendings}=State) ->
+ case (catch send_req(Sock, Seq, Req)) of
+ true ->
+ P = #pending{from=From, time=erlang:now()},
+ {ok, State#state{seq=Seq+1, pendings=dict:store(Seq,P,Pendings)}};
+ _ ->
+ {not_sent, State}
+ end.
+
+% HANDLE_CAST
+
+handle_cast({mc, _Req}, #state{sock=not_connected}=State) ->
+ {noreply, State};
+
+handle_cast({mc, Req}, #state{sock=Sock, seq=Seq}=State) ->
+ case (catch send_req(Sock, Seq, Req)) of
+ ok ->
+ {noreply, State#state{seq=Seq+1}};
+ _ ->
+ {noreply, State}
+ end;
+
+handle_cast(_Req, State) ->
+ {noreply, State}.
+
+% HANDLE_INFO
+
+handle_info({inet_async, Sock, Ref, Status}, #state{connecting={Sock, Ref}}=State) ->
+ %error_logger:info_msg("inet_async: ~p, ~p, ~p~n", [Sock, Ref, Status]),
+ case Status of
+ ok ->
+ {noreply, State#state{sock=Sock, connecting=undefined}};
+ _ ->
+ {noreply, socket_close(State), hibernate}
+ end;
+
+handle_info({tcp_closed, Sock}, #state{sock=Sock}=State) ->
+ {noreply, socket_close(State), hibernate};
+
+handle_info(reconnect, #state{addr={Host, Port}}=State) ->
+ {ok, Sock, Ref} = async_connect(Host, Port, ?SOCK_OPTS, ?CONNECT_TIMEOUT),
+ {noreply, State#state{sock=not_connected, connecting={Sock, Ref}}, hibernate};
+
+handle_info({tcp, Sock, Data}, #state{sock=Sock,buffer=Buf,pendings=Pendings}=State) ->
+ {NewBuf, Resps} = do_parse_packet(<<Buf/binary,Data/binary>>, []),
+ case Resps of
+ [] ->
+ {noreply, State#state{buffer=NewBuf}};
+ [_|_] ->
+ NewPendings = lists:foldl(fun handle_one_resp/2, Pendings, Resps),
+ {noreply, State#state{buffer=NewBuf,pendings=NewPendings}}
+ end;
+
+handle_info(_Msg, State) ->
+ %error_logger:info_msg("handle_info: ~p~n", [Msg]),
+ {noreply, State}.
+
+% MISC CALLBACKS
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+socket_close(#state{sock=not_connected, pendings=Pendings}=State) ->
+ flush_pendings(Pendings, {error, closed}),
+ erlang:send_after(?RECONNECT_AFTER, self(), reconnect),
+ State#state{connecting=undefined, pendings=dict:new(), buffer= <<>>};
+socket_close(#state{sock=Sock}=State) when is_port(Sock) ->
+ catch gen_tcp:close(Sock),
+ socket_close(State#state{sock=not_connected}).
+
+% Handles mget sequences (getkq, getkq, ..., noop)
+handle_one_resp(#resp{opcode=getkq, seq=Seq}=Resp, Pendings) ->
+ case dict:find(Seq, Pendings) of
+ {ok, #mget_pending{items=Items}=P} ->
+ case parse_resp(Resp) of
+ {ok, {Key, Value}} ->
+ NewItems = dict:store(Key, Value, Items),
+ dict:store(Seq, P#mget_pending{items=NewItems}, Pendings);
+ _ ->
+ Pendings
+ end;
+ _ -> % normal pendings
+ Pendings
+ end;
+
+handle_one_resp(#resp{opcode=noop,seq=Seq}, Pendings) ->
+ case dict:find(Seq, Pendings) of
+ {ok, #mget_pending{from=From, items=Items}} ->
+ gen_server:reply(From, {mget, Items});
+ _ ->
+ ok
+ end,
+ dict:erase(Seq, Pendings);
+
+
+handle_one_resp(#resp{seq=Seq}=Resp, Pendings) ->
+ case dict:find(Seq, Pendings) of
+ {ok, #pending{from=From}} ->
+ Result = case (catch parse_resp(Resp)) of
+ {ok, Any} -> Any;
+ {'EXIT', Reason} -> {error, Reason};
+ Status -> Status
+ end,
+ gen_server:reply(From, Result);
+ _ ->
+ ok
+ end,
+ dict:erase(Seq, Pendings).
+
+flush_pendings(Pendings, Result) ->
+ dict:fold(
+ fun(_Seq, #pending{from=From}, Any) ->
+ gen_server:reply(From, Result),
+ Any;
+ (_Seq, #mget_pending{from=From, items=Items}, Any) ->
+ gen_server:reply(From, {mget, Items}),
+ Any;
+ (_, _, Any) ->
+ Any
+ end,
+ nil,
+ Pendings).
+
+
+% internal apis
+
+get_client_pid(Server) ->
+ case pg2:get_closest_pid({?PG2_GROUP_TAG, Server}) of
+ {error, {no_process, _Reason}} ->
+ exit(no_process);
+ Pid when is_pid(Pid) ->
+ %error_logger:info_msg("get_client_pid ~p ~p~n", [Server, Pid]),
+ Pid
+ end.
+
+async_connect({A,B,C,D}=_Addr, Port, Opts, Time) ->
+ case inet:connect_options(Opts, inet) of
+ {error, Reason} ->
+ exit(Reason);
+ {ok, #connect_opts{fd=Fd, ifaddr=BAddr={_,_,_,_}, port=BPort, opts=SockOpts}} ->
+ case inet:open(Fd,BAddr,BPort,SockOpts,tcp,inet,?MODULE) of
+ {ok, S} ->
+ prim_inet:async_connect(S, {A,B,C,D}, Port, Time);
+ Error ->
+ Error
+ end;
+ {ok, _} ->
+ exit(badarg)
+ end.
+
+do_parse_packet(<<16#81, Opcode, KeyLen:16, ExtraLen, DataType, Status:16, TotalBodyLen:32, Seq:32, CAS:64,
+ Extra:ExtraLen/binary, Key:KeyLen/binary, Rest/binary>>, Acc) ->
+ BodyLen = TotalBodyLen - ExtraLen - KeyLen,
+ <<Body:BodyLen/binary, Rest1/binary>> = Rest,
+ Resp= #resp{seq=Seq,
+ opcode=mcache_proto:opcode(Opcode),
+ status=mcache_proto:status(Status),
+ data_type=DataType,
+ cas=CAS,
+ extra=Extra,
+ key=Key,
+ body=Body},
+ do_parse_packet(Rest1, [Resp|Acc]);
+do_parse_packet(Data, Acc) ->
+ {Data, lists:reverse(Acc)}.
+
+send_req(Sock, Seq, {mget, Keys}) ->
+ Reqs = [ #req{opcode=getkq, key=K} || K <- Keys ],
+ do_send_req(Sock, Seq, lists:reverse([#req{opcode=noop}|Reqs]));
+
+send_req(Sock, Seq, noop) ->
+ do_send_req(Sock, Seq, #req{opcode=noop});
+
+send_req(Sock, Seq, version) ->
+ do_send_req(Sock, Seq, #req{opcode=version});
+
+send_req(Sock, Seq, {delete, Key}) ->
+ do_send_req(Sock, Seq, #req{opcode=delete, key=Key});
+
+send_req(Sock, Seq, {flush, Expiry}) ->
+ do_send_req(Sock, Seq, #req{opcode=flush, extra= <<Expiry:32>>});
+send_req(Sock, Seq, flush) ->
+ do_send_req(Sock, Seq, #req{opcode=flush});
+
+send_req(Sock, Seq, {getk, Key}) ->
+ do_send_req(Sock, Seq, #req{opcode=getk, key=Key});
+
+send_req(Sock, Seq, {Op, Key, Value, Flags, Expiry}) when Op=:=set;Op=:=add;Op=:=replace ->
+ do_send_req(Sock, Seq, #req{opcode=Op, key=Key, body=Value, extra= <<Flags:32, Expiry:32>>});
+
+send_req(Sock, Seq, {Op, Key, Delta, Initial, Expiry}) when Op=:=incr;Op=:=decr ->
+ do_send_req(Sock, Seq, #req{opcode=Op, key=Key, extra= <<Delta:64, Initial:64, Expiry:32>>}).
+
+
+
+parse_resp(#resp{opcode=noop}) ->
+ {ok, noop};
+
+% not used !!
+parse_resp(#resp{opcode=get, status=Status, extra=Extra, body=Value}) ->
+ case Status of
+ ok ->
+ <<Flags:32>> = Extra,
+ {ok, {Value, Flags}};
+ _ -> {ok, undefined}
+ end;
+
+parse_resp(#resp{opcode=getk, status=Status, extra=Extra, key=Key, body=Value}) ->
+ case Status of
+ ok ->
+ <<Flags:32>> = Extra,
+ {ok, {Key, {Value, Flags}}};
+ _ ->
+ {ok, {Key, undefined}}
+ end;
+
+parse_resp(#resp{opcode=getkq, status=Status, extra=Extra, key=Key, body=Value}) ->
+ case Status of
+ ok ->
+ <<Flags:32>> = Extra,
+ {ok, {Key, {Value, Flags}}};
+ _ ->
+ {ok, {Key, undefined}}
+ end;
+
+parse_resp(#resp{opcode=incr, status=ok, body= <<Value:64>>}) ->
+ {ok, Value};
+
+parse_resp(#resp{opcode=decr, status=ok, body= <<Value:64>>}) ->
+ {ok, Value};
+
+parse_resp(#resp{opcode=version, status=ok, body=Body}) ->
+ {ok, binary_to_list(Body)};
+
+parse_resp(#resp{opcode=_, status=Status}) ->
+ Status.
+
+
+gen_one_req(Seq, #req{opcode=Opcode,
+ data_type=DataType,
+ cas=CAS,
+ extra=Extra,
+ key=Key,
+ body=Body}=_Req) ->
+ KeyLen = iolist_size(Key),
+ BodyLen = iolist_size(Body),
+ ExtraLen = iolist_size(Extra),
+ TotalBodyLen = KeyLen + BodyLen + ExtraLen,
+ [16#80, % MAGIC
+ mcache_proto:opcode(Opcode), % Opcode
+ <<KeyLen:16, % Key length
+ ExtraLen:8, % Extra length
+ DataType:8, % Data type
+ 0:16, % Reserved
+ TotalBodyLen:32, % Total body length
+ Seq:32, % Opaque (as Seq)
+ CAS:64>>, % CAS
+ Extra,
+ Key,
+ Body].
+
+do_send_req(Sock, Seq, [_|_]=Reqs) ->
+ erlang:port_command(Sock, [ gen_one_req(Seq, Req) || Req <- Reqs ]);
+do_send_req(Sock, Seq, Req) ->
+ erlang:port_command(Sock, gen_one_req(Seq, Req)).
+
+
+% =======================================================
+% PUBLIC API
+% =======================================================
+
+mc_get(Server, Key) ->
+ gen_server:call(get_client_pid(Server), {mc, {getk, Key}}).
+
+ab_get(Server, Key) ->
+ gen_server:call(get_client_pid(Server), {mc_ab, {getk, Key}}).
+
+mc_mget(Server, Keys) ->
+ gen_server:call(get_client_pid(Server), {mc, {mget, Keys}}).
+
+ab_mget(Server, Keys) ->
+ gen_server:call(get_client_pid(Server), {mc_ab, {mget, Keys}}).
+
+mc_set(Server, Key, Value, Flags, Expiry) ->
+ gen_server:call(get_client_pid(Server), {mc, {set, Key, Value, Flags, Expiry}}).
+
+ab_set(Server, Key, Value, Flags, Expiry) ->
+ gen_server:cast(get_client_pid(Server), {mc, {set, Key, Value, Flags, Expiry}}).
+
+mc_delete(Server, Key) ->
+ gen_server:call(get_client_pid(Server), {mc, {delete, Key}}).
+
+ab_delete(Server, Key) ->
+ gen_server:cast(get_client_pid(Server), {mc, {delete, Key}}).
View
104 lib/mcache/src/mcache_client_sup.erl
@@ -1,52 +1,52 @@
--module(mcache_client_sup).
--author('echou327@gmail.com').
-
--behaviour(supervisor).
--export([init/1]).
-
--export([start_link/0, start_child/3, restart_child/2, terminate_child/2]).
-
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init([]) ->
- error_logger:info_msg("[MCACHE_CLIENT_SUP] Started.~n"),
-
- {ok, {{one_for_one, 10, 10}, []}}.
-
-start_child(Name, {Host, Port}, Num) ->
- lists:foreach(
- fun(I) ->
- Id = {mcache_client, {Name, Host, Port}, I},
- Spec = {Id,{mcache_client,start_link,[{Host,Port}]},permanent,2000,worker,[mcache_client]},
- case supervisor:start_child(?MODULE, Spec) of
- {ok, _} -> ok;
- {ok, _, _} -> ok;
- {error, {already_started,_}} ->
- supervisor:terminate_child(?MODULE, Id),
- supervisor:delete_child(?MODULE, Id),
- supervisor:start_child(?MODULE, Spec);
- {error, already_present} ->
- supervisor:delete_child(?MODULE, Id),
- supervisor:start_child(?MODULE, Spec);
- {error, _Error} ->
- erlang:error(_Error)
- end
- end,
- lists:seq(1, Num)).
-
-terminate_child(Name, {Host, Port}) ->
- F = fun({{mcache_client, {Name1, Host1, Port1}, _Index} = Id, _Child, _Type, _Modules}) when Name=:=Name1,Host=:=Host1,Port=:=Port1->
- supervisor:terminate_child(?MODULE, Id);
- (_) ->
- ignore
- end,
- lists:foreach(F, supervisor:which_children(?MODULE)).
-
-restart_child(Name, {Host, Port}) ->
- F = fun({{mcache_client, {Name1, Host1, Port1}, _Index} = Id, _Child, _Type, _Modules}) when Name=:=Name1,Host=:=Host1,Port=:=Port1->
- supervisor:restart_child(?MODULE, Id);
- (_) ->
- ignore
- end,
- lists:foreach(F, supervisor:which_children(?MODULE)).
+-module(mcache_client_sup).
+-author('echou327@gmail.com').
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-export([start_link/0, start_child/3, restart_child/2, terminate_child/2]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ error_logger:info_msg("[MCACHE_CLIENT_SUP] Started.~n"),
+
+ {ok, {{one_for_one, 10, 10}, []}}.
+
+start_child(Name, {Host, Port}, Num) ->
+ lists:foreach(
+ fun(I) ->
+ Id = {mcache_client, {Name, Host, Port}, I},
+ Spec = {Id,{mcache_client,start_link,[{Host,Port}]},permanent,2000,worker,[mcache_client]},
+ case supervisor:start_child(?MODULE, Spec) of
+ {ok, _} -> ok;
+ {ok, _, _} -> ok;
+ {error, {already_started,_}} ->
+ supervisor:terminate_child(?MODULE, Id),
+ supervisor:delete_child(?MODULE, Id),
+ supervisor:start_child(?MODULE, Spec);
+ {error, already_present} ->
+ supervisor:delete_child(?MODULE, Id),
+ supervisor:start_child(?MODULE, Spec);
+ {error, _Error} ->
+ erlang:error(_Error)
+ end
+ end,
+ lists:seq(1, Num)).
+
+terminate_child(Name, {Host, Port}) ->
+ F = fun({{mcache_client, {Name1, Host1, Port1}, _Index} = Id, _Child, _Type, _Modules}) when Name=:=Name1,Host=:=Host1,Port=:=Port1->
+ supervisor:terminate_child(?MODULE, Id);
+ (_) ->
+ ignore
+ end,
+ lists:foreach(F, supervisor:which_children(?MODULE)).
+
+restart_child(Name, {Host, Port}) ->
+ F = fun({{mcache_client, {Name1, Host1, Port1}, _Index} = Id, _Child, _Type, _Modules}) when Name=:=Name1,Host=:=Host1,Port=:=Port1->
+ supervisor:restart_child(?MODULE, Id);
+ (_) ->
+ ignore
+ end,
+ lists:foreach(F, supervisor:which_children(?MODULE)).
View
332 lib/mcache/src/mcache_config.erl
@@ -1,166 +1,166 @@
--module(mcache_config).
--author('thijsterlouw@gmail.com').
--author('echou327@gmail.com').
-
--behaviour(gen_server).
-
--export([start_link/1]).
--export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).
-
--export([update_expires/1,get_expires/0]).
-
--define(DEFAULT_EXPIRE_SECONDS, 300). %5 minutes
--define(DEFAULT_POOL, generic).
-
--record(state, {expires, initial_expires}).
-
-%%%=========================================================================
-%%% Start the config
-%%%=========================================================================
-
-start_link(Opts) ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).
-
-init({Pools, Expires}=Opts) ->
- io:format("[MCACHE_CONFIG] init~n", []),
- process_flag(trap_exit, true),
-
- parse_expires(Expires),
- parse_pools(Pools),
- {ok, #state{expires=Expires, initial_expires=Expires}}.
-
-
-handle_call({update_expires, Info}, _From, #state{expires=OldExpires, initial_expires=InitialExpires}=State) ->
- NewExpires = case Info of
- restore ->
- % restore to initial config
- InitialExpires;
- {assign, Expires} ->
- % set a new set of config
- Expires;
- {delete, Class} ->
- proplists:delete(Class, OldExpires);
- {set, Class, {PoolName, Expiry}} ->
- Expires1 = proplists:delete(Class, OldExpires),
- [{Class, {PoolName, Expiry}}|Expires1];
- _ ->
- not_modified
- end,
- if NewExpires =:= not_modified ->
- {reply, ok, State};
- true ->
- case (catch parse_expires(NewExpires)) of
- {'EXIT', Reason} ->
- {reply, {error, Reason}, State};
- _ ->
- {reply, ok, #state{expires=NewExpires}}
- end
- end;
-
-handle_call(get_expires, _From, #state{expires=Expires}=State) ->
- {reply, {ok, Expires}, State};
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-%handle_call(_Request, _From, State) -> {reply, ok, State}.
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(Reason, _State) ->
- io:format("[MCACHE_CONFIG] terminate() reason: ~p~n", [Reason]).
-
-code_change(_OldVsn, _Extra, State) ->
- io:format("blabla~n",[]),
- State.
-
-
-% API
-update_expires(UpdateInfo) ->
- gen_server:call(?MODULE, {update_expires, UpdateInfo}).
-
-get_expires() ->
- gen_server:call(?MODULE, get_expires).
-
-
-
-% internal functions
-
-parse_expires(Expires) ->
- io:format("[MCACHE_CONFIG] Generating mcache_expires module ...~n", []),
-
- Code =
-[
-"-module(mcache_expires).\n-author('echou327@gmail.com').\n-export([expire/1]).% DO NOT EDIT.\n\n",
-[
-io_lib:format("expire(~p) -> ~p;~n", [Class, {PoolName, Expire}]) || {Class, {PoolName, Expire}} <- Expires
-],
-io_lib:format("expire(_) -> ~p.~n", [{?DEFAULT_POOL, ?DEFAULT_EXPIRE_SECONDS}])
-],
- Code1 = lists:flatten(Code),
- {M, B} = dynamic_compile:from_string(Code1),
- code:load_binary(M, "", B),
- Code1.
-
-parse_pools(Pools) ->
-
- % generate mcache_continuum
- io:format("[MCACHE_CONFIG] Generating mcache_continuum module ...~n", []),
- Continuums = lists:map(
- fun(PoolConfig) ->
- Name = proplists:get_value(name, PoolConfig, generic),
- Servers = proplists:get_value(servers, PoolConfig, []),
- Servers1 = [normalize_server(S) || S <- Servers],
- {Name, Servers1}
- end,
- Pools),
- {NewPools, _} = mcache_continuum_gen:gen(Continuums, gb_trees),
-
- PoolsDict = lists:foldl(
- fun(PoolConfig, Acc) ->
- Name = proplists:get_value(name, PoolConfig),
- dict:store(Name, PoolConfig, Acc)
- end, dict:new(), Pools),
-
- % start mcache_clients
- lists:foreach(
- fun({Name, Servers, _}) ->
- %io:format("[MCACHE_CONFIG] ~p ~p~n", [Name, Pools]),
- {ok, PoolConfig} = dict:find(Name, PoolsDict),
- ConnectionCount = proplists:get_value(connection_count, PoolConfig, 10),
- lists:foreach(
- fun({{A,B,C,D}=Host,Port}) ->
- io:format("[MCACHE_CONFIG] Starting ~p mcache clients at [~p, ~p.~p.~p.~p:~p] ...~n", [ConnectionCount, Name, A,B,C,D,Port]),
- mcache_client_sup:start_child(Name, {Host, Port}, ConnectionCount)
- end,
- Servers)
- end,
- NewPools).
-
-normalize_server({{_,_,_,_}=Addr, Port, Weight}) ->
- {Addr, Port, Weight};
-normalize_server({Addr, Port, Weight}) when is_list(Addr) ->
- {Addr1, _} = normalize_addr(Addr),
- {Addr1, Port, Weight};
-normalize_server({Addr, Weight}) ->
- {Addr1, Port1} = normalize_addr(Addr),
- {Addr1, Port1, Weight}.
-
-normalize_addr(Addr) when is_list(Addr) ->
- case string:tokens(Addr, ":") of
- [IP] ->
- {ok, Addr1} = inet:getaddr(IP, inet),
- {Addr1, 11211};
- [IP,PortStr|_] ->
- {ok, Addr1} = inet:getaddr(IP, inet),
- Port1 = case string:to_integer(PortStr) of
- {Port, []} -> Port;
- _ -> 11211
- end,
- {Addr1, Port1};
- _ ->
- {{127,0,0,1}, 11211}
- end.
+-module(mcache_config).
+-author('thijsterlouw@gmail.com').
+-author('echou327@gmail.com').
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).
+
+-export([update_expires/1,get_expires/0]).
+
+-define(DEFAULT_EXPIRE_SECONDS, 300). %5 minutes
+-define(DEFAULT_POOL, generic).
+
+-record(state, {expires, initial_expires}).
+
+%%%=========================================================================
+%%% Start the config
+%%%=========================================================================
+
+start_link(Opts) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).
+
+init({Pools, Expires}=Opts) ->
+ io:format("[MCACHE_CONFIG] init~n", []),
+ process_flag(trap_exit, true),
+
+ parse_expires(Expires),
+ parse_pools(Pools),
+ {ok, #state{expires=Expires, initial_expires=Expires}}.
+
+
+handle_call({update_expires, Info}, _From, #state{expires=OldExpires, initial_expires=InitialExpires}=State) ->
+ NewExpires = case Info of
+ restore ->
+ % restore to initial config
+ InitialExpires;
+ {assign, Expires} ->
+ % set a new set of config
+ Expires;
+ {delete, Class} ->
+ proplists:delete(Class, OldExpires);
+ {set, Class, {PoolName, Expiry}} ->
+ Expires1 = proplists:delete(Class, OldExpires),
+ [{Class, {PoolName, Expiry}}|Expires1];
+ _ ->
+ not_modified
+ end,
+ if NewExpires =:= not_modified ->
+ {reply, ok, State};
+ true ->
+ case (catch parse_expires(NewExpires)) of
+ {'EXIT', Reason} ->
+ {reply, {error, Reason}, State};
+ _ ->
+ {reply, ok, #state{expires=NewExpires}}
+ end
+ end;
+
+handle_call(get_expires, _From, #state{expires=Expires}=State) ->
+ {reply, {ok, Expires}, State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+%handle_call(_Request, _From, State) -> {reply, ok, State}.
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(Reason, _State) ->
+ io:format("[MCACHE_CONFIG] terminate() reason: ~p~n", [Reason]).