Skip to content

Commit

Permalink
Correctly handle exceptions in commits (because of e.g. constraint fa…
Browse files Browse the repository at this point in the history
…ilures or IO exceptions), when an exception occurs rollback the partially completed commit and rollback instead
  • Loading branch information
Mytherin committed Nov 25, 2019
1 parent 2309ad7 commit 52b9aeb
Show file tree
Hide file tree
Showing 22 changed files with 236 additions and 132 deletions.
6 changes: 6 additions & 0 deletions src/common/exception.cpp
Expand Up @@ -84,6 +84,8 @@ string Exception::ExceptionTypeToString(ExceptionType type) {
return "IO";
case ExceptionType::INTERRUPT:
return "INTERRUPT";
case ExceptionType::FATAL:
return "FATAL";
default:
return "Unknown";
}
Expand Down Expand Up @@ -172,3 +174,7 @@ SequenceException::SequenceException(string msg, ...) : Exception(ExceptionType:

InterruptException::InterruptException() : Exception(ExceptionType::INTERRUPT, "Interrupted!") {
}

FatalException::FatalException(string msg, ...) : Exception(ExceptionType::FATAL, msg) {
FORMAT_CONSTRUCTOR(msg);
}
2 changes: 1 addition & 1 deletion src/common/file_system.cpp
Expand Up @@ -259,7 +259,7 @@ string FileSystem::PathSeparator() {
void FileSystem::FileSync(FileHandle &handle) {
int fd = ((UnixFileHandle &)handle).fd;
if (fsync(fd) != 0) {
throw IOException("FATAL ERROR: fsync failed!");
throw FatalException("fsync failed!");
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/include/duckdb/common/exception.hpp
Expand Up @@ -54,7 +54,8 @@ enum class ExceptionType {
OPTIMIZER = 26, // optimizer related
NULL_POINTER = 27, // nullptr exception
IO = 28, // IO exception
INTERRUPT = 29 // interrupt
INTERRUPT = 29, // interrupt
FATAL = 30 // Fatal exception: fatal exceptions are non-recoverable, and render the entire DB in an unusable state
};

class Exception : public std::exception {
Expand Down Expand Up @@ -167,4 +168,9 @@ class InterruptException : public Exception {
InterruptException();
};

class FatalException : public Exception {
public:
FatalException(string msg, ...);
};

} // namespace duckdb
3 changes: 2 additions & 1 deletion src/include/duckdb/storage/column_data.hpp
Expand Up @@ -47,6 +47,8 @@ class ColumnData {
void InitializeAppend(ColumnAppendState &state);
//! Append a vector of type [type] to the end of the column
void Append(ColumnAppendState &state, Vector &vector);
//! Revert a set of appends to the ColumnData
void RevertAppend(row_t start_row);

//! Update the specified row identifiers
void Update(Transaction &transaction, Vector &updates, row_t *ids);
Expand All @@ -55,7 +57,6 @@ class ColumnData {
void Fetch(ColumnScanState &state, row_t row_id, Vector &result);
//! Fetch a specific row id and append it to the vector
void FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result);

private:
//! Append a transient segment
void AppendTransientSegment(index_t start_row);
Expand Down
2 changes: 2 additions & 0 deletions src/include/duckdb/storage/data_table.hpp
Expand Up @@ -88,6 +88,8 @@ class DataTable {
void InitializeAppend(TableAppendState &state);
//! Append a chunk to the table using the AppendState obtained from BeginAppend
void Append(Transaction &transaction, transaction_t commit_id, DataChunk &chunk, TableAppendState &state);
//! Revert a set of appends made by the given AppendState, used to revert appends in the event of an error during commit (e.g. because of an I/O exception)
void RevertAppend(TableAppendState &state);

//! Append a chunk with the row ids [row_start, ..., row_start + chunk.size()] to all indexes of the table, returns
//! whether or not the append succeeded
Expand Down
3 changes: 3 additions & 0 deletions src/include/duckdb/storage/table/segment_tree.hpp
Expand Up @@ -41,6 +41,9 @@ class SegmentTree {
SegmentBase *GetSegment(index_t row_number);
//! Append a column segment to the tree
void AppendSegment(unique_ptr<SegmentBase> segment);

//! Get the segment index of the column segment for the given row (does not lock the segment tree!)
index_t GetSegmentIndex(index_t row_number);
};

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/include/duckdb/storage/table/version_manager.hpp
Expand Up @@ -49,6 +49,8 @@ class VersionManager {
void Delete(Transaction &transaction, Vector &row_ids);
//! Append a set of rows to the version manager, setting their inserted id to the given commit_id
void Append(Transaction &transaction, row_t row_start, index_t count, transaction_t commit_id);
//! Revert a set of appends made to the version manager from the rows [row_start] until [row_end]
void RevertAppend(row_t row_start, row_t row_end);

private:
ChunkInsertInfo *GetInsertInfo(index_t chunk_idx);
Expand Down
1 change: 1 addition & 0 deletions src/include/duckdb/transaction/commit_state.hpp
Expand Up @@ -35,6 +35,7 @@ class CommitState {

public:
template <bool HAS_LOG> void CommitEntry(UndoFlags type, data_ptr_t data);
void RevertCommit(UndoFlags type, data_ptr_t data);

private:
void SwitchTable(DataTable *table, UndoFlags new_op);
Expand Down
13 changes: 7 additions & 6 deletions src/include/duckdb/transaction/local_storage.hpp
Expand Up @@ -28,8 +28,6 @@ class LocalTableStorage {
vector<unique_ptr<Index>> indexes;
//! The set of deleted entries
unordered_map<index_t, unique_ptr<bool[]>> deleted_entries;
//! The append struct, used during the final append of the LocalTableStorage to the base DataTable
unique_ptr<TableAppendState> state;
//! The max row
row_t max_row;

Expand All @@ -41,6 +39,10 @@ class LocalTableStorage {

//! The LocalStorage class holds appends that have not been committed yet
class LocalStorage {
public:
struct CommitState {
unordered_map<DataTable *, unique_ptr<TableAppendState>> append_states;
};
public:
//! Initialize a scan of the local storage
void InitializeScan(DataTable *table, LocalScanState &state);
Expand All @@ -54,11 +56,10 @@ class LocalStorage {
//! Update a set of rows in the local storage
void Update(DataTable *table, Vector &row_identifiers, vector<column_t> &column_ids, DataChunk &data);

//! Check whether or not committing the local storage is possible, throws an exception if it is not possible
void CheckCommit();

//! Commits the local storage, writing it to the WAL and completing the commit
void Commit(Transaction &transaction, WriteAheadLog *log, transaction_t commit_id) noexcept;
void Commit(LocalStorage::CommitState &commit_state, Transaction &transaction, WriteAheadLog *log, transaction_t commit_id);
//! Revert the commit made so far by the LocalStorage
void RevertCommit(LocalStorage::CommitState &commit_state);

bool ChangesMade() noexcept {
return table_storage.size() > 0;
Expand Down
6 changes: 2 additions & 4 deletions src/include/duckdb/transaction/transaction.hpp
Expand Up @@ -61,10 +61,8 @@ class Transaction {
//! Push a query into the undo buffer
void PushQuery(string query);

//! Checks whether or not the transaction can be successfully committed,
void CheckCommit();
//! Commit the current transaction with the given commit identifier
void Commit(WriteAheadLog *log, transaction_t commit_id) noexcept;
//! Commit the current transaction with the given commit identifier. Returns true if the transaction commit was successful, or false if it was aborted.
bool Commit(WriteAheadLog *log, transaction_t commit_id) noexcept;
//! Rollback
void Rollback() noexcept {
undo_buffer.Rollback();
Expand Down
4 changes: 2 additions & 2 deletions src/include/duckdb/transaction/transaction_manager.hpp
Expand Up @@ -38,7 +38,7 @@ class TransactionManager {
//! Start a new transaction
Transaction *StartTransaction();
//! Commit the given transaction
void CommitTransaction(Transaction *transaction);
bool CommitTransaction(Transaction *transaction);
//! Rollback the given transaction
void RollbackTransaction(Transaction *transaction);
//! Add the catalog set
Expand All @@ -50,7 +50,7 @@ class TransactionManager {

private:
//! Remove the given transaction from the list of active transactions
void RemoveTransaction(Transaction *transaction);
void RemoveTransaction(Transaction *transaction) noexcept;

//! The current query number
std::atomic<transaction_t> current_query_number;
Expand Down
13 changes: 11 additions & 2 deletions src/include/duckdb/transaction/undo_buffer.hpp
Expand Up @@ -35,6 +35,12 @@ struct UndoChunk {
//! that might be required in the future (because of rollbacks or previous
//! transactions accessing them)
class UndoBuffer {
public:
struct IteratorState {
UndoChunk *current;
data_ptr_t start;
data_ptr_t end;
};
public:
UndoBuffer();

Expand All @@ -47,7 +53,9 @@ class UndoBuffer {
//! Cleanup the undo buffer
void Cleanup();
//! Commit the changes made in the UndoBuffer: should be called on commit
void Commit(WriteAheadLog *log, transaction_t commit_id) noexcept;
void Commit(UndoBuffer::IteratorState iterator_state, WriteAheadLog *log, transaction_t commit_id);
//! Revert committed changes made in the UndoBuffer up until the currently committed state
void RevertCommit(UndoBuffer::IteratorState iterator_state, transaction_t transaction_id);
//! Rollback the changes made in this UndoBuffer: should be called on
//! rollback
void Rollback() noexcept;
Expand All @@ -57,7 +65,8 @@ class UndoBuffer {
UndoChunk *tail;

private:
template <class T> void IterateEntries(T &&callback);
template <class T> void IterateEntries(UndoBuffer::IteratorState &state, T &&callback);
template <class T> void IterateEntries(UndoBuffer::IteratorState &state, UndoBuffer::IteratorState &end_state, T &&callback);
template <class T> void ReverseIterateEntries(T &&callback);
};

Expand Down
14 changes: 14 additions & 0 deletions src/storage/column_data.cpp
Expand Up @@ -94,6 +94,20 @@ void ColumnData::Append(ColumnAppendState &state, Vector &vector) {
}
}

void ColumnData::RevertAppend(row_t start_row) {
lock_guard<mutex> tree_lock(data.node_lock);
// find the segment index that the current row belongs to
index_t segment_index = data.GetSegmentIndex(start_row);
auto segment = data.nodes[segment_index].node;
// remove any segments AFTER this segment: they should be deleted entirely
if (segment_index < data.nodes.size() - 1) {
data.nodes.erase(data.nodes.begin() + segment_index);
}
segment->next = nullptr;
// for the current segment, set the count to the reverted count
segment->count = start_row - segment->start;
}

void ColumnData::Update(Transaction &transaction, Vector &updates, row_t *ids) {
// first find the segment that the update belongs to
index_t first_id = ids[updates.sel_vector ? updates.sel_vector[0] : 0];
Expand Down
15 changes: 15 additions & 0 deletions src/storage/data_table.cpp
Expand Up @@ -350,6 +350,21 @@ void DataTable::Append(Transaction &transaction, transaction_t commit_id, DataCh
state.current_row += chunk.size();
}

void DataTable::RevertAppend(TableAppendState &state) {
if (state.row_start == state.current_row) {
// nothing to revert!
return;
}
// revert changes in the base columns
for (index_t i = 0; i < types.size(); i++) {
columns[i].RevertAppend(state.row_start);
}
// adjust the cardinality
cardinality -= state.current_row - state.row_start;
// revert changes in the transient manager
transient_manager.RevertAppend(state.row_start, state.current_row);
}

//===--------------------------------------------------------------------===//
// Indexes
//===--------------------------------------------------------------------===//
Expand Down
90 changes: 25 additions & 65 deletions src/storage/local_storage.cpp
Expand Up @@ -37,7 +37,6 @@ void LocalTableStorage::Clear() {
collection.chunks.clear();
indexes.clear();
deleted_entries.clear();
state = nullptr;
}

void LocalStorage::InitializeScan(DataTable *table, LocalScanState &state) {
Expand Down Expand Up @@ -256,89 +255,50 @@ template <class T> bool LocalStorage::ScanTableStorage(DataTable *table, LocalTa
}
}

void LocalStorage::CheckCommit() {
// check whether a commit can be made, we do this check by appending to all indices
bool success = true;
for (auto &entry : table_storage) {
auto table = entry.first;
auto storage = entry.second.get();

storage->state = make_unique<TableAppendState>();
table->InitializeAppend(*storage->state);

if (table->indexes.size() == 0) {
continue;
}

row_t current_row = storage->state->row_start;
ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
// append this chunk to the indexes of the table
if (!table->AppendToIndexes(*storage->state, chunk, current_row)) {
success = false;
return false;
}
current_row += chunk.size();
return true;
});

storage->max_row = current_row;
if (!success) {
break;
}
}
if (!success) {
// failed to insert in one of the tables: delete already inserted entries
for (auto &entry : table_storage) {
auto table = entry.first;
auto storage = entry.second.get();

if (table->indexes.size() == 0 || storage->max_row == 0) {
continue;
}

row_t current_row = storage->state->row_start;
ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
if (current_row >= storage->max_row) {
// done
return false;
}
table->RemoveFromIndexes(chunk, current_row);
current_row += chunk.size();
return true;
});
}
// reset the storage state for each of the entries
for (auto &entry : table_storage) {
auto storage = entry.second.get();
storage->state = nullptr;
}
// throw a constraint violation
throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key");
}
}

void LocalStorage::Commit(Transaction &transaction, WriteAheadLog *log, transaction_t commit_id) noexcept {
void LocalStorage::Commit(LocalStorage::CommitState &commit_state, Transaction &transaction, WriteAheadLog *log, transaction_t commit_id) {
// commit local storage, iterate over all entries in the table storage map
for (auto &entry : table_storage) {
auto table = entry.first;
auto storage = entry.second.get();

// initialize the append state
auto append_state_ptr = make_unique<TableAppendState>();
auto &append_state = *append_state_ptr;
// add it to the set of append states
commit_state.append_states[table] = move(append_state_ptr);
table->InitializeAppend(append_state);

if (log && !table->IsTemporary()) {
log->WriteSetTable(table->schema, table->table);
}

// scan all chunks in this storage
ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
// append this chunk to the indexes of the table
if (!table->AppendToIndexes(append_state, chunk, append_state.current_row)) {
throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key");
}

// append to base table
table->Append(transaction, commit_id, chunk, *storage->state);
table->Append(transaction, commit_id, chunk, append_state);
// if there is a WAL, write the chunk to there as well
if (log && !table->IsTemporary()) {
log->WriteInsert(chunk);
}
return true;
});
storage->Clear();
}
// finished commit: clear local storage
for (auto &entry : table_storage) {
entry.second->Clear();
}
table_storage.clear();
}

void LocalStorage::RevertCommit(LocalStorage::CommitState &commit_state) {
for (auto &entry : commit_state.append_states) {
auto table = entry.first;

table->RevertAppend(*entry.second);
}
}
5 changes: 4 additions & 1 deletion src/storage/table/segment_tree.cpp
Expand Up @@ -14,7 +14,10 @@ SegmentBase *SegmentTree::GetLastSegment() {

SegmentBase *SegmentTree::GetSegment(index_t row_number) {
lock_guard<mutex> tree_lock(node_lock);
return nodes[GetSegmentIndex(row_number)].node;
}

index_t SegmentTree::GetSegmentIndex(index_t row_number) {
index_t lower = 0;
index_t upper = nodes.size() - 1;
// binary search to find the node
Expand All @@ -26,7 +29,7 @@ SegmentBase *SegmentTree::GetSegment(index_t row_number) {
} else if (row_number >= entry.row_start + entry.node->count) {
lower = index + 1;
} else {
return entry.node;
return index;
}
}
throw Exception("Could not find node in column segment tree!");
Expand Down

0 comments on commit 52b9aeb

Please sign in to comment.