From f7ef5fdc90c380876eb4634251a785dfafd0b552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io> Date: Fri, 3 Apr 2020 16:05:38 +0300 Subject: [PATCH 01/11] Support list patterns --- lib/logstash/inputs/redis.rb | 159 ++++++++++++++++++++++++++++++++--- spec/inputs/redis_spec.rb | 102 +++++++++++++++++++++- 2 files changed, 247 insertions(+), 14 deletions(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 9722d8b..432874c 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -3,6 +3,8 @@ require "logstash/inputs/base" require "logstash/inputs/threadable" require 'redis' +require 'concurrent' +require 'concurrent/executors' # This input will read events from a Redis instance; it supports both Redis channels and lists. # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and @@ -49,9 +51,11 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable config :key, :validate => :string, :required => true # Specify either list or channel. If `data_type` is `list`, then we will BLPOP the - # key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. - # If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. - config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true + # key. If `data_type` is `pattern_list`, then we will spawn a number of worker + # threads that will LPOP from keys matching that pattern. If `data_type` is + # `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`, + # then we will PSUBSCRIBE to the key. + config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 125 @@ -59,6 +63,18 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # Redefined Redis commands to be passed to the Redis client. config :command_map, :validate => :hash, :default => {} + # Maximum number of worker threads to spawn when using `data_type` `pattern_list`. + config :pattern_list_threads, :validate => :number, :default => 20 + + # Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`. + # After the list is empty or this number of items have been processed, the thread will exit and a + # new one will be started if there are non-empty lists matching the pattern without a consumer. + config :pattern_list_max_items, :validate => :number, :default => 1000 + + # Time to sleep in main loop after checking if more threads can/need to be spawned. + # Applies to `data_type` is `pattern_list` + config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2 + public # public API # use to store a proc that can provide a Redis instance or mock @@ -77,6 +93,15 @@ def new_redis_instance @redis_builder.call end + def init_threadpool + @threadpool ||= Concurrent::ThreadPoolExecutor.new( + min_threads: @pattern_list_threads, + max_threads: @pattern_list_threads, + max_queue: 2 * @pattern_list_threads + ) + @current_workers ||= Concurrent::Set.new + end + def register @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" @@ -86,6 +111,9 @@ def register if @data_type == 'list' || @data_type == 'dummy' @run_method = method(:list_runner) @stop_method = method(:list_stop) + elsif @data_type == 'pattern_list' + @run_method = method(:pattern_list_runner) + @stop_method = method(:pattern_list_stop) elsif @data_type == 'channel' @run_method = method(:channel_runner) @stop_method = method(:subscribe_stop) @@ -94,8 +122,6 @@ def register @stop_method = method(:subscribe_stop) end - @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) - @identity = "#{@redis_url} #{@data_type}:#{@key}" @logger.info("Registering Redis", :identity => @identity) end # def register @@ -119,7 +145,7 @@ def batched? # private def is_list_type? - @data_type == 'list' + @data_type == 'list' || @data_type == 'pattern_list' end # private @@ -193,15 +219,21 @@ def queue_event(msg, output_queue, channel=nil) end # private - def list_stop + def reset_redis return if @redis.nil? || !@redis.connected? @redis.quit rescue nil @redis = nil end + # private + def list_stop + reset_redis + end + # private def list_runner(output_queue) + @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) while !stop? begin @redis ||= connect @@ -217,16 +249,113 @@ def list_runner(output_queue) end end - def list_batch_listener(redis, output_queue) + #private + def reset_threadpool + return if @threadpool.nil? + @threadpool.shutdown + @threadpool.wait_for_termination + @threadpool = nil + end + + # private + def pattern_list_stop + reset_redis + reset_threadpool + end + + # private + def pattern_list_process_item(redis, output_queue, key) + if stop? + @logger.debug("Breaking from thread #{key} as it was requested to stop") + return false + end + value = redis.lpop(key) + return false if value.nil? + queue_event(value, output_queue) + true + end + + # private + def pattern_list_single_processor(redis, output_queue, key) + (0...@pattern_list_max_items).each do + break unless pattern_list_process_item(redis, output_queue, key) + end + end + + # private + def pattern_list_batch_processor(redis, output_queue, key) + items_left = @pattern_list_max_items + while items_left > 0 + limit = [items_left, @batch_count].min + processed = process_batch(redis, output_queue, key, limit, 0) + if processed.zero? || processed < limit + return + end + items_left -= processed + end + end + + # private + def pattern_list_worker_consume(output_queue, key) begin - results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]) - results.each do |item| - queue_event(item, output_queue) + redis ||= connect + @pattern_list_processor.call(redis, output_queue, key) + rescue ::Redis::BaseError => e + @logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e) + sleep 1 + return + ensure + redis.quit rescue nil + end + end + + # private + def threadpool_capacity? + @threadpool.remaining_capacity > 0 + end + + # private + def pattern_list_launch_worker(output_queue, key) + @current_workers.add(key) + @threadpool.post do + begin + pattern_list_worker_consume(output_queue, key) + ensure + @current_workers.delete(key) end + end + end - if results.size.zero? - sleep BATCH_EMPTY_SLEEP + # private + def pattern_list_ensure_workers(output_queue) + return unless threadpool_capacity? + redis_runner do + @redis.keys(@key).shuffle.each do |key| + next if @current_workers.include?(key) + pattern_list_launch_worker(output_queue, key) + break unless threadpool_capacity? end + end + end + + # private + def pattern_list_runner(output_queue) + @pattern_list_processor = batched? ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor) + while !stop? + init_threadpool if @threadpool.nil? + pattern_list_ensure_workers(output_queue) + sleep(@pattern_list_threadpool_sleep) + end + end + + def process_batch(redis, output_queue, key, batch_size, sleep_time) + begin + results = redis.evalsha(@redis_script_sha, [key], [batch_size-1]) + results.each do |item| + queue_event(item, output_queue) + end + sleep sleep_time if results.size.zero? && sleep_time > 0 + results.size # Below is a commented-out implementation of 'batch fetch' # using pipelined LPOP calls. This in practice has been observed to @@ -255,6 +384,10 @@ def list_batch_listener(redis, output_queue) end end + def list_batch_listener(redis, output_queue) + process_batch(redis, output_queue, @key, @batch_count, BATCH_EMPTY_SLEEP) + end + def list_single_listener(redis, output_queue) item = redis.blpop(@key, 0, :timeout => 1) return unless item # from timeout or other conditions diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 6675a8e..45dabfe 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -63,6 +63,48 @@ def process(conf, event_count) populate(key, event_count) process(conf, event_count) end + + it "should read events from a list pattern" do + key_base = SecureRandom.hex + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}.*" + data_type => "pattern_list" + batch_count => 1 + } + } + CONFIG + total_event_count = 0 + (0..10).each do |idx| + event_count = 100 + rand(50) + total_event_count += event_count + populate("#{key_base}.#{idx}", event_count) + end + process(conf, total_event_count) + end + + it "should read events from a list pattern using batch_count (default 125)" do + key_base = SecureRandom.hex + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}.*" + data_type => "pattern_list" + batch_count => 125 + } + } + CONFIG + total_event_count = 0 + (0..10).each do |idx| + event_count = 100 + rand(50) + total_event_count += event_count + populate("#{key_base}.#{idx}", event_count) + end + process(conf, total_event_count) + end end # unit tests --------------------- @@ -264,6 +306,64 @@ def process(conf, event_count) end end + context 'runtime for pattern_list data_type' do + let(:data_type) { 'pattern_list' } + let(:key) { 'foo.*' } + before do + subject.register + subject.init_threadpool + end + + context 'close when redis is unset' do + let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } + + it 'does not attempt to quit' do + allow(redis).to receive(:nil?).and_return(true) + quit_calls.each do |call| + expect(redis).not_to receive(call) + end + expect {subject.do_stop}.not_to raise_error + end + end + + it 'calling the run method, adds events to the queue' do + expect(redis).to receive(:keys).at_least(:once).and_return(['foo.bar']) + expect(redis).to receive(:lpop).at_least(:once).and_return('l1') + + allow(redis).to receive(:connected?).and_return(connected.last) + allow(redis).to receive(:quit) + + tt = Thread.new do + end_by = Time.now + 3 + while accumulator.size < 1 and Time.now <= end_by + sleep 0.1 + end + subject.do_stop + end + + subject.run(accumulator) + + tt.join + + expect(accumulator.size).to be > 0 + end + + it 'multiple close calls, calls to redis once' do + subject.use_redis(redis) + allow(redis).to receive(:keys).at_least(:once).and_return(['foo.bar']) + allow(redis).to receive(:lpop).and_return('l1') + expect(redis).to receive(:connected?).and_return(connected.last) + quit_calls.each do |call| + expect(redis).to receive(call).at_most(:once) + end + + subject.do_stop + connected.push(false) #can't use let block here so push to array + expect {subject.do_stop}.not_to raise_error + subject.do_stop + end + end + context 'for the subscribe data_types' do def run_it_thread(inst) Thread.new(inst) do |subj| @@ -396,7 +496,7 @@ def close_thread(inst, rt) describe LogStash::Inputs::Redis do context "when using data type" do - ["list", "channel", "pattern_channel"].each do |data_type| + ["list", "channel", "pattern_channel", "pattern_list"].each do |data_type| context data_type do it_behaves_like "an interruptible input plugin" do let(:config) { {'key' => 'foo', 'data_type' => data_type } } From 12e270ca0f43297e10722964949727da969f321a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io> Date: Tue, 7 Apr 2020 14:44:04 +0300 Subject: [PATCH 02/11] Update docs --- docs/index.asciidoc | 48 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index cc8340e..7f80126 100755 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -84,12 +84,14 @@ Redis allows for the renaming or disabling of commands in its protocol, see: ht ===== `data_type` * This is a required setting. - * Value can be any of: `list`, `channel`, `pattern_channel` + * Value can be any of: `list`, `pattern_list`, `channel`, `pattern_channel` * There is no default value for this setting. Specify either list or channel. If `data_type` is `list`, then we will BLPOP the -key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. -If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. +key. If `data_type` is `pattern_list`, then we will spawn a number of worker +threads that will LPOP from keys matching that pattern. If `data_type` is +`channel`, then we will SUBSCRIBE to the key. If `data_type` is +`pattern_channel`, then we will PSUBSCRIBE to the key. [id="plugins-{type}s-{plugin}-db"] ===== `db` @@ -125,6 +127,7 @@ The unix socket path of your Redis server. The name of a Redis list or channel. + [id="plugins-{type}s-{plugin}-password"] ===== `password` @@ -133,6 +136,37 @@ The name of a Redis list or channel. Password to authenticate with. There is no authentication by default. + +[id="plugins-{type}s-{plugin}-pattern_list_max_items"] +===== `pattern_list_max_items` + + * Value type is <<number,number>> + * Default value is `1000` + +Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`. +After the list is empty or this number of items have been processed, the thread will exit and a +new one will be started if there are non-empty lists matching the pattern without a consumer. + + +[id="plugins-{type}s-{plugin}-pattern_list_threadpool_sleep"] +===== `pattern_list_threadpool_sleep` + + * Value type is <<number,number>> + * Default value is `0.2` + +Time to sleep in main loop after checking if more threads can/need to be spawned. +Applies to `data_type` is `pattern_list` + + +[id="plugins-{type}s-{plugin}-pattern_list_threads"] +===== `pattern_list_threads` + + * Value type is <<number,number>> + * Default value is `20` + +Maximum number of worker threads to spawn when using `data_type` `pattern_list`. + + [id="plugins-{type}s-{plugin}-port"] ===== `port` @@ -141,8 +175,9 @@ Password to authenticate with. There is no authentication by default. The port to connect on. + [id="plugins-{type}s-{plugin}-ssl"] -===== `ssl` +===== `ssl` * Value type is <<boolean,boolean>> * Default value is `false` @@ -157,7 +192,6 @@ Enable SSL support. * Default value is `1` - [id="plugins-{type}s-{plugin}-timeout"] ===== `timeout` @@ -166,7 +200,9 @@ Enable SSL support. Initial connection timeout in seconds. + + [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] -:default_codec!: \ No newline at end of file +:default_codec!: From 1d220f84a5d5c0df3f7f8bb001b94e65c39b0c73 Mon Sep 17 00:00:00 2001 From: Kaise Cheng <kaise.cheng@elastic.co> Date: Tue, 8 Dec 2020 15:35:36 +0100 Subject: [PATCH 03/11] remove Logstash::Pipeline --- batch_perf/perf_batch.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch_perf/perf_batch.rb b/batch_perf/perf_batch.rb index 3863d85..ac81e85 100644 --- a/batch_perf/perf_batch.rb +++ b/batch_perf/perf_batch.rb @@ -3,7 +3,7 @@ require "securerandom" require "logstash/event" -require "logstash/pipeline" +require "logstash/java_pipeline" require_relative "../lib/logstash/inputs/redis" class BenchOptions @@ -35,7 +35,7 @@ def cfg_batch(d) bench_options = BenchOptions.new def input(cfg, slow, &block) - pipeline = LogStash::Pipeline.new(cfg) + pipeline = LogStash::JavaPipeline.new(cfg) queue = Queue.new pipeline.instance_eval do From 05e9b5324d4822d8e186c167db079e28a357b4f8 Mon Sep 17 00:00:00 2001 From: Kaise Cheng <kaise.cheng@elastic.co> Date: Tue, 8 Dec 2020 16:02:16 +0100 Subject: [PATCH 04/11] bump version --- CHANGELOG.md | 3 +++ logstash-input-redis.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65fc096..63e70e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.6.1 + - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84) + ## 3.5.1 - [DOC] Reordered config option to alpha order [#79](https://github.com/logstash-plugins/logstash-input-redis/issues/79) diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index e68f21a..96cce30 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-redis' - s.version = '3.5.1' + s.version = '3.6.1' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from a Redis instance" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From bb4885f6516b3f47cba85ba75151cef6576f24ea Mon Sep 17 00:00:00 2001 From: Kaise Cheng <kaise.cheng@elastic.co> Date: Mon, 21 Dec 2020 15:32:30 +0100 Subject: [PATCH 05/11] bump version --- CHANGELOG.md | 2 +- logstash-input-redis.gemspec | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63e70e3..1677336 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 3.6.1 +## 3.6.0 - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84) ## 3.5.1 diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index 96cce30..9ef3f4c 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-redis' - s.version = '3.6.1' + s.version = '3.6.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from a Redis instance" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From f09a4cfeba0dd161a7557410d007c4cd29cc1664 Mon Sep 17 00:00:00 2001 From: Kaise Cheng <kaise.cheng@elastic.co> Date: Tue, 22 Dec 2020 11:04:20 +0100 Subject: [PATCH 06/11] fix broken test --- lib/logstash/inputs/redis.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 9722d8b..7f0a19b 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -3,6 +3,7 @@ require "logstash/inputs/base" require "logstash/inputs/threadable" require 'redis' +require "stud/interval" # This input will read events from a Redis instance; it supports both Redis channels and lists. # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and From 45a67022030f9827858803d394178a91ef35e296 Mon Sep 17 00:00:00 2001 From: Joao Duarte <jsvduarte@gmail.com> Date: Mon, 4 Jan 2021 12:49:19 +0000 Subject: [PATCH 07/11] [skip ci] update travis ci badge from .org to .com --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5fd2cf6..10fa417 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Logstash Plugin Travis Build -[](https://travis-ci.org/logstash-plugins/logstash-input-redis) +[](https://travis-ci.com/logstash-plugins/logstash-input-redis) This is a plugin for [Logstash](https://github.com/elastic/logstash). From dba90d8761fad79cb05b32cc8f8fd0a6d99146b9 Mon Sep 17 00:00:00 2001 From: Karol Bucek <kares@users.noreply.github.com> Date: Wed, 7 Apr 2021 13:36:37 +0200 Subject: [PATCH 08/11] Fix: resolve crash when commands_map is set (#86) * Fix: do not rely on redis.client anymore * Test: redo specs with 'real' Redis mocking * Refactor: cleanup plugin (internals) Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com> --- CHANGELOG.md | 3 + lib/logstash/inputs/redis.rb | 41 ++---- logstash-input-redis.gemspec | 4 +- spec/inputs/redis_spec.rb | 271 ++++++++++++++++------------------- 4 files changed, 136 insertions(+), 183 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1677336..81972ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.6.1 + - Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86) + ## 3.6.0 - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 7f0a19b..c92c170 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -61,28 +61,10 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable config :command_map, :validate => :hash, :default => {} public - # public API - # use to store a proc that can provide a Redis instance or mock - def add_external_redis_builder(builder) #callable - @redis_builder = builder - self - end - - # use to apply an instance directly and bypass the builder - def use_redis(instance) - @redis = instance - self - end - - def new_redis_instance - @redis_builder.call - end def register @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" - @redis_builder ||= method(:internal_redis_builder) - # just switch on data_type once if @data_type == 'list' || @data_type == 'dummy' @run_method = method(:list_runner) @@ -147,8 +129,7 @@ def redis_params return connectionParams.merge(baseParams) end - # private - def internal_redis_builder + def new_redis_instance ::Redis.new(redis_params) end @@ -157,14 +138,12 @@ def connect redis = new_redis_instance # register any renamed Redis commands - if @command_map.any? - client_command_map = redis.client.command_map - @command_map.each do |name, renamed| - client_command_map[name.to_sym] = renamed.to_sym - end + @command_map.each do |name, renamed| + redis._client.command_map[name.to_sym] = renamed.to_sym end load_batch_script(redis) if batched? && is_list_type? + redis end # def connect @@ -208,7 +187,9 @@ def list_runner(output_queue) @redis ||= connect @list_method.call(@redis, output_queue) rescue ::Redis::BaseError => e - @logger.warn("Redis connection problem", :exception => e) + info = { message: e.message, exception: e.class } + info[:backtrace] = e.backtrace if @logger.debug? + @logger.warn("Redis connection problem", info) # Reset the redis variable to trigger reconnect @redis = nil # this sleep does not need to be stoppable as its @@ -270,14 +251,14 @@ def subscribe_stop return if @redis.nil? || !@redis.connected? # if its a SubscribedClient then: # it does not have a disconnect method (yet) - if @redis.client.is_a?(::Redis::SubscribedClient) + if @redis.subscribed? if @data_type == 'pattern_channel' - @redis.client.punsubscribe + @redis.punsubscribe else - @redis.client.unsubscribe + @redis.unsubscribe end else - @redis.client.disconnect + @redis.disconnect! end @redis = nil end diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index 9ef3f4c..030338f 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-redis' - s.version = '3.6.0' + s.version = '3.6.1' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from a Redis instance" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -23,7 +23,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json' - s.add_runtime_dependency 'redis', '~> 4' + s.add_runtime_dependency 'redis', '>= 4.0.1', '< 5' s.add_development_dependency 'logstash-devutils' end diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 6675a8e..45a6318 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -1,12 +1,12 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/devutils/rspec/shared_examples" -require "redis" -require "stud/try" require 'logstash/inputs/redis' require 'securerandom' def populate(key, event_count) require "logstash/event" + require "redis" + require "stud/try" redis = Redis.new(:host => "localhost") event_count.times do |value| event = LogStash::Event.new("sequence" => value) @@ -65,153 +65,138 @@ def process(conf, event_count) end end -# unit tests --------------------- - describe LogStash::Inputs::Redis do - let(:redis) { double('redis') } - let(:builder) { ->{ redis } } - let(:connection) { double('redis_connection') } - let(:connected) { [true] } + let(:queue) { Queue.new } + let(:data_type) { 'list' } let(:batch_count) { 1 } - let(:cfg) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} } + let(:config) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} } let(:quit_calls) { [:quit] } - let(:accumulator) { [] } - let(:command_map) { {} } subject do - LogStash::Plugin.lookup("input", "redis") - .new(cfg).add_external_redis_builder(builder) + LogStash::Inputs::Redis.new(config) end context 'construction' do it 'registers the input' do - expect {subject.register}.not_to raise_error + expect { subject.register }.not_to raise_error end end context 'renamed redis commands' do - let(:cfg) { - {'key' => 'foo', - 'data_type' => data_type, - 'command_map' => - { - 'blpop' => 'testblpop', - 'evalsha' => 'testevalsha', - 'lrange' => 'testlrange', - 'ltrim' => 'testltrim', - 'script' => 'testscript', - 'subscribe' => 'testsubscribe', - 'psubscribe' => 'testpsubscribe', - }, - 'batch_count' => 2 + let(:config) do + { + 'key' => 'foo', + 'data_type' => data_type, + 'command_map' => { + 'blpop' => 'testblpop', + 'evalsha' => 'testevalsha', + 'lrange' => 'testlrange', + 'ltrim' => 'testltrim', + 'script' => 'testscript', + 'subscribe' => 'testsubscribe', + 'psubscribe' => 'testpsubscribe', + }, + 'batch_count' => 2 } - } - - before do - subject.register - allow(redis).to receive(:connected?) - allow(redis).to receive(:client).and_return(connection) - allow(connection).to receive(:command_map).and_return(command_map) end it 'sets the renamed commands in the command map' do - allow(redis).to receive(:script) - allow(redis).to receive(:evalsha).and_return([]) - - tt = Thread.new do - sleep 0.01 - subject.do_stop + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :script + expect(command[1]).to eql 'load' end - subject.run(accumulator) - tt.join + subject.register + redis = subject.send :connect + + command_map = redis._client.command_map - expect(command_map[:blpop]).to eq cfg['command_map']['blpop'].to_sym - expect(command_map[:evalsha]).to eq cfg['command_map']['evalsha'].to_sym - expect(command_map[:lrange]).to eq cfg['command_map']['lrange'].to_sym - expect(command_map[:ltrim]).to eq cfg['command_map']['ltrim'].to_sym - expect(command_map[:script]).to eq cfg['command_map']['script'].to_sym - expect(command_map[:subscribe]).to eq cfg['command_map']['subscribe'].to_sym - expect(command_map[:psubscribe]).to eq cfg['command_map']['psubscribe'].to_sym + expect(command_map[:blpop]).to eq config['command_map']['blpop'].to_sym + expect(command_map[:evalsha]).to eq config['command_map']['evalsha'].to_sym + expect(command_map[:lrange]).to eq config['command_map']['lrange'].to_sym + expect(command_map[:ltrim]).to eq config['command_map']['ltrim'].to_sym + expect(command_map[:script]).to eq config['command_map']['script'].to_sym + expect(command_map[:subscribe]).to eq config['command_map']['subscribe'].to_sym + expect(command_map[:psubscribe]).to eq config['command_map']['psubscribe'].to_sym end it 'loads the batch script with the renamed command' do - capture = nil - allow(redis).to receive(:script) { |load, lua_script| capture = lua_script } - allow(redis).to receive(:evalsha).and_return([]) + expect_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :script + expect(command[1]).to eql 'load' - tt = Thread.new do - sleep 0.01 - subject.do_stop + script = command[2] + expect(script).to include "redis.call('#{config['command_map']['lrange']}', KEYS[1], 0, batchsize)" + expect(script).to include "redis.call('#{config['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)" end - subject.run(accumulator) - tt.join - - expect(capture).to include "redis.call('#{cfg['command_map']['lrange']}', KEYS[1], 0, batchsize)" - expect(capture).to include "redis.call('#{cfg['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)" + subject.register + subject.send :connect end end - context 'runtime for list data_type' do + before do subject.register + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true + allow_any_instance_of( Redis::Client ).to receive(:disconnect) + allow_any_instance_of( Redis ).to receive(:quit) + end + + after do + subject.stop end context 'close when redis is unset' do - let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } it 'does not attempt to quit' do - allow(redis).to receive(:nil?).and_return(true) - quit_calls.each do |call| - expect(redis).not_to receive(call) - end - expect {subject.do_stop}.not_to raise_error + expect_any_instance_of( Redis::Client ).to_not receive(:call) + expect_any_instance_of( Redis::Client ).to_not receive(:disconnect) + + expect { subject.do_stop }.not_to raise_error end end it 'calling the run method, adds events to the queue' do - expect(redis).to receive(:blpop).at_least(:once).and_return(['foo', 'l1']) - - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:quit) + allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + expect(command[0]).to eql :blpop + expect(command[1]).to eql ['foo', 0] + expect(command[2]).to eql 1 + end.and_return ['foo', "{\"foo1\":\"bar\""], nil tt = Thread.new do - sleep 0.01 + sleep 0.25 subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join - expect(accumulator.size).to be > 0 + expect( queue.size ).to be > 0 end context "when the batch size is greater than 1" do let(:batch_count) { 10 } - let(:rates) { [] } - - before do - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:script) - allow(redis).to receive(:quit) - end it 'calling the run method, adds events to the queue' do - expect(redis).to receive(:evalsha).at_least(:once).and_return(['a', 'b']) + allow_any_instance_of( Redis ).to receive(:script) + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :evalsha + end.and_return ['{"a": 1}', '{"b":'], [] tt = Thread.new do - sleep 0.01 + sleep 0.25 subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join - expect(accumulator.size).to be > 0 + + expect( queue.size ).to be > 0 end end @@ -220,20 +205,18 @@ def process(conf, event_count) let(:rates) { [] } it 'will throttle the loop' do - allow(redis).to receive(:evalsha) do + allow_any_instance_of( Redis ).to receive(:script) + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :evalsha rates.unshift Time.now.to_f - [] - end - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:script) - allow(redis).to receive(:quit) + end.and_return [] tt = Thread.new do - sleep 1 + sleep 0.25 subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join @@ -242,7 +225,7 @@ def process(conf, event_count) inters << x - y end - expect(accumulator.size).to eq(0) + expect( queue.size ).to eq(0) inters.each do |delta| expect(delta).to be_within(0.01).of(LogStash::Inputs::Redis::BATCH_EMPTY_SLEEP) end @@ -250,16 +233,17 @@ def process(conf, event_count) end it 'multiple close calls, calls to redis once' do - subject.use_redis(redis) - allow(redis).to receive(:blpop).and_return(['foo', 'l1']) - expect(redis).to receive(:connected?).and_return(connected.last) + # subject.use_redis(redis) + # allow(redis).to receive(:blpop).and_return(['foo', 'l1']) + # expect(redis).to receive(:connected?).and_return(connected.last) + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false + # allow_any_instance_of( Redis::Client ).to receive(:disconnect) quit_calls.each do |call| - expect(redis).to receive(call).at_most(:once) + allow_any_instance_of( Redis ).to receive(call).at_most(:once) end subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -267,7 +251,7 @@ def process(conf, event_count) context 'for the subscribe data_types' do def run_it_thread(inst) Thread.new(inst) do |subj| - subj.run(accumulator) + subj.run(queue) end end @@ -283,35 +267,21 @@ def publish_thread(new_redis, prefix) def close_thread(inst, rt) Thread.new(inst, rt) do |subj, runner| # block for the messages - e1 = accumulator.pop - e2 = accumulator.pop + e1 = queue.pop + e2 = queue.pop # put em back for the tests - accumulator.push(e1) - accumulator.push(e2) + queue.push(e1) + queue.push(e2) runner.raise(LogStash::ShutdownSignal) subj.close end end - let(:accumulator) { Queue.new } - - let(:instance) do - inst = described_class.new(cfg) - inst.register - inst - end - before(:example, type: :mocked) do subject.register - subject.use_redis(redis) - allow(connection).to receive(:is_a?).and_return(true) - allow(redis).to receive(:client).and_return(connection) - expect(redis).to receive(:connected?).and_return(connected.last) - allow(connection).to receive(:unsubscribe) - allow(connection).to receive(:punsubscribe) - + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false quit_calls.each do |call| - expect(redis).to receive(call).at_most(:once) + allow_any_instance_of( Redis ).to receive(call).at_most(:once) end end @@ -322,8 +292,7 @@ def close_thread(inst, rt) context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -331,23 +300,23 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'c').join + publish_thread(subject.send(:new_redis_instance), 'c').join #simulate the pipeline thread - close_thread(instance, rt).join + close_thread(subject, rt).join - expect(accumulator.size).to eq(2) + expect(queue.size).to eq(2) end it 'events had redis_channel' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'c').join + publish_thread(subject.send(:new_redis_instance), 'c').join #simulate the pipeline thread - close_thread(instance, rt).join - e1 = accumulator.pop - e2 = accumulator.pop + close_thread(subject, rt).join + e1 = queue.pop + e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') expect(e2.get('[@metadata][redis_channel]')).to eq('foo') end @@ -361,8 +330,7 @@ def close_thread(inst, rt) context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -370,23 +338,24 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'pc').join + publish_thread(subject.send(:new_redis_instance), 'pc').join #simulate the pipeline thread - close_thread(instance, rt).join + close_thread(subject, rt).join - expect(accumulator.size).to eq(2) + expect(queue.size).to eq(2) end + it 'events had redis_channel' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'pc').join + publish_thread(subject.send(:new_redis_instance), 'pc').join #simulate the pipeline thread - close_thread(instance, rt).join - e1 = accumulator.pop - e2 = accumulator.pop + close_thread(subject, rt).join + e1 = queue.pop + e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') expect(e2.get('[@metadata][redis_channel]')).to eq('foo') end @@ -394,15 +363,15 @@ def close_thread(inst, rt) end end - describe LogStash::Inputs::Redis do - context "when using data type" do - ["list", "channel", "pattern_channel"].each do |data_type| - context data_type do - it_behaves_like "an interruptible input plugin" do - let(:config) { {'key' => 'foo', 'data_type' => data_type } } - end + context "when using data type" do + + ["list", "channel", "pattern_channel"].each do |data_type| + context data_type do + it_behaves_like "an interruptible input plugin", :redis => true do + let(:config) { { 'key' => 'foo', 'data_type' => data_type } } end end end + end end From 71d3ee5d75bd42458685e0ffd0520727d9ccaa21 Mon Sep 17 00:00:00 2001 From: Karol Bucek <kares@users.noreply.github.com> Date: Mon, 3 May 2021 19:08:11 +0200 Subject: [PATCH 09/11] Fix: better (Redis) exception handling (#89) Redis doesn't wrap all of the errors e.g. (retriable) IOError might be raised from an IO write (#88). With the refactored error handling we make sure these are logged and retried instead of causing pipeline crashes. Also not all specs from the suite were run on the CI, as some are tagged with redis. These require a real Redis server, no reason not to run them against CI as well. --- .ci/docker-compose.override.yml | 5 ++ .ci/run.sh | 10 +++ .travis.yml | 13 +++- CHANGELOG.md | 4 ++ lib/logstash/inputs/redis.rb | 104 +++++++++++++++++++------------- logstash-input-redis.gemspec | 2 +- spec/inputs/redis_spec.rb | 89 ++++++++++++++++++++++++--- 7 files changed, 174 insertions(+), 53 deletions(-) create mode 100644 .ci/docker-compose.override.yml create mode 100755 .ci/run.sh diff --git a/.ci/docker-compose.override.yml b/.ci/docker-compose.override.yml new file mode 100644 index 0000000..7c3fd31 --- /dev/null +++ b/.ci/docker-compose.override.yml @@ -0,0 +1,5 @@ +version: '3' + +services: + logstash: + network_mode: host diff --git a/.ci/run.sh b/.ci/run.sh new file mode 100755 index 0000000..daf3303 --- /dev/null +++ b/.ci/run.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# This is intended to be run inside the docker container as the command of the docker-compose. + +env + +set -ex + +jruby -rbundler/setup -S rspec -fd + +jruby -rbundler/setup -S rspec -fd --tag redis diff --git a/.travis.yml b/.travis.yml index a50fc73..028f060 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,13 @@ import: -- logstash-plugins/.ci:travis/travis.yml@1.x \ No newline at end of file +- logstash-plugins/.ci:travis/travis.yml@1.x + +addons: + apt: + sources: + - sourceline: 'ppa:chris-lea/redis-server' + packages: + - redis-server + +before_install: + - sudo service redis-server stop + - sudo service redis-server start --bind 0.0.0.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 81972ab..d6479c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 3.7.0 + - Fix: better (Redis) exception handling [#89](https://github.com/logstash-plugins/logstash-input-redis/pull/89) + - Test: start running integration specs on CI + ## 3.6.1 - Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index c92c170..e8a2667 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -107,26 +107,22 @@ def is_list_type? # private def redis_params + params = { + :timeout => @timeout, + :db => @db, + :password => @password.nil? ? nil : @password.value, + :ssl => @ssl + } + if @path.nil? - connectionParams = { - :host => @host, - :port => @port - } + params[:host] = @host + params[:port] = @port else @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'") - connectionParams = { - :path => @path - } + params[:path] = @path end - baseParams = { - :timeout => @timeout, - :db => @db, - :password => @password.nil? ? nil : @password.value, - :ssl => @ssl - } - - return connectionParams.merge(baseParams) + params end def new_redis_instance @@ -174,9 +170,12 @@ def queue_event(msg, output_queue, channel=nil) # private def list_stop - return if @redis.nil? || !@redis.connected? + redis = @redis # might change during method invocation + return if redis.nil? || !redis.connected? - @redis.quit rescue nil + redis.quit rescue nil # does client.disconnect internally + # check if input retried while executing + list_stop unless redis.equal? @redis @redis = nil end @@ -186,15 +185,9 @@ def list_runner(output_queue) begin @redis ||= connect @list_method.call(@redis, output_queue) - rescue ::Redis::BaseError => e - info = { message: e.message, exception: e.class } - info[:backtrace] = e.backtrace if @logger.debug? - @logger.warn("Redis connection problem", info) - # Reset the redis variable to trigger reconnect - @redis = nil - # this sleep does not need to be stoppable as its - # in a while !stop? loop - sleep 1 + rescue => e + log_error(e) + retry if reset_for_error_retry(e) end end end @@ -248,18 +241,19 @@ def list_single_listener(redis, output_queue) # private def subscribe_stop - return if @redis.nil? || !@redis.connected? - # if its a SubscribedClient then: - # it does not have a disconnect method (yet) - if @redis.subscribed? + redis = @redis # might change during method invocation + return if redis.nil? || !redis.connected? + + if redis.subscribed? if @data_type == 'pattern_channel' - @redis.punsubscribe + redis.punsubscribe else - @redis.unsubscribe + redis.unsubscribe end - else - @redis.disconnect! end + redis.close rescue nil # does client.disconnect + # check if input retried while executing + subscribe_stop unless redis.equal? @redis @redis = nil end @@ -268,15 +262,43 @@ def redis_runner begin @redis ||= connect yield - rescue ::Redis::BaseError => e - @logger.warn("Redis connection problem", :exception => e) - # Reset the redis variable to trigger reconnect - @redis = nil - Stud.stoppable_sleep(1) { stop? } - retry if !stop? + rescue => e + log_error(e) + retry if reset_for_error_retry(e) + end + end + + def log_error(e) + info = { message: e.message, exception: e.class } + info[:backtrace] = e.backtrace if @logger.debug? + + case e + when ::Redis::TimeoutError + # expected for channels in case no data is available + @logger.debug("Redis timeout, retrying", info) + when ::Redis::BaseConnectionError, ::Redis::ProtocolError + @logger.warn("Redis connection error", info) + when ::Redis::BaseError + @logger.error("Redis error", info) + when ::LogStash::ShutdownSignal + @logger.debug("Received shutdown signal") + else + info[:backtrace] ||= e.backtrace + @logger.error("Unexpected error", info) end end + # @return [true] if operation is fine to retry + def reset_for_error_retry(e) + return if e.is_a?(::LogStash::ShutdownSignal) + + # Reset the redis variable to trigger reconnect + @redis = nil + + Stud.stoppable_sleep(1) { stop? } + !stop? # retry if not stop-ing + end + # private def channel_runner(output_queue) redis_runner do @@ -324,6 +346,4 @@ def pattern_channel_listener(output_queue) end end -# end - end end end # Redis Inputs LogStash diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index 030338f..520cacd 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-redis' - s.version = '3.6.1' + s.version = '3.7.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from a Redis instance" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 45a6318..d070f13 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -17,11 +17,15 @@ def populate(key, event_count) end def process(conf, event_count) - events = input(conf) do |pipeline, queue| - event_count.times.map{queue.pop} + events = input(conf) do |_, queue| + sleep 0.1 until queue.size >= event_count + queue.size.times.map { queue.pop } end - - expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a) + # due multiple workers we get events out-of-order in the output + events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') } + expect(events[0].get('sequence')).to eq(0) + expect(events[100].get('sequence')).to eq(100) + expect(events[1000].get('sequence')).to eq(1000) end # integration tests --------------------- @@ -31,7 +35,6 @@ def process(conf, event_count) it "should read events from a list" do key = SecureRandom.hex event_count = 1000 + rand(50) - # event_count = 100 conf = <<-CONFIG input { redis { @@ -163,7 +166,6 @@ def process(conf, event_count) allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| expect(command[0]).to eql :blpop expect(command[1]).to eql ['foo', 0] - expect(command[2]).to eql 1 end.and_return ['foo', "{\"foo1\":\"bar\""], nil tt = Thread.new do @@ -178,6 +180,69 @@ def process(conf, event_count) expect( queue.size ).to be > 0 end + it 'keeps running when a connection error occurs' do + raised = false + allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + expect(command[0]).to eql :blpop + unless raised + raised = true + raise Redis::CannotConnectError.new('test') + end + ['foo', "{\"after\":\"raise\"}"] + end + + expect(subject.logger).to receive(:warn).with('Redis connection error', + hash_including(:message=>"test", :exception=>Redis::CannotConnectError) + ).and_call_original + + tt = Thread.new do + sleep 2.0 # allow for retry (sleep) after handle_error + subject.do_stop + end + + subject.run(queue) + + tt.join + + try(3) { expect( queue.size ).to be > 0 } + end + + context 'error handling' do + + let(:config) do + super().merge 'batch_count' => 2 + end + + it 'keeps running when a (non-Redis) io error occurs' do + raised = false + allow(subject).to receive(:connect).and_return redis = double('redis') + allow(redis).to receive(:blpop).and_return nil + expect(redis).to receive(:evalsha) do + unless raised + raised = true + raise IOError.new('closed stream') + end + [] + end.at_least(1) + redis + allow(subject).to receive(:stop) + + expect(subject.logger).to receive(:error).with('Unexpected error', + hash_including(:message=>'closed stream', :exception=>IOError) + ).and_call_original + + tt = Thread.new do + sleep 2.0 # allow for retry (sleep) after handle_error + subject.do_stop + end + + subject.run(queue) + + tt.join + end + + end + context "when the batch size is greater than 1" do let(:batch_count) { 10 } @@ -233,9 +298,6 @@ def process(conf, event_count) end it 'multiple close calls, calls to redis once' do - # subject.use_redis(redis) - # allow(redis).to receive(:blpop).and_return(['foo', 'l1']) - # expect(redis).to receive(:connected?).and_return(connected.last) allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false # allow_any_instance_of( Redis::Client ).to receive(:disconnect) quit_calls.each do |call| @@ -249,6 +311,9 @@ def process(conf, event_count) end context 'for the subscribe data_types' do + + before { subject.register } + def run_it_thread(inst) Thread.new(inst) do |subj| subj.run(queue) @@ -289,6 +354,8 @@ def close_thread(inst, rt) let(:data_type) { 'channel' } let(:quit_calls) { [:unsubscribe, :connection] } + before { subject.register } + context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop @@ -367,6 +434,10 @@ def close_thread(inst, rt) ["list", "channel", "pattern_channel"].each do |data_type| context data_type do + # TODO pending + # redis-rb ends up in a read wait loop since we do not use subscribe_with_timeout + next unless data_type == 'list' + it_behaves_like "an interruptible input plugin", :redis => true do let(:config) { { 'key' => 'foo', 'data_type' => data_type } } end From 35c954f8b3e23585a3753d970c6a25298386270e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io> Date: Fri, 7 May 2021 14:21:25 +0300 Subject: [PATCH 10/11] Update pattern list tests according to recent refactorings --- spec/inputs/redis_spec.rb | 42 ++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index de7bdf9..f354839 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -114,8 +114,9 @@ def process(conf, event_count) let(:queue) { Queue.new } let(:data_type) { 'list' } + let(:redis_key) { 'foo' } let(:batch_count) { 1 } - let(:config) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} } + let(:config) { {'key' => redis_key, 'data_type' => data_type, 'batch_count' => batch_count} } let(:quit_calls) { [:quit] } subject do @@ -354,57 +355,58 @@ def process(conf, event_count) context 'runtime for pattern_list data_type' do let(:data_type) { 'pattern_list' } - let(:key) { 'foo.*' } + let(:redis_key) { 'foo.*' } + before do subject.register + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true + allow_any_instance_of( Redis::Client ).to receive(:disconnect) + allow_any_instance_of( Redis ).to receive(:quit) subject.init_threadpool end + after do + subject.stop + end + context 'close when redis is unset' do let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } it 'does not attempt to quit' do - allow(redis).to receive(:nil?).and_return(true) + allow_any_instance_of( Redis::Client ).to receive(:nil?).and_return(true) quit_calls.each do |call| - expect(redis).not_to receive(call) + expect_any_instance_of( Redis::Client ).not_to receive(call) end expect {subject.do_stop}.not_to raise_error end end it 'calling the run method, adds events to the queue' do - expect(redis).to receive(:keys).at_least(:once).and_return(['foo.bar']) - expect(redis).to receive(:lpop).at_least(:once).and_return('l1') - - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:quit) + expect_any_instance_of( Redis ).to receive(:keys).at_least(:once).with(redis_key).and_return ['foo.bar'] + expect_any_instance_of( Redis ).to receive(:lpop).at_least(:once).with('foo.bar').and_return 'l1' tt = Thread.new do end_by = Time.now + 3 - while accumulator.size < 1 and Time.now <= end_by - sleep 0.1 - end + sleep 0.1 until queue.size > 0 or Time.now > end_by subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join - expect(accumulator.size).to be > 0 + expect(queue.size).to be > 0 end it 'multiple close calls, calls to redis once' do - subject.use_redis(redis) - allow(redis).to receive(:keys).at_least(:once).and_return(['foo.bar']) - allow(redis).to receive(:lpop).and_return('l1') - expect(redis).to receive(:connected?).and_return(connected.last) + allow_any_instance_of( Redis ).to receive(:keys).with(redis_key).and_return(['foo.bar']) + allow_any_instance_of( Redis ).to receive(:lpop).with('foo.bar').and_return('l1') + quit_calls.each do |call| - expect(redis).to receive(call).at_most(:once) + allow_any_instance_of( Redis ).to receive(call).at_most(:once) end subject.do_stop - connected.push(false) #can't use let block here so push to array expect {subject.do_stop}.not_to raise_error subject.do_stop end From 40c52e9f3899c747e393a9ea4b5190d79588e916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io> Date: Fri, 7 May 2021 14:47:32 +0300 Subject: [PATCH 11/11] Fix integration tests --- spec/inputs/redis_spec.rb | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index f354839..78c3213 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -16,11 +16,17 @@ def populate(key, event_count) end end -def process(conf, event_count) +def wait_events(conf, event_count) events = input(conf) do |_, queue| sleep 0.1 until queue.size >= event_count queue.size.times.map { queue.pop } end + expect(events.size).to eq event_count + events +end + +def process(conf, event_count) + events = wait_events(conf, event_count) # due multiple workers we get events out-of-order in the output events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') } expect(events[0].get('sequence')).to eq(0) @@ -73,7 +79,7 @@ def process(conf, event_count) input { redis { type => "blah" - key => "#{key}.*" + key => "#{key_base}.*" data_type => "pattern_list" batch_count => 1 } @@ -85,7 +91,7 @@ def process(conf, event_count) total_event_count += event_count populate("#{key_base}.#{idx}", event_count) end - process(conf, total_event_count) + wait_events(conf, total_event_count) end it "should read events from a list pattern using batch_count (default 125)" do @@ -94,7 +100,7 @@ def process(conf, event_count) input { redis { type => "blah" - key => "#{key}.*" + key => "#{key_base}.*" data_type => "pattern_list" batch_count => 125 } @@ -106,7 +112,7 @@ def process(conf, event_count) total_event_count += event_count populate("#{key_base}.#{idx}", event_count) end - process(conf, total_event_count) + wait_events(conf, total_event_count) end end