From 5844e96c9982812ce1bc131782de1a93fee41348 Mon Sep 17 00:00:00 2001 From: Christian Norbert Menges Date: Sun, 22 Oct 2023 14:24:15 +0200 Subject: [PATCH 1/7] out_http: Add option to reuse connections Signed-off-by: Christian Norbert Menges --- lib/fluent/plugin/out_http.rb | 59 ++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 55887065a7..58fdb0db3f 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -60,6 +60,8 @@ class RetryableResponse < StandardError; end config_param :read_timeout, :integer, default: nil desc 'The TLS timeout in seconds' config_param :ssl_timeout, :integer, default: nil + desc 'Try to reuse connections' + config_param :reuse_connections, :bool, default: false desc 'The CA certificate path for TLS' config_param :tls_ca_cert_path, :string, default: nil @@ -106,11 +108,25 @@ def initialize @uri = nil @proxy_uri = nil @formatter = nil + + @Cache = [] + @CacheIdMutex = Mutex.new + @CacheEntry = Struct.new(:uri, :conn) + end + + def close + super + + # Close all open connections + @Cache.each {|entry| entry.conn.finish if entry.conn&.started? } end def configure(conf) super + @Cache = Array.new(actual_flush_thread_count, @CacheEntry.new("", nil)) if @reuse_connections + @CacheId = 0 + if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @retryable_response_codes = [503] @@ -302,16 +318,45 @@ def create_request(chunk, uri) req end + def make_request_cached(uri, req) + id = Thread.current.thread_variable_get(plugin_id) + if id.nil? + @CacheIdMutex.synchronize { + id = @CacheId + @CacheId += 1 + } + Thread.current.thread_variable_set(plugin_id, id) + end + uri_str = uri.to_s + if @Cache[id].uri != uri_str + @Cache[id].conn.finish if @Cache[id].conn&.started? + http = if @proxy_uri + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) + else + Net::HTTP.start(uri.host, uri.port, @http_opt) + end + @Cache[id] = @CacheEntry.new(uri_str, http) + end + @Cache[id].conn.request(req) + end + + def make_request(uri, req) + if @proxy_uri + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| + http.request(req) + } + else + Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| + http.request(req) + } + end + end def send_request(uri, req) - res = if @proxy_uri - Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| - http.request(req) - } + res = if @reuse_connections + make_request_cached(uri, req) else - Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| - http.request(req) - } + make_request(uri, req) end if res.is_a?(Net::HTTPSuccess) From 7a5662aaaff014cf07ca6dbfbc5a42708257f011 Mon Sep 17 00:00:00 2001 From: Christian Norbert Menges Date: Mon, 6 Nov 2023 11:17:53 +0100 Subject: [PATCH 2/7] out_http: Add test for connection recreation Signed-off-by: Christian Norbert Menges --- test/plugin/test_out_http.rb | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/test/plugin/test_out_http.rb b/test/plugin/test_out_http.rb index 04c80137b3..a0b3969a8d 100644 --- a/test/plugin/test_out_http.rb +++ b/test/plugin/test_out_http.rb @@ -518,4 +518,40 @@ def test_write_with_https assert_not_empty result.headers end end + + sub_test_case 'connection_reuse' do + def server_port + 19883 + end + + def test_connection_recreation + d = create_driver(%[ + endpoint http://127.0.0.1:#{server_port}/test + reuse_connections true + ]) + + d.run(default_tag: 'test.http', shutdown: false) do + d.feed(test_events[0]) + end + + data = @@result.data + + # Restart server to simulate connection loss + @@http_server_thread.kill + @@http_server_thread.join + @@http_server_thread = Thread.new do + run_http_server + end + + d.run(default_tag: 'test.http') do + d.feed(test_events[1]) + end + + result = @@result + assert_equal 'POST', result.method + assert_equal 'application/x-ndjson', result.content_type + assert_equal test_events, data.concat(result.data) + assert_not_empty result.headers + end + end end From fa5b18318d41d5a8188c1b3c7087d6c52fb646a0 Mon Sep 17 00:00:00 2001 From: Christian Norbert Menges Date: Thu, 14 Dec 2023 13:06:44 +0100 Subject: [PATCH 3/7] out_http: Use snake case Signed-off-by: Christian Norbert Menges --- lib/fluent/plugin/out_http.rb | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 58fdb0db3f..977ab8d525 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -109,23 +109,23 @@ def initialize @proxy_uri = nil @formatter = nil - @Cache = [] - @CacheIdMutex = Mutex.new - @CacheEntry = Struct.new(:uri, :conn) + @cache = [] + @cache_id_mutex = Mutex.new + @cache_entry = Struct.new(:uri, :conn) end def close super # Close all open connections - @Cache.each {|entry| entry.conn.finish if entry.conn&.started? } + @cache.each {|entry| entry.conn.finish if entry.conn&.started? } end def configure(conf) super - @Cache = Array.new(actual_flush_thread_count, @CacheEntry.new("", nil)) if @reuse_connections - @CacheId = 0 + @cache = Array.new(actual_flush_thread_count, @cache_entry.new("", nil)) if @reuse_connections + @cache_id = 0 if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @@ -321,23 +321,23 @@ def create_request(chunk, uri) def make_request_cached(uri, req) id = Thread.current.thread_variable_get(plugin_id) if id.nil? - @CacheIdMutex.synchronize { - id = @CacheId - @CacheId += 1 + @cache_id_mutex.synchronize { + id = @cache_id + @cache_id += 1 } Thread.current.thread_variable_set(plugin_id, id) end uri_str = uri.to_s - if @Cache[id].uri != uri_str - @Cache[id].conn.finish if @Cache[id].conn&.started? + if @cache[id].uri != uri_str + @cache[id].conn.finish if @cache[id].conn&.started? http = if @proxy_uri Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) else Net::HTTP.start(uri.host, uri.port, @http_opt) end - @Cache[id] = @CacheEntry.new(uri_str, http) + @cache[id] = @cache_entry.new(uri_str, http) end - @Cache[id].conn.request(req) + @cache[id].conn.request(req) end def make_request(uri, req) From af18531fa2f11d84f8383e8a75af79e3bd92fd7c Mon Sep 17 00:00:00 2001 From: Christian Menges Date: Sun, 4 Feb 2024 11:50:15 +0100 Subject: [PATCH 4/7] Apply suggestions from code review Signed-off-by: Daijiro Fukuda Co-authored-by: Daijiro Fukuda --- lib/fluent/plugin/out_http.rb | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 977ab8d525..78ba4cf88d 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -340,15 +340,11 @@ def make_request_cached(uri, req) @cache[id].conn.request(req) end - def make_request(uri, req) + def make_request(uri, req, &block) if @proxy_uri - Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| - http.request(req) - } + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt, &block) else - Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| - http.request(req) - } + Net::HTTP.start(uri.host, uri.port, @http_opt, &block) end end @@ -356,7 +352,7 @@ def send_request(uri, req) res = if @reuse_connections make_request_cached(uri, req) else - make_request(uri, req) + make_request(uri, req) { |http| http.request(req) } end if res.is_a?(Net::HTTPSuccess) From d57402eb5a5e9e24cb08d1c3a5dcc50f4f8b8dcd Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 30 Apr 2024 12:31:45 +0900 Subject: [PATCH 5/7] Move struct definition outside of constructor Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/out_http.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 78ba4cf88d..09ab90858d 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -37,6 +37,8 @@ class HTTPOutput < Output class RetryableResponse < StandardError; end + CacheEntry = Struct.new(:uri, :conn) + helpers :formatter desc 'The endpoint for HTTP request, e.g. http://example.com/api' @@ -111,7 +113,6 @@ def initialize @cache = [] @cache_id_mutex = Mutex.new - @cache_entry = Struct.new(:uri, :conn) end def close @@ -124,7 +125,7 @@ def close def configure(conf) super - @cache = Array.new(actual_flush_thread_count, @cache_entry.new("", nil)) if @reuse_connections + @cache = Array.new(actual_flush_thread_count, CacheEntry.new("", nil)) if @reuse_connections @cache_id = 0 if @retryable_response_codes.nil? @@ -335,7 +336,7 @@ def make_request_cached(uri, req) else Net::HTTP.start(uri.host, uri.port, @http_opt) end - @cache[id] = @cache_entry.new(uri_str, http) + @cache[id] = CacheEntry.new(uri_str, http) end @cache[id].conn.request(req) end From 6763f3dc38d7260c94398d77b2ab637457d1740d Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 30 Apr 2024 12:37:04 +0900 Subject: [PATCH 6/7] Make some parameter names more specific Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/out_http.rb | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 09ab90858d..6fb972c3d3 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -37,7 +37,7 @@ class HTTPOutput < Output class RetryableResponse < StandardError; end - CacheEntry = Struct.new(:uri, :conn) + ConnectionCache = Struct.new(:uri, :conn) helpers :formatter @@ -111,22 +111,21 @@ def initialize @proxy_uri = nil @formatter = nil - @cache = [] - @cache_id_mutex = Mutex.new + @connection_cache = [] + @connection_cache_id_mutex = Mutex.new end def close super - # Close all open connections - @cache.each {|entry| entry.conn.finish if entry.conn&.started? } + @connection_cache.each {|entry| entry.conn.finish if entry.conn&.started? } end def configure(conf) super - @cache = Array.new(actual_flush_thread_count, CacheEntry.new("", nil)) if @reuse_connections - @cache_id = 0 + @connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil)) if @reuse_connections + @connection_cache_id = 0 if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @@ -322,23 +321,23 @@ def create_request(chunk, uri) def make_request_cached(uri, req) id = Thread.current.thread_variable_get(plugin_id) if id.nil? - @cache_id_mutex.synchronize { - id = @cache_id - @cache_id += 1 + @connection_cache_id_mutex.synchronize { + id = @connection_cache_id + @connection_cache_id += 1 } Thread.current.thread_variable_set(plugin_id, id) end uri_str = uri.to_s - if @cache[id].uri != uri_str - @cache[id].conn.finish if @cache[id].conn&.started? + if @connection_cache[id].uri != uri_str + @connection_cache[id].conn.finish if @connection_cache[id].conn&.started? http = if @proxy_uri Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) else Net::HTTP.start(uri.host, uri.port, @http_opt) end - @cache[id] = CacheEntry.new(uri_str, http) + @connection_cache[id] = ConnectionCache.new(uri_str, http) end - @cache[id].conn.request(req) + @connection_cache[id].conn.request(req) end def make_request(uri, req, &block) From 9ce635ce524a30622267b5686d9a53ab00269d1c Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 30 Apr 2024 13:48:36 +0900 Subject: [PATCH 7/7] Use more specific thread key name for cache-id and following: * Use `Thread#[]` style * because it is the common for Fluentd code * Rename `connection_cache_id` to `connection_cache_next_id` * for clarity. Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/out_http.rb | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 6fb972c3d3..4d45b9706c 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -104,6 +104,18 @@ class RetryableResponse < StandardError; end config_param :aws_role_arn, :string, default: nil end + def connection_cache_id_thread_key + "#{plugin_id}_connection_cache_id" + end + + def connection_cache_id_for_thread + Thread.current[connection_cache_id_thread_key] + end + + def connection_cache_id_for_thread=(id) + Thread.current[connection_cache_id_thread_key] = id + end + def initialize super @@ -113,6 +125,7 @@ def initialize @connection_cache = [] @connection_cache_id_mutex = Mutex.new + @connection_cache_next_id = 0 end def close @@ -125,7 +138,6 @@ def configure(conf) super @connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil)) if @reuse_connections - @connection_cache_id = 0 if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @@ -319,13 +331,13 @@ def create_request(chunk, uri) end def make_request_cached(uri, req) - id = Thread.current.thread_variable_get(plugin_id) + id = self.connection_cache_id_for_thread if id.nil? @connection_cache_id_mutex.synchronize { - id = @connection_cache_id - @connection_cache_id += 1 + id = @connection_cache_next_id + @connection_cache_next_id += 1 } - Thread.current.thread_variable_set(plugin_id, id) + self.connection_cache_id_for_thread = id end uri_str = uri.to_s if @connection_cache[id].uri != uri_str