Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 with dynamic proxy configuration #10576

Merged
merged 8 commits on May 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/aws
Submodule aws updated from 45dd85 to 04d54d
11 changes: 11 additions & 0 deletions docker/test/integration/compose/docker_compose_minio.yml
Expand Up @@ -27,5 +27,16 @@ services:
- SERVER_REDIRECT_CODE=307
- SERVER_ACCESS_LOG=/nginx/access.log

  # HTTP proxies for Minio.
  # Two instances so tests can exercise round-robin proxy selection.
  proxy1:
    image: vimagick/tinyproxy
    ports:
      - "4081:8888"  # host:container — tinyproxy listens on 8888 inside the container

  proxy2:
    image: vimagick/tinyproxy
    ports:
      - "4082:8888"

volumes:
data1-1:
3 changes: 3 additions & 0 deletions src/CMakeLists.txt
Expand Up @@ -234,6 +234,9 @@ if(USE_RDKAFKA)
add_headers_and_sources(dbms Storages/Kafka)
endif()

# The S3-backed disk implementation lives in Disks/S3 and is compiled only
# when the AWS SDK is available.
if (USE_AWS_S3)
    add_headers_and_sources(dbms Disks/S3)
endif()

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
Expand Down
105 changes: 22 additions & 83 deletions src/Disks/DiskS3.cpp → src/Disks/S3/DiskS3.cpp
@@ -1,34 +1,31 @@
#include "DiskS3.h"

#if USE_AWS_S3
# include "DiskFactory.h"

# include <random>
# include <utility>
# include <IO/ReadBufferFromFile.h>
# include <IO/ReadBufferFromS3.h>
# include <IO/ReadHelpers.h>
# include <IO/S3Common.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h>
# include <Poco/File.h>
# include <Common/checkStackSize.h>
# include <Common/createHardLink.h>
# include <Common/quoteString.h>
# include <Common/thread_local_rng.h>

# include <aws/s3/model/CopyObjectRequest.h>
# include <aws/s3/model/DeleteObjectRequest.h>
# include <aws/s3/model/GetObjectRequest.h>
#include "Disks/DiskFactory.h"

#include <random>
#include <utility>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Common/checkStackSize.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>

#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int FILE_ALREADY_EXISTS;
extern const int PATH_ACCESS_DENIED;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
}
Expand Down Expand Up @@ -394,12 +391,14 @@ class DiskS3Reservation final : public IReservation
DiskS3::DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,
size_t min_upload_part_size_)
: name(std::move(name_))
, client(std::move(client_))
, proxy_configuration(std::move(proxy_configuration_))
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
Expand Down Expand Up @@ -686,64 +685,4 @@ DiskS3Reservation::~DiskS3Reservation()
}
}

namespace
{

void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}

void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}

void checkRemoveAccess(IDisk & disk)
{
disk.remove("test_acl");
}

}

void registerDiskS3(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr {
Poco::File disk{context.getPath() + "disks/" + name};
disk.createDirectories();

S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
auto client = S3::ClientFactory::instance().create(
uri.endpoint,
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""));

if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::LOGICAL_ERROR);

String metadata_path = context.getPath() + "disks/" + name + "/";

auto s3disk
= std::make_shared<DiskS3>(name, client, uri.bucket, uri.key, metadata_path, context.getSettingsRef().s3_min_upload_part_size);

/// This code is used only to check access to the corresponding disk.
checkWriteAccess(*s3disk);
checkReadAccess(name, *s3disk);
checkRemoveAccess(*s3disk);

return s3disk;
};
factory.registerDiskType("s3", creator);
}

}

#endif
16 changes: 6 additions & 10 deletions src/Disks/DiskS3.h → src/Disks/S3/DiskS3.h
@@ -1,14 +1,10 @@
#pragma once

#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#include "Disks/DiskFactory.h"
#include "DynamicProxyConfiguration.h"

#if USE_AWS_S3
# include "DiskFactory.h"

# include <aws/s3/S3Client.h>
# include <Poco/DirectoryIterator.h>
#include <aws/s3/S3Client.h>
#include <Poco/DirectoryIterator.h>


namespace DB
Expand All @@ -26,6 +22,7 @@ class DiskS3 : public IDisk
DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,
Expand Down Expand Up @@ -105,6 +102,7 @@ class DiskS3 : public IDisk
private:
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration;
const String bucket;
const String s3_root_path;
const String metadata_path;
Expand All @@ -116,5 +114,3 @@ class DiskS3 : public IDisk
};

}

#endif
27 changes: 27 additions & 0 deletions src/Disks/S3/DynamicProxyConfiguration.cpp
@@ -0,0 +1,27 @@
#include "DynamicProxyConfiguration.h"

#include <utility>
#include <common/logger_useful.h>

namespace DB::S3
{
/// `_proxies` must be non-empty: getConfiguration() indexes into it unconditionally.
DynamicProxyConfiguration::DynamicProxyConfiguration(std::vector<Poco::URI> _proxies) : proxies(std::move(_proxies)), access_counter(0)
{
}


/// Called by the AWS SDK before each HTTP request; returns the proxy
/// (host + port) the request should go through, rotating round-robin.
Aws::Client::ClientConfigurationPerRequest DynamicProxyConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
{
    /// Avoid atomic increment if number of proxies is 1.
    /// The counter only needs to be unique-ish for round-robin distribution,
    /// it orders nothing else — relaxed memory ordering is sufficient.
    size_t index = proxies.size() > 1 ? access_counter.fetch_add(1, std::memory_order_relaxed) % proxies.size() : 0;

    Aws::Client::ClientConfigurationPerRequest cfg;
    cfg.proxyHost = proxies[index].getHost();
    cfg.proxyPort = proxies[index].getPort();

    LOG_DEBUG(&Logger::get("AWSClient"), "Use proxy: " << proxies[index].toString());

    return cfg;
}

}
24 changes: 24 additions & 0 deletions src/Disks/S3/DynamicProxyConfiguration.h
@@ -0,0 +1,24 @@
#pragma once

#include <utility>
#include <Core/Types.h>
#include <aws/core/client/ClientConfiguration.h>
#include <Poco/URI.h>

namespace DB::S3
{
/// Supplies a per-request proxy to the AWS SDK: each call to getConfiguration()
/// picks the next proxy from a fixed list in round-robin order.
/// Thread-safe: the only mutable state is an atomic counter.
class DynamicProxyConfiguration
{
public:
    explicit DynamicProxyConfiguration(std::vector<Poco::URI> _proxies);
    /// Returns proxy configuration on each HTTP request.
    Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request);

private:
    /// List of configured proxies.
    const std::vector<Poco::URI> proxies;
    /// Access counter to get proxy using round-robin strategy.
    std::atomic<size_t> access_counter;
};

}
119 changes: 119 additions & 0 deletions src/Disks/S3/registerDiskS3.cpp
@@ -0,0 +1,119 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include "DiskS3.h"
#include "Disks/DiskFactory.h"
#include "DynamicProxyConfiguration.h"

namespace DB
{
/// Error codes used by this translation unit; defined centrally in ErrorCodes.cpp.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int PATH_ACCESS_DENIED;
}

namespace
{
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}

void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}

void checkRemoveAccess(IDisk & disk) { disk.remove("test_acl"); }

std::shared_ptr<S3::DynamicProxyConfiguration> getProxyConfiguration(const Poco::Util::AbstractConfiguration * config)
{
if (config->has("proxy"))
{
std::vector<String> keys;
config->keys("proxy", keys);

std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(config->getString("proxy." + key));

if (proxy_uri.getScheme() != "http")
throw Exception("Only HTTP scheme is allowed in proxy configuration at the moment, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
if (proxy_uri.getHost().empty())
throw Exception("Empty host in proxy configuration, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);

proxies.push_back(proxy_uri);

LOG_DEBUG(&Logger::get("DiskS3"), "Configured proxy: " << proxy_uri.toString());
}

if (!proxies.empty())
return std::make_shared<S3::DynamicProxyConfiguration>(proxies);
}
return nullptr;
}

}


/// Registers the "s3" disk type: data is stored in an S3 bucket while local
/// metadata files are kept under <path>/disks/<name>/.
void registerDiskS3(DiskFactory & factory)
{
    auto creator = [](const String & name,
                      const Poco::Util::AbstractConfiguration & config,
                      const String & config_prefix,
                      const Context & context) -> DiskPtr {
        /// NOTE(review): Poco's createView() hands ownership to the caller and this
        /// pointer is never released — presumably tolerable for one-time disk
        /// registration, but verify against Poco's AbstractConfiguration docs.
        const auto * disk_config = config.createView(config_prefix);

        /// Ensure the local metadata directory exists before anything else.
        Poco::File disk{context.getPath() + "disks/" + name};
        disk.createDirectories();

        Aws::Client::ClientConfiguration cfg;

        S3::URI uri(Poco::URI(disk_config->getString("endpoint")));
        /// Guard against an empty key too: uri.key.back() on an empty string is UB.
        /// A trailing '/' is required so object keys can be formed by concatenation.
        if (uri.key.empty() || uri.key.back() != '/')
            throw Exception("S3 path must end with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);

        cfg.endpointOverride = uri.endpoint;

        /// Optional <proxy> section: route every S3 request through one of the
        /// configured HTTP proxies, selected round-robin per request.
        auto proxy_config = getProxyConfiguration(disk_config);
        if (proxy_config)
            cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };

        auto client = S3::ClientFactory::instance().create(
            cfg,
            disk_config->getString("access_key_id", ""),
            disk_config->getString("secret_access_key", ""));

        String metadata_path = context.getPath() + "disks/" + name + "/";

        auto s3disk = std::make_shared<DiskS3>(
            name,
            client,
            std::move(proxy_config),
            uri.bucket,
            uri.key,
            metadata_path,
            context.getSettingsRef().s3_min_upload_part_size);

        /// This code is used only to check access to the corresponding disk.
        checkWriteAccess(*s3disk);
        checkReadAccess(name, *s3disk);
        checkRemoveAccess(*s3disk);

        return s3disk;
    };
    factory.registerDiskType("s3", creator);
}

}
13 changes: 13 additions & 0 deletions src/Disks/S3/ya.make
@@ -0,0 +1,13 @@
# Arcadia build description for the S3 disk implementation.
LIBRARY()

PEERDIR(
    clickhouse/src/Common
)

SRCS(
    DiskS3.cpp
    DynamicProxyConfiguration.cpp
    registerDiskS3.cpp
)

END()