Skip to content

Commit

Permalink
Fix for concurrent appends to index: obtain append lock BEFORE initia…
Browse files Browse the repository at this point in the history
…lizing the scan to ensure we do not miss any freshly appended rows
  • Loading branch information
Mytherin committed Nov 2, 2019
1 parent aa21adf commit 19b9fcb
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
4 changes: 3 additions & 1 deletion src/storage/data_table.cpp
Expand Up @@ -558,11 +558,13 @@ void DataTable::Update(TableCatalogEntry &table, ClientContext &context, Vector
// Create Index Scan
//===--------------------------------------------------------------------===//
void DataTable::InitializeCreateIndexScan(CreateIndexScanState &state, vector<column_t> column_ids) {
InitializeScan(state, column_ids);
// we grab the append lock to make sure nothing is appended until AFTER we finish the index scan
state.append_lock = make_unique<lock_guard<mutex>>(append_lock);
// get a read lock on the VersionManagers to prevent any further deletions
state.locks.push_back(persistent_manager.lock.GetSharedLock());
state.locks.push_back(transient_manager.lock.GetSharedLock());

InitializeScan(state, column_ids);
}

void DataTable::CreateIndexScan(CreateIndexScanState &state, DataChunk &result) {
Expand Down
16 changes: 10 additions & 6 deletions test/sql/index/test_concurrent_index.cpp
Expand Up @@ -66,13 +66,12 @@ TEST_CASE("Concurrent reads during index creation", "[index][.]") {

static void append_to_integers(DuckDB *db, index_t threadnr) {
Connection con(*db);
auto appender = con.OpenAppender(DEFAULT_SCHEMA, "integers");
for (index_t i = 0; i < INSERT_COUNT; i++) {
appender->BeginRow();
appender->AppendInteger(1);
appender->EndRow();
auto result = con.Query("INSERT INTO integers VALUES (1)");
if (!result->success) {
FAIL();
}
}
con.CloseAppender();
}

TEST_CASE("Concurrent writes during index creation", "[index][.]") {
Expand Down Expand Up @@ -104,7 +103,12 @@ TEST_CASE("Concurrent writes during index creation", "[index][.]") {
threads[i].join();
}

// now test that we can probe the index correctly
// first scan the actual base table to verify the count, we avoid using a filter here to prevent the optimizer from using an index scan
result = con.Query("SELECT i, COUNT(*) FROM integers GROUP BY i ORDER BY i LIMIT 1 OFFSET 1");
REQUIRE(CHECK_COLUMN(result, 0, {1}));
REQUIRE(CHECK_COLUMN(result, 1, {1 + THREAD_COUNT * INSERT_COUNT}));

// now test that we can probe the index correctly too
result = con.Query("SELECT COUNT(*) FROM integers WHERE i=1");
REQUIRE(CHECK_COLUMN(result, 0, {1 + THREAD_COUNT * INSERT_COUNT}));
}
Expand Down

0 comments on commit 19b9fcb

Please sign in to comment.