Skip to content

Commit

Permalink
KAZOO-4091: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fenollp committed Apr 15, 2016
1 parent 3c9870b commit 353427e
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 349 deletions.
2 changes: 1 addition & 1 deletion core/kazoo_caches/src/kazoo_caches.app.src
Expand Up @@ -3,7 +3,7 @@
{description, "Caches - Make sure system caches are available for system processes"},
{vsn, "4.0.0"},
{modules, []},
{registered, [kazoo_caches_sup, whapps_config_cache, whapps_call_cache, whapps_getby_cache]},
{registered, [kazoo_caches_sup, kzc_global_cache, whapps_config_cache, whapps_call_cache, whapps_getby_cache]},
{applications, [ kernel
, stdlib
, crypto
Expand Down
1 change: 1 addition & 0 deletions core/kazoo_caches/src/kazoo_caches_sup.erl
Expand Up @@ -18,6 +18,7 @@

%% Helper macro for declaring children of supervisor
-define(CHILDREN, [?CACHE_ARGS(?WHAPPS_CONFIG_CACHE, ?WHAPPS_CONFIG_PROPS)
,?CACHE('kzc_global_cache')
,?CACHE(?WHAPPS_CALL_CACHE)
,?CACHE_ARGS(?WHAPPS_GETBY_CACHE, ?WHAPPS_GETBY_PROPS)
]).
Expand Down
34 changes: 34 additions & 0 deletions core/kazoo_caches/src/kz_caches.hrl
@@ -1,4 +1,5 @@
-ifndef(KZ_CACHES_HRL).

-include_lib("whistle/include/wh_types.hrl").
-include_lib("whistle/include/wh_log.hrl").
-include_lib("whistle/include/wh_databases.hrl").
Expand All @@ -8,6 +9,39 @@
-define(APP_VERSION, <<"4.0.0">> ).


-define(EXPIRES, ?SECONDS_IN_HOUR). %% an hour
-define(EXPIRE_PERIOD, 10 * ?MILLISECONDS_IN_SECOND).
-define(EXPIRE_PERIOD_MSG, 'expire_cache_objects').
-define(DEFAULT_WAIT_TIMEOUT, 5).


-type callback_fun() :: fun((any(), any(), 'flush' | 'erase' | 'expire') -> any()).
-type callback_funs() :: [callback_fun()].

-type origin_tuple() :: {'db', Database::ne_binary(), PvtTypeOrId::ne_binary()} |
{'type', PvtType::ne_binary(), Id::ne_binary()} |
{'db', Database::ne_binary()} |
{'database', Database::ne_binary()} | %% Added for notify db create/delete
{'type', PvtType::ne_binary()}.
-type origin_tuples() :: [origin_tuple()].

-type store_options() :: [{'origin', origin_tuple() | origin_tuples()} |
{'expires', wh_timeout()} |
{'callback', 'undefined' | callback_fun()}
].

-record(cache_obj, {key :: any()| '_' | '$1'
,value :: any() | '_' | '$1' | '$2'
,expires :: wh_timeout() | '_' | '$3'
,timestamp = wh_util:current_tstamp() :: gregorian_seconds() | '_' | '$4'
,callback :: callback_fun() | '_' | '$2' | '$3' | '$5'
,origin :: origin_tuple() | origin_tuples() | '$1' | '_'
}).

-type cache_obj() :: #cache_obj{}.
-type cache_objs() :: [cache_obj()].


%%--------------------------------------------------------------------
-define(WHAPPS_CONFIG_ORIGIN_BINDINGS, [[{'type', <<"account">>}]
,[{'db', ?WH_CONFIG_DB}]
Expand Down
282 changes: 282 additions & 0 deletions core/kazoo_caches/src/kzc_cache.erl
@@ -0,0 +1,282 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2011-2016, 2600Hz INC
%%% @doc
%%% Simple cache server
%%% @end
%%% @contributors
%%% James Aimonetti
%%% Karl Anderson
%%% Pierre Fenoll
%%%-------------------------------------------------------------------
-module(kzc_cache).

-export([start_link/1, start_link/2]).

-export([store/3, store/4]).
-export([peek/2]).
-export([fetch/2, fetch_keys/1]).
-export([erase/2]).
-export([flush/1]).
-export([filter/2, filter_erase/2]).
-export([dump/1, dump/2]).
-export([wait_for_key/2, wait_for_key/3]).

-include("kz_caches.hrl").

-define(BINDINGS, [{'self', []}]).
-define(RESPONDERS(CBModule), [{{CBModule, 'handle_document_change'}
,[{<<"configuration">>, <<"*">>}]
}
]).
-define(QUEUE_NAME, <<>>).
-define(QUEUE_OPTIONS, []).
-define(CONSUME_OPTIONS, []).

-define(DATABASE_BINDING, [{'type', <<"database">>}]).


-type predicate2() :: fun((any(), any()) -> boolean()).

-export_type([predicate2/0]).


%%% API

%%--------------------------------------------------------------------
%% @doc Starts a cache server
%%--------------------------------------------------------------------
-spec start_link(atom()) -> startlink_ret().
-spec start_link(atom(), wh_proplist()) -> startlink_ret().

start_link(Name) when is_atom(Name) ->
start_link(Name, []).

start_link(Name, Props)
when is_atom(Name), is_list(Props) ->
CBModule = get_props_datastore(Props),
case props:get_value('origin_bindings', Props) of
'undefined' ->
lager:debug("started new cache process (gen_server): ~s", [Name]),
gen_server:start_link({'local', Name}, CBModule, [Name, ?EXPIRE_PERIOD, Props], []);
BindingProps ->
lager:debug("started new cache process (gen_listener): ~s", [Name]),
Bindings = [{'conf', ['federate' | P]} || P <- maybe_add_db_binding(BindingProps)],
gen_listener:start_link({'local', Name}, CBModule
,[{'bindings', Bindings}
,{'responders', ?RESPONDERS(CBModule)}
,{'queue_name', ?QUEUE_NAME}
,{'queue_options', ?QUEUE_OPTIONS}
,{'consume_options', ?CONSUME_OPTIONS}
]
,[Name, ?EXPIRE_PERIOD, Props]
)
end.

-spec maybe_add_db_binding(wh_proplists()) -> wh_proplists().
maybe_add_db_binding([]) -> [];
maybe_add_db_binding([[]]) -> [[]];
maybe_add_db_binding(BindingProps) ->
[?DATABASE_BINDING | BindingProps].

%% Local cache API
-spec store(server_ref(), any(), any()) -> 'ok'.
-spec store(server_ref(), any(), any(), wh_proplist()) -> 'ok'.

store(Srv, K, V) ->
store(Srv, K, V, []).

store(Srv, K, V, Props) when is_atom(Srv) ->
case whereis(Srv) of
'undefined' -> throw({'error', 'unknown_cache', Srv});
Pid -> store(Pid, K, V, Props)
end;
store(Srv, K, V, Props) when is_pid(Srv) ->
Obj = #cache_obj{key = K
,value = V
,expires = get_props_expires(Props)
,callback = get_props_callback(Props)
,origin = get_props_origin(Props)
},
gen_server:cast(Srv, {'store', Obj}).

-spec peek(atom(), any()) -> {'ok', any()} | {'error', 'not_found'}.
peek(Srv, K) ->
try ets:lookup_element(Srv, K, #cache_obj.value) of
Value -> {'ok', Value}
catch
'error':'badarg' ->
{'error', 'not_found'}
end.

-spec fetch(atom(), any()) -> {'ok', any()} | {'error', 'not_found'}.
fetch(Srv, K) ->
case peek(Srv, K) of
{'error', 'not_found'}=E -> E;
{'ok', _V}=Ok ->
gen_server:cast(Srv, {'update_timestamp', K, wh_util:current_tstamp()}),
Ok
end.

-spec erase(atom(), any()) -> 'ok'.
erase(Srv, K) ->
case peek(Srv, K) of
{'error', 'not_found'} -> 'ok';
{'ok', _} ->
gen_server:cast(Srv, {'erase', K})
end.

-spec flush(atom()) -> 'ok'.
flush(Srv) ->
gen_server:cast(Srv, {'flush'}).

-spec fetch_keys(atom()) -> list().
fetch_keys(Srv) ->
MatchSpec = [{#cache_obj{key = '$1'
,_ = '_'
}
,[]
,['$1']
}],
ets:select(Srv, MatchSpec).

-spec filter_erase(atom(), predicate2()) -> non_neg_integer().
filter_erase(Srv, Pred) when is_function(Pred, 2) ->
ets:foldl(fun(#cache_obj{key=K, value=V}, Count) ->
case Pred(K, V) of
'false' -> Count;
'true' ->
?MODULE:erase(Srv, K),
Count+1
end;
(_, Count) -> Count
end
,0
,Srv
).

-spec filter(atom(), predicate2()) -> [{any(), any()}].
filter(Srv, Pred) when is_function(Pred, 2) ->
ets:foldl(fun(#cache_obj{key=K, value=V}, Acc) ->
case Pred(K, V) of
'true' -> [{K, V} | Acc];
'false' -> Acc
end;
(_, Acc) -> Acc
end
,[]
,Srv
).

-spec dump(text()) -> 'ok'.
dump(Srv) ->
dump(Srv, 'false').

-spec dump(atom(), boolean()) -> 'ok'.
dump(Srv, ShowValue)
when is_atom(Srv), is_boolean(ShowValue) ->
{PointerTab, MonitorTab} = gen_listener:call(Srv, {'tables'}),
_ = [dump_table(Tab, ShowValue)
|| Tab <- [Srv, PointerTab, MonitorTab]
],
'ok';
dump(Srv, ShowValue) ->
dump(wh_util:to_atom(Srv), wh_util:to_boolean(ShowValue)).

-spec dump_table(ets:tid(), boolean()) -> 'ok'.
dump_table(Tab, ShowValue) ->
Now = wh_util:current_tstamp(),
io:format("Table ~p~n", [Tab]),
_ = [display_cache_obj(CacheObj, ShowValue, Now)
|| CacheObj <- ets:match_object(Tab, #cache_obj{_ = '_'})
],
'ok'.

-spec display_cache_obj(cache_obj(), boolean(), gregorian_seconds()) -> 'ok'.
display_cache_obj(#cache_obj{key=Key
,value=Value
,timestamp=Timestamp
,expires=Expires
,origin=Origin
,callback=Callback
}
,ShowValue
,Now
) ->
io:format("Key: ~300p~n", [Key]),
io:format("Expires: ~30p~n", [Expires]),
case is_number(Expires) of
'true' ->
io:format("Remaining: ~30p~n", [(Timestamp
+ Expires)
- Now
]);
'false' -> 'ok'
end,
io:format("Origin: ~300p~n", [Origin]),
io:format("Callback: ~s~n", [Callback =/= 'undefined']),
case ShowValue of
'true' -> io:format("Value: ~p~n", [Value]);
'false' -> 'ok'
end,
io:format("~n", []).

-spec wait_for_key(atom(), any()) -> {'ok', any()} | {'error', 'timeout'}.
-spec wait_for_key(atom(), any(), wh_timeout()) -> {'ok', any()} | {'error', 'timeout'}.
wait_for_key(Srv, Key) ->
wait_for_key(Srv, Key, ?DEFAULT_WAIT_TIMEOUT).

wait_for_key(Srv, Key, Timeout) ->
{'ok', Ref} = gen_server:call(Srv, {'wait_for_key', Key, Timeout}),
lager:debug("waiting for message with ref ~p", [Ref]),
receive
{'exists', Ref, Value} -> {'ok', Value};
{'store', Ref, Value} -> {'ok', Value};
{_, Ref, _} -> {'error', 'timeout'}
end.


%%% Internals

-spec get_props_datastore(wh_proplist()) -> module().
get_props_datastore(Props) ->
props:get_value('kzc_datastore', Props, 'kzc_ets_listener').

-spec get_props_expires(wh_proplist()) -> wh_timeout().
get_props_expires(Props) ->
case props:get_value('expires', Props) of
'undefined' -> ?EXPIRES;
'infinity' -> 'infinity';
Expires when is_integer(Expires)
andalso Expires > 0 ->
Expires
end.

-spec get_props_callback(wh_proplist()) -> 'undefined' | callback_fun().
get_props_callback(Props) ->
case props:get_value('callback', Props) of
'undefined' -> 'undefined';
Fun when is_function(Fun, 3) -> Fun
end.

-spec get_props_origin(wh_proplist()) -> 'undefined' | origin_tuple() | origin_tuples().
get_props_origin(Props) ->
maybe_add_db_origin(props:get_value('origin', Props)).

-spec maybe_add_db_origin(wh_proplist()) -> 'undefined' | origin_tuple() | origin_tuples().
maybe_add_db_origin(Props) when is_list(Props) -> maybe_add_db_origin(Props, []);
maybe_add_db_origin({'db', Db}) ->
[{'db',Db}, {'type', <<"database">>, Db}];
maybe_add_db_origin({'db', Db, Id}) ->
[{'db',Db,Id}, {'type', <<"database">>, Db}];
maybe_add_db_origin(Props) -> Props.

-spec maybe_add_db_origin(wh_proplist(), wh_proplist()) -> 'undefined' | origin_tuple() | origin_tuples().
maybe_add_db_origin([], Acc) -> lists:reverse(props:unique(Acc));
maybe_add_db_origin([{'db', Db} | Props], Acc) ->
maybe_add_db_origin(Props, [{'type', <<"database">>, Db}, {'db',Db} |Acc]);
maybe_add_db_origin([{'db', Db, Id} | Props], Acc) ->
maybe_add_db_origin(Props, [{'type', <<"database">>, Db}, {'db',Db,Id} |Acc]);
maybe_add_db_origin([P | Props], Acc) ->
maybe_add_db_origin(Props, [P |Acc]).

%%% End of Module

0 comments on commit 353427e

Please sign in to comment.