schema_registry: Refactor seq_writer
* Remove unusual "this" parameter

Signed-off-by: Ben Pope <ben@redpanda.com>
BenPope committed Apr 16, 2024 · 1 parent 23c12f5 · commit 60ceea9
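The refactor drops the trailing seq_writer& seq parameter from the do_* helpers (the unusual second route to the same object alongside the implicit this), so they now reach _store and _node_id as ordinary members, and the sequenced_write callbacks invoke the helper on the seq_writer& they are handed rather than capturing this and forwarding seq. The sketch below illustrates only that calling pattern; offset, store and the single-shot sequenced_write stub are simplified stand-ins, not the real Redpanda or Seastar types.

// Minimal sketch of the calling pattern after this commit (illustration only;
// no futures, no retry loop, simplified stand-in types).
#include <cstdint>
#include <functional>
#include <iostream>
#include <optional>
#include <string>

struct offset {
    int64_t value{0};
};

struct store {
    // Stub: pretend nothing is cached yet, so every write proceeds.
    bool contains(const std::string&) const { return false; }
};

class seq_writer {
public:
    // Hands the callback the offset to write at and the seq_writer to use.
    bool sequenced_write(
      const std::function<std::optional<bool>(offset, seq_writer&)>& fn) {
        return fn(offset{_next_offset}, *this).value_or(false);
    }

    // After the refactor: no trailing seq_writer& parameter; _store and
    // _node_id are reached through the implicit `this`.
    std::optional<bool> do_write_config(const std::string& sub, offset write_at) {
        if (_store.contains(sub)) {
            return false; // no-op case
        }
        std::cout << "node " << _node_id << " writes config for " << sub
                  << " at offset " << write_at.value << "\n";
        _next_offset = write_at.value + 1;
        return true;
    }

    bool write_config(std::string sub) {
        // The lambda no longer captures `this`; it calls the helper on the
        // seq_writer& it receives, as the refactored call sites do.
        return sequenced_write(
          [sub = std::move(sub)](offset write_at, seq_writer& seq) {
              return seq.do_write_config(sub, write_at);
          });
    }

private:
    store _store;
    int32_t _node_id{1};
    int64_t _next_offset{0};
};

int main() {
    seq_writer writer;
    return writer.write_config("my-subject") ? 0 : 1;
}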
Showing 2 changed files with 56 additions and 68 deletions.
107 changes: 50 additions & 57 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
@@ -127,10 +127,10 @@ void seq_writer::advance_offset_inner(model::offset offset) {
 }
 
 ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(
-  subject_schema schema, model::offset write_at, seq_writer& seq) {
+  subject_schema schema, model::offset write_at) {
     // Check if store already contains this data: if
     // so, we do no I/O and return the schema ID.
-    auto projected = co_await seq._store.project_ids(schema).handle_exception(
+    auto projected = co_await _store.project_ids(schema).handle_exception(
       [](std::exception_ptr e) {
           vlog(plog.debug, "write_subject_version: project_ids failed: {}", e);
           return ss::make_exception_future<sharded_store::insert_result>(e);
@@ -153,7 +153,7 @@ ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(
 
     auto key = schema_key{
       .seq{write_at},
-      .node{seq._node_id},
+      .node{_node_id},
       .sub{schema.schema.sub()},
       .version{projected.version}};
     auto value = canonical_schema_value{
@@ -164,13 +164,12 @@ ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(
 
     auto batch = as_record_batch(key, value);
 
-    auto success = co_await seq.produce_and_check(
-      write_at, std::move(batch));
+    auto success = co_await produce_and_check(write_at, std::move(batch));
     if (success) {
-        auto applier = consume_to_store(seq._store, seq);
+        auto applier = consume_to_store(_store, *this);
         using Tag = decltype(value.schema)::tag;
         co_await applier.apply<Tag>(write_at, key, value);
-        seq.advance_offset_inner(write_at);
+        advance_offset_inner(write_at);
         co_return projected.id;
     } else {
         co_return std::nullopt;
@@ -179,17 +178,16 @@ ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(
 }
 
 ss::future<schema_id> seq_writer::write_subject_version(subject_schema schema) {
-    return sequenced_write([this, schema{std::move(schema)}](
-                             model::offset write_at, seq_writer& seq) {
-        return do_write_subject_version(schema, write_at, seq);
-    });
+    return sequenced_write(
+      [schema{std::move(schema)}](model::offset write_at, seq_writer& seq) {
+          return seq.do_write_subject_version(schema, write_at);
+      });
 }
 
 ss::future<std::optional<bool>> seq_writer::do_write_config(
   std::optional<subject> sub,
   compatibility_level compat,
-  model::offset write_at,
-  seq_writer& seq) {
+  model::offset write_at) {
     vlog(
       plog.debug,
       "write_config sub={} compat={} offset={}",
@@ -201,10 +199,10 @@ ss::future<std::optional<bool>> seq_writer::do_write_config(
     // Check for no-op case
     compatibility_level existing;
     if (sub.has_value()) {
-        existing = co_await seq._store.get_compatibility(
+        existing = co_await _store.get_compatibility(
           sub.value(), default_to_global::no);
     } else {
-        existing = co_await seq._store.get_compatibility();
+        existing = co_await _store.get_compatibility();
     }
     if (existing == compat) {
         co_return false;
@@ -213,15 +211,15 @@ ss::future<std::optional<bool>> seq_writer::do_write_config(
         // ignore
     }
 
-    auto key = config_key{.seq{write_at}, .node{seq._node_id}, .sub{sub}};
+    auto key = config_key{.seq{write_at}, .node{_node_id}, .sub{sub}};
     auto value = config_value{.compat = compat};
     auto batch = as_record_batch(key, value);
 
-    auto success = co_await seq.produce_and_check(write_at, std::move(batch));
+    auto success = co_await produce_and_check(write_at, std::move(batch));
     if (success) {
-        auto applier = consume_to_store(seq._store, seq);
+        auto applier = consume_to_store(_store, *this);
         co_await applier.apply(write_at, key, value);
-        seq.advance_offset_inner(write_at);
+        advance_offset_inner(write_at);
         co_return true;
     } else {
         // Pass up a None, our caller's cue to retry
@@ -231,18 +229,18 @@ ss::future<std::optional<bool>> seq_writer::do_write_config(
 
 ss::future<bool> seq_writer::write_config(
   std::optional<subject> sub, compatibility_level compat) {
-    return sequenced_write([this, sub{std::move(sub)}, compat](
-                             model::offset write_at, seq_writer& seq) {
-        return do_write_config(sub, compat, write_at, seq);
-    });
+    return sequenced_write(
+      [sub{std::move(sub)}, compat](model::offset write_at, seq_writer& seq) {
+          return seq.do_write_config(sub, compat, write_at);
+      });
 }
 
-ss::future<std::optional<bool>> seq_writer::do_delete_config(
-  subject sub, model::offset write_at, seq_writer& seq) {
+ss::future<std::optional<bool>>
+seq_writer::do_delete_config(subject sub, model::offset write_at) {
     vlog(plog.debug, "delete config sub={} offset={}", sub, write_at);
 
     try {
-        co_await seq._store.get_compatibility(sub, default_to_global::no);
+        co_await _store.get_compatibility(sub, default_to_global::no);
     } catch (const exception&) {
         // subject config already blank
         co_return false;
@@ -286,11 +284,11 @@ ss::future<std::optional<bool>> seq_writer::do_delete_config(
         throw kafka::exception(res.error_code, *res.error_message);
     }
 
-    auto applier = consume_to_store(seq._store, seq);
+    auto applier = consume_to_store(_store, *this);
     auto offset = res.base_offset;
     for (const auto& k : keys) {
         co_await applier.apply(offset, k, std::nullopt);
-        seq.advance_offset_inner(offset);
+        advance_offset_inner(offset);
         ++offset;
     }
 
@@ -299,27 +297,24 @@ ss::future<std::optional<bool>> seq_writer::do_delete_config(
 
 ss::future<bool> seq_writer::delete_config(subject sub) {
     return sequenced_write(
-      [this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
-          return do_delete_config(sub, write_at, seq);
+      [sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
+          return seq.do_delete_config(sub, write_at);
       });
 }
 
 /// Impermanent delete: update a version with is_deleted=true
 ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
-  subject sub,
-  schema_version version,
-  model::offset write_at,
-  seq_writer& seq) {
-    if (co_await seq._store.is_referenced(sub, version)) {
+  subject sub, schema_version version, model::offset write_at) {
+    if (co_await _store.is_referenced(sub, version)) {
         throw as_exception(has_references(sub, version));
     }
 
-    auto s_res = co_await seq._store.get_subject_schema(
+    auto s_res = co_await _store.get_subject_schema(
       sub, version, include_deleted::yes);
     subject_schema ss = std::move(s_res);
 
     auto key = schema_key{
-      .seq{write_at}, .node{seq._node_id}, .sub{sub}, .version{version}};
+      .seq{write_at}, .node{_node_id}, .sub{sub}, .version{version}};
     vlog(plog.debug, "seq_writer::delete_subject_version {}", key);
     auto value = canonical_schema_value{
       .schema{std::move(ss.schema)},
@@ -329,12 +324,12 @@ ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
 
     auto batch = as_record_batch(key, value);
 
-    auto success = co_await seq.produce_and_check(write_at, std::move(batch));
+    auto success = co_await produce_and_check(write_at, std::move(batch));
     if (success) {
-        auto applier = consume_to_store(seq._store, seq);
+        auto applier = consume_to_store(_store, *this);
         using Tag = decltype(value.schema)::tag;
         co_await applier.apply<Tag>(write_at, key, value);
-        seq.advance_offset_inner(write_at);
+        advance_offset_inner(write_at);
         co_return true;
     } else {
         // Pass up a None, our caller's cue to retry
@@ -344,26 +339,25 @@ ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
 
 ss::future<bool>
 seq_writer::delete_subject_version(subject sub, schema_version version) {
-    return sequenced_write([this, sub{std::move(sub)}, version](
-                             model::offset write_at, seq_writer& seq) {
-        return do_delete_subject_version(sub, version, write_at, seq);
-    });
+    return sequenced_write(
+      [sub{std::move(sub)}, version](model::offset write_at, seq_writer& seq) {
+          return seq.do_delete_subject_version(sub, version, write_at);
+      });
 }
 
 ss::future<std::optional<std::vector<schema_version>>>
-seq_writer::do_delete_subject_impermanent(
-  subject sub, model::offset write_at, seq_writer& seq) {
+seq_writer::do_delete_subject_impermanent(subject sub, model::offset write_at) {
     // Grab the versions before they're gone.
-    auto versions = co_await seq._store.get_versions(sub, include_deleted::no);
+    auto versions = co_await _store.get_versions(sub, include_deleted::no);
 
     // Inspect the subject to see if its already deleted
-    if (co_await seq._store.is_subject_deleted(sub)) {
+    if (co_await _store.is_subject_deleted(sub)) {
        co_return std::make_optional(versions);
    }
 
    auto is_referenced = co_await ssx::parallel_transform(
-      versions.begin(), versions.end(), [&seq, &sub](auto const& ver) {
-          return seq._store.is_referenced(sub, ver);
+      versions.begin(), versions.end(), [this, &sub](auto const& ver) {
+          return _store.is_referenced(sub, ver);
      });
    if (std::any_of(is_referenced.begin(), is_referenced.end(), [](auto v) {
            return v;
@@ -372,16 +366,15 @@ seq_writer::do_delete_subject_impermanent(
    }
 
    // Proceed to write
-    auto key = delete_subject_key{
-      .seq{write_at}, .node{seq._node_id}, .sub{sub}};
+    auto key = delete_subject_key{.seq{write_at}, .node{_node_id}, .sub{sub}};
    auto value = delete_subject_value{.sub{sub}};
    auto batch = as_record_batch(key, value);
 
-    auto success = co_await seq.produce_and_check(write_at, std::move(batch));
+    auto success = co_await produce_and_check(write_at, std::move(batch));
    if (success) {
-        auto applier = consume_to_store(seq._store, seq);
+        auto applier = consume_to_store(_store, *this);
        co_await applier.apply(write_at, key, value);
-        seq.advance_offset_inner(write_at);
+        advance_offset_inner(write_at);
        co_return versions;
    } else {
        // Pass up a None, our caller's cue to retry
@@ -393,8 +386,8 @@ ss::future<std::vector<schema_version>>
 seq_writer::delete_subject_impermanent(subject sub) {
    vlog(plog.debug, "delete_subject_impermanent sub={}", sub);
    return sequenced_write(
-      [this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
-          return do_delete_subject_impermanent(sub, write_at, seq);
+      [sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
+          return seq.do_delete_subject_impermanent(sub, write_at);
      });
 }
17 changes: 6 additions & 11 deletions src/v/pandaproxy/schema_registry/seq_writer.h
@@ -69,27 +69,22 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
 
     void advance_offset_inner(model::offset offset);
 
-    ss::future<std::optional<schema_id>> do_write_subject_version(
-      subject_schema schema, model::offset write_at, seq_writer& seq);
+    ss::future<std::optional<schema_id>>
+    do_write_subject_version(subject_schema schema, model::offset write_at);
 
     ss::future<std::optional<bool>> do_write_config(
       std::optional<subject> sub,
       compatibility_level compat,
-      model::offset write_at,
-      seq_writer& seq);
+      model::offset write_at);
 
     ss::future<std::optional<bool>>
-    do_delete_config(subject sub, model::offset write_at, seq_writer& seq);
+    do_delete_config(subject sub, model::offset write_at);
 
     ss::future<std::optional<bool>> do_delete_subject_version(
-      subject sub,
-      schema_version version,
-      model::offset write_at,
-      seq_writer& seq);
+      subject sub, schema_version version, model::offset write_at);
 
     ss::future<std::optional<std::vector<schema_version>>>
-    do_delete_subject_impermanent(
-      subject sub, model::offset write_at, seq_writer& seq);
+    do_delete_subject_impermanent(subject sub, model::offset write_at);
 
     ss::future<std::vector<schema_version>> delete_subject_permanent_inner(
       subject sub, std::optional<schema_version> version);
