Skip to content

Commit

Permalink
Fix #9360, fix #9466: grab a lock before creating directories to fix …
Browse files Browse the repository at this point in the history
…race condition on Windows in partitioned write
  • Loading branch information
Mytherin committed Oct 25, 2023
1 parent 10f9bd9 commit 65103b0
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions src/execution/operator/persistent/physical_copy_to_file.cpp
Expand Up @@ -19,6 +19,7 @@ class CopyToFunctionGlobalState : public GlobalSinkState {
idx_t rows_copied;
idx_t last_file_offset;
unique_ptr<GlobalFunctionData> global_state;
bool created_directories = false;

//! shared state for HivePartitionedColumnData
shared_ptr<GlobalHivePartitionState> partition_state;
Expand Down Expand Up @@ -82,8 +83,8 @@ static void CreateDir(const string &dir_path, FileSystem &fs) {
}
}

static string CreateDirRecursive(const vector<idx_t> &cols, const vector<string> &names, const vector<Value> &values,
string path, FileSystem &fs) {
static void CreateDirectories(const vector<idx_t> &cols, const vector<string> &names, const vector<Value> &values,
string path, FileSystem &fs) {
CreateDir(path, fs);

for (idx_t i = 0; i < cols.size(); i++) {
Expand All @@ -93,7 +94,16 @@ static string CreateDirRecursive(const vector<idx_t> &cols, const vector<string>
path = fs.JoinPath(path, p_dir);
CreateDir(path, fs);
}
}

static string GetDirectory(const vector<idx_t> &cols, const vector<string> &names, const vector<Value> &values,
string path, FileSystem &fs) {
for (idx_t i = 0; i < cols.size(); i++) {
const auto &partition_col_name = names[cols[i]];
const auto &partition_value = values[i];
string p_dir = partition_col_name + "=" + partition_value.ToString();
path = fs.JoinPath(path, p_dir);
}
return path;
}

Expand All @@ -109,10 +119,19 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope

string trimmed_path = file_path;
StringUtil::RTrim(trimmed_path, fs.PathSeparator(trimmed_path));
{
// create directories
lock_guard<mutex> global_lock(g.lock);
if (!g.created_directories) {
for (idx_t i = 0; i < partitions.size(); i++) {
CreateDirectories(partition_columns, names, partition_key_map[i]->values, trimmed_path, fs);
}
g.created_directories = true;
}
}

for (idx_t i = 0; i < partitions.size(); i++) {
string hive_path =
CreateDirRecursive(partition_columns, names, partition_key_map[i]->values, trimmed_path, fs);
string hive_path = GetDirectory(partition_columns, names, partition_key_map[i]->values, trimmed_path, fs);
string full_path(filename_pattern.CreateFilename(fs, hive_path, function.extension, l.writer_offset));
if (fs.FileExists(full_path) && !overwrite_or_ignore) {
throw IOException("failed to create " + full_path +
Expand Down

0 comments on commit 65103b0

Please sign in to comment.