Skip to content

Commit

Permalink
cluster: Avoid oversize alloc for topic properties update`
Browse files Browse the repository at this point in the history
`topic_properties_update` is 456 bytes.

Switch it to from `std::vector` to `chunked_vector`.

Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Mar 8, 2024
1 parent d58c9fa commit 9191f39
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ service::do_update_topic_properties(update_topic_properties_request req) {
// local topic frontend instance will eventually dispatch request to _raft0
// core
auto res = co_await _topics_frontend.local().update_topic_properties(
req.updates,
std::move(req).updates,
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());

Expand Down
11 changes: 5 additions & 6 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::topic_properties_update> updates;
cluster::topic_properties_update_vector updates;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
cluster::property_update<std::optional<v8_engine::data_policy>>
data_policy;
Expand All @@ -1350,16 +1350,15 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
cluster::incremental_topic_custom_updates custom_properties{
.data_policy = data_policy,
};
updates.push_back(cluster::topic_properties_update{
updates.emplace_back(
model::random_topic_namespace(),
random_incremental_topic_updates(),
custom_properties,
});
custom_properties);
}
cluster::update_topic_properties_request data{
.updates = updates,
.updates = std::move(updates),
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
cluster::topic_result data{
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/topic_recovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_storage/topic_manifest.h"
#include "cluster/topic_recovery_status_frontend.h"
#include "cluster/topics_frontend.h"
#include "cluster/types.h"

#include <seastar/http/request.hh>
#include <seastar/util/defer.hh>
Expand Down Expand Up @@ -533,7 +534,7 @@ ss::future<> topic_recovery_service::reset_topic_configurations() {
co_return;
}

std::vector<cluster::topic_properties_update> updates;
cluster::topic_properties_update_vector updates;
updates.reserve(_downloaded_manifests->size());
std::transform(
std::make_move_iterator(_downloaded_manifests->begin()),
Expand Down
9 changes: 6 additions & 3 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ cluster::errc map_errc(std::error_code ec) {
}

ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
std::vector<topic_properties_update> updates,
topic_properties_update_vector updates,
model::timeout_clock::time_point timeout) {
auto cluster_leader = _leaders.local().get_leader(model::controller_ntp);

Expand Down Expand Up @@ -249,20 +249,23 @@ ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
co_return results;
}

auto updates2 = updates.copy();
co_return co_await _connections.local()
.with_node_client<controller_client_protocol>(
_self,
ss::this_shard_id(),
*cluster_leader,
timeout,
[updates, timeout](controller_client_protocol client) mutable {
[updates{std::move(updates)},
timeout](controller_client_protocol client) mutable {
return client
.update_topic_properties(
update_topic_properties_request{.updates = std::move(updates)},
rpc::client_opts(timeout))
.then(&rpc::get_ctx_data<update_topic_properties_reply>);
})
.then([updates](result<update_topic_properties_reply> r) {
.then([updates{std::move(updates2)}](
result<update_topic_properties_reply> r) {
if (r.has_error()) {
return make_error_topic_results(updates, map_errc(r.error()));
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/topics_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class topics_frontend {
std::optional<model::term_id> = std::nullopt);

ss::future<std::vector<topic_result>> update_topic_properties(
std::vector<topic_properties_update>, model::timeout_clock::time_point);
topic_properties_update_vector, model::timeout_clock::time_point);

ss::future<std::vector<topic_result>> create_partitions(
std::vector<create_partitions_configuration>,
Expand Down
11 changes: 8 additions & 3 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,8 @@ struct topic_properties_update
}
};

using topic_properties_update_vector = chunked_vector<topic_properties_update>;

// Structure holding topic configuration, optionals will be replaced by broker
// defaults
struct topic_configuration
Expand Down Expand Up @@ -2307,7 +2309,7 @@ struct update_topic_properties_request
update_topic_properties_request,
serde::version<0>,
serde::compat_version<0>> {
std::vector<topic_properties_update> updates;
topic_properties_update_vector updates;

friend std::ostream&
operator<<(std::ostream&, const update_topic_properties_request&);
Expand All @@ -2318,6 +2320,10 @@ struct update_topic_properties_request
= default;

auto serde_fields() { return std::tie(updates); }

update_topic_properties_request copy() const {
return {.updates = updates.copy()};
}
};

struct update_topic_properties_reply
Expand Down Expand Up @@ -5088,8 +5094,7 @@ struct adl<cluster::update_topic_properties_request> {
serialize(out, std::move(r.updates));
}
cluster::update_topic_properties_request from(iobuf_parser& in) {
auto updates
= adl<std::vector<cluster::topic_properties_update>>{}.from(in);
auto updates = adl<cluster::topic_properties_update_vector>{}.from(in);
return {.updates = std::move(updates)};
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/v/compat/check.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,11 @@ void verify_serde_round_trip(T, compat_binary test) {
} \
\
static std::vector<compat_binary> to_binary(Type obj) { \
return compat_binary::serde_and_adl(obj); \
return compat_binary::serde_and_adl(std::move(obj)); \
} \
\
static void check(Type obj, compat_binary test) { \
verify_adl_or_serde(obj, std::move(test)); \
verify_adl_or_serde(std::move(obj), std::move(test)); \
} \
};

Expand Down
2 changes: 1 addition & 1 deletion src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ struct instance_generator<cluster::topic_properties_update> {
template<>
struct instance_generator<cluster::update_topic_properties_request> {
static cluster::update_topic_properties_request random() {
return {.updates = tests::random_vector([] {
return {.updates = tests::random_chunked_vector([] {
return instance_generator<
cluster::topic_properties_update>::random();
})};
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ ss::future<std::vector<R>> do_alter_topics_configuration(
error_code::invalid_config,
"duplicated topic {} alter config request"));
}
std::vector<cluster::topic_properties_update> updates;
cluster::topic_properties_update_vector updates;
for (auto& r : boost::make_iterator_range(resources.begin(), valid_end)) {
auto res = f(r);
if (res.has_error()) {
Expand Down

0 comments on commit 9191f39

Please sign in to comment.