Add support for remote global order writes #3393

Merged: 49 commits, Oct 4, 2022
4f12a05
basic code skeleton for serialized global order writes
robertbindar Jun 10, 2022
54cf7ca
serialize fields of GlobalWriteState
robertbindar Jun 22, 2022
fab314e
deserialization of global write state
robertbindar Jun 24, 2022
4aa7442
global write state serialization
robertbindar Jun 27, 2022
64ef300
working round-trip - single submit, draft test case
robertbindar Jul 6, 2022
5896b74
multiple submits, working version
robertbindar Jul 7, 2022
1271bf4
extend testcase for nullable and varsize attributes, fix bugs
robertbindar Jul 7, 2022
a4840c0
Get extra coverage by switching all existing
robertbindar Jul 15, 2022
4d2cb3b
fix uninitialized field in SingleCoord constructor
robertbindar Jul 28, 2022
d29873c
Fix bad initialization of nonempty domain for sparse during deseriali…
robertbindar Jul 28, 2022
2ad5946
tile alignment restrictions for remote global order writes
robertbindar Aug 1, 2022
7889bb2
tile alignment restriction should be waived for submit_and_finalize r…
robertbindar Aug 2, 2022
e5f43a8
regular finalize() call should fail for remote global order writes
robertbindar Aug 2, 2022
31dc09a
rest client submit_and_finalize
robertbindar Aug 16, 2022
ad0c092
remove irelevant todo comments
robertbindar Aug 18, 2022
c839a8a
capnp spec extension for multipart uploads
robertbindar Aug 18, 2022
e915548
multipart upload support for global order writes - part1
robertbindar Aug 22, 2022
1db21c4
multipart upload support for global order writes - part2
robertbindar Aug 23, 2022
baaf528
multipart upload support for global order writes - part3
robertbindar Aug 24, 2022
530906d
fix rebasing compile issues
robertbindar Aug 25, 2022
15101ed
fix bug in query deserialization
robertbindar Aug 26, 2022
ebaabf9
fix rest client deserialization exception
robertbindar Aug 26, 2022
af3f6bb
Query submit should flush the s3 file buffers
robertbindar Aug 30, 2022
2f608ca
Serialize has_delete_meta_ field as part of fragment_metadata
ypatia Aug 31, 2022
b870493
subarray boundary checking should happen after the subarray is deseri…
robertbindar Aug 31, 2022
a1af43c
Serialization should clear S3 multipart uploads
robertbindar Sep 1, 2022
8f457ba
fix check_tile_alignment
ypatia Sep 2, 2022
e4d09f6
return a client error if input buffers are <5mb as a temporary limita…
robertbindar Sep 2, 2022
322adaa
enable serialized global order write tests
robertbindar Sep 8, 2022
9d878e1
serialization support for WrittenFragmentInfo, fix failing test
robertbindar Sep 9, 2022
a56acaf
build failure when serialization is disabled
robertbindar Sep 9, 2022
71c3194
fix CI build failure
robertbindar Sep 13, 2022
c5c0e82
fix wrong last tile boundary calculation in test case
robertbindar Sep 15, 2022
8676cf9
Fix bug in Query for sparse.
robertbindar Sep 15, 2022
e7c3b64
CI should be green - part1
robertbindar Sep 16, 2022
c984e58
Fix bug in unit-cppapi-var-offsets heap buffer overflow
robertbindar Sep 19, 2022
aa4d062
CI should be green - part2
robertbindar Sep 19, 2022
32f1f10
flush_multipart_file_buffer should be noop for not supported backends
robertbindar Sep 19, 2022
0e84ee2
disable serialization wrappers for 32bit offsets gl.o.writes
robertbindar Sep 19, 2022
70bd8b6
Disable temporarily serial wrappers for 32-bit dense writes too
ypatia Sep 20, 2022
f9f280e
fix test failure
robertbindar Sep 20, 2022
8534fa7
Flushing of multipart buffers should only happen
robertbindar Sep 20, 2022
1d98417
win32 capnp headers
robertbindar Sep 20, 2022
66b2aca
ifdef serialization in unit-capi-dense_array
robertbindar Sep 20, 2022
9c43ab6
further fixes in unit-capi-dense_array
robertbindar Sep 21, 2022
4254658
revert incorrect change in unit-capi_dense_array
robertbindar Sep 21, 2022
0bf5bd9
one more unit-capi_dense_array fix
robertbindar Sep 21, 2022
bdcb0b0
add inline comments in the capnp global order writes structures
robertbindar Sep 30, 2022
8d51519
add V1 naming to writer globalWriteState capnp member
robertbindar Sep 30, 2022
218 changes: 218 additions & 0 deletions test/src/helpers.cc
@@ -1303,6 +1303,224 @@ void check_counts(span<T> vals, std::vector<uint64_t> expected) {
    CHECK(counts[i] == expected[i]);
  }
}
void serialize_query(

[Review comment, Contributor] Is this code just moved from test/src/unit-capi-serialized_queries.cc?

    const Context& ctx,
    Query& query,
    std::vector<uint8_t>* serialized,
    bool clientside) {
  ctx.handle_error(serialize_query(
      ctx.ptr().get(), query.ptr().get(), serialized, clientside));
}

int serialize_query(
    tiledb_ctx_t* ctx,
    tiledb_query_t* query,
    std::vector<uint8_t>* serialized,
    bool clientside) {
  // Serialize
  tiledb_buffer_list_t* buff_list;

  if (tiledb_serialize_query(
          ctx, query, TILEDB_CAPNP, clientside ? 1 : 0, &buff_list) !=
      TILEDB_OK) {
    return TILEDB_ERR;
  }

  // Wrap the buffer list in a safe pointer so that it is also freed on the
  // early-return error paths below
  auto list_deleter = [](tiledb_buffer_list_t* l) {
    tiledb_buffer_list_free(&l);
  };
  std::unique_ptr<tiledb_buffer_list_t, decltype(list_deleter)> list_ptr(
      buff_list, list_deleter);

  // Flatten
  tiledb_buffer_t* c_buff;
  if (tiledb_buffer_list_flatten(ctx, buff_list, &c_buff) != TILEDB_OK) {
    return TILEDB_ERR;
  }

  // Wrap in a safe pointer
  auto deleter = [](tiledb_buffer_t* b) { tiledb_buffer_free(&b); };
  std::unique_ptr<tiledb_buffer_t, decltype(deleter)> buff_ptr(c_buff, deleter);

  // Copy into user vector
  void* data;
  uint64_t num_bytes;
  if (tiledb_buffer_get_data(ctx, c_buff, &data, &num_bytes) != TILEDB_OK) {
    return TILEDB_ERR;
  }
  serialized->clear();
  serialized->insert(
      serialized->end(),
      static_cast<const uint8_t*>(data),
      static_cast<const uint8_t*>(data) + num_bytes);

  // Both the flattened buffer and the buffer list are freed by the safe
  // pointers when they go out of scope
  return TILEDB_OK;
}
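
The "wrap in a safe pointer" step above can be illustrated in isolation. The following is a minimal sketch, not TileDB code: `demo_handle_t`, `demo_handle_alloc`, `demo_handle_free` and `copy_bytes` are hypothetical stand-ins for a C-style handle API, used only to show that a `std::unique_ptr` with a custom deleter releases the handle on every return path, including the early error returns.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical C-style handle, standing in for a type such as a buffer list.
struct demo_handle_t {
  std::vector<uint8_t> bytes;
};

// Number of handles that have been allocated but not yet freed.
static int g_live_handles = 0;

int demo_handle_alloc(demo_handle_t** h) {
  *h = new demo_handle_t();
  (*h)->bytes = {1, 2, 3};
  ++g_live_handles;
  return 0;
}

void demo_handle_free(demo_handle_t** h) {
  delete *h;
  *h = nullptr;
  --g_live_handles;
}

// Copies the handle's bytes into `out`. `fail_midway` simulates an error
// that occurs after allocation but before the copy.
int copy_bytes(std::vector<uint8_t>* out, bool fail_midway) {
  assert(out != nullptr);

  demo_handle_t* raw = nullptr;
  if (demo_handle_alloc(&raw) != 0) {
    return -1;
  }

  // From here on, every return path frees the handle.
  auto deleter = [](demo_handle_t* h) { demo_handle_free(&h); };
  std::unique_ptr<demo_handle_t, decltype(deleter)> guard(raw, deleter);

  if (fail_midway) {
    return -1;  // guard's destructor still runs here
  }

  out->assign(guard->bytes.begin(), guard->bytes.end());
  return 0;
}
```

After either call, `copy_bytes(&out, false)` or `copy_bytes(&out, true)`, the counter `g_live_handles` is back at zero, which is the property the guard is meant to provide.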

void deserialize_query(
    const Context& ctx,
    std::vector<uint8_t>& serialized,
    Query* query,
    bool clientside) {
  ctx.handle_error(deserialize_query(
      ctx.ptr().get(), serialized, query->ptr().get(), clientside));
}

int deserialize_query(
    tiledb_ctx_t* ctx,
    std::vector<uint8_t>& serialized,
    tiledb_query_t* query,
    bool clientside) {
  tiledb_buffer_t* c_buff;
  if (tiledb_buffer_alloc(ctx, &c_buff) != TILEDB_OK) {
    return TILEDB_ERR;
  }

  // Wrap in a safe pointer
  auto deleter = [](tiledb_buffer_t* b) { tiledb_buffer_free(&b); };
  std::unique_ptr<tiledb_buffer_t, decltype(deleter)> buff_ptr(c_buff, deleter);

  // data() is well defined even for an empty vector, unlike &serialized[0]
  if (tiledb_buffer_set_data(
          ctx,
          c_buff,
          serialized.data(),
          static_cast<uint64_t>(serialized.size())) != TILEDB_OK) {
    return TILEDB_ERR;
  }

  // Deserialize
  return tiledb_deserialize_query(
      ctx, c_buff, TILEDB_CAPNP, clientside ? 1 : 0, query);
}

std::vector<void*> allocate_query_buffers(
    const Context& ctx, const Array& array, Query* query) {
  (void)array;
  std::vector<void*> to_free;
  void* unused1;
  uint64_t* unused2;
  uint8_t* unused3;
  uint64_t *a1_size, *a2_size, *a2_validity_size, *a3_size, *a3_offset_size,
      *coords_size;

  // Fetch the buffer sizes carried by the deserialized query
  ctx.handle_error(tiledb_query_get_data_buffer(
      ctx.ptr().get(), query->ptr().get(), "a1", &unused1, &a1_size));
  ctx.handle_error(tiledb_query_get_data_buffer(
      ctx.ptr().get(), query->ptr().get(), "a2", &unused1, &a2_size));
  ctx.handle_error(tiledb_query_get_validity_buffer(
      ctx.ptr().get(), query->ptr().get(), "a2", &unused3, &a2_validity_size));
  ctx.handle_error(tiledb_query_get_data_buffer(
      ctx.ptr().get(), query->ptr().get(), "a3", &unused1, &a3_size));
  ctx.handle_error(tiledb_query_get_offsets_buffer(
      ctx.ptr().get(), query->ptr().get(), "a3", &unused2, &a3_offset_size));
  ctx.handle_error(tiledb_query_get_data_buffer(
      ctx.ptr().get(),
      query->ptr().get(),
      TILEDB_COORDS,
      &unused1,
      &coords_size));

  // Allocate and set a buffer for every field that has a size
  if (a1_size != nullptr) {
    void* buff = std::malloc(*a1_size);
    ctx.handle_error(tiledb_query_set_data_buffer(
        ctx.ptr().get(), query->ptr().get(), "a1", buff, a1_size));
    to_free.push_back(buff);
  }

  if (a2_size != nullptr) {
    void* buff = std::malloc(*a2_size);
    uint8_t* validity = static_cast<uint8_t*>(std::malloc(*a2_validity_size));
    ctx.handle_error(tiledb_query_set_data_buffer(
        ctx.ptr().get(), query->ptr().get(), "a2", buff, a2_size));
    ctx.handle_error(tiledb_query_set_validity_buffer(
        ctx.ptr().get(), query->ptr().get(), "a2", validity, a2_validity_size));
    to_free.push_back(buff);
    to_free.push_back(validity);
  }

  if (a3_size != nullptr) {
    void* buff = std::malloc(*a3_size);
    uint64_t* offsets = static_cast<uint64_t*>(std::malloc(*a3_offset_size));
    ctx.handle_error(tiledb_query_set_data_buffer(
        ctx.ptr().get(), query->ptr().get(), "a3", buff, a3_size));
    ctx.handle_error(tiledb_query_set_offsets_buffer(
        ctx.ptr().get(), query->ptr().get(), "a3", offsets, a3_offset_size));
    to_free.push_back(buff);
    to_free.push_back(offsets);
  }

  if (coords_size != nullptr) {
    void* buff = std::malloc(*coords_size);
    ctx.handle_error(tiledb_query_set_data_buffer(
        ctx.ptr().get(), query->ptr().get(), TILEDB_COORDS, buff, coords_size));
    to_free.push_back(buff);
  }

  // The caller owns the returned pointers and must std::free() each of them
  return to_free;
}
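
`allocate_query_buffers` hands back raw `std::malloc` pointers, so the caller is responsible for freeing each one. One way a test could avoid forgetting that step is sketched below. This is an assumption-laden illustration rather than part of the PR: `FreeDeleter`, `MallocPtr`, `adopt_buffers` and `demo_adopt` are hypothetical names introduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <vector>

// Deleter that releases memory obtained from std::malloc.
struct FreeDeleter {
  void operator()(void* p) const {
    std::free(p);
  }
};
using MallocPtr = std::unique_ptr<void, FreeDeleter>;

// Takes ownership of every pointer in `to_free` and empties the vector,
// so the buffers are released when the returned vector is destroyed.
std::vector<MallocPtr> adopt_buffers(std::vector<void*>& to_free) {
  std::vector<MallocPtr> owned;
  owned.reserve(to_free.size());
  for (void* p : to_free) {
    owned.emplace_back(p);
  }
  to_free.clear();
  return owned;
}

// Example use: two buffers are adopted and freed when `owned` leaves scope.
std::size_t demo_adopt() {
  std::vector<void*> raw = {std::malloc(16), std::malloc(32)};
  std::vector<MallocPtr> owned = adopt_buffers(raw);
  assert(raw.empty());
  return owned.size();
}
```

The design choice here is to keep the helper's `std::vector<void*>` return type untouched and add ownership at the call site, which leaves existing callers unaffected.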

void submit_serialized_query(const Context& ctx, Query& query) {
  submit_serialized_query(ctx.ptr().get(), query.ptr().get());
}

void finalize_serialized_query(const Context& ctx, Query& query) {
  finalize_serialized_query(ctx.ptr().get(), query.ptr().get());
}

void submit_serialized_query(tiledb_ctx_t* ctx, tiledb_query_t* query) {
  Context context(ctx, false);

  tiledb_query_type_t type;
  context.handle_error(tiledb_query_get_type(ctx, query, &type));
  auto uri = query->query_->array()->array_uri().to_string();
  Array copy_array(context, uri, type);
  Query copy_query(context, copy_array);
  std::vector<uint8_t> serialized;
  // Client -> server
  context.handle_error(serialize_query(ctx, query, &serialized, true));
  context.handle_error(
      deserialize_query(ctx, serialized, copy_query.ptr().get(), false));
  copy_query.submit();
  // Server -> client
  context.handle_error(
      serialize_query(ctx, copy_query.ptr().get(), &serialized, false));
  context.handle_error(deserialize_query(ctx, serialized, query, true));
}

void finalize_serialized_query(tiledb_ctx_t* ctx, tiledb_query_t* query) {
  Context context(ctx, false);

  tiledb_query_type_t type;
  context.handle_error(tiledb_query_get_type(ctx, query, &type));
  auto uri = query->query_->array()->array_uri().to_string();
  Array copy_array(context, uri, type);
  Query copy_query(context, copy_array);
  std::vector<uint8_t> serialized;
  // Client -> server
  context.handle_error(serialize_query(ctx, query, &serialized, true));
  context.handle_error(
      deserialize_query(ctx, serialized, copy_query.ptr().get(), false));
  copy_query.finalize();
  // Server -> client
  context.handle_error(
      serialize_query(ctx, copy_query.ptr().get(), &serialized, false));
  context.handle_error(deserialize_query(ctx, serialized, query, true));
}

void submit_and_finalize_serialized_query(
    tiledb_ctx_t* ctx, tiledb_query_t* query) {
  Context context(ctx, false);

  tiledb_query_type_t type;
  context.handle_error(tiledb_query_get_type(ctx, query, &type));
  auto uri = query->query_->array()->array_uri().to_string();
  uint64_t timestamp = query->query_->array()->timestamp_end_opened_at();
  Array copy_array(context, uri, type, timestamp);
  Query copy_query(context, copy_array);
  std::vector<uint8_t> serialized;
  // Client -> server
  context.handle_error(serialize_query(ctx, query, &serialized, true));
  context.handle_error(
      deserialize_query(ctx, serialized, copy_query.ptr().get(), false));
  copy_query.submit();
  copy_query.finalize();
  // Server -> client
  context.handle_error(
      serialize_query(ctx, copy_query.ptr().get(), &serialized, false));
  context.handle_error(deserialize_query(ctx, serialized, query, true));
}

void submit_and_finalize_serialized_query(const Context& ctx, Query& query) {
  submit_and_finalize_serialized_query(ctx.ptr().get(), query.ptr().get());
}
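
The three wrappers above share one shape: serialize on the "client", deserialize into a copy on the "server", run the operation there, then serialize the result back into the original query. The sketch below reproduces that shape with a toy type so it can be run without TileDB. `ToyQuery`, `toy_serialize`, `toy_deserialize` and `toy_submit_and_finalize` are hypothetical, and the byte layout is invented for illustration; it is not the Cap'n Proto format the PR uses.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Toy stand-in for a query: only the fields needed to show the flow.
struct ToyQuery {
  uint64_t cells_requested = 0;
  uint64_t cells_written = 0;
  bool finalized = false;
};

// Size of the invented wire format: two 64-bit counters plus one flag byte.
constexpr std::size_t kWireSize = 2 * sizeof(uint64_t) + 1;

// Flattens the query into a byte vector.
std::vector<uint8_t> toy_serialize(const ToyQuery& q) {
  std::vector<uint8_t> bytes(kWireSize);
  std::memcpy(bytes.data(), &q.cells_requested, sizeof(uint64_t));
  std::memcpy(
      bytes.data() + sizeof(uint64_t), &q.cells_written, sizeof(uint64_t));
  bytes[2 * sizeof(uint64_t)] = q.finalized ? 1 : 0;
  return bytes;
}

// Rebuilds a query from bytes produced by toy_serialize.
ToyQuery toy_deserialize(const std::vector<uint8_t>& bytes) {
  assert(bytes.size() == kWireSize);
  ToyQuery q;
  std::memcpy(&q.cells_requested, bytes.data(), sizeof(uint64_t));
  std::memcpy(
      &q.cells_written, bytes.data() + sizeof(uint64_t), sizeof(uint64_t));
  q.finalized = bytes[2 * sizeof(uint64_t)] != 0;
  return q;
}

// Mimics the round trip: the client query crosses to the server as bytes,
// is processed there, and the response overwrites the client's state.
void toy_submit_and_finalize(ToyQuery& client_query) {
  // Client -> server
  ToyQuery server_query = toy_deserialize(toy_serialize(client_query));

  // Server-side "submit" and "finalize"
  server_query.cells_written = server_query.cells_requested;
  server_query.finalized = true;

  // Server -> client
  client_query = toy_deserialize(toy_serialize(server_query));
}
```

As in the helpers above, the caller only ever touches its own query object; everything the server did becomes visible through the final deserialization step.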

template void check_subarray<int8_t>(
tiledb::sm::Subarray& subarray, const SubarrayRanges<int8_t>& ranges);
61 changes: 61 additions & 0 deletions test/src/helpers.h
@@ -759,6 +759,67 @@ std::string get_commit_dir(std::string array_dir);
template <class T>
void check_counts(span<T> vals, std::vector<uint64_t> expected);

/**
 * Helper function that serializes a query from the "client" or "server"
 * perspective. The flow being mimicked here is (for read queries):
 *
 * - Client sets up read query object including buffers.
 * - Client submits query to a remote array.
 * - Internal code (not C API) serializes that query and sends it via curl.
 * - Server receives and deserializes the query using the C API.
 * - Server submits query.
 * - Server serializes (using C API) the query and sends it back.
 * - Client receives response and deserializes the query (not C API). This
 *   copies the query results into the original user buffers.
 * - Client's blocking call to tiledb_query_submit() now returns.
 */
void serialize_query(
    const Context& ctx,
    Query& query,
    std::vector<uint8_t>* serialized,
    bool clientside);

/**
 * Helper function that deserializes a query from the "client" or "server"
 * perspective.
 */
void deserialize_query(
    const Context& ctx,
    std::vector<uint8_t>& serialized,
    Query* query,
    bool clientside);

int serialize_query(

[Review comment, Contributor] Needs a Doxygen doc comment.

    tiledb_ctx_t* ctx,
    tiledb_query_t* query,
    std::vector<uint8_t>* serialized,
    bool clientside);

int deserialize_query(
    tiledb_ctx_t* ctx,
    std::vector<uint8_t>& serialized,
    tiledb_query_t* query,
    bool clientside);

void submit_serialized_query(tiledb_ctx_t* ctx, tiledb_query_t* query);

void submit_serialized_query(const Context& ctx, Query& query);

void finalize_serialized_query(tiledb_ctx_t* ctx, tiledb_query_t* query);

void finalize_serialized_query(const Context& ctx, Query& query);

void submit_and_finalize_serialized_query(
    tiledb_ctx_t* ctx, tiledb_query_t* query);

void submit_and_finalize_serialized_query(const Context& ctx, Query& query);

[Review comment, Contributor] Nit: add a line break between this function and the doc comment for the next one.

/**
 * Helper function that allocates buffers on a query object that has been
 * deserialized on the "server" side.
 */
std::vector<void*> allocate_query_buffers(
    const Context& ctx, const Array& array, Query* query);

} // End of namespace test

} // End of namespace tiledb
18 changes: 16 additions & 2 deletions test/src/unit-backwards_compat.cc
@@ -1267,8 +1267,22 @@ TEST_CASE(
   query_write.set_data_buffer("d1", d1_write);
   query_write.set_data_buffer("d2", d2_write);

-  query_write.submit();
-  query_write.finalize();
+  bool serialized_writes = false;
+  SECTION("no serialization") {
+    serialized_writes = false;
+  }
+  SECTION("serialization enabled global order write") {
+#ifdef TILEDB_SERIALIZATION
+    serialized_writes = true;
+#endif
+  }
+  if (!serialized_writes) {
+    query_write.submit();
+    query_write.finalize();
+  } else {
+    submit_and_finalize_serialized_query(ctx, query_write);
+  }
+
   array_write.close();

   FragmentInfo fragment_info(ctx, array_name);
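
The two `SECTION` blocks in this hunk make the test body run once per section, and the `#ifdef TILEDB_SERIALIZATION` guard decides whether the second run actually takes the serialized path. The selection logic can be sketched on its own as follows. `choose_write_path` and `check_plain_section` are hypothetical helpers written for this illustration, with a boolean parameter standing in for which section is executing.

```cpp
#include <cassert>
#include <string>

// Returns the write path a test section would take.
// `section_wants_serialization` models which SECTION is running;
// TILEDB_SERIALIZATION is the build macro used in the hunk above.
std::string choose_write_path(bool section_wants_serialization) {
  bool serialized_writes = false;
  if (section_wants_serialization) {
#ifdef TILEDB_SERIALIZATION
    serialized_writes = true;
#endif
  }
  return serialized_writes ? "submit_and_finalize_serialized_query" :
                             "submit + finalize";
}

// The "no serialization" section takes the plain path in every build.
void check_plain_section() {
  assert(choose_write_path(false) == "submit + finalize");
}
```

One consequence of this pattern, assuming the macro is undefined at build time, is that the second section silently repeats the plain path instead of being skipped.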
29 changes: 22 additions & 7 deletions test/src/unit-capi-any.cc
@@ -36,6 +36,7 @@

 #include <cstring>
 #include <iostream>
+#include "helpers.h"

 struct AnyFx {
   const int C1 = 5;
@@ -163,13 +164,27 @@ void AnyFx::write_array(const std::string& array_name) {
       ctx, query, attributes[0], (uint64_t*)buffers[0], &buffer_sizes[0]);
   REQUIRE(rc == TILEDB_OK);

-  // Submit query
-  rc = tiledb_query_submit(ctx, query);
-  REQUIRE(rc == TILEDB_OK);
-  rc = tiledb_query_finalize(ctx, query);
-  REQUIRE(rc == TILEDB_OK);
-  rc = tiledb_query_finalize(ctx, query);  // Second time must create no problem
-  REQUIRE(rc == TILEDB_OK);
+  bool serialized_writes = false;
+  SECTION("no serialization") {
+    serialized_writes = false;
+  }
+  SECTION("serialization enabled global order write") {
+#ifdef TILEDB_SERIALIZATION
+    serialized_writes = true;
+#endif
+  }
+  if (!serialized_writes) {
+    rc = tiledb_query_submit(ctx, query);
+    CHECK(rc == TILEDB_OK);
+    rc = tiledb_query_finalize(ctx, query);
+    REQUIRE(rc == TILEDB_OK);
+    // Second time must create no problem
+    rc = tiledb_query_finalize(ctx, query);
+    REQUIRE(rc == TILEDB_OK);
+  } else {
+    tiledb::test::submit_and_finalize_serialized_query(ctx, query);
+    tiledb::test::finalize_serialized_query(ctx, query);
+  }

   // Close array
   rc = tiledb_array_close(ctx, array);
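
The test above calls finalize twice and expects the second call to succeed, in both the plain and the serialized branch. The property being exercised is idempotence. The sketch below shows one simple way a writer could provide it. `ToyWriter` is a hypothetical class and makes no claim about how TileDB implements `tiledb_query_finalize`.

```cpp
#include <cassert>

// Toy writer whose finalize() can safely be called more than once.
class ToyWriter {
 public:
  // Returns 0 on success. After the first successful call, further calls
  // are no-ops that also return 0.
  int finalize() {
    if (finalized_) {
      return 0;
    }
    ++flush_count_;  // stands in for flushing the last tiles to storage
    finalized_ = true;
    return 0;
  }

  // Number of times the flush work was actually performed.
  int flush_count() const {
    return flush_count_;
  }

 private:
  bool finalized_ = false;
  int flush_count_ = 0;
};

// Example use: the second finalize() succeeds without repeating the flush.
int demo_double_finalize() {
  ToyWriter w;
  assert(w.finalize() == 0);
  assert(w.finalize() == 0);
  return w.flush_count();
}
```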