Skip to content

Commit

Permalink
Merge pull request #2701 from emqx/master
Browse files Browse the repository at this point in the history
Auto-pull-request-by-2019-07-20
  • Loading branch information
turtleDeng committed Jul 20, 2019
2 parents 4d14d51 + a6210f7 commit bffca30
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 37 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ git clone https://github.com/emqx/emqx-rel.git
cd emqx-rel && make
cd _rel/emqx && ./bin/emqx console
cd _build/emqx/rel/emqx && ./bin/emqx console
```

Expand Down
15 changes: 14 additions & 1 deletion etc/emqx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,19 @@ cluster.autoclean = 5m

## The address type is used to extract host from k8s service.
##
## Value: ip | dns
## Value: ip | dns | hostname
## cluster.k8s.address_type = ip

## The app name helps build 'node.name'.
##
## Value: String
## cluster.k8s.app_name = emqx

## The suffix added to dns and hostname get from k8s service
##
## Value: String
## cluster.k8s.suffix = pod.cluster.local

## Kubernates Namespace
##
## Value: String
Expand Down Expand Up @@ -1878,6 +1883,14 @@ plugins.expand_plugins_dir = {{ platform_plugins_dir }}/
## Default: 1m, 1 minute
broker.sys_interval = 1m

## System heartbeat interval of publishing following heart beat message:
## - "$SYS/brokers/<node>/uptime"
## - "$SYS/brokers/<node>/datetime"
##
## Value: Duration
## Default: 30s
broker.sys_heartbeat = 30s

## Enable global session registry.
##
## Value: on | off
Expand Down
15 changes: 13 additions & 2 deletions priv/emqx.schema
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
]}.

{mapping, "cluster.k8s.address_type", "ekka.cluster_discovery", [
{datatype, {enum, [ip, dns]}}
{datatype, {enum, [ip, dns, hostname]}}
]}.

{mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [
Expand All @@ -140,6 +140,11 @@
{datatype, string}
]}.

{mapping, "cluster.k8s.suffix", "ekka.cluster_discovery", [
{datatype, string},
{default, ""}
]}.

{translation, "ekka.cluster_discovery", fun(Conf) ->
Strategy = cuttlefish:conf_get("cluster.discovery", Conf),
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
Expand Down Expand Up @@ -176,7 +181,8 @@
{service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)},
{address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)},
{app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)},
{namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}];
{namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)},
{suffix, cuttlefish:conf_get("cluster.k8s.suffix", Conf, "")}];
(manual) ->
[ ]
end,
Expand Down Expand Up @@ -1807,6 +1813,11 @@ end}.
{default, "1m"}
]}.

{mapping, "broker.sys_heartbeat", "emqx.broker_sys_heartbeat", [
{datatype, {duration, ms}},
{default, "30s"}
]}.

{mapping, "broker.enable_session_registry", "emqx.enable_session_registry", [
{default, on},
{datatype, flag}
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{gproc, "0.8.0"}, % hex
{replayq, "0.1.1"}, %hex
{esockd, "5.5.0"}, %hex
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.7"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.
Expand Down
13 changes: 13 additions & 0 deletions src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ call(CPid, Req) ->
%%--------------------------------------------------------------------

init({Transport, RawSocket, Options}) ->
process_flag(trap_exit, true),
{ok, Socket} = Transport:wait(RawSocket),
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
Expand Down Expand Up @@ -365,6 +366,16 @@ handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
handle(info, {shutdown, Reason}, State) ->
shutdown(Reason, State);

handle(info, Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) ->
case emqx_protocol:session(ProtoState) of
undefined ->
?LOG(error, "Unexpected EXIT: ~p", [Info]),
{keep_state, State};
SessionPid ->
?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]),
shutdown(Reason, State)
end;

handle(info, Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{keep_state, State}.
Expand Down Expand Up @@ -499,6 +510,8 @@ maybe_gc(_, State) -> State.
reply(From, Reply, State) ->
{keep_state, State, [{reply, From, Reply}]}.

shutdown(Reason = {shutdown, _}, State) ->
stop(Reason, State);
shutdown(Reason, State) ->
stop({shutdown, Reason}, State).

Expand Down
28 changes: 19 additions & 9 deletions src/emqx_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,28 @@
-define(RPC, gen_rpc).

call(Node, Mod, Fun, Args) ->
filter_result(?RPC:call(Node, Mod, Fun, Args)).
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).

multicall(Nodes, Mod, Fun, Args) ->
filter_result(?RPC:multicall(Nodes, Mod, Fun, Args)).
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).

cast(Node, Mod, Fun, Args) ->
filter_result(?RPC:cast(Node, Mod, Fun, Args)).
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).

filter_result(Delivery) ->
case Delivery of
{badrpc, Reason} -> {badrpc, Reason};
{badtcp, Reason} -> {badrpc, Reason};
_ -> Delivery
end.
rpc_node(Node) ->
{Node, erlang:system_info(scheduler_id)}.

rpc_nodes(Nodes) ->
rpc_nodes(Nodes, []).

rpc_nodes([], Acc) ->
Acc;
rpc_nodes([Node | Nodes], Acc) ->
rpc_nodes(Nodes, [rpc_node(Node) | Acc]).


filter_result({Error, Reason})
when Error =:= badrpc; Error =:= badtcp ->
{badrpc, Reason};
filter_result(Delivery) ->
Delivery.
14 changes: 2 additions & 12 deletions src/emqx_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry
_ ->
send_willmsg(WillMsg)
end,
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
shutdown(Reason, State#state{will_msg = undefined, conn_pid = undefined});

handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
State1 = case Reason of
Expand All @@ -652,23 +652,13 @@ handle_info(Info, State) ->

terminate(Reason, #state{will_msg = WillMsg,
client_id = ClientId,
username = Username,
conn_pid = ConnPid,
old_conn_pid = OldConnPid}) ->
username = Username}) ->
send_willmsg(WillMsg),
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

maybe_shutdown(undefined, _Reason) ->
ok;
maybe_shutdown(Pid, normal) ->
Pid ! {shutdown, normal};
maybe_shutdown(Pid, Reason) ->
exit(Pid, Reason).

%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
Expand Down
8 changes: 7 additions & 1 deletion src/emqx_sys.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
, datetime/0
, sysdescr/0
, sys_interval/0
, sys_heatbeat_interval/0
]).

-export([info/0]).
Expand Down Expand Up @@ -92,6 +93,11 @@ datetime() ->
sys_interval() ->
application:get_env(?APP, broker_sys_interval, 60000).

%% @doc Get sys heatbeat interval
-spec(sys_heatbeat_interval() -> pos_integer()).
sys_heatbeat_interval() ->
application:get_env(?APP, sys_heartbeat, 30000).

%% @doc Get sys info
-spec(info() -> list(tuple())).
info() ->
Expand All @@ -111,7 +117,7 @@ init([]) ->
{ok, heartbeat(tick(State))}.

heartbeat(State) ->
State#state{heartbeat = start_timer(timer:seconds(1), heartbeat)}.
State#state{heartbeat = start_timer(sys_heatbeat_interval(), heartbeat)}.
tick(State) ->
State#state{ticker = start_timer(sys_interval(), tick)}.

Expand Down
42 changes: 32 additions & 10 deletions src/emqx_ws_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -299,30 +299,49 @@ websocket_info({shutdown, Reason}, State) ->
websocket_info({stop, Reason}, State) ->
{stop, State#state{shutdown = Reason}};

websocket_info(Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) ->
case emqx_protocol:session(ProtoState) of
undefined ->
?LOG(error, "Unexpected EXIT: ~p", [Info]),
{ok, State};
SessionPid ->
?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]),
shutdown(Reason, State)
end;

websocket_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{ok, State}.

terminate(SockError, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->
?LOG(debug, "Terminated for ~p, sockerror: ~p",
[Shutdown, SockError]),
terminate(WsReason, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->
?LOG(debug, "Terminated for ~p, websocket reason: ~p",
[Shutdown, WsReason]),
emqx_keepalive:cancel(Keepalive),
case {ProtoState, Shutdown} of
{undefined, _} -> ok;
{_, {shutdown, Reason}} ->
emqx_protocol:terminate(Reason, ProtoState),
exit(Reason);
{_, Error} ->
emqx_protocol:terminate(Error, ProtoState),
exit({error, SockError})
terminate_session(Reason, ProtoState);
{_, _Error} ->
?LOG(info, "Terminate for unexpected error: ~p", [WsReason]),
terminate_session(unknown, ProtoState)
end.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

terminate_session(Reason, ProtoState) ->
emqx_protocol:terminate(Reason, ProtoState),
case emqx_protocol:session(ProtoState) of
undefined ->
ok;
SessionPid ->
unlink(SessionPid),
SessionPid ! {'EXIT', self(), Reason}
end.

handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) ->
case emqx_protocol:received(Packet, ProtoState) of
{ok, NProtoState} ->
Expand All @@ -343,6 +362,9 @@ ensure_stats_timer(State = #state{enable_stats = true,
ensure_stats_timer(State) ->
State.

shutdown(Reason = {shutdown, _}, State) ->
self() ! {stop, Reason},
{ok, State};
shutdown(Reason, State) ->
%% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
self() ! {stop, {shutdown, Reason}},
Expand Down

0 comments on commit bffca30

Please sign in to comment.