diff --git a/src/common/exception.cpp b/src/common/exception.cpp index a1748f9455c..e829fc7a4dd 100644 --- a/src/common/exception.cpp +++ b/src/common/exception.cpp @@ -84,6 +84,8 @@ string Exception::ExceptionTypeToString(ExceptionType type) { return "IO"; case ExceptionType::INTERRUPT: return "INTERRUPT"; + case ExceptionType::FATAL: + return "FATAL"; default: return "Unknown"; } @@ -172,3 +174,7 @@ SequenceException::SequenceException(string msg, ...) : Exception(ExceptionType: InterruptException::InterruptException() : Exception(ExceptionType::INTERRUPT, "Interrupted!") { } + +FatalException::FatalException(string msg, ...) : Exception(ExceptionType::FATAL, msg) { + FORMAT_CONSTRUCTOR(msg); +} diff --git a/src/common/file_system.cpp b/src/common/file_system.cpp index 3ccdefc051d..4359d0ec84f 100644 --- a/src/common/file_system.cpp +++ b/src/common/file_system.cpp @@ -259,7 +259,7 @@ string FileSystem::PathSeparator() { void FileSystem::FileSync(FileHandle &handle) { int fd = ((UnixFileHandle &)handle).fd; if (fsync(fd) != 0) { - throw IOException("FATAL ERROR: fsync failed!"); + throw FatalException("fsync failed!"); } } diff --git a/src/include/duckdb/common/exception.hpp b/src/include/duckdb/common/exception.hpp index dd1a91d2b97..95ee7fb70de 100644 --- a/src/include/duckdb/common/exception.hpp +++ b/src/include/duckdb/common/exception.hpp @@ -54,7 +54,8 @@ enum class ExceptionType { OPTIMIZER = 26, // optimizer related NULL_POINTER = 27, // nullptr exception IO = 28, // IO exception - INTERRUPT = 29 // interrupt + INTERRUPT = 29, // interrupt + FATAL = 30 // Fatal exception: fatal exceptions are non-recoverable, and render the entire DB in an unusable state }; class Exception : public std::exception { @@ -167,4 +168,9 @@ class InterruptException : public Exception { InterruptException(); }; +class FatalException : public Exception { +public: + FatalException(string msg, ...); +}; + } // namespace duckdb diff --git a/src/include/duckdb/storage/column_data.hpp b/src/include/duckdb/storage/column_data.hpp index 3929a09ba20..18b3492fa6b 100644 --- a/src/include/duckdb/storage/column_data.hpp +++ b/src/include/duckdb/storage/column_data.hpp @@ -47,6 +47,8 @@ class ColumnData { void InitializeAppend(ColumnAppendState &state); //! Append a vector of type [type] to the end of the column void Append(ColumnAppendState &state, Vector &vector); + //! Revert a set of appends to the ColumnData + void RevertAppend(row_t start_row); //! Update the specified row identifiers void Update(Transaction &transaction, Vector &updates, row_t *ids); @@ -55,7 +57,6 @@ class ColumnData { void Fetch(ColumnScanState &state, row_t row_id, Vector &result); //! Fetch a specific row id and append it to the vector void FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result); - private: //! Append a transient segment void AppendTransientSegment(index_t start_row); diff --git a/src/include/duckdb/storage/data_table.hpp b/src/include/duckdb/storage/data_table.hpp index 594221ad570..97966ade0da 100644 --- a/src/include/duckdb/storage/data_table.hpp +++ b/src/include/duckdb/storage/data_table.hpp @@ -88,6 +88,8 @@ class DataTable { void InitializeAppend(TableAppendState &state); //! Append a chunk to the table using the AppendState obtained from BeginAppend void Append(Transaction &transaction, transaction_t commit_id, DataChunk &chunk, TableAppendState &state); + //! Revert a set of appends made by the given AppendState, used to revert appends in the event of an error during commit (e.g. because of an I/O exception) + void RevertAppend(TableAppendState &state); //! Append a chunk with the row ids [row_start, ..., row_start + chunk.size()] to all indexes of the table, returns //! whether or not the append succeeded diff --git a/src/include/duckdb/storage/table/segment_tree.hpp b/src/include/duckdb/storage/table/segment_tree.hpp index 43219bec2f3..55807446168 100644 --- a/src/include/duckdb/storage/table/segment_tree.hpp +++ b/src/include/duckdb/storage/table/segment_tree.hpp @@ -41,6 +41,9 @@ class SegmentTree { SegmentBase *GetSegment(index_t row_number); //! Append a column segment to the tree void AppendSegment(unique_ptr segment); + + //! Get the segment index of the column segment for the given row (does not lock the segment tree!) + index_t GetSegmentIndex(index_t row_number); }; } // namespace duckdb diff --git a/src/include/duckdb/storage/table/version_manager.hpp b/src/include/duckdb/storage/table/version_manager.hpp index ed41b8df6ea..781a8b56706 100644 --- a/src/include/duckdb/storage/table/version_manager.hpp +++ b/src/include/duckdb/storage/table/version_manager.hpp @@ -49,6 +49,8 @@ class VersionManager { void Delete(Transaction &transaction, Vector &row_ids); //! Append a set of rows to the version manager, setting their inserted id to the given commit_id void Append(Transaction &transaction, row_t row_start, index_t count, transaction_t commit_id); + //! Revert a set of appends made to the version manager from the rows [row_start] until [row_end] + void RevertAppend(row_t row_start, row_t row_end); private: ChunkInsertInfo *GetInsertInfo(index_t chunk_idx); diff --git a/src/include/duckdb/transaction/commit_state.hpp b/src/include/duckdb/transaction/commit_state.hpp index 650d4434533..c36aab20417 100644 --- a/src/include/duckdb/transaction/commit_state.hpp +++ b/src/include/duckdb/transaction/commit_state.hpp @@ -35,6 +35,7 @@ class CommitState { public: template void CommitEntry(UndoFlags type, data_ptr_t data); + void RevertCommit(UndoFlags type, data_ptr_t data); private: void SwitchTable(DataTable *table, UndoFlags new_op); diff --git a/src/include/duckdb/transaction/local_storage.hpp b/src/include/duckdb/transaction/local_storage.hpp index 50ed91d117d..0c4fdb7b085 100644 --- a/src/include/duckdb/transaction/local_storage.hpp +++ b/src/include/duckdb/transaction/local_storage.hpp @@ -28,8 +28,6 @@ class LocalTableStorage { vector> indexes; //! The set of deleted entries unordered_map> deleted_entries; - //! The append struct, used during the final append of the LocalTableStorage to the base DataTable - unique_ptr state; //! The max row row_t max_row; @@ -41,6 +39,10 @@ class LocalTableStorage { //! The LocalStorage class holds appends that have not been committed yet class LocalStorage { +public: + struct CommitState { + unordered_map> append_states; + }; public: //! Initialize a scan of the local storage void InitializeScan(DataTable *table, LocalScanState &state); @@ -54,11 +56,10 @@ class LocalStorage { //! Update a set of rows in the local storage void Update(DataTable *table, Vector &row_identifiers, vector &column_ids, DataChunk &data); - //! Check whether or not committing the local storage is possible, throws an exception if it is not possible - void CheckCommit(); - //! Commits the local storage, writing it to the WAL and completing the commit - void Commit(Transaction &transaction, WriteAheadLog *log, transaction_t commit_id) noexcept; + void Commit(LocalStorage::CommitState &commit_state, Transaction &transaction, WriteAheadLog *log, transaction_t commit_id); + //! Revert the commit made so far by the LocalStorage + void RevertCommit(LocalStorage::CommitState &commit_state); bool ChangesMade() noexcept { return table_storage.size() > 0; diff --git a/src/include/duckdb/transaction/transaction.hpp b/src/include/duckdb/transaction/transaction.hpp index 916cbe43e2c..20aeb87ec2a 100644 --- a/src/include/duckdb/transaction/transaction.hpp +++ b/src/include/duckdb/transaction/transaction.hpp @@ -61,10 +61,8 @@ class Transaction { //! Push a query into the undo buffer void PushQuery(string query); - //! Checks whether or not the transaction can be successfully committed, - void CheckCommit(); - //! Commit the current transaction with the given commit identifier - void Commit(WriteAheadLog *log, transaction_t commit_id) noexcept; + //! Commit the current transaction with the given commit identifier. Returns true if the transaction commit was successful, or false if it was aborted. + bool Commit(WriteAheadLog *log, transaction_t commit_id) noexcept; //! Rollback void Rollback() noexcept { undo_buffer.Rollback(); diff --git a/src/include/duckdb/transaction/transaction_manager.hpp b/src/include/duckdb/transaction/transaction_manager.hpp index 81522dca514..4a4c5330197 100644 --- a/src/include/duckdb/transaction/transaction_manager.hpp +++ b/src/include/duckdb/transaction/transaction_manager.hpp @@ -38,7 +38,7 @@ class TransactionManager { //! Start a new transaction Transaction *StartTransaction(); //! Commit the given transaction - void CommitTransaction(Transaction *transaction); + bool CommitTransaction(Transaction *transaction); //! Rollback the given transaction void RollbackTransaction(Transaction *transaction); //! Add the catalog set @@ -50,7 +50,7 @@ class TransactionManager { private: //! Remove the given transaction from the list of active transactions - void RemoveTransaction(Transaction *transaction); + void RemoveTransaction(Transaction *transaction) noexcept; //! The current query number std::atomic current_query_number; diff --git a/src/include/duckdb/transaction/undo_buffer.hpp b/src/include/duckdb/transaction/undo_buffer.hpp index 4702557f6f8..bb9b09dad0c 100644 --- a/src/include/duckdb/transaction/undo_buffer.hpp +++ b/src/include/duckdb/transaction/undo_buffer.hpp @@ -35,6 +35,12 @@ struct UndoChunk { //! that might be required in the future (because of rollbacks or previous //! transactions accessing them) class UndoBuffer { +public: + struct IteratorState { + UndoChunk *current; + data_ptr_t start; + data_ptr_t end; + }; public: UndoBuffer(); @@ -47,7 +53,9 @@ class UndoBuffer { //! Cleanup the undo buffer void Cleanup(); //! Commit the changes made in the UndoBuffer: should be called on commit - void Commit(WriteAheadLog *log, transaction_t commit_id) noexcept; + void Commit(UndoBuffer::IteratorState iterator_state, WriteAheadLog *log, transaction_t commit_id); + //! Revert committed changes made in the UndoBuffer up until the currently committed state + void RevertCommit(UndoBuffer::IteratorState iterator_state, transaction_t transaction_id); //! Rollback the changes made in this UndoBuffer: should be called on //! rollback void Rollback() noexcept; @@ -57,7 +65,8 @@ class UndoBuffer { UndoChunk *tail; private: - template void IterateEntries(T &&callback); + template void IterateEntries(UndoBuffer::IteratorState &state, T &&callback); + template void IterateEntries(UndoBuffer::IteratorState &state, UndoBuffer::IteratorState &end_state, T &&callback); template void ReverseIterateEntries(T &&callback); }; diff --git a/src/storage/column_data.cpp b/src/storage/column_data.cpp index c50cb38048a..a231aac8d03 100644 --- a/src/storage/column_data.cpp +++ b/src/storage/column_data.cpp @@ -94,6 +94,20 @@ void ColumnData::Append(ColumnAppendState &state, Vector &vector) { } } +void ColumnData::RevertAppend(row_t start_row) { + lock_guard tree_lock(data.node_lock); + // find the segment index that the current row belongs to + index_t segment_index = data.GetSegmentIndex(start_row); + auto segment = data.nodes[segment_index].node; + // remove any segments AFTER this segment: they should be deleted entirely + if (segment_index < data.nodes.size() - 1) { + data.nodes.erase(data.nodes.begin() + segment_index); + } + segment->next = nullptr; + // for the current segment, set the count to the reverted count + segment->count = start_row - segment->start; +} + void ColumnData::Update(Transaction &transaction, Vector &updates, row_t *ids) { // first find the segment that the update belongs to index_t first_id = ids[updates.sel_vector ? updates.sel_vector[0] : 0]; diff --git a/src/storage/data_table.cpp b/src/storage/data_table.cpp index dba3b29f33c..d7cf2ee216d 100644 --- a/src/storage/data_table.cpp +++ b/src/storage/data_table.cpp @@ -350,6 +350,21 @@ void DataTable::Append(Transaction &transaction, transaction_t commit_id, DataCh state.current_row += chunk.size(); } +void DataTable::RevertAppend(TableAppendState &state) { + if (state.row_start == state.current_row) { + // nothing to revert! + return; + } + // revert changes in the base columns + for (index_t i = 0; i < types.size(); i++) { + columns[i].RevertAppend(state.row_start); + } + // adjust the cardinality + cardinality -= state.current_row - state.row_start; + // revert changes in the transient manager + transient_manager.RevertAppend(state.row_start, state.current_row); +} + //===--------------------------------------------------------------------===// // Indexes //===--------------------------------------------------------------------===// diff --git a/src/storage/local_storage.cpp b/src/storage/local_storage.cpp index ac091911997..f7d0b3083bb 100644 --- a/src/storage/local_storage.cpp +++ b/src/storage/local_storage.cpp @@ -37,7 +37,6 @@ void LocalTableStorage::Clear() { collection.chunks.clear(); indexes.clear(); deleted_entries.clear(); - state = nullptr; } void LocalStorage::InitializeScan(DataTable *table, LocalScanState &state) { @@ -256,89 +255,50 @@ template bool LocalStorage::ScanTableStorage(DataTable *table, LocalTa } } -void LocalStorage::CheckCommit() { - // check whether a commit can be made, we do this check by appending to all indices - bool success = true; - for (auto &entry : table_storage) { - auto table = entry.first; - auto storage = entry.second.get(); - - storage->state = make_unique(); - table->InitializeAppend(*storage->state); - - if (table->indexes.size() == 0) { - continue; - } - - row_t current_row = storage->state->row_start; - ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool { - // append this chunk to the indexes of the table - if (!table->AppendToIndexes(*storage->state, chunk, current_row)) { - success = false; - return false; - } - current_row += chunk.size(); - return true; - }); - - storage->max_row = current_row; - if (!success) { - break; - } - } - if (!success) { - // failed to insert in one of the tables: delete already inserted entries - for (auto &entry : table_storage) { - auto table = entry.first; - auto storage = entry.second.get(); - - if (table->indexes.size() == 0 || storage->max_row == 0) { - continue; - } - - row_t current_row = storage->state->row_start; - ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool { - if (current_row >= storage->max_row) { - // done - return false; - } - table->RemoveFromIndexes(chunk, current_row); - current_row += chunk.size(); - return true; - }); - } - // reset the storage state for each of the entries - for (auto &entry : table_storage) { - auto storage = entry.second.get(); - storage->state = nullptr; - } - // throw a constraint violation - throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key"); - } -} - -void LocalStorage::Commit(Transaction &transaction, WriteAheadLog *log, transaction_t commit_id) noexcept { +void LocalStorage::Commit(LocalStorage::CommitState &commit_state, Transaction &transaction, WriteAheadLog *log, transaction_t commit_id) { // commit local storage, iterate over all entries in the table storage map for (auto &entry : table_storage) { auto table = entry.first; auto storage = entry.second.get(); + // initialize the append state + auto append_state_ptr = make_unique(); + auto &append_state = *append_state_ptr; + // add it to the set of append states + commit_state.append_states[table] = move(append_state_ptr); + table->InitializeAppend(append_state); + if (log && !table->IsTemporary()) { log->WriteSetTable(table->schema, table->table); } // scan all chunks in this storage ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool { + // append this chunk to the indexes of the table + if (!table->AppendToIndexes(append_state, chunk, append_state.current_row)) { + throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key"); + } + // append to base table - table->Append(transaction, commit_id, chunk, *storage->state); + table->Append(transaction, commit_id, chunk, append_state); // if there is a WAL, write the chunk to there as well if (log && !table->IsTemporary()) { log->WriteInsert(chunk); } return true; }); - storage->Clear(); } // finished commit: clear local storage + for (auto &entry : table_storage) { + entry.second->Clear(); + } table_storage.clear(); } + +void LocalStorage::RevertCommit(LocalStorage::CommitState &commit_state) { + for (auto &entry : commit_state.append_states) { + auto table = entry.first; + + table->RevertAppend(*entry.second); + } +} \ No newline at end of file diff --git a/src/storage/table/segment_tree.cpp b/src/storage/table/segment_tree.cpp index 2db8d4177ea..848e137f7d2 100644 --- a/src/storage/table/segment_tree.cpp +++ b/src/storage/table/segment_tree.cpp @@ -14,7 +14,10 @@ SegmentBase *SegmentTree::GetLastSegment() { SegmentBase *SegmentTree::GetSegment(index_t row_number) { lock_guard tree_lock(node_lock); + return nodes[GetSegmentIndex(row_number)].node; +} +index_t SegmentTree::GetSegmentIndex(index_t row_number) { index_t lower = 0; index_t upper = nodes.size() - 1; // binary search to find the node @@ -26,7 +29,7 @@ SegmentBase *SegmentTree::GetSegment(index_t row_number) { } else if (row_number >= entry.row_start + entry.node->count) { lower = index + 1; } else { - return entry.node; + return index; } } throw Exception("Could not find node in column segment tree!"); diff --git a/src/storage/table/version_manager.cpp b/src/storage/table/version_manager.cpp index c4880b595ac..7538ae031b0 100644 --- a/src/storage/table/version_manager.cpp +++ b/src/storage/table/version_manager.cpp @@ -146,3 +146,13 @@ ChunkInsertInfo *VersionManager::GetInsertInfo(index_t chunk_idx) { } } } + +void VersionManager::RevertAppend(row_t row_start, row_t row_end) { + auto write_lock = lock.GetExclusiveLock(); + + index_t chunk_start = row_start / STANDARD_VECTOR_SIZE + (row_start % STANDARD_VECTOR_SIZE == 0 ? 0 : 1); + index_t chunk_end = row_end / STANDARD_VECTOR_SIZE; + for(; chunk_start <= chunk_end; chunk_start++) { + info.erase(chunk_start); + } +} \ No newline at end of file diff --git a/src/transaction/commit_state.cpp b/src/transaction/commit_state.cpp index 1e4ccf00b4c..e9c3967cf5f 100644 --- a/src/transaction/commit_state.cpp +++ b/src/transaction/commit_state.cpp @@ -172,5 +172,39 @@ template void CommitState::CommitEntry(UndoFlags type, data_ptr_t } } +void CommitState::RevertCommit(UndoFlags type, data_ptr_t data) { + transaction_t transaction_id = commit_id; + switch (type) { + case UndoFlags::CATALOG_ENTRY: { + // set the commit timestamp of the catalog entry to the given id + CatalogEntry *catalog_entry = *((CatalogEntry **)data); + assert(catalog_entry->parent); + catalog_entry->parent->timestamp = transaction_id; + break; + } + case UndoFlags::DELETE_TUPLE: { + // deletion: + auto info = (DeleteInfo *)data; + info->GetTable().cardinality += info->count; + // revert the commit by writing the (uncommitted) transaction_id back into the version info + info->vinfo->CommitDelete(transaction_id, info->rows, info->count); + break; + } + case UndoFlags::UPDATE_TUPLE: { + // update: + auto info = (UpdateInfo *)data; + info->version_number = transaction_id; + break; + } + case UndoFlags::QUERY: { + break; + } + case UndoFlags::DATA: + break; + default: + throw NotImplementedException("UndoBuffer - don't know how to revert commit of this type!"); + } +} + template void CommitState::CommitEntry(UndoFlags type, data_ptr_t data); template void CommitState::CommitEntry(UndoFlags type, data_ptr_t data); diff --git a/src/transaction/transaction.cpp b/src/transaction/transaction.cpp index 45c0c7b29ce..c347712d757 100644 --- a/src/transaction/transaction.cpp +++ b/src/transaction/transaction.cpp @@ -56,25 +56,34 @@ void Transaction::PushQuery(string query) { strcpy(blob, query.c_str()); } -void Transaction::CheckCommit() { - storage.CheckCommit(); -} - -void Transaction::Commit(WriteAheadLog *log, transaction_t commit_id) noexcept { +bool Transaction::Commit(WriteAheadLog *log, transaction_t commit_id) noexcept { this->commit_id = commit_id; - // commit the undo buffer + UndoBuffer::IteratorState iterator_state; + LocalStorage::CommitState commit_state; bool changes_made = undo_buffer.ChangesMade() || storage.ChangesMade() || sequence_usage.size() > 0; - undo_buffer.Commit(log, commit_id); - storage.Commit(*this, log, commit_id); - if (log) { - // commit any sequences that were used to the WAL - for (auto &entry : sequence_usage) { - log->WriteSequenceValue(entry.first, entry.second); + try { + // commit the undo buffer + undo_buffer.Commit(iterator_state, log, commit_id); + storage.Commit(commit_state, *this, log, commit_id); + if (log) { + // commit any sequences that were used to the WAL + for (auto &entry : sequence_usage) { + log->WriteSequenceValue(entry.first, entry.second); + } + // flush the WAL + if (changes_made) { + log->Flush(); + } } - // flush the WAL - if (changes_made) { - log->Flush(); + return true; + } catch(Exception &ex) { + undo_buffer.RevertCommit(iterator_state, transaction_id); + storage.RevertCommit(commit_state); + if (log && changes_made) { + throw Exception("FIXME: flush undo into log"); + // log->FlushUndo(); } + return false; } } diff --git a/src/transaction/transaction_context.cpp b/src/transaction/transaction_context.cpp index 745e2d0b0ab..a52d0e08f6e 100644 --- a/src/transaction/transaction_context.cpp +++ b/src/transaction/transaction_context.cpp @@ -32,7 +32,9 @@ void TransactionContext::Commit() { throw TransactionException("No transaction is currently active - cannot commit!"); } current_transaction = nullptr; - transaction_manager.CommitTransaction(transaction); + if (!transaction_manager.CommitTransaction(transaction)) { + throw TransactionException("Failed to commit!"); + } } void TransactionContext::Rollback() { diff --git a/src/transaction/transaction_manager.cpp b/src/transaction/transaction_manager.cpp index b2c3fcc5086..a6ffe3cd181 100644 --- a/src/transaction/transaction_manager.cpp +++ b/src/transaction/transaction_manager.cpp @@ -50,29 +50,24 @@ Transaction *TransactionManager::StartTransaction() { return transaction_ptr; } -void TransactionManager::CommitTransaction(Transaction *transaction) { +bool TransactionManager::CommitTransaction(Transaction *transaction) { // obtain the transaction lock during this function lock_guard lock(transaction_lock); - // first check whether we can commit this transaction - try { - transaction->CheckCommit(); - } catch (Exception &ex) { - // cannot commit transaction! roll it back instead of committing it - transaction->Rollback(); - RemoveTransaction(transaction); - throw ex; - } - // obtain a commit id for the transaction transaction_t commit_id = current_start_timestamp++; - + bool success = true; // commit the UndoBuffer of the transaction - transaction->Commit(storage.GetWriteAheadLog(), commit_id); + if (!transaction->Commit(storage.GetWriteAheadLog(), commit_id)) { + // commit unsuccessful: rollback the transaction instead + transaction->Rollback(); + success = false; + } - // remove the transaction id from the list of active transactions + // commit successful: remove the transaction id from the list of active transactions // potentially resulting in garbage collection RemoveTransaction(transaction); + return success; } void TransactionManager::RollbackTransaction(Transaction *transaction) { @@ -87,7 +82,7 @@ void TransactionManager::RollbackTransaction(Transaction *transaction) { RemoveTransaction(transaction); } -void TransactionManager::RemoveTransaction(Transaction *transaction) { +void TransactionManager::RemoveTransaction(Transaction *transaction) noexcept { // remove the transaction from the list of active transactions index_t t_index = active_transactions.size(); // check for the lowest and highest start time in the list of transactions diff --git a/src/transaction/undo_buffer.cpp b/src/transaction/undo_buffer.cpp index ae55db281bb..9c7e918d9bf 100644 --- a/src/transaction/undo_buffer.cpp +++ b/src/transaction/undo_buffer.cpp @@ -12,9 +12,9 @@ #include -using namespace duckdb; using namespace std; +namespace duckdb { constexpr uint32_t DEFAULT_UNDO_CHUNK_SIZE = 4096 * 3; constexpr uint32_t UNDO_ENTRY_HEADER_SIZE = sizeof(UndoFlags) + sizeof(uint32_t); @@ -61,21 +61,43 @@ data_ptr_t UndoBuffer::CreateEntry(UndoFlags type, index_t len) { return head->WriteEntry(type, len); } -template void UndoBuffer::IterateEntries(T &&callback) { +template void UndoBuffer::IterateEntries(UndoBuffer::IteratorState &state, T &&callback) { // iterate in insertion order: start with the tail - auto current = tail; - while (current) { - data_ptr_t start = current->data.get(); - data_ptr_t end = start + current->current_position; - while (start < end) { - UndoFlags type = *((UndoFlags *)start); - start += sizeof(UndoFlags); - uint32_t len = *((uint32_t *)start); - start += sizeof(uint32_t); - callback(type, start); - start += len; + state.current = tail; + while (state.current) { + state.start = state.current->data.get(); + state.end = state.start + state.current->current_position; + while (state.start < state.end) { + UndoFlags type = *((UndoFlags *)state.start); + state.start += sizeof(UndoFlags); + uint32_t len = *((uint32_t *)state.start); + state.start += sizeof(uint32_t); + callback(type, state.start); + state.start += len; } - current = current->prev; + state.current = state.current->prev; + } +} + +template void UndoBuffer::IterateEntries(UndoBuffer::IteratorState &state, UndoBuffer::IteratorState &end_state, T &&callback) { + // iterate in insertion order: start with the tail + state.current = tail; + while (state.current) { + state.start = state.current->data.get(); + state.end = state.current == end_state.current ? end_state.start : state.start + state.current->current_position; + while (state.start < state.end) { + UndoFlags type = *((UndoFlags *)state.start); + state.start += sizeof(UndoFlags); + uint32_t len = *((uint32_t *)state.start); + state.start += sizeof(uint32_t); + callback(type, state.start); + state.start += len; + } + if (state.current == end_state.current) { + // finished executing until the current end state + return; + } + state.current = state.current->prev; } } @@ -117,22 +139,33 @@ void UndoBuffer::Cleanup() { // (2) there is no active transaction with start_id < commit_id of this // transaction CleanupState state; - IterateEntries([&](UndoFlags type, data_ptr_t data) { state.CleanupEntry(type, data); }); + UndoBuffer::IteratorState iterator_state; + IterateEntries(iterator_state, [&](UndoFlags type, data_ptr_t data) { state.CleanupEntry(type, data); }); } -void UndoBuffer::Commit(WriteAheadLog *log, transaction_t commit_id) noexcept { +void UndoBuffer::Commit(UndoBuffer::IteratorState iterator_state, WriteAheadLog *log, transaction_t commit_id) { CommitState state(commit_id, log); if (log) { // commit WITH write ahead log - IterateEntries([&](UndoFlags type, data_ptr_t data) { state.CommitEntry(type, data); }); + IterateEntries(iterator_state, [&](UndoFlags type, data_ptr_t data) { state.CommitEntry(type, data); }); } else { // comit WITHOUT write ahead log - IterateEntries([&](UndoFlags type, data_ptr_t data) { state.CommitEntry(type, data); }); + IterateEntries(iterator_state, [&](UndoFlags type, data_ptr_t data) { state.CommitEntry(type, data); }); } } +void UndoBuffer::RevertCommit(UndoBuffer::IteratorState end_state, transaction_t transaction_id) { + CommitState state(transaction_id, nullptr); + UndoBuffer::IteratorState start_state; + IterateEntries(start_state, end_state, [&](UndoFlags type, data_ptr_t data) { + state.RevertCommit(type, data); + }); + +} + void UndoBuffer::Rollback() noexcept { // rollback needs to be performed in reverse RollbackState state; ReverseIterateEntries([&](UndoFlags type, data_ptr_t data) { state.RollbackEntry(type, data); }); } +} \ No newline at end of file