Skip to content

Commit

Permalink
Merge pull request #62443 from ClickHouse/backport/24.3/62249
Browse files Browse the repository at this point in the history
Backport #62249 to 24.3: several fixes for client's keep alive connections
  • Loading branch information
robot-ch-test-poll3 committed Apr 9, 2024
2 parents 693c59f + a8a4f78 commit d928cbb
Show file tree
Hide file tree
Showing 20 changed files with 709 additions and 172 deletions.
32 changes: 30 additions & 2 deletions base/poco/Net/include/Poco/Net/HTTPClientSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,19 @@ namespace Net
Poco::Timespan getKeepAliveTimeout() const;
/// Returns the connection timeout for HTTP connections.

void setKeepAliveMaxRequests(int max_requests);

int getKeepAliveMaxRequests() const;

int getKeepAliveRequest() const;

bool isKeepAliveExpired(double reliability = 1.0) const;
/// Returns if the connection is expired with some margin as fraction of timeout as reliability

double getKeepAliveReliability() const;
/// Returns the current fraction of keep alive timeout when connection is considered safe to use
/// It helps to avoid situation when a client uses nearly expired connection and receives NoMessageException

virtual std::ostream & sendRequest(HTTPRequest & request);
/// Sends the header for the given HTTP request to
/// the server.
Expand Down Expand Up @@ -345,6 +358,8 @@ namespace Net

void assign(HTTPClientSession & session);

void setKeepAliveRequest(int request);

HTTPSessionFactory _proxySessionFactory;
/// Factory to create HTTPClientSession to proxy.
private:
Expand All @@ -353,6 +368,8 @@ namespace Net
Poco::UInt16 _port;
ProxyConfig _proxyConfig;
Poco::Timespan _keepAliveTimeout;
int _keepAliveCurrentRequest = 0;
int _keepAliveMaxRequests = 1000;
Poco::Timestamp _lastRequest;
bool _reconnect;
bool _mustReconnect;
Expand All @@ -361,6 +378,7 @@ namespace Net
Poco::SharedPtr<std::ostream> _pRequestStream;
Poco::SharedPtr<std::istream> _pResponseStream;

static const double _defaultKeepAliveReliabilityLevel;
static ProxyConfig _globalProxyConfig;

HTTPClientSession(const HTTPClientSession &);
Expand Down Expand Up @@ -450,9 +468,19 @@ namespace Net
return _lastRequest;
}

inline void HTTPClientSession::setLastRequest(Poco::Timestamp time)
inline double HTTPClientSession::getKeepAliveReliability() const
{
return _defaultKeepAliveReliabilityLevel;
}

inline int HTTPClientSession::getKeepAliveMaxRequests() const
{
return _keepAliveMaxRequests;
}

inline int HTTPClientSession::getKeepAliveRequest() const
{
_lastRequest = time;
return _keepAliveCurrentRequest;
}

}
Expand Down
4 changes: 4 additions & 0 deletions base/poco/Net/include/Poco/Net/HTTPMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ namespace Net
/// The value is set to "Keep-Alive" if keepAlive is
/// true, or to "Close" otherwise.

void setKeepAliveTimeout(int timeout, int max_requests);
int getKeepAliveTimeout() const;
int getKeepAliveMaxRequests() const;

bool getKeepAlive() const;
/// Returns true if
/// * the message has a Connection header field and its value is "Keep-Alive"
Expand Down
2 changes: 1 addition & 1 deletion base/poco/Net/include/Poco/Net/HTTPServerParams.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace Net
/// - timeout: 60 seconds
/// - keepAlive: true
/// - maxKeepAliveRequests: 0
/// - keepAliveTimeout: 10 seconds
/// - keepAliveTimeout: 15 seconds

void setServerName(const std::string & serverName);
/// Sets the name and port (name:port) that the server uses to identify itself.
Expand Down
2 changes: 2 additions & 0 deletions base/poco/Net/include/Poco/Net/HTTPServerSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ namespace Net
SocketAddress serverAddress();
/// Returns the server's address.

void setKeepAliveTimeout(Poco::Timespan keepAliveTimeout);

private:
bool _firstRequest;
Poco::Timespan _keepAliveTimeout;
Expand Down
84 changes: 72 additions & 12 deletions base/poco/Net/src/HTTPClientSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Net {


HTTPClientSession::ProxyConfig HTTPClientSession::_globalProxyConfig;
const double HTTPClientSession::_defaultKeepAliveReliabilityLevel = 0.9;


HTTPClientSession::HTTPClientSession():
Expand Down Expand Up @@ -220,7 +221,41 @@ void HTTPClientSession::setGlobalProxyConfig(const ProxyConfig& config)

void HTTPClientSession::setKeepAliveTimeout(const Poco::Timespan& timeout)
{
_keepAliveTimeout = timeout;
if (connected())
{
throw Poco::IllegalStateException("cannot change keep alive timeout on initiated connection, "
"That value is managed privately after connection is established.");
}
_keepAliveTimeout = timeout;
}


void HTTPClientSession::setKeepAliveMaxRequests(int max_requests)
{
if (connected())
{
throw Poco::IllegalStateException("cannot change keep alive max requests on initiated connection, "
"That value is managed privately after connection is established.");
}
_keepAliveMaxRequests = max_requests;
}


void HTTPClientSession::setKeepAliveRequest(int request)
{
_keepAliveCurrentRequest = request;
}



void HTTPClientSession::setLastRequest(Poco::Timestamp time)
{
if (connected())
{
throw Poco::IllegalStateException("cannot change last request on initiated connection, "
"That value is managed privately after connection is established.");
}
_lastRequest = time;
}


Expand All @@ -231,6 +266,8 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
clearException();
_responseReceived = false;

_keepAliveCurrentRequest += 1;

bool keepAlive = getKeepAlive();
if (((connected() && !keepAlive) || mustReconnect()) && !_host.empty())
{
Expand All @@ -241,8 +278,10 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
{
if (!connected())
reconnect();
if (!keepAlive)
request.setKeepAlive(false);
if (!request.has(HTTPMessage::CONNECTION))
request.setKeepAlive(keepAlive);
if (keepAlive && !request.has(HTTPMessage::CONNECTION_KEEP_ALIVE) && _keepAliveTimeout.totalSeconds() > 0)
request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds(), _keepAliveMaxRequests);
if (!request.has(HTTPRequest::HOST) && !_host.empty())
request.setHost(_host, _port);
if (!_proxyConfig.host.empty() && !bypassProxy())
Expand Down Expand Up @@ -324,6 +363,17 @@ std::istream& HTTPClientSession::receiveResponse(HTTPResponse& response)

_mustReconnect = getKeepAlive() && !response.getKeepAlive();

if (!_mustReconnect)
{
/// when server sends its keep alive timeout, client has to follow that value
auto timeout = response.getKeepAliveTimeout();
if (timeout > 0)
_keepAliveTimeout = std::min(_keepAliveTimeout, Poco::Timespan(timeout, 0));
auto max_requests = response.getKeepAliveMaxRequests();
if (max_requests > 0)
_keepAliveMaxRequests = std::min(_keepAliveMaxRequests, max_requests);
}

if (!_expectResponseBody || response.getStatus() < 200 || response.getStatus() == HTTPResponse::HTTP_NO_CONTENT || response.getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
_pResponseStream = new HTTPFixedLengthInputStream(*this, 0);
else if (response.getChunkedTransferEncoding())
Expand Down Expand Up @@ -430,15 +480,18 @@ std::string HTTPClientSession::proxyRequestPrefix() const
return result;
}

bool HTTPClientSession::isKeepAliveExpired(double reliability) const
{
Poco::Timestamp now;
return Timespan(Timestamp::TimeDiff(reliability *_keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest
|| _keepAliveCurrentRequest > _keepAliveMaxRequests;
}

bool HTTPClientSession::mustReconnect() const
{
if (!_mustReconnect)
{
Poco::Timestamp now;
return _keepAliveTimeout <= now - _lastRequest;
}
else return true;
return isKeepAliveExpired(_defaultKeepAliveReliabilityLevel);
return true;
}


Expand Down Expand Up @@ -511,14 +564,21 @@ void HTTPClientSession::assign(Poco::Net::HTTPClientSession & session)
if (buffered())
throw Poco::LogicException("assign to a session with not empty buffered data");

attachSocket(session.detachSocket());
setLastRequest(session.getLastRequest());
poco_assert(!connected());

setResolvedHost(session.getResolvedHost());
setKeepAlive(session.getKeepAlive());
setProxyConfig(session.getProxyConfig());

setTimeout(session.getConnectionTimeout(), session.getSendTimeout(), session.getReceiveTimeout());
setKeepAlive(session.getKeepAlive());

setLastRequest(session.getLastRequest());
setKeepAliveTimeout(session.getKeepAliveTimeout());
setProxyConfig(session.getProxyConfig());

_keepAliveMaxRequests = session._keepAliveMaxRequests;
_keepAliveCurrentRequest = session._keepAliveCurrentRequest;

attachSocket(session.detachSocket());

session.reset();
}
Expand Down
48 changes: 48 additions & 0 deletions base/poco/Net/src/HTTPMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Poco/NumberFormatter.h"
#include "Poco/NumberParser.h"
#include "Poco/String.h"
#include <format>


using Poco::NumberFormatter;
Expand Down Expand Up @@ -179,4 +180,51 @@ bool HTTPMessage::getKeepAlive() const
}


void HTTPMessage::setKeepAliveTimeout(int timeout, int max_requests)
{
add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}, max={}", timeout, max_requests));
}


int parseFromHeaderValues(const std::string_view header_value, const std::string_view param_name)
{
auto param_value_pos = header_value.find(param_name);
if (param_value_pos == std::string::npos)
param_value_pos = header_value.size();
if (param_value_pos != header_value.size())
param_value_pos += param_name.size();

auto param_value_end = header_value.find(',', param_value_pos);
if (param_value_end == std::string::npos)
param_value_end = header_value.size();

auto timeout_value_substr = header_value.substr(param_value_pos, param_value_end - param_value_pos);
if (timeout_value_substr.empty())
return -1;

int value = 0;
auto [ptr, ec] = std::from_chars(timeout_value_substr.begin(), timeout_value_substr.end(), value);

if (ec == std::errc())
return value;

return -1;
}


int HTTPMessage::getKeepAliveTimeout() const
{
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
static const std::string_view timeout_param = "timeout=";
return parseFromHeaderValues(ka_header, timeout_param);
}


int HTTPMessage::getKeepAliveMaxRequests() const
{
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
static const std::string_view timeout_param = "max=";
return parseFromHeaderValues(ka_header, timeout_param);
}

} } // namespace Poco::Net
13 changes: 12 additions & 1 deletion base/poco/Net/src/HTTPServerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,18 @@ void HTTPServerConnection::run()

pHandler->handleRequest(request, response);
session.setKeepAlive(_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive());
}

/// all that fuzz is all about to make session close with less timeout than 15s (set in HTTPServerParams c-tor)
if (_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive())
{
int value = response.getKeepAliveTimeout();
if (value < 0)
value = request.getKeepAliveTimeout();
if (value > 0)
session.setKeepAliveTimeout(Poco::Timespan(value, 0));
}

}
else sendErrorResponse(session, HTTPResponse::HTTP_NOT_IMPLEMENTED);
}
catch (Poco::Exception&)
Expand Down
6 changes: 6 additions & 0 deletions base/poco/Net/src/HTTPServerSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ HTTPServerSession::~HTTPServerSession()
{
}

void HTTPServerSession::setKeepAliveTimeout(Poco::Timespan keepAliveTimeout)
{
_keepAliveTimeout = keepAliveTimeout;
}



bool HTTPServerSession::hasMoreRequests()
{
Expand Down

0 comments on commit d928cbb

Please sign in to comment.