Skip to content

Commit

Permalink
feat(gcp_pubsub_bridges): make service account json a binary
Browse files Browse the repository at this point in the history
Fixes https://emqx.atlassian.net/browse/EMQX-11384

Today, service_account_json config field is an embedded object (map()).

This requires user to embed a JSON object into the config file instead of embedding it as
a string.

We should support binary() type as input, but keep supporting map() for backward
compatibility.
  • Loading branch information
thalesmg committed Feb 23, 2024
1 parent 5af01c0 commit a6ddbc5
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 35 deletions.
75 changes: 59 additions & 16 deletions apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl
Expand Up @@ -21,12 +21,11 @@
service_account_json_converter/2
]).

-export([upgrade_raw_conf/1]).

%% emqx_bridge_enterprise "unofficial" API
-export([conn_bridge_examples/1]).

-type service_account_json() :: map().
-reflect_type([service_account_json/0]).

-define(DEFAULT_PIPELINE_SIZE, 100).

%%-------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -101,7 +100,7 @@ fields(connector_config) ->
)},
{service_account_json,
sc(
?MODULE:service_account_json(),
binary(),
#{
required => true,
validator => fun ?MODULE:service_account_json_validator/1,
Expand Down Expand Up @@ -354,6 +353,22 @@ values(consumer, _Method) ->
}
}.

upgrade_raw_conf(RawConf0) ->
lists:foldl(
fun(Path, Acc) ->
deep_update(
Path,
fun ensure_binary_service_account_json/1,
Acc
)
end,
RawConf0,
[
[<<"connectors">>, <<"gcp_pubsub_producer">>],
[<<"connectors">>, <<"gcp_pubsub_consumer">>]
]
).

%%-------------------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------------------
Expand All @@ -371,11 +386,12 @@ type_field_consumer() ->
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

-spec service_account_json_validator(map()) ->
-spec service_account_json_validator(binary()) ->
ok
| {error, {wrong_type, term()}}
| {error, {missing_keys, [binary()]}}.
service_account_json_validator(Map) ->
service_account_json_validator(Val) ->
Map = emqx_utils_json:decode(Val, [return_maps]),
ExpectedKeys = [
<<"type">>,
<<"project_id">>,
Expand All @@ -399,18 +415,19 @@ service_account_json_validator(Map) ->
end.

service_account_json_converter(Val, #{make_serializable := true}) ->
Val;
case is_map(Val) of
true -> emqx_utils_json:encode(Val);
false -> Val
end;
service_account_json_converter(Map, _Opts) when is_map(Map) ->
ExpectedKeys = [
<<"type">>,
<<"project_id">>,
<<"private_key_id">>,
<<"private_key">>,
<<"client_email">>
],
maps:with(ExpectedKeys, Map);
emqx_utils_json:encode(Map);
service_account_json_converter(Val, _Opts) ->
Val.
case emqx_utils_json:safe_decode(Val, [return_maps]) of
{ok, Str} when is_binary(Str) ->
emqx_utils_json:decode(Str, [return_maps]);
_ ->
Val
end.

consumer_topic_mapping_validator(_TopicMapping = []) ->
{error, "There must be at least one GCP PubSub-MQTT topic mapping"};
Expand All @@ -425,3 +442,29 @@ consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
false ->
{error, "GCP PubSub topics must not be repeated in a bridge"}
end.

deep_update(Path, Fun, Map) ->
case emqx_utils_maps:deep_get(Path, Map, #{}) of
M when map_size(M) > 0 ->
NewM = Fun(M),
emqx_utils_maps:deep_put(Path, Map, NewM);
_ ->
Map
end.

ensure_binary_service_account_json(Connectors) ->
maps:map(
fun(_Name, Conf) ->
maps:update_with(
<<"service_account_json">>,
fun(JSON) ->
case is_map(JSON) of
true -> emqx_utils_json:encode(JSON);
false -> JSON
end
end,
Conf
)
end,
Connectors
).
Expand Up @@ -25,7 +25,7 @@

-export([get_jwt_authorization_header/1]).

-type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json().
-type service_account_json() :: map().
-type project_id() :: binary().
-type duration() :: non_neg_integer().
-type config() :: #{
Expand Down
Expand Up @@ -94,8 +94,9 @@ query_mode(_Config) -> no_queries.
-spec on_start(connector_resource_id(), connector_config()) ->
{ok, connector_state()} | {error, term()}.
on_start(ConnectorResId, Config0) ->
%% ensure it's a binary key map
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
Config = maps:update_with(
service_account_json, fun(X) -> emqx_utils_json:decode(X, [return_maps]) end, Config0
),
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
case emqx_bridge_gcp_pubsub_client:start(ConnectorResId, Config) of
{ok, Client} ->
Expand Down
Expand Up @@ -73,7 +73,9 @@ on_start(InstanceId, Config0) ->
msg => "starting_gcp_pubsub_bridge",
instance_id => InstanceId
}),
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
Config = maps:update_with(
service_account_json, fun(X) -> emqx_utils_json:decode(X, [return_maps]) end, Config0
),
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
{ok, Client} ->
Expand Down
Expand Up @@ -110,6 +110,23 @@ source_config(Overrides0) ->
},
maps:merge(CommonConfig, Overrides).

assert_persisted_service_account_json_is_binary(ConnectorName) ->
%% ensure cluster.hocon has a binary encoded json string as the value
{ok, Hocon} = hocon:files([application:get_env(emqx, cluster_hocon_file, undefined)]),
?assertMatch(
Bin when is_binary(Bin),
emqx_utils_maps:deep_get(
[
<<"connectors">>,
<<"gcp_pubsub_consumer">>,
ConnectorName,
<<"service_account_json">>
],
Hocon
)
),
ok.

%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
Expand All @@ -122,6 +139,27 @@ t_create_via_http(Config) ->
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.

t_create_via_http_json_object_service_account(Config0) ->
%% After the config goes through the roundtrip with `hocon_tconf:check_plain', service
%% account json comes back as a binary even if the input is a json object.
ConnectorName = ?config(connector_name, Config0),
ConnConfig0 = ?config(connector_config, Config0),
Config1 = proplists:delete(connector_config, Config0),
ConnConfig1 = maps:update_with(
<<"service_account_json">>,
fun(X) ->
?assert(is_binary(X), #{json => X}),
JSON = emqx_utils_json:decode(X, [return_maps]),
?assert(is_map(JSON)),
JSON
end,
ConnConfig0
),
Config = [{connector_config, ConnConfig1} | Config1],
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
assert_persisted_service_account_json_is_binary(ConnectorName),
ok.

t_consume(Config) ->
Topic = ?config(pubsub_topic, Config),
Payload = #{<<"key">> => <<"value">>},
Expand Down
14 changes: 0 additions & 14 deletions apps/emqx_conf/src/emqx_conf_schema_types.erl
Expand Up @@ -261,20 +261,6 @@ readable("comma_separated_atoms()") ->
dashboard => #{type => comma_separated_string},
docgen => #{type => "String", example => <<"item1,item2">>}
};
readable("service_account_json()") ->
%% This is a bit special,
%% service_account_josn in swagger spec is an object
%% the same in documenation.
%% However, dashboard wish it to be a string
%% TODO:
%% - Change type definition to stirng().
%% - Convert the embedded object to a escaped JSON string.
%% - Delete this function clause once the above is done.
#{
swagger => #{type => object},
dashboard => #{type => string},
docgen => #{type => "Map"}
};
readable("json_binary()") ->
#{
swagger => #{type => string, example => <<"{\"a\": [1,true]}">>},
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx_enterprise/src/emqx_enterprise_schema.erl
Expand Up @@ -19,7 +19,8 @@
]).

%% Callback to upgrade config after loaded from config file but before validation.
upgrade_raw_conf(RawConf) ->
upgrade_raw_conf(RawConf0) ->
RawConf = emqx_bridge_gcp_pubsub:upgrade_raw_conf(RawConf0),
emqx_conf_schema:upgrade_raw_conf(RawConf).

namespace() ->
Expand Down
1 change: 1 addition & 0 deletions changes/ee/feat-12577.en.md
@@ -0,0 +1 @@
Changed the type of `service_account_json` of both GCP PubSub Producer and Consumer connectors to a string. Now, it's possible to set this field to a JSON-encoded string. Using the previous format (a HOCON map) is still supported but not encouraged.

0 comments on commit a6ddbc5

Please sign in to comment.