
Merge pull request #10188 from lnkuiper/json_many_files
Open JSON files lock-free if there are many
Mytherin committed Jan 15, 2024
2 parents cebfbfb + 2056d1f commit 9699025
Showing 7 changed files with 134 additions and 63 deletions.
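For context, a hedged usage sketch of the behavior this commit changes: when a read_json scan covers many files, each file is now opened without holding the global scan lock, and schema auto-detection samples at most maximum_sample_files files (default 32). The glob pattern below is hypothetical; only the parameter names and defaults come from this diff.

-- Hypothetical directory containing many small JSON files.
-- Schema detection reads up to sample_size rows from at most 32 files by default,
-- and with many files the files themselves are opened lock-free by the scanning threads.
SELECT * FROM read_json('data/events/*.json', maximum_sample_files=32);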
6 changes: 6 additions & 0 deletions extension/json/include/json.json
@@ -141,6 +141,12 @@
"name": "field_appearance_threshold",
"type": "double",
"default": 0.1
},
{
"id": 114,
"name": "maximum_sample_files",
"type": "idx_t",
"default": 32
}
],
"constructor": ["$ClientContext", "files", "date_format", "timestamp_format"]
3 changes: 3 additions & 0 deletions extension/json/include/json_scan.hpp
@@ -126,6 +126,8 @@ struct JSONScanData : public TableFunctionData {
//! If the average over the fields of an object is less than this threshold,
//! we default to the JSON type for this object rather than the shredded type
double field_appearance_threshold = 0.1;
//! The maximum number of files we sample in order to sample sample_size rows
idx_t maximum_sample_files = 32;

//! All column names (in order)
vector<string> names;
@@ -235,6 +237,7 @@ struct JSONScanLocalState {
void ThrowObjectSizeError(const idx_t object_size);
void ThrowInvalidAtEndError();

//! Must hold the lock
void TryIncrementFileIndex(JSONScanGlobalState &gstate) const;
bool IsParallel(JSONScanGlobalState &gstate) const;

47 changes: 30 additions & 17 deletions extension/json/json_functions/read_json.cpp
@@ -16,7 +16,7 @@ void JSONScan::AutoDetect(ClientContext &context, JSONScanData &bind_data, vecto
ArenaAllocator allocator(BufferAllocator::Get(context));
Vector string_vector(LogicalType::VARCHAR);

// Loop through the files (if union_by_name, else just sample the first file)
// Loop through the files (if union_by_name, else sample up to sample_size rows or maximum_sample_files files)
idx_t remaining = bind_data.sample_size;
for (idx_t file_idx = 0; file_idx < bind_data.files.size(); file_idx++) {
// Create global/local state and place the reader in the right field
@@ -59,8 +59,8 @@
if (bind_data.options.file_options.union_by_name) {
// When union_by_name=true we sample sample_size per file
remaining = bind_data.sample_size;
} else if (remaining == 0) {
// When union_by_name=false, we sample sample_size in total (across the first files)
} else if (remaining == 0 || file_idx == bind_data.maximum_sample_files - 1) {
// When union_by_name=false, we sample sample_size in total (across the first maximum_sample_files files)
break;
}
}
@@ -116,6 +119,9 @@ unique_ptr<FunctionData> ReadJSONBind(ClientContext &context, TableFunctionBindI

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (kv.second.IsNull()) {
throw BinderException("read_json parameter \"%s\" cannot be NULL.", loption);
}
if (loption == "columns") {
auto &child_type = kv.second.type();
if (child_type.id() != LogicalTypeId::STRUCT) {
@@ -146,8 +149,8 @@ unique_ptr<FunctionData> ReadJSONBind(ClientContext &context, TableFunctionBindI
} else if (arg > 0) {
bind_data->sample_size = arg;
} else {
throw BinderException(
"read_json \"sample_size\" parameter must be positive, or -1 to sample the entire file.");
throw BinderException("read_json \"sample_size\" parameter must be positive, or -1 to sample all input "
"files entirely, up to \"maximum_sample_files\" files.");
}
} else if (loption == "maximum_depth") {
auto arg = BigIntValue::Get(kv.second);
@@ -198,6 +201,16 @@ unique_ptr<FunctionData> ReadJSONBind(ClientContext &context, TableFunctionBindI
} else {
throw BinderException("read_json requires \"records\" to be one of ['auto', 'true', 'false'].");
}
} else if (loption == "maximum_sample_files") {
auto arg = BigIntValue::Get(kv.second);
if (arg == -1) {
bind_data->maximum_sample_files = NumericLimits<idx_t>::Maximum();
} else if (arg > 0) {
bind_data->maximum_sample_files = arg;
} else {
throw BinderException("read_json \"maximum_sample_files\" parameter must be positive, or -1 to remove "
"the limit on the number of files used to sample \"sample_size\" rows.");
}
}
}

@@ -209,8 +222,8 @@ unique_ptr<FunctionData> ReadJSONBind(ClientContext &context, TableFunctionBindI
if (!bind_data->auto_detect) {
// Need to specify columns if RECORDS and not auto-detecting
if (return_types.empty()) {
throw BinderException("read_json requires columns to be specified through the \"columns\" parameter."
"\n Use read_json_auto or set auto_detect=true to automatically guess columns.");
throw BinderException("When auto_detect=false, read_json requires columns to be specified through the "
"\"columns\" parameter.");
}
// If we are reading VALUES, we can only have one column
if (bind_data->options.record_type == JSONRecordType::VALUES && return_types.size() != 1) {
@@ -313,6 +326,7 @@ TableFunction JSONFunctions::GetReadJSONTableFunction(shared_ptr<JSONScanInfo> f
table_function.named_parameters["timestampformat"] = LogicalType::VARCHAR;
table_function.named_parameters["timestamp_format"] = LogicalType::VARCHAR;
table_function.named_parameters["records"] = LogicalType::VARCHAR;
table_function.named_parameters["maximum_sample_files"] = LogicalType::BIGINT;

// TODO: might be able to do filter pushdown/prune ?

@@ -321,37 +335,36 @@ TableFunction JSONFunctions::GetReadJSONTableFunction(shared_ptr<JSONScanInfo> f
return table_function;
}

TableFunctionSet CreateJSONFunctionInfo(string name, shared_ptr<JSONScanInfo> info, bool auto_function = false) {
TableFunctionSet CreateJSONFunctionInfo(string name, shared_ptr<JSONScanInfo> info) {
auto table_function = JSONFunctions::GetReadJSONTableFunction(std::move(info));
table_function.name = std::move(name);
if (auto_function) {
table_function.named_parameters["maximum_depth"] = LogicalType::BIGINT;
table_function.named_parameters["field_appearance_threshold"] = LogicalType::DOUBLE;
}
table_function.named_parameters["maximum_depth"] = LogicalType::BIGINT;
table_function.named_parameters["field_appearance_threshold"] = LogicalType::DOUBLE;
return MultiFileReader::CreateFunctionSet(table_function);
}

TableFunctionSet JSONFunctions::GetReadJSONFunction() {
auto info = make_shared<JSONScanInfo>(JSONScanType::READ_JSON, JSONFormat::ARRAY, JSONRecordType::RECORDS);
auto info =
make_shared<JSONScanInfo>(JSONScanType::READ_JSON, JSONFormat::AUTO_DETECT, JSONRecordType::AUTO_DETECT, true);
return CreateJSONFunctionInfo("read_json", std::move(info));
}

TableFunctionSet JSONFunctions::GetReadNDJSONFunction() {
auto info =
make_shared<JSONScanInfo>(JSONScanType::READ_JSON, JSONFormat::NEWLINE_DELIMITED, JSONRecordType::RECORDS);
auto info = make_shared<JSONScanInfo>(JSONScanType::READ_JSON, JSONFormat::NEWLINE_DELIMITED,
JSONRecordType::AUTO_DETECT, true);
return CreateJSONFunctionInfo("read_ndjson", std::move(info));
}

TableFunctionSet JSONFunctions::GetReadJSONAutoFunction() {
auto info =
make_shared<JSONScanInfo>(JSONScanType::READ_JSON, JSONFormat::AUTO_DETECT, JSONRecordType::AUTO_DETECT, true);
return CreateJSONFunctionInfo("read_json_auto", std::move(info), true);
return CreateJSONFunctionInfo("read_json_auto", std::move(info));
}

TableFunctionSet JSONFunctions::GetReadNDJSONAutoFunction() {
auto info = make_shared<JSONScanInfo>(JSONScanType::READ_JSON, JSONFormat::NEWLINE_DELIMITED,
JSONRecordType::AUTO_DETECT, true);
return CreateJSONFunctionInfo("read_ndjson_auto", std::move(info), true);
return CreateJSONFunctionInfo("read_ndjson_auto", std::move(info));
}

} // namespace duckdb
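From the SQL side, a sketch of how the binder changes above behave; the file paths are placeholders, and only the semantics (a positive cap, -1 for no limit, errors for NULL or other non-positive values) are taken from the code above.

-- Positive values cap how many files are sampled for schema detection.
SELECT * FROM read_json('logs/*.json', maximum_sample_files=4);

-- -1 removes the file cap; combined with sample_size=-1 this samples every row of every input file.
SELECT * FROM read_json('logs/*.json', maximum_sample_files=-1, sample_size=-1);

-- 0 (or any other non-positive value) and NULL named parameters raise a BinderException.
SELECT * FROM read_json('logs/*.json', maximum_sample_files=0);

Per the AutoDetect loop above, the cap only applies when union_by_name=false; with union_by_name=true, sample_size rows are sampled from every file.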
67 changes: 33 additions & 34 deletions extension/json/json_scan.cpp
@@ -437,7 +437,6 @@ void JSONScanLocalState::ThrowInvalidAtEndError() {
}

void JSONScanLocalState::TryIncrementFileIndex(JSONScanGlobalState &gstate) const {
lock_guard<mutex> guard(gstate.lock);
if (gstate.file_index < gstate.json_readers.size() &&
current_reader.get() == gstate.json_readers[gstate.file_index].get()) {
gstate.file_index++;
@@ -564,6 +563,7 @@ bool JSONScanLocalState::ReadNextBuffer(JSONScanGlobalState &gstate) {
if (current_reader) {
// If we performed the final read of this reader in the previous iteration, close it now
if (is_last) {
lock_guard<mutex> guard(gstate.lock);
TryIncrementFileIndex(gstate);
current_reader->CloseJSONFile();
current_reader = nullptr;
@@ -572,57 +572,57 @@

// Try to read
ReadNextBufferInternal(gstate, buffer_index);
if (buffer_index.GetIndex() == 0 && current_reader->GetFormat() == JSONFormat::ARRAY) {
SkipOverArrayStart();
}

// If this is the last read, end the parallel scan now so threads can move on
if (is_last && IsParallel(gstate)) {
lock_guard<mutex> guard(gstate.lock);
TryIncrementFileIndex(gstate);
}

if (buffer_size == 0) {
// We didn't read anything, re-enter the loop
continue;
} else {
// We read something!
break;
}
// We read something!
break;
}

// If we got here, we don't have a reader (anymore). Try to get one
is_last = false;
{
lock_guard<mutex> guard(gstate.lock);
if (gstate.file_index == gstate.json_readers.size()) {
return false; // No more files left
}
unique_lock<mutex> guard(gstate.lock);
if (gstate.file_index == gstate.json_readers.size()) {
return false; // No more files left
}

// Assign the next reader to this thread
current_reader = gstate.json_readers[gstate.file_index].get();
// Assign the next reader to this thread
current_reader = gstate.json_readers[gstate.file_index].get();

// Open the file if it is not yet open
if (!current_reader->IsOpen()) {
current_reader->OpenJSONFile();
}
batch_index = gstate.batch_index++;
batch_index = gstate.batch_index++;
if (!gstate.enable_parallel_scans) {
// Non-parallel scans, increment file index and unlock
gstate.file_index++;
guard.unlock();
}

// Auto-detect format / record type
if (gstate.enable_parallel_scans) {
// Auto-detect within the lock, so threads may join a parallel NDJSON scan
if (current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
ReadAndAutoDetect(gstate, buffer_index);
}
} else {
gstate.file_index++; // Increment the file index before dropping lock so other threads move on
}
// Open the file if it is not yet open
if (!current_reader->IsOpen()) {
current_reader->OpenJSONFile();
}

// If we didn't auto-detect within the lock, do it now
if (current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
// Auto-detect if we haven't yet done this during the bind
if (gstate.bind_data.options.record_type == JSONRecordType::AUTO_DETECT ||
current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
ReadAndAutoDetect(gstate, buffer_index);
}

// If we haven't already, increment the file index if non-parallel scan
if (gstate.enable_parallel_scans && !IsParallel(gstate)) {
TryIncrementFileIndex(gstate);
if (gstate.enable_parallel_scans) {
if (!IsParallel(gstate)) {
// We still have the lock here if parallel scans are enabled
TryIncrementFileIndex(gstate);
}
}

if (!buffer_index.IsValid() || buffer_size == 0) {
@@ -663,7 +663,9 @@ void JSONScanLocalState::ReadAndAutoDetect(JSONScanGlobalState &gstate, optional
}

auto format_and_record_type = DetectFormatAndRecordType(buffer_ptr, buffer_size, allocator.GetYYAlc());
current_reader->SetFormat(format_and_record_type.first);
if (current_reader->GetFormat() == JSONFormat::AUTO_DETECT) {
current_reader->SetFormat(format_and_record_type.first);
}
if (current_reader->GetRecordType() == JSONRecordType::AUTO_DETECT) {
current_reader->SetRecordType(format_and_record_type.second);
}
@@ -686,9 +688,6 @@ void JSONScanLocalState::ReadNextBufferInternal(JSONScanGlobalState &gstate, opt
}

buffer_offset = 0;
if (buffer_index.GetIndex() == 0 && current_reader->GetFormat() == JSONFormat::ARRAY) {
SkipOverArrayStart();
}
}

void JSONScanLocalState::ReadNextBufferSeek(JSONScanGlobalState &gstate, optional_idx &buffer_index) {
2 changes: 2 additions & 0 deletions extension/json/serialize_json.cpp
@@ -42,6 +42,7 @@ void JSONScanData::Serialize(Serializer &serializer) const {
serializer.WritePropertyWithDefault<string>(111, "date_format", GetDateFormat());
serializer.WritePropertyWithDefault<string>(112, "timestamp_format", GetTimestampFormat());
serializer.WritePropertyWithDefault<double>(113, "field_appearance_threshold", field_appearance_threshold, 0.1);
serializer.WritePropertyWithDefault<idx_t>(114, "maximum_sample_files", maximum_sample_files, 32);
}

unique_ptr<JSONScanData> JSONScanData::Deserialize(Deserializer &deserializer) {
@@ -70,6 +71,7 @@ unique_ptr<JSONScanData> JSONScanData::Deserialize(Deserializer &deserializer) {
result->transform_options = transform_options;
result->names = std::move(names);
deserializer.ReadPropertyWithDefault<double>(113, "field_appearance_threshold", result->field_appearance_threshold, 0.1);
deserializer.ReadPropertyWithDefault<idx_t>(114, "maximum_sample_files", result->maximum_sample_files, 32);
return result;
}

18 changes: 9 additions & 9 deletions test/sql/json/table/read_json.test
@@ -8,13 +8,13 @@ statement ok
pragma enable_verification

statement error
SELECT * FROM read_json('data/json/example_n.ndjson')
SELECT * FROM read_json('data/json/example_n.ndjson', auto_detect=false)
----
Binder Error

# read_json assumes array of obj
# can't read ndjson with array
statement error
SELECT * FROM read_json('data/json/example_n.ndjson', columns={id: 'INTEGER', name: 'VARCHAR'})
SELECT * FROM read_json('data/json/example_n.ndjson', columns={id: 'INTEGER', name: 'VARCHAR'}, format='array')
----
Invalid Input Error: Expected top-level JSON array

@@ -237,9 +237,9 @@ select * from read_json('data/json/top_level_array.json', auto_detect=true)
cancelled
cancelled

# if we try to read it with 'unstructured', we get an error
# if we try to read it as 'unstructured' records
statement error
select * from read_json('data/json/top_level_array.json', columns={conclusion: 'VARCHAR'}, format='unstructured')
select * from read_json('data/json/top_level_array.json', columns={conclusion: 'VARCHAR'}, format='unstructured', records=true)
----
Invalid Input Error: JSON transform error in file "data/json/top_level_array.json", in record/value 1: Expected OBJECT, but got ARRAY

@@ -294,18 +294,18 @@ select * from '__TEST_DIR__/my_file.json'

# fails because it's not records
statement error
select * from read_json('__TEST_DIR__/my_file.json', format='array', columns={range: 'INTEGER'})
select * from read_json('__TEST_DIR__/my_file.json', format='array', columns={range: 'INTEGER'}, records=true)
----
Invalid Input Error: JSON transform error

# fails because it's not records
statement error
select * from read_json_auto('__TEST_DIR__/my_file.json', format='array', records='true')
select * from read_json_auto('__TEST_DIR__/my_file.json', format='array', records=true)
----
Binder Error: json_read expected records

query T
select * from read_json('__TEST_DIR__/my_file.json', format='auto', records='false', auto_detect=true)
select * from read_json('__TEST_DIR__/my_file.json', format='auto', records=false, auto_detect=true)
----
0
1
@@ -392,7 +392,7 @@ statement ok
pragma disable_verification

# test that we can read a JSON list that spans more than one buffer size
# the JSON is 55 bytes, and the minimum buffer size is 2097152
# the JSON is 55 bytes, and the minimum buffer size is 32MB
# let's do 50k to be safe
statement ok
copy (select list(to_json({this_is_a_very_long_field_name_yes_very_much_so:42})) from range(50000)) to '__TEST_DIR__/my_file.json' (format csv, quote '', header 0)
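The excerpt does not show a test for the new parameter; a hypothetical sqllogictest-style case, reusing a data file referenced earlier in this test and the error text from the binder, could look like the following.

statement error
SELECT * FROM read_json('data/json/example_n.ndjson', maximum_sample_files=0)
----
read_json "maximum_sample_files" parameter must be positive

statement ok
SELECT * FROM read_json('data/json/example_n.ndjson', maximum_sample_files=-1)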
