Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ class IColumn;
M(Bool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.", 0) \
M(Bool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.", 0) \
M(Bool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_read_in_window_order, true, "Enable ORDER BY optimization in window clause for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_aggregation_in_order, false, "Enable GROUP BY optimization for aggregating data in corresponding order in MergeTree tables.", 0) \
M(UInt64, aggregation_in_order_max_block_bytes, 50000000, "Maximal size of block in bytes accumulated during aggregation in order of primary key. Lower block size allows to parallelize more final merge stage of aggregation.", 0) \
M(UInt64, read_in_order_two_level_merge_threshold, 100, "Minimal number of parts to read to run preliminary merge step during multithread reading in order of primary key.", 0) \
Expand Down
3 changes: 2 additions & 1 deletion src/Disks/IDiskRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ friend class DiskRemoteReservation;
DiskPtr metadata_disk;
FileCachePtr cache;

private:
public:
void removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);

private:
void removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);

bool tryReserve(UInt64 bytes);
Expand Down
84 changes: 28 additions & 56 deletions src/Disks/S3/DiskS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include <base/scope_guard_safe.h>
#include <base/unit.h>
#include <base/FnTraits.h>

#include <Common/checkStackSize.h>
#include <Common/createHardLink.h>
Expand All @@ -35,6 +34,7 @@
#include <Disks/IO/ThreadPoolRemoteFSReader.h>

#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
Expand All @@ -57,52 +57,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
class S3PathKeeper : public RemoteFSPathKeeper
{
public:
using Chunk = Aws::Vector<Aws::S3::Model::ObjectIdentifier>;
using Chunks = std::list<Chunk>;

explicit S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}

void addPath(const String & path) override
{
if (chunks.empty() || chunks.back().size() >= chunk_limit)
{
/// add one more chunk
chunks.push_back(Chunks::value_type());
chunks.back().reserve(chunk_limit);
}
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(path);
chunks.back().push_back(obj);
}

void removePaths(Fn<void(Chunk &&)> auto && remove_chunk_func)
{
for (auto & chunk : chunks)
remove_chunk_func(std::move(chunk));
}

static String getChunkKeys(const Chunk & chunk)
{
String res;
for (const auto & obj : chunk)
{
const auto & key = obj.GetKey();
if (!res.empty())
res.append(", ");
res.append(key.c_str(), key.size());
}
return res;
}

private:
Chunks chunks;
};

template <typename Result, typename Error>
void throwIfError(Aws::Utils::Outcome<Result, Error> & response)
{
Expand Down Expand Up @@ -155,12 +109,14 @@ DiskS3::DiskS3(
DiskPtr metadata_disk_,
FileCachePtr cache_,
ContextPtr context_,
const S3Capabilities & s3_capabilities_,
SettingsPtr settings_,
GetDiskSettings settings_getter_)
: IDiskRemote(name_, s3_root_path_, metadata_disk_, std::move(cache_), "DiskS3", settings_->thread_pool_size)
, bucket(std::move(bucket_))
, current_settings(std::move(settings_))
, settings_getter(settings_getter_)
, s3_capabilities(s3_capabilities_)
, context(context_)
{
}
Expand All @@ -180,15 +136,31 @@ void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
/// Delete every collected chunk of keys from the bucket.
/// Depending on `s3_capabilities.support_batch_delete`, a chunk is removed either with one
/// DeleteObjects request or with one DeleteObject request per key.
/// Request outcomes are passed to `logIfError` instead of being thrown, so the remaining keys/chunks are still processed.
s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
{
/// Comma-separated list of the keys in this chunk, used in the log messages below.
String keys = S3PathKeeper::getChunkKeys(chunk);
if (!s3_capabilities.support_batch_delete)
{
/// Batch delete is marked as unsupported: issue a separate request for each key of the chunk.
LOG_TRACE(log, "Remove AWS keys {} one by one", keys);
for (const auto & obj : chunk)
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(obj.GetKey());
auto outcome = settings->client->DeleteObject(request);
// Do not throw here, continue deleting other keys and chunks
logIfError(outcome, [&](){return "Can't remove AWS key: " + obj.GetKey();});
}
}
else
{
/// Batch delete is available: remove the whole chunk with a single DeleteObjects request.
LOG_TRACE(log, "Remove AWS keys {}", keys);
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(chunk);
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = settings->client->DeleteObjects(request);
// Do not throw here, continue deleting other chunks
logIfError(outcome, [&](){return "Can't remove AWS keys: " + keys;});
}
});
}

Expand Down
58 changes: 58 additions & 0 deletions src/Disks/S3/DiskS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
#include <atomic>
#include <optional>
#include <base/logger_useful.h>
#include <base/FnTraits.h>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
#include <Disks/S3/S3Capabilities.h>

#include <aws/s3/S3Client.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsV2Result.h>
#include <aws/s3/model/ObjectIdentifier.h>

#include <Poco/DirectoryIterator.h>
#include <re2/re2.h>
Expand Down Expand Up @@ -76,6 +79,7 @@ class DiskS3 final : public IDiskRemote
DiskPtr metadata_disk_,
FileCachePtr cache_,
ContextPtr context_,
const S3Capabilities & s3_capabilities_,
SettingsPtr settings_,
GetDiskSettings settings_getter_);

Expand Down Expand Up @@ -119,6 +123,8 @@ class DiskS3 final : public IDiskRemote

void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &) override;

void setCapabilitiesSupportBatchDelete(bool value) { s3_capabilities.support_batch_delete = value; }

private:
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
/// Converts revision to binary string with leading zeroes (64 bit).
Expand Down Expand Up @@ -166,6 +172,7 @@ class DiskS3 final : public IDiskRemote
MultiVersion<DiskS3Settings> current_settings;
/// Gets disk settings from context.
GetDiskSettings settings_getter;
S3Capabilities s3_capabilities;

std::atomic<UInt64> revision_counter = 0;
static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
Expand All @@ -187,6 +194,57 @@ class DiskS3 final : public IDiskRemote
ContextPtr context;
};

/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
class S3PathKeeper : public RemoteFSPathKeeper
{
public:
using Chunk = Aws::Vector<Aws::S3::Model::ObjectIdentifier>;
using Chunks = std::list<Chunk>;

explicit S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}

void addPath(const String & path) override
{
if (chunks.empty() || chunks.back().size() >= chunk_limit)
{
/// add one more chunk
chunks.push_back(Chunks::value_type());
chunks.back().reserve(chunk_limit);
}
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(path);
chunks.back().push_back(obj);
}

void removePaths(Fn<void(Chunk &&)> auto && remove_chunk_func)
{
for (auto & chunk : chunks)
remove_chunk_func(std::move(chunk));
}

Chunks getChunks() const
{
return chunks;
}

static String getChunkKeys(const Chunk & chunk)
{
String res;
for (const auto & obj : chunk)
{
const auto & key = obj.GetKey();
if (!res.empty())
res.append(", ");
res.append(key.c_str(), key.size());
}
return res;
}

private:
Chunks chunks;
};

}

#endif
15 changes: 15 additions & 0 deletions src/Disks/S3/S3Capabilities.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include <Disks/S3/S3Capabilities.h>

namespace DB
{

/// Reads the S3 capability flags stored under `config_prefix` in `config`.
/// `support_batch_delete` falls back to true when the key is absent;
/// `support_proxy` falls back to whether a `<config_prefix>.proxy` key is present.
S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
    S3Capabilities capabilities;

    capabilities.support_batch_delete = config.getBool(config_prefix + ".support_batch_delete", true);

    const bool proxy_section_present = config.has(config_prefix + ".proxy");
    capabilities.support_proxy = config.getBool(config_prefix + ".support_proxy", proxy_section_present);

    return capabilities;
}

}
27 changes: 27 additions & 0 deletions src/Disks/S3/S3Capabilities.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <string>
#include <Poco/Util/AbstractConfiguration.h>

namespace DB
{

/// Features that are supported or unsupported by different S3 implementations.
/// Useful only for implementations that are almost, but not fully, compatible with AWS S3.
struct S3Capabilities
{
/// Whether batch delete can be used (enabled by default).
/// The Google S3 implementation doesn't support batch delete.
/// TODO: possibly we have to use Google SDK https://github.com/googleapis/google-cloud-cpp/tree/main/google/cloud/storage
/// because it looks like it misses a lot of features, such as:
/// 1) batch delete
/// 2) list_v2
/// 3) multipart upload works differently
bool support_batch_delete{true};

/// Whether a proxy can be used for the connection (disabled by default).
/// The Y.Cloud S3 implementation supports it.
bool support_proxy{false};
};

S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);

}
Loading