Skip to content

Commit

Permalink
Merge branch 'master' into improve_concurrent_parts_removal
Browse files Browse the repository at this point in the history
  • Loading branch information
tavplubix committed May 17, 2023
2 parents c1c210d + 15ebbd2 commit 9a824a0
Show file tree
Hide file tree
Showing 131 changed files with 4,917 additions and 1,589 deletions.
18 changes: 13 additions & 5 deletions contrib/boost-cmake/CMakeLists.txt
Expand Up @@ -103,11 +103,19 @@ set (SRCS_CONTEXT
)

if (ARCH_AARCH64)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_elf_gas.S"
)
if (OS_DARWIN)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_macho_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_macho_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_macho_gas.S"
)
else()
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_elf_gas.S"
)
endif()
elseif (ARCH_PPC64LE)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_ppc64_sysv_elf_gas.S"
Expand Down
25 changes: 20 additions & 5 deletions contrib/googletest-cmake/CMakeLists.txt
@@ -1,15 +1,30 @@
set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest")
set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest")

add_library(_gtest "${SRC_DIR}/src/gtest-all.cc")
add_library(_gtest "${SRC_DIR}/googletest/src/gtest-all.cc")
set_target_properties(_gtest PROPERTIES VERSION "1.0.0")
target_compile_definitions (_gtest PUBLIC GTEST_HAS_POSIX_RE=0)
target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/include")
target_include_directories(_gtest PRIVATE "${SRC_DIR}")
target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/googletest/include")
target_include_directories(_gtest PRIVATE "${SRC_DIR}/googletest")

add_library(_gtest_main "${SRC_DIR}/src/gtest_main.cc")
add_library(_gtest_main "${SRC_DIR}/googletest/src/gtest_main.cc")
set_target_properties(_gtest_main PROPERTIES VERSION "1.0.0")
target_link_libraries(_gtest_main PUBLIC _gtest)

add_library(_gtest_all INTERFACE)
target_link_libraries(_gtest_all INTERFACE _gtest _gtest_main)
add_library(ch_contrib::gtest_all ALIAS _gtest_all)


add_library(_gmock "${SRC_DIR}/googlemock/src/gmock-all.cc")
set_target_properties(_gmock PROPERTIES VERSION "1.0.0")
target_compile_definitions (_gmock PUBLIC GTEST_HAS_POSIX_RE=0)
target_include_directories(_gmock SYSTEM PUBLIC "${SRC_DIR}/googlemock/include" "${SRC_DIR}/googletest/include")
target_include_directories(_gmock PRIVATE "${SRC_DIR}/googlemock")

add_library(_gmock_main "${SRC_DIR}/googlemock/src/gmock_main.cc")
set_target_properties(_gmock_main PROPERTIES VERSION "1.0.0")
target_link_libraries(_gmock_main PUBLIC _gmock)

add_library(_gmock_all INTERFACE)
target_link_libraries(_gmock_all INTERFACE _gmock _gmock_main)
add_library(ch_contrib::gmock_all ALIAS _gmock_all)
4 changes: 2 additions & 2 deletions docs/en/engines/table-engines/integrations/kafka.md
Expand Up @@ -19,8 +19,8 @@ Kafka lets you:
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
Expand Down
6 changes: 3 additions & 3 deletions programs/server/Server.cpp
Expand Up @@ -1872,7 +1872,7 @@ try
}

if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
LOG_WARNING(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(log, "Closed all listening sockets.");

Expand All @@ -1884,7 +1884,7 @@ try
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5));

if (current_connections)
LOG_INFO(log, "Closed connections. But {} remain."
LOG_WARNING(log, "Closed connections. But {} remain."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>", current_connections);
else
LOG_INFO(log, "Closed connections.");
Expand All @@ -1900,7 +1900,7 @@ try

/// Dump coverage here, because std::atexit callback would not be called.
dumpCoverageReportIfPossible();
LOG_INFO(log, "Will shutdown forcefully.");
LOG_WARNING(log, "Will shutdown forcefully.");
safeExit(0);
}
});
Expand Down
17 changes: 13 additions & 4 deletions src/AggregateFunctions/AggregateFunctionKolmogorovSmirnovTest.h
Expand Up @@ -43,6 +43,7 @@ struct KolmogorovSmirnov : public StatisticalSample<Float64, Float64>
Float64 now_s = 0;
UInt64 pos_x = 0;
UInt64 pos_y = 0;
UInt64 pos_tmp;
UInt64 n1 = x.size();
UInt64 n2 = y.size();

Expand All @@ -65,14 +66,22 @@ struct KolmogorovSmirnov : public StatisticalSample<Float64, Float64>
now_s -= n2_d;
++pos_y;
}
max_s = std::max(max_s, now_s);
min_s = std::min(min_s, now_s);
}
else
{
now_s += n1_d;
++pos_x;
pos_tmp = pos_x + 1;
while (pos_tmp < x.size() && unlikely(fabs(x[pos_tmp] - x[pos_x]) <= tol))
pos_tmp++;
now_s += n1_d * (pos_tmp - pos_x);
pos_x = pos_tmp;
pos_tmp = pos_y + 1;
while (pos_tmp < y.size() && unlikely(fabs(y[pos_tmp] - y[pos_y]) <= tol))
pos_tmp++;
now_s -= n2_d * (pos_tmp - pos_y);
pos_y = pos_tmp;
}
max_s = std::max(max_s, now_s);
min_s = std::min(min_s, now_s);
}
now_s += n1_d * (x.size() - pos_x) - n2_d * (y.size() - pos_y);
min_s = std::min(min_s, now_s);
Expand Down
11 changes: 8 additions & 3 deletions src/Analyzer/FunctionNode.cpp
Expand Up @@ -209,15 +209,20 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const
function_ast->kind = ASTFunction::Kind::WINDOW_FUNCTION;
}

auto new_options = options;
/// To avoid surrounding constants with several internal casts.
if (function_name == "_CAST" && (*getArguments().begin())->getNodeType() == QueryTreeNodeType::CONSTANT)
new_options.add_cast_for_constants = false;

const auto & parameters = getParameters();
if (!parameters.getNodes().empty())
{
function_ast->children.push_back(parameters.toAST(options));
function_ast->children.push_back(parameters.toAST(new_options));
function_ast->parameters = function_ast->children.back();
}

const auto & arguments = getArguments();
function_ast->children.push_back(arguments.toAST(options));
function_ast->children.push_back(arguments.toAST(new_options));
function_ast->arguments = function_ast->children.back();

auto window_node = getWindowNode();
Expand All @@ -226,7 +231,7 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const
if (auto * identifier_node = window_node->as<IdentifierNode>())
function_ast->window_name = identifier_node->getIdentifier().getFullName();
else
function_ast->window_definition = window_node->toAST(options);
function_ast->window_definition = window_node->toAST(new_options);
}

return function_ast;
Expand Down
2 changes: 2 additions & 0 deletions src/Backups/BackupCoordinationRemote.cpp
Expand Up @@ -115,6 +115,7 @@ namespace
writeBinary(info.checksum, out);
writeBinary(info.base_size, out);
writeBinary(info.base_checksum, out);
writeBinary(info.encrypted_by_disk, out);
/// We don't store `info.data_file_name` and `info.data_file_index` because they're determined automalically
/// after reading file infos for all the hosts (see the class BackupCoordinationFileInfos).
}
Expand All @@ -136,6 +137,7 @@ namespace
readBinary(info.checksum, in);
readBinary(info.base_size, in);
readBinary(info.base_checksum, in);
readBinary(info.encrypted_by_disk, in);
}
return res;
}
Expand Down
41 changes: 30 additions & 11 deletions src/Backups/BackupEntryFromAppendOnlyFile.cpp
@@ -1,26 +1,45 @@
#include <Backups/BackupEntryFromAppendOnlyFile.h>
#include <Disks/IDisk.h>
#include <IO/LimitSeekableReadBuffer.h>


namespace DB
{

namespace
{
/// For append-only files we must calculate its size on the construction of a backup entry.
UInt64 calculateSize(const DiskPtr & disk, const String & file_path, bool copy_encrypted, std::optional<UInt64> unencrypted_file_size)
{
if (!unencrypted_file_size)
return copy_encrypted ? disk->getEncryptedFileSize(file_path) : disk->getFileSize(file_path);
else if (copy_encrypted)
return disk->getEncryptedFileSize(*unencrypted_file_size);
else
return *unencrypted_file_size;
}
}

BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,
const String & file_path_,
const ReadSettings & settings_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_)
: BackupEntryFromImmutableFile(disk_, file_path_, settings_, file_size_, checksum_, temporary_file_)
, limit(BackupEntryFromImmutableFile::getSize())
const DiskPtr & disk_, const String & file_path_, bool copy_encrypted_, const std::optional<UInt64> & file_size_)
: disk(disk_)
, file_path(file_path_)
, data_source_description(disk->getDataSourceDescription())
, copy_encrypted(copy_encrypted_ && data_source_description.is_encrypted)
, size(calculateSize(disk_, file_path_, copy_encrypted, file_size_))
{
}

std::unique_ptr<SeekableReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer() const
BackupEntryFromAppendOnlyFile::~BackupEntryFromAppendOnlyFile() = default;

std::unique_ptr<SeekableReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer(const ReadSettings & read_settings) const
{
auto buf = BackupEntryFromImmutableFile::getReadBuffer();
return std::make_unique<LimitSeekableReadBuffer>(std::move(buf), 0, limit);
std::unique_ptr<SeekableReadBuffer> buf;
if (copy_encrypted)
buf = disk->readEncryptedFile(file_path, read_settings.adjustBufferSize(size));
else
buf = disk->readFile(file_path, read_settings.adjustBufferSize(size));
return std::make_unique<LimitSeekableReadBuffer>(std::move(buf), 0, size);
}

}
32 changes: 21 additions & 11 deletions src/Backups/BackupEntryFromAppendOnlyFile.h
@@ -1,31 +1,41 @@
#pragma once

#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryWithChecksumCalculation.h>


namespace DB
{

/// Represents a file prepared to be included in a backup, assuming that until this backup entry is destroyed
/// the file can be appended with new data, but the bytes which are already in the file won't be changed.
class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile
class BackupEntryFromAppendOnlyFile : public BackupEntryWithChecksumCalculation<IBackupEntry>
{
public:

/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
/// The constructor is allowed to not set `file_size_`, in that case it will be calculated from the data.
BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,
const String & file_path_,
const ReadSettings & settings_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_ = {});
bool copy_encrypted_ = false,
const std::optional<UInt64> & file_size_ = {});

~BackupEntryFromAppendOnlyFile() override;

std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override;
UInt64 getSize() const override { return size; }

DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
bool isEncryptedByDisk() const override { return copy_encrypted; }

UInt64 getSize() const override { return limit; }
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
bool isFromFile() const override { return true; }
DiskPtr getDisk() const override { return disk; }
String getFilePath() const override { return file_path; }

private:
const UInt64 limit;
const DiskPtr disk;
const String file_path;
const DataSourceDescription data_source_description;
const bool copy_encrypted;
const UInt64 size;
};

}
71 changes: 51 additions & 20 deletions src/Backups/BackupEntryFromImmutableFile.cpp
@@ -1,53 +1,84 @@
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Poco/File.h>
#include <Common/filesystemHelpers.h>


namespace DB
{

namespace
{
/// We mix the checksum calculated for non-encrypted data with IV generated to encrypt the file
/// to generate kind of a checksum for encrypted data. Of course it differs from the CityHash properly calculated for encrypted data.
UInt128 combineChecksums(UInt128 checksum1, UInt128 checksum2)
{
chassert(std::size(checksum2.items) == 2);
return CityHash_v1_0_2::CityHash128WithSeed(reinterpret_cast<const char *>(&checksum1), sizeof(checksum1), {checksum2.items[0], checksum2.items[1]});
}
}

BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
const DiskPtr & disk_,
const String & file_path_,
const ReadSettings & settings_,
bool copy_encrypted_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_)
const std::optional<UInt128> & checksum_)
: disk(disk_)
, file_path(file_path_)
, settings(settings_)
, data_source_description(disk->getDataSourceDescription())
, copy_encrypted(copy_encrypted_ && data_source_description.is_encrypted)
, file_size(file_size_)
, checksum(checksum_)
, temporary_file_on_disk(temporary_file_)
{
}

BackupEntryFromImmutableFile::~BackupEntryFromImmutableFile() = default;

UInt64 BackupEntryFromImmutableFile::getSize() const
std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer(const ReadSettings & read_settings) const
{
std::lock_guard lock{get_file_size_mutex};
if (!file_size)
file_size = disk->getFileSize(file_path);
return *file_size;
if (copy_encrypted)
return disk->readEncryptedFile(file_path, read_settings);
else
return disk->readFile(file_path, read_settings);
}

std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
UInt64 BackupEntryFromImmutableFile::getSize() const
{
return disk->readFile(file_path, settings);
std::lock_guard lock{size_and_checksum_mutex};
if (!file_size_adjusted)
{
if (!file_size)
file_size = copy_encrypted ? disk->getEncryptedFileSize(file_path) : disk->getFileSize(file_path);
else if (copy_encrypted)
file_size = disk->getEncryptedFileSize(*file_size);
file_size_adjusted = true;
}
return *file_size;
}


DataSourceDescription BackupEntryFromImmutableFile::getDataSourceDescription() const
UInt128 BackupEntryFromImmutableFile::getChecksum() const
{
return disk->getDataSourceDescription();
std::lock_guard lock{size_and_checksum_mutex};
if (!checksum_adjusted)
{
if (!checksum)
checksum = BackupEntryWithChecksumCalculation<IBackupEntry>::getChecksum();
else if (copy_encrypted)
checksum = combineChecksums(*checksum, disk->getEncryptedFileIV(file_path));
checksum_adjusted = true;
}
return *checksum;
}

String BackupEntryFromImmutableFile::getFilePath() const
std::optional<UInt128> BackupEntryFromImmutableFile::getPartialChecksum(size_t prefix_length) const
{
return file_path;
if (prefix_length == 0)
return 0;

if (prefix_length >= getSize())
return getChecksum();

/// For immutable files we don't use partial checksums.
return std::nullopt;
}

}

0 comments on commit 9a824a0

Please sign in to comment.