Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
add logging around db load
Browse files Browse the repository at this point in the history
  • Loading branch information
jkvor committed May 17, 2011
1 parent 4ecdfdc commit 0f1b0b4
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 29 deletions.
11 changes: 0 additions & 11 deletions src/logplex_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

%% Application callbacks
-export([start/2, stop/1, init/1]).
-export([nsync_opts/0]).

-include_lib("logplex.hrl").

Expand Down Expand Up @@ -94,16 +93,6 @@ boot_pagerduty() ->
boot_nsync() ->
ok = application:start(nsync, temporary).

nsync_opts() ->
RedisOpts = logplex_utils:redis_opts("LOGPLEX_CONFIG_REDIS_URL"),
Ip = case proplists:get_value(ip, RedisOpts) of
{_,_,_,_}=L -> string:join([integer_to_list(I) || I <- tuple_to_list(L)], ".");
Other -> Other
end,
RedisOpts1 = proplists:delete(ip, RedisOpts),
RedisOpts2 = [{ip, Ip} | RedisOpts1],
[{callback, {nsync_callback, handle, []}}, {block, true} | RedisOpts2].

boot_redis() ->
case application:start(redis, temporary) of
ok ->
Expand Down
22 changes: 20 additions & 2 deletions src/logplex_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ restore(Filename) ->
%%--------------------------------------------------------------------
init([]) ->
start(),
{ok, _Pid} = nsync:start_link(logplex_app:nsync_opts()),
Opts = nsync_opts(),
io:format("nsync:start_link(~p)~n", [Opts]),
{ok, _Pid} = nsync:start_link(Opts),
io:format("nsync finish~n"),
spawn_link(fun backup_timer/0),
{ok, []}.

Expand Down Expand Up @@ -131,16 +134,28 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
nsync_opts() ->
RedisOpts = logplex_utils:redis_opts("LOGPLEX_CONFIG_REDIS_URL"),
Ip = case proplists:get_value(ip, RedisOpts) of
{_,_,_,_}=L -> string:join([integer_to_list(I) || I <- tuple_to_list(L)], ".");
Other -> Other
end,
RedisOpts1 = proplists:delete(ip, RedisOpts),
RedisOpts2 = [{ip, Ip} | RedisOpts1],
[{callback, {nsync_callback, handle, []}}, {block, true}, {timeout, 720 * 1000} | RedisOpts2].

start() ->
wait_for_nodes(),
SchemaVsn = schema_version(),
io:format("schema version ~p~n", [SchemaVsn]),
case nodes() == [] of
true ->
SchemaVsn == undefined andalso create_schema(),
mnesia:start(),
SchemaVsn == undefined andalso create_tables(),
ok;
false ->
io:format("mnesia nodes ~p~n", [nodes()]),
mnesia:start(),
mnesia:change_config(extra_db_nodes, nodes()),
mnesia:change_table_copy_type(schema, node(), disc_copies),
Expand All @@ -149,6 +164,7 @@ start() ->
end.

wait_for_nodes() ->
io:format("wait for nodes~n"),
Registered = lists:sort([Node || {Node, _} <- redgrid:nodes()]),
Running = lists:sort([node()|nodes()]),
case Registered == Running of
Expand Down Expand Up @@ -182,7 +198,7 @@ create_tables() ->
ok.

sync_tables_to_local() ->
io:format("sync tables to local~n"),
A = now(),
Tables = [{T, mnesia:table_info(T, where_to_commit)} || T <- mnesia:system_info(tables)],
Copies =
lists:foldl(
Expand All @@ -197,6 +213,8 @@ sync_tables_to_local() ->
end, [], Tables),
[mnesia:add_table_copy(T, node(), Type) || {T, Type} <- Copies],
mnesia:wait_for_tables(mnesia:system_info(tables), 60000),
B = now(),
io:format("logplex_db sync tables to local duration=~w~n", [timer:now_diff(B,A) div 1000]),
ok.

backup_timer() ->
Expand Down
48 changes: 32 additions & 16 deletions src/nsync_callback.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ handle({load, <<"drain_index">>, Index}) ->

handle({load, <<"ch:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
Id = list_to_integer(parse_id(Rest)),
AppId = case dict_find(<<"app_id">>, Dict) of
undefined -> undefined;
Val -> list_to_integer(binary_to_list(Val))
end,
Channel = #channel{id=Id,
name=dict:fetch(<<"name">>, Dict),
app_id=list_to_integer(binary_to_list(dict:fetch(<<"app_id">>, Dict))),
addon=dict:fetch(<<"addon">>, Dict)},
name=dict_find(<<"name">>, Dict),
app_id=AppId,
addon=dict_find(<<"addon">>, Dict)},
mnesia:dirty_write(channel, Channel),
undefined;

Expand All @@ -50,26 +54,31 @@ handle({load, <<"tok:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
handle({load, <<"drain:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
Id = list_to_integer(parse_id(Rest)),
Drain = #drain{id=Id,
channel_id=dict:fetch(<<"ch">>, Dict),
resolved_host=logplex_utils:resolve_host(dict:fetch(<<"host">>, Dict)),
host=dict:fetch(<<"host">>, Dict),
port=dict:fetch(<<"port">>, Dict)},
channel_id=dict_find(<<"ch">>, Dict),
resolved_host=logplex_utils:resolve_host(dict_find(<<"host">>, Dict)),
host=dict_find(<<"host">>, Dict),
port=dict_find(<<"port">>, Dict)},
mnesia:dirty_write(drain, Drain),
undefined;

handle({load, _Key, _Val}) ->
undefined;

handle({load, eof}) ->
io:format("nsync load complete~n"),
ok;

handle({cmd, "hmset", [<<"ch:", Rest/binary>> | Args]}) ->
Id = list_to_integer(parse_id(Rest)),
Dict = dict_from_list(Args),
AppId = case dict_find(<<"app_id">>, Dict) of
undefined -> undefined;
Val -> list_to_integer(binary_to_list(Val))
end,
Channel = #channel{id=Id,
name=dict:fetch(<<"name">>, Dict),
app_id=list_to_integer(binary_to_list(dict:fetch(<<"app_id">>, Dict))),
addon=dict:fetch(<<"addon">>, Dict)},
name=dict_find(<<"name">>, Dict),
app_id=AppId,
addon=dict_find(<<"addon">>, Dict)},
{atomic, _} = mnesia:transaction(
fun() ->
mnesia:write(channel, Channel, write),
Expand All @@ -91,10 +100,10 @@ handle({cmd, "hmset", [<<"drain:", Rest/binary>> | Args]}) ->
Id = list_to_integer(parse_id(Rest)),
Dict = dict_from_list(Args),
Drain = #drain{id=Id,
channel_id=dict:fetch(<<"ch">>, Dict),
resolved_host=logplex_utils:resolve_host(dict:fetch(<<"host">>, Dict)),
host=dict:fetch(<<"host">>, Dict),
port=dict:fetch(<<"port">>, Dict)},
channel_id=dict_find(<<"ch">>, Dict),
resolved_host=logplex_utils:resolve_host(dict_find(<<"host">>, Dict)),
host=dict_find(<<"host">>, Dict),
port=dict_find(<<"port">>, Dict)},
{atomic, _} = mnesia:transaction(
fun() ->
mnesia:write(drain, Drain, write),
Expand Down Expand Up @@ -144,8 +153,8 @@ parse_id(<<C, Rest/binary>>, Acc) ->
parse_id(Rest, [C|Acc]).

create_token(Id, Dict) ->
ChannelId = list_to_integer(binary_to_list(dict:fetch(<<"ch">>, Dict))),
Name = dict:fetch(<<"name">>, Dict),
ChannelId = list_to_integer(binary_to_list(dict_find(<<"ch">>, Dict))),
Name = dict_find(<<"name">>, Dict),
{AppId, Addon} = case lookup_channel(ChannelId) of
#channel{app_id=AppId0, addon=Addon0} -> {AppId0, Addon0};
undefined -> {undefined, undefined}
Expand All @@ -171,3 +180,10 @@ dict_from_list([], Dict) ->

dict_from_list([Key, Val | Rest], Dict) ->
dict_from_list(Rest, dict:store(Key, Val, Dict)).

dict_find(Key, Dict) ->
case dict:find(Key, Dict) of
{ok, Val} -> Val;
_ -> undefined
end.

0 comments on commit 0f1b0b4

Please sign in to comment.