Skip to content

Commit

Permalink
Merge pull request #9449 from thalesmg/gcp-pubsub-ee50
Browse files Browse the repository at this point in the history
feat(gcp_pubsub): implement GCP PubSub bridge (ee5.0)
  • Loading branch information
thalesmg committed Dec 14, 2022
2 parents 948d417 + 2932ace commit f827062
Show file tree
Hide file tree
Showing 26 changed files with 3,486 additions and 79 deletions.
33 changes: 32 additions & 1 deletion apps/emqx/test/emqx_common_test_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@

-export([clear_screen/0]).
-export([with_mock/4]).
-export([
on_exit/1
]).

%% Toxiproxy API
-export([
Expand Down Expand Up @@ -161,7 +164,17 @@ boot_modules(Mods) ->

-spec start_apps(Apps :: apps()) -> ok.
start_apps(Apps) ->
start_apps(Apps, fun(_) -> ok end).
%% to avoid keeping the `db_hostname' that is set when loading
%% `system_monitor' application in `emqx_machine', and then it
%% crashing when trying to connect.
%% FIXME: add an `enable' option to sysmon_top and use that to
%% decide whether to start it or not.
DefaultHandler =
fun(_) ->
application:set_env(system_monitor, db_hostname, ""),
ok
end,
start_apps(Apps, DefaultHandler).

-spec start_apps(Apps :: apps(), Handler :: special_config_handler()) -> ok.
start_apps(Apps, SpecAppConfig) when is_function(SpecAppConfig) ->
Expand Down Expand Up @@ -920,3 +933,21 @@ latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
[],
[{body_format, binary}]
).

%%-------------------------------------------------------------------------------
%% Testcase teardown utilities
%%-------------------------------------------------------------------------------

get_or_spawn_janitor() ->
case get({?MODULE, janitor_proc}) of
undefined ->
{ok, Janitor} = emqx_test_janitor:start_link(),
put({?MODULE, janitor_proc}, Janitor),
Janitor;
Janitor ->
Janitor
end.

on_exit(Fun) ->
Janitor = get_or_spawn_janitor(),
ok = emqx_test_janitor:push_on_exit_callback(Janitor, Fun).
69 changes: 69 additions & 0 deletions apps/emqx/test/emqx_test_janitor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_test_janitor).

-behaviour(gen_server).

%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).

%% API
-export([
start_link/0,
push_on_exit_callback/2
]).

%%----------------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------------

start_link() ->
gen_server:start_link(?MODULE, self(), []).

push_on_exit_callback(Server, Callback) when is_function(Callback, 0) ->
gen_server:call(Server, {push, Callback}).

%%----------------------------------------------------------------------------------
%% `gen_server' API
%%----------------------------------------------------------------------------------

init(Parent) ->
process_flag(trap_exit, true),
Ref = monitor(process, Parent),
{ok, #{callbacks => [], owner => {Ref, Parent}}}.

terminate(_Reason, #{callbacks := Callbacks}) ->
lists:foreach(fun(Fun) -> Fun() end, Callbacks).

handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
{reply, ok, State#{callbacks := [Callback | Callbacks]}};
handle_call(_Req, _From, State) ->
{reply, error, State}.

handle_cast(_Req, State) ->
{noreply, State}.

handle_info({'DOWN', Ref, process, Parent, _Reason}, State = #{owner := {Ref, Parent}}) ->
{stop, normal, State};
handle_info(_Msg, State) ->
{noreply, State}.
2 changes: 1 addition & 1 deletion apps/emqx_bridge/src/emqx_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
-define(EGRESS_DIR_BRIDGES(T),
T == webhook;
T == mysql;
T == gcp_pubsub;
T == influxdb_api_v1;
T == influxdb_api_v2
%% T == influxdb_udp
).

load() ->
Expand Down
17 changes: 17 additions & 0 deletions apps/emqx_connector/include/emqx_connector_tables.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-define(JWT_TABLE, emqx_connector_jwt).
1 change: 1 addition & 0 deletions apps/emqx_connector/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

{deps, [
{emqx, {path, "../emqx"}},
{emqx_resource, {path, "../emqx_resource"}},
{eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}},
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}},
Expand Down
1 change: 1 addition & 0 deletions apps/emqx_connector/src/emqx_connector.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
mysql,
mongodb,
ehttpc,
jose,
emqx,
emqtt
]},
Expand Down
46 changes: 46 additions & 0 deletions apps/emqx_connector/src/emqx_connector_jwt.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_connector_jwt).

-include("emqx_connector_tables.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").

%% API
-export([
lookup_jwt/1,
lookup_jwt/2
]).

-type jwt() :: binary().

-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}.
lookup_jwt(ResourceId) ->
?MODULE:lookup_jwt(?JWT_TABLE, ResourceId).

-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}.
lookup_jwt(TId, ResourceId) ->
try
case ets:lookup(TId, {ResourceId, jwt}) of
[{{ResourceId, jwt}, JWT}] ->
{ok, JWT};
[] ->
{error, not_found}
end
catch
error:badarg ->
{error, not_found}
end.
99 changes: 99 additions & 0 deletions apps/emqx_connector/src/emqx_connector_jwt_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_connector_jwt_sup).

-behaviour(supervisor).

-include("emqx_connector_tables.hrl").

-export([
start_link/0,
ensure_worker_present/2,
ensure_worker_deleted/1
]).

-export([init/1]).

-type worker_id() :: term().

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
ensure_jwt_table(),
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 5,
auto_shutdown => never
},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.

%% @doc Starts a new JWT worker. The caller should use
%% `emqx_connector_jwt_sup:ensure_jwt/1' to ensure that a JWT has
%% been stored, if synchronization is needed.
-spec ensure_worker_present(worker_id(), map()) ->
{ok, supervisor:child()} | {error, term()}.
ensure_worker_present(Id, Config) ->
ChildSpec = jwt_worker_child_spec(Id, Config),
case supervisor:start_child(?MODULE, ChildSpec) of
{ok, Pid} ->
{ok, Pid};
{error, {already_started, Pid}} ->
{ok, Pid};
{error, already_present} ->
supervisor:restart_child(?MODULE, Id)
end.

%% @doc Stops a given JWT worker by its id.
-spec ensure_worker_deleted(worker_id()) -> ok.
ensure_worker_deleted(Id) ->
case supervisor:terminate_child(?MODULE, Id) of
ok ->
_ = supervisor:delete_child(?MODULE, Id),
ok;
{error, not_found} ->
ok
end.

jwt_worker_child_spec(Id, Config) ->
#{
id => Id,
start => {emqx_connector_jwt_worker, start_link, [Config]},
restart => transient,
type => worker,
significant => false,
shutdown => brutal_kill,
modules => [emqx_connector_jwt_worker]
}.

-spec ensure_jwt_table() -> ok.
ensure_jwt_table() ->
case ets:whereis(?JWT_TABLE) of
undefined ->
Opts = [
named_table,
public,
{read_concurrency, true},
ordered_set
],
_ = ets:new(?JWT_TABLE, Opts),
ok;
_ ->
ok
end.

0 comments on commit f827062

Please sign in to comment.