Skip to content

Commit

Permalink
cluster: Avoid oversize alloc for topic configuration
Browse files Browse the repository at this point in the history
`topic_configuration` is 392 bytes
`custom_assignable_topic_configuration`	is 416 bytes

Switch vectors of them to `chunked_vector`

Fixes redpanda-data#16758

Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Mar 8, 2024
1 parent 99247c5 commit d58c9fa
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/topic_recovery_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ FIXTURE_TEST(recovery_with_existing_topic, fixture) {
{model::ns{"kafka"}, model::topic{"test"}, 1, 1}}}};
auto topic_create_result = app.controller->get_topics_frontend()
.local()
.create_topics(topic_cfg, model::no_timeout)
.create_topics(
std::move(topic_cfg), model::no_timeout)
.get();
wait_for_topics(std::move(topic_create_result)).get();
set_expectations_and_listen(
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tests/controller_api_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ FIXTURE_TEST(test_querying_ntp_status, cluster_test_fixture) {
leader->controller->get_topics_frontend()
.local()
.create_topics(
cluster::without_custom_assignments(topics),
cluster::without_custom_assignments(std::move(topics)),
1s + model::timeout_clock::now())
.get();

Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1703,13 +1703,13 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
topics.push_back(random_topic_configuration());
}
cluster::create_topics_request data{
.topics = topics,
.topics = std::move(topics),
.timeout = random_timeout_clock_duration(),
};
// adl encoding for topic_configuration doesn't encode/decode to exact
// equality, but also already existed prior to serde support being added
// so only testing the serde case.
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
auto data = random_partition_metadata();
Expand Down Expand Up @@ -1743,7 +1743,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
// adl serialization doesn't preserve equality for topic_configuration.
// serde serialization does and was added after support for adl so adl
// semantics are preserved.
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
raft::transfer_leadership_request data{
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/topic_recovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,8 @@ topic_recovery_service::create_topics(const recovery_request& request) {
[&request](const auto& m) { return make_topic_config(m, request); });

co_return co_await _topics_frontend.local().autocreate_topics(
topic_configs, config::shard_local_cfg().create_topic_timeout_ms());
std::move(topic_configs),
config::shard_local_cfg().create_topic_timeout_ms());
}

ss::future<std::vector<cloud_storage::topic_manifest>>
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,8 @@ topics_frontend::dispatch_create_to_leader(
ss::this_shard_id(),
leader,
timeout,
[topics, timeout](controller_client_protocol cp) mutable {
[topics{topics.copy()},
timeout](controller_client_protocol cp) mutable {
return cp.create_topics(
create_topics_request{
.topics = std::move(topics), .timeout = timeout},
Expand Down
12 changes: 10 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1943,7 +1943,7 @@ struct topic_configuration
= default;
};

using topic_configuration_vector = std::vector<topic_configuration>;
using topic_configuration_vector = chunked_vector<topic_configuration>;

struct custom_partition_assignment {
model::partition_id id;
Expand Down Expand Up @@ -1972,7 +1972,7 @@ struct custom_assignable_topic_configuration {
};

using custom_assignable_topic_configuration_vector
= std::vector<custom_assignable_topic_configuration>;
= chunked_vector<custom_assignable_topic_configuration>;

struct create_partitions_configuration
: serde::envelope<
Expand Down Expand Up @@ -2188,6 +2188,10 @@ struct create_topics_request
operator<<(std::ostream&, const create_topics_request&);

auto serde_fields() { return std::tie(topics, timeout); }

create_topics_request copy() const {
return {.topics = topics.copy(), .timeout = timeout};
}
};

struct create_topics_reply
Expand Down Expand Up @@ -2215,6 +2219,10 @@ struct create_topics_reply
friend std::ostream& operator<<(std::ostream&, const create_topics_reply&);

auto serde_fields() { return std::tie(results, metadata, configs); }

create_topics_reply copy() const {
return {results, metadata, configs.copy()};
}
};

struct purged_topic_request
Expand Down
12 changes: 7 additions & 5 deletions src/v/compat/cluster_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ struct compat_check<cluster::topic_configuration> {
static constexpr std::string_view name = "cluster::topic_configuration";

static cluster::topic_configuration_vector create_test_cases() {
return generate_instances<cluster::topic_configuration>();
auto i = generate_instances<cluster::topic_configuration>();
return {
std::make_move_iterator(i.begin()), std::make_move_iterator(i.end())};
}

static void to_json(
Expand Down Expand Up @@ -521,12 +523,12 @@ struct compat_check<cluster::create_topics_request> {

static std::vector<compat_binary>
to_binary(cluster::create_topics_request obj) {
return compat_binary::serde_and_adl(obj);
return compat_binary::serde_and_adl(std::move(obj));
}

static void check(cluster::create_topics_request obj, compat_binary test) {
if (test.name == "serde") {
verify_serde_only(obj, test);
verify_serde_only(obj.copy(), test);
return;
}
vassert(test.name == "adl", "Unknown compat_binary format encounterd");
Expand Down Expand Up @@ -586,12 +588,12 @@ struct compat_check<cluster::create_topics_reply> {

static std::vector<compat_binary>
to_binary(cluster::create_topics_reply obj) {
return compat_binary::serde_and_adl(obj);
return compat_binary::serde_and_adl(std::move(obj));
}

static void check(cluster::create_topics_reply obj, compat_binary test) {
if (test.name == "serde") {
verify_serde_only(obj, test);
verify_serde_only(obj.copy(), test);
return;
}
vassert(test.name == "adl", "Unknown compat_binary format encounterd");
Expand Down
4 changes: 2 additions & 2 deletions src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ template<>
struct instance_generator<cluster::create_topics_request> {
static cluster::create_topics_request random() {
return {
.topics = tests::random_vector(
.topics = tests::random_chunked_vector(
[] {
return instance_generator<
cluster::topic_configuration>::random();
Expand All @@ -689,7 +689,7 @@ struct instance_generator<cluster::create_topics_reply> {
[] { return instance_generator<cluster::topic_result>::random(); }),
tests::random_vector(
[] { return instance_generator<model::topic_metadata>::random(); }),
tests::random_vector([] {
tests::random_chunked_vector([] {
return instance_generator<cluster::topic_configuration>::random();
})};
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/handlers/topics/topic_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ requires(KafkaApiTypeIter it) {
}
// clang-format on
auto to_cluster_type(KafkaApiTypeIter begin, KafkaApiTypeIter end)
-> std::vector<decltype(to_cluster_type(*begin))> {
std::vector<decltype(to_cluster_type(*begin))> cluster_types;
-> chunked_vector<decltype(to_cluster_type(*begin))> {
chunked_vector<decltype(to_cluster_type(*begin))> cluster_types;
cluster_types.reserve(std::distance(begin, end));
std::transform(
begin,
Expand Down
10 changes: 9 additions & 1 deletion src/v/test_utils/randoms.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,21 @@ auto random_tristate(Func f) {
}

template<typename Fn, typename T = std::invoke_result_t<Fn>>
inline auto random_vector(Fn&& gen, size_t size = 20) -> std::vector<T> {
auto random_vector(Fn&& gen, size_t size = 20) -> std::vector<T> {
std::vector<T> v;
v.resize(size);
std::generate_n(v.begin(), size, gen);
return v;
}

template<typename Fn, typename T = std::invoke_result_t<Fn>>
auto random_chunked_vector(Fn&& gen, size_t size = 20) -> chunked_vector<T> {
chunked_vector<T> v;
v.reserve(size);
std::generate_n(std::back_inserter(v), size, gen);
return v;
}

template<
typename Fn,
typename... Args,
Expand Down

0 comments on commit d58c9fa

Please sign in to comment.