Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement delete_fragments API #3400

Merged
merged 4 commits into from Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 93 additions & 0 deletions test/src/unit-cppapi-array.cc
Expand Up @@ -32,6 +32,7 @@

#include <test/support/tdb_catch.h>
#include "helpers.h"
#include "tiledb/common/stdx_string.h"
#include "tiledb/sm/cpp_api/tiledb"
#include "tiledb/sm/filesystem/uri.h"
#include "tiledb/sm/misc/constants.h"
Expand Down Expand Up @@ -683,6 +684,98 @@ TEST_CASE(
vfs.remove_dir(array_name);
}

TEST_CASE(
"C++ API: Deletion of sequential fragment writes",
"[cppapi][fragments][delete]") {
/* Note: An array must be open, in WRITE_EXCLUSIVE mode to delete_fragments */
Context ctx;
VFS vfs(ctx);
const std::string array_name = "cpp_unit_array";

if (vfs.is_dir(array_name))
vfs.remove_dir(array_name);

Domain domain(ctx);
domain.add_dimension(Dimension::create<int>(ctx, "d", {{0, 11}}, 12));

ArraySchema schema(ctx, TILEDB_DENSE);
schema.set_domain(domain).set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
schema.add_attribute(Attribute::create<int>(ctx, "a"));
std::vector<int> data = {0, 1};
uint64_t timestamp_start = 0;
uint64_t timestamp_end = UINT64_MAX;
tiledb::Array::create(array_name, schema);

SECTION("WRITE") {
auto array = tiledb::Array(ctx, array_name, TILEDB_WRITE);
auto query = tiledb::Query(ctx, array, TILEDB_WRITE);
query.set_data_buffer("a", data).set_subarray({0, 1}).submit();
query.set_data_buffer("a", data).set_subarray({2, 3}).submit();
query.set_data_buffer("a", data).set_subarray({4, 5}).submit();
query.finalize();

// Delete fragments
CHECK(tiledb::test::num_fragments(array_name) == 3);
REQUIRE_THROWS_WITH(
array.delete_fragments(array_name, timestamp_start, timestamp_end),
Catch::Contains("Query type must be WRITE_EXCLUSIVE"));
CHECK(tiledb::test::num_fragments(array_name) == 3);
array.close();
}

SECTION("WRITE_EXCLUSIVE") {
auto array = tiledb::Array(ctx, array_name, TILEDB_WRITE_EXCLUSIVE);
auto query = tiledb::Query(ctx, array, TILEDB_WRITE_EXCLUSIVE);
query.set_data_buffer("a", data).set_subarray({0, 1}).submit();
query.set_data_buffer("a", data).set_subarray({2, 3}).submit();
query.set_data_buffer("a", data).set_subarray({4, 5}).submit();
query.finalize();
CHECK(tiledb::test::num_fragments(array_name) == 3);

SECTION("no consolidation") {
// Delete fragments
array.delete_fragments(array_name, timestamp_start, timestamp_end);
CHECK(tiledb::test::num_fragments(array_name) == 0);
array.close();
}

SECTION("consolidation") {
array.close();

// Consolidate and reopen array
Array::consolidate(ctx, array_name);
CHECK(tiledb::test::num_fragments(array_name) == 4);
array.open(TILEDB_WRITE_EXCLUSIVE);
CHECK(tiledb::test::num_fragments(array_name) == 4);

// Check commits directory after consolidation
int vac_file_count = 0;
std::string commit_dir = tiledb::test::get_commit_dir(array_name);
std::vector<std::string> commits{vfs.ls(commit_dir)};
for (auto commit : commits) {
if (tiledb::sm::utils::parse::ends_with(
commit, tiledb::sm::constants::vacuum_file_suffix))
vac_file_count++;
}
CHECK(commits.size() == 5);
CHECK(vac_file_count == 1);

// Delete fragments
array.delete_fragments(array_name, timestamp_start, timestamp_end);
CHECK(tiledb::test::num_fragments(array_name) == 0);

// Check commits directory after deletion
commits = vfs.ls(commit_dir);
CHECK(commits.size() == 1);
CHECK(!tiledb::sm::utils::parse::ends_with(
commits[0], tiledb::sm::constants::vacuum_file_suffix));
}
}

if (vfs.is_dir(array_name))
vfs.remove_dir(array_name);
}

TEST_CASE("C++ API: Encrypted array", "[cppapi][encryption]") {
const char key[] = "0123456789abcdeF0123456789abcdeF";
tiledb::Config cfg;
Expand Down
67 changes: 49 additions & 18 deletions tiledb/sm/array/array.cc
Expand Up @@ -158,7 +158,7 @@ Status Array::open_without_fragments(
/* Note: the open status MUST be exception safe. If anything interrupts the
* opening process, it will throw and the array will be set as closed. */
try {
set_array_open();
set_array_open(query_type_);

// Copy the key bytes.
st = encryption_key_->set_key(encryption_type, encryption_key, key_length);
Expand Down Expand Up @@ -224,6 +224,34 @@ Status Array::load_fragments(
return Status::Ok();
}

Status Array::delete_fragments(
const URI& uri, uint64_t timestamp_start, uint64_t timestamp_end) {
// Check that query type is WRITE_EXCLUSIVE
if (query_type_ != QueryType::WRITE_EXCLUSIVE) {
return LOG_STATUS(Status_ArrayError(
"[Array::delete_fragments] Query type must be WRITE_EXCLUSIVE"));
}

// Check that array is open
if (!is_open() && !controller().is_open(uri)) {
return LOG_STATUS(
Status_ArrayError("[Array::delete_fragments] Array is closed"));
}

// Check that array is not in the process of opening or closing
if (is_opening_or_closing_) {
return LOG_STATUS(Status_ArrayError(
"[Array::delete_fragments] "
"May not perform simultaneous open or close operations."));
}

// Vacuum fragments for deletes
RETURN_NOT_OK(storage_manager_->fragments_vacuum(
uri.c_str(), timestamp_start, timestamp_end, true));

return Status::Ok();
}

void Array::set_delete_tiles_location(
const std::vector<ArrayDirectory::DeleteTileLocation>&
delete_tiles_location) {
Expand Down Expand Up @@ -253,7 +281,7 @@ Status Array::open(
uint32_t key_length) {
Status st;
// Checks
if (is_open_) {
if (is_open()) {
return LOG_STATUS(
Status_ArrayError("Cannot open array; Array already open"));
}
Expand All @@ -263,15 +291,12 @@ Status Array::open(
non_empty_domain_computed_ = false;
timestamp_start_ = timestamp_start;
timestamp_end_opened_at_ = timestamp_end;

/* Note: query_type_ MUST be set before calling set_array_open()
because it will be examined by the ConsistencyController. */
query_type_ = query_type;

/* Note: the open status MUST be exception safe. If anything interrupts the
* opening process, it will throw and the array will be set as closed. */
try {
set_array_open();
set_array_open(query_type);

// Get encryption key from config
std::string encryption_key_from_cfg;
Expand Down Expand Up @@ -323,7 +348,9 @@ Status Array::open(
if (query_type == QueryType::READ) {
timestamp_end_opened_at_ = utils::time::timestamp_now_ms();
} else if (
query_type == QueryType::WRITE || query_type == QueryType::DELETE) {
query_type == QueryType::WRITE ||
query_type == QueryType::WRITE_EXCLUSIVE ||
query_type == QueryType::DELETE) {
timestamp_end_opened_at_ = 0;
} else {
throw Status_ArrayError("Cannot open array; Unsupported query type.");
Expand Down Expand Up @@ -427,7 +454,9 @@ Status Array::close() {
if (remote_) {
// Update array metadata for write queries if metadata was written by the
// user
if (query_type_ == QueryType::WRITE && metadata_.num() > 0) {
if ((query_type_ == QueryType::WRITE ||
query_type_ == QueryType::WRITE_EXCLUSIVE) &&
metadata_.num() > 0) {
// Set metadata loaded to be true so when serialization fetchs the
// metadata it won't trigger a deadlock
metadata_loaded_ = true;
Expand All @@ -449,7 +478,9 @@ Status Array::close() {
st = storage_manager_->array_close_for_reads(this);
if (!st.ok())
throw StatusException(st);
} else if (query_type_ == QueryType::WRITE) {
} else if (
query_type_ == QueryType::WRITE ||
query_type_ == QueryType::WRITE_EXCLUSIVE) {
st = storage_manager_->array_close_for_writes(this);
if (!st.ok())
throw StatusException(st);
Expand Down Expand Up @@ -503,16 +534,14 @@ tuple<Status, optional<shared_ptr<ArraySchema>>> Array::get_array_schema()
return {Status::Ok(), array_schema_latest_};
}

Status Array::get_query_type(QueryType* query_type) const {
QueryType Array::get_query_type() const {
// Error if the array is not open
if (!is_open_) {
return LOG_STATUS(
throw StatusException(
Status_ArrayError("Cannot get query_type; Array is not open"));
}

*query_type = query_type_;

return Status::Ok();
return query_type_;
}

Status Array::get_max_buffer_size(
Expand Down Expand Up @@ -757,7 +786,8 @@ Status Array::delete_metadata(const char* key) {
}

// Check mode
if (query_type_ != QueryType::WRITE) {
if (query_type_ != QueryType::WRITE &&
query_type_ != QueryType::WRITE_EXCLUSIVE) {
return LOG_STATUS(
Status_ArrayError("Cannot delete metadata. Array was "
"not opened in write mode"));
Expand Down Expand Up @@ -786,7 +816,8 @@ Status Array::put_metadata(
}

// Check mode
if (query_type_ != QueryType::WRITE) {
if (query_type_ != QueryType::WRITE &&
query_type_ != QueryType::WRITE_EXCLUSIVE) {
return LOG_STATUS(
Status_ArrayError("Cannot put metadata; Array was "
"not opened in write mode"));
Expand Down Expand Up @@ -1163,7 +1194,7 @@ Status Array::compute_non_empty_domain() {
return Status::Ok();
}

void Array::set_array_open() {
void Array::set_array_open(const QueryType& query_type) {
std::lock_guard<std::mutex> lock(mtx_);
if (is_opening_or_closing_) {
is_opening_or_closing_ = false;
Expand All @@ -1177,7 +1208,7 @@ void Array::set_array_open() {
* only the pointer value is used and nothing is called on the Array objects.
*/
consistency_sentry_.emplace(
consistency_controller_.make_sentry(array_uri_, *this));
consistency_controller_.make_sentry(array_uri_, *this, query_type));
is_open_ = true;
}

Expand Down
25 changes: 21 additions & 4 deletions tiledb/sm/array/array.h
Expand Up @@ -178,6 +178,19 @@ class Array {
*/
Status load_fragments(const std::vector<TimestampedURI>& fragments_to_load);

/**
* Deletes the fragments from the Array with given URI.
*
* @param uri The uri of the Array whose fragments are to be deleted.
* @param timestamp_start The start timestamp at which to delete fragments.
* @param timestamp_end The end timestamp at which to delete fragments.
* @return Status
*
* @pre The Array must be open for exclusive writes
*/
Status delete_fragments(
const URI& uri, uint64_t timestamp_start, uint64_t timstamp_end);

/**
* Sets the delete tiles location.
*
Expand Down Expand Up @@ -237,8 +250,8 @@ class Array {
/** Retrieves the array schema. Errors if the array is not open. */
tuple<Status, optional<shared_ptr<ArraySchema>>> get_array_schema() const;

/** Retrieves the query type. Errors if the array is not open. */
Status get_query_type(QueryType* qyery_type) const;
/** Retrieves the query type. Throws if the array is not open. */
QueryType get_query_type() const;

/**
* Returns the max buffer size given a fixed-sized attribute/dimension and
Expand Down Expand Up @@ -587,8 +600,12 @@ class Array {
/** Computes the non-empty domain of the array. */
Status compute_non_empty_domain();

/** Sets the array state as open. */
void set_array_open();
/**
* Sets the array state as open.
*
* @param query_type The QueryType of the Array.
*/
void set_array_open(const QueryType& query_type);

/** Sets the array state as closed.
*
Expand Down
23 changes: 20 additions & 3 deletions tiledb/sm/array/consistency.cc
Expand Up @@ -32,6 +32,7 @@

#include <iostream>

#include "tiledb/sm/array/array.h"
#include "tiledb/sm/array/consistency.h"

using namespace tiledb::common;
Expand All @@ -57,17 +58,33 @@ ConsistencySentry::~ConsistencySentry() {
}

ConsistencySentry ConsistencyController::make_sentry(
const URI uri, Array& array) {
return ConsistencySentry{*this, register_array(uri, array)};
const URI uri, Array& array, const QueryType& query_type) {
return ConsistencySentry{*this, register_array(uri, array, query_type)};
}

ConsistencyController::entry_type ConsistencyController::register_array(
const URI uri, Array& array) {
const URI uri, Array& array, const QueryType& query_type) {
if (uri.empty()) {
throw std::runtime_error(
"[ConsistencyController::register_array] URI cannot be empty.");
}

std::lock_guard<std::mutex> lock(mtx_);
if (this->is_open(uri)) {
if (query_type == QueryType::WRITE_EXCLUSIVE) {
throw std::runtime_error(
"[ConsistencyController::register_array] Array already open; must "
"close array before opening for exclusive write.");
} else {
auto iter = array_registry_.find(uri);
if (iter->second.get_query_type() == QueryType::WRITE_EXCLUSIVE) {
throw std::runtime_error(
"[ConsistencyController::register_array] Must close array opened "
"for exclusive write before opening an array at the same address.");
}
}
}

return array_registry_.insert({uri, array});
}

Expand Down
7 changes: 5 additions & 2 deletions tiledb/sm/array/consistency.h
Expand Up @@ -38,6 +38,7 @@

#include "tiledb/common/common.h"
#include "tiledb/common/logger.h"
#include "tiledb/sm/enums/query_type.h"
#include "tiledb/sm/filesystem/uri.h"

using namespace tiledb::common;
Expand Down Expand Up @@ -99,7 +100,8 @@ class ConsistencyController {
*
* @return Sentry object whose lifespan is the same as the registration.
*/
ConsistencySentry make_sentry(const URI uri, Array& array);
ConsistencySentry make_sentry(
const URI uri, Array& array, const QueryType& query_type);

/** Returns true if the array is open, i.e. registered in the multimap. */
bool is_open(const URI uri);
Expand All @@ -113,7 +115,8 @@ class ConsistencyController {
*
* @pre the given URI is the root directory of the Array and is not empty.
*/
entry_type register_array(const URI uri, Array& array);
entry_type register_array(
const URI uri, Array& array, const QueryType& query_type);

/**
* Wrapper around a multimap deregistration operation.
Expand Down