Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deletes: implement serialization. #3450

Merged
merged 3 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
117 changes: 108 additions & 9 deletions test/src/unit-capi-serialized_queries.cc
Expand Up @@ -141,6 +141,17 @@ struct SerializationFx {
REQUIRE(loop_num->second > 0);
}

/**
 * Verifies that a delete query recorded work in its stats.
 *
 * Requires that the query's strategy exposes non-null stats and counters,
 * and that the `Context.StorageManager.Query.Deletes.dowork.timer_count`
 * counter exists with a value greater than zero.
 *
 * @param query The query whose strategy stats are inspected.
 */
static void check_delete_stats(const Query& query) {
  // NOTE(review): the strategy is cast to `sm::Reader*` even though the
  // counter checked below belongs to the Deletes strategy -- confirm that
  // this is the intended type for a delete query.
  auto strategy = (sm::Reader*)query.ptr()->query_->strategy();
  auto delete_stats = strategy->stats();
  REQUIRE(delete_stats != nullptr);

  auto counter_map = delete_stats->counters();
  REQUIRE(counter_map != nullptr);

  // The timer count must be present and show at least one `dowork` call.
  auto dowork_it = counter_map->find(
      "Context.StorageManager.Query.Deletes.dowork.timer_count");
  REQUIRE((dowork_it != counter_map->end()));
  REQUIRE(dowork_it->second > 0);
}

void create_array(tiledb_array_type_t type) {
ArraySchema schema(ctx, type);
Domain domain(ctx);
Expand Down Expand Up @@ -317,6 +328,33 @@ struct SerializationFx {
check_write_stats(query);
}

void write_sparse_delete() {
Array array(ctx, array_uri, TILEDB_DELETE);
Query query(ctx, array);

// Define query condition (a1 < 5).
QueryCondition qc(ctx);
int32_t val = 5;
qc.init("a1", &val, sizeof(int32_t), TILEDB_LT);
query.set_condition(qc);

// Serialize into a copy and submit.
std::vector<uint8_t> serialized;
serialize_query(ctx, query, &serialized, true);
Array array2(ctx, array_uri, TILEDB_DELETE);
Query query2(ctx, array2);
deserialize_query(ctx, serialized, &query2, false);
query2.submit();

// Make sure query2 has logged stats
check_delete_stats(query2);
serialize_query(ctx, query2, &serialized, false);
deserialize_query(ctx, serialized, &query, true);

// The deserialized query should also include the delete stats
check_delete_stats(query);
}

void write_sparse_array_split_coords() {
std::vector<int32_t> d1 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

Expand Down Expand Up @@ -586,7 +624,7 @@ TEST_CASE_METHOD(
deserialize_query(ctx, serialized, &query, true);
REQUIRE(query.query_status() == Query::Status::COMPLETE);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

auto result_el = query.result_buffer_elements_nullable();
Expand Down Expand Up @@ -649,7 +687,7 @@ TEST_CASE_METHOD(
deserialize_query(ctx, serialized, &query, true);
REQUIRE(query.query_status() == Query::Status::COMPLETE);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

// We expect all cells where `a1` >= `cmp_value` to be filtered
Expand Down Expand Up @@ -728,7 +766,7 @@ TEST_CASE_METHOD(
deserialize_query(ctx, serialized, &query, true);
REQUIRE(query.query_status() == Query::Status::COMPLETE);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

auto result_el = query.result_buffer_elements_nullable();
Expand Down Expand Up @@ -796,7 +834,7 @@ TEST_CASE_METHOD(
// Deserialize into original query (client side).
deserialize_query(ctx, serialized, &q, true);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

for (void* b : to_free)
Expand Down Expand Up @@ -888,7 +926,7 @@ TEST_CASE_METHOD(
deserialize_query(ctx, serialized, &query, true);
REQUIRE(query.query_status() == Query::Status::COMPLETE);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

auto result_el = query.result_buffer_elements_nullable();
Expand Down Expand Up @@ -948,7 +986,7 @@ TEST_CASE_METHOD(
deserialize_query(ctx, serialized, &query, true);
REQUIRE(query.query_status() == Query::Status::COMPLETE);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

auto result_el = query.result_buffer_elements_nullable();
Expand Down Expand Up @@ -1012,7 +1050,7 @@ TEST_CASE_METHOD(
deserialize_query(ctx, serialized, &query, true);
REQUIRE(query.query_status() == Query::Status::COMPLETE);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

auto result_el = query.result_buffer_elements_nullable();
Expand Down Expand Up @@ -1067,7 +1105,7 @@ TEST_CASE_METHOD(
deserialize_query(ctx, serialized, &query, true);
REQUIRE(query.query_status() == Query::Status::COMPLETE);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

auto result_el = query.result_buffer_elements_nullable();
Expand Down Expand Up @@ -1124,7 +1162,7 @@ TEST_CASE_METHOD(
// Deserialize into original query (client side).
deserialize_query(ctx, serialized, &q, true);

// The deserialized query should also include the write stats
// The deserialized query should also include the read stats
check_read_stats(query);

for (void* b : to_free)
Expand Down Expand Up @@ -1170,3 +1208,64 @@ TEST_CASE_METHOD(
// TODO: check results
}
}

// Writes a sparse array, deletes the cells matching the condition set in
// `write_sparse_delete()`, then reads the array back through a serialization
// round trip and checks the number of results per attribute.
TEST_CASE_METHOD(
    SerializationFx,
    "Query serialization, sparse delete",
    "[query][sparse][delete][serialization]") {
  create_array(TILEDB_SPARSE);
  write_sparse_array();
  write_sparse_delete();

  SECTION("- Read all") {
    Array read_array(ctx, array_uri, TILEDB_READ);
    Query read_query(ctx, read_array);

    // Result buffers for the original query.
    std::vector<int32_t> coords_buf(1000);
    std::vector<uint32_t> a1_buf(1000);
    std::vector<uint32_t> a2_buf(1000);
    std::vector<uint8_t> a2_validity(1000);
    std::vector<char> a3_values(1000 * 100);
    std::vector<uint64_t> a3_offsets_buf(1000);
    std::vector<int32_t> full_domain = {1, 10, 1, 10};

    read_query.set_subarray(full_domain);
    read_query.set_coordinates(coords_buf);
    read_query.set_data_buffer("a1", a1_buf);
    read_query.set_data_buffer("a2", a2_buf);
    read_query.set_validity_buffer("a2", a2_validity);
    read_query.set_data_buffer("a3", a3_values);
    read_query.set_offsets_buffer("a3", a3_offsets_buf);

    // Round-trip the query into a second instance and submit that one.
    std::vector<uint8_t> wire_buffer;
    serialize_query(ctx, read_query, &wire_buffer, true);
    Array submit_array(ctx, array_uri, TILEDB_READ);
    Query submit_query(ctx, submit_array);
    deserialize_query(ctx, wire_buffer, &submit_query, false);
    auto allocated = allocate_query_buffers(ctx, submit_array, &submit_query);
    submit_query.submit();
    serialize_query(ctx, submit_query, &wire_buffer, false);

    // The submitted query must have logged read stats.
    check_read_stats(submit_query);

    // Bring the results back into the original query.
    deserialize_query(ctx, wire_buffer, &read_query, true);
    REQUIRE(read_query.query_status() == Query::Status::COMPLETE);

    // The read stats must survive the round trip.
    check_read_stats(read_query);

    // Expected element counts once the delete has been applied.
    auto element_counts = read_query.result_buffer_elements_nullable();
    auto& a1_counts = element_counts["a1"];
    auto& a2_counts = element_counts["a2"];
    auto& a3_counts = element_counts["a3"];
    REQUIRE(std::get<1>(a1_counts) == 5);
    REQUIRE(std::get<1>(a2_counts) == 10);
    REQUIRE(std::get<2>(a2_counts) == 5);
    REQUIRE(std::get<0>(a3_counts) == 5);
    REQUIRE(std::get<1>(a3_counts) == 40);

    // TODO: check results

    // Release the buffers allocated for the submitted query.
    for (void* buffer : allocated) {
      std::free(buffer);
    }
  }
}
4 changes: 4 additions & 0 deletions tiledb/sm/query/DELETES_AND_UPDATES.md
@@ -0,0 +1,4 @@
# Deletes

Deletes are implemented as a query type, `TILEDB_DELETE`, which requires
a QueryCondition to be set on the Query. Upon query submission, TileDB creates a [delete commit file](https://github.com/TileDB-Inc/TileDB/blob/dev/format_spec/delete_commit_file.md) containing a serialized version of the query condition. All cells whose attribute values match the query condition expression are considered deleted as of the timestamp of the commit file. During reads, any delete commit files within the query range are loaded and deserialized, then the _negation_ of the stored query condition is applied to the final result.
5 changes: 3 additions & 2 deletions tiledb/sm/query/deletes_and_updates/deletes.cc
Expand Up @@ -63,7 +63,8 @@ Deletes::Deletes(
std::unordered_map<std::string, QueryBuffer>& buffers,
Subarray& subarray,
Layout layout,
QueryCondition& condition)
QueryCondition& condition,
bool skip_checks_serialization)
: StrategyBase(
stats,
logger->clone("Deletes", ++logger_id_),
Expand Down Expand Up @@ -94,7 +95,7 @@ Deletes::Deletes(
"Cannot initialize deletes; Subarrays are not supported");
}

if (condition_.empty()) {
if (!skip_checks_serialization && condition_.empty()) {
throw DeleteStatusException(
"Cannot initialize deletes; One condition is needed");
}
Expand Down
3 changes: 2 additions & 1 deletion tiledb/sm/query/deletes_and_updates/deletes.h
Expand Up @@ -60,7 +60,8 @@ class Deletes : public StrategyBase, public IQueryStrategy {
std::unordered_map<std::string, QueryBuffer>& buffers,
Subarray& subarray,
Layout layout,
QueryCondition& condition);
QueryCondition& condition,
bool skip_checks_serialization = false);

/** Destructor. */
~Deletes();
Expand Down
3 changes: 2 additions & 1 deletion tiledb/sm/query/query.cc
Expand Up @@ -1350,7 +1350,8 @@ Status Query::create_strategy(bool skip_checks_serialization) {
buffers_,
subarray_,
layout_,
condition_));
condition_,
skip_checks_serialization));
} else {
return logger_->status(
Status_QueryError("Cannot create strategy; unsupported query type"));
Expand Down