diff --git a/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto b/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto index 19f3fe3dd37d..7f4ea0c73f7c 100644 --- a/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto +++ b/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto @@ -300,6 +300,14 @@ message HttpConnectionManager { // is terminated with a 408 Request Timeout error code if no upstream response // header has been received, otherwise a stream reset occurs. // + // This timeout also specifies the amount of time that Envoy will wait for the peer to open enough + // window to write any remaining stream data once the entirety of stream data (local end stream is + // true) has been buffered pending available window. In other words, this timeout defends against + // a peer that does not release enough window to completely write the stream, even though all + // data has been proxied within available flow control windows. If the timeout is hit in this + // case, the :ref:`tx_flush_timeout ` counter will be + // incremented. + // // Note that it is possible to idle timeout even if the wire traffic for a stream is non-idle, due // to the granularity of events presented to the connection manager. For example, while receiving // very large request headers, it may be the case that there is traffic regularly arriving on the diff --git a/api/envoy/config/filter/network/http_connection_manager/v3alpha/http_connection_manager.proto b/api/envoy/config/filter/network/http_connection_manager/v3alpha/http_connection_manager.proto index f96b590d7130..040895af4594 100644 --- a/api/envoy/config/filter/network/http_connection_manager/v3alpha/http_connection_manager.proto +++ b/api/envoy/config/filter/network/http_connection_manager/v3alpha/http_connection_manager.proto @@ -287,6 +287,14 @@ message HttpConnectionManager { // is terminated with a 408 Request Timeout error code if no upstream response // header has been received, otherwise a stream reset occurs. // + // This timeout also specifies the amount of time that Envoy will wait for the peer to open enough + // window to write any remaining stream data once the entirety of stream data (local end stream is + // true) has been buffered pending available window. In other words, this timeout defends against + // a peer that does not release enough window to completely write the stream, even though all + // data has been proxied within available flow control windows. If the timeout is hit in this + // case, the :ref:`tx_flush_timeout ` counter will be + // incremented. + // // Note that it is possible to idle timeout even if the wire traffic for a stream is non-idle, due // to the granularity of events presented to the connection manager. For example, while receiving // very large request headers, it may be the case that there is traffic regularly arriving on the diff --git a/docs/root/configuration/best_practices/edge.rst b/docs/root/configuration/best_practices/edge.rst index 94e1728eb172..29ee298562fa 100644 --- a/docs/root/configuration/best_practices/edge.rst +++ b/docs/root/configuration/best_practices/edge.rst @@ -23,6 +23,9 @@ HTTP proxies should additionally configure: * :ref:`HTTP/2 maximum concurrent streams limit ` to 100, * :ref:`HTTP/2 initial stream window size limit ` to 64 KiB, * :ref:`HTTP/2 initial connection window size limit ` to 1 MiB. 
+* :ref:`headers_with_underscores_action setting ` to REJECT_REQUEST, to protect upstream services that treat '_' and '-' as interchangeable. +* :ref:`Listener connection limits. ` +* :ref:`Global downstream connection limits `. The following is a YAML example of the above recommendation. @@ -108,3 +111,15 @@ The following is a YAML example of the above recommendation. http2_protocol_options: initial_stream_window_size: 65536 # 64 KiB initial_connection_window_size: 1048576 # 1 MiB + + layered_runtime: + layers: + - name: static_layer_0 + static_layer: + envoy: + resource_limits: + listener: + example_listener_name: + connection_limit: 10000 + overload: + global_downstream_max_connections: 50000 diff --git a/docs/root/configuration/http/http_conn_man/stats.rst b/docs/root/configuration/http/http_conn_man/stats.rst index 74ce491f39a6..f375832b8c6c 100644 --- a/docs/root/configuration/http/http_conn_man/stats.rst +++ b/docs/root/configuration/http/http_conn_man/stats.rst @@ -137,7 +137,16 @@ All http2 statistics are rooted at *http2.* rx_reset, Counter, Total number of reset stream frames received by Envoy too_many_header_frames, Counter, Total number of times an HTTP2 connection is reset due to receiving too many headers frames. Envoy currently supports proxying at most one header frame for 100-Continue one non-100 response code header frame and one frame with trailers trailers, Counter, Total number of trailers seen on requests coming from downstream + tx_flush_timeout, Counter, Total number of :ref:`stream idle timeouts ` waiting for open stream window to flush the remainder of a stream tx_reset, Counter, Total number of reset stream frames transmitted by Envoy + streams_active, Gauge, Active streams as observed by the codec + pending_send_bytes, Gauge, Currently buffered body data in bytes waiting to be written when stream/connection window is opened. + +.. attention:: + + The HTTP/2 `streams_active` gauge may be greater than the HTTP connection manager + `downstream_rq_active` gauge due to differences in stream accounting between the codec and the + HTTP connection manager. Tracing statistics ------------------ diff --git a/docs/root/configuration/listeners/listeners.rst b/docs/root/configuration/listeners/listeners.rst index 73605a853658..3a8ea7112664 100644 --- a/docs/root/configuration/listeners/listeners.rst +++ b/docs/root/configuration/listeners/listeners.rst @@ -8,6 +8,7 @@ Listeners overview stats + runtime listener_filters/listener_filters network_filters/network_filters lds diff --git a/docs/root/configuration/listeners/runtime.rst b/docs/root/configuration/listeners/runtime.rst new file mode 100644 index 000000000000..b42b6aa5fa3f --- /dev/null +++ b/docs/root/configuration/listeners/runtime.rst @@ -0,0 +1,8 @@ +.. _config_listeners_runtime: + +Runtime +------- +The following runtime settings are supported: + +envoy.resource_limits.listener..connection_limit + Sets a limit on the number of active connections to the specified listener. diff --git a/docs/root/configuration/listeners/stats.rst b/docs/root/configuration/listeners/stats.rst index 73be50156bea..a144f2764df8 100644 --- a/docs/root/configuration/listeners/stats.rst +++ b/docs/root/configuration/listeners/stats.rst @@ -16,8 +16,10 @@ Every listener has a statistics tree rooted at *listener.
.* with the following statistics. downstream_cx_destroy, Counter, Total destroyed connections downstream_cx_active, Gauge, Total active connections downstream_cx_length_ms, Histogram, Connection length milliseconds + downstream_cx_overflow, Counter, Total connections rejected due to enforcement of listener connection limit downstream_pre_cx_timeout, Counter, Sockets that timed out during listener filter processing downstream_pre_cx_active, Gauge, Sockets currently undergoing listener filter processing + global_cx_overflow, Counter, Total connections rejected due to enforcement of the global connection limit no_filter_chain_match, Counter, Total connections that didn't match any filter chain ssl.connection_error, Counter, Total TLS connection errors not including failed certificate verifications ssl.handshake, Counter, Total successful TLS connection handshakes diff --git a/docs/root/configuration/operations/overload_manager/overload_manager.rst b/docs/root/configuration/operations/overload_manager/overload_manager.rst index 06d298a8f22a..b888e0f649ed 100644 --- a/docs/root/configuration/operations/overload_manager/overload_manager.rst +++ b/docs/root/configuration/operations/overload_manager/overload_manager.rst @@ -53,6 +53,30 @@ The following overload actions are supported: envoy.overload_actions.stop_accepting_connections, Envoy will stop accepting new network connections on its configured listeners envoy.overload_actions.shrink_heap, Envoy will periodically try to shrink the heap by releasing free memory to the system +Limiting Active Connections +--------------------------- + +Currently, the only supported way to limit the total number of active connections allowed across all +listeners is by specifying an integer via the runtime key +``overload.global_downstream_max_connections``. The connection limit is recommended to be less than +half of the system's file descriptor limit, to account for upstream connections, files, and other +usage of file descriptors. +If the value is unspecified, there is no global limit on the number of active downstream connections +and Envoy will emit a warning indicating this at startup. To disable the warning without setting a +limit on the number of active downstream connections, the runtime value may be set to a very large +limit (~2e9). + +If it is desired to only limit the number of downstream connections for a particular listener, +per-listener limits can be set via the :ref:`listener configuration `. + +One may simultaneously specify both per-listener and global downstream connection limits and the +conditions will be enforced independently. For instance, if it is known that a particular listener +should have a smaller number of open connections than others, one may specify a smaller connection +limit for that specific listener and allow the global limit to enforce resource utilization among +all listeners. + +An example configuration can be found in the :ref:`edge best practices document `. + Statistics ---------- diff --git a/docs/root/faq/configuration/resource_limits.rst b/docs/root/faq/configuration/resource_limits.rst new file mode 100644 index 000000000000..214096486eb6 --- /dev/null +++ b/docs/root/faq/configuration/resource_limits.rst @@ -0,0 +1,20 @@ +.. _faq_resource_limits: + +How does Envoy prevent file descriptor exhaustion? +================================================== + +:ref:`Per-listener connection limits ` may be configured as an upper bound +on the number of active connections a particular listener will accept.
The listener may accept more +connections than the configured value on the order of the number of worker threads. + +In addition, one may configure a :ref:`global limit ` on the number of +connections that will apply across all listeners. + +On Unix-based systems, it is recommended to keep the sum of all connection limits less than half of +the system's file descriptor limit to account for upstream connections, files, and other usage of +file descriptors. + +.. note:: + + This per-listener connection limiting will eventually be handled by the :ref:`overload manager + `. diff --git a/docs/root/faq/configuration/timeouts.rst b/docs/root/faq/configuration/timeouts.rst index 46d927acf299..955037607f34 100644 --- a/docs/root/faq/configuration/timeouts.rst +++ b/docs/root/faq/configuration/timeouts.rst @@ -52,7 +52,9 @@ context request/stream is interchangeable. ` is the amount of time that the connection manager will allow a stream to exist with no upstream or downstream activity. The default stream idle timeout is *5 minutes*. This timeout is strongly - recommended for streaming APIs (requests or responses that never end). + recommended for all requests (not just streaming requests/responses) as it additionally defends + against an HTTP/2 peer that does not open stream window once an entire response has been buffered + to be sent to a downstream client. Route timeouts ^^^^^^^^^^^^^^ diff --git a/docs/root/faq/overview.rst b/docs/root/faq/overview.rst index 663a953435b8..9a484a57d75e 100644 --- a/docs/root/faq/overview.rst +++ b/docs/root/faq/overview.rst @@ -33,6 +33,7 @@ Configuration configuration/zipkin_tracing configuration/flow_control configuration/timeouts + configuration/resource_limits Load balancing -------------- diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index c398536883f4..f0c76944e7ce 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -9,6 +9,10 @@ Version history * http: added HTTP/1.1 flood protection. Can be temporarily disabled using the runtime feature `envoy.reloadable_features.http1_flood_protection`. 1.12.5 (Pending) ================ +* http: the :ref:`stream_idle_timeout ` + now also defends against an HTTP/2 peer that does not open stream window once an entire response has been buffered to be sent to a downstream client. +* listener: add runtime support for :ref:`per-listener limits ` on active/accepted connections. +* overload management: add runtime support for :ref:`global limits ` on active/accepted connections.
1.12.4 (June 8, 2020) ===================== diff --git a/examples/front-proxy/front-envoy.yaml b/examples/front-proxy/front-envoy.yaml index 5bed8c849017..e19e6109b28e 100644 --- a/examples/front-proxy/front-envoy.yaml +++ b/examples/front-proxy/front-envoy.yaml @@ -4,6 +4,7 @@ static_resources: socket_address: address: 0.0.0.0 port_value: 80 + name: example_listener_name filter_chains: - filters: - name: envoy.http_connection_manager @@ -64,3 +65,12 @@ admin: socket_address: address: 0.0.0.0 port_value: 8001 +layered_runtime: + layers: + - name: static_layer_0 + static_layer: + envoy: + resource_limits: + listener: + example_listener_name: + connection_limit: 10000 diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index 801a5c0de9ee..687cd9ec4f83 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -57,6 +57,15 @@ class Instance { public: virtual ~Instance() = default; + /** + * Register function to call when the last byte in the last slice of this + * buffer has fully drained. Note that slices may be transferred to + * downstream buffers, drain trackers are transferred along with the bytes + * they track so the function is called only after the last byte is drained + * from all buffers. + */ + virtual void addDrainTracker(std::function drain_tracker) PURE; + /** * Copy data into the buffer (deprecated, use absl::string_view variant * instead). diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 592635788c37..5e4d55423d7d 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -203,6 +203,13 @@ class Stream { * @return uint32_t the stream's configured buffer limits. */ virtual uint32_t bufferLimit() PURE; + + /** + * Set the flush timeout for the stream. At the codec level this is used to bound the amount of + * time the codec will wait to flush body data pending open stream window. It does *not* count + * small window updates as satisfying the idle timeout as this is a potential DoS vector. + */ + virtual void setFlushTimeout(std::chrono::milliseconds timeout) PURE; }; /** diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index fbb0ff7c17f9..ef8688cddaee 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -6,6 +6,7 @@ #include "envoy/api/io_error.h" #include "envoy/common/exception.h" +#include "envoy/common/resource.h" #include "envoy/network/connection.h" #include "envoy/network/connection_balancer.h" #include "envoy/network/listen_socket.h" @@ -108,6 +109,11 @@ class ListenerConfig { * though the implementation may be a NOP balancer. */ virtual ConnectionBalancer& connectionBalancer() PURE; + + /** + * Open connection resources for this listener. + */ + virtual ResourceLimit& openConnections() PURE; }; /** @@ -122,6 +128,11 @@ class ListenerCallbacks { * @param socket supplies the socket that is moved into the callee. */ virtual void onAccept(ConnectionSocketPtr&& socket) PURE; + + /** + * Called when a new connection is rejected. 
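+   * Currently this is invoked when the configured global connection limit has been reached; for example, the TCP listener implementation counts such rejections in its downstream_global_cx_overflow stat.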
+ */ + virtual void onReject() PURE; }; /** diff --git a/source/common/buffer/BUILD b/source/common/buffer/BUILD index ea7d6654f68b..7d68beb6beee 100644 --- a/source/common/buffer/BUILD +++ b/source/common/buffer/BUILD @@ -28,6 +28,7 @@ envoy_cc_library( "//source/common/common:stack_array", "//source/common/common:utility_lib", "//source/common/event:libevent_lib", + "//source/server:backtrace_lib", ], ) diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index bc7c6fbff48d..0629b1fff76b 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -37,6 +37,12 @@ void OwnedImpl::addImpl(const void* data, uint64_t size) { } } +void OwnedImpl::addDrainTracker(std::function drain_tracker) { + ASSERT(!old_impl_); + ASSERT(!slices_.empty()); + slices_.back()->addDrainTracker(std::move(drain_tracker)); +} + void OwnedImpl::add(const void* data, uint64_t size) { addImpl(data, size); } void OwnedImpl::addBufferFragment(BufferFragment& fragment) { @@ -305,9 +311,11 @@ void* OwnedImpl::linearize(uint32_t size) { auto dest = static_cast(reservation.mem_); do { uint64_t data_size = slices_.front()->dataSize(); - memcpy(dest, slices_.front()->data(), data_size); - bytes_copied += data_size; - dest += data_size; + if (data_size > 0) { + memcpy(dest, slices_.front()->data(), data_size); + bytes_copied += data_size; + dest += data_size; + } slices_.pop_front(); } while (bytes_copied < linearized_size); ASSERT(dest == static_cast(reservation.mem_) + linearized_size); @@ -331,6 +339,7 @@ void OwnedImpl::coalesceOrAddSlice(SlicePtr&& other_slice) { // Copy content of the `other_slice`. The `move` methods which call this method effectively // drain the source buffer. addImpl(other_slice->data(), slice_size); + other_slice->transferDrainTrackersTo(*slices_.back()); } else { // Take ownership of the slice. slices_.emplace_back(std::move(other_slice)); diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 77d5f58f7f28..f70ccef7f6c3 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -35,7 +35,11 @@ class Slice { public: using Reservation = RawSlice; - virtual ~Slice() = default; + virtual ~Slice() { + for (const auto& drain_tracker : drain_trackers_) { + drain_tracker(); + } + } /** * @return a pointer to the start of the usable content. 
@@ -137,6 +141,9 @@ class Slice { */ uint64_t append(const void* data, uint64_t size) { uint64_t copy_size = std::min(size, reservableSize()); + if (copy_size == 0) { + return 0; + } uint8_t* dest = base_ + reservable_; reservable_ += copy_size; // NOLINTNEXTLINE(clang-analyzer-core.NullDereference) @@ -193,6 +200,15 @@ class Slice { */ virtual bool canCoalesce() const { return true; } + void transferDrainTrackersTo(Slice& destination) { + destination.drain_trackers_.splice(destination.drain_trackers_.end(), drain_trackers_); + ASSERT(drain_trackers_.empty()); + } + + void addDrainTracker(std::function drain_tracker) { + drain_trackers_.emplace_back(std::move(drain_tracker)); + } + protected: Slice(uint64_t data, uint64_t reservable, uint64_t capacity) : data_(data), reservable_(reservable), capacity_(capacity) {} @@ -208,6 +224,8 @@ class Slice { /** Total number of bytes in the slice */ uint64_t capacity_; + + std::list> drain_trackers_; }; using SlicePtr = std::unique_ptr; @@ -512,6 +530,7 @@ class OwnedImpl : public LibEventInstance { OwnedImpl(const void* data, uint64_t size); // Buffer::Instance + void addDrainTracker(std::function drain_tracker) override; void add(const void* data, uint64_t size) override; void addBufferFragment(BufferFragment& fragment) override; void add(absl::string_view data) override; @@ -567,6 +586,8 @@ class OwnedImpl : public LibEventInstance { */ static void useOldImpl(bool use_old_impl); + static bool newBuffersUseOldImpl() { return use_old_impl_; } + /** * Describe the in-memory representation of the slices in the buffer. For use * in tests that want to make assertions about the specific arrangement of diff --git a/source/common/http/codec_client.cc b/source/common/http/codec_client.cc index c6c3c42f3db3..d850acb3c0af 100644 --- a/source/common/http/codec_client.cc +++ b/source/common/http/codec_client.cc @@ -17,7 +17,7 @@ namespace Http { CodecClient::CodecClient(Type type, Network::ClientConnectionPtr&& connection, Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher) - : type_(type), connection_(std::move(connection)), host_(host), + : type_(type), host_(host), connection_(std::move(connection)), idle_timeout_(host_->cluster().idleTimeout()) { if (type_ != Type::HTTP3) { // Make sure upstream connections process data and then the FIN, rather than processing diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index ab29c15db85a..49ae4bc17a45 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -155,9 +155,11 @@ class CodecClient : Logger::Loggable, } const Type type_; - ClientConnectionPtr codec_; - Network::ClientConnectionPtr connection_; + // The order of host_, connection_, and codec_ matter as during destruction each can refer to + // the previous, at least in tests. 
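+  // Members are destroyed in reverse declaration order, so codec_ is torn down first, then connection_, and host_ last.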
Upstream::HostDescriptionConstSharedPtr host_; + Network::ClientConnectionPtr connection_; + ClientConnectionPtr codec_; Event::TimerPtr idle_timer_; const absl::optional idle_timeout_; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index aa7de6f07fed..90866fb2fb3a 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -261,6 +261,7 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder, new_stream->state_.is_internally_created_ = is_internally_created; new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); + new_stream->response_encoder_->getStream().setFlushTimeout(new_stream->idle_timeout_ms_); new_stream->buffer_limit_ = new_stream->response_encoder_->getStream().bufferLimit(); // If the network connection is backed up, the stream should be made aware of it on creation. // Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacks_. @@ -866,7 +867,10 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, if (hasCachedRoute()) { const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry(); if (route_entry != nullptr && route_entry->idleTimeout()) { + // TODO(mattklein123): Technically if the cached route changes, we should also see if the + // route idle timeout has changed and update the value. idle_timeout_ms_ = route_entry->idleTimeout().value(); + response_encoder_->getStream().setFlushTimeout(idle_timeout_ms_); if (idle_timeout_ms_.count()) { // If we have a route-level idle timeout but no global stream idle timeout, create a timer. if (stream_idle_timer_ == nullptr) { diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 04d8ae1294a4..cf718cec06b4 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -423,6 +423,20 @@ void ConnectionImpl::completeLastHeader() { ASSERT(current_header_value_.empty()); } +uint32_t ConnectionImpl::getHeadersSize() { + return current_header_field_.size() + current_header_value_.size() + + (current_header_map_->byteSize() ? *current_header_map_->byteSize() : 0); +} + +void ConnectionImpl::checkMaxHeadersSize() { + const uint32_t total = getHeadersSize(); + if (total > (max_headers_kb_ * 1024)) { + error_code_ = Http::Code::RequestHeaderFieldsTooLarge; + sendProtocolError(); + throw CodecProtocolException("headers size exceeds limit"); + } +} + bool ConnectionImpl::maybeDirectDispatch(Buffer::Instance& data) { if (!handling_upgrade_) { // Only direct dispatch for Upgrade requests. @@ -494,6 +508,8 @@ void ConnectionImpl::onHeaderField(const char* data, size_t length) { } current_header_field_.append(data, length); + + checkMaxHeadersSize(); } void ConnectionImpl::onHeaderValue(const char* data, size_t length) { @@ -524,16 +540,7 @@ void ConnectionImpl::onHeaderValue(const char* data, size_t length) { header_parsing_state_ = HeaderParsingState::Value; current_header_value_.append(header_value.data(), header_value.length()); - // Verify that the cached value in byte size exists. 
- ASSERT(current_header_map_->byteSize().has_value()); - const uint32_t total = current_header_field_.size() + current_header_value_.size() + - current_header_map_->byteSize().value(); - if (total > (max_headers_kb_ * 1024)) { - - error_code_ = Http::Code::RequestHeaderFieldsTooLarge; - sendProtocolError(); - throw CodecProtocolException("headers size exceeds limit"); - } + checkMaxHeadersSize(); } int ConnectionImpl::onHeadersCompleteBase() { @@ -634,6 +641,12 @@ ServerConnectionImpl::ServerConnectionImpl( Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_flood_protection")), headers_with_underscores_action_(headers_with_underscores_action) {} +uint32_t ServerConnectionImpl::getHeadersSize() { + // Add in the the size of the request URL if processing request headers. + const uint32_t url_size = active_request_ ? active_request_->request_url_.size() : 0; + return url_size + ConnectionImpl::getHeadersSize(); +} + void ServerConnectionImpl::onEncodeComplete() { ASSERT(active_request_); if (active_request_->remote_complete_) { @@ -741,6 +754,8 @@ void ServerConnectionImpl::onMessageBegin() { void ServerConnectionImpl::onUrl(const char* data, size_t length) { if (active_request_) { active_request_->request_url_.append(data, length); + + checkMaxHeadersSize(); } } diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index e16ec8401286..0c85f48c9294 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -65,6 +65,11 @@ class StreamEncoderImpl : public StreamEncoder, void resetStream(StreamResetReason reason) override; void readDisable(bool disable) override; uint32_t bufferLimit() override; + void setFlushTimeout(std::chrono::milliseconds) override { + // HTTP/1 has one stream per connection, thus any data encoded is immediately written to the + // connection, invoking any watermarks as necessary. There is no internal buffering that would + // require a flush timeout not already covered by other timeouts. + } void isResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; } @@ -209,6 +214,20 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable 0) { setWriteBufferWatermarks(buffer_limit / 2, buffer_limit); } } +ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(stream_idle_timer_ == nullptr); } + +void ConnectionImpl::StreamImpl::destroy() { + disarmStreamIdleTimer(); + parent_.stats_.streams_active_.dec(); + parent_.stats_.pending_send_bytes_.sub(pending_send_data_.length()); +} + static void insertHeader(std::vector& headers, const HeaderEntry& header) { uint8_t flags = 0; if (header.key().type() == HeaderString::Type::Reference) { @@ -130,6 +139,7 @@ void ConnectionImpl::StreamImpl::encodeTrailers(const HeaderMap& trailers) { // waiting on window updates. We need to save the trailers so that we can emit them later. 
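  // The pending flush timer created below bounds how long the codec waits for the peer to open window before onPendingFlushTimer() resets the stream.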
ASSERT(!pending_trailers_); pending_trailers_ = std::make_unique(trailers); + createPendingFlushTimer(); } else { submitTrailers(trailers); parent_.sendPendingFrames(); @@ -262,6 +272,7 @@ int ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t return NGHTTP2_ERR_FLOODED; } + parent_.stats_.pending_send_bytes_.sub(length); output.move(pending_send_data_, length); parent_.connection_.write(output, false); return 0; @@ -283,9 +294,30 @@ void ConnectionImpl::ServerStreamImpl::submitHeaders(const std::vector 0) { + stream_idle_timer_ = + parent_.connection_.dispatcher().createTimer([this] { onPendingFlushTimer(); }); + stream_idle_timer_->enableTimer(stream_idle_timeout_); + } +} + +void ConnectionImpl::StreamImpl::onPendingFlushTimer() { + ENVOY_CONN_LOG(debug, "pending stream flush timeout", parent_.connection_); + stream_idle_timer_.reset(); + parent_.stats_.tx_flush_timeout_.inc(); + ASSERT(local_end_stream_ && !local_end_stream_sent_); + // This will emit a reset frame for this stream and close the stream locally. No reset callbacks + // will be run because higher layers think the stream is already finished. + resetStreamWorker(StreamResetReason::LocalReset); + parent_.sendPendingFrames(); +} + void ConnectionImpl::StreamImpl::encodeData(Buffer::Instance& data, bool end_stream) { ASSERT(!local_end_stream_); local_end_stream_ = end_stream; + parent_.stats_.pending_send_bytes_.add(data.length()); pending_send_data_.move(data); if (data_deferred_) { int rc = nghttp2_session_resume_data(parent_.session_, stream_id_); @@ -295,6 +327,9 @@ void ConnectionImpl::StreamImpl::encodeData(Buffer::Instance& data, bool end_str } parent_.sendPendingFrames(); + if (local_end_stream_ && pending_send_data_.length() > 0) { + createPendingFlushTimer(); + } } void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) { @@ -373,22 +408,20 @@ bool checkRuntimeOverride(bool config_value, const char* override_key) { ConnectionImpl::ConnectionImpl(Network::Connection& connection, Stats::Scope& stats, const Http2Settings& http2_settings, const uint32_t max_headers_kb, const uint32_t max_headers_count) - : stats_{ALL_HTTP2_CODEC_STATS(POOL_COUNTER_PREFIX(stats, "http2."))}, connection_(connection), - max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count), + : stats_{ALL_HTTP2_CODEC_STATS(POOL_COUNTER_PREFIX(stats, "http2."), + POOL_GAUGE_PREFIX(stats, "http2."))}, + connection_(connection), max_headers_kb_(max_headers_kb), + max_headers_count_(max_headers_count), per_stream_buffer_limit_(http2_settings.initial_stream_window_size_), stream_error_on_invalid_http_messaging_(checkRuntimeOverride( http2_settings.stream_error_on_invalid_http_messaging_, InvalidHttpMessagingOverrideKey)), flood_detected_(false), max_outbound_frames_( Runtime::getInteger(MaxOutboundFramesOverrideKey, http2_settings.max_outbound_frames_)), - frame_buffer_releasor_([this](const Buffer::OwnedBufferFragmentImpl* fragment) { - releaseOutboundFrame(fragment); - }), + frame_buffer_releasor_([this]() { releaseOutboundFrame(); }), max_outbound_control_frames_(Runtime::getInteger( MaxOutboundControlFramesOverrideKey, http2_settings.max_outbound_control_frames_)), - control_frame_buffer_releasor_([this](const Buffer::OwnedBufferFragmentImpl* fragment) { - releaseOutboundControlFrame(fragment); - }), + control_frame_buffer_releasor_([this]() { releaseOutboundControlFrame(); }), max_consecutive_inbound_frames_with_empty_payload_( 
Runtime::getInteger(MaxConsecutiveInboundFramesWithEmptyPayloadOverrideKey, http2_settings.max_consecutive_inbound_frames_with_empty_payload_)), @@ -400,7 +433,12 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, Stats::Scope& st http2_settings.max_inbound_window_update_frames_per_data_frame_sent_)), dispatching_(false), raised_goaway_(false), pending_deferred_reset_(false) {} -ConnectionImpl::~ConnectionImpl() { nghttp2_session_del(session_); } +ConnectionImpl::~ConnectionImpl() { + for (const auto& stream : active_streams_) { + stream->destroy(); + } + nghttp2_session_del(session_); +} void ConnectionImpl::dispatch(Buffer::Instance& data) { ENVOY_CONN_LOG(trace, "dispatching {} bytes", connection_, data.length()); @@ -611,6 +649,15 @@ int ConnectionImpl::onFrameSend(const nghttp2_frame* frame) { case NGHTTP2_GOAWAY: { ENVOY_CONN_LOG(debug, "sent goaway code={}", connection_, frame->goaway.error_code); if (frame->goaway.error_code != NGHTTP2_NO_ERROR) { + // TODO(mattklein123): Returning this error code abandons standard nghttp2 frame accounting. + // As such, it is not reliable to call sendPendingFrames() again after this and we assume + // that the connection is going to get torn down immediately. One byproduct of this is that + // we need to cancel all pending flush stream timeouts since they can race with connection + // teardown. As part of the work to remove exceptions we should aim to clean up all of this + // error handling logic and only handle this type of case at the end of dispatch. + for (auto& stream : active_streams_) { + stream->disarmStreamIdleTimer(); + } return NGHTTP2_ERR_CALLBACK_FAILURE; } break; @@ -694,27 +741,21 @@ bool ConnectionImpl::addOutboundFrameFragment(Buffer::OwnedImpl& output, const u return false; } - auto fragment = Buffer::OwnedBufferFragmentImpl::create( - absl::string_view(reinterpret_cast(data), length), - is_outbound_flood_monitored_control_frame ? control_frame_buffer_releasor_ - : frame_buffer_releasor_); - - // The Buffer::OwnedBufferFragmentImpl object will be deleted in the *frame_buffer_releasor_ - // callback. - output.addBufferFragment(*fragment.release()); + output.add(data, length); + output.addDrainTracker(is_outbound_flood_monitored_control_frame ? control_frame_buffer_releasor_ + : frame_buffer_releasor_); return true; } -void ConnectionImpl::releaseOutboundFrame(const Buffer::OwnedBufferFragmentImpl* fragment) { +void ConnectionImpl::releaseOutboundFrame() { ASSERT(outbound_frames_ >= 1); --outbound_frames_; - delete fragment; } -void ConnectionImpl::releaseOutboundControlFrame(const Buffer::OwnedBufferFragmentImpl* fragment) { +void ConnectionImpl::releaseOutboundControlFrame() { ASSERT(outbound_control_frames_ >= 1); --outbound_control_frames_; - releaseOutboundFrame(fragment); + releaseOutboundFrame(); } ssize_t ConnectionImpl::onSend(const uint8_t* data, size_t length) { @@ -760,6 +801,7 @@ int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) { stream->runResetCallbacks(reason); } + stream->destroy(); connection_.dispatcher().deferredDelete(stream->removeFromList(active_streams_)); // Any unconsumed data must be consumed before the stream is deleted. 
// nghttp2 does not appear to track this internally, and any stream deleted diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index e36fc0eab514..9fb56887537f 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -40,7 +40,7 @@ const std::string CLIENT_MAGIC_PREFIX = "PRI * HTTP/2"; /** * All stats for the HTTP/2 codec. @see stats_macros.h */ -#define ALL_HTTP2_CODEC_STATS(COUNTER) \ +#define ALL_HTTP2_CODEC_STATS(COUNTER, GAUGE) \ COUNTER(dropped_headers_with_underscores) \ COUNTER(header_overflow) \ COUNTER(headers_cb_no_stream) \ @@ -54,13 +54,16 @@ const std::string CLIENT_MAGIC_PREFIX = "PRI * HTTP/2"; COUNTER(rx_reset) \ COUNTER(too_many_header_frames) \ COUNTER(trailers) \ - COUNTER(tx_reset) + COUNTER(tx_flush_timeout) \ + COUNTER(tx_reset) \ + GAUGE(streams_active, Accumulate) \ + GAUGE(pending_send_bytes, Accumulate) /** * Wrapper struct for the HTTP/2 codec stats. @see stats_macros.h */ struct CodecStats { - ALL_HTTP2_CODEC_STATS(GENERATE_COUNTER_STRUCT) + ALL_HTTP2_CODEC_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) }; class Utility { @@ -150,6 +153,18 @@ class ConnectionImpl : public virtual Connection, protected Logger::LoggabledisableTimer(); + stream_idle_timer_.reset(); + } + } StreamImpl* base() { return this; } ssize_t onDataSourceRead(uint64_t length, uint32_t* data_flags); @@ -161,6 +176,8 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable; @@ -253,6 +276,10 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable frame_buffer_releasor_; // This counter keeps track of the number of outbound frames of types PING, SETTINGS and // RST_STREAM (these that were buffered in the underlying connection but not yet written into the // socket). If this counter exceeds the `max_outbound_control_frames_' value the connection is @@ -330,7 +358,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable control_frame_buffer_releasor_; // This counter keeps track of the number of consecutive inbound frames of types HEADERS, // CONTINUATION and DATA with an empty payload and no end stream flag. If this counter exceeds // the `max_consecutive_inbound_frames_with_empty_payload_` value the connection is terminated. @@ -395,9 +423,8 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable AcceptedSocketImpl::global_accepted_socket_count_; + } // namespace Network } // namespace Envoy diff --git a/source/common/network/listen_socket_impl.h b/source/common/network/listen_socket_impl.h index a5247e68f192..d74d2d2e3785 100644 --- a/source/common/network/listen_socket_impl.h +++ b/source/common/network/listen_socket_impl.h @@ -170,7 +170,21 @@ class AcceptedSocketImpl : public ConnectionSocketImpl { public: AcceptedSocketImpl(IoHandlePtr&& io_handle, const Address::InstanceConstSharedPtr& local_address, const Address::InstanceConstSharedPtr& remote_address) - : ConnectionSocketImpl(std::move(io_handle), local_address, remote_address) {} + : ConnectionSocketImpl(std::move(io_handle), local_address, remote_address) { + ++global_accepted_socket_count_; + } + + ~AcceptedSocketImpl() override { + ASSERT(global_accepted_socket_count_.load() > 0); + --global_accepted_socket_count_; + } + + // TODO (tonya11en): Global connection count tracking is temporarily performed via a static + // variable until the logic is moved into the overload manager. 
+ static uint64_t acceptedSocketCount() { return global_accepted_socket_count_.load(); } + +private: + static std::atomic global_accepted_socket_count_; }; // ConnectionSocket used with client connections. diff --git a/source/common/network/listener_impl.cc b/source/common/network/listener_impl.cc index bd0679464cc3..26efba21dd18 100644 --- a/source/common/network/listener_impl.cc +++ b/source/common/network/listener_impl.cc @@ -17,6 +17,30 @@ namespace Envoy { namespace Network { +const std::string ListenerImpl::GlobalMaxCxRuntimeKey = + "overload.global_downstream_max_connections"; + +bool ListenerImpl::rejectCxOverGlobalLimit() { + // Enforce the global connection limit if necessary, immediately closing the accepted connection. + Runtime::Loader* runtime = Runtime::LoaderSingleton::getExisting(); + + if (runtime == nullptr) { + // The runtime singleton won't exist in most unit tests that do not need global downstream limit + // enforcement. Therefore, there is no need to enforce limits if the singleton doesn't exist. + // TODO(tonya11en): Revisit this once runtime is made globally available. + return false; + } + + // If the connection limit is not set, don't limit the connections, but still track them. + // TODO(tonya11en): In integration tests, threadsafeSnapshot is necessary since the FakeUpstreams + // use a listener and do not run in a worker thread. In practice, this code path will always be + // run on a worker thread, but to prevent failed assertions in test environments, threadsafe + // snapshots must be used. This must be revisited. + const uint64_t global_cx_limit = runtime->threadsafeSnapshot()->getInteger( + GlobalMaxCxRuntimeKey, std::numeric_limits::max()); + return AcceptedSocketImpl::acceptedSocketCount() >= global_cx_limit; +} + void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg) { ListenerImpl* listener = static_cast(arg); @@ -24,6 +48,13 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* // Create the IoSocketHandleImpl for the fd here. IoHandlePtr io_handle = std::make_unique(fd); + if (rejectCxOverGlobalLimit()) { + // The global connection limit has been reached. + io_handle->close(); + listener->cb_.onReject(); + return; + } + // Get the local address from the new socket if the listener is listening on IP ANY // (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case). const Address::InstanceConstSharedPtr& local_address = diff --git a/source/common/network/listener_impl.h b/source/common/network/listener_impl.h index f9d932e9dd34..c62c8514d3b8 100644 --- a/source/common/network/listener_impl.h +++ b/source/common/network/listener_impl.h @@ -1,5 +1,8 @@ #pragma once +#include "envoy/runtime/runtime.h" + +#include "absl/strings/string_view.h" #include "base_listener_impl.h" namespace Envoy { @@ -17,6 +20,8 @@ class ListenerImpl : public BaseListenerImpl { void disable() override; void enable() override; + static const std::string GlobalMaxCxRuntimeKey; + protected: void setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket); @@ -27,6 +32,10 @@ class ListenerImpl : public BaseListenerImpl { int remote_addr_len, void* arg); static void errorCallback(evconnlistener* listener, void* context); + // Returns true if global connection limit has been reached and the accepted socket should be + // rejected/closed. If the accepted socket is to be admitted, false is returned. 
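+  // The check runs in listenCallback() for each accepted socket, before a ConnectionSocket is created; rejected sockets are closed immediately and reported via ListenerCallbacks::onReject().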
+ static bool rejectCxOverGlobalLimit(); + Event::Libevent::ListenerPtr listener_; }; diff --git a/source/common/upstream/resource_manager_impl.h b/source/common/upstream/resource_manager_impl.h index f9ae0c34ea2a..b48ba8a42c26 100644 --- a/source/common/upstream/resource_manager_impl.h +++ b/source/common/upstream/resource_manager_impl.h @@ -58,13 +58,14 @@ class ResourceManagerImpl : public ResourceManager { remaining_.set(max); } - // Upstream::Resource - bool canCreate() override { return current_ < max(); } + ~ManagedResourceImpl() override { ASSERT(count() == 0); } + void inc() override { BasicResourceLimitImpl::inc(); updateRemaining(); open_gauge_.set(BasicResourceLimitImpl::canCreate() ? 0 : 1); } + void decBy(uint64_t amount) override { BasicResourceLimitImpl::decBy(amount); updateRemaining(); diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h index 047970f4fdbe..16183f0807ad 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h @@ -34,6 +34,9 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, public Envo // Http::Stream void resetStream(Http::StreamResetReason reason) override; void readDisable(bool disable) override; + void setFlushTimeout(std::chrono::milliseconds) override { + // TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2. + } // quic::QuicSpdyStream void OnBodyAvailable() override; void OnStreamReset(const quic::QuicRstStreamFrame& frame) override; diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index ed342c64379b..288e8853ffeb 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -275,6 +275,14 @@ void ConnectionHandlerImpl::ActiveTcpSocket::newConnection() { } void ConnectionHandlerImpl::ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) { + if (listenerConnectionLimitReached()) { + ENVOY_LOG(trace, "closing connection: listener connection limit reached for {}", + config_.name()); + socket->close(); + stats_.downstream_cx_overflow_.inc(); + return; + } + onAcceptWorker(std::move(socket), config_.handOffRestoredDestinationConnections(), false); } diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index cc809fcf4ea3..8bdb52360214 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -27,7 +27,9 @@ namespace Server { #define ALL_LISTENER_STATS(COUNTER, GAUGE, HISTOGRAM) \ COUNTER(downstream_cx_destroy) \ + COUNTER(downstream_cx_overflow) \ COUNTER(downstream_cx_total) \ + COUNTER(downstream_global_cx_overflow) \ COUNTER(downstream_pre_cx_timeout) \ COUNTER(no_filter_chain_match) \ GAUGE(downstream_cx_active, Accumulate) \ @@ -106,15 +108,22 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, ActiveTcpListener(ConnectionHandlerImpl& parent, Network::ListenerPtr&& listener, Network::ListenerConfig& config); ~ActiveTcpListener() override; + bool listenerConnectionLimitReached() const { + // TODO(tonya11en): Delegate enforcement of per-listener connection limits to overload + // manager. 
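+      // The limit behind openConnections() is configured per listener via its connection_limit runtime key (see ListenerImpl).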
+ return !config_.openConnections().canCreate(); + } void onAcceptWorker(Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections, bool rebalanced); void decNumConnections() { ASSERT(num_listener_connections_ > 0); --num_listener_connections_; + config_.openConnections().dec(); } // Network::ListenerCallbacks void onAccept(Network::ConnectionSocketPtr&& socket) override; + void onReject() override { stats_.downstream_global_cx_overflow_.inc(); } // ActiveListenerImplBase Network::Listener* listener() override { return listener_.get(); } @@ -122,7 +131,10 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, // Network::BalancedConnectionHandler uint64_t numConnections() const override { return num_listener_connections_; } - void incNumConnections() override { ++num_listener_connections_; } + void incNumConnections() override { + ++num_listener_connections_; + config_.openConnections().inc(); + } void post(Network::ConnectionSocketPtr&& socket) override; /** diff --git a/source/server/http/BUILD b/source/server/http/BUILD index 0600a4174a0e..9011b91b2692 100644 --- a/source/server/http/BUILD +++ b/source/server/http/BUILD @@ -32,6 +32,7 @@ envoy_cc_library( "//source/common/access_log:access_log_lib", "//source/common/buffer:buffer_lib", "//source/common/common:assert_lib", + "//source/common/common:basic_resource_lib", "//source/common/common:empty_string", "//source/common/common:enum_to_int", "//source/common/common:macros", diff --git a/source/server/http/admin.h b/source/server/http/admin.h index e063442e8569..967702596a4f 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -20,6 +20,7 @@ #include "envoy/upstream/outlier_detection.h" #include "envoy/upstream/resource_manager.h" +#include "common/common/basic_resource_impl.h" #include "common/common/empty_string.h" #include "common/common/logger.h" #include "common/common/macros.h" @@ -356,12 +357,14 @@ class AdminImpl : public Admin, return envoy::api::v2::core::TrafficDirection::UNSPECIFIED; } Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; } + ResourceLimit& openConnections() override { return open_connections_; } AdminImpl& parent_; const std::string name_; Stats::ScopePtr scope_; Http::ConnectionManagerListenerStats stats_; Network::NopConnectionBalancerImpl connection_balancer_; + BasicResourceLimitImpl open_connections_; }; using AdminListenerPtr = std::unique_ptr; diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index 32661629bd52..36973965401a 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -50,7 +50,11 @@ ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::st config_(config), version_info_(version_info), listener_filters_timeout_( PROTOBUF_GET_MS_OR_DEFAULT(config, listener_filters_timeout, 15000)), - continue_on_listener_filters_timeout_(config.continue_on_listener_filters_timeout()) { + continue_on_listener_filters_timeout_(config.continue_on_listener_filters_timeout()), + cx_limit_runtime_key_("envoy.resource_limits.listener." 
+ config_.name() + + ".connection_limit"), + open_connections_(std::make_shared( + std::numeric_limits::max(), parent.server_.runtime(), cx_limit_runtime_key_)) { if (config.has_transparent()) { addListenSocketOptions(Network::SocketOptionFactory::buildIpTransparentOptions()); } @@ -77,6 +81,15 @@ ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::st udp_listener_factory_ = config_factory.createActiveUdpListenerFactory(*message); } + const absl::optional runtime_val = + parent_.server_.runtime().snapshot().get(cx_limit_runtime_key_); + if (runtime_val && runtime_val->empty()) { + ENVOY_LOG(warn, + "Listener connection limit runtime key {} is empty. There are currently no " + "limitations on the number of accepted connections for listener {}.", + cx_limit_runtime_key_, config_.name()); + } + if (!config.listener_filters().empty()) { switch (socket_type_) { case Network::Address::SocketType::Datagram: @@ -316,4 +329,4 @@ void ListenerImpl::setSocket(const Network::SocketSharedPtr& socket) { } } // namespace Server -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index 21e06155aa42..f14648241594 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -8,6 +8,7 @@ #include "envoy/server/filter_config.h" #include "envoy/stats/scope.h" +#include "common/common/basic_resource_impl.h" #include "common/common/logger.h" #include "common/init/manager_impl.h" @@ -101,6 +102,14 @@ class ListenerImpl : public Network::ListenerConfig, return udp_listener_factory_.get(); } Network::ConnectionBalancer& connectionBalancer() override { return *connection_balancer_; } + ResourceLimit& openConnections() override { return *open_connections_; } + + void ensureSocketOptions() { + if (!listen_socket_options_) { + listen_socket_options_ = + std::make_shared>(); + } + } // Server::Configuration::ListenerFactoryContext AccessLog::AccessLogManager& accessLogManager() override; @@ -130,12 +139,6 @@ class ListenerImpl : public Network::ListenerConfig, OptProcessContextRef processContext() override; Configuration::ServerFactoryContext& getServerFactoryContext() const override; - void ensureSocketOptions() { - if (!listen_socket_options_) { - listen_socket_options_ = - std::make_shared>(); - } - } // Network::DrainDecision bool drainClose() const override; @@ -153,6 +156,7 @@ class ListenerImpl : public Network::ListenerConfig, ensureSocketOptions(); listen_socket_options_->emplace_back(std::move(option)); } + void addListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) { ensureSocketOptions(); Network::Socket::appendOptions(listen_socket_options_, options); @@ -195,9 +199,15 @@ class ListenerImpl : public Network::ListenerConfig, Network::ActiveUdpListenerFactoryPtr udp_listener_factory_; Network::ConnectionBalancerPtr connection_balancer_; + // Per-listener connection limits are only specified via runtime. + // + // TODO (tonya11en): Move this functionality into the overload manager. + const std::string cx_limit_runtime_key_; + std::shared_ptr open_connections_; + // to access ListenerManagerImpl::factory_. 
friend class ListenerFilterChainFactoryBuilder; }; } // namespace Server -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/source/server/server.cc b/source/server/server.cc index 5a136fbf5e53..0299a7157513 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -31,6 +31,7 @@ #include "common/local_info/local_info_impl.h" #include "common/memory/stats.h" #include "common/network/address_impl.h" +#include "common/network/listener_impl.h" #include "common/protobuf/utility.h" #include "common/router/rds_impl.h" #include "common/runtime/runtime_impl.h" @@ -277,6 +278,8 @@ void InstanceImpl::initialize(const Options& options, // Enable the selected buffer implementation (old libevent evbuffer version or new native // version) early in the initialization, before any buffers can be created. + RELEASE_ASSERT(!options.libeventBufferEnabled(), + "old_impl not supported for buffers"); // This option is no longer supported. Buffer::OwnedImpl::useOldImpl(options.libeventBufferEnabled()); ENVOY_LOG(info, "buffer implementation: {}", Buffer::OwnedImpl().usesOldImpl() ? "old (libevent)" : "new"); @@ -466,6 +469,15 @@ void InstanceImpl::initialize(const Options& options, // GuardDog (deadlock detection) object and thread setup before workers are // started and before our own run() loop runs. guard_dog_ = std::make_unique(stats_store_, config_, *api_); + + // If there is no global limit to the number of active connections, warn on startup. + // TODO (tonya11en): Move this functionality into the overload manager. + if (runtime().snapshot().get(Network::ListenerImpl::GlobalMaxCxRuntimeKey) == EMPTY_STRING) { + ENVOY_LOG(warn, + "there is no configured limit to the number of allowed active connections. Set a " + "limit via the runtime key {}", + Network::ListenerImpl::GlobalMaxCxRuntimeKey); + } } void InstanceImpl::startWorkers() { diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index fe2453905ce6..787830f9e503 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -67,6 +67,12 @@ void releaseFragmentAllocation(const void* p, size_t, const Buffer::BufferFragme // walk off the edge; the caller should be guaranteeing this. class StringBuffer : public Buffer::Instance { public: + void addDrainTracker(std::function drain_tracker) override { + // Not implemented well. + ASSERT(false); + drain_tracker(); + } + void add(const void* data, uint64_t size) override { FUZZ_ASSERT(start_ + size_ + size <= data_.size()); ::memcpy(mutableEnd(), data, size); @@ -478,7 +484,8 @@ void executeActions(const test::common::buffer::BufferFuzzTestCase& input, Buffe void BufferFuzz::bufferFuzz(const test::common::buffer::BufferFuzzTestCase& input, bool old_impl) { ENVOY_LOG_MISC(trace, "Using {} buffer implementation", old_impl ? "old" : "new"); - Buffer::OwnedImpl::useOldImpl(old_impl); + // Buffer::OwnedImpl::useOldImpl(old_impl); no longer supported + Buffer::OwnedImpl::useOldImpl(false); Context ctxt; // Fuzzed buffers. 
BufferList buffers; diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index 6e744a9e418c..62e5578da56f 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -41,12 +41,21 @@ class OwnedImplTest : public BufferImplementationParamTest { return; } const auto& buffer_slices = buffer.describeSlicesForTest(); + ASSERT_EQ(buffer_list.size(), buffer_slices.size()); for (uint64_t i = 0; i < buffer_slices.size(); i++) { EXPECT_EQ(buffer_slices[i].data, buffer_list[i][0]); EXPECT_EQ(buffer_slices[i].reservable, buffer_list[i][1]); EXPECT_EQ(buffer_slices[i].capacity, buffer_list[i][2]); } } + + static void expectFirstSlice(std::vector slice_description, OwnedImpl& buffer) { + const auto& buffer_slices = buffer.describeSlicesForTest(); + ASSERT_LE(1, buffer_slices.size()); + EXPECT_EQ(buffer_slices[0].data, slice_description[0]); + EXPECT_EQ(buffer_slices[0].reservable, slice_description[1]); + EXPECT_EQ(buffer_slices[0].capacity, slice_description[2]); + } }; INSTANTIATE_TEST_SUITE_P(OwnedImplTest, OwnedImplTest, @@ -89,6 +98,7 @@ TEST_P(OwnedImplTest, AddEmptyFragment) { BufferFragmentImpl frag2("", 0, [this](const void*, size_t, const BufferFragmentImpl*) { release_callback_called_ = true; }); + BufferFragmentImpl frag3(input, 11, [](const void*, size_t, const BufferFragmentImpl*) {}); Buffer::OwnedImpl buffer; buffer.addBufferFragment(frag1); EXPECT_EQ(11, buffer.length()); @@ -96,7 +106,18 @@ TEST_P(OwnedImplTest, AddEmptyFragment) { buffer.addBufferFragment(frag2); EXPECT_EQ(11, buffer.length()); - buffer.drain(11); + buffer.addBufferFragment(frag3); + EXPECT_EQ(22, buffer.length()); + + // Cover case of copying a buffer with an empty fragment. + Buffer::OwnedImpl buffer2; + buffer2.add(buffer); + + // Cover copyOut + std::unique_ptr outbuf(new char[buffer.length()]); + buffer.copyOut(0, buffer.length(), outbuf.get()); + + buffer.drain(22); EXPECT_EQ(0, buffer.length()); EXPECT_TRUE(release_callback_called_); } @@ -344,6 +365,300 @@ TEST_P(OwnedImplTest, Read) { EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty()); } +TEST_P(OwnedImplTest, DrainTracking) { + if (GetParam() == BufferImplementation::Old) { + return; + } + testing::InSequence s; + + Buffer::OwnedImpl buffer; + buffer.add("a"); + + testing::MockFunction tracker1; + testing::MockFunction tracker2; + buffer.addDrainTracker(tracker1.AsStdFunction()); + buffer.addDrainTracker(tracker2.AsStdFunction()); + + testing::MockFunction done; + EXPECT_CALL(tracker1, Call()); + EXPECT_CALL(tracker2, Call()); + EXPECT_CALL(done, Call()); + buffer.drain(buffer.length()); + done.Call(); +} + +TEST_P(OwnedImplTest, MoveDrainTrackersWhenTransferingSlices) { + if (GetParam() == BufferImplementation::Old) { + return; + } + testing::InSequence s; + + Buffer::OwnedImpl buffer1; + buffer1.add("a"); + + testing::MockFunction tracker1; + buffer1.addDrainTracker(tracker1.AsStdFunction()); + + Buffer::OwnedImpl buffer2; + buffer2.add("b"); + + testing::MockFunction tracker2; + buffer2.addDrainTracker(tracker2.AsStdFunction()); + + buffer2.add(std::string(10000, 'c')); + testing::MockFunction tracker3; + buffer2.addDrainTracker(tracker3.AsStdFunction()); + EXPECT_EQ(2, buffer2.getRawSlices(nullptr, 0)); + + buffer1.move(buffer2); + EXPECT_EQ(10002, buffer1.length()); + EXPECT_EQ(0, buffer2.length()); + EXPECT_EQ(3, buffer1.getRawSlices(nullptr, 0)); + EXPECT_EQ(0, buffer2.getRawSlices(nullptr, 0)); + + testing::MockFunction done; + EXPECT_CALL(tracker1, Call()); + 
EXPECT_CALL(tracker2, Call()); + EXPECT_CALL(tracker3, Call()); + EXPECT_CALL(done, Call()); + buffer1.drain(buffer1.length()); + done.Call(); +} + +TEST_P(OwnedImplTest, MoveDrainTrackersWhenCopying) { + if (GetParam() == BufferImplementation::Old) { + return; + } + testing::InSequence s; + + Buffer::OwnedImpl buffer1; + buffer1.add("a"); + + testing::MockFunction tracker1; + buffer1.addDrainTracker(tracker1.AsStdFunction()); + + Buffer::OwnedImpl buffer2; + buffer2.add("b"); + + testing::MockFunction tracker2; + buffer2.addDrainTracker(tracker2.AsStdFunction()); + + buffer1.move(buffer2); + EXPECT_EQ(2, buffer1.length()); + EXPECT_EQ(0, buffer2.length()); + EXPECT_EQ(1, buffer1.getRawSlices(nullptr, 0)); + EXPECT_EQ(0, buffer2.getRawSlices(nullptr, 0)); + + buffer1.drain(1); + testing::MockFunction done; + EXPECT_CALL(tracker1, Call()); + EXPECT_CALL(tracker2, Call()); + EXPECT_CALL(done, Call()); + buffer1.drain(1); + done.Call(); +} + +TEST_P(OwnedImplTest, PartialMoveDrainTrackers) { + if (GetParam() == BufferImplementation::Old) { + return; + } + testing::InSequence s; + + Buffer::OwnedImpl buffer1; + buffer1.add("a"); + + testing::MockFunction tracker1; + buffer1.addDrainTracker(tracker1.AsStdFunction()); + + Buffer::OwnedImpl buffer2; + buffer2.add("b"); + + testing::MockFunction tracker2; + buffer2.addDrainTracker(tracker2.AsStdFunction()); + + buffer2.add(std::string(10000, 'c')); + testing::MockFunction tracker3; + buffer2.addDrainTracker(tracker3.AsStdFunction()); + EXPECT_EQ(2, buffer2.getRawSlices(nullptr, 0)); + + // Move the first slice and associated trackers and part of the second slice to buffer1. + buffer1.move(buffer2, 4999); + EXPECT_EQ(5000, buffer1.length()); + EXPECT_EQ(5002, buffer2.length()); + EXPECT_EQ(3, buffer1.getRawSlices(nullptr, 0)); + EXPECT_EQ(1, buffer2.getRawSlices(nullptr, 0)); + + testing::MockFunction done; + EXPECT_CALL(tracker1, Call()); + buffer1.drain(1); + + EXPECT_CALL(tracker2, Call()); + EXPECT_CALL(done, Call()); + buffer1.drain(buffer1.length()); + done.Call(); + + // tracker3 remained in buffer2. + EXPECT_CALL(tracker3, Call()); + buffer2.drain(buffer2.length()); +} + +TEST_P(OwnedImplTest, DrainTrackingOnDestruction) { + if (GetParam() == BufferImplementation::Old) { + return; + } + testing::InSequence s; + + auto buffer = std::make_unique(); + buffer->add("a"); + + testing::MockFunction tracker; + buffer->addDrainTracker(tracker.AsStdFunction()); + + testing::MockFunction done; + EXPECT_CALL(tracker, Call()); + EXPECT_CALL(done, Call()); + buffer.reset(); + done.Call(); +} + +TEST_P(OwnedImplTest, Linearize) { + Buffer::OwnedImpl buffer; + + // Unowned slice to track when linearize kicks in. + std::string input(1000, 'a'); + BufferFragmentImpl frag( + input.c_str(), input.size(), + [this](const void*, size_t, const BufferFragmentImpl*) { release_callback_called_ = true; }); + buffer.addBufferFragment(frag); + + // Second slice with more data. + buffer.add(std::string(1000, 'b')); + + // Linearize does not change the pointer associated with the first slice if requested size is less + // than or equal to size of the first slice. 
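The Linearize tests here rely on two properties of linearize: it returns a pointer to the first size bytes made contiguous, and it leaves the first slice untouched when that slice already covers the request. A minimal sketch of that contract follows; ToyLinearizableBuffer is an invented illustration, not the real OwnedImpl.

#include <cstdint>
#include <string>
#include <vector>

// Toy model of linearize(): coalesce the front of the buffer so that the first
// `size` bytes are contiguous and return a pointer to them. If the first slice
// already holds at least `size` bytes, nothing moves and the existing pointer
// is returned, which is what the expectation on the fragment pointer checks.
class ToyLinearizableBuffer {
public:
  void add(std::string data) { slices_.push_back(std::move(data)); }

  const char* linearize(uint64_t size) {
    if (slices_.empty()) {
      return nullptr; // Mirrors the LinearizeEmptyBuffer expectation below.
    }
    while (slices_.front().size() < size && slices_.size() > 1) {
      // Copy the second slice into the first and drop it; the storage of the
      // merged slices is released in the process.
      slices_.front().append(slices_[1]);
      slices_.erase(slices_.begin() + 1);
    }
    return slices_.front().data();
  }

private:
  std::vector<std::string> slices_;
};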
+ EXPECT_EQ(input.c_str(), buffer.linearize(input.size())); + EXPECT_FALSE(release_callback_called_); + + constexpr uint64_t LinearizeSize = 2000; + void* out_ptr = buffer.linearize(LinearizeSize); + EXPECT_TRUE(release_callback_called_); + EXPECT_EQ(input + std::string(1000, 'b'), + absl::string_view(reinterpret_cast(out_ptr), LinearizeSize)); +} + +TEST_P(OwnedImplTest, LinearizeEmptyBuffer) { + Buffer::OwnedImpl buffer; + EXPECT_EQ(nullptr, buffer.linearize(0)); +} + +TEST_P(OwnedImplTest, LinearizeSingleSlice) { + auto buffer = std::make_unique(); + + // Unowned slice to track when linearize kicks in. + std::string input(1000, 'a'); + BufferFragmentImpl frag( + input.c_str(), input.size(), + [this](const void*, size_t, const BufferFragmentImpl*) { release_callback_called_ = true; }); + buffer->addBufferFragment(frag); + + EXPECT_EQ(input.c_str(), buffer->linearize(buffer->length())); + EXPECT_FALSE(release_callback_called_); + + buffer.reset(); + EXPECT_TRUE(release_callback_called_); +} + +TEST_P(OwnedImplTest, LinearizeDrainTracking) { + if (GetParam() == BufferImplementation::Old) { + return; + } + constexpr uint32_t SmallChunk = 200; + constexpr uint32_t LargeChunk = 16384 - SmallChunk; + constexpr uint32_t LinearizeSize = SmallChunk + LargeChunk; + + // Create a buffer with a eclectic combination of buffer OwnedSlice and UnownedSlices that will + // help us explore the properties of linearize. + Buffer::OwnedImpl buffer; + + // Large add below the target linearize size. + testing::MockFunction tracker1; + buffer.add(std::string(LargeChunk, 'a')); + buffer.addDrainTracker(tracker1.AsStdFunction()); + + // Unowned slice which causes some fragmentation. + testing::MockFunction tracker2; + testing::MockFunction + release_callback_tracker; + std::string frag_input(2 * SmallChunk, 'b'); + BufferFragmentImpl frag(frag_input.c_str(), frag_input.size(), + release_callback_tracker.AsStdFunction()); + buffer.addBufferFragment(frag); + buffer.addDrainTracker(tracker2.AsStdFunction()); + + // And an unowned slice with 0 size, because. + testing::MockFunction tracker3; + testing::MockFunction + release_callback_tracker2; + BufferFragmentImpl frag2(nullptr, 0, release_callback_tracker2.AsStdFunction()); + buffer.addBufferFragment(frag2); + buffer.addDrainTracker(tracker3.AsStdFunction()); + + // Add a very large chunk + testing::MockFunction tracker4; + buffer.add(std::string(LargeChunk + LinearizeSize, 'c')); + buffer.addDrainTracker(tracker4.AsStdFunction()); + + // Small adds that create no gaps. 
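The byte counts expected from drain_tracker in the loop a few lines below follow directly from the chunk sizes defined above. A compile-time sketch of that arithmetic; the namespace and names are illustrative only.

#include <cstdint>

// Check of the byte counts expected from drain_tracker in the
// LinearizeDrainTracking loop: the buffer starts at
// 3 * LargeChunk + 108 * SmallChunk bytes and shrinks by one LinearizeSize
// write per iteration until a final partial write of 4616 bytes remains.
namespace linearize_drain_arithmetic {
constexpr uint32_t SmallChunk = 200;
constexpr uint32_t LargeChunk = 16384 - SmallChunk;
constexpr uint32_t LinearizeSize = SmallChunk + LargeChunk;

// Total bytes added: one LargeChunk, a 2 * SmallChunk fragment, an empty
// fragment, a LargeChunk + LinearizeSize add, and 105 SmallChunk adds.
constexpr uint64_t Total =
    LargeChunk + 2 * SmallChunk + 0 + (LargeChunk + LinearizeSize) + 105 * SmallChunk;

static_assert(Total == 3 * LargeChunk + 108 * SmallChunk, "first drain_tracker call");
static_assert(Total - LinearizeSize == 2 * LargeChunk + 107 * SmallChunk, "second call");
static_assert(Total - 2 * LinearizeSize == LargeChunk + 106 * SmallChunk, "third call");
static_assert(Total - 3 * LinearizeSize == 105 * SmallChunk, "fourth call");
static_assert(Total - 4 * LinearizeSize == 4616, "final partial write");
} // namespace linearize_drain_arithmetic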
+ testing::MockFunction tracker5; + for (int i = 0; i < 105; ++i) { + buffer.add(std::string(SmallChunk, 'd')); + } + buffer.addDrainTracker(tracker5.AsStdFunction()); + + expectSlices({{16184, 136, 16320}, + {400, 0, 400}, + {0, 0, 0}, + {32704, 0, 32704}, + {4032, 0, 4032}, + {4032, 0, 4032}, + {4032, 0, 4032}, + {4032, 0, 4032}, + {4032, 0, 4032}, + {704, 3328, 4032}}, + buffer); + + testing::InSequence s; + testing::MockFunction drain_tracker; + testing::MockFunction done_tracker; + EXPECT_CALL(tracker1, Call()); + EXPECT_CALL(release_callback_tracker, Call(_, _, _)); + EXPECT_CALL(tracker2, Call()); + EXPECT_CALL(drain_tracker, Call(3 * LargeChunk + 108 * SmallChunk, 16384)); + EXPECT_CALL(release_callback_tracker2, Call(_, _, _)); + EXPECT_CALL(tracker3, Call()); + EXPECT_CALL(tracker4, Call()); + EXPECT_CALL(drain_tracker, Call(2 * LargeChunk + 107 * SmallChunk, 16384)); + EXPECT_CALL(drain_tracker, Call(LargeChunk + 106 * SmallChunk, 16384)); + EXPECT_CALL(drain_tracker, Call(105 * SmallChunk, 16384)); + EXPECT_CALL(tracker5, Call()); + EXPECT_CALL(drain_tracker, Call(4616, 4616)); + EXPECT_CALL(done_tracker, Call()); + for (auto& expected_first_slice : std::vector>{{16584, 3832, 20416}, + {32904, 3896, 36800}, + {16520, 3896, 36800}, + {20296, 120, 20416}, + {4616, 3512, 8128}}) { + const uint32_t write_size = std::min(LinearizeSize, buffer.length()); + buffer.linearize(write_size); + expectFirstSlice(expected_first_slice, buffer); + drain_tracker.Call(buffer.length(), write_size); + buffer.drain(write_size); + } + done_tracker.Call(); + + expectSlices({}, buffer); +} + TEST_P(OwnedImplTest, ReserveCommit) { // This fragment will later be added to the buffer. It is declared in an enclosing scope to // ensure it is not destructed until after the buffer is. @@ -403,12 +718,12 @@ TEST_P(OwnedImplTest, ReserveCommit) { // Request a reservation that too big to fit in the existing slices. This should result // in the creation of a third slice. - expectSlices({{1, 4055, 4056}}, buffer); + expectSlices({{1, 4031, 4032}}, buffer); buffer.reserve(4096 - sizeof(OwnedSlice), iovecs, NumIovecs); - expectSlices({{1, 4055, 4056}, {0, 4056, 4056}}, buffer); + expectSlices({{1, 4031, 4032}, {0, 4032, 4032}}, buffer); const void* slice2 = iovecs[1].mem_; num_reserved = buffer.reserve(8192, iovecs, NumIovecs); - expectSlices({{1, 4055, 4056}, {0, 4056, 4056}, {0, 4056, 4056}}, buffer); + expectSlices({{1, 4031, 4032}, {0, 4032, 4032}, {0, 4032, 4032}}, buffer); EXPECT_EQ(3, num_reserved); EXPECT_EQ(slice1, iovecs[0].mem_); EXPECT_EQ(slice2, iovecs[1].mem_); @@ -417,11 +732,11 @@ TEST_P(OwnedImplTest, ReserveCommit) { // Append a fragment to the buffer, and then request a small reservation. The buffer // should make a new slice to satisfy the reservation; it cannot safely use any of // the previously seen slices, because they are no longer at the end of the buffer. 
- expectSlices({{1, 4055, 4056}}, buffer); + expectSlices({{1, 4031, 4032}}, buffer); buffer.addBufferFragment(fragment); EXPECT_EQ(13, buffer.length()); num_reserved = buffer.reserve(1, iovecs, NumIovecs); - expectSlices({{1, 4055, 4056}, {12, 0, 12}, {0, 4056, 4056}}, buffer); + expectSlices({{1, 4031, 4032}, {12, 0, 12}, {0, 4032, 4032}}, buffer); EXPECT_EQ(1, num_reserved); EXPECT_NE(slice1, iovecs[0].mem_); commitReservation(iovecs, num_reserved, buffer); @@ -454,16 +769,16 @@ TEST_P(OwnedImplTest, ReserveCommitReuse) { EXPECT_EQ(2, num_reserved); const void* first_slice = iovecs[0].mem_; iovecs[0].len_ = 1; - expectSlices({{8000, 4248, 12248}, {0, 12248, 12248}}, buffer); + expectSlices({{8000, 4224, 12224}, {0, 12224, 12224}}, buffer); buffer.commit(iovecs, 1); EXPECT_EQ(8001, buffer.length()); EXPECT_EQ(first_slice, iovecs[0].mem_); // The second slice is now released because there's nothing in the second slice. - expectSlices({{8001, 4247, 12248}}, buffer); + expectSlices({{8001, 4223, 12224}}, buffer); // Reserve 16KB again. num_reserved = buffer.reserve(16384, iovecs, NumIovecs); - expectSlices({{8001, 4247, 12248}, {0, 12248, 12248}}, buffer); + expectSlices({{8001, 4223, 12224}, {0, 12224, 12224}}, buffer); EXPECT_EQ(2, num_reserved); EXPECT_EQ(static_cast(first_slice) + 1, static_cast(iovecs[0].mem_)); @@ -492,7 +807,7 @@ TEST_P(OwnedImplTest, ReserveReuse) { EXPECT_EQ(2, num_reserved); EXPECT_EQ(first_slice, iovecs[0].mem_); EXPECT_EQ(second_slice, iovecs[1].mem_); - expectSlices({{0, 12248, 12248}, {0, 8152, 8152}}, buffer); + expectSlices({{0, 12224, 12224}, {0, 8128, 8128}}, buffer); // The remaining tests validate internal manipulations of the new slice // implementation, so they're not valid for the old evbuffer implementation. @@ -506,51 +821,51 @@ TEST_P(OwnedImplTest, ReserveReuse) { const void* third_slice = iovecs[1].mem_; EXPECT_EQ(2, num_reserved); EXPECT_EQ(first_slice, iovecs[0].mem_); - EXPECT_EQ(12248, iovecs[0].len_); + EXPECT_EQ(12224, iovecs[0].len_); EXPECT_NE(second_slice, iovecs[1].mem_); EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_); - expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}}, buffer); + expectSlices({{0, 12224, 12224}, {0, 8128, 8128}, {0, 20416, 20416}}, buffer); // Repeating a the reservation request for a smaller block returns the previous entry. num_reserved = buffer.reserve(16384, iovecs, NumIovecs); EXPECT_EQ(2, num_reserved); EXPECT_EQ(first_slice, iovecs[0].mem_); EXPECT_EQ(second_slice, iovecs[1].mem_); - expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}}, buffer); + expectSlices({{0, 12224, 12224}, {0, 8128, 8128}, {0, 20416, 20416}}, buffer); // Repeat the larger reservation notice that it doesn't match the prior reservation for 30000 // bytes. num_reserved = buffer.reserve(30000, iovecs, NumIovecs); EXPECT_EQ(2, num_reserved); EXPECT_EQ(first_slice, iovecs[0].mem_); - EXPECT_EQ(12248, iovecs[0].len_); + EXPECT_EQ(12224, iovecs[0].len_); EXPECT_NE(second_slice, iovecs[1].mem_); EXPECT_NE(third_slice, iovecs[1].mem_); EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_); - expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}, {0, 20440, 20440}}, buffer); + expectSlices({{0, 12224, 12224}, {0, 8128, 8128}, {0, 20416, 20416}, {0, 20416, 20416}}, buffer); // Commit the most recent reservation and verify the representation. 
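Across this and the surrounding hunks, every expected slice capacity shrinks by the same 24 bytes, which is consistent with each slice now carrying 24 more bytes of inline bookkeeping, plausibly for the new drain-tracker state, although the slice layout change itself is not part of this excerpt. A small compile-time check of that pattern, with invented names:

#include <cstdint>

// The expected slice capacities in these buffer tests all shrink by the same
// 24 bytes (4056 -> 4032, 8152 -> 8128, 12248 -> 12224, 16344 -> 16320,
// 20440 -> 20416), i.e. each slice now reserves 24 more bytes of inline
// bookkeeping out of its 4 KiB-multiple allocation.
namespace slice_capacity_delta {
constexpr uint64_t pairs[][2] = {
    {4056, 4032}, {8152, 8128}, {12248, 12224}, {16344, 16320}, {20440, 20416}};

constexpr bool allDeltasAre24() {
  for (const auto& p : pairs) {
    if (p[0] - p[1] != 24) {
      return false;
    }
  }
  return true;
}

static_assert(allDeltasAre24(), "every expected capacity dropped by 24 bytes");
} // namespace slice_capacity_delta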
buffer.commit(iovecs, num_reserved); - expectSlices({{12248, 0, 12248}, {0, 8152, 8152}, {0, 20440, 20440}, {17752, 2688, 20440}}, + expectSlices({{12224, 0, 12224}, {0, 8128, 8128}, {0, 20416, 20416}, {17776, 2640, 20416}}, buffer); // Do another reservation. num_reserved = buffer.reserve(16384, iovecs, NumIovecs); EXPECT_EQ(2, num_reserved); - expectSlices({{12248, 0, 12248}, - {0, 8152, 8152}, - {0, 20440, 20440}, - {17752, 2688, 20440}, - {0, 16344, 16344}}, + expectSlices({{12224, 0, 12224}, + {0, 8128, 8128}, + {0, 20416, 20416}, + {17776, 2640, 20416}, + {0, 16320, 16320}}, buffer); // And commit. buffer.commit(iovecs, num_reserved); - expectSlices({{12248, 0, 12248}, - {0, 8152, 8152}, - {0, 20440, 20440}, - {20440, 0, 20440}, - {13696, 2648, 16344}}, + expectSlices({{12224, 0, 12224}, + {0, 8128, 8128}, + {0, 20416, 20416}, + {20416, 0, 20416}, + {13744, 2576, 16320}}, buffer); } @@ -692,7 +1007,7 @@ TEST_P(OwnedImplTest, ReserveZeroCommit) { ASSERT_EQ(::close(pipe_fds[1]), 0); ASSERT_EQ(previous_length, buf.search(data.data(), rc, previous_length)); EXPECT_EQ("bbbbb", buf.toString().substr(0, 5)); - expectSlices({{5, 0, 4056}, {1953, 2103, 4056}}, buf); + expectSlices({{5, 0, 4032}, {1953, 2079, 4032}}, buf); } TEST_P(OwnedImplTest, ReadReserveAndCommit) { @@ -714,7 +1029,7 @@ TEST_P(OwnedImplTest, ReadReserveAndCommit) { ASSERT_EQ(result.rc_, static_cast(rc)); ASSERT_EQ(::close(pipe_fds[1]), 0); EXPECT_EQ("bbbbbe", buf.toString()); - expectSlices({{6, 4050, 4056}}, buf); + expectSlices({{6, 4026, 4032}}, buf); } TEST(OverflowDetectingUInt64, Arithmetic) { diff --git a/test/common/buffer/utility.h b/test/common/buffer/utility.h index daf67d85c1d2..3dfee93e6acf 100644 --- a/test/common/buffer/utility.h +++ b/test/common/buffer/utility.h @@ -22,10 +22,11 @@ enum class BufferImplementation { class BufferImplementationParamTest : public testing::TestWithParam { protected: BufferImplementationParamTest() { + saved_buffer_old_impl_ = OwnedImpl::newBuffersUseOldImpl(); OwnedImpl::useOldImpl(GetParam() == BufferImplementation::Old); } - ~BufferImplementationParamTest() override = default; + ~BufferImplementationParamTest() { OwnedImpl::useOldImpl(saved_buffer_old_impl_); } /** Verify that a buffer has been constructed using the expected implementation. 
*/ void verifyImplementation(const OwnedImpl& buffer) { @@ -38,6 +39,9 @@ class BufferImplementationParamTest : public testing::TestWithParam void { stream_callbacks_ = &callbacks; })); + EXPECT_CALL(stream_, setFlushTimeout(_)); EXPECT_CALL(stream_, bufferLimit()).WillOnce(Return(initial_buffer_limit_)); } diff --git a/test/common/http/http1/codec_impl_test.cc b/test/common/http/http1/codec_impl_test.cc index 10d3db74a404..ee3256290f82 100644 --- a/test/common/http/http1/codec_impl_test.cc +++ b/test/common/http/http1/codec_impl_test.cc @@ -1553,6 +1553,26 @@ TEST_F(Http1ClientConnectionImplTest, HighwatermarkMultipleResponses) { static_cast(codec_.get()) ->onUnderlyingConnectionBelowWriteBufferLowWatermark(); } + +TEST_F(Http1ServerConnectionImplTest, LargeRequestUrlRejected) { + initialize(); + + std::string exception_reason; + NiceMock decoder; + Http::StreamEncoder* response_encoder = nullptr; + EXPECT_CALL(callbacks_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder = &encoder; + return decoder; + })); + + // Default limit of 60 KiB + std::string long_url = "/" + std::string(60 * 1024, 'q'); + Buffer::OwnedImpl buffer("GET " + long_url + " HTTP/1.1\r\n"); + + EXPECT_THROW_WITH_MESSAGE(codec_->dispatch(buffer), EnvoyException, "headers size exceeds limit"); +} + TEST_F(Http1ServerConnectionImplTest, LargeRequestHeadersRejected) { // Default limit of 60 KiB std::string long_string = "big: " + std::string(60 * 1024, 'q') + "\r\n"; @@ -1635,8 +1655,24 @@ TEST_F(Http1ServerConnectionImplTest, ManyRequestHeadersAccepted) { testRequestHeadersAccepted(createHeaderFragment(150)); } -// Tests that response headers of 80 kB fails. -TEST_F(Http1ClientConnectionImplTest, LargeResponseHeadersRejected) { +// Tests that incomplete response headers of 80 kB header value fails. +TEST_F(Http1ClientConnectionImplTest, ResponseHeadersWithLargeValueRejected) { + initialize(); + + NiceMock response_decoder; + Http::StreamEncoder& request_encoder = codec_->newStream(response_decoder); + TestHeaderMapImpl headers{{":method", "GET"}, {":path", "/"}, {":authority", "host"}}; + request_encoder.encodeHeaders(headers, true); + + Buffer::OwnedImpl buffer("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n"); + codec_->dispatch(buffer); + std::string long_header = "big: " + std::string(80 * 1024, 'q'); + buffer = Buffer::OwnedImpl(long_header); + EXPECT_THROW_WITH_MESSAGE(codec_->dispatch(buffer), EnvoyException, "headers size exceeds limit"); +} + +// Tests that incomplete response headers with a 80 kB header field fails. 
+TEST_F(Http1ClientConnectionImplTest, ResponseHeadersWithLargeFieldRejected) { initialize(); NiceMock response_decoder; @@ -1646,7 +1682,7 @@ TEST_F(Http1ClientConnectionImplTest, LargeResponseHeadersRejected) { Buffer::OwnedImpl buffer("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n"); codec_->dispatch(buffer); - std::string long_header = "big: " + std::string(80 * 1024, 'q') + "\r\n"; + std::string long_header = "bigfield" + std::string(80 * 1024, 'q'); buffer = Buffer::OwnedImpl(long_header); EXPECT_THROW_WITH_MESSAGE(codec_->dispatch(buffer), EnvoyException, "headers size exceeds limit"); } diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index 44711e889fc9..d4f124b3469e 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -59,17 +59,36 @@ class Http2CodecImplTestFixture { }; Http2CodecImplTestFixture(Http2SettingsTuple client_settings, Http2SettingsTuple server_settings) - : client_settings_(client_settings), server_settings_(server_settings) {} - virtual ~Http2CodecImplTestFixture() = default; + : client_settings_(client_settings), server_settings_(server_settings) { + // Make sure we explicitly test for stream flush timer creation. + EXPECT_CALL(client_connection_.dispatcher_, createTimer_(_)).Times(0); + EXPECT_CALL(server_connection_.dispatcher_, createTimer_(_)).Times(0); + } + virtual ~Http2CodecImplTestFixture() { + client_connection_.dispatcher_.clearDeferredDeleteList(); + if (client_ != nullptr) { + client_.reset(); + EXPECT_EQ(0, TestUtility::findGauge(client_stats_store_, "http2.streams_active")->value()); + EXPECT_EQ(0, + TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); + } + server_connection_.dispatcher_.clearDeferredDeleteList(); + if (server_ != nullptr) { + server_.reset(); + EXPECT_EQ(0, TestUtility::findGauge(server_stats_store_, "http2.streams_active")->value()); + EXPECT_EQ(0, + TestUtility::findGauge(server_stats_store_, "http2.pending_send_bytes")->value()); + } + } virtual void initialize() { Http2SettingsFromTuple(client_http2settings_, client_settings_); Http2SettingsFromTuple(server_http2settings_, server_settings_); client_ = std::make_unique( - client_connection_, client_callbacks_, stats_store_, client_http2settings_, + client_connection_, client_callbacks_, client_stats_store_, client_http2settings_, max_request_headers_kb_, max_response_headers_count_); server_ = std::make_unique( - server_connection_, server_callbacks_, stats_store_, server_http2settings_, + server_connection_, server_callbacks_, server_stats_store_, server_http2settings_, max_request_headers_kb_, max_request_headers_count_, headers_with_underscores_action_); request_encoder_ = &client_->newStream(response_decoder_); @@ -79,6 +98,7 @@ class Http2CodecImplTestFixture { .WillRepeatedly(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { response_encoder_ = &encoder; encoder.getStream().addCallbacks(server_stream_callbacks_); + encoder.getStream().setFlushTimeout(std::chrono::milliseconds(30000)); return request_decoder_; })); } @@ -136,12 +156,13 @@ class Http2CodecImplTestFixture { const Http2SettingsTuple server_settings_; bool allow_metadata_ = false; bool stream_error_on_invalid_http_messaging_ = false; - Stats::IsolatedStoreImpl stats_store_; + Stats::IsolatedStoreImpl client_stats_store_; Http2Settings client_http2settings_; NiceMock client_connection_; MockConnectionCallbacks client_callbacks_; std::unique_ptr client_; ConnectionWrapper 
client_wrapper_; + Stats::IsolatedStoreImpl server_stats_store_; Http2Settings server_http2settings_; NiceMock server_connection_; MockServerConnectionCallbacks server_callbacks_; @@ -286,7 +307,7 @@ TEST_P(Http2CodecImplTest, InvalidContinueWithFin) { TestHeaderMapImpl continue_headers{{":status", "100"}}; EXPECT_THROW(response_encoder_->encodeHeaders(continue_headers, true), CodecProtocolException); - EXPECT_EQ(1, stats_store_.counter("http2.rx_messaging_error").value()); + EXPECT_EQ(1, client_stats_store_.counter("http2.rx_messaging_error").value()); } TEST_P(Http2CodecImplTest, InvalidContinueWithFinAllowed) { @@ -314,7 +335,7 @@ TEST_P(Http2CodecImplTest, InvalidContinueWithFinAllowed) { setupDefaultConnectionMocks(); client_wrapper_.dispatch(Buffer::OwnedImpl(), *client_); - EXPECT_EQ(1, stats_store_.counter("http2.rx_messaging_error").value()); + EXPECT_EQ(1, client_stats_store_.counter("http2.rx_messaging_error").value()); } TEST_P(Http2CodecImplTest, InvalidRepeatContinue) { @@ -330,7 +351,7 @@ TEST_P(Http2CodecImplTest, InvalidRepeatContinue) { response_encoder_->encode100ContinueHeaders(continue_headers); EXPECT_THROW(response_encoder_->encodeHeaders(continue_headers, true), CodecProtocolException); - EXPECT_EQ(1, stats_store_.counter("http2.rx_messaging_error").value()); + EXPECT_EQ(1, client_stats_store_.counter("http2.rx_messaging_error").value()); }; TEST_P(Http2CodecImplTest, InvalidRepeatContinueAllowed) { @@ -361,7 +382,7 @@ TEST_P(Http2CodecImplTest, InvalidRepeatContinueAllowed) { setupDefaultConnectionMocks(); client_wrapper_.dispatch(Buffer::OwnedImpl(), *client_); - EXPECT_EQ(1, stats_store_.counter("http2.rx_messaging_error").value()); + EXPECT_EQ(1, client_stats_store_.counter("http2.rx_messaging_error").value()); }; TEST_P(Http2CodecImplTest, Invalid103) { @@ -382,7 +403,7 @@ TEST_P(Http2CodecImplTest, Invalid103) { EXPECT_THROW_WITH_MESSAGE(response_encoder_->encodeHeaders(early_hint_headers, false), CodecProtocolException, "Unexpected 'trailers' with no end stream."); - EXPECT_EQ(1, stats_store_.counter("http2.too_many_header_frames").value()); + EXPECT_EQ(1, client_stats_store_.counter("http2.too_many_header_frames").value()); } TEST_P(Http2CodecImplTest, Invalid204WithContentLength) { @@ -403,7 +424,7 @@ TEST_P(Http2CodecImplTest, Invalid204WithContentLength) { } EXPECT_THROW(response_encoder_->encodeHeaders(response_headers, false), CodecProtocolException); - EXPECT_EQ(1, stats_store_.counter("http2.rx_messaging_error").value()); + EXPECT_EQ(1, client_stats_store_.counter("http2.rx_messaging_error").value()); }; TEST_P(Http2CodecImplTest, Invalid204WithContentLengthAllowed) { @@ -440,7 +461,7 @@ TEST_P(Http2CodecImplTest, Invalid204WithContentLengthAllowed) { setupDefaultConnectionMocks(); client_wrapper_.dispatch(Buffer::OwnedImpl(), *client_); - EXPECT_EQ(1, stats_store_.counter("http2.rx_messaging_error").value()); + EXPECT_EQ(1, client_stats_store_.counter("http2.rx_messaging_error").value()); }; TEST_P(Http2CodecImplTest, RefusedStreamReset) { @@ -463,7 +484,7 @@ TEST_P(Http2CodecImplTest, InvalidHeadersFrame) { initialize(); EXPECT_THROW(request_encoder_->encodeHeaders(TestHeaderMapImpl{}, true), CodecProtocolException); - EXPECT_EQ(1, stats_store_.counter("http2.rx_messaging_error").value()); + EXPECT_EQ(1, server_stats_store_.counter("http2.rx_messaging_error").value()); } TEST_P(Http2CodecImplTest, InvalidHeadersFrameAllowed) { @@ -525,7 +546,7 @@ TEST_P(Http2CodecImplTest, TrailingHeaders) { 
response_encoder_->encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); } -TEST_P(Http2CodecImplTest, TrailingHeadersLargeBody) { +TEST_P(Http2CodecImplTest, TrailingHeadersLargeClientBody) { initialize(); // Buffer server data so we can make sure we don't get any window updates. @@ -540,11 +561,11 @@ TEST_P(Http2CodecImplTest, TrailingHeadersLargeBody) { EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AtLeast(1)); Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); request_encoder_->encodeData(body, false); - EXPECT_CALL(request_decoder_, decodeTrailers_(_)); request_encoder_->encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); // Flush pending data. setupDefaultConnectionMocks(); + EXPECT_CALL(request_decoder_, decodeTrailers_(_)); server_wrapper_.dispatch(Buffer::OwnedImpl(), *server_); TestHeaderMapImpl response_headers{{":status", "200"}}; @@ -697,8 +718,11 @@ TEST_P(Http2CodecImplDeferredResetTest, DeferredResetServer) { response_encoder_->encodeHeaders(response_headers, false); Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); EXPECT_CALL(server_stream_callbacks_, onAboveWriteBufferHighWatermark()).Times(AnyNumber()); + auto flush_timer = new Event::MockTimer(&server_connection_.dispatcher_); + EXPECT_CALL(*flush_timer, enableTimer(std::chrono::milliseconds(30000), _)); response_encoder_->encodeData(body, true); EXPECT_CALL(server_stream_callbacks_, onResetStream(StreamResetReason::LocalReset, _)); + EXPECT_CALL(*flush_timer, disableTimer()); response_encoder_->getStream().resetStream(StreamResetReason::LocalReset); MockStreamCallbacks client_stream_callbacks; @@ -732,6 +756,8 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { // Force the server stream to be read disabled. This will cause it to stop sending window // updates to the client. server_->getStream(1)->readDisable(true); + EXPECT_EQ(1, TestUtility::findGauge(client_stats_store_, "http2.streams_active")->value()); + EXPECT_EQ(1, TestUtility::findGauge(server_stats_store_, "http2.streams_active")->value()); uint32_t initial_stream_window = nghttp2_session_get_stream_effective_local_window_size(client_->session(), 1); @@ -757,6 +783,8 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { Buffer::OwnedImpl more_long_data(std::string(initial_stream_window, 'a')); request_encoder_->encodeData(more_long_data, false); EXPECT_EQ(initial_stream_window, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window, + TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); EXPECT_EQ(initial_stream_window, server_->getStream(1)->unconsumed_bytes_); // If we go over the limit, the stream callbacks should fire. @@ -764,6 +792,8 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { Buffer::OwnedImpl last_byte("!"); request_encoder_->encodeData(last_byte, false); EXPECT_EQ(initial_stream_window + 1, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window + 1, + TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); // Now create a second stream on the connection. 
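The gauge checks in this test pair the new http2.pending_send_bytes gauge with pending_send_data_.length() and with the watermark callbacks. The sketch below is a simplified, invented model of that accounting (ToyPendingSendAccounting is not a real class); the production codec tracks the bytes per stream and reports the aggregate through the gauge.

#include <cstdint>
#include <functional>

// Toy model: bytes that cannot be written because the peer has not granted
// window accumulate as pending send data, the gauge mirrors that byte count,
// and the watermark callbacks fire as the total crosses the configured limit.
class ToyPendingSendAccounting {
public:
  ToyPendingSendAccounting(uint64_t high_watermark, std::function<void()> above,
                           std::function<void()> below)
      : high_watermark_(high_watermark), above_(std::move(above)), below_(std::move(below)) {}

  // Called when data is buffered because the stream window is exhausted.
  void buffered(uint64_t bytes) {
    const uint64_t before = pending_send_bytes_;
    pending_send_bytes_ += bytes;
    if (before < high_watermark_ && pending_send_bytes_ >= high_watermark_) {
      above_();
    }
  }

  // Called when the peer grants window and buffered data is flushed.
  void flushed(uint64_t bytes) {
    const uint64_t before = pending_send_bytes_;
    pending_send_bytes_ -= bytes;
    if (before >= high_watermark_ && pending_send_bytes_ < high_watermark_) {
      below_();
    }
  }

  // Value reported by the pending_send_bytes gauge in this model.
  uint64_t pendingSendBytes() const { return pending_send_bytes_; }

private:
  const uint64_t high_watermark_;
  std::function<void()> above_;
  std::function<void()> below_;
  uint64_t pending_send_bytes_{0};
};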
MockStreamDecoder response_decoder2; @@ -807,6 +837,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { EXPECT_CALL(callbacks3, onBelowWriteBufferLowWatermark()); server_->getStream(1)->readDisable(false); EXPECT_EQ(0, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(0, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); // The extra 1 byte sent won't trigger another window update, so the final window should be the // initial window minus the last 1 byte flush from the client to server. EXPECT_EQ(initial_stream_window - 1, @@ -899,6 +930,142 @@ TEST_P(Http2CodecImplFlowControlTest, FlowControlPendingRecvData) { request_encoder_->encodeData(data, false); } +// Verify that we create and disable the stream flush timer when trailers follow a stream that +// does not have enough window. +TEST_P(Http2CodecImplFlowControlTest, TrailingHeadersLargeServerBody) { + initialize(); + + InSequence s; + TestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + request_encoder_->encodeHeaders(request_headers, true); + + ON_CALL(client_connection_, write(_, _)) + .WillByDefault( + Invoke([&](Buffer::Instance& data, bool) -> void { server_wrapper_.buffer_.add(data); })); + TestHeaderMapImpl response_headers{{":status", "200"}}; + EXPECT_CALL(response_decoder_, decodeHeaders_(_, false)); + response_encoder_->encodeHeaders(response_headers, false); + EXPECT_CALL(server_stream_callbacks_, onAboveWriteBufferHighWatermark()); + EXPECT_CALL(response_decoder_, decodeData(_, false)).Times(AtLeast(1)); + auto flush_timer = new Event::MockTimer(&server_connection_.dispatcher_); + EXPECT_CALL(*flush_timer, enableTimer(std::chrono::milliseconds(30000), _)); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + response_encoder_->encodeData(body, false); + response_encoder_->encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); + + // Send window updates from the client. + setupDefaultConnectionMocks(); + EXPECT_CALL(response_decoder_, decodeData(_, false)).Times(AtLeast(1)); + EXPECT_CALL(response_decoder_, decodeTrailers_(_)); + EXPECT_CALL(*flush_timer, disableTimer()); + server_wrapper_.dispatch(Buffer::OwnedImpl(), *server_); + EXPECT_EQ(0, server_stats_store_.counter("http2.tx_flush_timeout").value()); +} + +// Verify that we create and handle the stream flush timeout when trailers follow a stream that +// does not have enough window. 
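The next few tests all follow the same pattern for the new stream flush timeout: encode the complete local stream while the peer window is exhausted, expect enableTimer for the 30 second flush timeout set by the fixture, and then either flush, reset, or let the timer fire. The struct below is an invented, minimal model of that state machine, not the codec implementation.

#include <cstdint>

// Minimal model of the stream flush timeout: once the local side has written
// end-of-stream but bytes are still buffered waiting for peer window, a timer
// is armed. Flushing the remaining bytes or resetting the stream disables it;
// if it fires first, the stream is reset toward the peer and the
// http2.tx_flush_timeout counter is incremented.
struct ToyFlushTimeoutModel {
  bool local_end_stream{false};
  uint64_t pending_send_bytes{0};
  bool timer_armed{false};
  uint64_t tx_flush_timeout{0};

  void onLocalEndStream() {
    local_end_stream = true;
    maybeArm();
  }
  void onBytesBuffered(uint64_t bytes) {
    pending_send_bytes += bytes;
    maybeArm();
  }
  void onBytesFlushed(uint64_t bytes) {
    pending_send_bytes -= bytes;
    if (pending_send_bytes == 0) {
      timer_armed = false; // Nothing left to flush; disable the timer.
    }
  }
  void onStreamReset() { timer_armed = false; }
  void onTimerFired() {
    // Count the timeout; the real codec resets the stream toward the peer
    // without notifying the local stream callbacks with another reset.
    ++tx_flush_timeout;
    timer_armed = false;
  }

private:
  void maybeArm() {
    if (local_end_stream && pending_send_bytes > 0) {
      timer_armed = true;
    }
  }
};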
+TEST_P(Http2CodecImplFlowControlTest, TrailingHeadersLargeServerBodyFlushTimeout) { + initialize(); + + InSequence s; + MockStreamCallbacks client_stream_callbacks; + request_encoder_->getStream().addCallbacks(client_stream_callbacks); + TestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + request_encoder_->encodeHeaders(request_headers, true); + + ON_CALL(client_connection_, write(_, _)) + .WillByDefault( + Invoke([&](Buffer::Instance& data, bool) -> void { server_wrapper_.buffer_.add(data); })); + TestHeaderMapImpl response_headers{{":status", "200"}}; + EXPECT_CALL(response_decoder_, decodeHeaders_(_, false)); + response_encoder_->encodeHeaders(response_headers, false); + EXPECT_CALL(server_stream_callbacks_, onAboveWriteBufferHighWatermark()); + EXPECT_CALL(response_decoder_, decodeData(_, false)).Times(AtLeast(1)); + auto flush_timer = new Event::MockTimer(&server_connection_.dispatcher_); + EXPECT_CALL(*flush_timer, enableTimer(std::chrono::milliseconds(30000), _)); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + response_encoder_->encodeData(body, false); + response_encoder_->encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); + + // Invoke a stream flush timeout. Make sure we don't get a reset locally for higher layers but + // we do get a reset on the client. + EXPECT_CALL(server_stream_callbacks_, onResetStream(_, _)).Times(0); + EXPECT_CALL(client_stream_callbacks, onResetStream(StreamResetReason::RemoteReset, _)); + flush_timer->invokeCallback(); + EXPECT_EQ(1, server_stats_store_.counter("http2.tx_flush_timeout").value()); +} + +// Verify that we create and handle the stream flush timeout when there is a large body that +// does not have enough window. +TEST_P(Http2CodecImplFlowControlTest, LargeServerBodyFlushTimeout) { + initialize(); + + InSequence s; + MockStreamCallbacks client_stream_callbacks; + request_encoder_->getStream().addCallbacks(client_stream_callbacks); + TestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + request_encoder_->encodeHeaders(request_headers, true); + + ON_CALL(client_connection_, write(_, _)) + .WillByDefault( + Invoke([&](Buffer::Instance& data, bool) -> void { server_wrapper_.buffer_.add(data); })); + TestHeaderMapImpl response_headers{{":status", "200"}}; + EXPECT_CALL(response_decoder_, decodeHeaders_(_, false)); + response_encoder_->encodeHeaders(response_headers, false); + EXPECT_CALL(response_decoder_, decodeData(_, false)).Times(AtLeast(1)); + auto flush_timer = new Event::MockTimer(&server_connection_.dispatcher_); + EXPECT_CALL(*flush_timer, enableTimer(std::chrono::milliseconds(30000), _)); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + response_encoder_->encodeData(body, true); + + // Invoke a stream flush timeout. Make sure we don't get a reset locally for higher layers but + // we do get a reset on the client. + EXPECT_CALL(server_stream_callbacks_, onResetStream(_, _)).Times(0); + EXPECT_CALL(client_stream_callbacks, onResetStream(StreamResetReason::RemoteReset, _)); + flush_timer->invokeCallback(); + EXPECT_EQ(1, server_stats_store_.counter("http2.tx_flush_timeout").value()); +} + +// Verify that when an incoming protocol error races with a stream flush timeout we correctly +// disable the flush timeout and do not attempt to reset the stream. 
+TEST_P(Http2CodecImplFlowControlTest, LargeServerBodyFlushTimeoutAfterGoaway) { + initialize(); + + InSequence s; + MockStreamCallbacks client_stream_callbacks; + request_encoder_->getStream().addCallbacks(client_stream_callbacks); + TestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + request_encoder_->encodeHeaders(request_headers, true); + + ON_CALL(client_connection_, write(_, _)) + .WillByDefault( + Invoke([&](Buffer::Instance& data, bool) -> void { server_wrapper_.buffer_.add(data); })); + TestHeaderMapImpl response_headers{{":status", "200"}}; + EXPECT_CALL(response_decoder_, decodeHeaders_(_, false)); + response_encoder_->encodeHeaders(response_headers, false); + EXPECT_CALL(response_decoder_, decodeData(_, false)).Times(AtLeast(1)); + auto flush_timer = new Event::MockTimer(&server_connection_.dispatcher_); + EXPECT_CALL(*flush_timer, enableTimer(std::chrono::milliseconds(30000), _)); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + response_encoder_->encodeData(body, true); + + // Force a protocol error. + Buffer::OwnedImpl garbage_data("this should cause a protocol error"); + EXPECT_CALL(client_callbacks_, onGoAway()); + EXPECT_CALL(*flush_timer, disableTimer()); + EXPECT_CALL(server_stream_callbacks_, onResetStream(_, _)).Times(0); + EXPECT_THROW(server_wrapper_.dispatch(garbage_data, *server_), CodecProtocolException); + EXPECT_EQ(0, server_stats_store_.counter("http2.tx_flush_timeout").value()); +} + TEST_P(Http2CodecImplTest, WatermarkUnderEndStream) { initialize(); MockStreamCallbacks callbacks; @@ -953,10 +1120,10 @@ TEST_P(Http2CodecImplStreamLimitTest, MaxClientStreams) { Http2SettingsFromTuple(client_http2settings_, ::testing::get<0>(GetParam())); Http2SettingsFromTuple(server_http2settings_, ::testing::get<1>(GetParam())); client_ = std::make_unique( - client_connection_, client_callbacks_, stats_store_, client_http2settings_, + client_connection_, client_callbacks_, client_stats_store_, client_http2settings_, max_request_headers_kb_, max_response_headers_count_); server_ = std::make_unique( - server_connection_, server_callbacks_, stats_store_, server_http2settings_, + server_connection_, server_callbacks_, server_stats_store_, server_http2settings_, max_request_headers_kb_, max_request_headers_count_, headers_with_underscores_action_); for (int i = 0; i < 101; ++i) { @@ -1104,7 +1271,7 @@ TEST_P(Http2CodecImplTest, HeaderNameWithUnderscoreAreDropped) { request_headers.addCopy("bad_header", "something"); EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), _)); request_encoder_->encodeHeaders(request_headers, false); - EXPECT_EQ(1, stats_store_.counter("http2.dropped_headers_with_underscores").value()); + EXPECT_EQ(1, server_stats_store_.counter("http2.dropped_headers_with_underscores").value()); } // Tests that request with header names containing underscore are rejected when the option is set to @@ -1118,7 +1285,9 @@ TEST_P(Http2CodecImplTest, HeaderNameWithUnderscoreAreRejectedByDefault) { request_headers.addCopy("bad_header", "something"); EXPECT_CALL(server_stream_callbacks_, onResetStream(_, _)).Times(1); request_encoder_->encodeHeaders(request_headers, false); - EXPECT_EQ(1, stats_store_.counter("http2.requests_rejected_with_underscores_in_headers").value()); + EXPECT_EQ( + 1, + server_stats_store_.counter("http2.requests_rejected_with_underscores_in_headers").value()); } // Tests request headers with name containing underscore 
are allowed when the option is set to @@ -1134,7 +1303,7 @@ TEST_P(Http2CodecImplTest, HeaderNameWithUnderscoreAllowed) { EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), _)); EXPECT_CALL(server_stream_callbacks_, onResetStream(_, _)).Times(0); request_encoder_->encodeHeaders(request_headers, false); - EXPECT_EQ(0, stats_store_.counter("http2.dropped_headers_with_underscores").value()); + EXPECT_EQ(0, server_stats_store_.counter("http2.dropped_headers_with_underscores").value()); } // This is the HTTP/2 variant of the HTTP/1 regression test for CVE-2019-18801. @@ -1349,7 +1518,7 @@ TEST_P(Http2CodecImplTest, PingFlood) { EXPECT_THROW(client_->sendPendingFrames(), FrameFloodException); EXPECT_EQ(ack_count, Http2Settings::DEFAULT_MAX_OUTBOUND_CONTROL_FRAMES); - EXPECT_EQ(1, stats_store_.counter("http2.outbound_control_flood").value()); + EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_control_flood").value()); } // Verify that codec allows PING flood when mitigation is disabled @@ -1376,7 +1545,10 @@ TEST_P(Http2CodecImplTest, PingFloodMitigationDisabled) { // Verify that outbound control frame counter decreases when send buffer is drained TEST_P(Http2CodecImplTest, PingFloodCounterReset) { - static const int kMaxOutboundControlFrames = 100; + // Ping frames are 17 bytes each so 237 full frames and a partial frame fit in the current min + // size for buffer slices. Setting the limit to 2x+1 the number that fits in a single slice allows + // the logic below that verifies drain and overflow thresholds. + static const int kMaxOutboundControlFrames = 475; max_outbound_control_frames_ = kMaxOutboundControlFrames; initialize(); @@ -1401,15 +1573,16 @@ TEST_P(Http2CodecImplTest, PingFloodCounterReset) { EXPECT_NO_THROW(client_->sendPendingFrames()); EXPECT_EQ(ack_count, kMaxOutboundControlFrames); - // Drain kMaxOutboundFrames / 2 slices from the send buffer + // Drain floor(kMaxOutboundFrames / 2) slices from the send buffer buffer.drain(buffer.length() / 2); - // Send kMaxOutboundFrames / 2 more pings. + // Send floor(kMaxOutboundFrames / 2) more pings. for (int i = 0; i < kMaxOutboundControlFrames / 2; ++i) { EXPECT_EQ(0, nghttp2_submit_ping(client_->session(), NGHTTP2_FLAG_NONE, nullptr)); } // The number of outbound frames should be half of max so the connection should not be terminated. EXPECT_NO_THROW(client_->sendPendingFrames()); + EXPECT_EQ(ack_count, kMaxOutboundControlFrames + kMaxOutboundControlFrames / 2); // 1 more ping frame should overflow the outbound frame limit. 
EXPECT_EQ(0, nghttp2_submit_ping(client_->session(), NGHTTP2_FLAG_NONE, nullptr)); @@ -1443,7 +1616,7 @@ TEST_P(Http2CodecImplTest, ResponseHeadersFlood) { EXPECT_THROW(client_->sendPendingFrames(), FrameFloodException); EXPECT_EQ(frame_count, Http2Settings::DEFAULT_MAX_OUTBOUND_FRAMES + 1); - EXPECT_EQ(1, stats_store_.counter("http2.outbound_flood").value()); + EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_flood").value()); } // Verify that codec detects flood of outbound DATA frames @@ -1476,7 +1649,7 @@ TEST_P(Http2CodecImplTest, ResponseDataFlood) { EXPECT_THROW(client_->sendPendingFrames(), FrameFloodException); EXPECT_EQ(frame_count, Http2Settings::DEFAULT_MAX_OUTBOUND_FRAMES + 1); - EXPECT_EQ(1, stats_store_.counter("http2.outbound_flood").value()); + EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_flood").value()); } // Verify that codec allows outbound DATA flood when mitigation is disabled @@ -1580,7 +1753,7 @@ TEST_P(Http2CodecImplTest, PingStacksWithDataFlood) { EXPECT_THROW(client_->sendPendingFrames(), FrameFloodException); EXPECT_EQ(frame_count, Http2Settings::DEFAULT_MAX_OUTBOUND_FRAMES); - EXPECT_EQ(1, stats_store_.counter("http2.outbound_flood").value()); + EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_flood").value()); } TEST_P(Http2CodecImplTest, PriorityFlood) { diff --git a/test/common/network/BUILD b/test/common/network/BUILD index ad449c8601a7..17bda9820852 100644 --- a/test/common/network/BUILD +++ b/test/common/network/BUILD @@ -25,6 +25,7 @@ envoy_cc_test_library( "//test/test_common:environment_lib", "//test/test_common:network_utility_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", ], ) @@ -125,11 +126,11 @@ envoy_cc_test( "//test/mocks/buffer:buffer_mocks", "//test/mocks/network:network_mocks", "//test/mocks/ratelimit:ratelimit_mocks", - "//test/mocks/runtime:runtime_mocks", "//test/mocks/server:server_mocks", "//test/mocks/tracing:tracing_mocks", "//test/mocks/upstream:host_mocks", "//test/mocks/upstream:upstream_mocks", + "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", ], ) @@ -171,6 +172,7 @@ envoy_cc_test( "//source/common/stats:stats_lib", "//test/common/network:listener_impl_test_base_lib", "//test/mocks/network:network_mocks", + "//test/mocks/runtime:runtime_mocks", "//test/mocks/server:server_mocks", "//test/test_common:environment_lib", "//test/test_common:network_utility_lib", diff --git a/test/common/network/dns_impl_test.cc b/test/common/network/dns_impl_test.cc index c9bcd1468773..fa1de4e07862 100644 --- a/test/common/network/dns_impl_test.cc +++ b/test/common/network/dns_impl_test.cc @@ -266,6 +266,8 @@ class TestDnsServer : public ListenerCallbacks { queries_.emplace_back(query); } + void onReject() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + void addHosts(const std::string& hostname, const IpList& ip, const RecordType& type) { if (type == RecordType::A) { hosts_a_[hostname] = ip; diff --git a/test/common/network/listener_impl_test.cc b/test/common/network/listener_impl_test.cc index 8c5fe2738efe..ac8b2152a9c6 100644 --- a/test/common/network/listener_impl_test.cc +++ b/test/common/network/listener_impl_test.cc @@ -7,6 +7,7 @@ #include "test/mocks/server/mocks.h" #include "test/test_common/environment.h" #include "test/test_common/network_utility.h" +#include "test/test_common/test_runtime.h" #include "test/test_common/utility.h" #include "gmock/gmock.h" @@ -134,6 +135,72 @@ TEST_P(ListenerImplTest, 
UseActualDst) { dispatcher_->run(Event::Dispatcher::RunType::Block); } +TEST_P(ListenerImplTest, GlobalConnectionLimitEnforcement) { + // Required to manipulate runtime values when there is no test server. + TestScopedRuntime scoped_runtime; + + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"overload.global_downstream_max_connections", "2"}}); + Network::TcpListenSocket socket(Network::Test::getCanonicalLoopbackAddress(version_), nullptr, + true); + Network::MockListenerCallbacks listener_callbacks; + Network::MockConnectionHandler connection_handler; + Network::ListenerPtr listener = dispatcher_->createListener(socket, listener_callbacks, true); + + std::vector client_connections; + std::vector server_connections; + StreamInfo::StreamInfoImpl stream_info(dispatcher_->timeSource()); + EXPECT_CALL(listener_callbacks, onAccept_(_)) + .WillRepeatedly(Invoke([&](Network::ConnectionSocketPtr& accepted_socket) -> void { + server_connections.emplace_back(dispatcher_->createServerConnection( + std::move(accepted_socket), Network::Test::createRawBufferSocket())); + dispatcher_->exit(); + })); + + auto initiate_connections = [&](const int count) { + for (int i = 0; i < count; ++i) { + client_connections.emplace_back(dispatcher_->createClientConnection( + socket.localAddress(), Network::Address::InstanceConstSharedPtr(), + Network::Test::createRawBufferSocket(), nullptr)); + client_connections.back()->connect(); + } + }; + + initiate_connections(5); + EXPECT_CALL(listener_callbacks, onReject()).Times(3); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + // We expect any server-side connections that get created to populate 'server_connections'. + EXPECT_EQ(2, server_connections.size()); + + // Let's increase the allowed connections and try sending more connections. + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"overload.global_downstream_max_connections", "3"}}); + initiate_connections(5); + EXPECT_CALL(listener_callbacks, onReject()).Times(4); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + EXPECT_EQ(3, server_connections.size()); + + // Clear the limit and verify there's no longer a limit. 
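The last phase of this test clears the runtime value to confirm that an empty overload.global_downstream_max_connections means unlimited. A minimal sketch of that accept-time decision; shouldRejectConnection is illustrative only and ignores the listener-level limit and the overload-manager integration mentioned in the TODO earlier in this diff.

#include <cstdint>
#include <string>

// An empty or missing value for the global connection limit runtime key means
// "no limit"; otherwise a new connection is rejected (onReject above) once the
// active count has reached the configured maximum.
inline bool shouldRejectConnection(const std::string& runtime_value,
                                   uint64_t active_connections) {
  if (runtime_value.empty()) {
    return false; // Unset limit: accept everything, as the final phase of the test expects.
  }
  const uint64_t limit = std::stoull(runtime_value);
  return active_connections >= limit;
}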
+ Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"overload.global_downstream_max_connections", ""}}); + initiate_connections(10); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + EXPECT_EQ(13, server_connections.size()); + + for (const auto& conn : client_connections) { + conn->close(ConnectionCloseType::NoFlush); + } + for (const auto& conn : server_connections) { + conn->close(ConnectionCloseType::NoFlush); + } + + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"overload.global_downstream_max_connections", ""}}); +} + TEST_P(ListenerImplTest, WildcardListenerUseActualDst) { Network::TcpListenSocket socket(Network::Test::getAnyAddress(version_), nullptr, true); Network::MockListenerCallbacks listener_callbacks; diff --git a/test/config/utility.cc b/test/config/utility.cc index 29717682558e..58e1d400c58a 100644 --- a/test/config/utility.cc +++ b/test/config/utility.cc @@ -330,6 +330,22 @@ void ConfigHelper::applyConfigModifiers() { config_modifiers_.clear(); } +void ConfigHelper::addRuntimeOverride(const std::string& key, const std::string& value) { + if (bootstrap_.mutable_layered_runtime()->layers_size() == 0) { + auto* static_layer = bootstrap_.mutable_layered_runtime()->add_layers(); + static_layer->set_name("static_layer"); + static_layer->mutable_static_layer(); + auto* admin_layer = bootstrap_.mutable_layered_runtime()->add_layers(); + admin_layer->set_name("admin"); + admin_layer->mutable_admin_layer(); + } + auto* static_layer = + bootstrap_.mutable_layered_runtime()->mutable_layers(0)->mutable_static_layer(); + ProtobufWkt::Value string_value; + string_value.set_string_value(value); + (*static_layer->mutable_fields())[std::string(key)] = std::move(string_value); +} + void ConfigHelper::finalize(const std::vector& ports) { RELEASE_ASSERT(!finalized_, ""); diff --git a/test/config/utility.h b/test/config/utility.h index 8359fa0edeb1..ee7ae27c6511 100644 --- a/test/config/utility.h +++ b/test/config/utility.h @@ -169,6 +169,9 @@ class ConfigHelper { // configuration generated in ConfigHelper::finalize. void skipPortUsageValidation() { skip_port_usage_validation_ = true; } + // Add this key value pair to the static runtime. + void addRuntimeOverride(const std::string& key, const std::string& value); + private: // Load the first HCM struct from the first listener into a parsed proto. bool loadHttpConnectionManager( diff --git a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc index 918422007301..529cc6f6a2b4 100644 --- a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc +++ b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc @@ -76,6 +76,7 @@ class ProxyProtocolTest : public testing::TestWithParam connection_callbacks_; Network::Connection* server_connection_; Network::MockConnectionCallbacks server_callbacks_; + BasicResourceLimitImpl open_connections_; std::shared_ptr read_filter_; std::string name_; const Network::FilterChainSharedPtr filter_chain_; @@ -920,9 +922,8 @@ class WildcardProxyProtocolTest : public testing::TestWithParamconnected()); } @@ -252,7 +252,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, AltsIntegrationTestClientWrongHandshaker, // and connection closes. 
TEST_P(AltsIntegrationTestClientWrongHandshaker, ConnectToWrongHandshakerAddress) { initialize(); - codec_client_ = makeRawHttpConnection(makeAltsConnection()); + codec_client_ = makeRawHttpConnection(makeAltsConnection(), absl::nullopt); EXPECT_FALSE(codec_client_->connected()); } diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc index c3d0960dc06c..a353b1aa508f 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc @@ -276,7 +276,8 @@ TEST_P(SslCertficateIntegrationTest, ServerEcdsaClientRsaOnly) { server_rsa_cert_ = false; server_ecdsa_cert_ = true; initialize(); - auto codec_client = makeRawHttpConnection(makeSslClientConnection(rsaOnlyClientOptions())); + auto codec_client = + makeRawHttpConnection(makeSslClientConnection(rsaOnlyClientOptions()), absl::nullopt); EXPECT_FALSE(codec_client->connected()); const std::string counter_name = listenerStatPrefix("ssl.connection_error"); Stats::CounterSharedPtr counter = test_server_->counter(counter_name); @@ -303,7 +304,8 @@ TEST_P(SslCertficateIntegrationTest, ServerRsaClientEcdsaOnly) { client_ecdsa_cert_ = true; initialize(); EXPECT_FALSE( - makeRawHttpConnection(makeSslClientConnection(ecdsaOnlyClientOptions()))->connected()); + makeRawHttpConnection(makeSslClientConnection(ecdsaOnlyClientOptions()), absl::nullopt) + ->connected()); const std::string counter_name = listenerStatPrefix("ssl.connection_error"); Stats::CounterSharedPtr counter = test_server_->counter(counter_name); test_server_->waitForCounterGe(counter_name, 1); diff --git a/test/integration/BUILD b/test/integration/BUILD index d514efc0e947..6e03419e5a1f 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -10,10 +10,6 @@ load( "envoy_select_hot_restart", "envoy_sh_test", ) -load( - "//source/extensions:all_extensions.bzl", - "envoy_all_extensions", -) envoy_package() @@ -449,6 +445,7 @@ envoy_cc_test_library( "//source/common/buffer:buffer_lib", "//source/common/buffer:zero_copy_input_stream_lib", "//source/common/common:assert_lib", + "//source/common/common:basic_resource_lib", "//source/common/common:minimal_logger_lib", "//source/common/event:dispatcher_lib", "//source/common/grpc:codec_lib", @@ -914,3 +911,17 @@ envoy_cc_test( "//test/test_common:utility_lib", ], ) + +envoy_cc_test( + name = "cx_limit_integration_test", + srcs = ["cx_limit_integration_test.cc"], + deps = [ + ":http_integration_lib", + "//include/envoy/network:filter_interface", + "//include/envoy/registry", + "//source/extensions/filters/network/tcp_proxy:config", + "//test/config:utility_lib", + "//test/test_common:logging_lib", + "//test/test_common:simulated_time_system_lib", + ], +) diff --git a/test/integration/cx_limit_integration_test.cc b/test/integration/cx_limit_integration_test.cc new file mode 100644 index 000000000000..ce96cb47e1b4 --- /dev/null +++ b/test/integration/cx_limit_integration_test.cc @@ -0,0 +1,147 @@ +#include "envoy/network/filter.h" +#include "envoy/registry/registry.h" + +#include "common/network/utility.h" + +#include "test/config/utility.h" +#include "test/integration/integration.h" +#include "test/test_common/logging.h" +#include "test/test_common/simulated_time_system.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +class ConnectionLimitIntegrationTest : public testing::TestWithParam, + public 
Event::TestUsingSimulatedTime, + public BaseIntegrationTest { +public: + ConnectionLimitIntegrationTest() + : BaseIntegrationTest(GetParam(), ConfigHelper::TCP_PROXY_CONFIG) {} + + void setEmptyListenerLimit() { + config_helper_.addRuntimeOverride("envoy.resource_limits.listener.listener_0.connection_limit", + ""); + } + + void setListenerLimit(const uint32_t num_conns) { + config_helper_.addRuntimeOverride("envoy.resource_limits.listener.listener_0.connection_limit", + std::to_string(num_conns)); + } + + void setGlobalLimit(std::string&& num_conns) { + config_helper_.addRuntimeOverride("overload.global_downstream_max_connections", num_conns); + } + + void initialize() override { BaseIntegrationTest::initialize(); } + + // Assumes a limit of 2 connections. + void doTest(std::function init_func, std::string&& check_stat) { + init_func(); + + std::vector tcp_clients; + std::vector raw_conns; + + tcp_clients.emplace_back(makeTcpConnection(lookupPort("listener_0"))); + raw_conns.emplace_back(); + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(raw_conns.back())); + ASSERT_TRUE(tcp_clients.back()->connected()); + + tcp_clients.emplace_back(makeTcpConnection(lookupPort("listener_0"))); + raw_conns.emplace_back(); + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(raw_conns.back())); + ASSERT_TRUE(tcp_clients.back()->connected()); + + tcp_clients.emplace_back(makeTcpConnection(lookupPort("listener_0"))); + raw_conns.emplace_back(); + ASSERT_FALSE(fake_upstreams_[0]->waitForRawConnection(raw_conns.back())); + tcp_clients.back()->waitForDisconnect(); + + // Get rid of the client that failed to connect. + tcp_clients.back()->close(); + tcp_clients.pop_back(); + + // Close the first connection that was successful so that we can open a new successful + // connection. + tcp_clients.front()->close(); + ASSERT_TRUE(raw_conns.front()->close()); + ASSERT_TRUE(raw_conns.front()->waitForDisconnect()); + + tcp_clients.emplace_back(makeTcpConnection(lookupPort("listener_0"))); + raw_conns.emplace_back(); + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(raw_conns.back())); + ASSERT_TRUE(tcp_clients.back()->connected()); + + const bool isV4 = (version_ == Network::Address::IpVersion::v4); + auto local_address = isV4 ? Network::Utility::getCanonicalIpv4LoopbackAddress() + : Network::Utility::getIpv6LoopbackAddress(); + + const std::string counter_prefix = (isV4 ? "listener.127.0.0.1_0." : "listener.[__1]_0."); + + test_server_->waitForCounterEq(counter_prefix + check_stat, 1); + + for (auto& tcp_client : tcp_clients) { + tcp_client->close(); + } + + tcp_clients.clear(); + raw_conns.clear(); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, ConnectionLimitIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +TEST_P(ConnectionLimitIntegrationTest, TestListenerLimit) { + std::function init_func = [this]() { + setListenerLimit(2); + initialize(); + }; + + doTest(init_func, "downstream_cx_overflow"); +} + +TEST_P(ConnectionLimitIntegrationTest, TestEmptyGlobalCxRuntimeLimit) { + const std::string log_line = "no configured limit to the number of allowed active connections."; + EXPECT_LOG_CONTAINS("warn", log_line, { initialize(); }); +} + +TEST_P(ConnectionLimitIntegrationTest, TestEmptyListenerRuntimeLimit) { + const std::string log_line = + "Listener connection limit runtime key " + "envoy.resource_limits.listener.listener_0.connection_limit is empty. 
There are currently " + "no limitations on the number of accepted connections for listener listener_0."; + EXPECT_LOG_CONTAINS("warn", log_line, { + setEmptyListenerLimit(); + initialize(); + }); +} + +TEST_P(ConnectionLimitIntegrationTest, TestGlobalLimit) { + std::function init_func = [this]() { + // Includes twice the number of connections expected because the tracking is performed via a + // static variable and the fake upstream has a listener. This causes upstream connections to the + // fake upstream to also be tracked as part of the global downstream connection tracking. + setGlobalLimit("4"); + initialize(); + }; + + doTest(init_func, "downstream_global_cx_overflow"); +} + +TEST_P(ConnectionLimitIntegrationTest, TestBothLimits) { + std::function init_func = [this]() { + // Setting the listener limit to a much higher value and making sure the right stat gets + // incremented when both limits are set. + setGlobalLimit("4"); + setListenerLimit(100); + initialize(); + }; + + doTest(init_func, "downstream_global_cx_overflow"); +} + +} // namespace +} // namespace Envoy diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index c85f1c6028c9..aac2111e22a5 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -18,6 +18,7 @@ #include "common/buffer/buffer_impl.h" #include "common/buffer/zero_copy_input_stream_impl.h" +#include "common/common/basic_resource_impl.h" #include "common/common/callback_impl.h" #include "common/common/linked_object.h" #include "common/common/lock_guard.h" @@ -633,10 +634,17 @@ class FakeUpstream : Logger::Loggable, envoy::api::v2::core::TrafficDirection direction() const override { return envoy::api::v2::core::TrafficDirection::UNSPECIFIED; } + ResourceLimit& openConnections() override { return connection_resource_; } + + void setMaxConnections(const uint32_t num_connections) { + connection_resource_.setMax(num_connections); + } + void clearMaxConnections() { connection_resource_.resetMax(); } FakeUpstream& parent_; const std::string name_; Network::NopConnectionBalancerImpl connection_balancer_; + BasicResourceLimitImpl connection_resource_; }; void threadRoutine(); diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 80b03bd88ad2..830f043374e0 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -66,6 +66,28 @@ TEST_P(Http2IntegrationTest, RetryAttemptCount) { testRetryAttemptCountHeader(); TEST_P(Http2IntegrationTest, LargeRequestTrailersRejected) { testLargeRequestTrailers(66, 60); } +// Verify downstream codec stream flush timeout. 
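The integration test below triggers the flush timeout by sending a response body larger than the client's initial stream window while the stream idle timeout is lowered to 400 ms, so the remainder of the body stays buffered with local end-stream set until the timer fires. A compile-time restatement of those numbers, with names invented for illustration:

#include <cstdint>

namespace codec_stream_idle_timeout_arithmetic {
constexpr uint64_t ResponseBytes = 70000;
constexpr uint64_t InitialStreamWindow = 65535;
constexpr uint64_t PendingAfterWindowExhausted = ResponseBytes - InitialStreamWindow;
static_assert(PendingAfterWindowExhausted == 4465, "bytes left waiting for window");

constexpr uint64_t IdleTimeoutMs = 400;
static_assert(IdleTimeoutMs * 1000 * 1000 == 400000000, "nanos set on stream_idle_timeout");
} // namespace codec_stream_idle_timeout_arithmetic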
+// Verify downstream codec stream flush timeout. +TEST_P(Http2IntegrationTest, CodecStreamIdleTimeout) { + config_helper_.setBufferLimits(1024, 1024); + config_helper_.addConfigModifier( + [&](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) + -> void { + hcm.mutable_stream_idle_timeout()->set_seconds(0); + constexpr uint64_t IdleTimeoutMs = 400; + hcm.mutable_stream_idle_timeout()->set_nanos(IdleTimeoutMs * 1000 * 1000); + }); + initialize(); + Http::Http2Settings http2_options; + http2_options.initial_stream_window_size_ = 65535; + codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), http2_options); + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData(70000, true); + test_server_->waitForCounterEq("http2.tx_flush_timeout", 1); + response->waitForReset(); +} + static std::string response_metadata_filter = R"EOF( name: response-metadata-filter typed_config:
diff --git a/test/integration/http2_upstream_integration_test.cc b/test/integration/http2_upstream_integration_test.cc index 8b32a232f97d..15f00afddcc1 100644 --- a/test/integration/http2_upstream_integration_test.cc +++ b/test/integration/http2_upstream_integration_test.cc @@ -237,6 +237,9 @@ void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_byt EXPECT_EQ("503", responses[i]->headers().Status()->value().getStringView()); } } + + EXPECT_EQ(0, test_server_->gauge("http2.streams_active")->value()); + EXPECT_EQ(0, test_server_->gauge("http2.pending_send_bytes")->value()); } TEST_P(Http2UpstreamIntegrationTest, ManySimultaneousRequest) {
diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index 310542194c8a..681a363e9a73 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -189,11 +189,16 @@ IntegrationCodecClientPtr HttpIntegrationTest::makeHttpConnection(uint32_t port) } IntegrationCodecClientPtr -HttpIntegrationTest::makeRawHttpConnection(Network::ClientConnectionPtr&& conn) { +HttpIntegrationTest::makeRawHttpConnection(Network::ClientConnectionPtr&& conn, + absl::optional<Http::Http2Settings> http2_options) { std::shared_ptr<Upstream::MockClusterInfo> cluster{new NiceMock<Upstream::MockClusterInfo>()}; cluster->max_response_headers_count_ = 200; - cluster->http2_settings_.allow_connect_ = true; - cluster->http2_settings_.allow_metadata_ = true; + if (http2_options.has_value()) { + cluster->http2_settings_ = http2_options.value(); + } else { + cluster->http2_settings_.allow_connect_ = true; + cluster->http2_settings_.allow_metadata_ = true; + } Upstream::HostDescriptionConstSharedPtr host_description{Upstream::makeTestHostDescription( cluster, fmt::format("tcp://{}:80", Network::Test::getLoopbackAddressUrlString(version_)))}; return std::make_unique<IntegrationCodecClient>(*dispatcher_, std::move(conn), host_description, @@ -202,7 +207,7 @@ HttpIntegrationTest::makeRawHttpConnection(Network::ClientConnectionPtr&& conn) IntegrationCodecClientPtr HttpIntegrationTest::makeHttpConnection(Network::ClientConnectionPtr&& conn) { - auto codec = makeRawHttpConnection(std::move(conn)); + auto codec = makeRawHttpConnection(std::move(conn), absl::nullopt); EXPECT_TRUE(codec->connected()); return codec; } @@ -884,6 +889,44 @@ void HttpIntegrationTest::testTwoRequests(bool network_backup) { EXPECT_EQ(1024U, response->body().size()); } +void HttpIntegrationTest::testLargeRequestUrl(uint32_t url_size, uint32_t max_headers_size) { + // `url_size` dictates the size in KiB of the URL (request path) that will be added to the + // request and `max_headers_size` is the configured request header size limit in KiB. The actual + // request header size will exceed `url_size` due to the other default headers. + + config_helper_.addConfigModifier( + [&](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) + -> void { hcm.mutable_max_request_headers_kb()->set_value(max_headers_size); }); + max_request_headers_kb_ = max_headers_size; + + Http::TestHeaderMapImpl big_headers{{":method", "GET"}, + {":path", "/" + std::string(url_size * 1024, 'a')}, + {":scheme", "http"}, + {":authority", "host"}}; + + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + if (url_size >= max_headers_size) { + // The header size limit includes the header keys too, so expect rejection when the sizes are + // equal. + auto encoder_decoder = codec_client_->startRequest(big_headers); + auto response = std::move(encoder_decoder.second); + + if (downstream_protocol_ == Http::CodecClient::Type::HTTP1) { + codec_client_->waitForDisconnect(); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("431", response->headers().Status()->value().getStringView()); + } else { + response->waitForReset(); + codec_client_->close(); + } + } else { + auto response = sendRequestAndWaitForResponse(big_headers, 0, default_response_headers_, 0); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + } +} + void HttpIntegrationTest::testLargeRequestHeaders(uint32_t size, uint32_t count, uint32_t max_size, uint32_t max_count) { // `size` parameter dictates the size of each header that will be added to the request and `count`
diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 8bbae101304b..9839fd55ae9b 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -108,7 +108,9 @@ class HttpIntegrationTest : public BaseIntegrationTest { IntegrationCodecClientPtr makeHttpConnection(uint32_t port); // Makes an HTTP connection object without checking its connected state. - IntegrationCodecClientPtr makeRawHttpConnection(Network::ClientConnectionPtr&& conn); + virtual IntegrationCodecClientPtr + makeRawHttpConnection(Network::ClientConnectionPtr&& conn, + absl::optional<Http::Http2Settings> http2_options); // Makes an HTTP connection object, asserting a connected state.
IntegrationCodecClientPtr makeHttpConnection(Network::ClientConnectionPtr&& conn); @@ -190,6 +192,7 @@ class HttpIntegrationTest : public BaseIntegrationTest { void testTwoRequests(bool force_network_backup = false); void testLargeHeaders(Http::TestHeaderMapImpl request_headers, Http::TestHeaderMapImpl request_trailers, uint32_t size, uint32_t max_size); + void testLargeRequestUrl(uint32_t url_size, uint32_t max_headers_size); void testLargeRequestHeaders(uint32_t size, uint32_t count, uint32_t max_size = 60, uint32_t max_count = 100); void testLargeRequestTrailers(uint32_t size, uint32_t max_size = 60); diff --git a/test/integration/integration_admin_test.cc b/test/integration/integration_admin_test.cc index 7e39838bc1a6..548181de30e2 100644 --- a/test/integration/integration_admin_test.cc +++ b/test/integration/integration_admin_test.cc @@ -278,14 +278,17 @@ TEST_P(IntegrationAdminTest, Admin) { " 1 http2.inbound_window_update_frames_flood\n" " 1 http2.outbound_control_flood\n" " 1 http2.outbound_flood\n" + " 1 http2.pending_send_bytes\n" " 1 http2.requests_rejected_with_underscores_in_headers\n" " 1 http2.rx_messaging_error\n" " 1 http2.rx_reset\n" + " 1 http2.streams_active\n" " 1 http2.too_many_header_frames\n" " 1 http2.trailers\n" + " 1 http2.tx_flush_timeout\n" " 1 http2.tx_reset\n" "\n" - "total: 14\n", + "total: 17\n", response->body()); break; case Http::CodecClient::Type::HTTP3: diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index 2302e34a3c4d..d94fe99d5057 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -1006,6 +1006,16 @@ name: decode-headers-only EXPECT_EQ(0, upstream_request_->body().length()); } +TEST_P(DownstreamProtocolIntegrationTest, LargeRequestUrlRejected) { + // Send one 95 kB URL with limit 60 kB headers. + testLargeRequestUrl(95, 60); +} + +TEST_P(DownstreamProtocolIntegrationTest, LargeRequestUrlAccepted) { + // Send one 95 kB URL with limit 96 kB headers. + testLargeRequestUrl(95, 96); +} + TEST_P(DownstreamProtocolIntegrationTest, LargeRequestHeadersRejected) { // Send one 95 kB header with limit 60 kB and 100 headers. testLargeRequestHeaders(95, 1, 60, 100); diff --git a/test/integration/sds_dynamic_integration_test.cc b/test/integration/sds_dynamic_integration_test.cc index 38d8391444e9..e86a28a79e61 100644 --- a/test/integration/sds_dynamic_integration_test.cc +++ b/test/integration/sds_dynamic_integration_test.cc @@ -222,7 +222,7 @@ TEST_P(SdsDynamicDownstreamIntegrationTest, WrongSecretFirst) { }; initialize(); - codec_client_ = makeRawHttpConnection(makeSslClientConnection()); + codec_client_ = makeRawHttpConnection(makeSslClientConnection(), absl::nullopt); // the connection state is not connected. 
EXPECT_FALSE(codec_client_->connected()); codec_client_->connection()->close(Network::ConnectionCloseType::NoFlush);
diff --git a/test/mocks/http/stream.h b/test/mocks/http/stream.h index 54b81c4fd912..16531b3c7977 100644 --- a/test/mocks/http/stream.h +++ b/test/mocks/http/stream.h @@ -19,6 +19,7 @@ class MockStream : public Stream { MOCK_METHOD1(readDisable, void(bool disable)); MOCK_METHOD2(setWriteBufferWatermarks, void(uint32_t, uint32_t)); MOCK_METHOD0(bufferLimit, uint32_t()); + MOCK_METHOD1(setFlushTimeout, void(std::chrono::milliseconds timeout)); std::list<StreamCallbacks*> callbacks_{};
diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 2387e363db9d..2605fb057784 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -128,7 +128,8 @@ class MockListenerCallbacks : public ListenerCallbacks { void onAccept(ConnectionSocketPtr&& socket) override { onAccept_(socket); } - MOCK_METHOD1(onAccept_, void(ConnectionSocketPtr& socket)); + MOCK_METHOD(void, onAccept_, (ConnectionSocketPtr & socket)); + MOCK_METHOD(void, onReject, ()); }; class MockUdpListenerCallbacks : public UdpListenerCallbacks { @@ -314,6 +315,7 @@ class MockListenerConfig : public ListenerConfig { MOCK_CONST_METHOD0(name, const std::string&()); MOCK_METHOD0(udpListenerFactory, const Network::ActiveUdpListenerFactory*()); MOCK_METHOD0(connectionBalancer, ConnectionBalancer&()); + MOCK_METHOD0(openConnections, ResourceLimit&()); envoy::api::v2::core::TrafficDirection direction() const override { return envoy::api::v2::core::TrafficDirection::UNSPECIFIED;
diff --git a/test/mocks/runtime/BUILD b/test/mocks/runtime/BUILD index 7aed549bc387..f406cbc85b95 100644 --- a/test/mocks/runtime/BUILD +++ b/test/mocks/runtime/BUILD @@ -16,6 +16,7 @@ envoy_cc_mock( deps = [ "//include/envoy/runtime:runtime_interface", "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:empty_string", "//test/mocks:common_lib", ], )
diff --git a/test/mocks/runtime/mocks.cc b/test/mocks/runtime/mocks.cc index 2556987922c1..e34429e74d98 100644 --- a/test/mocks/runtime/mocks.cc +++ b/test/mocks/runtime/mocks.cc @@ -1,11 +1,14 @@ #include "mocks.h" +#include "common/common/empty_string.h" + #include "gmock/gmock.h" #include "gtest/gtest.h" using testing::_; using testing::Return; using testing::ReturnArg; +using testing::ReturnRef; namespace Envoy { namespace Runtime { @@ -18,6 +21,7 @@ MockSnapshot::MockSnapshot() { ON_CALL(*this, getInteger(_, _)).WillByDefault(ReturnArg<1>()); ON_CALL(*this, getDouble(_, _)).WillByDefault(ReturnArg<1>()); ON_CALL(*this, getBoolean(_, _)).WillByDefault(ReturnArg<1>()); + ON_CALL(*this, get(_)).WillByDefault(ReturnRef(EMPTY_STRING)); } MockSnapshot::~MockSnapshot() = default;
diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 4ce3f337b79c..f7244ac04d1b 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -86,6 +86,12 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable udp_listener_factory_; Network::ConnectionBalancerPtr connection_balancer_; + BasicResourceLimitImpl open_connections_; }; using TestListenerPtr = std::unique_ptr<TestListener>; @@ -176,6 +183,98 @@ TEST_F(ConnectionHandlerTest, RemoveListenerDuringRebalance) { #endif } +TEST_F(ConnectionHandlerTest, ListenerConnectionLimitEnforced) { + auto listener1 = new NiceMock<Network::MockListener>(); + Network::ListenerCallbacks* listener_callbacks1; + TestListener* test_listener1 = addListener(1, false, false,
"test_listener1"); + EXPECT_CALL(dispatcher_, createListener_(_, _, _)) + .WillOnce( + Invoke([&](Network::Socket&, Network::ListenerCallbacks& cb, bool) -> Network::Listener* { + listener_callbacks1 = &cb; + return listener1; + })); + Network::Address::InstanceConstSharedPtr normal_address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + EXPECT_CALL(test_listener1->socket_, localAddress()).WillRepeatedly(ReturnRef(normal_address)); + // Only allow a single connection on this listener. + test_listener1->setMaxConnections(1); + handler_->addListener(*test_listener1); + + auto listener2 = new NiceMock(); + Network::ListenerCallbacks* listener_callbacks2; + TestListener* test_listener2 = addListener(2, false, false, "test_listener2"); + EXPECT_CALL(dispatcher_, createListener_(_, _, _)) + .WillOnce( + Invoke([&](Network::Socket&, Network::ListenerCallbacks& cb, bool) -> Network::Listener* { + listener_callbacks2 = &cb; + return listener2; + })); + Network::Address::InstanceConstSharedPtr alt_address( + new Network::Address::Ipv4Instance("127.0.0.2", 20002)); + EXPECT_CALL(test_listener2->socket_, localAddress()).WillRepeatedly(ReturnRef(alt_address)); + // Do not allow any connections on this listener. + test_listener2->setMaxConnections(0); + handler_->addListener(*test_listener2); + + EXPECT_CALL(manager_, findFilterChain(_)).WillRepeatedly(Return(filter_chain_.get())); + EXPECT_CALL(factory_, createNetworkFilterChain(_, _)).WillRepeatedly(Return(true)); + Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(); + EXPECT_CALL(*test_filter, destroy_()); + EXPECT_CALL(factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { + manager.addAcceptFilter(Network::ListenerFilterPtr{test_filter}); + return true; + })); + EXPECT_CALL(*test_filter, onAccept(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterCallbacks&) -> Network::FilterStatus { + return Network::FilterStatus::Continue; + })); + + // For listener 2, verify its connection limit is independent of listener 1. + + // We expect that listener 2 accepts the connection, so there will be a call to + // createServerConnection and active cx should increase, while cx overflow remains the same. + listener_callbacks2->onAccept( + Network::ConnectionSocketPtr{new NiceMock()}); + EXPECT_EQ(0, handler_->numConnections()); + EXPECT_EQ(0, TestUtility::findCounter(stats_store_, "downstream_cx_total")->value()); + EXPECT_EQ(0, TestUtility::findGauge(stats_store_, "downstream_cx_active")->value()); + EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "downstream_cx_overflow")->value()); + + // For listener 1, verify connections are limited after one goes active. + + // First connection attempt should result in an active connection being created. + auto conn1 = new NiceMock(); + EXPECT_CALL(dispatcher_, createServerConnection_()).WillOnce(Return(conn1)); + listener_callbacks1->onAccept( + Network::ConnectionSocketPtr{new NiceMock()}); + EXPECT_EQ(1, handler_->numConnections()); + // Note that these stats are not the per-worker stats, but the per-listener stats. + EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "downstream_cx_total")->value()); + EXPECT_EQ(1, TestUtility::findGauge(stats_store_, "downstream_cx_active")->value()); + EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "downstream_cx_overflow")->value()); + + // Don't expect server connection to be created, should be instantly closed and increment + // overflow stat. 
+ listener_callbacks1->onAccept( + Network::ConnectionSocketPtr{new NiceMock<Network::MockConnectionSocket>()}); + EXPECT_EQ(1, handler_->numConnections()); + EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "downstream_cx_total")->value()); + EXPECT_EQ(1, TestUtility::findGauge(stats_store_, "downstream_cx_active")->value()); + EXPECT_EQ(2, TestUtility::findCounter(stats_store_, "downstream_cx_overflow")->value()); + + // Check behavior again for good measure. + listener_callbacks1->onAccept( + Network::ConnectionSocketPtr{new NiceMock<Network::MockConnectionSocket>()}); + EXPECT_EQ(1, handler_->numConnections()); + EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "downstream_cx_total")->value()); + EXPECT_EQ(1, TestUtility::findGauge(stats_store_, "downstream_cx_active")->value()); + EXPECT_EQ(3, TestUtility::findCounter(stats_store_, "downstream_cx_overflow")->value()); + + EXPECT_CALL(*listener1, onDestroy()); + EXPECT_CALL(*listener2, onDestroy()); +} + TEST_F(ConnectionHandlerTest, RemoveListener) { InSequence s;
diff --git a/test/test_common/logging.cc b/test/test_common/logging.cc index 636bb56c4bad..d2cb5fa69cfe 100644 --- a/test/test_common/logging.cc +++ b/test/test_common/logging.cc @@ -2,6 +2,8 @@ #include "common/common/assert.h" +#include "absl/synchronization/mutex.h" + namespace Envoy { LogLevelSetter::LogLevelSetter(spdlog::level::level_enum log_level) { @@ -27,6 +29,8 @@ LogRecordingSink::~LogRecordingSink() = default; void LogRecordingSink::log(absl::string_view msg) { previous_delegate()->log(msg); + + absl::MutexLock ml(&mtx_); messages_.push_back(std::string(msg)); }
diff --git a/test/test_common/logging.h b/test/test_common/logging.h index 62fe9f6d7d53..49d5b8ed5a0e 100644 --- a/test/test_common/logging.h +++ b/test/test_common/logging.h @@ -8,6 +8,7 @@ #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" +#include "absl/synchronization/mutex.h" #include "spdlog/spdlog.h" namespace Envoy { @@ -58,7 +59,8 @@ class LogRecordingSink : public Logger::SinkDelegate { const std::vector<std::string>& messages() const { return messages_; } private: - std::vector<std::string> messages_; + absl::Mutex mtx_; + std::vector<std::string> messages_ ABSL_GUARDED_BY(mtx_); }; using StringPair = std::pair<std::string, std::string>;
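// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the accept-time gating pattern
// that the ListenerConnectionLimitEnforced test above relies on. This is a
// hand-rolled stand-in rather than Envoy's BasicResourceLimitImpl; the class
// and method names are invented for illustration.
#include <atomic>
#include <cstdint>
#include <limits>

namespace cx_limit_sketch {

class ConnectionLimit {
public:
  explicit ConnectionLimit(uint64_t max = std::numeric_limits<uint64_t>::max()) : max_(max) {}

  // Accept path: reserve a slot if one is available. On failure the caller
  // closes the socket immediately and bumps the downstream_cx_overflow stat.
  bool tryAcquire() {
    uint64_t current = active_.load();
    do {
      if (current >= max_.load()) {
        return false;
      }
    } while (!active_.compare_exchange_weak(current, current + 1));
    return true;
  }

  // Connection-close path: release the slot so a new connection can be accepted.
  void release() { active_.fetch_sub(1); }

  // Runtime update path, analogous to changing the connection_limit runtime key.
  void setMax(uint64_t max) { max_.store(max); }

private:
  std::atomic<uint64_t> active_{0};
  std::atomic<uint64_t> max_;
};

} // namespace cx_limit_sketch
// ---------------------------------------------------------------------------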
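// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the locking pattern applied to
// LogRecordingSink above, written with std::mutex so it stands alone (the
// patch itself uses absl::Mutex). Writers append under the lock; readers copy
// under the same lock instead of handing out a reference into the guarded
// container. Names are invented for illustration.
#include <mutex>
#include <string>
#include <vector>

namespace log_sink_sketch {

class RecordingSink {
public:
  // Called from whichever thread emits the log line.
  void log(const std::string& msg) {
    std::lock_guard<std::mutex> lock(mtx_);
    messages_.push_back(msg);
  }

  // Returning a copy taken under the lock avoids exposing the vector while
  // another thread may be appending to it.
  std::vector<std::string> snapshot() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return messages_;
  }

private:
  mutable std::mutex mtx_;
  std::vector<std::string> messages_;
};

} // namespace log_sink_sketch
// ---------------------------------------------------------------------------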