Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement FILE_SIZE_BYTES #9920

Merged
merged 9 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions extension/json/json_functions/copy_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace duckdb {

// Raise a consistent binder error when a COPY (FORMAT JSON) option is given
// more than one argument. Centralized so every option reports the same message.
static void ThrowJSONCopyParameterException(const string &loption) {
	// The format string has a %s placeholder, so the option name must be passed
	// as an argument — omitting it leaves the placeholder unsubstituted.
	throw BinderException("COPY (FORMAT JSON) parameter %s expects a single argument.", loption);
}

static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
Expand All @@ -23,7 +23,8 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
// Parse the options, creating options for the CSV writer while doing so
string date_format;
string timestamp_format;
case_insensitive_map_t<vector<Value>> csv_copy_options;
// We insert the JSON file extension here so it works properly with PER_THREAD_OUTPUT/FILE_SIZE_BYTES etc.
case_insensitive_map_t<vector<Value>> csv_copy_options {{"file_extension", {"json"}}};
for (const auto &kv : info.options) {
const auto &loption = StringUtil::Lower(kv.first);
if (loption == "dateformat" || loption == "date_format") {
Expand All @@ -36,8 +37,6 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
ThrowJSONCopyParameterException(loption);
}
timestamp_format = StringValue::Get(kv.second.back());
} else if (loption == "compression") {
csv_copy_options.insert(kv);
} else if (loption == "array") {
if (kv.second.size() > 1) {
ThrowJSONCopyParameterException(loption);
Expand All @@ -47,6 +46,11 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
csv_copy_options["suffix"] = {"\n]\n"};
csv_copy_options["new_line"] = {",\n\t"};
}
} else if (loption == "compression" || loption == "encoding" || loption == "per_thread_output" ||
loption == "file_size_bytes" || loption == "use_tmp_file" || loption == "overwrite_or_ignore" ||
loption == "filename_pattern" || loption == "file_extension") {
// We support these base options
csv_copy_options.insert(kv);
} else {
throw BinderException("Unknown option for COPY ... TO ... (FORMAT JSON): \"%s\".", loption);
}
Expand Down
4 changes: 4 additions & 0 deletions extension/parquet/include/parquet_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class ParquetWriter {
BufferedFileWriter &GetWriter() {
return *writer;
}
idx_t FileSize() {
	// Take the writer lock so the byte count is read consistently while other
	// threads may be appending to the file.
	lock_guard<mutex> file_size_guard(lock);
	const auto bytes_written = writer->total_written;
	return bytes_written;
}

static CopyTypeSupport TypeIsSupported(const LogicalType &type);

Expand Down
17 changes: 16 additions & 1 deletion extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,13 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, const CopyInfo
throw NotImplementedException("Unrecognized option for PARQUET: %s", option.first.c_str());
}
}
if (!row_group_size_bytes_set) {
if (row_group_size_bytes_set) {
if (DBConfig::GetConfig(context).options.preserve_insertion_order) {
throw BinderException("ROW_GROUP_SIZE_BYTES does not work while preserving insertion order. Use \"SET "
"preserve_insertion_order=false;\" to disable preserving insertion order.");
}
} else {
// We always set a max row group size bytes so we don't use too much memory
bind_data->row_group_size_bytes = bind_data->row_group_size * ParquetWriteBindData::BYTES_PER_ROW;
}

Expand Down Expand Up @@ -1178,6 +1184,14 @@ idx_t ParquetWriteDesiredBatchSize(ClientContext &context, FunctionData &bind_da
return bind_data.row_group_size;
}

//===--------------------------------------------------------------------===//
// Current File Size
//===--------------------------------------------------------------------===//
// Copy-function callback: reports how many bytes have been written to the
// current Parquet file so far (used by the FILE_SIZE_BYTES copy option to
// decide when to roll over to a new file).
idx_t ParquetWriteFileSize(GlobalFunctionData &gstate) {
	auto &parquet_gstate = gstate.Cast<ParquetWriteGlobalState>();
	return parquet_gstate.writer->FileSize();
}

//===--------------------------------------------------------------------===//
// Scan Replacement
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -1239,6 +1253,7 @@ void ParquetExtension::Load(DuckDB &db) {
function.prepare_batch = ParquetWritePrepareBatch;
function.flush_batch = ParquetWriteFlushBatch;
function.desired_batch_size = ParquetWriteDesiredBatchSize;
function.file_size_bytes = ParquetWriteFileSize;
function.serialize = ParquetCopySerialize;
function.deserialize = ParquetCopyDeserialize;
function.supports_type = ParquetWriter::TypeIsSupported;
Expand Down
Loading
Loading