Open JSON files lock-free if there are many #10188

Merged
merged 4 commits into from Jan 15, 2024

1 change: 1 addition & 0 deletions extension/json/include/json_scan.hpp
@@ -235,6 +235,7 @@ struct JSONScanLocalState {
void ThrowObjectSizeError(const idx_t object_size);
void ThrowInvalidAtEndError();

//! Must hold the lock
Contributor

I think our current approach for requiring a lock in a protected/private method is to pass a `unique_lock` reference as a parameter, like we do in `CatalogSet::CreateDefaultEntries(CatalogTransaction transaction, unique_lock<mutex> &lock);`.
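
For illustration, a minimal sketch of the lock-as-parameter convention described in this comment. `SharedState`, `IncrementFileIndex`, and `AdvanceOnce` are hypothetical names made up for the example and are not DuckDB code; the sketch only assumes the standard `std::unique_lock` interface.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical stand-in for shared scan state; not DuckDB's real struct.
struct SharedState {
	std::mutex lock;
	std::size_t file_index = 0;
	std::size_t file_count = 0;
};

// The lock parameter exists so that a caller cannot reach this function
// without having a unique_lock in hand.
inline void IncrementFileIndex(SharedState &state, std::unique_lock<std::mutex> &lock) {
	// Debug-only checks: the lock is held, and it guards this state's mutex.
	assert(lock.owns_lock());
	assert(lock.mutex() == &state.lock);
	if (state.file_index < state.file_count) {
		state.file_index++;
	}
}

// Call site: acquire the lock, then hand it to the callee.
inline std::size_t AdvanceOnce(SharedState &state) {
	std::unique_lock<std::mutex> guard(state.lock);
	IncrementFileIndex(state, guard);
	return state.file_index;
}
```

The signature forces callers to have a lock object, while the assertions are what actually verify that it is locked and belongs to the right mutex; in release builds with `NDEBUG` those checks disappear.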

Contributor Author

The lock must be held for the code within the function. There are two kinds of call sites for this: one where we grab the lock and call the function right after, and one where we have already been holding the lock and then need to call the function.

I don't understand what using a `unique_lock` and passing it to the function would achieve here. We would then also need to add a parameter `bool i_already_have_the_lock` and skip locking when it is true, or we would need to unlock before calling the function, which would then grab the lock again.
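
The two kinds of call sites described in this comment can be sketched as follows, using the comment-only convention that the PR keeps. This is a simplified illustration, not the actual `ReadNextBuffer` logic: `ScanState`, `TryIncrement`, `FinishFile`, and `ClaimNextFile` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical, simplified stand-in for the global scan state.
struct ScanState {
	std::mutex lock;
	std::size_t file_index = 0;
	std::size_t file_count = 0;
	bool parallel = false;
};

//! Must hold the lock (a convention only; nothing here enforces it)
inline void TryIncrement(ScanState &state) {
	if (state.file_index < state.file_count) {
		state.file_index++;
	}
}

// Use 1: grab the lock and call the function right after.
inline void FinishFile(ScanState &state) {
	std::lock_guard<std::mutex> guard(state.lock);
	TryIncrement(state);
}

// Use 2: the lock has already been held for a while before the call.
inline bool ClaimNextFile(ScanState &state, std::size_t &claimed) {
	std::unique_lock<std::mutex> guard(state.lock);
	if (state.file_index == state.file_count) {
		return false; // no more files left
	}
	claimed = state.file_index;
	if (!state.parallel) {
		TryIncrement(state); // still holding the lock here
		guard.unlock();      // drop it early so other threads can move on
	}
	// Slow work, such as opening the file, would go here.
	return true;
}
```

Because the callee never locks, both call sites work without a flag; the cost is that the precondition lives only in the comment.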

Contributor

I think the idea is that if a function should only be called with the lock held, making it impossible to call it without the lock makes things a bit cleaner. I see that we do both throughout the code for protected/private methods: adding a comment that holding the lock is required, and passing a `unique_lock` or `lock_guard` as an argument (sometimes in a wrapper class like `ClientContextLock`).

I have no strong opinion here; I'm fine with merging it like this.
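
As a rough sketch of the wrapper-class variant mentioned in this comment: the internal method takes a lock token, and the token can only be built by locking a mutex. `Counter` and `CounterLock` are hypothetical names for the example; this is not how `ClientContextLock` is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical lock token: constructing one locks the given mutex,
// and destroying it unlocks the mutex again.
class CounterLock {
public:
	explicit CounterLock(std::mutex &m) : guard(m) {
	}

private:
	std::lock_guard<std::mutex> guard;
};

class Counter {
public:
	// Public entry point: takes the lock, then delegates.
	std::size_t Increment() {
		CounterLock lock(mutex);
		return IncrementInternal(lock);
	}

private:
	// Requires a CounterLock argument, so it cannot be called by code
	// that has not locked some mutex through the token type.
	std::size_t IncrementInternal(CounterLock &) {
		return ++value;
	}

	std::mutex mutex;
	std::size_t value = 0;
};
```

The token proves that a lock was taken, though not that it was taken on the right mutex, so this is a compile-time nudge rather than a full guarantee.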

Contributor Author

I agree with being safe, but I will leave this as it is. Thanks!

void TryIncrementFileIndex(JSONScanGlobalState &gstate) const;
bool IsParallel(JSONScanGlobalState &gstate) const;

67 changes: 33 additions & 34 deletions extension/json/json_scan.cpp
@@ -437,7 +437,6 @@ void JSONScanLocalState::ThrowInvalidAtEndError() {
}

void JSONScanLocalState::TryIncrementFileIndex(JSONScanGlobalState &gstate) const {
lock_guard<mutex> guard(gstate.lock);
if (gstate.file_index < gstate.json_readers.size() &&
current_reader.get() == gstate.json_readers[gstate.file_index].get()) {
gstate.file_index++;
@@ -564,6 +563,7 @@ bool JSONScanLocalState::ReadNextBuffer(JSONScanGlobalState &gstate) {
if (current_reader) {
// If we performed the final read of this reader in the previous iteration, close it now
if (is_last) {
lock_guard<mutex> guard(gstate.lock);
TryIncrementFileIndex(gstate);
current_reader->CloseJSONFile();
current_reader = nullptr;
@@ -572,57 +572,57 @@ bool JSONScanLocalState::ReadNextBuffer(JSONScanGlobalState &gstate) {

// Try to read
ReadNextBufferInternal(gstate, buffer_index);
if (buffer_index.GetIndex() == 0 && current_reader->GetFormat() == JSONFormat::ARRAY) {
SkipOverArrayStart();
}

// If this is the last read, end the parallel scan now so threads can move on
if (is_last && IsParallel(gstate)) {
lock_guard<mutex> guard(gstate.lock);
TryIncrementFileIndex(gstate);
}

if (buffer_size == 0) {
// We didn't read anything, re-enter the loop
continue;
} else {
// We read something!
break;
}
// We read something!
break;
}

// If we got here, we don't have a reader (anymore). Try to get one
is_last = false;
{
lock_guard<mutex> guard(gstate.lock);
if (gstate.file_index == gstate.json_readers.size()) {
return false; // No more files left
}
unique_lock<mutex> guard(gstate.lock);
if (gstate.file_index == gstate.json_readers.size()) {
return false; // No more files left
}

// Assign the next reader to this thread
current_reader = gstate.json_readers[gstate.file_index].get();
// Assign the next reader to this thread
current_reader = gstate.json_readers[gstate.file_index].get();

// Open the file if it is not yet open
if (!current_reader->IsOpen()) {
current_reader->OpenJSONFile();
}
batch_index = gstate.batch_index++;
batch_index = gstate.batch_index++;
if (!gstate.enable_parallel_scans) {
// Non-parallel scans, increment file index and unlock
gstate.file_index++;
guard.unlock();
}

// Auto-detect format / record type
if (gstate.enable_parallel_scans) {
// Auto-detect within the lock, so threads may join a parallel NDJSON scan
if (current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
ReadAndAutoDetect(gstate, buffer_index);
}
} else {
gstate.file_index++; // Increment the file index before dropping lock so other threads move on
}
// Open the file if it is not yet open
if (!current_reader->IsOpen()) {
current_reader->OpenJSONFile();
}

// If we didn't auto-detect within the lock, do it now
if (current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
// Auto-detect if we haven't yet done this during the bind
if (gstate.bind_data.options.record_type == JSONRecordType::AUTO_DETECT ||
current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
ReadAndAutoDetect(gstate, buffer_index);
}

// If we haven't already, increment the file index if non-parallel scan
if (gstate.enable_parallel_scans && !IsParallel(gstate)) {
TryIncrementFileIndex(gstate);
if (gstate.enable_parallel_scans) {
if (!IsParallel(gstate)) {
// We still have the lock here if parallel scans are enabled
TryIncrementFileIndex(gstate);
}
}

if (!buffer_index.IsValid() || buffer_size == 0) {
@@ -663,7 +663,9 @@ void JSONScanLocalState::ReadAndAutoDetect(JSONScanGlobalState &gstate, optional
}

auto format_and_record_type = DetectFormatAndRecordType(buffer_ptr, buffer_size, allocator.GetYYAlc());
current_reader->SetFormat(format_and_record_type.first);
if (current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
current_reader->SetFormat(format_and_record_type.first);
}
if (current_reader->GetRecordType() == JSONRecordType::AUTO_DETECT) {
current_reader->SetRecordType(format_and_record_type.second);
}
@@ -686,9 +688,6 @@ void JSONScanLocalState::ReadNextBufferInternal(JSONScanGlobalState &gstate, opt
}

buffer_offset = 0;
if (buffer_index.GetIndex() == 0 && current_reader->GetFormat() == JSONFormat::ARRAY) {
SkipOverArrayStart();
}
}

void JSONScanLocalState::ReadNextBufferSeek(JSONScanGlobalState &gstate, optional_idx &buffer_index) {