Skip to content

Commit

Permalink
Merge pull request #9642 from thalesmg/refactor-buffer-collect-calls-v50
Browse files Browse the repository at this point in the history
feat(buffer_worker): refactor buffer/resource workers to always use queue and use offload mode
  • Loading branch information
thalesmg committed Jan 5, 2023
2 parents 32922a6 + 70eb5ff commit 3437151
Show file tree
Hide file tree
Showing 29 changed files with 1,046 additions and 550 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ define gen-app-ct-target
$1-ct: $(REBAR)
@$(SCRIPTS)/pre-compile.sh $(PROFILE)
@ENABLE_COVER_COMPILE=1 $(REBAR) ct -c -v \
--readable=$(CT_READABLE) \
--name $(CT_NODE_NAME) \
--cover_export_name $(CT_COVER_EXPORT_PREFIX)-$(subst /,-,$1) \
--suite $(shell $(SCRIPTS)/find-suites.sh $1)
Expand Down
7 changes: 5 additions & 2 deletions apps/emqx/test/emqx_common_test_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,11 @@ is_all_tcp_servers_available(Servers) ->
is_tcp_server_available(Host, Port)
end,
case lists:partition(Fun, Servers) of
{_, []} -> true;
{_, Unavail} -> ct:print("Unavailable servers: ~p", [Unavail])
{_, []} ->
true;
{_, Unavail} ->
ct:print("Unavailable servers: ~p", [Unavail]),
false
end.

-spec is_tcp_server_available(
Expand Down
23 changes: 1 addition & 22 deletions apps/emqx_bridge/i18n/emqx_bridge_schema.conf
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,6 @@ emqx_bridge_schema {
}
}

metric_batching {
desc {
en: """Count of messages that are currently accumulated in memory waiting for sending in one batch."""
zh: """当前积压在内存里,等待批量发送的消息个数"""
}
label: {
en: "Batched"
zh: "等待批量发送"
}
}

metric_dropped {
desc {
en: """Count of messages dropped."""
Expand Down Expand Up @@ -120,16 +109,6 @@ emqx_bridge_schema {
zh: "队列已满被丢弃"
}
}
metric_dropped_queue_not_enabled {
desc {
en: """Count of messages dropped due to the queue is not enabled."""
zh: """因为队列未启用被丢弃的消息个数。"""
}
label: {
en: "Dropped Queue Disabled"
zh: "队列未启用被丢弃"
}
}
metric_dropped_resource_not_found {
desc {
en: """Count of messages dropped due to the resource is not found."""
Expand Down Expand Up @@ -193,7 +172,7 @@ emqx_bridge_schema {
}
}

metric_sent_inflight {
metric_inflight {
desc {
en: """Count of messages that were sent asynchronously but ACKs are not yet received."""
zh: """已异步地发送但没有收到 ACK 的消息个数。"""
Expand Down
10 changes: 1 addition & 9 deletions apps/emqx_bridge/include/emqx_bridge.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@

-define(EMPTY_METRICS,
?METRICS(
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
)
).

-define(METRICS(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Expand All @@ -40,11 +38,9 @@
Rcvd
),
#{
'batching' => Batched,
'dropped' => Dropped,
'dropped.other' => DroppedOther,
'dropped.queue_full' => DroppedQueueFull,
'dropped.queue_not_enabled' => DroppedQueueNotEnabled,
'dropped.resource_not_found' => DroppedResourceNotFound,
'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched,
Expand All @@ -61,11 +57,9 @@
).

-define(metrics(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Expand All @@ -80,11 +74,9 @@
Rcvd
),
#{
'batching' := Batched,
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull,
'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
Expand Down
14 changes: 3 additions & 11 deletions apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ info_example_basic(webhook) ->
auto_restart_interval => 15000,
query_mode => async,
async_inflight_window => 100,
enable_queue => false,
max_queue_bytes => 100 * 1024 * 1024
}
};
Expand All @@ -233,7 +232,6 @@ mqtt_main_example() ->
health_check_interval => <<"15s">>,
auto_restart_interval => <<"60s">>,
query_mode => sync,
enable_queue => false,
max_queue_bytes => 100 * 1024 * 1024
},
ssl => #{
Expand Down Expand Up @@ -634,11 +632,11 @@ aggregate_metrics(AllMetrics) ->
fun(
#{
metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15
)
},
?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15
)
) ->
?METRICS(
Expand All @@ -656,9 +654,7 @@ aggregate_metrics(AllMetrics) ->
M12 + N12,
M13 + N13,
M14 + N14,
M15 + N15,
M16 + N16,
M17 + N17
M15 + N15
)
end,
InitMetrics,
Expand Down Expand Up @@ -691,7 +687,6 @@ format_metrics(#{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull,
'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
Expand All @@ -705,15 +700,12 @@ format_metrics(#{
matched := #{current := Rate, last5m := Rate5m, max := RateMax}
}
}) ->
Batched = maps:get('batching', Gauges, 0),
Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
?METRICS(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ default_resource_opts() ->
#{
<<"async_inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>,
<<"enable_queue">> => false,
<<"health_check_interval">> => <<"15s">>,
<<"max_queue_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>,
Expand Down
5 changes: 1 addition & 4 deletions apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,9 @@ fields(bridges) ->
] ++ ee_fields_bridges();
fields("metrics") ->
[
{"batching", mk(integer(), #{desc => ?DESC("metric_batching")})},
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
{"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
{"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
{"dropped.queue_not_enabled",
mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})},
{"dropped.resource_not_found",
mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})},
{"dropped.resource_stopped",
Expand All @@ -142,7 +139,7 @@ fields("metrics") ->
{"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})},
{"retried", mk(integer(), #{desc => ?DESC("metric_retried")})},
{"failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
{"inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})},
{"inflight", mk(integer(), #{desc => ?DESC("metric_inflight")})},
{"success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},
{"rate", mk(float(), #{desc => ?DESC("metric_rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})},
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"reconnect_interval">> => <<"1s">>,
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"enable_queue">> => true,
<<"query_mode">> => <<"sync">>,
%% use a short interval so that the health check runs quickly
<<"health_check_interval">> => <<"0.5s">>
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ When disabled the messages are buffered in RAM only."""

batch_size {
desc {
en: """Maximum batch count."""
zh: """批量请求大小。"""
en: """Maximum batch count. If equal to 1, there's effectively no batching."""
zh: """批量请求大小。如果设为1,则无批处理。"""
}
label {
en: """Batch size"""
Expand Down
4 changes: 1 addition & 3 deletions apps/emqx_resource/include/emqx_resource.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@
%% If the resource disconnected, we can set to retry starting the resource
%% periodically.
auto_restart_interval => pos_integer(),
enable_batch => boolean(),
batch_size => pos_integer(),
batch_time => pos_integer(),
enable_queue => boolean(),
max_queue_bytes => pos_integer(),
query_mode => query_mode(),
resume_interval => pos_integer(),
Expand All @@ -90,7 +88,7 @@
-define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>).

%% count
-define(DEFAULT_BATCH_SIZE, 100).
-define(DEFAULT_BATCH_SIZE, 1).

%% milliseconds
-define(DEFAULT_BATCH_TIME, 20).
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_resource/src/emqx_resource_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
'success',
'failed',
'dropped',
'dropped.queue_not_enabled',
'dropped.queue_full',
'dropped.resource_not_found',
'dropped.resource_stopped',
Expand Down
58 changes: 0 additions & 58 deletions apps/emqx_resource/src/emqx_resource_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
]).

-export([
batching_set/3,
batching_shift/3,
batching_get/1,
inflight_set/3,
inflight_get/1,
queuing_set/3,
Expand All @@ -40,9 +37,6 @@
dropped_queue_full_inc/1,
dropped_queue_full_inc/2,
dropped_queue_full_get/1,
dropped_queue_not_enabled_inc/1,
dropped_queue_not_enabled_inc/2,
dropped_queue_not_enabled_get/1,
dropped_resource_not_found_inc/1,
dropped_resource_not_found_inc/2,
dropped_resource_not_found_get/1,
Expand Down Expand Up @@ -80,10 +74,8 @@ events() ->
[
[?TELEMETRY_PREFIX, Event]
|| Event <- [
batching,
dropped_other,
dropped_queue_full,
dropped_queue_not_enabled,
dropped_resource_not_found,
dropped_resource_stopped,
failed,
Expand Down Expand Up @@ -125,9 +117,6 @@ handle_telemetry_event(
dropped_queue_full ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val);
dropped_queue_not_enabled ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val);
dropped_resource_not_found ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val);
Expand Down Expand Up @@ -160,54 +149,19 @@ handle_telemetry_event(
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
inflight ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
queuing ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
_ ->
ok
end;
handle_telemetry_event(
[?TELEMETRY_PREFIX, Event],
_Measurements = #{gauge_shift := Val},
_Metadata = #{resource_id := ID, worker_id := WorkerID},
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
_ ->
ok
end;
handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
ok.

%% Gauges (value can go both up and down):
%% --------------------------------------

%% @doc Count of messages that are currently accumulated in memory, waiting
%% to be sent in one batch. [Gauge]
%%
%% Publishes the value by executing a telemetry event named
%% [?TELEMETRY_PREFIX, batching] whose measurement is `#{gauge_set => Val}'
%% and whose metadata carries the resource ID and the worker ID.
batching_set(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, batching],
#{gauge_set => Val},
#{resource_id => ID, worker_id => WorkerID}
).

%% Requests a shift of the `batching' gauge by executing a telemetry event
%% named [?TELEMETRY_PREFIX, batching] whose measurement is
%% `#{gauge_shift => Val}' and whose metadata carries the resource ID and the
%% worker ID.
%% When WorkerID is `undefined' no event is executed and `ok' is returned.
batching_shift(_ID, _WorkerID = undefined, _Val) ->
ok;
batching_shift(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, batching],
#{gauge_shift => Val},
#{resource_id => ID, worker_id => WorkerID}
).

%% Reads the `batching' gauge of resource ID from the ?RES_METRICS metrics
%% worker via emqx_metrics_worker:get_gauge/3.
batching_get(ID) ->
emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching').

%% @doc Count of batches of messages that are currently
%% queuing. [Gauge]
queuing_set(ID, WorkerID, Val) ->
Expand Down Expand Up @@ -269,18 +223,6 @@ dropped_queue_full_inc(ID, Val) ->
%% Reads the 'dropped.queue_full' counter of resource ID from the
%% ?RES_METRICS metrics worker via emqx_metrics_worker:get/3.
dropped_queue_full_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').

%% @doc Count of messages dropped because the queue was not enabled
%%
%% The one-argument form increments by 1; the two-argument form increments
%% by Val. Both work by executing a telemetry event named
%% [?TELEMETRY_PREFIX, dropped_queue_not_enabled] whose measurement is
%% `#{counter_inc => Val}' and whose metadata carries the resource ID.
dropped_queue_not_enabled_inc(ID) ->
dropped_queue_not_enabled_inc(ID, 1).

dropped_queue_not_enabled_inc(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{
resource_id => ID
}).

%% Reads the 'dropped.queue_not_enabled' counter of resource ID from the
%% ?RES_METRICS metrics worker via emqx_metrics_worker:get/3.
dropped_queue_not_enabled_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').

%% @doc Count of messages dropped because the resource was not found
dropped_resource_not_found_inc(ID) ->
dropped_resource_not_found_inc(ID, 1).
Expand Down

0 comments on commit 3437151

Please sign in to comment.