Skip to content

Commit

Permalink
Implement delete_fragments API (#3400)
Browse files Browse the repository at this point in the history
Implement delete_fragments API.

This API uses the existing FragmentConsolidator::vacuum to:

- vacuum fragments between specified timestamps
- delete the vacuumed fragments
Note that this API can only be invoked on an array that has been opened with the new QueryType::MODIFY_EXCLUSIVE.

[sc-15852]

---
TYPE: FEATURE
DESC: Implement delete_fragments API
  • Loading branch information
bekadavis9 committed Aug 22, 2022
1 parent dae5d97 commit 7a8e2b1
Show file tree
Hide file tree
Showing 24 changed files with 582 additions and 155 deletions.
93 changes: 93 additions & 0 deletions test/src/unit-cppapi-array.cc
Expand Up @@ -32,6 +32,7 @@

#include <test/support/tdb_catch.h>
#include "helpers.h"
#include "tiledb/common/stdx_string.h"
#include "tiledb/sm/cpp_api/tiledb"
#include "tiledb/sm/filesystem/uri.h"
#include "tiledb/sm/misc/constants.h"
Expand Down Expand Up @@ -683,6 +684,98 @@ TEST_CASE(
vfs.remove_dir(array_name);
}

/**
 * Exercises Array::delete_fragments through the C++ API.
 *
 * Covers three scenarios on a freshly created dense array:
 *  - an array opened with TILEDB_WRITE must reject delete_fragments and keep
 *    its fragments intact;
 *  - an array opened with TILEDB_MODIFY_EXCLUSIVE deletes all fragments;
 *  - the same deletion after consolidation, additionally checking the
 *    contents of the commits directory before and after.
 */
TEST_CASE(
    "C++ API: Deletion of sequential fragment writes",
    "[cppapi][fragments][delete]") {
  /* Note: An array must be open in MODIFY_EXCLUSIVE mode to delete_fragments */
  Context ctx;
  VFS vfs(ctx);
  const std::string array_name = "cpp_unit_array";

  // Start from a clean slate in case an earlier run left the array behind.
  if (vfs.is_dir(array_name))
    vfs.remove_dir(array_name);

  // Dense array with a single int dimension "d" and a single int attribute
  // "a".
  Domain domain(ctx);
  domain.add_dimension(Dimension::create<int>(ctx, "d", {{0, 11}}, 12));

  ArraySchema schema(ctx, TILEDB_DENSE);
  schema.set_domain(domain).set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
  schema.add_attribute(Attribute::create<int>(ctx, "a"));
  std::vector<int> data = {0, 1};

  // The widest possible timestamp range, so every fragment is a candidate for
  // deletion.
  uint64_t timestamp_start = 0;
  uint64_t timestamp_end = UINT64_MAX;
  tiledb::Array::create(array_name, schema);

  SECTION("WRITE") {
    // Three submits on disjoint subarrays; the check below expects three
    // fragments as a result.
    auto array = tiledb::Array(ctx, array_name, TILEDB_WRITE);
    auto query = tiledb::Query(ctx, array, TILEDB_WRITE);
    query.set_data_buffer("a", data).set_subarray({0, 1}).submit();
    query.set_data_buffer("a", data).set_subarray({2, 3}).submit();
    query.set_data_buffer("a", data).set_subarray({4, 5}).submit();
    query.finalize();

    // Delete fragments: must be rejected in WRITE mode and must leave the
    // fragment count untouched.
    CHECK(tiledb::test::num_fragments(array_name) == 3);
    REQUIRE_THROWS_WITH(
        array.delete_fragments(array_name, timestamp_start, timestamp_end),
        Catch::Contains("Query type must be MODIFY_EXCLUSIVE"));
    CHECK(tiledb::test::num_fragments(array_name) == 3);
    array.close();
  }

  SECTION("MODIFY_EXCLUSIVE") {
    // Same three writes, this time through an exclusively opened array.
    auto array = tiledb::Array(ctx, array_name, TILEDB_MODIFY_EXCLUSIVE);
    auto query = tiledb::Query(ctx, array, TILEDB_MODIFY_EXCLUSIVE);
    query.set_data_buffer("a", data).set_subarray({0, 1}).submit();
    query.set_data_buffer("a", data).set_subarray({2, 3}).submit();
    query.set_data_buffer("a", data).set_subarray({4, 5}).submit();
    query.finalize();
    CHECK(tiledb::test::num_fragments(array_name) == 3);

    SECTION("no consolidation") {
      // Delete fragments
      array.delete_fragments(array_name, timestamp_start, timestamp_end);
      CHECK(tiledb::test::num_fragments(array_name) == 0);
      array.close();
    }

    SECTION("consolidation") {
      array.close();

      // Consolidate and reopen array. The fragment count goes from three to
      // four and is unchanged by reopening.
      Array::consolidate(ctx, array_name);
      CHECK(tiledb::test::num_fragments(array_name) == 4);
      array.open(TILEDB_MODIFY_EXCLUSIVE);
      CHECK(tiledb::test::num_fragments(array_name) == 4);

      // Check commits directory after consolidation
      int vac_file_count = 0;
      std::string commit_dir = tiledb::test::get_commit_dir(array_name);
      std::vector<std::string> commits{vfs.ls(commit_dir)};
      // Iterate by const reference: each entry is only inspected, so there is
      // no reason to copy the path string per iteration.
      for (const auto& commit : commits) {
        if (tiledb::sm::utils::parse::ends_with(
                commit, tiledb::sm::constants::vacuum_file_suffix))
          vac_file_count++;
      }
      CHECK(commits.size() == 5);
      CHECK(vac_file_count == 1);

      // Delete fragments
      array.delete_fragments(array_name, timestamp_start, timestamp_end);
      CHECK(tiledb::test::num_fragments(array_name) == 0);

      // Check commits directory after deletion: exactly one entry remains and
      // it is not a vacuum file.
      commits = vfs.ls(commit_dir);
      CHECK(commits.size() == 1);
      CHECK(!tiledb::sm::utils::parse::ends_with(
          commits[0], tiledb::sm::constants::vacuum_file_suffix));
    }
  }

  // Remove the array from disk so later test cases are not affected.
  if (vfs.is_dir(array_name))
    vfs.remove_dir(array_name);
}

TEST_CASE("C++ API: Encrypted array", "[cppapi][encryption]") {
const char key[] = "0123456789abcdeF0123456789abcdeF";
tiledb::Config cfg;
Expand Down
71 changes: 51 additions & 20 deletions tiledb/sm/array/array.cc
Expand Up @@ -159,7 +159,7 @@ Status Array::open_without_fragments(
/* Note: the open status MUST be exception safe. If anything interrupts the
* opening process, it will throw and the array will be set as closed. */
try {
set_array_open();
set_array_open(query_type_);

// Copy the key bytes.
st = encryption_key_->set_key(encryption_type, encryption_key, key_length);
Expand Down Expand Up @@ -225,6 +225,34 @@ Status Array::load_fragments(
return Status::Ok();
}

/**
 * Deletes fragments of the array at `uri` by delegating to the storage
 * manager's fragment vacuuming over the given timestamp range.
 *
 * Returns an error Status (without touching any fragment) when:
 *  - this array's query type is not MODIFY_EXCLUSIVE,
 *  - neither this object nor the consistency controller considers the array
 *    open, or
 *  - the array is currently in the middle of an open/close operation.
 * Any error reported by the vacuum call is propagated unchanged.
 */
Status Array::delete_fragments(
    const URI& uri, uint64_t timestamp_start, uint64_t timestamp_end) {
  // Deleting fragments is only allowed under exclusive modification.
  if (query_type_ != QueryType::MODIFY_EXCLUSIVE) {
    return LOG_STATUS(Status_ArrayError(
        "[Array::delete_fragments] Query type must be MODIFY_EXCLUSIVE"));
  }

  // The array is treated as open if either this object reports it open or the
  // consistency controller has the URI registered. The controller is only
  // consulted when this object is not open (short-circuit evaluation).
  const bool considered_open = is_open() || controller().is_open(uri);
  if (!considered_open) {
    return LOG_STATUS(
        Status_ArrayError("[Array::delete_fragments] Array is closed"));
  }

  // Refuse to run while an open or close is still in flight.
  if (is_opening_or_closing_) {
    return LOG_STATUS(Status_ArrayError(
        "[Array::delete_fragments] "
        "May not perform simultaneous open or close operations."));
  }

  // Vacuum the fragments in the requested range, flagged as a vacuum for
  // deletes.
  const bool for_deletes = true;
  RETURN_NOT_OK(storage_manager_->fragments_vacuum(
      uri.c_str(), timestamp_start, timestamp_end, for_deletes));

  return Status::Ok();
}

void Array::set_delete_tiles_location(
const std::vector<ArrayDirectory::DeleteTileLocation>&
delete_tiles_location) {
Expand Down Expand Up @@ -254,7 +282,7 @@ Status Array::open(
uint32_t key_length) {
Status st;
// Checks
if (is_open_) {
if (is_open()) {
return LOG_STATUS(
Status_ArrayError("Cannot open array; Array already open"));
}
Expand All @@ -264,15 +292,12 @@ Status Array::open(
non_empty_domain_computed_ = false;
timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;

/* Note: query_type_ MUST be set before calling set_array_open()
because it will be examined by the ConsistencyController. */
query_type_ = query_type;

/* Note: the open status MUST be exception safe. If anything interrupts the
* opening process, it will throw and the array will be set as closed. */
try {
set_array_open();
set_array_open(query_type);

// Get encryption key from config
std::string encryption_key_from_cfg;
Expand Down Expand Up @@ -324,7 +349,9 @@ Status Array::open(
if (query_type == QueryType::READ) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
} else if (
query_type == QueryType::WRITE || query_type == QueryType::DELETE) {
query_type == QueryType::WRITE ||
query_type == QueryType::MODIFY_EXCLUSIVE ||
query_type == QueryType::DELETE) {
timestamp_end_opened_at_ = 0;
} else {
throw Status_ArrayError("Cannot open array; Unsupported query type.");
Expand Down Expand Up @@ -428,7 +455,9 @@ Status Array::close() {
if (remote_) {
// Update array metadata for write queries if metadata was written by the
// user
if (query_type_ == QueryType::WRITE && metadata_.num() > 0) {
if ((query_type_ == QueryType::WRITE ||
query_type_ == QueryType::MODIFY_EXCLUSIVE) &&
metadata_.num() > 0) {
// Set metadata loaded to be true so when serialization fetches the
// metadata it won't trigger a deadlock
metadata_loaded_ = true;
Expand All @@ -450,7 +479,9 @@ Status Array::close() {
st = storage_manager_->array_close_for_reads(this);
if (!st.ok())
throw StatusException(st);
} else if (query_type_ == QueryType::WRITE) {
} else if (
query_type_ == QueryType::WRITE ||
query_type_ == QueryType::MODIFY_EXCLUSIVE) {
st = storage_manager_->array_close_for_writes(this);
if (!st.ok())
throw StatusException(st);
Expand Down Expand Up @@ -504,16 +535,14 @@ tuple<Status, optional<shared_ptr<ArraySchema>>> Array::get_array_schema()
return {Status::Ok(), array_schema_latest_};
}

Status Array::get_query_type(QueryType* query_type) const {
QueryType Array::get_query_type() const {
// Error if the array is not open
if (!is_open_) {
return LOG_STATUS(
throw StatusException(
Status_ArrayError("Cannot get query_type; Array is not open"));
}

*query_type = query_type_;

return Status::Ok();
return query_type_;
}

Status Array::get_max_buffer_size(
Expand Down Expand Up @@ -758,10 +787,11 @@ Status Array::delete_metadata(const char* key) {
}

// Check mode
if (query_type_ != QueryType::WRITE) {
if (query_type_ != QueryType::WRITE &&
query_type_ != QueryType::MODIFY_EXCLUSIVE) {
return LOG_STATUS(
Status_ArrayError("Cannot delete metadata. Array was "
"not opened in write mode"));
"not opened in write or modify_exclusive mode"));
}

// Check if key is null
Expand All @@ -787,10 +817,11 @@ Status Array::put_metadata(
}

// Check mode
if (query_type_ != QueryType::WRITE) {
if (query_type_ != QueryType::WRITE &&
query_type_ != QueryType::MODIFY_EXCLUSIVE) {
return LOG_STATUS(
Status_ArrayError("Cannot put metadata; Array was "
"not opened in write mode"));
"not opened in write or modify_exclusive mode"));
}

// Check if key is null
Expand Down Expand Up @@ -1164,7 +1195,7 @@ Status Array::compute_non_empty_domain() {
return Status::Ok();
}

void Array::set_array_open() {
void Array::set_array_open(const QueryType& query_type) {
std::lock_guard<std::mutex> lock(mtx_);
if (is_opening_or_closing_) {
is_opening_or_closing_ = false;
Expand All @@ -1178,7 +1209,7 @@ void Array::set_array_open() {
* only the pointer value is used and nothing is called on the Array objects.
*/
consistency_sentry_.emplace(
consistency_controller_.make_sentry(array_uri_, *this));
consistency_controller_.make_sentry(array_uri_, *this, query_type));
is_open_ = true;
}

Expand Down
25 changes: 21 additions & 4 deletions tiledb/sm/array/array.h
Expand Up @@ -178,6 +178,19 @@ class Array {
*/
Status load_fragments(const std::vector<TimestampedURI>& fragments_to_load);

/**
 * Deletes the fragments from the Array with given URI.
 *
 * @param uri The uri of the Array whose fragments are to be deleted.
 * @param timestamp_start The start timestamp at which to delete fragments.
 * @param timestamp_end The end timestamp at which to delete fragments.
 * @return Status. An error is returned if the Array's query type is not
 *     MODIFY_EXCLUSIVE, if the Array is closed, or if the Array is in the
 *     process of opening or closing.
 *
 * @pre The Array must be open for exclusive writes
 */
Status delete_fragments(
    const URI& uri, uint64_t timestamp_start, uint64_t timestamp_end);

/**
* Sets the delete tiles location.
*
Expand Down Expand Up @@ -237,8 +250,8 @@ class Array {
/** Retrieves the array schema. Errors if the array is not open. */
tuple<Status, optional<shared_ptr<ArraySchema>>> get_array_schema() const;

/** Retrieves the query type. Errors if the array is not open. */
Status get_query_type(QueryType* qyery_type) const;
/** Retrieves the query type. Throws if the array is not open. */
QueryType get_query_type() const;

/**
* Returns the max buffer size given a fixed-sized attribute/dimension and
Expand Down Expand Up @@ -587,8 +600,12 @@ class Array {
/** Computes the non-empty domain of the array. */
Status compute_non_empty_domain();

/** Sets the array state as open. */
void set_array_open();
/**
* Sets the array state as open.
*
* @param query_type The QueryType of the Array.
*/
void set_array_open(const QueryType& query_type);

/** Sets the array state as closed.
*
Expand Down
24 changes: 21 additions & 3 deletions tiledb/sm/array/consistency.cc
Expand Up @@ -32,6 +32,7 @@

#include <iostream>

#include "tiledb/sm/array/array.h"
#include "tiledb/sm/array/consistency.h"

using namespace tiledb::common;
Expand All @@ -57,17 +58,34 @@ ConsistencySentry::~ConsistencySentry() {
}

ConsistencySentry ConsistencyController::make_sentry(
const URI uri, Array& array) {
return ConsistencySentry{*this, register_array(uri, array)};
const URI uri, Array& array, const QueryType& query_type) {
return ConsistencySentry{*this, register_array(uri, array, query_type)};
}

// Registers `array` under `uri` in the array registry and returns the
// resulting registry entry.
//
// Throws std::runtime_error when `uri` is empty, or when the registration
// would violate exclusivity:
//  - `query_type` is MODIFY_EXCLUSIVE but the URI is already registered, or
//  - the URI is already registered by an array whose query type is
//    MODIFY_EXCLUSIVE.
ConsistencyController::entry_type ConsistencyController::register_array(
const URI uri, Array& array) {
const URI uri, Array& array, const QueryType& query_type) {
if (uri.empty()) {
throw std::runtime_error(
"[ConsistencyController::register_array] URI cannot be empty.");
}

// Hold the controller mutex for the exclusivity check and the insertion.
std::lock_guard<std::mutex> lock(mtx_);
if (this->is_open(uri)) {
// The URI is already registered: an exclusive open is never allowed on top
// of an existing registration.
if (query_type == QueryType::MODIFY_EXCLUSIVE) {
throw std::runtime_error(
"[ConsistencyController::register_array] Array already open; must "
"close array before opening for exclusive modification.");
} else {
// Non-exclusive open: refuse if the entry found for this URI belongs to an
// array opened for exclusive modification.
// NOTE(review): find() yields a single entry for the key; this presumably
// relies on an exclusive registration always being the only one for its URI
// - confirm.
auto iter = array_registry_.find(uri);
if (iter->second.get_query_type() == QueryType::MODIFY_EXCLUSIVE) {
throw std::runtime_error(
"[ConsistencyController::register_array] Must close array opened "
"for exclusive modification before opening an array at the same "
"address.");
}
}
}

// No conflict: record the (uri, array) pair and hand back the new entry.
return array_registry_.insert({uri, array});
}

Expand Down
7 changes: 5 additions & 2 deletions tiledb/sm/array/consistency.h
Expand Up @@ -38,6 +38,7 @@

#include "tiledb/common/common.h"
#include "tiledb/common/logger.h"
#include "tiledb/sm/enums/query_type.h"
#include "tiledb/sm/filesystem/uri.h"

using namespace tiledb::common;
Expand Down Expand Up @@ -99,7 +100,8 @@ class ConsistencyController {
*
* @return Sentry object whose lifespan is the same as the registration.
*/
ConsistencySentry make_sentry(const URI uri, Array& array);
ConsistencySentry make_sentry(
const URI uri, Array& array, const QueryType& query_type);

/** Returns true if the array is open, i.e. registered in the multimap. */
bool is_open(const URI uri);
Expand All @@ -113,7 +115,8 @@ class ConsistencyController {
*
* @pre the given URI is the root directory of the Array and is not empty.
*/
entry_type register_array(const URI uri, Array& array);
entry_type register_array(
const URI uri, Array& array, const QueryType& query_type);

/**
* Wrapper around a multimap deregistration operation.
Expand Down

0 comments on commit 7a8e2b1

Please sign in to comment.