Skip to content

Commit

Permalink
Merge pull request #48791 from kssenii/better-local-object-storage
Browse files Browse the repository at this point in the history
Make local object storage work consistently with s3 object storage, fix problem with append, make it configurable as independent storage
  • Loading branch information
alesapin committed May 4, 2023
2 parents caed9c8 + 35f437a commit 412b161
Show file tree
Hide file tree
Showing 54 changed files with 398 additions and 814 deletions.
11 changes: 11 additions & 0 deletions docker/test/upgrade/run.sh
Expand Up @@ -59,6 +59,12 @@ install_packages previous_release_package_folder
# available for dump via clickhouse-local
configure

# local_blob_storage disk type does not exist in older versions
sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<type>local_blob_storage</type>|<type>local</type>|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml

start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
Expand All @@ -83,6 +89,11 @@ export USE_S3_STORAGE_FOR_MERGE_TREE=1
export ZOOKEEPER_FAULT_INJECTION=0
configure

sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<type>local_blob_storage</type>|<type>local</type>|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml

start

clickhouse-client --query="SELECT 'Server version: ', version()"
Expand Down
2 changes: 1 addition & 1 deletion src/Backups/BackupIO_S3.cpp
Expand Up @@ -197,7 +197,7 @@ void BackupWriterS3::copyFileNative(DiskPtr src_disk, const String & src_file_na
auto object_storage = src_disk->getObjectStorage();
std::string src_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / dest_file_name;
copyS3File(client, src_bucket, objects[0].absolute_path, src_offset, src_size, s3_uri.bucket, file_path, request_settings, {},
copyS3File(client, src_bucket, objects[0].remote_path, src_offset, src_size, s3_uri.bucket, file_path, request_settings, {},
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupWriterS3"));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Expand Up @@ -136,6 +136,7 @@ if (TARGET ch_contrib::hdfs)
endif()

add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
add_headers_and_sources(dbms Disks/ObjectStorages/Local)
add_headers_and_sources(dbms Disks/ObjectStorages/Web)

add_headers_and_sources(dbms Storages/Cache)
Expand Down
1 change: 0 additions & 1 deletion src/Common/ErrorCodes.cpp
Expand Up @@ -634,7 +634,6 @@
M(663, INCONSISTENT_METADATA_FOR_BACKUP) \
M(664, ACCESS_STORAGE_DOESNT_ALLOW_BACKUP) \
M(665, CANNOT_CONNECT_NATS) \
M(666, CANNOT_USE_CACHE) \
M(667, NOT_INITIALIZED) \
M(668, INVALID_STATE) \
M(669, NAMED_COLLECTION_DOESNT_EXIST) \
Expand Down
1 change: 0 additions & 1 deletion src/Core/Settings.h
Expand Up @@ -663,7 +663,6 @@ class IColumn;
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "Allow to use the filesystem cache in passive mode - benefit from the existing cache entries, but don't put more entries into the cache. If you set this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this will allows to avoid cache threshing by too heavy queries and to improve the overall system efficiency.", 0) \
M(Bool, enable_filesystem_cache_on_lower_level, true, "If read buffer supports caching inside threadpool, allow it to do it, otherwise cache outside ot threadpool. Do not use this setting, it is needed for testing", 0) \
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
Expand Down
78 changes: 1 addition & 77 deletions src/Disks/DiskLocal.cpp
Expand Up @@ -9,9 +9,7 @@
#include <Common/quoteString.h>
#include <Common/atomicRename.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
#include <Disks/loadLocalDiskConfig.h>
#include <Disks/TemporaryFileOnDisk.h>

#include <fstream>
Expand Down Expand Up @@ -39,7 +37,6 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED;
extern const int LOGICAL_ERROR;
extern const int CANNOT_TRUNCATE_FILE;
Expand All @@ -54,53 +51,6 @@ std::mutex DiskLocal::reservation_mutex;

using DiskLocalPtr = std::shared_ptr<DiskLocal>;

static void loadDiskLocalConfig(const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
String & path,
UInt64 & keep_free_space_bytes)
{
path = config.getString(config_prefix + ".path", "");
if (name == "default")
{
if (!path.empty())
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"\"default\" disk path should be provided in <path> not it <storage_configuration>");
path = context->getPath();
}
else
{
if (path.empty())
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Disk path can not be empty. Disk {}", name);
if (path.back() != '/')
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Disk path must end with /. Disk {}", name);
if (path == context->getPath())
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Disk path ('{}') cannot be equal to <path>. Use <default> disk instead.", path);
}

bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");

if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG,
"Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified");

keep_free_space_bytes = config.getUInt64(config_prefix + ".keep_free_space_bytes", 0);

if (has_space_ratio)
{
auto ratio = config.getDouble(config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "'keep_free_space_ratio' have to be between 0 and 1");
String tmp_path = path;
if (tmp_path.empty())
tmp_path = context->getPath();

// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
}

std::optional<size_t> fileSizeSafe(const fs::path & path)
{
std::error_code ec;
Expand Down Expand Up @@ -604,25 +554,6 @@ catch (...)
return false;
}

DiskObjectStoragePtr DiskLocal::createDiskObjectStorage()
{
auto object_storage = std::make_shared<LocalObjectStorage>();
auto metadata_storage = std::make_shared<FakeMetadataStorageFromDisk>(
/* metadata_storage */std::static_pointer_cast<DiskLocal>(shared_from_this()),
object_storage,
/* object_storage_root_path */getPath());

return std::make_shared<DiskObjectStorage>(
getName(),
disk_path,
"Local",
metadata_storage,
object_storage,
false,
/* threadpool_size */16
);
}

void DiskLocal::checkAccessImpl(const String & path)
{
try
Expand Down Expand Up @@ -750,13 +681,6 @@ void DiskLocal::chmod(const String & path, mode_t mode)
DB::throwFromErrnoWithPath("Cannot chmod file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
}

MetadataStoragePtr DiskLocal::getMetadataStorage()
{
auto object_storage = std::make_shared<LocalObjectStorage>();
return std::make_shared<FakeMetadataStorageFromDisk>(
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
}

void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [global_skip_access_check](
Expand Down
4 changes: 0 additions & 4 deletions src/Disks/DiskLocal.h
Expand Up @@ -121,16 +121,12 @@ class DiskLocal : public IDisk
bool canRead() const noexcept;
bool canWrite() noexcept;

DiskObjectStoragePtr createDiskObjectStorage() override;

bool supportsStat() const override { return true; }
struct stat stat(const String & path) const override;

bool supportsChmod() const override { return true; }
void chmod(const String & path, mode_t mode) override;

MetadataStoragePtr getMetadataStorage() override;

protected:
void checkAccessImpl(const String & path) override;

Expand Down
3 changes: 3 additions & 0 deletions src/Disks/DiskType.h
Expand Up @@ -15,6 +15,7 @@ enum class DataSourceType
HDFS,
WebServer,
AzureBlobStorage,
LocalBlobStorage,
};

inline String toString(DataSourceType data_source_type)
Expand All @@ -35,6 +36,8 @@ inline String toString(DataSourceType data_source_type)
return "web";
case DataSourceType::AzureBlobStorage:
return "azure_blob_storage";
case DataSourceType::LocalBlobStorage:
return "local_blob_storage";
}
UNREACHABLE();
}
Expand Down
3 changes: 0 additions & 3 deletions src/Disks/IDisk.cpp
Expand Up @@ -7,9 +7,6 @@
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Core/ServerUUID.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
#include <Disks/FakeDiskTransaction.h>

namespace DB
Expand Down
8 changes: 7 additions & 1 deletion src/Disks/IDisk.h
Expand Up @@ -367,7 +367,13 @@ class IDisk : public Space
/// Actually it's a part of IDiskRemote implementation but we have so
/// complex hierarchy of disks (with decorators), so we cannot even
/// dynamic_cast some pointer to IDisk to pointer to IDiskRemote.
virtual MetadataStoragePtr getMetadataStorage() = 0;
virtual MetadataStoragePtr getMetadataStorage()
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method getMetadataStorage() is not implemented for disk type: {}",
toString(getDataSourceDescription().type));
}

/// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata"
/// it will return mapping for each required path: path -> metadata as string.
Expand Down
Expand Up @@ -175,7 +175,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.path = object.getMappedPath(),
.path = object.local_path,
.offset = file_offset_of_buffer_end,
.size = size,
.prefetch_submit_time = last_prefetch_info.submit_time,
Expand Down
13 changes: 6 additions & 7 deletions src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp
Expand Up @@ -5,6 +5,7 @@
#include <IO/ReadBufferFromFile.h>
#include <base/scope_guard.h>
#include <Common/assert_cast.h>
#include <IO/BoundedReadBuffer.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#include <base/hex.h>
Expand Down Expand Up @@ -33,7 +34,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_USE_CACHE;
extern const int LOGICAL_ERROR;
extern const int ARGUMENT_OUT_OF_BOUND;
}
Expand Down Expand Up @@ -190,12 +190,11 @@ CachedOnDiskReadBufferFromFile::getRemoteReadBuffer(FileSegment & file_segment,

if (!remote_fs_segment_reader)
{
remote_fs_segment_reader = implementation_buffer_creator();

if (!remote_fs_segment_reader->supportsRightBoundedReads())
throw Exception(
ErrorCodes::CANNOT_USE_CACHE,
"Cache cannot be used with a ReadBuffer which does not support right bounded reads");
auto impl = implementation_buffer_creator();
if (impl->supportsRightBoundedReads())
remote_fs_segment_reader = std::move(impl);
else
remote_fs_segment_reader = std::make_unique<BoundedReadBuffer>(std::move(impl));

file_segment.setRemoteFileReader(remote_fs_segment_reader);
}
Expand Down
5 changes: 3 additions & 2 deletions src/Disks/IO/CachedOnDiskReadBufferFromFile.h
Expand Up @@ -20,8 +20,7 @@ namespace DB
class CachedOnDiskReadBufferFromFile : public ReadBufferFromFileBase
{
public:
using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFileBase>;
using ImplementationBufferCreator = std::function<ImplementationBufferPtr()>;
using ImplementationBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>()>;

CachedOnDiskReadBufferFromFile(
const String & source_file_path_,
Expand Down Expand Up @@ -61,6 +60,8 @@ class CachedOnDiskReadBufferFromFile : public ReadBufferFromFileBase
};

private:
using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFileBase>;

void initialize(size_t offset, size_t size);
void assertCorrectness() const;

Expand Down
17 changes: 8 additions & 9 deletions src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
Expand Up @@ -27,15 +27,11 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
, settings(settings_)
, current_object(!blobs_to_read_.empty() ? blobs_to_read_.front() : throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects"))
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
{
if (blobs_to_read.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects");

current_object = blobs_to_read.front();

with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
Expand All @@ -50,7 +46,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c

current_object = object;
total_bytes_read_from_current_file = 0;
const auto & object_path = object.absolute_path;
const auto & object_path = object.remote_path;

size_t current_read_until_position = read_until_position ? read_until_position : object.bytes_size;
auto current_read_buffer_creator = [=, this]() { return read_buffer_creator(object_path, current_read_until_position); };
Expand All @@ -76,12 +72,12 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c

void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
chassert(!current_object.absolute_path.empty());
chassert(!current_object.remote_path.empty());
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = current_object.absolute_path,
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file,
Expand Down Expand Up @@ -123,6 +119,8 @@ void ReadBufferFromRemoteFSGather::initialize()

if (object.bytes_size > current_buf_offset)
{
LOG_TEST(log, "Reading from file: {} ({})", object.remote_path, object.local_path);

/// Do not create a new buffer if we already have what we need.
if (!current_buf || current_buf_idx != i)
{
Expand Down Expand Up @@ -174,6 +172,7 @@ bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
++current_buf_idx;

const auto & object = blobs_to_read[current_buf_idx];
LOG_TEST(log, "Reading from next file: {} ({})", object.remote_path, object.local_path);
current_buf = createImplementationBuffer(object);

return true;
Expand Down Expand Up @@ -246,7 +245,7 @@ void ReadBufferFromRemoteFSGather::reset()

String ReadBufferFromRemoteFSGather::getFileName() const
{
return current_object.absolute_path;
return current_object.remote_path;
}

size_t ReadBufferFromRemoteFSGather::getFileSize() const
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromRemoteFSGather.h
Expand Up @@ -20,7 +20,7 @@ class ReadBufferFromRemoteFSGather final : public ReadBuffer
friend class ReadIndirectBufferFromRemoteFS;

public:
using ReadBufferCreator = std::function<std::shared_ptr<ReadBufferFromFileBase>(const std::string & path, size_t read_until_position)>;
using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(const std::string & path, size_t read_until_position)>;

ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
Expand Down

0 comments on commit 412b161

Please sign in to comment.