Skip to content

Commit

Permalink
Add portal transport over emqx_client.
Browse files Browse the repository at this point in the history
  • Loading branch information
spring2maz committed Feb 10, 2019
1 parent 9a18d07 commit d866a97
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 33 deletions.
18 changes: 12 additions & 6 deletions etc/emqx.conf
Expand Up @@ -1689,11 +1689,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: Number
## bridge.aws.subscription.2.qos = 1

## Maximum number of messages in one batch for buffer queue to store
## Maximum number of messages in one batch when sending to remote borkers
## NOTE: when bridging viar MQTT connection to remote broker, this config is only
## used for internal message passing optimization as the underlying MQTT
## protocol does not supports batching.
##
## Value: Integer
## default: 1000
## bridge.aws.queue.batch_size = 1000
## default: 32
## bridge.aws.queue.batch_size = 32

## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,
Expand Down Expand Up @@ -1839,11 +1842,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: Number
## bridge.azure.subscription.2.qos = 1

## Batch size for buffer queue stored
## Maximum number of messages in one batch when sending to remote borkers
## NOTE: when bridging viar MQTT connection to remote broker, this config is only
## used for internal message passing optimization as the underlying MQTT
## protocol does not supports batching.
##
## Value: Integer
## default: 1000
## bridge.azure.queue.batch_size = 1000
## default: 32
## bridge.azure.queue.batch_size = 32

## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,
Expand Down
5 changes: 3 additions & 2 deletions src/emqx_portal_connect.erl
Expand Up @@ -20,11 +20,12 @@

-optional_callbacks([]).

%% map fields depend on implementation
-type config() :: map().
-type connection() :: term().
-type conn_ref() :: term().
-type batch() :: emqx_protal:batch().
-type batch_ref() :: reference().
-type ack_ref() :: emqx_portal:ack_ref().

-include("logger.hrl").

Expand All @@ -36,7 +37,7 @@
%% send to remote node/cluster
%% portal worker (the caller process) should be expecting
%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster
-callback send(connection(), batch()) -> {ok, batch_ref()} | {error, any()}.
-callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}.

%% called when owner is shutting down.
-callback stop(conn_ref(), connection()) -> ok.
Expand Down
29 changes: 19 additions & 10 deletions src/portal/emqx_portal.erl
Expand Up @@ -52,6 +52,9 @@
%% to support automatic load-balancing, i.e. in case it can not keep up
%% with the amount of messages comming in, administrator should split and
%% balance topics between worker/connections manually.
%%
%% NOTES:
%% * Local messages are all normalised to QoS-1 when exporting to remote

-module(emqx_portal).
-behaviour(gen_statem).
Expand All @@ -71,17 +74,18 @@

-export_type([config/0,
batch/0,
ref/0
ack_ref/0
]).

-type config() :: map().
-type batch() :: [emqx_portal_msg:msg()].
-type ref() :: reference().
-type ack_ref() :: term().

-include("logger.hrl").
-include("emqx_mqtt.hrl").

-define(DEFAULT_BATCH_COUNT, 100).
%% same as default in-flight limit for emqx_client
-define(DEFAULT_BATCH_COUNT, 32).
-define(DEFAULT_BATCH_BYTES, 1 bsl 20).
-define(DEFAULT_SEND_AHEAD, 8).
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
Expand Down Expand Up @@ -110,7 +114,7 @@
start_link(Name, Config) when is_list(Config) ->
start_link(Name, maps:from_list(Config));
start_link(Name, Config) ->
gen_statem:start_link({local, Name}, ?MODULE, Config, []).
gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).

stop(Pid) -> gen_statem:stop(Pid).

Expand All @@ -122,7 +126,7 @@ import_batch(Batch, AckFun) ->

%% @doc This function is to be evaluated on message/batch exporter side
%% when message/batch is accepted by remote node.
-spec handle_ack(pid(), ref()) -> ok.
-spec handle_ack(pid(), ack_ref()) -> ok.
handle_ack(Pid, Ref) when node() =:= node(Pid) ->
Pid ! {batch_ack, Ref},
ok.
Expand Down Expand Up @@ -231,7 +235,8 @@ connected(internal, maybe_send, State) ->
end;
connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRef, connection := Conn} = State) ->
?INFO("Portal ~p diconnected~nreason=~p", [Conn, Reason]),
?INFO("Portal ~p diconnected~nreason=~p",
[name(), Conn, Reason]),
{next_state, connecting,
State#{conn_ref := undefined,
connection := undefined
Expand All @@ -255,7 +260,8 @@ common(_StateName, info, {dispatch, _, Msg},
NewQ = replayq:append(Q, collect([Msg])),
{keep_state, State#{replayq => NewQ}, ?maybe_send};
common(StateName, Type, Content, State) ->
?INFO("Ignored unknown ~p event ~p at state ~p", [Type, Content, StateName]),
?DEBUG("Portal ~p discarded ~p type event at state ~p:~p",
[name(), Type, StateName, Content]),
{keep_state, State}.

collect(Acc) ->
Expand Down Expand Up @@ -300,6 +306,7 @@ pop_and_send(#{replayq := Q,
do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
case maybe_send(State, Batch) of
{ok, Ref} ->
%% this is a list of inflight BATCHes, not expecting it to be too long
NewInflight = Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => Ref,
batch => Batch
Expand All @@ -326,8 +333,6 @@ subscribe_local_topics(Topics) ->
emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()})
end, Topics).

name() -> {_, Name} = process_info(self(), registered_name), Name.

disconnect(#{connection := Conn,
conn_ref := ConnRef,
connect_module := Module
Expand All @@ -347,10 +352,14 @@ maybe_send(#{connect_module := Module,
connection := Connection,
mountpoint := Mountpoint
}, Batch) ->
Module:send(Connection, [emqx_portal_msg:apply_mountpoint(M, Mountpoint) || M <- Batch]).
Module:send(Connection, [emqx_portal_msg:to_export(M, Mountpoint) || M <- Batch]).

format_mountpoint(undefined) ->
undefined;
format_mountpoint(Prefix) ->
binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).

name() -> {_, Name} = process_info(self(), registered_name), Name.

name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).

135 changes: 135 additions & 0 deletions src/portal/emqx_portal_mqtt.erl
@@ -0,0 +1,135 @@
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

%% @doc This module implements EMQX Portal transport layer on top of MQTT protocol

-module(emqx_portal_mqtt).
-behaviour(emqx_portal_connect).

%% behaviour callbacks
-export([start/1,
send/2,
stop/2
]).

-include("emqx_mqtt.hrl").

-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).

%% Messages towards ack collector process
-define(SENT(MaxPktId), {sent, MaxPktId}).
-define(ACKED(AnyPktId), {acked, AnyPktId}).
-define(STOP(Ref), {stop, Ref}).

start(Config) ->
Ref = make_ref(),
Parent = self(),
AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end),
Handlers = make_hdlr(Parent, AckCollector, Ref),
case emqx_client:start_link(Config#{msg_handler => Handlers, owner => AckCollector}) of
{ok, Pid} ->
case emqx_client:connect(Pid) of
{ok, _} ->
%% ack collector is always a new pid every reconnect.
%% use it as a connection reference
{ok, Ref, #{ack_collector => AckCollector,
client_pid => Pid}};
{error, Reason} ->
ok = stop(AckCollector, Pid),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.

stop(Ref, #{ack_collector := AckCollector,
client_pid := Pid}) ->
MRef = monitor(process, AckCollector),
unlink(AckCollector),
_ = AckCollector ! ?STOP(Ref),
receive
{'DOWN', MRef, _, _, _} ->
ok
after
1000 ->
exit(AckCollector, kill)
end,
_ = emqx_client:stop(Pid),
ok.

send(#{client_pid := ClientPid, ack_collector := AckCollector}, Batch) ->
send_loop(ClientPid, AckCollector, Batch).

send_loop(ClientPid, AckCollector, [Msg | Rest]) ->
case emqx_client:publish(ClientPid, Msg) of
{ok, PktId} when Rest =:= [] ->
Rest =:= [] andalso AckCollector ! ?SENT(PktId),
{ok, PktId};
{ok, _PktId} ->
send_loop(ClientPid, AckCollector, Rest);
{error, {_PacketId, inflight_full}} ->
timer:sleep(100),
send_loop(ClientPid, AckCollector, [Msg | Rest]);
{error, Reason} ->
%% There is no partial sucess of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.

ack_collector(Parent, ConnRef) ->
ack_collector(Parent, ConnRef, []).

ack_collector(Parent, ConnRef, PktIds) ->
NewIds =
receive
?STOP(ConnRef) ->
exit(normal);
?SENT(PktId) ->
%% this ++ only happens per-BATCH, hence no optimization
PktIds ++ [PktId];
?ACKED(PktId) ->
handle_ack(Parent, PktId, PktIds)
after
200 ->
PktIds
end,
ack_collector(Parent, ConnRef, NewIds).

handle_ack(Parent, PktId, [PktId | Rest]) ->
%% A batch is finished, time to ack portal
ok = emqx_portal:handle_ack(Parent, PktId),
Rest;
handle_ack(_Parent, PktId, [BatchMaxPktId | _] = All) ->
%% partial ack of a batch, terminate here.
true = (PktId < BatchMaxPktId), %% bad order otherwise
All.

%% When puback for QoS-1 message is received from remote MQTT broker
%% NOTE: no support for QoS-2
handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) ->
RC =:= ?RC_SUCCESS andalso error(RC),
AckCollector ! ?ACKED(PktId),
ok.

%% Message published from remote broker. Import to local broker.
import_msg(Msg) ->
%% auto-ack should be enabled in emqx_client, hence dummy ack-fun.
emqx_portal:import_batch([Msg], _AckFun = fun() -> ok end).

make_hdlr(Parent, AckCollector, Ref) ->
#{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end,
publish => fun(Msg) -> import_msg(Msg) end,
disconnected => fun(RC, _Properties) -> Parent ! {disconnected, Ref, RC}, ok end
}.

12 changes: 7 additions & 5 deletions src/portal/emqx_portal_msg.erl
Expand Up @@ -16,7 +16,7 @@

-export([ to_binary/1
, from_binary/1
, apply_mountpoint/2
, to_export/2
, to_broker_msgs/1
, estimate_size/1
]).
Expand All @@ -28,10 +28,12 @@

-type msg() :: emqx_types:message().

%% @doc Mount topic to a prefix.
-spec apply_mountpoint(msg(), undefined | binary()) -> msg().
apply_mountpoint(#{topic := Topic} = Msg, Mountpoint) ->
Msg#{topic := topic(Mountpoint, Topic)}.
%% @doc Make export format:
%% 1. Mount topic to a prefix
%% 2. fix QoS to 1
-spec to_export(msg(), undefined | binary()) -> msg().
to_export(#{topic := Topic} = Msg, Mountpoint) ->
Msg#{topic := topic(Mountpoint, Topic), qos => 1}.

%% @doc Make `binary()' in order to make iodata to be persisted on disk.
-spec to_binary(msg()) -> binary().
Expand Down
6 changes: 3 additions & 3 deletions src/portal/emqx_portal_rpc.erl
Expand Up @@ -29,7 +29,7 @@
, heartbeat/2
]).

-type batch_ref() :: emqx_portal:batch_ref().
-type ack_ref() :: emqx_portal:ack_ref().
-type batch() :: emqx_portal:batch().

-define(HEARTBEAT_INTERVAL, timer:seconds(1)).
Expand Down Expand Up @@ -59,7 +59,7 @@ stop(Pid, _Remote) when is_pid(Pid) ->
ok.

%% @doc Callback for `emqx_portal_connect' behaviour
-spec send(node(), batch()) -> {ok, batch_ref()} | {error, any()}.
-spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}.
send(Remote, Batch) ->
Sender = self(),
case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of
Expand All @@ -68,7 +68,7 @@ send(Remote, Batch) ->
end.

%% @doc Handle send on receiver side.
-spec handle_send(pid(), batch()) -> {ok, batch_ref()} | {error, any()}.
-spec handle_send(pid(), batch()) -> {ok, ack_ref()} | {error, any()}.
handle_send(SenderPid, Batch) ->
SenderNode = node(SenderPid),
Ref = make_ref(),
Expand Down

0 comments on commit d866a97

Please sign in to comment.