Skip to content

Commit

Permalink
Merge remote-tracking branch 'blessed/master' into remove_optimize_mo…
Browse files Browse the repository at this point in the history
…ve_functions_out_of_any
  • Loading branch information
Algunenano committed Nov 24, 2023
2 parents eb5016a + 8aaf9a4 commit 037cbf3
Show file tree
Hide file tree
Showing 24 changed files with 313 additions and 101 deletions.
2 changes: 0 additions & 2 deletions base/harmful/harmful.c
Expand Up @@ -145,8 +145,6 @@ TRAP(qecvt)
TRAP(qfcvt)
TRAP(register_printf_function)
TRAP(seed48)
TRAP(select)
TRAP(pselect)
//TRAP(setenv)
TRAP(setfsent)
TRAP(setgrent)
Expand Down
2 changes: 1 addition & 1 deletion docs/en/operations/system-tables/blob_storage_log.md
@@ -1,7 +1,7 @@
---
slug: /en/operations/system-tables/blob_storage_log
---
# Blob Storage Operations Log
# blob_storage_log

Contains logging entries with information about various blob storage operations such as uploads and deletes.

Expand Down
2 changes: 1 addition & 1 deletion programs/disks/CommandCopy.cpp
Expand Up @@ -57,7 +57,7 @@ class CommandCopy final : public ICommand
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);

disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* read_settings= */ {}, /* write_settings= */ {});
disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* read_settings= */ {}, /* write_settings= */ {}, /* cancellation_hook= */ {});
}
};
}
Expand Down
2 changes: 2 additions & 0 deletions programs/server/Server.cpp
Expand Up @@ -1163,6 +1163,8 @@ try
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif

NamedCollectionUtils::loadIfNot();

/// Initialize main config reloader.
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");

Expand Down
12 changes: 9 additions & 3 deletions src/Disks/DiskEncrypted.cpp
Expand Up @@ -324,7 +324,13 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
}


void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
void DiskEncrypted::copyDirectoryContent(
const String & from_dir,
const std::shared_ptr<IDisk> & to_disk,
const String & to_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook)
{
/// Check if we can copy the file without deciphering.
if (isSameDiskType(*this, *to_disk))
Expand All @@ -340,14 +346,14 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
auto wrapped_from_path = wrappedPath(from_dir);
auto to_delegate = to_disk_enc->delegate;
auto wrapped_to_path = to_disk_enc->wrappedPath(to_dir);
delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, read_settings, write_settings);
delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, read_settings, write_settings, cancellation_hook);
return;
}
}
}

/// Copy the file through buffers with deciphering.
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings);
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings, cancellation_hook);
}

std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
Expand Down
8 changes: 7 additions & 1 deletion src/Disks/DiskEncrypted.h
Expand Up @@ -112,7 +112,13 @@ class DiskEncrypted : public IDisk
delegate->listFiles(wrapped_path, file_names);
}

void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) override;
void copyDirectoryContent(
const String & from_dir,
const std::shared_ptr<IDisk> & to_disk,
const String & to_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook) override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
Expand Down
10 changes: 8 additions & 2 deletions src/Disks/DiskLocal.cpp
Expand Up @@ -432,13 +432,19 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
return typeid(one) == typeid(another);
}

void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
void DiskLocal::copyDirectoryContent(
const String & from_dir,
const std::shared_ptr<IDisk> & to_disk,
const String & to_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook)
{
/// If throttling was configured we cannot use copying directly.
if (isSameDiskType(*this, *to_disk) && !read_settings.local_throttler && !write_settings.local_throttler)
fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
else
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings);
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings, cancellation_hook);
}

SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
Expand Down
8 changes: 7 additions & 1 deletion src/Disks/DiskLocal.h
Expand Up @@ -65,7 +65,13 @@ class DiskLocal : public IDisk

void replaceFile(const String & from_path, const String & to_path) override;

void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) override;
void copyDirectoryContent(
const String & from_dir,
const std::shared_ptr<IDisk> & to_disk,
const String & to_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook) override;

void listFiles(const String & path, std::vector<String> & file_names) const override;

Expand Down
52 changes: 41 additions & 11 deletions src/Disks/IDisk.cpp
Expand Up @@ -24,14 +24,21 @@ bool IDisk::isDirectoryEmpty(const String & path) const
return !iterateDirectory(path)->isValid();
}

void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) /// NOLINT
void IDisk::copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
const String & to_file_path,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook
)
{
LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.",
getName(), getPath(), from_file_path, to_disk.getName(), to_disk.getPath(), to_file_path);

auto in = readFile(from_file_path, read_settings);
auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*in, *out);
copyData(*in, *out, cancellation_hook);
out->finalize();
}

Expand Down Expand Up @@ -80,15 +87,25 @@ UInt128 IDisk::getEncryptedFileIV(const String &) const

using ResultsCollector = std::vector<std::future<void>>;

void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
void asyncCopy(
IDisk & from_disk,
String from_path,
IDisk & to_disk,
String to_path,
ThreadPool & pool,
ResultsCollector & results,
bool copy_root_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook)
{
if (from_disk.isFile(from_path))
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();

pool.scheduleOrThrowOnError(
[&from_disk, from_path, &to_disk, to_path, &read_settings, &write_settings, promise, thread_group = CurrentThread::getGroup()]()
[&from_disk, from_path, &to_disk, to_path, &read_settings, &write_settings, promise, thread_group = CurrentThread::getGroup(), &cancellation_hook]()
{
try
{
Expand All @@ -97,7 +114,7 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
if (thread_group)
CurrentThread::attachToGroup(thread_group);

from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), read_settings, write_settings);
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), read_settings, write_settings, cancellation_hook);
promise->set_value();
}
catch (...)
Expand All @@ -119,33 +136,46 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
}

for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, read_settings, write_settings);
asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, read_settings, write_settings, cancellation_hook);
}
}

void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, const ReadSettings & read_settings, WriteSettings write_settings)
void IDisk::copyThroughBuffers(
const String & from_path,
const std::shared_ptr<IDisk> & to_disk,
const String & to_path,
bool copy_root_dir,
const ReadSettings & read_settings,
WriteSettings write_settings,
const std::function<void()> & cancellation_hook)
{
ResultsCollector results;

/// Disable parallel write. We already copy in parallel.
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
write_settings.s3_allow_parallel_part_upload = false;

asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, read_settings, write_settings);
asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, read_settings, write_settings, cancellation_hook);

for (auto & result : results)
result.wait();
for (auto & result : results)
result.get(); /// May rethrow an exception
result.get(); /// May rethrow an exception
}


void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
void IDisk::copyDirectoryContent(
const String & from_dir,
const std::shared_ptr<IDisk> & to_disk,
const String & to_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook)
{
if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir);

copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, read_settings, write_settings);
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, read_settings, write_settings, cancellation_hook);
}

void IDisk::truncateFile(const String &, size_t)
Expand Down
20 changes: 17 additions & 3 deletions src/Disks/IDisk.h
Expand Up @@ -194,15 +194,22 @@ class IDisk : public Space
virtual void replaceFile(const String & from_path, const String & to_path) = 0;

/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings);
virtual void copyDirectoryContent(
const String & from_dir,
const std::shared_ptr<IDisk> & to_disk,
const String & to_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook);

/// Copy file `from_file_path` to `to_file_path` located at `to_disk`.
virtual void copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
const String & to_file_path,
const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {});
const WriteSettings & write_settings = {},
const std::function<void()> & cancellation_hook = {});

/// List files at `path` and add their names to `file_names`
virtual void listFiles(const String & path, std::vector<String> & file_names) const = 0;
Expand Down Expand Up @@ -474,7 +481,14 @@ class IDisk : public Space
/// Base implementation of the function copy().
/// It just opens two files, reads data by portions from the first file, and writes it to the second one.
/// A derived class may override copy() to provide a faster implementation.
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, const ReadSettings & read_settings, WriteSettings write_settings);
void copyThroughBuffers(
const String & from_path,
const std::shared_ptr<IDisk> & to_disk,
const String & to_path,
bool copy_root_dir,
const ReadSettings & read_settings,
WriteSettings write_settings,
const std::function<void()> & cancellation_hook);

virtual void checkAccessImpl(const String & path);

Expand Down
6 changes: 4 additions & 2 deletions src/Disks/ObjectStorages/DiskObjectStorage.cpp
Expand Up @@ -175,7 +175,9 @@ void DiskObjectStorage::copyFile( /// NOLINT
IDisk & to_disk,
const String & to_file_path,
const ReadSettings & read_settings,
const WriteSettings & write_settings)
const WriteSettings & write_settings,
const std::function<void()> & cancellation_hook
)
{
if (this == &to_disk)
{
Expand All @@ -187,7 +189,7 @@ void DiskObjectStorage::copyFile( /// NOLINT
else
{
/// Copy through buffers
IDisk::copyFile(from_file_path, to_disk, to_file_path, read_settings, write_settings);
IDisk::copyFile(from_file_path, to_disk, to_file_path, read_settings, write_settings, cancellation_hook);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/Disks/ObjectStorages/DiskObjectStorage.h
Expand Up @@ -163,7 +163,9 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
IDisk & to_disk,
const String & to_file_path,
const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {}) override;
const WriteSettings & write_settings = {},
const std::function<void()> & cancellation_hook = {}
) override;

void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;

Expand Down
38 changes: 16 additions & 22 deletions src/Functions/fromDaysSinceYearZero.cpp
@@ -1,20 +1,14 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/DateTimeTransforms.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnsDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/castColumn.h>

#include <Common/DateLUT.h>
#include <Common/typeid_cast.h>

#include <array>
#include <cmath>
Expand All @@ -23,7 +17,8 @@ namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ARGUMENT_OUT_OF_BOUND;
}

namespace
Expand All @@ -44,7 +39,6 @@ struct DateTraits32
template <typename Traits>
class FunctionFromDaysSinceYearZero : public IFunction
{

public:
static constexpr auto name = Traits::name;
using RawReturnType = typename Traits::ReturnDataType::FieldType;
Expand All @@ -58,9 +52,7 @@ class FunctionFromDaysSinceYearZero : public IFunction

DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
FunctionArgumentDescriptors args{
{"days", &isNativeUInt<IDataType>, nullptr, "UInt*"}
};
FunctionArgumentDescriptors args{{"days", &isNativeInteger<IDataType>, nullptr, "Integer"}};

validateFunctionArgumentTypes(*this, arguments, args);

Expand All @@ -84,7 +76,8 @@ class FunctionFromDaysSinceYearZero : public IFunction
return false;
};

const bool success = try_type(UInt8{}) || try_type(UInt16{}) || try_type(UInt32{}) || try_type(UInt64{});
const bool success = try_type(UInt8{}) || try_type(UInt16{}) || try_type(UInt32{}) || try_type(UInt64{})
|| try_type(Int8{}) || try_type(Int16{}) || try_type(Int32{}) || try_type(Int64{});

if (!success)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal column while execute function {}", getName());
Expand All @@ -99,13 +92,14 @@ class FunctionFromDaysSinceYearZero : public IFunction
auto & dst_data = result_column.getData();
dst_data.resize(rows_count);

using equivalent_integer = typename std::conditional_t<sizeof(T) == 4, UInt32, UInt64>;

for (size_t i = 0; i < rows_count; ++i)
{
auto raw_value = src_data[i];
auto value = static_cast<equivalent_integer>(raw_value);
dst_data[i] = static_cast<RawReturnType>(value - ToDaysSinceYearZeroImpl::DAYS_BETWEEN_YEARS_0_AND_1970);
auto value = src_data[i];
if (value < 0)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Expected a non-negative integer, got: {}", std::to_string(value));
/// prevent potential signed integer overflows (aka. undefined behavior) with Date32 results
auto value_uint64 = static_cast<UInt64>(value); /// NOLINT(bugprone-signed-char-misuse,cert-str34-c)
dst_data[i] = static_cast<RawReturnType>(value_uint64 - ToDaysSinceYearZeroImpl::DAYS_BETWEEN_YEARS_0_AND_1970);
}
}
};
Expand Down

0 comments on commit 037cbf3

Please sign in to comment.