Merge pull request #9920 from lnkuiper/file_size_bytes
Implement FILE_SIZE_BYTES
Mytherin committed Jan 10, 2024
2 parents 0f41fe7 + daac5aa commit db09b50
Showing 14 changed files with 433 additions and 131 deletions.
12 changes: 8 additions & 4 deletions extension/json/json_functions/copy_json.cpp
@@ -12,7 +12,7 @@
namespace duckdb {

static void ThrowJSONCopyParameterException(const string &loption) {
throw BinderException("COPY (FORMAT JSON) parameter %s expects a single argument.");
throw BinderException("COPY (FORMAT JSON) parameter %s expects a single argument.", loption);
}

static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
@@ -23,7 +23,8 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
// Parse the options, creating options for the CSV writer while doing so
string date_format;
string timestamp_format;
case_insensitive_map_t<vector<Value>> csv_copy_options;
// We insert the JSON file extension here so it works properly with PER_THREAD_OUTPUT/FILE_SIZE_BYTES etc.
case_insensitive_map_t<vector<Value>> csv_copy_options {{"file_extension", {"json"}}};
for (const auto &kv : info.options) {
const auto &loption = StringUtil::Lower(kv.first);
if (loption == "dateformat" || loption == "date_format") {
@@ -36,8 +37,6 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
ThrowJSONCopyParameterException(loption);
}
timestamp_format = StringValue::Get(kv.second.back());
} else if (loption == "compression") {
csv_copy_options.insert(kv);
} else if (loption == "array") {
if (kv.second.size() > 1) {
ThrowJSONCopyParameterException(loption);
@@ -47,6 +46,11 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
csv_copy_options["suffix"] = {"\n]\n"};
csv_copy_options["new_line"] = {",\n\t"};
}
} else if (loption == "compression" || loption == "encoding" || loption == "per_thread_output" ||
loption == "file_size_bytes" || loption == "use_tmp_file" || loption == "overwrite_or_ignore" ||
loption == "filename_pattern" || loption == "file_extension") {
// We support these base options
csv_copy_options.insert(kv);
} else {
throw BinderException("Unknown option for COPY ... TO ... (FORMAT JSON): \"%s\".", loption);
}
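The change above routes a whitelist of base COPY options straight through to the underlying CSV writer, after pre-seeding the option map so rotated files get a .json extension. A minimal standalone sketch of that forwarding pattern, with std::map plus explicit lowercasing standing in for DuckDB's case_insensitive_map_t (BuildCsvOptions and Lower are illustrative names, not DuckDB API):

```cpp
#include <cctype>
#include <map>
#include <set>
#include <string>
#include <vector>

static std::string Lower(std::string s) {
	for (auto &c : s) {
		c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
	}
	return s;
}

using OptionMap = std::map<std::string, std::vector<std::string>>;

OptionMap BuildCsvOptions(const OptionMap &user_options) {
	// Pre-seed the JSON extension so PER_THREAD_OUTPUT/FILE_SIZE_BYTES name files *.json
	OptionMap csv_options {{"file_extension", {"json"}}};
	static const std::set<std::string> base_options = {
	    "compression",  "encoding",            "per_thread_output", "file_size_bytes",
	    "use_tmp_file", "overwrite_or_ignore", "filename_pattern",  "file_extension"};
	for (const auto &kv : user_options) {
		const auto loption = Lower(kv.first);
		if (base_options.count(loption)) {
			// insert() keeps the pre-seeded "json" if an explicit FILE_EXTENSION was
			// also passed, mirroring the insert() call in the diff above
			csv_options.insert({loption, kv.second});
		}
	}
	return csv_options;
}
```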
4 changes: 4 additions & 0 deletions extension/parquet/include/parquet_writer.hpp
@@ -87,6 +87,10 @@ class ParquetWriter {
BufferedFileWriter &GetWriter() {
return *writer;
}
idx_t FileSize() {
lock_guard<mutex> glock(lock);
return writer->total_written;
}

static CopyTypeSupport TypeIsSupported(const LogicalType &type);
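The new FileSize() accessor lets concurrent sinks poll how many bytes the writer has produced so far, reading total_written under a lock. A toy sketch of that pattern, assuming a hypothetical ByteCountingWriter rather than DuckDB's BufferedFileWriter (which keeps the lock one level up, in ParquetWriter):

```cpp
#include <cstddef>
#include <mutex>
#include <string>

// Bytes are counted as they are written, and FileSize() reads the counter
// under the same lock that guards writes, so readers see a consistent value.
class ByteCountingWriter {
public:
	void Write(const std::string &data) {
		std::lock_guard<std::mutex> guard(lock);
		buffer += data;
		total_written += data.size();
	}
	std::size_t FileSize() {
		std::lock_guard<std::mutex> guard(lock);
		return total_written;
	}

private:
	std::mutex lock;
	std::string buffer; // pretend output buffer
	std::size_t total_written = 0;
};
```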

17 changes: 16 additions & 1 deletion extension/parquet/parquet_extension.cpp
@@ -984,7 +984,13 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, const CopyInfo
throw NotImplementedException("Unrecognized option for PARQUET: %s", option.first.c_str());
}
}
if (!row_group_size_bytes_set) {
if (row_group_size_bytes_set) {
if (DBConfig::GetConfig(context).options.preserve_insertion_order) {
throw BinderException("ROW_GROUP_SIZE_BYTES does not work while preserving insertion order. Use \"SET "
"preserve_insertion_order=false;\" to disable preserving insertion order.");
}
} else {
// We always set a max row group size bytes so we don't use too much memory
bind_data->row_group_size_bytes = bind_data->row_group_size * ParquetWriteBindData::BYTES_PER_ROW;
}
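The bind logic above rejects an explicit ROW_GROUP_SIZE_BYTES while insertion order is preserved, and otherwise derives a byte cap from the row cap. A condensed sketch of that defaulting, with assumed values for the default row cap and BYTES_PER_ROW (the real constants live in ParquetWriteBindData):

```cpp
#include <cstdint>
#include <stdexcept>

struct WriteBindData {
	uint64_t row_group_size = 122880; // assumed default row cap
	uint64_t row_group_size_bytes = 0;
	static constexpr uint64_t BYTES_PER_ROW = 1024; // illustrative value
};

void ResolveRowGroupSizeBytes(WriteBindData &bind_data, bool row_group_size_bytes_set,
                              bool preserve_insertion_order) {
	if (row_group_size_bytes_set) {
		// An explicit byte limit only works when rows may be reordered
		if (preserve_insertion_order) {
			throw std::runtime_error("ROW_GROUP_SIZE_BYTES does not work while preserving insertion order");
		}
		return; // keep the user-provided value
	}
	// Otherwise always derive a byte cap so buffering cannot use too much memory
	bind_data.row_group_size_bytes = bind_data.row_group_size * WriteBindData::BYTES_PER_ROW;
}
```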

@@ -1177,6 +1183,14 @@ idx_t ParquetWriteDesiredBatchSize(ClientContext &context, FunctionData &bind_da
return bind_data.row_group_size;
}

//===--------------------------------------------------------------------===//
// Current File Size
//===--------------------------------------------------------------------===//
idx_t ParquetWriteFileSize(GlobalFunctionData &gstate) {
auto &global_state = gstate.Cast<ParquetWriteGlobalState>();
return global_state.writer->FileSize();
}

//===--------------------------------------------------------------------===//
// Scan Replacement
//===--------------------------------------------------------------------===//
@@ -1238,6 +1252,7 @@ void ParquetExtension::Load(DuckDB &db) {
function.prepare_batch = ParquetWritePrepareBatch;
function.flush_batch = ParquetWriteFlushBatch;
function.desired_batch_size = ParquetWriteDesiredBatchSize;
function.file_size_bytes = ParquetWriteFileSize;
function.serialize = ParquetCopySerialize;
function.deserialize = ParquetCopyDeserialize;
function.supports_type = ParquetWriter::TypeIsSupported;
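ParquetWriteFileSize is wired in as just another optional callback on the copy function, registered exactly like prepare_batch or flush_batch above. A hypothetical reduced version of that callback table, not DuckDB's actual headers, showing one plausible guard: a format that never registered file_size_bytes can never report its size, so it is never rotated.

```cpp
#include <cstdint>
#include <functional>

struct GlobalFunctionData {
	virtual ~GlobalFunctionData() = default;
};

// Reduced copy-function table: file_size_bytes is one more optional callback
struct CopyFunctionCallbacks {
	std::function<uint64_t(GlobalFunctionData &)> file_size_bytes; // may be unset
};

bool ShouldRotate(const CopyFunctionCallbacks &fn, GlobalFunctionData &gstate, uint64_t limit) {
	return fn.file_size_bytes && fn.file_size_bytes(gstate) > limit;
}
```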
213 changes: 122 additions & 91 deletions src/execution/operator/persistent/physical_copy_to_file.cpp
@@ -1,10 +1,12 @@
#include "duckdb/execution/operator/persistent/physical_copy_to_file.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/common/hive_partitioning.hpp"
#include "duckdb/common/file_system.hpp"

#include "duckdb/common/file_opener.hpp"
#include "duckdb/common/types/uuid.hpp"
#include "duckdb/common/file_system.hpp"
#include "duckdb/common/hive_partitioning.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/common/types/uuid.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/storage/storage_lock.hpp"

#include <algorithm>

@@ -15,9 +17,9 @@ class CopyToFunctionGlobalState : public GlobalSinkState {
explicit CopyToFunctionGlobalState(unique_ptr<GlobalFunctionData> global_state)
: rows_copied(0), last_file_offset(0), global_state(std::move(global_state)) {
}
mutex lock;
idx_t rows_copied;
idx_t last_file_offset;
StorageLock lock;
atomic<idx_t> rows_copied;
atomic<idx_t> last_file_offset;
unique_ptr<GlobalFunctionData> global_state;
idx_t created_directories = 0;

@@ -40,10 +42,74 @@ class CopyToFunctionLocalState : public LocalSinkState {
idx_t writer_offset;
};

unique_ptr<GlobalFunctionData> PhysicalCopyToFile::CreateFileState(ClientContext &context,
GlobalSinkState &sink) const {
auto &g = sink.Cast<CopyToFunctionGlobalState>();
idx_t this_file_offset = g.last_file_offset++;
auto &fs = FileSystem::GetFileSystem(context);
string output_path(filename_pattern.CreateFilename(fs, file_path, file_extension, this_file_offset));
if (fs.FileExists(output_path) && !overwrite_or_ignore) {
throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", output_path);
}
return function.copy_to_initialize_global(context, *bind_data, output_path);
}
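CreateFileState centralizes file creation: each new file claims the next offset from the shared counter, so PER_THREAD_OUTPUT and FILE_SIZE_BYTES can both mint unique names without further coordination. A sketch of that naming scheme, where the "data_{offset}.{ext}" pattern is an assumption rather than DuckDB's FilenamePattern:

```cpp
#include <atomic>
#include <cstdint>
#include <string>

struct CopyGlobalState {
	std::atomic<uint64_t> last_file_offset {0};
};

std::string NextOutputPath(CopyGlobalState &g, const std::string &dir, const std::string &ext) {
	// fetch-and-increment is atomic, so two threads can never claim the same offset
	const uint64_t offset = g.last_file_offset++;
	return dir + "/data_" + std::to_string(offset) + "." + ext;
}
```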

unique_ptr<LocalSinkState> PhysicalCopyToFile::GetLocalSinkState(ExecutionContext &context) const {
if (partition_output) {
auto &g = sink_state->Cast<CopyToFunctionGlobalState>();

auto state = make_uniq<CopyToFunctionLocalState>(nullptr);
state->writer_offset = g.last_file_offset++;
state->part_buffer =
make_uniq<HivePartitionedColumnData>(context.client, expected_types, partition_columns, g.partition_state);
state->part_buffer_append_state = make_uniq<PartitionedColumnDataAppendState>();
state->part_buffer->InitializeAppendState(*state->part_buffer_append_state);
return std::move(state);
}
auto res = make_uniq<CopyToFunctionLocalState>(function.copy_to_initialize_local(context, *bind_data));
if (per_thread_output) {
res->global_state = CreateFileState(context.client, *sink_state);
}
return std::move(res);
}

unique_ptr<GlobalSinkState> PhysicalCopyToFile::GetGlobalSinkState(ClientContext &context) const {

if (partition_output || per_thread_output || file_size_bytes.IsValid()) {
auto &fs = FileSystem::GetFileSystem(context);

if (fs.FileExists(file_path) && !overwrite_or_ignore) {
throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", file_path);
}
if (!fs.DirectoryExists(file_path)) {
fs.CreateDirectory(file_path);
} else if (!overwrite_or_ignore) {
idx_t n_files = 0;
fs.ListFiles(file_path, [&n_files](const string &path, bool) { n_files++; });
if (n_files > 0) {
throw IOException("Directory %s is not empty! Enable OVERWRITE_OR_IGNORE option to force writing",
file_path);
}
}

auto state = make_uniq<CopyToFunctionGlobalState>(nullptr);
if (!per_thread_output && file_size_bytes.IsValid()) {
state->global_state = CreateFileState(context, *state);
}

if (partition_output) {
state->partition_state = make_shared<GlobalHivePartitionState>();
}

return std::move(state);
}

return make_uniq<CopyToFunctionGlobalState>(function.copy_to_initialize_global(context, *bind_data, file_path));
}
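With any of the multi-file modes (PARTITION_BY, PER_THREAD_OUTPUT, or now FILE_SIZE_BYTES), file_path names a directory rather than a file, so the global state setup validates and creates it up front. Roughly the same checks using std::filesystem in place of DuckDB's FileSystem (PrepareOutputDirectory is an illustrative name):

```cpp
#include <filesystem>
#include <stdexcept>
#include <string>

namespace stdfs = std::filesystem;

void PrepareOutputDirectory(const std::string &file_path, bool overwrite_or_ignore) {
	if (stdfs::is_regular_file(file_path) && !overwrite_or_ignore) {
		throw std::runtime_error(file_path + " exists! Enable OVERWRITE_OR_IGNORE option to force writing");
	}
	if (!stdfs::is_directory(file_path)) {
		stdfs::create_directory(file_path);
	} else if (!overwrite_or_ignore && !stdfs::is_empty(file_path)) {
		throw std::runtime_error("Directory " + file_path +
		                         " is not empty! Enable OVERWRITE_OR_IGNORE option to force writing");
	}
}
```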

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//

void PhysicalCopyToFile::MoveTmpFile(ClientContext &context, const string &tmp_file_path) {
auto &fs = FileSystem::GetFileSystem(context);
auto file_path = tmp_file_path.substr(0, tmp_file_path.length() - 4);
@@ -68,12 +134,39 @@ SinkResultType PhysicalCopyToFile::Sink(ExecutionContext &context, DataChunk &ch
return SinkResultType::NEED_MORE_INPUT;
}

{
lock_guard<mutex> glock(g.lock);
g.rows_copied += chunk.size();
g.rows_copied += chunk.size();

if (per_thread_output) {
auto &gstate = l.global_state;
function.copy_to_sink(context, *bind_data, *gstate, *l.local_state, chunk);

if (file_size_bytes.IsValid() && function.file_size_bytes(*gstate) > file_size_bytes.GetIndex()) {
function.copy_to_finalize(context.client, *bind_data, *gstate);
gstate = CreateFileState(context.client, *sink_state);
}
return SinkResultType::NEED_MORE_INPUT;
}

if (!file_size_bytes.IsValid()) {
function.copy_to_sink(context, *bind_data, *g.global_state, *l.local_state, chunk);
return SinkResultType::NEED_MORE_INPUT;
}

// FILE_SIZE_BYTES is set, but threads write to the same file, synchronize using lock
auto &gstate = g.global_state;
auto lock = g.lock.GetExclusiveLock();
if (function.file_size_bytes(*gstate) > file_size_bytes.GetIndex()) {
auto owned_gstate = std::move(gstate);
gstate = CreateFileState(context.client, *sink_state);
lock.reset();
function.copy_to_finalize(context.client, *bind_data, *owned_gstate);
} else {
lock.reset();
}
function.copy_to_sink(context, *bind_data, per_thread_output ? *l.global_state : *g.global_state, *l.local_state,
chunk);

lock = g.lock.GetSharedLock();
function.copy_to_sink(context, *bind_data, *gstate, *l.local_state, chunk);

return SinkResultType::NEED_MORE_INPUT;
}
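This Sink is the heart of the PR. With FILE_SIZE_BYTES set and a single shared file, every thread briefly takes the exclusive lock; whichever thread observes the size over the limit swaps in a fresh writer, drops the lock, and finalizes the full file while the others continue writing under the shared lock. A self-contained sketch of that rotation, with std::shared_mutex standing in for DuckDB's StorageLock and a toy Writer for the function's global state (all names here are illustrative):

```cpp
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>

struct Writer {
	explicit Writer(std::string p) : path(std::move(p)) {}
	void Append(const std::string &chunk) {
		bytes += chunk.size(); // the writer synchronizes its own counter (atomic here)
	}
	void Finalize() { /* flush footer, close the file */ }
	std::string path;
	std::atomic<uint64_t> bytes {0};
};

struct CopyGlobal {
	std::shared_mutex lock; // stands in for DuckDB's StorageLock
	std::unique_ptr<Writer> writer;
	std::atomic<uint64_t> next_offset {0};
};

void Sink(CopyGlobal &g, const std::string &chunk, uint64_t file_size_bytes) {
	{
		// Only one thread at a time checks the size and (maybe) rotates
		std::unique_lock<std::shared_mutex> excl(g.lock);
		if (g.writer->bytes > file_size_bytes) {
			auto full = std::move(g.writer); // take ownership of the full file
			g.writer = std::make_unique<Writer>("data_" + std::to_string(g.next_offset++) + ".parquet");
			excl.unlock();    // let other threads move on to the new file
			full->Finalize(); // finish the old file without blocking them
		}
	}
	// Appends to the current file run concurrently under the shared lock
	std::shared_lock<std::shared_mutex> shared(g.lock);
	g.writer->Append(chunk);
}
```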

@@ -121,7 +214,7 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope
StringUtil::RTrim(trimmed_path, fs.PathSeparator(trimmed_path));
{
// create directories
lock_guard<mutex> global_lock(g.lock);
auto lock = g.lock.GetExclusiveLock();
lock_guard<mutex> global_lock_on_partition_state(g.partition_state->lock);
const auto &global_partitions = g.partition_state->partitions;
// global_partitions have partitions added only at the back, so it's fine to only traverse the last part
@@ -134,10 +227,10 @@

for (idx_t i = 0; i < partitions.size(); i++) {
string hive_path = GetDirectory(partition_columns, names, partition_key_map[i]->values, trimmed_path, fs);
string full_path(filename_pattern.CreateFilename(fs, hive_path, function.extension, l.writer_offset));
string full_path(filename_pattern.CreateFilename(fs, hive_path, file_extension, l.writer_offset));
if (fs.FileExists(full_path) && !overwrite_or_ignore) {
throw IOException("failed to create " + full_path +
", file exists! Enable OVERWRITE_OR_IGNORE option to force writing");
throw IOException(
"failed to create %s, file exists! Enable OVERWRITE_OR_IGNORE option to force writing", full_path);
}
// Create a writer for the current file
auto fun_data_global = function.copy_to_initialize_global(context.client, *bind_data, full_path);
@@ -150,16 +243,17 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope
function.copy_to_combine(context, *bind_data, *fun_data_global, *fun_data_local);
function.copy_to_finalize(context.client, *bind_data, *fun_data_global);
}

return SinkCombineResultType::FINISHED;
}

if (function.copy_to_combine) {
function.copy_to_combine(context, *bind_data, per_thread_output ? *l.global_state : *g.global_state,
*l.local_state);

} else if (function.copy_to_combine) {
if (per_thread_output) {
// For PER_THREAD_OUTPUT, we can combine/finalize immediately
function.copy_to_combine(context, *bind_data, *l.global_state, *l.local_state);
function.copy_to_finalize(context.client, *bind_data, *l.global_state);
} else if (file_size_bytes.IsValid()) {
// File in global state may change with FILE_SIZE_BYTES, need to grab lock
auto lock = g.lock.GetSharedLock();
function.copy_to_combine(context, *bind_data, *g.global_state, *l.local_state);
} else {
function.copy_to_combine(context, *bind_data, *g.global_state, *l.local_state);
}
}
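Combine mirrors the same split: a PER_THREAD_OUTPUT thread owns its file and can finish it immediately, while with FILE_SIZE_BYTES the global writer may be swapped by a concurrent rotation and must be read under the shared lock. A condensed sketch, where Merge and Finish are hypothetical stand-ins for copy_to_combine and copy_to_finalize:

```cpp
#include <memory>
#include <mutex>
#include <shared_mutex>

struct FileState {
	void Merge() { /* fold a thread's local buffers into this file */ }
	void Finish() { /* write the footer and close */ }
};

struct Global {
	std::shared_mutex lock;
	std::unique_ptr<FileState> file;
};

void Combine(bool per_thread_output, bool file_size_bytes_set, Global &g, FileState &thread_file) {
	if (per_thread_output) {
		// Each thread owns its file outright, so it can finish it immediately
		thread_file.Merge();
		thread_file.Finish();
	} else if (file_size_bytes_set) {
		// A concurrent rotation may swap g.file; read it under the shared lock
		std::shared_lock<std::shared_mutex> shared(g.lock);
		g.file->Merge();
	} else {
		// Single shared file and no rotation: no lock needed at combine time
		g.file->Merge();
	}
}
```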

@@ -177,78 +271,15 @@ SinkFinalizeType PhysicalCopyToFile::Finalize(Pipeline &pipeline, Event &event,
function.copy_to_finalize(context, *bind_data, *gstate.global_state);

if (use_tmp_file) {
D_ASSERT(!per_thread_output); // FIXME
D_ASSERT(!partition_output); // FIXME
D_ASSERT(!per_thread_output);
D_ASSERT(!partition_output);
D_ASSERT(!file_size_bytes.IsValid());
MoveTmpFile(context, file_path);
}
}
return SinkFinalizeType::READY;
}

unique_ptr<LocalSinkState> PhysicalCopyToFile::GetLocalSinkState(ExecutionContext &context) const {
if (partition_output) {
auto state = make_uniq<CopyToFunctionLocalState>(nullptr);
{
auto &g = sink_state->Cast<CopyToFunctionGlobalState>();
lock_guard<mutex> glock(g.lock);
state->writer_offset = g.last_file_offset++;

state->part_buffer = make_uniq<HivePartitionedColumnData>(context.client, expected_types, partition_columns,
g.partition_state);
state->part_buffer_append_state = make_uniq<PartitionedColumnDataAppendState>();
state->part_buffer->InitializeAppendState(*state->part_buffer_append_state);
}
return std::move(state);
}
auto res = make_uniq<CopyToFunctionLocalState>(function.copy_to_initialize_local(context, *bind_data));
if (per_thread_output) {
idx_t this_file_offset;
{
auto &g = sink_state->Cast<CopyToFunctionGlobalState>();
lock_guard<mutex> glock(g.lock);
this_file_offset = g.last_file_offset++;
}
auto &fs = FileSystem::GetFileSystem(context.client);
string output_path(filename_pattern.CreateFilename(fs, file_path, function.extension, this_file_offset));
if (fs.FileExists(output_path) && !overwrite_or_ignore) {
throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", output_path);
}
res->global_state = function.copy_to_initialize_global(context.client, *bind_data, output_path);
}
return std::move(res);
}

unique_ptr<GlobalSinkState> PhysicalCopyToFile::GetGlobalSinkState(ClientContext &context) const {

if (partition_output || per_thread_output) {
auto &fs = FileSystem::GetFileSystem(context);

if (fs.FileExists(file_path) && !overwrite_or_ignore) {
throw IOException("%s exists! Enable OVERWRITE_OR_IGNORE option to force writing", file_path);
}
if (!fs.DirectoryExists(file_path)) {
fs.CreateDirectory(file_path);
} else if (!overwrite_or_ignore) {
idx_t n_files = 0;
fs.ListFiles(file_path, [&n_files](const string &path, bool) { n_files++; });
if (n_files > 0) {
throw IOException("Directory %s is not empty! Enable OVERWRITE_OR_IGNORE option to force writing",
file_path);
}
}

auto state = make_uniq<CopyToFunctionGlobalState>(nullptr);

if (partition_output) {
state->partition_state = make_shared<GlobalHivePartitionState>();
}

return std::move(state);
}

return make_uniq<CopyToFunctionGlobalState>(function.copy_to_initialize_global(context, *bind_data, file_path));
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//