Skip to content

Commit

Permalink
Merge pull request #53338 from azat/throttling-fixes
Browse files Browse the repository at this point in the history
Fix IO throttling during copying whole directories
  • Loading branch information
serxa committed Sep 1, 2023
2 parents 3f23e3e + aaa68a5 commit cf5ea46
Show file tree
Hide file tree
Showing 30 changed files with 172 additions and 70 deletions.
2 changes: 1 addition & 1 deletion programs/disks/CommandCopy.cpp
Expand Up @@ -59,7 +59,7 @@ class CommandCopy final : public ICommand
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);

disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to);
disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* settings= */ {});
}
};
}
Expand Down
6 changes: 3 additions & 3 deletions src/Disks/DiskEncrypted.cpp
Expand Up @@ -324,7 +324,7 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
}


void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings)
{
/// Check if we can copy the file without deciphering.
if (isSameDiskType(*this, *to_disk))
Expand All @@ -340,14 +340,14 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
auto wrapped_from_path = wrappedPath(from_dir);
auto to_delegate = to_disk_enc->delegate;
auto wrapped_to_path = to_disk_enc->wrappedPath(to_dir);
delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path);
delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, settings);
return;
}
}
}

/// Copy the file through buffers with deciphering.
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings);
}

std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/DiskEncrypted.h
Expand Up @@ -112,7 +112,7 @@ class DiskEncrypted : public IDisk
delegate->listFiles(wrapped_path, file_names);
}

void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
Expand Down
4 changes: 2 additions & 2 deletions src/Disks/DiskEncryptedTransaction.cpp
Expand Up @@ -53,11 +53,11 @@ String DiskEncryptedSettings::findKeyByFingerprint(UInt128 key_fingerprint, cons
return it->second;
}

/// Copies a file inside the encrypted disk transaction: both paths are translated with wrappedPath()
/// and the copy itself is forwarded to delegate_transaction.
/// NOTE(review): this span is a rendered diff without +/- markers. The signature and the delegate call
/// each appear twice; presumably the first line of each pair is the removed version and the second one
/// (taking/passing `settings`) is the added version -- confirm against the commit.
void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings)
{
auto wrapped_from_path = wrappedPath(from_file_path);
auto wrapped_to_path = wrappedPath(to_file_path);
delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path);
/// The write settings are handed to the delegate transaction unchanged.
delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path, settings);
}

std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile( // NOLINT
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/DiskEncryptedTransaction.h
Expand Up @@ -116,7 +116,7 @@ class DiskEncryptedTransaction : public IDiskTransaction
/// but it's impossible to implement correctly in transactions because other disk can
/// use different metadata storage.
/// TODO: maybe remove it altogether, we don't want copies
void copyFile(const std::string & from_file_path, const std::string & to_file_path) override;
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override;

/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
Expand Down
7 changes: 4 additions & 3 deletions src/Disks/DiskLocal.cpp
Expand Up @@ -432,12 +432,13 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
return typeid(one) == typeid(another);
}

/// Copies the content of from_dir (relative to disk_path) to to_dir on to_disk.
/// When both disks have the same dynamic type (isSameDiskType) and `settings.local_throttler` is not set,
/// the copy is done with a single recursive fs::copy that overwrites existing files.
/// Otherwise it falls back to IDisk::copyDirectoryContent, passing `settings` along.
/// NOTE(review): this span is a rendered diff without +/- markers. The signature, the `if` condition and
/// the fallback call each appear twice; presumably the first line of each pair is the removed version and
/// the second one (using `settings`) is the added version -- confirm against the commit.
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings)
{
if (isSameDiskType(*this, *to_disk))
/// If throttling was configured we cannot use copying directly.
if (isSameDiskType(*this, *to_disk) && !settings.local_throttler)
fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
else
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
/// Fallback path: the only branch here that receives `settings`.
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings);
}

SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/DiskLocal.h
Expand Up @@ -65,7 +65,7 @@ class DiskLocal : public IDisk

void replaceFile(const String & from_path, const String & to_path) override;

void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) override;

void listFiles(const String & path, std::vector<String> & file_names) const override;

Expand Down
4 changes: 2 additions & 2 deletions src/Disks/FakeDiskTransaction.h
Expand Up @@ -54,9 +54,9 @@ struct FakeDiskTransaction final : public IDiskTransaction
disk.replaceFile(from_path, to_path);
}

/// Copies from_file_path to to_file_path by calling disk.copyFile with `disk` itself as the destination disk.
/// NOTE(review): this span is a rendered diff without +/- markers. The signature and the call each appear
/// twice; presumably the first line of each pair is the removed version and the second one (with
/// `settings`) is the added version -- confirm against the commit.
void copyFile(const std::string & from_file_path, const std::string & to_file_path) override
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override
{
disk.copyFile(from_file_path, disk, to_file_path);
disk.copyFile(from_file_path, disk, to_file_path, settings);
}

std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
Expand Down
8 changes: 4 additions & 4 deletions src/Disks/IDisk.cpp
Expand Up @@ -3,6 +3,7 @@
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <Poco/Logger.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Core/ServerUUID.h>
Expand Down Expand Up @@ -122,11 +123,10 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
}
}

void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir)
void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings)
{
ResultsCollector results;

WriteSettings settings;
/// Disable parallel write. We already copy in parallel.
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
settings.s3_allow_parallel_part_upload = false;
Expand All @@ -140,12 +140,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<I
}


/// Base implementation: copies from_dir to to_dir on to_disk through copyThroughBuffers.
/// to_dir is created with createDirectories() when to_disk reports that it does not exist.
/// NOTE(review): this span is a rendered diff without +/- markers. The signature and the
/// copyThroughBuffers call each appear twice; presumably the first line of each pair is the removed
/// version and the second one (with `settings`) is the added version -- confirm against the commit.
void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings)
{
if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir);

/// copy_root_dir is false -- presumably so that only the content of from_dir, not the directory
/// itself, ends up under to_dir; verify against copyThroughBuffers.
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false);
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, settings);
}

void IDisk::truncateFile(const String &, size_t)
Expand Down
4 changes: 2 additions & 2 deletions src/Disks/IDisk.h
Expand Up @@ -193,7 +193,7 @@ class IDisk : public Space
virtual void replaceFile(const String & from_path, const String & to_path) = 0;

/// Recursively copy files from from_dir to to_dir. Create to_dir if it does not exist.
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir);
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings);

/// Copy file `from_file_path` to `to_file_path` located at `to_disk`.
virtual void copyFile( /// NOLINT
Expand Down Expand Up @@ -470,7 +470,7 @@ class IDisk : public Space
/// Base implementation of the function copy().
/// It just opens two files, reads data by portions from the first file, and writes it to the second one.
/// A derived class may override copy() to provide a faster implementation.
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir = true);
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings);

virtual void checkAccessImpl(const String & path);

Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IDiskTransaction.h
Expand Up @@ -59,7 +59,7 @@ struct IDiskTransaction : private boost::noncopyable
/// but it's impossible to implement correctly in transactions because other disk can
/// use different metadata storage.
/// TODO: maybe remove it altogether, we don't want copies
virtual void copyFile(const std::string & from_file_path, const std::string & to_file_path) = 0;
virtual void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings = {}) = 0;

/// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
Expand Down
6 changes: 5 additions & 1 deletion src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
Expand Up @@ -5,6 +5,7 @@
#include <ranges>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <base/defines.h>

#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>

Expand Down Expand Up @@ -769,8 +770,11 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
}));
}

/// Appends a CopyFileObjectStorageOperation (from_file_path -> to_file_path) to operations_to_execute.
/// `settings` is accepted but deliberately not used here.
/// NOTE(review): this span is a rendered diff without +/- markers. The signature appears twice;
/// presumably the first line is the removed version and the second one (with `settings`) is the added
/// version -- confirm against the commit.
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings)
{
/// NOTE: For native copy we can ignore throttling, so no need to use WriteSettings
UNUSED(settings);

/// The operation is constructed from the object storage, the metadata storage and the two paths only.
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, from_file_path, to_file_path));
}
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/ObjectStorages/DiskObjectStorageTransaction.h
Expand Up @@ -86,7 +86,7 @@ struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable

void createFile(const String & path) override;

void copyFile(const std::string & from_file_path, const std::string & to_file_path) override;
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override;

/// writeFile is a difficult function for transactions.
/// Now it's almost noop because metadata added to transaction in finalize method
Expand Down
16 changes: 13 additions & 3 deletions src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
Expand Up @@ -416,6 +416,7 @@ void DataPartStorageOnDiskBase::backup(
MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
const std::string & to,
const std::string & dir_path,
const WriteSettings & settings,
std::function<void(const DiskPtr &)> save_metadata_callback,
const ClonePartParams & params) const
{
Expand All @@ -425,8 +426,16 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
else
disk->createDirectories(to);

localBackup(disk, getRelativePath(), fs::path(to) / dir_path, params.make_source_readonly, {}, params.copy_instead_of_hardlink,
params.files_to_copy_instead_of_hardlinks, params.external_transaction);
localBackup(
disk,
getRelativePath(),
fs::path(to) / dir_path,
settings,
params.make_source_readonly,
/* max_level= */ {},
params.copy_instead_of_hardlink,
params.files_to_copy_instead_of_hardlinks,
params.external_transaction);

if (save_metadata_callback)
save_metadata_callback(disk);
Expand Down Expand Up @@ -457,6 +466,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
const std::string & to,
const std::string & dir_path,
const DiskPtr & dst_disk,
const WriteSettings & write_settings,
Poco::Logger * log) const
{
String path_to_clone = fs::path(to) / dir_path / "";
Expand All @@ -472,7 +482,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
try
{
dst_disk->createDirectories(to);
src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone);
src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone, write_settings);
}
catch (...)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/DataPartStorageOnDiskBase.h
Expand Up @@ -63,13 +63,15 @@ class DataPartStorageOnDiskBase : public IDataPartStorage
MutableDataPartStoragePtr freeze(
const std::string & to,
const std::string & dir_path,
const WriteSettings & settings,
std::function<void(const DiskPtr &)> save_metadata_callback,
const ClonePartParams & params) const override;

MutableDataPartStoragePtr clonePart(
const std::string & to,
const std::string & dir_path,
const DiskPtr & dst_disk,
const WriteSettings & write_settings,
Poco::Logger * log) const override;

void rename(
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/IDataPartStorage.h
Expand Up @@ -252,6 +252,7 @@ class IDataPartStorage : public boost::noncopyable
virtual std::shared_ptr<IDataPartStorage> freeze(
const std::string & to,
const std::string & dir_path,
const WriteSettings & settings,
std::function<void(const DiskPtr &)> save_metadata_callback,
const ClonePartParams & params) const = 0;

Expand All @@ -260,6 +261,7 @@ class IDataPartStorage : public boost::noncopyable
const std::string & to,
const std::string & dir_path,
const DiskPtr & disk,
const WriteSettings & write_settings,
Poco::Logger * log) const = 0;

/// Change part's root. from_root should be a prefix path of current root path.
Expand Down
7 changes: 4 additions & 3 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Expand Up @@ -1802,11 +1802,12 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix
return getDataPartStorage().freeze(
storage.relative_data_path,
*maybe_path_in_detached,
/*save_metadata_callback=*/ {},
Context::getGlobalContextInstance()->getWriteSettings(),
/* save_metadata_callback= */ {},
params);
}

MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const
MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const
{
assertOnDisk();

Expand All @@ -1816,7 +1817,7 @@ MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & di
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name);

String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, storage.log);
return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, write_settings, storage.log);
}

UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.h
Expand Up @@ -377,7 +377,7 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
const DiskTransactionPtr & disk_transaction) const;

/// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const;
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const;

/// Checks that .bin and .mrk files exist.
///
Expand Down

0 comments on commit cf5ea46

Please sign in to comment.