Skip to content

Commit

Permalink
Merge branch 'log_level' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
tigercl committed Mar 27, 2019
2 parents f0fa9a9 + 0fbf771 commit 1decc08
Show file tree
Hide file tree
Showing 35 changed files with 187 additions and 180 deletions.
3 changes: 2 additions & 1 deletion src/emqx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-module(emqx).

-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").

%% Start/Stop the application
Expand Down Expand Up @@ -180,7 +181,7 @@ shutdown() ->
shutdown(normal).

shutdown(Reason) ->
emqx_logger:error("emqx shutdown for ~s", [Reason]),
?LOG(info, "[EMQ X] emqx shutdown for ~s", [Reason]),
emqx_alarm_handler:unload(),
emqx_plugins:unload(),
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).
Expand Down
6 changes: 3 additions & 3 deletions src/emqx_alarm_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,17 @@ init(_) ->
handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) ->
handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State);
handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
?LOG(notice, "Alarm report: set ~p", [Alarm]),
?LOG(warning, "[Alarm Handler] ~p set", [Alarm]),
case encode_alarm(Alarm) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json));
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
?LOG(error, "[Alarm Handler] Failed to encode alarm: ~p", [Reason])
end,
set_alarm_(AlarmId, AlarmDesc),
{ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
?LOG(notice, "Alarm report: clear ~p", [AlarmId]),
?LOG(notice, "[Alarm Handler] ~p clear", [AlarmId]),
emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)),
clear_alarm_(AlarmId),
{ok, State};
Expand Down
6 changes: 3 additions & 3 deletions src/emqx_banned.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,19 @@ init([]) ->
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.

handle_call(Req, _From, State) ->
?ERROR("[Banned] unexpected call: ~p", [Req]),
?LOG(error, "[Banned] unexpected call: ~p", [Req]),
{reply, ignored, State}.

handle_cast(Msg, State) ->
?ERROR("[Banned] unexpected msg: ~p", [Msg]),
?LOG(error, "[Banned] unexpected msg: ~p", [Msg]),
{noreply, State}.

handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]),
{noreply, ensure_expiry_timer(State), hibernate};

handle_info(Info, State) ->
?ERROR("[Banned] unexpected info: ~p", [Info]),
?LOG(error, "[Banned] unexpected info: ~p", [Info]),
{noreply, State}.

terminate(_Reason, #{expiry_timer := TRef}) ->
Expand Down
14 changes: 7 additions & 7 deletions src/emqx_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ standing_by(state_timeout, do_connect, State) ->
standing_by({call, From}, _Call, _State) ->
{keep_state_and_data, [{reply, From, {error,standing_by}}]};
standing_by(info, Info, State) ->
?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
?LOG(info, "[Bridge] Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
{keep_state_and_data, State};
standing_by(Type, Content, State) ->
common(standing_by, Type, Content, State).
Expand All @@ -320,7 +320,7 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout,
ok = subscribe_local_topics(Forwards),
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "Bridge ~p connected", [name()]),
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
Action = {state_timeout, 0, connected},
{keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
error ->
Expand Down Expand Up @@ -370,7 +370,7 @@ connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRefCurrent} = State) ->
case ConnRefCurrent =:= ConnRef of
true ->
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]),
?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]),
{next_state, connecting,
State#{conn_ref := undefined, connection := undefined}};
false ->
Expand All @@ -382,7 +382,7 @@ connected(info, {batch_ack, Ref}, State) ->
keep_state_and_data;
bad_order ->
%% try re-connect then re-send
?LOG(error, "Bad order ack received by bridge ~p", [name()]),
?LOG(error, "[Bridge] Bad order ack received by bridge ~p", [name()]),
{next_state, connecting, disconnect(State)};
{ok, NewState} ->
{keep_state, NewState, ?maybe_send}
Expand Down Expand Up @@ -413,7 +413,7 @@ common(_StateName, info, {dispatch, _, Msg},
NewQ = replayq:append(Q, collect([Msg])),
{keep_state, State#{replayq => NewQ}, ?maybe_send};
common(StateName, Type, Content, State) ->
?LOG(info, "Bridge ~p discarded ~p type event at state ~p:\n~p",
?LOG(notice, "[Bridge] Bridge ~p discarded ~p type event at state ~p:\n~p",
[name(), Type, StateName, Content]),
{keep_state, State}.

Expand Down Expand Up @@ -484,7 +484,7 @@ retry_inflight(#{inflight := Inflight} = State,
{ok, NewState} ->
retry_inflight(NewState, T);
{error, Reason} ->
?LOG(error, "Inflight retry failed\n~p", [Reason]),
?LOG(error, "[Bridge] Inflight retry failed\n~p", [Reason]),
{error, State#{inflight := Inflight ++ Remain}}
end.

Expand Down Expand Up @@ -515,7 +515,7 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
batch => Batch}],
{ok, State#{inflight := NewInflight}};
{error, Reason} ->
?LOG(info, "Batch produce failed\n~p", [Reason]),
?LOG(info, "[Bridge] Batch produce failed\n~p", [Reason]),
{error, State}
end.

Expand Down
2 changes: 1 addition & 1 deletion src/emqx_bridge_connect.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ start(Module, Config) ->
{ok, Ref, Conn};
{error, Reason} ->
Config1 = obfuscate(Config),
?LOG(error, "Failed to connect with module=~p\n"
?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
"config=~p\nreason:~p", [Module, Config1, Reason]),
error
end.
Expand Down
14 changes: 7 additions & 7 deletions src/emqx_broker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ publish(Msg) when is_record(Msg, message) ->
Headers = Msg#message.headers,
case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of
#message{headers = #{allow_publish := false}} ->
?WARN("Publishing interrupted: ~s", [emqx_message:format(Msg)]),
?LOG(notice, "[Broker] Publishing interrupted: ~s", [emqx_message:format(Msg)]),
[];
#message{topic = Topic} = Msg1 ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Expand All @@ -209,7 +209,7 @@ safe_publish(Msg) when is_record(Msg, message) ->
publish(Msg)
catch
_:Error:Stacktrace ->
?ERROR("[Broker] publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
?LOG(error, "[Broker] Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
after
ok
end.
Expand Down Expand Up @@ -256,7 +256,7 @@ forward(Node, To, Delivery) ->
%% rpc:call to ensure the delivery, but the latency:(
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} ->
?ERROR("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
?LOG(error, "[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
Delivery;
Delivery1 -> Delivery1
end.
Expand Down Expand Up @@ -424,14 +424,14 @@ handle_call({subscribe, Topic, I}, _From, State) ->
{reply, Ok, State};

handle_call(Req, _From, State) ->
?ERROR("[Broker] unexpected call: ~p", [Req]),
?LOG(error, "[Broker] Unexpected call: ~p", [Req]),
{reply, ignored, State}.

handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of
ok -> ok;
{error, Reason} ->
?ERROR("[Broker] Failed to add route: ~p", [Reason])
?LOG(error, "[Broker] Failed to add route: ~p", [Reason])
end,
{noreply, State};

Expand All @@ -454,11 +454,11 @@ handle_cast({unsubscribed, Topic, I}, State) ->
{noreply, State};

handle_cast(Msg, State) ->
?ERROR("[Broker] unexpected cast: ~p", [Msg]),
?LOG(error, "[Broker] Unexpected cast: ~p", [Msg]),
{noreply, State}.

handle_info(Info, State) ->
?ERROR("[Broker] unexpected info: ~p", [Info]),
?LOG(error, "[Broker] Unexpected info: ~p", [Info]),
{noreply, State}.

terminate(_Reason, #{pool := Pool, id := Id}) ->
Expand Down
6 changes: 3 additions & 3 deletions src/emqx_broker_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ init([]) ->
{ok, #{pmon => emqx_pmon:new()}}.

handle_call(Req, _From, State) ->
?ERROR("[BrokerHelper] unexpected call: ~p", [Req]),
?LOG(error, "[Broker Helper] Unexpected call: ~p", [Req]),
{reply, ignored, State}.

handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
Expand All @@ -119,7 +119,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};

handle_cast(Msg, State) ->
?ERROR("[BrokerHelper] unexpected cast: ~p", [Msg]),
?LOG(error, "[Broker Helper] Unexpected cast: ~p", [Msg]),
{noreply, State}.

handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
Expand All @@ -130,7 +130,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon})
{noreply, State#{pmon := PMon1}};

handle_info(Info, State) ->
?ERROR("[BrokerHelper] unexpected info: ~p", [Info]),
?LOG(error, "[Broker Helper] Unexpected info: ~p", [Info]),
{noreply, State}.

terminate(_Reason, _State) ->
Expand Down
27 changes: 14 additions & 13 deletions src/emqx_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

-behaviour(gen_statem).

-include("logger.hrl").
-include("types.hrl").
-include("emqx_client.hrl").

Expand Down Expand Up @@ -786,10 +787,10 @@ connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -
Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight),
State#state{inflight = Inflight1};
{value, {pubrel, _Ref, _Ts}} ->
emqx_logger:warning("Duplicated PUBREC Packet: ~p", [PacketId]),
?LOG(notice, "[Client] Duplicated PUBREC Packet: ~p", [PacketId]),
State;
none ->
emqx_logger:warning("Unexpected PUBREC Packet: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBREC Packet: ~p", [PacketId]),
State
end);

Expand All @@ -804,7 +805,7 @@ connected(cast, ?PUBREL_PACKET(PacketId),
false -> {keep_state, NewState}
end;
error ->
emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBREL: ~p", [PacketId]),
keep_state_and_data
end;

Expand Down Expand Up @@ -903,33 +904,33 @@ handle_event({call, From}, stop, _StateName, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]};
handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State)
when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
emqx_logger:debug("RECV Data: ~p", [Data]),
?LOG(debug, "[Client] RECV Data: ~p", [Data]),
process_incoming(Data, [], run_sock(State));

handle_event(info, {Error, _Sock, Reason}, _StateName, State)
when Error =:= tcp_error; Error =:= ssl_error ->
emqx_logger:error("[~p] ~p, Reason: ~p", [?MODULE, Error, Reason]),
?LOG(error, "[Client] The connection error occured ~p, reason:~p", [Error, Reason]),
{stop, {shutdown, Reason}, State};

handle_event(info, {Closed, _Sock}, _StateName, State)
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
emqx_logger:debug("[~p] ~p", [?MODULE, Closed]),
?LOG(debug, "[Client] ~p", [Closed]),
{stop, {shutdown, Closed}, State};

handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]),
?LOG(debug, "[Client] Got EXIT from owner, Reason: ~p", [Reason]),
{stop, {shutdown, Reason}, State};

handle_event(info, {inet_reply, _Sock, ok}, _, _State) ->
keep_state_and_data;

handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]),
?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
{stop, {shutdown, Reason}, State};

handle_event(EventType, EventContent, StateName, _StateData) ->
emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]),
?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]),
keep_state_and_data.

%% Mandatory callback functions
Expand Down Expand Up @@ -971,7 +972,7 @@ delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties),
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBACK: ~p", [PacketId]),
State
end;
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
Expand All @@ -983,7 +984,7 @@ delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBCOMP Packet: ~p", [PacketId]),
State
end.

Expand Down Expand Up @@ -1186,7 +1187,7 @@ send(Msg, State) when is_record(Msg, mqtt_msg) ->
send(Packet, State = #state{socket = Sock, proto_ver = Ver})
when is_record(Packet, mqtt_packet) ->
Data = emqx_frame:serialize(Packet, #{version => Ver}),
emqx_logger:debug("SEND Data: ~1000p", [Packet]),
?LOG(debug, "[Client] SEND Data: ~1000p", [Packet]),
case emqx_client_sock:send(Sock, Data) of
ok -> {ok, bump_last_packet_id(State)};
Error -> Error
Expand Down
6 changes: 3 additions & 3 deletions src/emqx_cm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ init([]) ->
{ok, #{conn_pmon => emqx_pmon:new()}}.

handle_call(Req, _From, State) ->
?ERROR("[CM] unexpected call: ~p", [Req]),
?LOG(error, "[CM] Unexpected call: ~p", [Req]),
{reply, ignored, State}.

handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
Expand All @@ -169,7 +169,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};

handle_cast(Msg, State) ->
?ERROR("[CM] unexpected cast: ~p", [Msg]),
?LOG(error, "[CM] Unexpected cast: ~p", [Msg]),
{noreply, State}.

handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
Expand All @@ -180,7 +180,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}
{noreply, State#{conn_pmon := PMon1}};

handle_info(Info, State) ->
?ERROR("[CM] unexpected info: ~p", [Info]),
?LOG(error, "[CM] Unexpected info: ~p", [Info]),
{noreply, State}.

terminate(_Reason, _State) ->
Expand Down
Loading

0 comments on commit 1decc08

Please sign in to comment.