-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement azure blob storage action #13069
feat: implement azure blob storage action #13069
Conversation
f5c3cc2
to
acb4c32
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚀
desc => ?DESC("aggregation_max_records") | ||
} | ||
)}, | ||
{max_block_size, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Putting it here feels odd, because AFAICS it doesn't affect the aggregation itself, only the upload process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. Moved to action parameters. ✔️
case BufferSize > MaxBlockSize of | ||
true -> | ||
{IOData, NewBuffer} = take_chunk(Buffer, MaxBlockSize), | ||
?tp(azure_blob_storage_will_write_chunk, #{}), | ||
do_process_write(IOData, NewBuffer, TransferState0); | ||
false -> | ||
NewBuffer = [], | ||
do_process_write(Buffer, NewBuffer, TransferState0) | ||
end. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently having this append / write split backfires slightly. 🫠
Honestly both of those things feel suboptimal:
- Call Azure API on every append, which is usually on the scale of 1-10s of KiBs assuming moderate event rate.
- Cut >4GiB large buffer somewhere close to its end, which will likely cause 2-3x of that in heap memory consumption. This most likely will never happen, but still.
I wonder what if:
- Introduce
min_block_size
config parameter, with something like 10 MiB as the default. - During append: append as usual, but put the buffer under some
next_block
map entry if its size is larger thatmin_block_size
(i.e. minimize latency impact / overhead / cost (?) of too frequent API requests). - However, if it's to become larger than
max_block_size
does not append but instead put existing buffer undernext_block
key (i.e. avoid cutting iolists). - During write, write
next_block
if it's defined.
On more thing: currently, having a single append larger than 4 GiB is impossible I think, because of this. Admittedly, it's sort of an implicitly defined limit, but I'd argue still could be relied upon to get rid of complexity of "cut off exactly this much of this iolist()
".
lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) -> | ||
{ok, format_timestamp(Since, Format)}; | ||
lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) -> | ||
{ok, format_timestamp(Until, Format)}; | ||
lookup_buffer_var([<<"sequence">>], #{seq := Seq}) -> | ||
{ok, Seq}; | ||
lookup_buffer_var([<<"node">>], #{}) -> | ||
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; | ||
lookup_buffer_var(_Binding, _Context) -> | ||
{error, undefined}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a good time to introduce something like a small emqx_connector_aggreg_buffer_ctx
module, that implements emqx_template
behaviour for #buffer{}
s, and put this stuff (except for <<"node">>
) there? Though we'd have to keep mk_fs_safe_string/1
on this level. Not the prettiest solution, but could help minimize code duplication and the need for that weird buffer_map()
type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✔️
acb4c32
to
35c16ff
Compare
48c4e20
to
c8ae506
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great! A couple of nits.
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl
Outdated
Show resolved
Hide resolved
]; | ||
fields(common_action_parameters) -> | ||
[ | ||
{max_block_size, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Is there anything this parameter may affect for direct uploads? The idea is to prematurely drop the event on the floor if it's for some reason >4GiB in size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, currently we don't check it, but this limit indeed is Azure's maximum upload per block.
(More precisely, for a single "Put Block Blob" call, it's possible to send 5000 MiB, and each "Put Blob" is 4000 MiB)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a check and test here: a92eb3c
container() -> | ||
{container, | ||
hoconsc:mk( | ||
%% TODO: Support selectors once there are more than one container. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment was just transported from its previous location.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i.e. also the product of a refactoring
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl
Show resolved
Hide resolved
a92eb3c
to
efa4432
Compare
Fixes https://emqx.atlassian.net/browse/EMQX-12280
Release version: e5.8
Summary
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update