Skip to content

Commit

Permalink
Deletes consolidation: switch from marker hashes to condition indexes.
Browse files Browse the repository at this point in the history
std::hash is not guaranteed to give consistent results on different
platforms so it cannot be used to store on disk data. It was used to
hash the delete condition marker to know which delete condition deleted
a specific cell. Fortunately, as we already store the processed
conditions in the fragment metadata, we can store an index to the
processed conditions and that will allow to retrieve the same
information. The only complexity comes from when a previous fragment
consolidated with deletes gets processed by consolidation, we need to
convert the index into the original fragment processed condition to the
new fragment processed conditions. This can be done by building a hash
table with a key of the condition marker and a value of the index into
the new processed condition array for constant time conversion.

---
TYPE: IMPROVEMENT
DESC: Deletes condolidation: switch from marker hashes to condition indexes.
  • Loading branch information
KiterLuc committed Aug 13, 2022
1 parent ac13401 commit 894c8f5
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 71 deletions.
6 changes: 3 additions & 3 deletions format_spec/fragment.md
Expand Up @@ -26,7 +26,7 @@ my_array # array folder
| |_ ...
| |_ dt.tdb # delete timestamp attribute
| |_ ...
| |_ dcmh.tdb # delete condition marker hash attribute
| |_ dci.tdb # delete condition index attribute
| |_ ...
|_ ...
```
Expand All @@ -43,7 +43,7 @@ There can be any number of fragments in an array. The fragment folder contains:
* The names of the data files are not dependent on the names of the attributes/dimensions. The file names are determined by the order of the attributes and dimensions in the array schema.
* The timestamp fixed attribute (`t.tdb`) is, for fragments consolidated with timestamps, the time at which a cell was added.
* The delete timestamp fixed attribute (`dt.tdb`) is, for fragments consolidated with delete conditions, the time at which a cell was deleted.
* The delete condition marker hash fixed attribute (`dcmh.tdb`) is, for fragments consolidated with delete conditions, the hash of the delete condition marker that deleted the cell. The delete condition marker is the file path of the delete condition relative to the array URI.
* The delete condition index fixed attribute (`dci.tdb`) is, for fragments consolidated with delete conditions, the index of the delete condition (inside of [Tile Processed Conditions](#tile-processed-conditions)) that deleted the cell.

## Fragment Metadata File

Expand Down Expand Up @@ -203,7 +203,7 @@ The fragment min max sum null count is a [generic tile](./generic_tile.md) with

### Tile Processed Conditions

The processed conditions is a [generic tile](./generic_tile.md) and is the list of delete/update conditions that have already been applied for this fragment and don't need to be applied again, in no particular order, with the following internal format:
The processed conditions is a [generic tile](./generic_tile.md) and is the list of delete/update conditions that have already been applied for this fragment and don't need to be applied again, sorted by filename, with the following internal format:

| **Field** | **Type** | **Description** |
| :--- | :--- | :--- |
Expand Down
8 changes: 4 additions & 4 deletions tiledb/sm/array_schema/array_schema.cc
Expand Up @@ -260,8 +260,8 @@ uint64_t ArraySchema::cell_size(const std::string& name) const {
return constants::timestamp_size;
}

if (name == constants::delete_condition_marker_hash) {
return sizeof(size_t);
if (name == constants::delete_condition_index) {
return sizeof(uint64_t);
}

// Attribute
Expand Down Expand Up @@ -626,8 +626,8 @@ Datatype ArraySchema::type(const std::string& name) const {
return constants::timestamp_type;
}

if (name == constants::delete_condition_marker_hash) {
return constants::delete_condition_marker_hash_type;
if (name == constants::delete_condition_index) {
return constants::delete_condition_index_type;
}

// Attribute
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/array_schema/array_schema.h
Expand Up @@ -152,7 +152,7 @@ class ArraySchema {
static inline bool is_special_attribute(const std::string& name) {
return name == constants::coords || name == constants::timestamps ||
name == constants::delete_timestamps ||
name == constants::delete_condition_marker_hash;
name == constants::delete_condition_index;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Expand Up @@ -569,7 +569,7 @@ Status FragmentConsolidator::create_buffers(
}

// Adding buffers for delete meta, one for timestamp and one for condition
// marker hash.
// index.
if (config_.with_delete_meta_) {
buffer_num += 2;
}
Expand Down Expand Up @@ -852,7 +852,7 @@ Status FragmentConsolidator::set_query_buffers(
&(*buffer_sizes)[bid]));
++bid;
RETURN_NOT_OK(query->set_data_buffer(
constants::delete_condition_marker_hash,
constants::delete_condition_index,
(void*)&(*buffers)[bid][0],
&(*buffer_sizes)[bid]));
++bid;
Expand Down
26 changes: 20 additions & 6 deletions tiledb/sm/fragment/fragment_metadata.cc
Expand Up @@ -1342,8 +1342,8 @@ tuple<Status, optional<std::string>> FragmentMetadata::encode_name(
return {Status::Ok(), "dt"};
}

if (name == constants::delete_condition_marker_hash) {
return {Status::Ok(), "dcmh"};
if (name == constants::delete_condition_index) {
return {Status::Ok(), "dci"};
}

auto err = "Unable to locate dimension/attribute " + name;
Expand Down Expand Up @@ -1922,18 +1922,28 @@ tuple<Status, optional<uint64_t>> FragmentMetadata::get_null_count(

void FragmentMetadata::set_processed_conditions(
std::vector<std::string>& processed_conditions) {
processed_conditions_ = std::unordered_set<std::string>(
processed_conditions_ = processed_conditions;
processed_conditions_set_ = std::unordered_set<std::string>(
processed_conditions.begin(), processed_conditions.end());
}

std::unordered_set<std::string>& FragmentMetadata::get_processed_conditions() {
std::vector<std::string>& FragmentMetadata::get_processed_conditions() {
if (!loaded_metadata_.processed_conditions_) {
throw std::logic_error("Trying to access metadata that's not present");
}

return processed_conditions_;
}

std::unordered_set<std::string>&
FragmentMetadata::get_processed_conditions_set() {
if (!loaded_metadata_.processed_conditions_) {
throw std::logic_error("Trying to access metadata that's not present");
}

return processed_conditions_set_;
}

uint64_t FragmentMetadata::first_timestamp() const {
return timestamp_range_.first;
}
Expand Down Expand Up @@ -3512,6 +3522,7 @@ Status FragmentMetadata::load_processed_conditions(ConstBuffer* buff) {
" num processed conditions failed"));
}

processed_conditions_.reserve(num);
for (uint64_t i = 0; i < num; i++) {
uint64_t size;
st = buff->read(&size, sizeof(uint64_t));
Expand All @@ -3530,9 +3541,12 @@ Status FragmentMetadata::load_processed_conditions(ConstBuffer* buff) {
" processed conditions failed"));
}

processed_conditions_.emplace(condition);
processed_conditions_.emplace_back(condition);
}

processed_conditions_set_ = std::unordered_set<std::string>(
processed_conditions_.begin(), processed_conditions_.end());

return Status::Ok();
}

Expand Down Expand Up @@ -5209,7 +5223,7 @@ void FragmentMetadata::build_idx_map() {
}
if (has_delete_meta_) {
idx_map_[constants::delete_timestamps] = idx++;
idx_map_[constants::delete_condition_marker_hash] = idx++;
idx_map_[constants::delete_condition_index] = idx++;
}
}

Expand Down
19 changes: 17 additions & 2 deletions tiledb/sm/fragment/fragment_metadata.h
Expand Up @@ -728,7 +728,16 @@ class FragmentMetadata {
*
* @return Processed conditions.
*/
std::unordered_set<std::string>& get_processed_conditions();
std::vector<std::string>& get_processed_conditions();

/**
* Retrieves the processed conditions set. The processed conditions is the
* list of delete/update conditions that have already been applied for this
* fragment and don't need to be applied again.
*
* @return Processed conditions set.
*/
std::unordered_set<std::string>& get_processed_conditions_set();

/** Returns the first timestamp of the fragment timestamp range. */
uint64_t first_timestamp() const;
Expand Down Expand Up @@ -1085,7 +1094,13 @@ class FragmentMetadata {
URI array_uri_;

/** Set of already processed delete/update conditions for this fragment. */
std::unordered_set<std::string> processed_conditions_;
std::unordered_set<std::string> processed_conditions_set_;

/**
* Ordered list of already processed delete/update conditions for this
* fragment.
*/
std::vector<std::string> processed_conditions_;

/* ********************************* */
/* PRIVATE METHODS */
Expand Down
9 changes: 4 additions & 5 deletions tiledb/sm/misc/constants.cc
Expand Up @@ -132,18 +132,17 @@ const std::string timestamps = "__timestamps";
/** Special name reserved for the delete timestamp attribute. */
const std::string delete_timestamps = "__delete_timestamps";

/** Special name reserved for the delete condition marker hash attribute. */
const std::string delete_condition_marker_hash =
"__delete_condition_marker_hash";
/** Special name reserved for the delete condition index attribute. */
const std::string delete_condition_index = "__delete_condition_index";

/** The size of a timestamp cell. */
const uint64_t timestamp_size = sizeof(uint64_t);

/** The type of a timestamp cell. */
extern const Datatype timestamp_type = Datatype::UINT64;

/** The type of a delete condition marker hash cell. */
extern const Datatype delete_condition_marker_hash_type = Datatype::UINT64;
/** The type of a delete condition index cell. */
extern const Datatype delete_condition_index_type = Datatype::UINT64;

/** The default compressor for the coordinates. */
Compressor coords_compression = Compressor::ZSTD;
Expand Down
8 changes: 4 additions & 4 deletions tiledb/sm/misc/constants.h
Expand Up @@ -128,17 +128,17 @@ extern const std::string timestamps;
/** Special name reserved for the delete timestamp attribute. */
extern const std::string delete_timestamps;

/** Special name reserved for the delete condition marker hash attribute. */
extern const std::string delete_condition_marker_hash;
/** Special name reserved for the delete condition index attribute. */
extern const std::string delete_condition_index;

/** The size of a timestamp cell. */
extern const uint64_t timestamp_size;

/** The type of a timestamp cell. */
extern const Datatype timestamp_type;

/** The type of a delete condition marker hash cell. */
extern const Datatype delete_condition_marker_hash_type;
/** The type of a delete condition index cell. */
extern const Datatype delete_condition_index_type;

/** The special value for an empty int32. */
extern const int empty_int32;
Expand Down
5 changes: 4 additions & 1 deletion tiledb/sm/query/deletes_and_updates/serialization.cc
Expand Up @@ -144,12 +144,15 @@ tdb_unique_ptr<ASTNode> deserialize_condition_impl(Deserializer& deserializer) {
}

QueryCondition deserialize_condition(
const uint64_t condition_index,
const std::string& condition_marker,
const void* buff,
const storage_size_t size) {
Deserializer deserializer(buff, size);
return QueryCondition(
condition_marker, deserialize_condition_impl(deserializer));
condition_index,
condition_marker,
deserialize_condition_impl(deserializer));
}

} // namespace tiledb::sm::deletes_and_updates::serialization
2 changes: 2 additions & 0 deletions tiledb/sm/query/deletes_and_updates/serialization.h
Expand Up @@ -52,13 +52,15 @@ std::vector<uint8_t> serialize_condition(const QueryCondition& query_condition);
/**
* Deserializes the condition.
*
* @param condition_index Index for this condition.
* @param condition_marker Marker used to know which file the condition came
* from.
* @param buff Pointer to the serialized data.
* @param size Size of the serialized data.
* @return Deserialized query condition.
*/
QueryCondition deserialize_condition(
const uint64_t condition_index,
const std::string& condition_marker,
const void* buff,
const storage_size_t size);
Expand Down
Expand Up @@ -48,7 +48,7 @@ using namespace tiledb::sm::deletes_and_updates::serialization;
void serialize_deserialize_check(QueryCondition& query_condition) {
auto serialized = serialize_condition(query_condition);
auto deserialized =
deserialize_condition("", serialized.data(), serialized.size());
deserialize_condition(0, "", serialized.data(), serialized.size());

CHECK(tiledb::test::ast_equal(query_condition.ast(), deserialized.ast()));
}
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/query/query.cc
Expand Up @@ -1161,7 +1161,7 @@ Status Query::check_buffer_names() {
expected_num += static_cast<decltype(expected_num)>(
buffers_.count(constants::delete_timestamps));
expected_num += static_cast<decltype(expected_num)>(
buffers_.count(constants::delete_condition_marker_hash));
buffers_.count(constants::delete_condition_index));
expected_num += (coord_buffer_is_set_ || coord_data_buffer_is_set_ ||
coord_offsets_buffer_is_set_) ?
array_schema_->dim_num() :
Expand Down
19 changes: 10 additions & 9 deletions tiledb/sm/query/query_condition.cc
Expand Up @@ -59,30 +59,31 @@ QueryCondition::QueryCondition() {

QueryCondition::QueryCondition(const std::string& condition_marker)
: condition_marker_(condition_marker)
, condition_marker_hash_(std::hash<std::string>{}(condition_marker)) {
, condition_index_(0) {
}

QueryCondition::QueryCondition(tdb_unique_ptr<tiledb::sm::ASTNode>&& tree)
: tree_(std::move(tree)) {
}

QueryCondition::QueryCondition(
const uint64_t condition_index,
const std::string& condition_marker,
tdb_unique_ptr<tiledb::sm::ASTNode>&& tree)
: condition_marker_(condition_marker)
, condition_marker_hash_(std::hash<std::string>{}(condition_marker))
, condition_index_(condition_index)
, tree_(std::move(tree)) {
}

QueryCondition::QueryCondition(const QueryCondition& rhs)
: condition_marker_(rhs.condition_marker_)
, condition_marker_hash_(rhs.condition_marker_hash_)
, condition_index_(rhs.condition_index_)
, tree_(rhs.tree_ == nullptr ? nullptr : rhs.tree_->clone()) {
}

QueryCondition::QueryCondition(QueryCondition&& rhs)
: condition_marker_(std::move(rhs.condition_marker_))
, condition_marker_hash_(rhs.condition_marker_hash_)
, condition_index_(rhs.condition_index_)
, tree_(std::move(rhs.tree_)) {
}

Expand All @@ -92,7 +93,7 @@ QueryCondition::~QueryCondition() {
QueryCondition& QueryCondition::operator=(const QueryCondition& rhs) {
if (this != &rhs) {
condition_marker_ = rhs.condition_marker_;
condition_marker_hash_ = rhs.condition_marker_hash_;
condition_index_ = rhs.condition_index_;
tree_ = rhs.tree_ == nullptr ? nullptr : rhs.tree_->clone();
}

Expand All @@ -101,7 +102,7 @@ QueryCondition& QueryCondition::operator=(const QueryCondition& rhs) {

QueryCondition& QueryCondition::operator=(QueryCondition&& rhs) {
condition_marker_ = std::move(rhs.condition_marker_);
condition_marker_hash_ = std::move(rhs.condition_marker_hash_);
condition_index_ = std::move(rhs.condition_index_);
tree_ = std::move(rhs.tree_);
return *this;
}
Expand Down Expand Up @@ -409,7 +410,7 @@ void QueryCondition::apply_ast_node(
// delete condition was already processed, GT condition is always true.
if (field_name == constants::delete_timestamps &&
(!fragment_metadata[f]->has_delete_meta() ||
fragment_metadata[f]->get_processed_conditions().count(
fragment_metadata[f]->get_processed_conditions_set().count(
condition_marker_) != 0)) {
assert(Op == QueryConditionOp::GT);
for (size_t c = starting_index; c < starting_index + length; ++c) {
Expand Down Expand Up @@ -2143,8 +2144,8 @@ const std::string& QueryCondition::condition_marker() const {
return condition_marker_;
}

size_t QueryCondition::condition_marker_hash() const {
return condition_marker_hash_;
uint64_t QueryCondition::condition_index() const {
return condition_index_;
}

void QueryCondition::set_ast(tdb_unique_ptr<ASTNode>&& ast) {
Expand Down
9 changes: 5 additions & 4 deletions tiledb/sm/query/query_condition.h
Expand Up @@ -70,6 +70,7 @@ class QueryCondition {

/** Constructor from a tree and marker. */
QueryCondition(
const uint64_t condition_index,
const std::string& condition_marker,
tdb_unique_ptr<tiledb::sm::ASTNode>&& tree);

Expand Down Expand Up @@ -228,9 +229,9 @@ class QueryCondition {
const std::string& condition_marker() const;

/**
* Returns the condition marker hash.
* Returns the condition index.
*/
size_t condition_marker_hash() const;
uint64_t condition_index() const;

private:
/* ********************************* */
Expand Down Expand Up @@ -262,8 +263,8 @@ class QueryCondition {
/** Marker used to reference which file the condition came from. */
std::string condition_marker_;

/** Hash of `condition_marker_`. */
size_t condition_marker_hash_;
/** Index for the condition. */
size_t condition_index_;

/** AST Tree structure representing the condition. **/
tdb_unique_ptr<tiledb::sm::ASTNode> tree_{};
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/query/readers/reader_base.h
Expand Up @@ -286,7 +286,7 @@ class ReaderBase : public StrategyBase {
inline bool delete_meta_not_present(
const std::string& name, const unsigned f) const {
return (name == constants::delete_timestamps ||
name == constants::delete_condition_marker_hash) &&
name == constants::delete_condition_index) &&
!fragment_metadata_[f]->has_delete_meta();
}

Expand Down

0 comments on commit 894c8f5

Please sign in to comment.