Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implement FILE_SIZE_BYTES #9920

Merged
merged 9 commits into from Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions extension/json/json_functions/copy_json.cpp
Expand Up @@ -12,7 +12,7 @@
namespace duckdb {

// Throws a BinderException for a COPY (FORMAT JSON) option that received more than one argument.
// The scraped span contained two consecutive throw statements (the diff's old and new lines);
// only the corrected one is kept: the old line had a "%s" placeholder but never passed
// `loption`, so the message printed a bare "%s" instead of the option name.
static void ThrowJSONCopyParameterException(const string &loption) {
	// Pass `loption` so the "%s" placeholder in the message is filled in with the option name.
	throw BinderException("COPY (FORMAT JSON) parameter %s expects a single argument.", loption);
}

static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
Expand All @@ -23,7 +23,8 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
// Parse the options, creating options for the CSV writer while doing so
string date_format;
string timestamp_format;
case_insensitive_map_t<vector<Value>> csv_copy_options;
// We insert the JSON file extension here so it works properly with PER_THREAD_OUTPUT/FILE_SIZE_BYTES etc.
case_insensitive_map_t<vector<Value>> csv_copy_options {{"file_extension", {"json"}}};
for (const auto &kv : info.options) {
const auto &loption = StringUtil::Lower(kv.first);
if (loption == "dateformat" || loption == "date_format") {
Expand All @@ -36,8 +37,6 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
ThrowJSONCopyParameterException(loption);
}
timestamp_format = StringValue::Get(kv.second.back());
} else if (loption == "compression") {
csv_copy_options.insert(kv);
} else if (loption == "array") {
if (kv.second.size() > 1) {
ThrowJSONCopyParameterException(loption);
Expand All @@ -47,6 +46,11 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
csv_copy_options["suffix"] = {"\n]\n"};
csv_copy_options["new_line"] = {",\n\t"};
}
} else if (loption == "compression" || loption == "encoding" || loption == "per_thread_output" ||
loption == "file_size_bytes" || loption == "use_tmp_file" || loption == "overwrite_or_ignore" ||
loption == "filename_pattern" || loption == "file_extension") {
// We support these base options
csv_copy_options.insert(kv);
} else {
throw BinderException("Unknown option for COPY ... TO ... (FORMAT JSON): \"%s\".", loption);
}
Expand Down
4 changes: 4 additions & 0 deletions extension/parquet/include/parquet_writer.hpp
Expand Up @@ -87,6 +87,10 @@ class ParquetWriter {
BufferedFileWriter &GetWriter() {
return *writer;
}
//! Returns the number of bytes written to the Parquet file so far.
//! Used to implement the FILE_SIZE_BYTES COPY option (see ParquetWriteFileSize).
idx_t FileSize() {
	// `total_written` is read under the writer's lock since other threads may be
	// appending to the same BufferedFileWriter concurrently.
	lock_guard<mutex> glock(lock);
	return writer->total_written;
}

static CopyTypeSupport TypeIsSupported(const LogicalType &type);

Expand Down
17 changes: 16 additions & 1 deletion extension/parquet/parquet_extension.cpp
Expand Up @@ -984,7 +984,13 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, const CopyInfo
throw NotImplementedException("Unrecognized option for PARQUET: %s", option.first.c_str());
}
}
if (!row_group_size_bytes_set) {
if (row_group_size_bytes_set) {
if (DBConfig::GetConfig(context).options.preserve_insertion_order) {
throw BinderException("ROW_GROUP_SIZE_BYTES does not work while preserving insertion order. Use \"SET "
"preserve_insertion_order=false;\" to disable preserving insertion order.");
}
} else {
// We always set a max row group size bytes so we don't use too much memory
bind_data->row_group_size_bytes = bind_data->row_group_size * ParquetWriteBindData::BYTES_PER_ROW;
}

Expand Down Expand Up @@ -1177,6 +1183,14 @@ idx_t ParquetWriteDesiredBatchSize(ClientContext &context, FunctionData &bind_da
return bind_data.row_group_size;
}

//===--------------------------------------------------------------------===//
// Current File Size
//===--------------------------------------------------------------------===//
//! CopyFunction::file_size_bytes callback (registered in ParquetExtension::Load):
//! reports the current size in bytes of the Parquet file being written, so that
//! PhysicalCopyToFile can rotate files when FILE_SIZE_BYTES is exceeded.
idx_t ParquetWriteFileSize(GlobalFunctionData &gstate) {
	auto &global_state = gstate.Cast<ParquetWriteGlobalState>();
	// Delegates to ParquetWriter::FileSize, which is internally synchronized.
	return global_state.writer->FileSize();
}

//===--------------------------------------------------------------------===//
// Scan Replacement
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -1238,6 +1252,7 @@ void ParquetExtension::Load(DuckDB &db) {
function.prepare_batch = ParquetWritePrepareBatch;
function.flush_batch = ParquetWriteFlushBatch;
function.desired_batch_size = ParquetWriteDesiredBatchSize;
function.file_size_bytes = ParquetWriteFileSize;
function.serialize = ParquetCopySerialize;
function.deserialize = ParquetCopyDeserialize;
function.supports_type = ParquetWriter::TypeIsSupported;
Expand Down
213 changes: 122 additions & 91 deletions src/execution/operator/persistent/physical_copy_to_file.cpp
@@ -1,10 +1,12 @@
#include "duckdb/execution/operator/persistent/physical_copy_to_file.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/common/hive_partitioning.hpp"
#include "duckdb/common/file_system.hpp"

#include "duckdb/common/file_opener.hpp"
#include "duckdb/common/types/uuid.hpp"
#include "duckdb/common/file_system.hpp"
#include "duckdb/common/hive_partitioning.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/common/types/uuid.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/storage/storage_lock.hpp"

#include <algorithm>

Expand All @@ -15,9 +17,9 @@ class CopyToFunctionGlobalState : public GlobalSinkState {
explicit CopyToFunctionGlobalState(unique_ptr<GlobalFunctionData> global_state)
: rows_copied(0), last_file_offset(0), global_state(std::move(global_state)) {
}
mutex lock;
idx_t rows_copied;
idx_t last_file_offset;
StorageLock lock;
atomic<idx_t> rows_copied;
atomic<idx_t> last_file_offset;
unique_ptr<GlobalFunctionData> global_state;
idx_t created_directories = 0;

Expand All @@ -40,10 +42,74 @@ class CopyToFunctionLocalState : public LocalSinkState {
idx_t writer_offset;
};

//! Creates per-file global function data for the next output file.
//! Shared by PER_THREAD_OUTPUT and FILE_SIZE_BYTES file rotation: each call claims
//! a fresh file index and opens a new output file via the copy function.
unique_ptr<GlobalFunctionData> PhysicalCopyToFile::CreateFileState(ClientContext &context,
                                                                   GlobalSinkState &sink) const {
	auto &g = sink.Cast<CopyToFunctionGlobalState>();
	// last_file_offset is atomic, so concurrent callers each get a distinct index
	idx_t this_file_offset = g.last_file_offset++;
	auto &fs = FileSystem::GetFileSystem(context);
	// Build the concrete file name from the configured pattern, extension and index
	string output_path(filename_pattern.CreateFilename(fs, file_path, file_extension, this_file_offset));
	if (fs.FileExists(output_path) && !overwrite_or_ignore) {
		throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", output_path);
	}
	return function.copy_to_initialize_global(context, *bind_data, output_path);
}

//! Creates the per-thread sink state. For partitioned output the thread gets a
//! partition buffer (files are created later, in Combine); for PER_THREAD_OUTPUT
//! the thread immediately opens its own output file.
unique_ptr<LocalSinkState> PhysicalCopyToFile::GetLocalSinkState(ExecutionContext &context) const {
	if (partition_output) {
		auto &g = sink_state->Cast<CopyToFunctionGlobalState>();

		// No per-thread writer yet: rows are buffered per hive partition first
		auto state = make_uniq<CopyToFunctionLocalState>(nullptr);
		// Reserve a unique file index for this thread's eventual output files
		state->writer_offset = g.last_file_offset++;
		state->part_buffer =
		    make_uniq<HivePartitionedColumnData>(context.client, expected_types, partition_columns, g.partition_state);
		state->part_buffer_append_state = make_uniq<PartitionedColumnDataAppendState>();
		state->part_buffer->InitializeAppendState(*state->part_buffer_append_state);
		return std::move(state);
	}
	auto res = make_uniq<CopyToFunctionLocalState>(function.copy_to_initialize_local(context, *bind_data));
	if (per_thread_output) {
		// Each thread writes to its own file; open it now
		res->global_state = CreateFileState(context.client, *sink_state);
	}
	return std::move(res);
}

//! Creates the global sink state. When output is split over multiple files
//! (PARTITION_BY, PER_THREAD_OUTPUT, or FILE_SIZE_BYTES), `file_path` names a
//! directory that is created/validated here; otherwise a single file is opened.
unique_ptr<GlobalSinkState> PhysicalCopyToFile::GetGlobalSinkState(ClientContext &context) const {

	if (partition_output || per_thread_output || file_size_bytes.IsValid()) {
		auto &fs = FileSystem::GetFileSystem(context);

		// `file_path` must be usable as a directory in multi-file mode
		if (fs.FileExists(file_path) && !overwrite_or_ignore) {
			throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", file_path);
		}
		if (!fs.DirectoryExists(file_path)) {
			fs.CreateDirectory(file_path);
		} else if (!overwrite_or_ignore) {
			// Refuse to write into a non-empty directory unless explicitly allowed
			idx_t n_files = 0;
			fs.ListFiles(file_path, [&n_files](const string &path, bool) { n_files++; });
			if (n_files > 0) {
				throw IOException("Directory %s is not empty! Enable OVERWRITE_OR_IGNORE option to force writing",
				                  file_path);
			}
		}

		auto state = make_uniq<CopyToFunctionGlobalState>(nullptr);
		if (!per_thread_output && file_size_bytes.IsValid()) {
			// FILE_SIZE_BYTES with shared writers: open the first rotating file up front.
			// (With PER_THREAD_OUTPUT each thread opens its own file in GetLocalSinkState.)
			state->global_state = CreateFileState(context, *state);
		}

		if (partition_output) {
			state->partition_state = make_shared<GlobalHivePartitionState>();
		}

		return std::move(state);
	}

	// Single-file mode: open the one output file directly
	return make_uniq<CopyToFunctionGlobalState>(function.copy_to_initialize_global(context, *bind_data, file_path));
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//

void PhysicalCopyToFile::MoveTmpFile(ClientContext &context, const string &tmp_file_path) {
auto &fs = FileSystem::GetFileSystem(context);
auto file_path = tmp_file_path.substr(0, tmp_file_path.length() - 4);
Expand All @@ -68,12 +134,39 @@ SinkResultType PhysicalCopyToFile::Sink(ExecutionContext &context, DataChunk &ch
return SinkResultType::NEED_MORE_INPUT;
}

{
lock_guard<mutex> glock(g.lock);
g.rows_copied += chunk.size();
g.rows_copied += chunk.size();

if (per_thread_output) {
auto &gstate = l.global_state;
function.copy_to_sink(context, *bind_data, *gstate, *l.local_state, chunk);

if (file_size_bytes.IsValid() && function.file_size_bytes(*gstate) > file_size_bytes.GetIndex()) {
function.copy_to_finalize(context.client, *bind_data, *gstate);
gstate = CreateFileState(context.client, *sink_state);
}
return SinkResultType::NEED_MORE_INPUT;
}

if (!file_size_bytes.IsValid()) {
function.copy_to_sink(context, *bind_data, *g.global_state, *l.local_state, chunk);
return SinkResultType::NEED_MORE_INPUT;
}

// FILE_SIZE_BYTES is set, but threads write to the same file, synchronize using lock
auto &gstate = g.global_state;
auto lock = g.lock.GetExclusiveLock();
if (function.file_size_bytes(*gstate) > file_size_bytes.GetIndex()) {
auto owned_gstate = std::move(gstate);
gstate = CreateFileState(context.client, *sink_state);
lock.reset();
function.copy_to_finalize(context.client, *bind_data, *owned_gstate);
} else {
lock.reset();
}
function.copy_to_sink(context, *bind_data, per_thread_output ? *l.global_state : *g.global_state, *l.local_state,
chunk);

lock = g.lock.GetSharedLock();
function.copy_to_sink(context, *bind_data, *gstate, *l.local_state, chunk);

return SinkResultType::NEED_MORE_INPUT;
}

Expand Down Expand Up @@ -121,7 +214,7 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope
StringUtil::RTrim(trimmed_path, fs.PathSeparator(trimmed_path));
{
// create directories
lock_guard<mutex> global_lock(g.lock);
auto lock = g.lock.GetExclusiveLock();
lock_guard<mutex> global_lock_on_partition_state(g.partition_state->lock);
const auto &global_partitions = g.partition_state->partitions;
// global_partitions have partitions added only at the back, so it's fine to only traverse the last part
Expand All @@ -134,10 +227,10 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope

for (idx_t i = 0; i < partitions.size(); i++) {
string hive_path = GetDirectory(partition_columns, names, partition_key_map[i]->values, trimmed_path, fs);
string full_path(filename_pattern.CreateFilename(fs, hive_path, function.extension, l.writer_offset));
string full_path(filename_pattern.CreateFilename(fs, hive_path, file_extension, l.writer_offset));
if (fs.FileExists(full_path) && !overwrite_or_ignore) {
throw IOException("failed to create " + full_path +
", file exists! Enable OVERWRITE_OR_IGNORE option to force writing");
throw IOException(
"failed to create %s, file exists! Enable OVERWRITE_OR_IGNORE option to force writing", full_path);
}
// Create a writer for the current file
auto fun_data_global = function.copy_to_initialize_global(context.client, *bind_data, full_path);
Expand All @@ -150,16 +243,17 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope
function.copy_to_combine(context, *bind_data, *fun_data_global, *fun_data_local);
function.copy_to_finalize(context.client, *bind_data, *fun_data_global);
}

return SinkCombineResultType::FINISHED;
}

if (function.copy_to_combine) {
function.copy_to_combine(context, *bind_data, per_thread_output ? *l.global_state : *g.global_state,
*l.local_state);

} else if (function.copy_to_combine) {
if (per_thread_output) {
// For PER_THREAD_OUTPUT, we can combine/finalize immediately
function.copy_to_combine(context, *bind_data, *l.global_state, *l.local_state);
function.copy_to_finalize(context.client, *bind_data, *l.global_state);
} else if (file_size_bytes.IsValid()) {
// File in global state may change with FILE_SIZE_BYTES, need to grab lock
auto lock = g.lock.GetSharedLock();
function.copy_to_combine(context, *bind_data, *g.global_state, *l.local_state);
} else {
function.copy_to_combine(context, *bind_data, *g.global_state, *l.local_state);
}
}

Expand All @@ -177,78 +271,15 @@ SinkFinalizeType PhysicalCopyToFile::Finalize(Pipeline &pipeline, Event &event,
function.copy_to_finalize(context, *bind_data, *gstate.global_state);

if (use_tmp_file) {
D_ASSERT(!per_thread_output); // FIXME
D_ASSERT(!partition_output); // FIXME
D_ASSERT(!per_thread_output);
D_ASSERT(!partition_output);
D_ASSERT(!file_size_bytes.IsValid());
MoveTmpFile(context, file_path);
}
}
return SinkFinalizeType::READY;
}

//! Pre-change version of GetLocalSinkState (the diff's removed code, kept here as
//! scraped). File-index allocation is guarded by a mutex (last_file_offset was not
//! yet atomic) and the PER_THREAD_OUTPUT file name is built inline rather than via
//! a CreateFileState helper; note it uses `function.extension`, not `file_extension`.
unique_ptr<LocalSinkState> PhysicalCopyToFile::GetLocalSinkState(ExecutionContext &context) const {
	if (partition_output) {
		auto state = make_uniq<CopyToFunctionLocalState>(nullptr);
		{
			auto &g = sink_state->Cast<CopyToFunctionGlobalState>();
			// Reserve a unique file index for this thread under the global lock
			lock_guard<mutex> glock(g.lock);
			state->writer_offset = g.last_file_offset++;

			state->part_buffer = make_uniq<HivePartitionedColumnData>(context.client, expected_types, partition_columns,
			                                                          g.partition_state);
			state->part_buffer_append_state = make_uniq<PartitionedColumnDataAppendState>();
			state->part_buffer->InitializeAppendState(*state->part_buffer_append_state);
		}
		return std::move(state);
	}
	auto res = make_uniq<CopyToFunctionLocalState>(function.copy_to_initialize_local(context, *bind_data));
	if (per_thread_output) {
		idx_t this_file_offset;
		{
			auto &g = sink_state->Cast<CopyToFunctionGlobalState>();
			lock_guard<mutex> glock(g.lock);
			this_file_offset = g.last_file_offset++;
		}
		auto &fs = FileSystem::GetFileSystem(context.client);
		// Build this thread's output file name from the pattern and reserved index
		string output_path(filename_pattern.CreateFilename(fs, file_path, function.extension, this_file_offset));
		if (fs.FileExists(output_path) && !overwrite_or_ignore) {
			throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", output_path);
		}
		res->global_state = function.copy_to_initialize_global(context.client, *bind_data, output_path);
	}
	return std::move(res);
}

//! Pre-change version of GetGlobalSinkState (the diff's removed code, kept here as
//! scraped). Multi-file mode is triggered only by PARTITION_BY/PER_THREAD_OUTPUT
//! (no FILE_SIZE_BYTES yet), and no initial file is opened eagerly.
unique_ptr<GlobalSinkState> PhysicalCopyToFile::GetGlobalSinkState(ClientContext &context) const {

	if (partition_output || per_thread_output) {
		auto &fs = FileSystem::GetFileSystem(context);

		// `file_path` must be usable as a directory in multi-file mode
		if (fs.FileExists(file_path) && !overwrite_or_ignore) {
			throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", file_path);
		}
		if (!fs.DirectoryExists(file_path)) {
			fs.CreateDirectory(file_path);
		} else if (!overwrite_or_ignore) {
			// Refuse to write into a non-empty directory unless explicitly allowed
			idx_t n_files = 0;
			fs.ListFiles(file_path, [&n_files](const string &path, bool) { n_files++; });
			if (n_files > 0) {
				throw IOException("Directory %s is not empty! Enable OVERWRITE_OR_IGNORE option to force writing",
				                  file_path);
			}
		}

		auto state = make_uniq<CopyToFunctionGlobalState>(nullptr);

		if (partition_output) {
			state->partition_state = make_shared<GlobalHivePartitionState>();
		}

		return std::move(state);
	}

	// Single-file mode: open the one output file directly
	return make_uniq<CopyToFunctionGlobalState>(function.copy_to_initialize_global(context, *bind_data, file_path));
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
Expand Down