Skip to content

Commit

Permalink
Adjust serialized query buffer sizes (#2115) (#2117)
Browse files Browse the repository at this point in the history
Adjust serialized query buffer sizes

This change the client/server flow to always send the server the
original user requested buffer sizes. This solves a bug in which with
serialized queries incompletes would cause the "server" to use smaller
buffers for each iteration of the incomplete query. This yield
decreasing performance as the buffers approached zero. The fix here lets
the server always get the original user's buffer size.
  • Loading branch information
Shelnutt2 committed Mar 4, 2021
1 parent f91a6fa commit a206ee1
Show file tree
Hide file tree
Showing 5 changed files with 662 additions and 563 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
Expand Up @@ -16,6 +16,8 @@

## Bug fixes

* Always use original buffer size in serialized read queries serverside. [#2115](https://github.com/TileDB-Inc/TileDB/pull/2115)

## API additions

### C API
Expand Down
175 changes: 131 additions & 44 deletions tiledb/sm/serialization/query.cc
Expand Up @@ -527,11 +527,22 @@ Status query_to_capnp(
attr_buffer_builder.setVarLenBufferSizeInBytes(*buff.buffer_var_size_);
total_fixed_len_bytes += *buff.buffer_size_;
attr_buffer_builder.setFixedLenBufferSizeInBytes(*buff.buffer_size_);

// Set original user requested sizes
attr_buffer_builder.setOriginalVarLenBufferSizeInBytes(
buff.original_buffer_var_size_);
attr_buffer_builder.setOriginalFixedLenBufferSizeInBytes(
buff.original_buffer_size_);
} else if (buff.buffer_ != nullptr && buff.buffer_size_ != nullptr) {
// Fixed-length buffer
total_fixed_len_bytes += *buff.buffer_size_;
attr_buffer_builder.setFixedLenBufferSizeInBytes(*buff.buffer_size_);
attr_buffer_builder.setVarLenBufferSizeInBytes(0);

// Set original user requested sizes
attr_buffer_builder.setOriginalVarLenBufferSizeInBytes(0);
attr_buffer_builder.setOriginalFixedLenBufferSizeInBytes(
buff.original_buffer_size_);
} else {
assert(false);
}
Expand All @@ -540,6 +551,10 @@ Status query_to_capnp(
total_validity_len_bytes += *buff.validity_vector_.buffer_size();
attr_buffer_builder.setValidityLenBufferSizeInBytes(
*buff.validity_vector_.buffer_size());

// Set original user requested sizes
attr_buffer_builder.setOriginalValidityLenBufferSizeInBytes(
buff.original_validity_vector_size_);
}
}

Expand Down Expand Up @@ -636,43 +651,104 @@ Status query_from_capnp(

// Get any buffers already set on this query object.
uint64_t* existing_offset_buffer = nullptr;
uint64_t* existing_offset_buffer_size = nullptr;
uint64_t existing_offset_buffer_size = 0;
void* existing_buffer = nullptr;
uint64_t* existing_buffer_size = nullptr;
uint64_t existing_buffer_size = 0;
uint8_t* existing_validity_buffer = nullptr;
uint64_t* existing_validity_buffer_size = nullptr;
uint64_t existing_validity_buffer_size = 0;

// For writes and read (client side) we need ptrs to set the sizes properly
uint64_t* existing_buffer_size_ptr = nullptr;
uint64_t* existing_offset_buffer_size_ptr = nullptr;
uint64_t* existing_validity_buffer_size_ptr = nullptr;

auto var_size = schema->var_size(name);
auto nullable = schema->is_nullable(name);
if (var_size) {
if (!nullable) {
RETURN_NOT_OK(query->get_buffer(
name.c_str(),
&existing_offset_buffer,
&existing_offset_buffer_size,
&existing_buffer,
&existing_buffer_size));
if (type == QueryType::READ && context == SerializationContext::SERVER) {
const QueryBuffer& query_buffer = query->buffer(name);
// We use the query_buffer directly in order to get the original buffer
// sizes This avoid a problem where an incomplete query will change the
// users buffer size to the smaller results and we end up not being able
// to correctly calcuate if the new results can fit into the users buffer
if (var_size) {
if (!nullable) {
existing_offset_buffer = static_cast<uint64_t*>(query_buffer.buffer_);
existing_offset_buffer_size = query_buffer.original_buffer_size_;
existing_buffer = query_buffer.buffer_var_;
existing_buffer_size = query_buffer.original_buffer_var_size_;
} else {
existing_offset_buffer = static_cast<uint64_t*>(query_buffer.buffer_);
existing_offset_buffer_size = query_buffer.original_buffer_size_;
existing_buffer = query_buffer.buffer_var_;
existing_buffer_size = query_buffer.original_buffer_var_size_;
existing_validity_buffer = query_buffer.validity_vector_.buffer();
existing_validity_buffer_size =
query_buffer.original_validity_vector_size_;
}
} else {
RETURN_NOT_OK(query->get_buffer_vbytemap(
name.c_str(),
&existing_offset_buffer,
&existing_offset_buffer_size,
&existing_buffer,
&existing_buffer_size,
&existing_validity_buffer,
&existing_validity_buffer_size));
if (!nullable) {
existing_buffer = query_buffer.buffer_;
existing_buffer_size = query_buffer.original_buffer_size_;
} else {
existing_buffer = query_buffer.buffer_;
existing_buffer_size = query_buffer.original_buffer_size_;
existing_validity_buffer = query_buffer.validity_vector_.buffer();
existing_validity_buffer_size =
query_buffer.original_validity_vector_size_;
}
}
} else {
if (!nullable) {
RETURN_NOT_OK(query->get_buffer(
name.c_str(), &existing_buffer, &existing_buffer_size));
// For writes we need to use get_buffer
if (var_size) {
if (!nullable) {
RETURN_NOT_OK(query->get_buffer(
name.c_str(),
&existing_offset_buffer,
&existing_offset_buffer_size_ptr,
&existing_buffer,
&existing_buffer_size_ptr));

if (existing_offset_buffer_size_ptr != nullptr)
existing_offset_buffer_size = *existing_offset_buffer_size_ptr;
if (existing_offset_buffer_size_ptr != nullptr)
existing_buffer_size = *existing_buffer_size_ptr;
} else {
RETURN_NOT_OK(query->get_buffer_vbytemap(
name.c_str(),
&existing_offset_buffer,
&existing_offset_buffer_size_ptr,
&existing_buffer,
&existing_buffer_size_ptr,
&existing_validity_buffer,
&existing_validity_buffer_size_ptr));

if (existing_offset_buffer_size_ptr != nullptr)
existing_offset_buffer_size = *existing_offset_buffer_size_ptr;
if (existing_buffer_size_ptr != nullptr)
existing_buffer_size = *existing_buffer_size_ptr;
if (existing_validity_buffer_size_ptr != nullptr)
existing_validity_buffer_size = *existing_validity_buffer_size_ptr;
}
} else {
RETURN_NOT_OK(query->get_buffer_vbytemap(
name.c_str(),
&existing_buffer,
&existing_buffer_size,
&existing_validity_buffer,
&existing_validity_buffer_size));
if (!nullable) {
RETURN_NOT_OK(query->get_buffer(
name.c_str(), &existing_buffer, &existing_buffer_size_ptr));

if (existing_buffer_size_ptr != nullptr)
existing_buffer_size = *existing_buffer_size_ptr;
} else {
RETURN_NOT_OK(query->get_buffer_vbytemap(
name.c_str(),
&existing_buffer,
&existing_buffer_size_ptr,
&existing_validity_buffer,
&existing_validity_buffer_size_ptr));

if (existing_buffer_size_ptr != nullptr)
existing_buffer_size = *existing_buffer_size_ptr;
if (existing_validity_buffer_size_ptr != nullptr)
existing_validity_buffer_size = *existing_validity_buffer_size_ptr;
}
}
}

Expand All @@ -682,20 +758,19 @@ Status query_from_capnp(
// data.
const uint64_t curr_data_size =
attr_copy_state == nullptr ? 0 : attr_copy_state->data_size;
assert(existing_buffer_size != nullptr);
const uint64_t data_size_left = *existing_buffer_size - curr_data_size;
const uint64_t data_size_left = existing_buffer_size - curr_data_size;
const uint64_t curr_offset_size =
attr_copy_state == nullptr ? 0 : attr_copy_state->offset_size;
const uint64_t offset_size_left =
existing_offset_buffer_size == nullptr ?
existing_offset_buffer_size == 0 ?
0 :
*existing_offset_buffer_size - curr_offset_size;
existing_offset_buffer_size - curr_offset_size;
const uint64_t curr_validity_size =
attr_copy_state == nullptr ? 0 : attr_copy_state->validity_size;
const uint64_t validity_size_left =
existing_validity_buffer_size == nullptr ?
existing_validity_buffer_size == 0 ?
0 :
*existing_validity_buffer_size - curr_validity_size;
existing_validity_buffer_size - curr_validity_size;

const bool has_mem_for_data =
(var_size && data_size_left >= varlen_size) ||
Expand Down Expand Up @@ -759,10 +834,12 @@ Status query_from_capnp(
if (attr_copy_state == nullptr) {
// Set the size directly on the query (so user can introspect on
// result size).
*existing_offset_buffer_size = fixedlen_size;
*existing_buffer_size = varlen_size;
if (nullable)
*existing_validity_buffer_size = validitylen_size;
if (existing_offset_buffer_size_ptr != nullptr)
*existing_offset_buffer_size_ptr = fixedlen_size;
if (existing_buffer_size_ptr != nullptr)
*existing_buffer_size_ptr = varlen_size;
if (nullable && existing_validity_buffer_size_ptr != nullptr)
*existing_validity_buffer_size_ptr = validitylen_size;
} else {
// Accumulate total bytes copied (caller's responsibility to
// eventually update the query).
Expand All @@ -785,9 +862,10 @@ Status query_from_capnp(
}

if (attr_copy_state == nullptr) {
*existing_buffer_size = fixedlen_size;
if (nullable)
*existing_validity_buffer_size = validitylen_size;
if (existing_buffer_size_ptr != nullptr)
*existing_buffer_size_ptr = fixedlen_size;
if (nullable && existing_validity_buffer_size_ptr != nullptr)
*existing_validity_buffer_size_ptr = validitylen_size;
} else {
attr_copy_state->data_size += fixedlen_size;
if (nullable)
Expand All @@ -811,9 +889,18 @@ Status query_from_capnp(
Buffer offsets_buff(nullptr, fixedlen_size);
Buffer varlen_buff(nullptr, varlen_size);
Buffer validitylen_buff(nullptr, validitylen_size);
attr_state->fixed_len_size = fixedlen_size;
attr_state->var_len_size = varlen_size;
attr_state->validity_len_size = validitylen_size;
// For the server on reads we want to set the original user requested
// buffer sizes This handles the case of incomplete queries where on the
// second `submit()` call the client's buffer size will be the first
// submit's result size not the original user set buffer size. To work
// around this we revert the server to always use the full original user
// requested buffer sizes.
attr_state->fixed_len_size =
buffer_header.getOriginalFixedLenBufferSizeInBytes();
attr_state->var_len_size =
buffer_header.getOriginalVarLenBufferSizeInBytes();
attr_state->validity_len_size =
buffer_header.getOriginalValidityLenBufferSizeInBytes();
attr_state->fixed_len_data.swap(offsets_buff);
attr_state->var_len_data.swap(varlen_buff);
attr_state->validity_len_data.swap(validitylen_buff);
Expand Down
9 changes: 9 additions & 0 deletions tiledb/sm/serialization/tiledb-rest.capnp
Expand Up @@ -105,6 +105,15 @@ struct AttributeBufferHeader {

validityLenBufferSizeInBytes @3 :UInt64;
# Number of bytes in the validity data buffer

originalFixedLenBufferSizeInBytes @4 :UInt64;
# Original user set number of bytes in the fixed-length attribute data buffer

originalVarLenBufferSizeInBytes @5 :UInt64;
# Original user set number of bytes in the var-length attribute data buffer

originalValidityLenBufferSizeInBytes @6 :UInt64;
# Original user set number of bytes in the validity data buffer
}

struct Dimension {
Expand Down

0 comments on commit a206ee1

Please sign in to comment.