Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add http connection pool between replicas #3594

Merged
merged 10 commits into from Nov 21, 2018
2 changes: 1 addition & 1 deletion contrib/poco
Submodule poco updated from 566162 to 20c1d8
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Expand Up @@ -396,6 +396,7 @@ namespace ErrorCodes
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN = 419;
extern const int CANNOT_UPDATE_COLUMN = 420;
extern const int CANNOT_ADD_DIFFERENT_AGGREGATE_STATES = 421;
extern const int UNSUPPORTED_URI_SCHEME = 422;

extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/ProfileEvents.cpp
Expand Up @@ -170,6 +170,7 @@
M(OSWriteBytes, "Number of bytes written to disks or block devices. Doesn't include bytes that are in page cache dirty pages. May not include data that was written by OS asynchronously.") \
M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \
M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \
M(CreatedHTTPConnections, "Total amount of created HTTP connections (closed or opened).") \

namespace ProfileEvents
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Core/Defines.h
Expand Up @@ -62,6 +62,9 @@

#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
/// Maximum number of HTTP connections between two endpoints.
/// The value was chosen arbitrarily; there is no particular motivation behind it.
#define DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT 15
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's Ok to make this macro more local.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the number is unmotivated, you can clearly state that fact in a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This macro is used in MergeTreeSettings and HTTPCommon. I don't want these two headers to include each other.

A comment about the motivation is a good idea.


// more aliases: https://mailman.videolan.org/pipermail/x264-devel/2014-May/010660.html

Expand Down
160 changes: 138 additions & 22 deletions dbms/src/IO/HTTPCommon.cpp
@@ -1,9 +1,10 @@
#include <IO/HTTPCommon.h>

#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/config.h>
#include <Poco/Version.h>

#if USE_POCO_NETSSL
#include <Poco/Net/AcceptCertificateHandler.h>
#include <Poco/Net/Context.h>
Expand All @@ -13,22 +14,146 @@
#include <Poco/Net/RejectCertificateHandler.h>
#include <Poco/Net/SSLManager.h>
#endif

#include <tuple>
#include <unordered_map>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/Application.h>
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>

#include <sstream>

namespace DB

namespace ProfileEvents
{
extern const Event CreatedHTTPConnections;
}

namespace DB
{
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
extern const int UNSUPPORTED_URI_SCHEME;
}


namespace
{
/// Applies the connection, send and receive timeouts from `timeouts` to `session`.
/// When the three-argument setTimeout() is unavailable (neither the ClickHouse Poco patch
/// nor Poco >= 2.0), a single timeout equal to the largest of the three is set instead.
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
    const auto & connect_timeout = timeouts.connection_timeout;
    const auto & send_timeout = timeouts.send_timeout;
    const auto & receive_timeout = timeouts.receive_timeout;

#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
    session.setTimeout(connect_timeout, send_timeout, receive_timeout);
#else
    session.setTimeout(std::max({connect_timeout, send_timeout, receive_timeout}));
#endif
}

/// Returns true for an "https" URI and false for an "http" one.
/// Any other scheme is rejected with an UNSUPPORTED_URI_SCHEME exception.
bool isHTTPS(const Poco::URI & uri)
{
    const auto & scheme = uri.getScheme();

    if (scheme == "https")
        return true;

    if (scheme == "http")
        return false;

    throw Exception("Unsupported scheme in URI '" + uri.toString() + "'", ErrorCodes::UNSUPPORTED_URI_SCHEME);
}

/// Creates a new, not yet pooled, client session for the given endpoint.
///
/// `https` selects an HTTPS session; in builds without USE_POCO_NETSSL that request throws
/// FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME. The host name is resolved through DNSResolver and
/// the session is given the resolved address in textual form. Timeouts are not set here.
/// Every successfully constructed session object bumps the CreatedHTTPConnections counter.
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive)
{
    HTTPSessionPtr session;

    if (!https)
    {
        session = std::make_shared<Poco::Net::HTTPClientSession>();
    }
    else
    {
#if USE_POCO_NETSSL
        session = std::make_shared<Poco::Net::HTTPSClientSession>();
#else
        throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
    }

    /// Counted before the host is resolved, so a failed resolution is still counted.
    ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);

    const auto resolved_host = DNSResolver::instance().resolveHost(host).toString();
    session->setHost(resolved_host);
    session->setPort(port);
    session->setKeepAlive(keep_alive);

    return session;
}

/// Pool of keep-alive HTTP(S) client sessions that all target a single endpoint,
/// identified by (host, port, https).
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
{
private:
    using Base = PoolBase<Poco::Net::HTTPClientSession>;

    /// The endpoint is fixed for the lifetime of the pool, so all three parts are const.
    /// `https` used to be non-const here, unlike `host`/`port` and unlike the declaration in HTTPCommon.h.
    const std::string host;
    const UInt16 port;
    const bool https;

    /// Called by the base pool when it needs another object: creates a session with keep-alive enabled.
    ObjectPtr allocObject() override
    {
        return makeHTTPSessionImpl(host, port, https, true);
    }

public:
    /// host_ / port_ / https_ describe the endpoint; max_pool_size_ is forwarded to the base pool.
    SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_)
        : Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool")), host(host_), port(port_), https(https_)
    {
    }
};

/// Process-wide registry of per-endpoint session pools, keyed by (host, port, https).
class HTTPSessionPool : public ext::singleton<HTTPSessionPool>
{
private:
    using Key = std::tuple<std::string, UInt16, bool>;
    using PoolPtr = std::shared_ptr<SingleEndpointHTTPSessionPool>;
    using Entry = SingleEndpointHTTPSessionPool::Entry;

    friend class ext::singleton<HTTPSessionPool>;

    /// Hashes all three components of the endpoint key with SipHash.
    struct Hasher
    {
        size_t operator()(const Key & k) const
        {
            SipHash s;
            s.update(std::get<0>(k));
            s.update(std::get<1>(k));
            s.update(std::get<2>(k));
            return s.get64();
        }
    };

    /// Guards `endpoints_pool` only; it is never held while a session is being acquired.
    std::mutex mutex;
    std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;

    /// Returns the pool for the endpoint of `uri`, creating it on first use.
    /// `max_connections_per_endpoint` only takes effect when the pool is created;
    /// later calls for the same endpoint reuse the existing pool with its original size.
    /// Throws UNSUPPORTED_URI_SCHEME (via isHTTPS) for schemes other than http/https.
    PoolPtr getOrCreatePool(const Poco::URI & uri, size_t max_connections_per_endpoint)
    {
        const std::string & host = uri.getHost();
        const UInt16 port = uri.getPort();
        const bool https = isHTTPS(uri);
        const Key key(host, port, https);

        std::lock_guard<std::mutex> lock(mutex);

        auto it = endpoints_pool.find(key);
        if (it == endpoints_pool.end())
            it = endpoints_pool.emplace(
                key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, max_connections_per_endpoint)).first;

        /// Copy the shared_ptr so the pool can be used after the lock is released.
        return it->second;
    }

protected:
    HTTPSessionPool() = default;

public:
    /// Takes a session for the endpoint of `uri` from its pool and applies `timeouts` to it.
    ///
    /// The registry mutex is released before asking the endpoint pool for a session.
    /// That call may wait for a free session and may create a new one (which resolves DNS),
    /// so holding the registry lock across it would stall requests to unrelated endpoints.
    /// NOTE(review): this relies on PoolBase::get() doing its own synchronization — confirm against PoolBase.h.
    Entry getSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t max_connections_per_endpoint)
    {
        const PoolPtr pool = getOrCreatePool(uri, max_connections_per_endpoint);

        /// The connection timeout (in microseconds) doubles as the pool's wait/retry interval.
        auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
        auto session = pool->get(retry_timeout);

        setTimeouts(*session, timeouts);
        return session;
    }
};
}

void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout)
{
if (!response.getKeepAlive())
Expand All @@ -39,30 +164,21 @@ void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigne
response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
}

std::unique_ptr<Poco::Net::HTTPClientSession> makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
{
bool is_ssl = static_cast<bool>(uri.getScheme() == "https");
std::unique_ptr<Poco::Net::HTTPClientSession> session;
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);

if (is_ssl)
#if USE_POCO_NETSSL
session = std::make_unique<Poco::Net::HTTPSClientSession>();
#else
throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
else
session = std::make_unique<Poco::Net::HTTPClientSession>();
auto session = makeHTTPSessionImpl(host, port, https, false);
setTimeouts(*session, timeouts);
return session;
}

session->setHost(DNSResolver::instance().resolveHost(uri.getHost()).toString());
session->setPort(uri.getPort());

#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
session->setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
session->setTimeout(std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout}));
#endif

return session;
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size)
{
return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: could this lead to an artificial replication delay when the pool happens to be full (due to the background pool task delay after an exception)? It might cause 10-second delays when we suddenly want to download many small parts.

}


Expand Down
39 changes: 28 additions & 11 deletions dbms/src/IO/HTTPCommon.h
@@ -1,43 +1,60 @@
#pragma once

#include <mutex>
#include <memory>
#include <iostream>
#include <memory>
#include <mutex>

#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
#include <Common/PoolBase.h>


#include <IO/ConnectionTimeouts.h>

namespace Poco
{
namespace Net
{
class HTTPServerResponse;
}
namespace Net
{
class HTTPServerResponse;
}
}


namespace DB
{
constexpr int HTTP_TOO_MANY_REQUESTS = 429;

const int HTTP_TOO_MANY_REQUESTS = 429;
/// Pool of Poco HTTP client sessions (built on PoolBase) bound to one endpoint,
/// described by the `host`, `port` and `https` members.
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
{
private:
/// Endpoint description; fixed at construction.
const std::string host;
const UInt16 port;
const bool https;
using Base = PoolBase<Poco::Net::HTTPClientSession>;

void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout);
/// Override of the PoolBase allocation hook.
/// NOTE(review): presumably creates a new session for this endpoint — confirm against the definition in HTTPCommon.cpp.
ObjectPtr allocObject() override;

public:
/// host_ / port_ / https_ identify the endpoint; max_pool_size_ is the pool capacity.
SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_);
};
using PooledHTTPSessionPtr = SingleEndpointHTTPSessionPool::Entry;
using HTTPSessionPtr = std::shared_ptr<Poco::Net::HTTPClientSession>;

void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout);

/// Create session object to perform requests and set required parameters.
std::unique_ptr<Poco::Net::HTTPClientSession> makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);

/// Same as the previous function, but takes the session from a per-endpoint pool.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size);

/** Used to receive response (response headers and possibly body)
* after sending data (request headers and possibly body).
* Throws exception in case of non HTTP_OK (200) response code.
* Returned istream lives in 'session' object.
*/
std::istream * receiveResponse(Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response);

std::istream * receiveResponse(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response);
}
64 changes: 0 additions & 64 deletions dbms/src/IO/ReadWriteBufferFromHTTP.cpp
@@ -1,65 +1 @@
#include <IO/ReadWriteBufferFromHTTP.h>

#include <Common/config.h>
#include <Core/Types.h>
#include <IO/ReadBufferFromIStream.h>
#include <Common/DNSResolver.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Version.h>
#include <common/logger_useful.h>
#include <IO/HTTPCommon.h>

namespace DB
{


/// Performs the whole HTTP exchange up to the response headers during construction.
/// It opens a session for `uri`, sends the request (streaming the body through
/// `out_stream_callback` if one is given), receives the response and wraps the
/// response stream in a ReadBufferFromIStream of `buffer_size_` bytes.
///
/// An empty `method_` means "choose automatically": POST when a body callback is supplied, GET otherwise.
/// Credentials are applied only when the user name is non-empty.
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(Poco::URI uri,
    const std::string & method_,
    OutStreamCallback out_stream_callback,
    const ConnectionTimeouts & timeouts,
    const Poco::Net::HTTPBasicCredentials & credentials,
    size_t buffer_size_)
    : ReadBuffer(nullptr, 0),
    uri{uri},
    method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET},
    session{makeHTTPSession(uri, timeouts)}
{
    /// From here on `uri` is the constructor parameter, which shadows the member of the same name.
    /// With an empty path Poco would send a request line without a path ("POST HTTP/1.1"),
    /// which is a Poco bug, so fall back to the root path.
    if (uri.getPath().empty())
    {
        uri.setPath("/");
    }

    Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);

    /// The Host header carries the original host name, not the resolved address.
    request.setHost(uri.getHost());

    const bool has_request_body = static_cast<bool>(out_stream_callback);
    if (has_request_body)
    {
        request.setChunkedTransferEncoding(true);
    }

    if (!credentials.getUsername().empty())
    {
        credentials.authenticate(request);
    }

    LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());

    std::ostream & request_body = session->sendRequest(request);
    if (has_request_body)
    {
        out_stream_callback(request_body);
    }

    Poco::Net::HTTPResponse response;
    istr = receiveResponse(*session, request, response);

    impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}


/// Advances the wrapped response buffer. On success, exposes its current buffer as both
/// the internal and the working buffer of this object. Returns false once the wrapped
/// buffer has no more data, leaving this object's buffers unchanged.
bool ReadWriteBufferFromHTTP::nextImpl()
{
    const bool has_more_data = impl->next();

    if (has_more_data)
    {
        internal_buffer = impl->buffer();
        working_buffer = internal_buffer;
    }

    return has_more_data;
}

}