From 9b9b9b9d6564419dbab968ae9d0fecc716462602 Mon Sep 17 00:00:00 2001 From: Sema Checherinda <104093494+CheSema@users.noreply.github.com> Date: Thu, 19 Jun 2025 14:56:11 +0000 Subject: [PATCH 1/3] Merge pull request #81597 from ClickHouse/chesema-clints-keep-alive preserve connection when final empty chunk is left by client --- .../Net/include/Poco/Net/HTTPChunkedStream.h | 8 +++-- base/poco/Net/src/HTTPChunkedStream.cpp | 29 +++++++++++++++++-- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h b/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h index a6576aa561d0..d5263319ed38 100644 --- a/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h +++ b/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h @@ -45,7 +45,7 @@ namespace Net ~HTTPChunkedStreamBuf(); void close(); - bool isComplete() const { return _chunk == std::char_traits::eof(); } + bool isComplete(bool read_from_device_to_check_eof = false); protected: int readFromDevice(char * buffer, std::streamsize length); @@ -70,8 +70,6 @@ namespace Net ~HTTPChunkedIOS(); HTTPChunkedStreamBuf * rdbuf(); - bool isComplete() const { return _buf.isComplete(); } - protected: HTTPChunkedStreamBuf _buf; }; @@ -83,6 +81,8 @@ namespace Net public: HTTPChunkedInputStream(HTTPSession & session); ~HTTPChunkedInputStream(); + + bool isComplete() { return _buf.isComplete(/*read_from_device_to_check_eof*/ true); } }; @@ -92,6 +92,8 @@ namespace Net public: HTTPChunkedOutputStream(HTTPSession & session); ~HTTPChunkedOutputStream(); + + bool isComplete() { return _buf.isComplete(); } }; diff --git a/base/poco/Net/src/HTTPChunkedStream.cpp b/base/poco/Net/src/HTTPChunkedStream.cpp index 16ed1e71c314..043c38ce9e71 100644 --- a/base/poco/Net/src/HTTPChunkedStream.cpp +++ b/base/poco/Net/src/HTTPChunkedStream.cpp @@ -54,7 +54,7 @@ void HTTPChunkedStreamBuf::close() sync(); _session.write("0\r\n\r\n", 5); - _chunk = std::char_traits::eof(); + _chunk = std::char_traits::eof(); } } @@ -90,7 +90,7 @@ unsigned int HTTPChunkedStreamBuf::parseChunkLen() if (size_t pos = line.find(';'); pos != std::string::npos) line.resize(pos); - unsigned chunkLen; + unsigned chunkLen = 0; if (NumberParser::tryParseHex(line, chunkLen)) return chunkLen; else @@ -118,6 +118,9 @@ int HTTPChunkedStreamBuf::readFromDevice(char* buffer, std::streamsize length) if (_chunk > 0) { + if (length == 0) + return 0; + if (length > _chunk) length = _chunk; int n = _session.read(buffer, length); if (n > 0) @@ -128,7 +131,7 @@ int HTTPChunkedStreamBuf::readFromDevice(char* buffer, std::streamsize length) if (_chunk == 0) skipCRLF(); return n; } - else + else { skipCRLF(); _chunk = eof; @@ -137,6 +140,26 @@ int HTTPChunkedStreamBuf::readFromDevice(char* buffer, std::streamsize length) } +bool HTTPChunkedStreamBuf::isComplete(bool read_from_device_to_check_eof) +{ + if (read_from_device_to_check_eof) + { + try + { + /// If the stream is closed without final last chunk + /// "Unexpected EOF" exception would be thrown + readFromDevice(nullptr, 0); + } + catch (Poco::Net::MessageException &) + { + return false; + } + } + + return _chunk == std::char_traits::eof(); +} + + int HTTPChunkedStreamBuf::writeToDevice(const char* buffer, std::streamsize length) { _chunkBuffer.clear(); From 541eb6844612d30982af8dec057c9ffef08fe18c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=D1=81hael=20Stetsyuk?= <59827607+mstetsyuk@users.noreply.github.com> Date: Fri, 17 Oct 2025 09:54:10 +0000 Subject: [PATCH 2/3] Merge pull request #88668 from ClickHouse/wrap-PooledConnection-dtor wrap `~PooledConnection` in try-catch --- .../Net/include/Poco/Net/HTTPChunkedStream.h | 2 +- base/poco/Net/src/HTTPChunkedStream.cpp | 4 +- src/Common/HTTPConnectionPool.cpp | 57 +++++++++++-------- .../tests/gtest_http_chunked_stream.cpp | 16 ++++++ ...ed_losing_files_after_exception.reference} | 0 ...buted_losing_files_after_exception.sql.j2} | 0 6 files changed, 52 insertions(+), 27 deletions(-) create mode 100644 src/Common/tests/gtest_http_chunked_stream.cpp rename tests/queries/0_stateless/{02537_distributed_loosing_files_after_exception.reference => 02537_distributed_losing_files_after_exception.reference} (100%) rename tests/queries/0_stateless/{02537_distributed_loosing_files_after_exception.sql.j2 => 02537_distributed_losing_files_after_exception.sql.j2} (100%) diff --git a/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h b/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h index d5263319ed38..604e66da5786 100644 --- a/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h +++ b/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h @@ -45,7 +45,7 @@ namespace Net ~HTTPChunkedStreamBuf(); void close(); - bool isComplete(bool read_from_device_to_check_eof = false); + bool isComplete(bool read_from_device_to_check_eof = false) noexcept; protected: int readFromDevice(char * buffer, std::streamsize length); diff --git a/base/poco/Net/src/HTTPChunkedStream.cpp b/base/poco/Net/src/HTTPChunkedStream.cpp index 043c38ce9e71..ca0b73128ff4 100644 --- a/base/poco/Net/src/HTTPChunkedStream.cpp +++ b/base/poco/Net/src/HTTPChunkedStream.cpp @@ -140,7 +140,7 @@ int HTTPChunkedStreamBuf::readFromDevice(char* buffer, std::streamsize length) } -bool HTTPChunkedStreamBuf::isComplete(bool read_from_device_to_check_eof) +bool HTTPChunkedStreamBuf::isComplete(bool read_from_device_to_check_eof) noexcept { if (read_from_device_to_check_eof) { @@ -150,7 +150,7 @@ bool HTTPChunkedStreamBuf::isComplete(bool read_from_device_to_check_eof) /// "Unexpected EOF" exception would be thrown readFromDevice(nullptr, 0); } - catch (Poco::Net::MessageException &) + catch (...) { return false; } diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index cf272e612b03..59c4396acbac 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -434,36 +434,45 @@ class EndpointConnectionPool : public std::enable_shared_from_this(response_stream)) + if (bool(response_stream)) { - response_stream_completed = fixed_steam->isComplete(); + if (auto * fixed_steam = dynamic_cast(response_stream)) + { + response_stream_completed = fixed_steam->isComplete(); + } + else if (auto * chunked_steam = dynamic_cast(response_stream)) + { + response_stream_completed = chunked_steam->isComplete(); + } + else if (auto * http_stream = dynamic_cast(response_stream)) + { + response_stream_completed = http_stream->isComplete(); + } + else + { + response_stream_completed = false; + } } - else if (auto * chunked_steam = dynamic_cast(response_stream)) - { - response_stream_completed = chunked_steam->isComplete(); - } - else if (auto * http_stream = dynamic_cast(response_stream)) - { - response_stream_completed = http_stream->isComplete(); - } - else - { - response_stream_completed = false; - } - } - response_stream = nullptr; - Session::setSendDataHooks(); - Session::setReceiveDataHooks(); + response_stream = nullptr; + Session::setSendDataHooks(); + Session::setReceiveDataHooks(); + Session::setSendThrottler(); + Session::setReceiveThrottler(); - group->atConnectionDestroy(); + group->atConnectionDestroy(); - if (!isExpired) - if (auto lock = pool.lock()) - lock->atConnectionDestroy(*this); + if (!isExpired) + if (auto lock = pool.lock()) + lock->atConnectionDestroy(*this); - CurrentMetrics::sub(metrics.active_count); + CurrentMetrics::sub(metrics.active_count); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } } private: diff --git a/src/Common/tests/gtest_http_chunked_stream.cpp b/src/Common/tests/gtest_http_chunked_stream.cpp new file mode 100644 index 000000000000..c737f4794105 --- /dev/null +++ b/src/Common/tests/gtest_http_chunked_stream.cpp @@ -0,0 +1,16 @@ +#include + +#include +#include + + +TEST(HTTPChunkedStreamBuf, IsCompleteHandlesInvalidSocketException) +{ + Poco::Net::HTTPClientSession session; + Poco::Net::HTTPChunkedStreamBuf buf(session, std::ios::in); + + /// Default-initialized socket throws InvalidSocketException in SocketImpl::receiveBytes, + /// which HTTPChunkedStreamBuf::isComplete should swallow and return false. + bool complete = buf.isComplete(true); + ASSERT_FALSE(complete); +} diff --git a/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.reference b/tests/queries/0_stateless/02537_distributed_losing_files_after_exception.reference similarity index 100% rename from tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.reference rename to tests/queries/0_stateless/02537_distributed_losing_files_after_exception.reference diff --git a/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.sql.j2 b/tests/queries/0_stateless/02537_distributed_losing_files_after_exception.sql.j2 similarity index 100% rename from tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.sql.j2 rename to tests/queries/0_stateless/02537_distributed_losing_files_after_exception.sql.j2 From bbb27a4c5c8aea4e3eac341f46ec2012afdba1df Mon Sep 17 00:00:00 2001 From: Sema Checherinda <104093494+CheSema@users.noreply.github.com> Date: Tue, 28 Oct 2025 11:57:45 +0000 Subject: [PATCH 3/3] Merge pull request #89090 from ClickHouse/mstetsyuk-patch-1 remove try/catch from `~PooledConnection` --- src/Common/HTTPConnectionPool.cpp | 59 ++++++++++++++----------------- 1 file changed, 26 insertions(+), 33 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index 59c4396acbac..4443fe4e15a6 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -434,45 +434,38 @@ class EndpointConnectionPool : public std::enable_shared_from_this(response_stream)) { - if (auto * fixed_steam = dynamic_cast(response_stream)) - { - response_stream_completed = fixed_steam->isComplete(); - } - else if (auto * chunked_steam = dynamic_cast(response_stream)) - { - response_stream_completed = chunked_steam->isComplete(); - } - else if (auto * http_stream = dynamic_cast(response_stream)) - { - response_stream_completed = http_stream->isComplete(); - } - else - { - response_stream_completed = false; - } + response_stream_completed = fixed_steam->isComplete(); } - response_stream = nullptr; - Session::setSendDataHooks(); - Session::setReceiveDataHooks(); - Session::setSendThrottler(); - Session::setReceiveThrottler(); + else if (auto * chunked_steam = dynamic_cast(response_stream)) + { + response_stream_completed = chunked_steam->isComplete(); + } + else if (auto * http_stream = dynamic_cast(response_stream)) + { + response_stream_completed = http_stream->isComplete(); + } + else + { + response_stream_completed = false; + } + } + response_stream = nullptr; + Session::setSendDataHooks(); + Session::setReceiveDataHooks(); + Session::setSendThrottler(); + Session::setReceiveThrottler(); - group->atConnectionDestroy(); + group->atConnectionDestroy(); - if (!isExpired) - if (auto lock = pool.lock()) - lock->atConnectionDestroy(*this); + if (!isExpired) + if (auto lock = pool.lock()) + lock->atConnectionDestroy(*this); - CurrentMetrics::sub(metrics.active_count); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + CurrentMetrics::sub(metrics.active_count); } private: