Skip to content

Commit

Permalink
Merge pull request #9979 from savonarola/file-transfer-add-tests
Browse files Browse the repository at this point in the history
feat(ft): add API tests
  • Loading branch information
savonarola committed Feb 16, 2023
2 parents 9818243 + a40787e commit 7643865
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 37 deletions.
14 changes: 14 additions & 0 deletions apps/emqx/include/asserts.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,17 @@
end
)
).

-define(assertInclude(Pattern, List),
?assert(
lists:any(
fun(El) ->
case El of
Pattern -> true;
_ -> false
end
end,
List
)
)
).
1 change: 1 addition & 0 deletions apps/emqx/src/emqx_cm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Session1 = emqx_persistent_session:persist(
ClientInfo, ConnInfo, Session
),
register_channel(ClientId, Self, ConnInfo),
{ok, #{
session => Session1,
present => true,
Expand Down
3 changes: 1 addition & 2 deletions apps/emqx/test/emqx_common_test_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
undefined ->
ok;
_ ->
Res = rpc:call(Node, ekka, join, [JoinTo]),
case Res of
case rpc:call(Node, ekka, join, [JoinTo]) of
ok ->
ok;
ignore ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_conf/src/emqx_conf.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.12"},
{vsn, "0.1.13"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib]},
Expand Down
Empty file added apps/emqx_ft/docker-ct
Empty file.
13 changes: 12 additions & 1 deletion apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
emqx_ft_schema {

local {
storage {
desc {
en: "Storage settings for file transfer."
zh: "文件传输的存储设置。"
}
label: {
en: "Storage settings"
zh: "存储设置"
}
}

local_type {
desc {
en: "Use local file system to store uploaded files and temporary data."
zh: "使用本地文件系统来存储上传的文件和临时数据。"
Expand Down
16 changes: 8 additions & 8 deletions apps/emqx_ft/src/emqx_ft.erl
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ on_init(PacketId, Msg, Transfer) ->
ok ->
emqx_ft_responder:ack(PacketKey, ok);
% Storage operation started, packet will be acked by the responder
{async, Pid} ->
ok = emqx_ft_responder:kickoff(PacketKey, Pid),
ok;
% {async, Pid} ->
% ok = emqx_ft_responder:kickoff(PacketKey, Pid),
% ok;
%% Storage operation failed, ack through the responder
{error, _} = Error ->
emqx_ft_responder:ack(PacketKey, Error)
Expand Down Expand Up @@ -227,9 +227,9 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
case store_segment(Transfer, Segment) of
ok ->
emqx_ft_responder:ack(PacketKey, ok);
{async, Pid} ->
ok = emqx_ft_responder:kickoff(PacketKey, Pid),
ok;
% {async, Pid} ->
% ok = emqx_ft_responder:kickoff(PacketKey, Pid),
% ok;
{error, _} = Error ->
emqx_ft_responder:ack(PacketKey, Error)
end
Expand All @@ -251,8 +251,8 @@ on_fin(PacketId, Msg, Transfer, Checksum) ->
with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() ->
case assemble(Transfer) of
%% Assembling completed, ack through the responder right away
ok ->
emqx_ft_responder:ack(FinPacketKey, ok);
% ok ->
% emqx_ft_responder:ack(FinPacketKey, ok);
%% Assembling started, packet will be acked by the responder
{async, Pid} ->
ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),
Expand Down
10 changes: 5 additions & 5 deletions apps/emqx_ft/src/emqx_ft_responder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
%% API
%% -------------------------------------------------------------------

-spec start(key(), timeout(), respfun()) -> startlink_ret().
-spec start(key(), respfun(), timeout()) -> startlink_ret().
start(Key, RespFun, Timeout) ->
emqx_ft_responder_sup:start_child(Key, RespFun, Timeout).

Expand Down Expand Up @@ -75,7 +75,7 @@ handle_call({kickoff, Pid}, _From, St) ->
{reply, ok, St};
handle_call({ack, Result}, _From, {Key, RespFun}) ->
Ret = apply(RespFun, [{ack, Result}]),
?tp(ft_responder_ack, #{key => Key, result => Result, return => Ret}),
?tp(debug, ft_responder_ack, #{key => Key, result => Result, return => Ret}),
{stop, {shutdown, Ret}, Ret, undefined};
handle_call(Msg, _From, State) ->
?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}),
Expand All @@ -87,11 +87,11 @@ handle_cast(Msg, State) ->

handle_info(timeout, {Key, RespFun}) ->
Ret = apply(RespFun, [timeout]),
?tp(ft_responder_timeout, #{key => Key, return => Ret}),
?tp(debug, ft_responder_timeout, #{key => Key, return => Ret}),
{stop, {shutdown, Ret}, undefined};
handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) ->
Ret = apply(RespFun, [{down, map_down_reason(Reason)}]),
?tp(ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}),
?tp(debug, ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}),
{stop, {shutdown, Ret}, undefined};
handle_info(Msg, State) ->
?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}),
Expand All @@ -101,7 +101,7 @@ terminate(_Reason, undefined) ->
ok;
terminate(Reason, {Key, RespFun}) ->
Ret = apply(RespFun, [timeout]),
?tp(ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
?tp(debug, ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
ok.

map_down_reason(normal) ->
Expand Down
12 changes: 9 additions & 3 deletions apps/emqx_ft/src/emqx_ft_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").

-export([namespace/0, roots/0, fields/1, tags/0]).
-export([namespace/0, roots/0, fields/1, tags/0, desc/1]).

-export([schema/1]).

Expand Down Expand Up @@ -49,7 +49,8 @@ fields(file_transfer) ->
{storage, #{
type => hoconsc:union([
hoconsc:ref(?MODULE, local_storage)
])
]),
desc => ?DESC("storage")
}}
];
fields(local_storage) ->
Expand All @@ -58,7 +59,7 @@ fields(local_storage) ->
type => local,
default => local,
required => false,
desc => ?DESC("local")
desc => ?DESC("local_type")
}},
{root, #{
type => binary(),
Expand All @@ -67,6 +68,11 @@ fields(local_storage) ->
}}
].

desc(file_transfer) ->
"File transfer settings";
desc(local_storage) ->
"File transfer local storage settings".

schema(filemeta) ->
#{
roots => [
Expand Down
3 changes: 1 addition & 2 deletions apps/emqx_ft/src/emqx_ft_storage_fs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@
-define(MANIFEST, "MANIFEST.json").
-define(SEGMENT, "SEG").

%% TODO
-type storage() :: emqx_config:config().
-type storage() :: emqx_ft_storage:storage().

%% Store manifest in the backing filesystem.
%% Atomic operation.
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_ft/test/emqx_ft_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ end_per_testcase(_Case, Config) ->
ok.

init_per_group(cluster, Config) ->
Node = emqx_ft_test_helpers:start_additional_node(Config, test2),
Node = emqx_ft_test_helpers:start_additional_node(Config, emqx_ft1),
[{additional_node, Node} | Config];
init_per_group(_Group, Config) ->
Config.
Expand Down
165 changes: 165 additions & 0 deletions apps/emqx_ft/test/emqx_ft_api_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_ft_api_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

-include_lib("emqx/include/asserts.hrl").

-import(emqx_mgmt_api_test_util, [request/3, uri/1]).

all() -> emqx_common_test_helpers:all(?MODULE).

init_per_suite(Config) ->
ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_ft], set_special_configs(Config)
),
ok = emqx_common_test_helpers:set_gen_rpc_stateless(),
Config.
end_per_suite(_Config) ->
ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
ok.

set_special_configs(Config) ->
fun
(emqx_ft) ->
ok = emqx_config:put([file_transfer, storage], #{
type => local, root => emqx_ft_test_helpers:ft_root(Config, node())
});
(_) ->
ok
end.

init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
end_per_testcase(_Case, _Config) ->
ok.

%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------

t_list_ready_transfers(Config) ->
ClientId = client_id(Config),

ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()),

{ok, 200, Response} = request(get, uri(["file_transfer", "files"])),

#{<<"files">> := Files} = emqx_json:decode(Response, [return_maps]),

?assertInclude(
#{<<"id">> := #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}},
Files
).

%% This shouldn't happen in real life
%% but we need to test it anyway
t_list_ready_transfers_no_nodes(_Config) ->
_ = meck:new(mria_mnesia, [passthrough]),
_ = meck:expect(mria_mnesia, running_nodes, fun() -> [] end),

?assertMatch(
{ok, 503, _},
request(get, uri(["file_transfer", "files"]))
).

t_download_transfer(Config) ->
ClientId = client_id(Config),

ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()),

?assertMatch(
{ok, 503, _},
request(
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"f1">>
})
)
),

?assertMatch(
{ok, 503, _},
request(
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"f1">>,
node => <<"nonode@nohost">>
})
)
),

?assertMatch(
{ok, 404, _},
request(
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"unknown_file">>,
node => node()
})
)
),

{ok, 200, Response} = request(
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"f1">>,
node => node()
})
),

?assertEqual(
Response,
<<"data">>
).

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

client_id(Config) ->
atom_to_binary(?config(tc, Config), utf8).

request(Method, Url) ->
request(Method, Url, []).

query(Params) ->
KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
"?" ++ string:join(KVs, "&").

uri_encode(T) ->
emqx_http_lib:uri_encode(to_list(T)).

to_list(A) when is_atom(A) ->
atom_to_list(A);
to_list(B) when is_binary(B) ->
binary_to_list(B);
to_list(L) when is_list(L) ->
L.
14 changes: 2 additions & 12 deletions apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

-define(assertInclude(Pattern, List),
?assert(
lists:any(
fun
(Pattern) -> true;
(_) -> false
end,
List
)
)
).
-include_lib("emqx/include/asserts.hrl").

all() ->
[
Expand Down Expand Up @@ -72,7 +62,7 @@ end_per_testcase(_Case, _Config) ->
ok.

init_per_group(cluster, Config) ->
Node = emqx_ft_test_helpers:start_additional_node(Config, test2),
Node = emqx_ft_test_helpers:start_additional_node(Config, emqx_ft_storage_fs1),
[{additional_node, Node} | Config];
init_per_group(_Group, Config) ->
Config.
Expand Down

0 comments on commit 7643865

Please sign in to comment.