Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(buffer_worker): refactor buffer/resource workers to always use queue and use offload mode #9642

Merged
merged 8 commits into from
Jan 5, 2023
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ define gen-app-ct-target
$1-ct: $(REBAR)
@$(SCRIPTS)/pre-compile.sh $(PROFILE)
@ENABLE_COVER_COMPILE=1 $(REBAR) ct -c -v \
--readable=$(CT_READABLE) \
--name $(CT_NODE_NAME) \
--cover_export_name $(CT_COVER_EXPORT_PREFIX)-$(subst /,-,$1) \
--suite $(shell $(SCRIPTS)/find-suites.sh $1)
Expand Down
7 changes: 5 additions & 2 deletions apps/emqx/test/emqx_common_test_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,11 @@ is_all_tcp_servers_available(Servers) ->
is_tcp_server_available(Host, Port)
end,
case lists:partition(Fun, Servers) of
{_, []} -> true;
{_, Unavail} -> ct:print("Unavailable servers: ~p", [Unavail])
{_, []} ->
true;
{_, Unavail} ->
ct:print("Unavailable servers: ~p", [Unavail]),
false
end.

-spec is_tcp_server_available(
Expand Down
23 changes: 1 addition & 22 deletions apps/emqx_bridge/i18n/emqx_bridge_schema.conf
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,6 @@ emqx_bridge_schema {
}
}

metric_batching {
desc {
en: """Count of messages that are currently accumulated in memory waiting for sending in one batch."""
zh: """当前积压在内存里,等待批量发送的消息个数"""
}
label: {
en: "Batched"
zh: "等待批量发送"
}
}

metric_dropped {
desc {
en: """Count of messages dropped."""
Expand Down Expand Up @@ -120,16 +109,6 @@ emqx_bridge_schema {
zh: "队列已满被丢弃"
}
}
metric_dropped_queue_not_enabled {
desc {
en: """Count of messages dropped due to the queue is not enabled."""
zh: """因为队列未启用被丢弃的消息个数。"""
}
label: {
en: "Dropped Queue Disabled"
zh: "队列未启用被丢弃"
}
}
metric_dropped_resource_not_found {
desc {
en: """Count of messages dropped due to the resource is not found."""
Expand Down Expand Up @@ -193,7 +172,7 @@ emqx_bridge_schema {
}
}

metric_sent_inflight {
metric_inflight {
desc {
en: """Count of messages that were sent asynchronously but ACKs are not yet received."""
zh: """已异步地发送但没有收到 ACK 的消息个数。"""
Expand Down
10 changes: 1 addition & 9 deletions apps/emqx_bridge/include/emqx_bridge.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@

-define(EMPTY_METRICS,
?METRICS(
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
)
).

-define(METRICS(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Expand All @@ -40,11 +38,9 @@
Rcvd
),
#{
'batching' => Batched,
'dropped' => Dropped,
'dropped.other' => DroppedOther,
'dropped.queue_full' => DroppedQueueFull,
'dropped.queue_not_enabled' => DroppedQueueNotEnabled,
'dropped.resource_not_found' => DroppedResourceNotFound,
'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched,
Expand All @@ -61,11 +57,9 @@
).

-define(metrics(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Expand All @@ -80,11 +74,9 @@
Rcvd
),
#{
'batching' := Batched,
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull,
'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
Expand Down
14 changes: 3 additions & 11 deletions apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ info_example_basic(webhook) ->
auto_restart_interval => 15000,
query_mode => async,
async_inflight_window => 100,
enable_queue => false,
max_queue_bytes => 100 * 1024 * 1024
}
};
Expand All @@ -233,7 +232,6 @@ mqtt_main_example() ->
health_check_interval => <<"15s">>,
auto_restart_interval => <<"60s">>,
query_mode => sync,
enable_queue => false,
max_queue_bytes => 100 * 1024 * 1024
},
ssl => #{
Expand Down Expand Up @@ -634,11 +632,11 @@ aggregate_metrics(AllMetrics) ->
fun(
#{
metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15
)
},
?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15
)
) ->
?METRICS(
Expand All @@ -656,9 +654,7 @@ aggregate_metrics(AllMetrics) ->
M12 + N12,
M13 + N13,
M14 + N14,
M15 + N15,
M16 + N16,
M17 + N17
M15 + N15
)
end,
InitMetrics,
Expand Down Expand Up @@ -691,7 +687,6 @@ format_metrics(#{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull,
'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
Expand All @@ -705,15 +700,12 @@ format_metrics(#{
matched := #{current := Rate, last5m := Rate5m, max := RateMax}
}
}) ->
Batched = maps:get('batching', Gauges, 0),
Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
?METRICS(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ default_resource_opts() ->
#{
<<"async_inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>,
<<"enable_queue">> => false,
<<"health_check_interval">> => <<"15s">>,
<<"max_queue_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>,
Expand Down
5 changes: 1 addition & 4 deletions apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,9 @@ fields(bridges) ->
] ++ ee_fields_bridges();
fields("metrics") ->
[
{"batching", mk(integer(), #{desc => ?DESC("metric_batching")})},
zmstone marked this conversation as resolved.
Show resolved Hide resolved
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
{"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
{"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
{"dropped.queue_not_enabled",
mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})},
{"dropped.resource_not_found",
mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})},
{"dropped.resource_stopped",
Expand All @@ -142,7 +139,7 @@ fields("metrics") ->
{"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})},
{"retried", mk(integer(), #{desc => ?DESC("metric_retried")})},
{"failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
{"inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})},
{"inflight", mk(integer(), #{desc => ?DESC("metric_inflight")})},
{"success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},
{"rate", mk(float(), #{desc => ?DESC("metric_rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})},
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"reconnect_interval">> => <<"1s">>,
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"enable_queue">> => true,
<<"query_mode">> => <<"sync">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ When disabled the messages are buffered in RAM only."""

batch_size {
desc {
en: """Maximum batch count."""
zh: """批量请求大小。"""
en: """Maximum batch count. If equal to 1, there's effectively no batching."""
zh: """批量请求大小。如果设为1,则无批处理。"""
}
label {
en: """Batch size"""
Expand Down
4 changes: 1 addition & 3 deletions apps/emqx_resource/include/emqx_resource.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@
%% If the resource disconnected, we can set to retry starting the resource
%% periodically.
auto_restart_interval => pos_integer(),
enable_batch => boolean(),
batch_size => pos_integer(),
batch_time => pos_integer(),
enable_queue => boolean(),
max_queue_bytes => pos_integer(),
query_mode => query_mode(),
resume_interval => pos_integer(),
Expand All @@ -90,7 +88,7 @@
-define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>).

%% count
-define(DEFAULT_BATCH_SIZE, 100).
-define(DEFAULT_BATCH_SIZE, 1).

%% milliseconds
-define(DEFAULT_BATCH_TIME, 20).
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_resource/src/emqx_resource_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
'success',
'failed',
'dropped',
'dropped.queue_not_enabled',
'dropped.queue_full',
'dropped.resource_not_found',
'dropped.resource_stopped',
Expand Down
58 changes: 0 additions & 58 deletions apps/emqx_resource/src/emqx_resource_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
]).

-export([
batching_set/3,
batching_shift/3,
batching_get/1,
inflight_set/3,
inflight_get/1,
queuing_set/3,
Expand All @@ -40,9 +37,6 @@
dropped_queue_full_inc/1,
dropped_queue_full_inc/2,
dropped_queue_full_get/1,
dropped_queue_not_enabled_inc/1,
dropped_queue_not_enabled_inc/2,
dropped_queue_not_enabled_get/1,
dropped_resource_not_found_inc/1,
dropped_resource_not_found_inc/2,
dropped_resource_not_found_get/1,
Expand Down Expand Up @@ -80,10 +74,8 @@ events() ->
[
[?TELEMETRY_PREFIX, Event]
|| Event <- [
batching,
dropped_other,
dropped_queue_full,
dropped_queue_not_enabled,
dropped_resource_not_found,
dropped_resource_stopped,
failed,
Expand Down Expand Up @@ -125,9 +117,6 @@ handle_telemetry_event(
dropped_queue_full ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val);
dropped_queue_not_enabled ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val);
dropped_resource_not_found ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val);
Expand Down Expand Up @@ -160,54 +149,19 @@ handle_telemetry_event(
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
inflight ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
queuing ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
_ ->
ok
end;
handle_telemetry_event(
[?TELEMETRY_PREFIX, Event],
_Measurements = #{gauge_shift := Val},
_Metadata = #{resource_id := ID, worker_id := WorkerID},
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
_ ->
ok
end;
handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
ok.

%% Gauges (value can go both up and down):
%% --------------------------------------

%% @doc Count of messages that are currently accumulated in memory waiting for
%% being sent in one batch
batching_set(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, batching],
#{gauge_set => Val},
#{resource_id => ID, worker_id => WorkerID}
).

batching_shift(_ID, _WorkerID = undefined, _Val) ->
ok;
batching_shift(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, batching],
#{gauge_shift => Val},
#{resource_id => ID, worker_id => WorkerID}
).

batching_get(ID) ->
emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching').

%% @doc Count of batches of messages that are currently
%% queuing. [Gauge]
queuing_set(ID, WorkerID, Val) ->
Expand Down Expand Up @@ -269,18 +223,6 @@ dropped_queue_full_inc(ID, Val) ->
dropped_queue_full_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').

%% @doc Count of messages dropped because the queue was not enabled
dropped_queue_not_enabled_inc(ID) ->
dropped_queue_not_enabled_inc(ID, 1).

dropped_queue_not_enabled_inc(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{
resource_id => ID
}).

dropped_queue_not_enabled_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').

%% @doc Count of messages dropped because the resource was not found
dropped_resource_not_found_inc(ID) ->
dropped_resource_not_found_inc(ID, 1).
Expand Down