From f7ef5fdc90c380876eb4634251a785dfafd0b552 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io>
Date: Fri, 3 Apr 2020 16:05:38 +0300
Subject: [PATCH 01/11] Support list patterns

---
 lib/logstash/inputs/redis.rb | 159 ++++++++++++++++++++++++++++++++---
 spec/inputs/redis_spec.rb    | 102 +++++++++++++++++++++-
 2 files changed, 247 insertions(+), 14 deletions(-)

diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb
index 9722d8b..432874c 100755
--- a/lib/logstash/inputs/redis.rb
+++ b/lib/logstash/inputs/redis.rb
@@ -3,6 +3,8 @@
 require "logstash/inputs/base"
 require "logstash/inputs/threadable"
 require 'redis'
+require 'concurrent'
+require 'concurrent/executors'
 
 # This input will read events from a Redis instance; it supports both Redis channels and lists.
 # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
@@ -49,9 +51,11 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
   config :key, :validate => :string, :required => true
 
   # Specify either list or channel.  If `data_type` is `list`, then we will BLPOP the
-  # key.  If `data_type` is `channel`, then we will SUBSCRIBE to the key.
-  # If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
-  config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
+  # key. If `data_type` is `pattern_list`, then we will spawn a number of worker
+  # threads that will LPOP from keys matching that pattern. If `data_type` is
+  # `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`,
+  # then we will PSUBSCRIBE to the key.
+  config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true
 
   # The number of events to return from Redis using EVAL.
   config :batch_count, :validate => :number, :default => 125
@@ -59,6 +63,18 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
   # Redefined Redis commands to be passed to the Redis client.
   config :command_map, :validate => :hash, :default => {}
 
+  # Maximum number of worker threads to spawn when using `data_type` `pattern_list`.
+  config :pattern_list_threads, :validate => :number, :default => 20
+
+  # Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`.
+  # After the list is empty or this number of items have been processed, the thread will exit and a
+  # new one will be started if there are non-empty lists matching the pattern without a consumer.
+  config :pattern_list_max_items, :validate => :number, :default => 1000
+
+  # Time to sleep in main loop after checking if more threads can/need to be spawned.
+  # Applies to `data_type` is `pattern_list`
+  config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2
+
   public
   # public API
   # use to store a proc that can provide a Redis instance or mock
@@ -77,6 +93,15 @@ def new_redis_instance
     @redis_builder.call
   end
 
+  def init_threadpool
+    @threadpool ||= Concurrent::ThreadPoolExecutor.new(
+        min_threads: @pattern_list_threads,
+        max_threads: @pattern_list_threads,
+        max_queue: 2 * @pattern_list_threads
+    )
+    @current_workers ||= Concurrent::Set.new
+  end
+
   def register
     @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"
 
@@ -86,6 +111,9 @@ def register
     if @data_type == 'list' || @data_type == 'dummy'
       @run_method = method(:list_runner)
       @stop_method = method(:list_stop)
+    elsif @data_type == 'pattern_list'
+      @run_method = method(:pattern_list_runner)
+      @stop_method = method(:pattern_list_stop)
     elsif @data_type == 'channel'
       @run_method = method(:channel_runner)
       @stop_method = method(:subscribe_stop)
@@ -94,8 +122,6 @@ def register
       @stop_method = method(:subscribe_stop)
     end
 
-    @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
-
     @identity = "#{@redis_url} #{@data_type}:#{@key}"
     @logger.info("Registering Redis", :identity => @identity)
   end # def register
@@ -119,7 +145,7 @@ def batched?
 
   # private
   def is_list_type?
-    @data_type == 'list'
+    @data_type == 'list' || @data_type == 'pattern_list'
   end
 
   # private
@@ -193,15 +219,21 @@ def queue_event(msg, output_queue, channel=nil)
   end
 
   # private
-  def list_stop
+  def reset_redis
     return if @redis.nil? || !@redis.connected?
 
     @redis.quit rescue nil
     @redis = nil
   end
 
+  # private
+  def list_stop
+    reset_redis
+  end
+
   # private
   def list_runner(output_queue)
+    @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
     while !stop?
       begin
         @redis ||= connect
@@ -217,16 +249,113 @@ def list_runner(output_queue)
     end
   end
 
-  def list_batch_listener(redis, output_queue)
+  #private
+  def reset_threadpool
+    return if @threadpool.nil?
+    @threadpool.shutdown
+    @threadpool.wait_for_termination
+    @threadpool = nil
+  end
+
+  # private
+  def pattern_list_stop
+    reset_redis
+    reset_threadpool
+  end
+
+  # private
+  def pattern_list_process_item(redis, output_queue, key)
+    if stop?
+      @logger.debug("Breaking from thread #{key} as it was requested to stop")
+      return false
+    end
+    value = redis.lpop(key)
+    return false if value.nil?
+    queue_event(value, output_queue)
+    true
+  end
+
+  # private
+  def pattern_list_single_processor(redis, output_queue, key)
+    (0...@pattern_list_max_items).each do
+      break unless pattern_list_process_item(redis, output_queue, key)
+    end
+  end
+
+  # private
+  def pattern_list_batch_processor(redis, output_queue, key)
+    items_left = @pattern_list_max_items
+    while items_left > 0
+      limit = [items_left, @batch_count].min
+      processed = process_batch(redis, output_queue, key, limit, 0)
+      if processed.zero? || processed < limit
+        return
+      end
+      items_left -= processed
+    end
+  end
+
+  # private
+  def pattern_list_worker_consume(output_queue, key)
     begin
-      results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
-      results.each do |item|
-        queue_event(item, output_queue)
+      redis ||= connect
+      @pattern_list_processor.call(redis, output_queue, key)
+    rescue ::Redis::BaseError => e
+      @logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e)
+      sleep 1
+      return
+    ensure
+      redis.quit rescue nil
+    end
+  end
+
+  # private
+  def threadpool_capacity?
+    @threadpool.remaining_capacity > 0
+  end
+
+  # private
+  def pattern_list_launch_worker(output_queue, key)
+    @current_workers.add(key)
+    @threadpool.post do
+      begin
+        pattern_list_worker_consume(output_queue, key)
+      ensure
+        @current_workers.delete(key)
       end
+    end
+  end
 
-      if results.size.zero?
-        sleep BATCH_EMPTY_SLEEP
+  # private
+  def pattern_list_ensure_workers(output_queue)
+    return unless threadpool_capacity?
+    redis_runner do
+      @redis.keys(@key).shuffle.each do |key|
+        next if @current_workers.include?(key)
+        pattern_list_launch_worker(output_queue, key)
+        break unless threadpool_capacity?
       end
+    end
+  end
+
+  # private
+  def pattern_list_runner(output_queue)
+    @pattern_list_processor = batched? ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor)
+    while !stop?
+      init_threadpool if @threadpool.nil?
+      pattern_list_ensure_workers(output_queue)
+      sleep(@pattern_list_threadpool_sleep)
+    end
+  end
+
+  def process_batch(redis, output_queue, key, batch_size, sleep_time)
+    begin
+      results = redis.evalsha(@redis_script_sha, [key], [batch_size-1])
+      results.each do |item|
+        queue_event(item, output_queue)
+      end
+      sleep sleep_time if results.size.zero? && sleep_time > 0
+      results.size
 
       # Below is a commented-out implementation of 'batch fetch'
       # using pipelined LPOP calls. This in practice has been observed to
@@ -255,6 +384,10 @@ def list_batch_listener(redis, output_queue)
     end
   end
 
+  def list_batch_listener(redis, output_queue)
+    process_batch(redis, output_queue, @key, @batch_count, BATCH_EMPTY_SLEEP)
+  end
+
   def list_single_listener(redis, output_queue)
     item = redis.blpop(@key, 0, :timeout => 1)
     return unless item # from timeout or other conditions
diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb
index 6675a8e..45dabfe 100755
--- a/spec/inputs/redis_spec.rb
+++ b/spec/inputs/redis_spec.rb
@@ -63,6 +63,48 @@ def process(conf, event_count)
     populate(key, event_count)
     process(conf, event_count)
   end
+
+  it "should read events from a list pattern" do
+    key_base = SecureRandom.hex
+    conf = <<-CONFIG
+      input {
+        redis {
+          type => "blah"
+          key => "#{key}.*"
+          data_type => "pattern_list"
+          batch_count => 1
+        }
+      }
+    CONFIG
+    total_event_count = 0
+    (0..10).each do |idx|
+      event_count = 100 + rand(50)
+      total_event_count += event_count
+      populate("#{key_base}.#{idx}", event_count)
+    end
+    process(conf, total_event_count)
+  end
+
+  it "should read events from a list pattern using batch_count (default 125)" do
+    key_base = SecureRandom.hex
+    conf = <<-CONFIG
+      input {
+        redis {
+          type => "blah"
+          key => "#{key}.*"
+          data_type => "pattern_list"
+          batch_count => 125
+        }
+      }
+    CONFIG
+    total_event_count = 0
+    (0..10).each do |idx|
+      event_count = 100 + rand(50)
+      total_event_count += event_count
+      populate("#{key_base}.#{idx}", event_count)
+    end
+    process(conf, total_event_count)
+  end
 end
 
 # unit tests ---------------------
@@ -264,6 +306,64 @@ def process(conf, event_count)
     end
   end
 
+  context 'runtime for pattern_list data_type' do
+    let(:data_type) { 'pattern_list' }
+    let(:key) { 'foo.*' }
+    before do
+      subject.register
+      subject.init_threadpool
+    end
+
+    context 'close when redis is unset' do
+      let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] }
+
+      it 'does not attempt to quit' do
+        allow(redis).to receive(:nil?).and_return(true)
+        quit_calls.each do |call|
+          expect(redis).not_to receive(call)
+        end
+        expect {subject.do_stop}.not_to raise_error
+      end
+    end
+
+    it 'calling the run method, adds events to the queue' do
+      expect(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
+      expect(redis).to receive(:lpop).at_least(:once).and_return('l1')
+
+      allow(redis).to receive(:connected?).and_return(connected.last)
+      allow(redis).to receive(:quit)
+
+      tt = Thread.new do
+        end_by = Time.now + 3
+        while accumulator.size < 1 and Time.now <= end_by
+          sleep 0.1
+        end
+        subject.do_stop
+      end
+
+      subject.run(accumulator)
+
+      tt.join
+
+      expect(accumulator.size).to be > 0
+    end
+
+    it 'multiple close calls, calls to redis once' do
+      subject.use_redis(redis)
+      allow(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
+      allow(redis).to receive(:lpop).and_return('l1')
+      expect(redis).to receive(:connected?).and_return(connected.last)
+      quit_calls.each do |call|
+        expect(redis).to receive(call).at_most(:once)
+      end
+
+      subject.do_stop
+      connected.push(false) #can't use let block here so push to array
+      expect {subject.do_stop}.not_to raise_error
+      subject.do_stop
+    end
+  end
+
   context 'for the subscribe data_types' do
     def run_it_thread(inst)
       Thread.new(inst) do |subj|
@@ -396,7 +496,7 @@ def close_thread(inst, rt)
 
   describe LogStash::Inputs::Redis do
     context "when using data type" do
-      ["list", "channel", "pattern_channel"].each do |data_type|
+      ["list", "channel", "pattern_channel", "pattern_list"].each do |data_type|
         context data_type do
           it_behaves_like "an interruptible input plugin" do
             let(:config) { {'key' => 'foo', 'data_type' => data_type } }

From 12e270ca0f43297e10722964949727da969f321a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io>
Date: Tue, 7 Apr 2020 14:44:04 +0300
Subject: [PATCH 02/11] Update docs

---
 docs/index.asciidoc | 48 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 42 insertions(+), 6 deletions(-)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index cc8340e..7f80126 100755
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -84,12 +84,14 @@ Redis allows for the renaming or disabling of commands in its protocol, see:  ht
 ===== `data_type`
 
   * This is a required setting.
-  * Value can be any of: `list`, `channel`, `pattern_channel`
+  * Value can be any of: `list`, `pattern_list`, `channel`, `pattern_channel`
   * There is no default value for this setting.
 
 Specify either list or channel.  If `data_type` is `list`, then we will BLPOP the
-key.  If `data_type` is `channel`, then we will SUBSCRIBE to the key.
-If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
+key. If `data_type` is `pattern_list`, then we will spawn a number of worker
+threads that will LPOP from keys matching that pattern.  If `data_type` is
+`channel`, then we will SUBSCRIBE to the key. If `data_type` is
+`pattern_channel`, then we will PSUBSCRIBE to the key.
 
 [id="plugins-{type}s-{plugin}-db"]
 ===== `db`
@@ -125,6 +127,7 @@ The unix socket path of your Redis server.
 
 The name of a Redis list or channel.
 
+
 [id="plugins-{type}s-{plugin}-password"]
 ===== `password`
 
@@ -133,6 +136,37 @@ The name of a Redis list or channel.
 
 Password to authenticate with. There is no authentication by default.
 
+
+[id="plugins-{type}s-{plugin}-pattern_list_max_items"]
+===== `pattern_list_max_items`
+
+  * Value type is <<number,number>>
+  * Default value is `1000`
+
+Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`.
+After the list is empty or this number of items have been processed, the thread will exit and a
+new one will be started if there are non-empty lists matching the pattern without a consumer.
+
+
+[id="plugins-{type}s-{plugin}-pattern_list_threadpool_sleep"]
+===== `pattern_list_threadpool_sleep`
+
+  * Value type is <<number,number>>
+  * Default value is `0.2`
+
+Time to sleep in main loop after checking if more threads can/need to be spawned.
+Applies to `data_type` is `pattern_list`
+
+
+[id="plugins-{type}s-{plugin}-pattern_list_threads"]
+===== `pattern_list_threads`
+
+  * Value type is <<number,number>>
+  * Default value is `20`
+
+Maximum number of worker threads to spawn when using `data_type` `pattern_list`.
+
+
 [id="plugins-{type}s-{plugin}-port"]
 ===== `port`
 
@@ -141,8 +175,9 @@ Password to authenticate with. There is no authentication by default.
 
 The port to connect on.
 
+
 [id="plugins-{type}s-{plugin}-ssl"]
-===== `ssl` 
+===== `ssl`
 
   * Value type is <<boolean,boolean>>
   * Default value is `false`
@@ -157,7 +192,6 @@ Enable SSL support.
   * Default value is `1`
 
 
-
 [id="plugins-{type}s-{plugin}-timeout"]
 ===== `timeout`
 
@@ -166,7 +200,9 @@ Enable SSL support.
 
 Initial connection timeout in seconds.
 
+
+
 [id="plugins-{type}s-{plugin}-common-options"]
 include::{include_path}/{type}.asciidoc[]
 
-:default_codec!:
\ No newline at end of file
+:default_codec!:

From 1d220f84a5d5c0df3f7f8bb001b94e65c39b0c73 Mon Sep 17 00:00:00 2001
From: Kaise Cheng <kaise.cheng@elastic.co>
Date: Tue, 8 Dec 2020 15:35:36 +0100
Subject: [PATCH 03/11] remove Logstash::Pipeline

---
 batch_perf/perf_batch.rb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/batch_perf/perf_batch.rb b/batch_perf/perf_batch.rb
index 3863d85..ac81e85 100644
--- a/batch_perf/perf_batch.rb
+++ b/batch_perf/perf_batch.rb
@@ -3,7 +3,7 @@
 require "securerandom"
 
 require "logstash/event"
-require "logstash/pipeline"
+require "logstash/java_pipeline"
 require_relative "../lib/logstash/inputs/redis"
 
 class BenchOptions
@@ -35,7 +35,7 @@ def cfg_batch(d)
 bench_options = BenchOptions.new
 
 def input(cfg, slow, &block)
-  pipeline = LogStash::Pipeline.new(cfg)
+  pipeline = LogStash::JavaPipeline.new(cfg)
   queue = Queue.new
 
   pipeline.instance_eval do

From 05e9b5324d4822d8e186c167db079e28a357b4f8 Mon Sep 17 00:00:00 2001
From: Kaise Cheng <kaise.cheng@elastic.co>
Date: Tue, 8 Dec 2020 16:02:16 +0100
Subject: [PATCH 04/11] bump version

---
 CHANGELOG.md                 | 3 +++
 logstash-input-redis.gemspec | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65fc096..63e70e3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 3.6.1
+  - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84)
+
 ## 3.5.1
   - [DOC] Reordered config option to alpha order [#79](https://github.com/logstash-plugins/logstash-input-redis/issues/79)
 
diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec
index e68f21a..96cce30 100755
--- a/logstash-input-redis.gemspec
+++ b/logstash-input-redis.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-input-redis'
-  s.version         = '3.5.1'
+  s.version         = '3.6.1'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = "Reads events from a Redis instance"
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

From bb4885f6516b3f47cba85ba75151cef6576f24ea Mon Sep 17 00:00:00 2001
From: Kaise Cheng <kaise.cheng@elastic.co>
Date: Mon, 21 Dec 2020 15:32:30 +0100
Subject: [PATCH 05/11] bump version

---
 CHANGELOG.md                 | 2 +-
 logstash-input-redis.gemspec | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63e70e3..1677336 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-## 3.6.1
+## 3.6.0
   - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84)
 
 ## 3.5.1
diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec
index 96cce30..9ef3f4c 100755
--- a/logstash-input-redis.gemspec
+++ b/logstash-input-redis.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-input-redis'
-  s.version         = '3.6.1'
+  s.version         = '3.6.0'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = "Reads events from a Redis instance"
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

From f09a4cfeba0dd161a7557410d007c4cd29cc1664 Mon Sep 17 00:00:00 2001
From: Kaise Cheng <kaise.cheng@elastic.co>
Date: Tue, 22 Dec 2020 11:04:20 +0100
Subject: [PATCH 06/11] fix broken test

---
 lib/logstash/inputs/redis.rb | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb
index 9722d8b..7f0a19b 100755
--- a/lib/logstash/inputs/redis.rb
+++ b/lib/logstash/inputs/redis.rb
@@ -3,6 +3,7 @@
 require "logstash/inputs/base"
 require "logstash/inputs/threadable"
 require 'redis'
+require "stud/interval"
 
 # This input will read events from a Redis instance; it supports both Redis channels and lists.
 # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and

From 45a67022030f9827858803d394178a91ef35e296 Mon Sep 17 00:00:00 2001
From: Joao Duarte <jsvduarte@gmail.com>
Date: Mon, 4 Jan 2021 12:49:19 +0000
Subject: [PATCH 07/11] [skip ci] update travis ci badge from .org to .com

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 5fd2cf6..10fa417 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 # Logstash Plugin
 
 Travis Build
-[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-input-redis.svg)](https://travis-ci.org/logstash-plugins/logstash-input-redis)
+[![Travis Build Status](https://travis-ci.com/logstash-plugins/logstash-input-redis.svg)](https://travis-ci.com/logstash-plugins/logstash-input-redis)
 
 This is a plugin for [Logstash](https://github.com/elastic/logstash).
 

From dba90d8761fad79cb05b32cc8f8fd0a6d99146b9 Mon Sep 17 00:00:00 2001
From: Karol Bucek <kares@users.noreply.github.com>
Date: Wed, 7 Apr 2021 13:36:37 +0200
Subject: [PATCH 08/11] Fix: resolve crash when commands_map is set (#86)

* Fix: do not rely on redis.client anymore
* Test: redo specs with 'real' Redis mocking
* Refactor: cleanup plugin (internals)

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>
---
 CHANGELOG.md                 |   3 +
 lib/logstash/inputs/redis.rb |  41 ++----
 logstash-input-redis.gemspec |   4 +-
 spec/inputs/redis_spec.rb    | 271 ++++++++++++++++-------------------
 4 files changed, 136 insertions(+), 183 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1677336..81972ab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 3.6.1
+  - Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86)
+
 ## 3.6.0
   - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84)
 
diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb
index 7f0a19b..c92c170 100755
--- a/lib/logstash/inputs/redis.rb
+++ b/lib/logstash/inputs/redis.rb
@@ -61,28 +61,10 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
   config :command_map, :validate => :hash, :default => {}
 
   public
-  # public API
-  # use to store a proc that can provide a Redis instance or mock
-  def add_external_redis_builder(builder) #callable
-    @redis_builder = builder
-    self
-  end
-
-  # use to apply an instance directly and bypass the builder
-  def use_redis(instance)
-    @redis = instance
-    self
-  end
-
-  def new_redis_instance
-    @redis_builder.call
-  end
 
   def register
     @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"
 
-    @redis_builder ||= method(:internal_redis_builder)
-
     # just switch on data_type once
     if @data_type == 'list' || @data_type == 'dummy'
       @run_method = method(:list_runner)
@@ -147,8 +129,7 @@ def redis_params
     return connectionParams.merge(baseParams)
   end
 
-  # private
-  def internal_redis_builder
+  def new_redis_instance
     ::Redis.new(redis_params)
   end
 
@@ -157,14 +138,12 @@ def connect
     redis = new_redis_instance
 
     # register any renamed Redis commands
-    if @command_map.any?
-      client_command_map = redis.client.command_map
-      @command_map.each do |name, renamed|
-        client_command_map[name.to_sym] = renamed.to_sym
-      end
+    @command_map.each do |name, renamed|
+      redis._client.command_map[name.to_sym] = renamed.to_sym
     end
 
     load_batch_script(redis) if batched? && is_list_type?
+
     redis
   end # def connect
 
@@ -208,7 +187,9 @@ def list_runner(output_queue)
         @redis ||= connect
         @list_method.call(@redis, output_queue)
       rescue ::Redis::BaseError => e
-        @logger.warn("Redis connection problem", :exception => e)
+        info = { message: e.message, exception: e.class }
+        info[:backtrace] = e.backtrace if @logger.debug?
+        @logger.warn("Redis connection problem", info)
         # Reset the redis variable to trigger reconnect
         @redis = nil
         # this sleep does not need to be stoppable as its
@@ -270,14 +251,14 @@ def subscribe_stop
     return if @redis.nil? || !@redis.connected?
     # if its a SubscribedClient then:
     # it does not have a disconnect method (yet)
-    if @redis.client.is_a?(::Redis::SubscribedClient)
+    if @redis.subscribed?
       if @data_type == 'pattern_channel'
-        @redis.client.punsubscribe
+        @redis.punsubscribe
       else
-        @redis.client.unsubscribe
+        @redis.unsubscribe
       end
     else
-      @redis.client.disconnect
+      @redis.disconnect!
     end
     @redis = nil
   end
diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec
index 9ef3f4c..030338f 100755
--- a/logstash-input-redis.gemspec
+++ b/logstash-input-redis.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-input-redis'
-  s.version         = '3.6.0'
+  s.version         = '3.6.1'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = "Reads events from a Redis instance"
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -23,7 +23,7 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
 
   s.add_runtime_dependency 'logstash-codec-json'
-  s.add_runtime_dependency 'redis', '~> 4'
+  s.add_runtime_dependency 'redis', '>= 4.0.1', '< 5'
 
   s.add_development_dependency 'logstash-devutils'
 end
diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb
index 6675a8e..45a6318 100755
--- a/spec/inputs/redis_spec.rb
+++ b/spec/inputs/redis_spec.rb
@@ -1,12 +1,12 @@
 require "logstash/devutils/rspec/spec_helper"
 require "logstash/devutils/rspec/shared_examples"
-require "redis"
-require "stud/try"
 require 'logstash/inputs/redis'
 require 'securerandom'
 
 def populate(key, event_count)
   require "logstash/event"
+  require "redis"
+  require "stud/try"
   redis = Redis.new(:host => "localhost")
   event_count.times do |value|
     event = LogStash::Event.new("sequence" => value)
@@ -65,153 +65,138 @@ def process(conf, event_count)
   end
 end
 
-# unit tests ---------------------
-
 describe LogStash::Inputs::Redis do
-  let(:redis) { double('redis') }
-  let(:builder) { ->{ redis } }
-  let(:connection) { double('redis_connection') }
-  let(:connected) { [true] }
+  let(:queue) { Queue.new }
+
   let(:data_type) { 'list' }
   let(:batch_count) { 1 }
-  let(:cfg) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} }
+  let(:config) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} }
   let(:quit_calls) { [:quit] }
-  let(:accumulator) { [] }
-  let(:command_map) { {} }
 
   subject do
-    LogStash::Plugin.lookup("input", "redis")
-      .new(cfg).add_external_redis_builder(builder)
+    LogStash::Inputs::Redis.new(config)
   end
 
   context 'construction' do
     it 'registers the input' do
-      expect {subject.register}.not_to raise_error
+      expect { subject.register }.not_to raise_error
     end
   end
 
   context 'renamed redis commands' do
-    let(:cfg) {
-      {'key' => 'foo',
-      'data_type' => data_type,
-      'command_map' =>
-        {
-        'blpop' => 'testblpop',
-        'evalsha' => 'testevalsha',
-        'lrange' => 'testlrange',
-        'ltrim' => 'testltrim',
-        'script' => 'testscript',
-        'subscribe' => 'testsubscribe',
-        'psubscribe' => 'testpsubscribe',
-        },
-        'batch_count' => 2
+    let(:config) do
+      {
+          'key' => 'foo',
+          'data_type' => data_type,
+          'command_map' => {
+              'blpop' => 'testblpop',
+              'evalsha' => 'testevalsha',
+              'lrange' => 'testlrange',
+              'ltrim' => 'testltrim',
+              'script' => 'testscript',
+              'subscribe' => 'testsubscribe',
+              'psubscribe' => 'testpsubscribe',
+          },
+          'batch_count' => 2
       }
-    }
-
-    before do
-      subject.register
-      allow(redis).to receive(:connected?)
-      allow(redis).to receive(:client).and_return(connection)
-      allow(connection).to receive(:command_map).and_return(command_map)
     end
 
     it 'sets the renamed commands in the command map' do
-      allow(redis).to receive(:script)
-      allow(redis).to receive(:evalsha).and_return([])
-
-      tt = Thread.new do
-        sleep 0.01
-        subject.do_stop
+      allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
+        expect(command[0]).to eql :script
+        expect(command[1]).to eql 'load'
       end
 
-      subject.run(accumulator)
-      tt.join
+      subject.register
+      redis = subject.send :connect
+
+      command_map = redis._client.command_map
 
-      expect(command_map[:blpop]).to eq cfg['command_map']['blpop'].to_sym
-      expect(command_map[:evalsha]).to eq cfg['command_map']['evalsha'].to_sym
-      expect(command_map[:lrange]).to eq cfg['command_map']['lrange'].to_sym
-      expect(command_map[:ltrim]).to eq cfg['command_map']['ltrim'].to_sym
-      expect(command_map[:script]).to eq cfg['command_map']['script'].to_sym
-      expect(command_map[:subscribe]).to eq cfg['command_map']['subscribe'].to_sym
-      expect(command_map[:psubscribe]).to eq cfg['command_map']['psubscribe'].to_sym
+      expect(command_map[:blpop]).to eq config['command_map']['blpop'].to_sym
+      expect(command_map[:evalsha]).to eq config['command_map']['evalsha'].to_sym
+      expect(command_map[:lrange]).to eq config['command_map']['lrange'].to_sym
+      expect(command_map[:ltrim]).to eq config['command_map']['ltrim'].to_sym
+      expect(command_map[:script]).to eq config['command_map']['script'].to_sym
+      expect(command_map[:subscribe]).to eq config['command_map']['subscribe'].to_sym
+      expect(command_map[:psubscribe]).to eq config['command_map']['psubscribe'].to_sym
     end
 
     it 'loads the batch script with the renamed command' do
-      capture = nil
-      allow(redis).to receive(:script) { |load, lua_script| capture = lua_script }
-      allow(redis).to receive(:evalsha).and_return([])
+      expect_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
+        expect(command[0]).to eql :script
+        expect(command[1]).to eql 'load'
 
-      tt = Thread.new do
-        sleep 0.01
-        subject.do_stop
+        script = command[2]
+        expect(script).to include "redis.call('#{config['command_map']['lrange']}', KEYS[1], 0, batchsize)"
+        expect(script).to include "redis.call('#{config['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)"
       end
 
-      subject.run(accumulator)
-      tt.join
-
-      expect(capture).to include "redis.call('#{cfg['command_map']['lrange']}', KEYS[1], 0, batchsize)"
-      expect(capture).to include "redis.call('#{cfg['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)"
+      subject.register
+      subject.send :connect
     end
   end
 
-
   context 'runtime for list data_type' do
+
     before do
       subject.register
+      allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true
+      allow_any_instance_of( Redis::Client ).to receive(:disconnect)
+      allow_any_instance_of( Redis ).to receive(:quit)
+    end
+
+    after do
+      subject.stop
     end
 
     context 'close when redis is unset' do
-      let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] }
 
       it 'does not attempt to quit' do
-        allow(redis).to receive(:nil?).and_return(true)
-        quit_calls.each do |call|
-          expect(redis).not_to receive(call)
-        end
-        expect {subject.do_stop}.not_to raise_error
+        expect_any_instance_of( Redis::Client ).to_not receive(:call)
+        expect_any_instance_of( Redis::Client ).to_not receive(:disconnect)
+
+        expect { subject.do_stop }.not_to raise_error
       end
     end
 
     it 'calling the run method, adds events to the queue' do
-      expect(redis).to receive(:blpop).at_least(:once).and_return(['foo', 'l1'])
-
-      allow(redis).to receive(:connected?).and_return(connected.last)
-      allow(redis).to receive(:quit)
+      allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
+        expect(command[0]).to eql :blpop
+        expect(command[1]).to eql ['foo', 0]
+        expect(command[2]).to eql 1
+      end.and_return ['foo', "{\"foo1\":\"bar\""], nil
 
       tt = Thread.new do
-        sleep 0.01
+        sleep 0.25
         subject.do_stop
       end
 
-      subject.run(accumulator)
+      subject.run(queue)
 
       tt.join
 
-      expect(accumulator.size).to be > 0
+      expect( queue.size ).to be > 0
     end
 
     context "when the batch size is greater than 1" do
       let(:batch_count) { 10 }
-      let(:rates) { [] }
-
-      before do
-        allow(redis).to receive(:connected?).and_return(connected.last)
-        allow(redis).to receive(:script)
-        allow(redis).to receive(:quit)
-      end
 
       it 'calling the run method, adds events to the queue' do
-        expect(redis).to receive(:evalsha).at_least(:once).and_return(['a', 'b'])
+        allow_any_instance_of( Redis ).to receive(:script)
+        allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
+          expect(command[0]).to eql :evalsha
+        end.and_return ['{"a": 1}', '{"b":'], []
 
         tt = Thread.new do
-          sleep 0.01
+          sleep 0.25
           subject.do_stop
         end
 
-        subject.run(accumulator)
+        subject.run(queue)
 
         tt.join
-        expect(accumulator.size).to be > 0
+
+        expect( queue.size ).to be > 0
       end
     end
 
@@ -220,20 +205,18 @@ def process(conf, event_count)
       let(:rates) { [] }
 
       it 'will throttle the loop' do
-        allow(redis).to receive(:evalsha) do
+        allow_any_instance_of( Redis ).to receive(:script)
+        allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
+          expect(command[0]).to eql :evalsha
           rates.unshift Time.now.to_f
-          []
-        end
-        allow(redis).to receive(:connected?).and_return(connected.last)
-        allow(redis).to receive(:script)
-        allow(redis).to receive(:quit)
+        end.and_return []
 
         tt = Thread.new do
-          sleep 1
+          sleep 0.25
           subject.do_stop
         end
 
-        subject.run(accumulator)
+        subject.run(queue)
 
         tt.join
 
@@ -242,7 +225,7 @@ def process(conf, event_count)
           inters << x - y
         end
 
-        expect(accumulator.size).to eq(0)
+        expect( queue.size ).to eq(0)
         inters.each do |delta|
           expect(delta).to be_within(0.01).of(LogStash::Inputs::Redis::BATCH_EMPTY_SLEEP)
         end
@@ -250,16 +233,17 @@ def process(conf, event_count)
     end
 
     it 'multiple close calls, calls to redis once' do
-      subject.use_redis(redis)
-      allow(redis).to receive(:blpop).and_return(['foo', 'l1'])
-      expect(redis).to receive(:connected?).and_return(connected.last)
+      # subject.use_redis(redis)
+      # allow(redis).to receive(:blpop).and_return(['foo', 'l1'])
+      # expect(redis).to receive(:connected?).and_return(connected.last)
+      allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false
+      # allow_any_instance_of( Redis::Client ).to receive(:disconnect)
       quit_calls.each do |call|
-        expect(redis).to receive(call).at_most(:once)
+        allow_any_instance_of( Redis ).to receive(call).at_most(:once)
       end
 
       subject.do_stop
-      connected.push(false) #can't use let block here so push to array
-      expect {subject.do_stop}.not_to raise_error
+      expect { subject.do_stop }.not_to raise_error
       subject.do_stop
     end
   end
@@ -267,7 +251,7 @@ def process(conf, event_count)
   context 'for the subscribe data_types' do
     def run_it_thread(inst)
       Thread.new(inst) do |subj|
-        subj.run(accumulator)
+        subj.run(queue)
       end
     end
 
@@ -283,35 +267,21 @@ def publish_thread(new_redis, prefix)
     def close_thread(inst, rt)
       Thread.new(inst, rt) do |subj, runner|
         # block for the messages
-        e1 = accumulator.pop
-        e2 = accumulator.pop
+        e1 = queue.pop
+        e2 = queue.pop
         # put em back for the tests
-        accumulator.push(e1)
-        accumulator.push(e2)
+        queue.push(e1)
+        queue.push(e2)
         runner.raise(LogStash::ShutdownSignal)
         subj.close
       end
     end
 
-    let(:accumulator) { Queue.new }
-
-    let(:instance) do
-      inst = described_class.new(cfg)
-      inst.register
-      inst
-    end
-
     before(:example, type: :mocked) do
       subject.register
-      subject.use_redis(redis)
-      allow(connection).to receive(:is_a?).and_return(true)
-      allow(redis).to receive(:client).and_return(connection)
-      expect(redis).to receive(:connected?).and_return(connected.last)
-      allow(connection).to receive(:unsubscribe)
-      allow(connection).to receive(:punsubscribe)
-
+      allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false
       quit_calls.each do |call|
-        expect(redis).to receive(call).at_most(:once)
+        allow_any_instance_of( Redis ).to receive(call).at_most(:once)
       end
     end
 
@@ -322,8 +292,7 @@ def close_thread(inst, rt)
       context 'mocked redis' do
         it 'multiple stop calls, calls to redis once', type: :mocked do
           subject.do_stop
-          connected.push(false) #can't use let block here so push to array
-          expect {subject.do_stop}.not_to raise_error
+          expect { subject.do_stop }.not_to raise_error
           subject.do_stop
         end
       end
@@ -331,23 +300,23 @@ def close_thread(inst, rt)
       context 'real redis', :redis => true do
         it 'calling the run method, adds events to the queue' do
           #simulate the input thread
-          rt = run_it_thread(instance)
+          rt = run_it_thread(subject)
           #simulate the other system thread
-          publish_thread(instance.new_redis_instance, 'c').join
+          publish_thread(subject.send(:new_redis_instance), 'c').join
           #simulate the pipeline thread
-          close_thread(instance, rt).join
+          close_thread(subject, rt).join
 
-          expect(accumulator.size).to eq(2)
+          expect(queue.size).to eq(2)
         end
         it 'events had redis_channel' do
           #simulate the input thread
-          rt = run_it_thread(instance)
+          rt = run_it_thread(subject)
           #simulate the other system thread
-          publish_thread(instance.new_redis_instance, 'c').join
+          publish_thread(subject.send(:new_redis_instance), 'c').join
           #simulate the pipeline thread
-          close_thread(instance, rt).join
-          e1 = accumulator.pop
-          e2 = accumulator.pop
+          close_thread(subject, rt).join
+          e1 = queue.pop
+          e2 = queue.pop
           expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
           expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
         end
@@ -361,8 +330,7 @@ def close_thread(inst, rt)
       context 'mocked redis' do
         it 'multiple stop calls, calls to redis once', type: :mocked do
           subject.do_stop
-          connected.push(false) #can't use let block here so push to array
-          expect {subject.do_stop}.not_to raise_error
+          expect { subject.do_stop }.not_to raise_error
           subject.do_stop
         end
       end
@@ -370,23 +338,24 @@ def close_thread(inst, rt)
       context 'real redis', :redis => true do
         it 'calling the run method, adds events to the queue' do
           #simulate the input thread
-          rt = run_it_thread(instance)
+          rt = run_it_thread(subject)
           #simulate the other system thread
-          publish_thread(instance.new_redis_instance, 'pc').join
+          publish_thread(subject.send(:new_redis_instance), 'pc').join
           #simulate the pipeline thread
-          close_thread(instance, rt).join
+          close_thread(subject, rt).join
 
-          expect(accumulator.size).to eq(2)
+          expect(queue.size).to eq(2)
         end
+
         it 'events had redis_channel' do
           #simulate the input thread
-          rt = run_it_thread(instance)
+          rt = run_it_thread(subject)
           #simulate the other system thread
-          publish_thread(instance.new_redis_instance, 'pc').join
+          publish_thread(subject.send(:new_redis_instance), 'pc').join
           #simulate the pipeline thread
-          close_thread(instance, rt).join
-          e1 = accumulator.pop
-          e2 = accumulator.pop
+          close_thread(subject, rt).join
+          e1 = queue.pop
+          e2 = queue.pop
           expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
           expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
         end
@@ -394,15 +363,15 @@ def close_thread(inst, rt)
     end
   end
 
-  describe LogStash::Inputs::Redis do
-    context "when using data type" do
-      ["list", "channel", "pattern_channel"].each do |data_type|
-        context data_type do
-          it_behaves_like "an interruptible input plugin" do
-            let(:config) { {'key' => 'foo', 'data_type' => data_type } }
-          end
+  context "when using data type" do
+
+    ["list", "channel", "pattern_channel"].each do |data_type|
+      context data_type do
+        it_behaves_like "an interruptible input plugin", :redis => true do
+          let(:config) { { 'key' => 'foo', 'data_type' => data_type } }
         end
       end
     end
+
   end
 end

From 71d3ee5d75bd42458685e0ffd0520727d9ccaa21 Mon Sep 17 00:00:00 2001
From: Karol Bucek <kares@users.noreply.github.com>
Date: Mon, 3 May 2021 19:08:11 +0200
Subject: [PATCH 09/11] Fix: better (Redis) exception handling (#89)

Redis doesn't wrap all of the errors e.g. (retriable) IOError might be raised from an IO write (#88).
With the refactored error handling we make sure these are logged and retried instead of causing pipeline crashes.

Also not all specs from the suite were run on the CI, as some are tagged with redis.
These require a real Redis server, no reason not to run them against CI as well.
---
 .ci/docker-compose.override.yml |   5 ++
 .ci/run.sh                      |  10 +++
 .travis.yml                     |  13 +++-
 CHANGELOG.md                    |   4 ++
 lib/logstash/inputs/redis.rb    | 104 +++++++++++++++++++-------------
 logstash-input-redis.gemspec    |   2 +-
 spec/inputs/redis_spec.rb       |  89 ++++++++++++++++++++++++---
 7 files changed, 174 insertions(+), 53 deletions(-)
 create mode 100644 .ci/docker-compose.override.yml
 create mode 100755 .ci/run.sh

diff --git a/.ci/docker-compose.override.yml b/.ci/docker-compose.override.yml
new file mode 100644
index 0000000..7c3fd31
--- /dev/null
+++ b/.ci/docker-compose.override.yml
@@ -0,0 +1,5 @@
+version: '3'
+
+services:
+    logstash:
+      network_mode: host
diff --git a/.ci/run.sh b/.ci/run.sh
new file mode 100755
index 0000000..daf3303
--- /dev/null
+++ b/.ci/run.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+# This is intended to be run inside the docker container as the command of the docker-compose.
+
+env
+
+set -ex
+
+jruby -rbundler/setup -S rspec -fd
+
+jruby -rbundler/setup -S rspec -fd --tag redis
diff --git a/.travis.yml b/.travis.yml
index a50fc73..028f060 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,2 +1,13 @@
 import:
-- logstash-plugins/.ci:travis/travis.yml@1.x
\ No newline at end of file
+- logstash-plugins/.ci:travis/travis.yml@1.x
+
+addons:
+  apt:
+    sources:
+      - sourceline: 'ppa:chris-lea/redis-server'
+    packages:
+      - redis-server
+
+before_install:
+  - sudo service redis-server stop
+  - sudo service redis-server start --bind 0.0.0.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 81972ab..d6479c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 3.7.0
+  - Fix: better (Redis) exception handling [#89](https://github.com/logstash-plugins/logstash-input-redis/pull/89)
+  - Test: start running integration specs on CI
+
 ## 3.6.1
   - Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86)
 
diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb
index c92c170..e8a2667 100755
--- a/lib/logstash/inputs/redis.rb
+++ b/lib/logstash/inputs/redis.rb
@@ -107,26 +107,22 @@ def is_list_type?
 
   # private
   def redis_params
+    params = {
+        :timeout => @timeout,
+        :db => @db,
+        :password => @password.nil? ? nil : @password.value,
+        :ssl => @ssl
+    }
+
     if @path.nil?
-      connectionParams = {
-        :host => @host,
-        :port => @port
-      }
+      params[:host] = @host
+      params[:port] = @port
     else
       @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
-      connectionParams = {
-        :path => @path
-      }
+      params[:path] = @path
     end
 
-    baseParams = {
-      :timeout => @timeout,
-      :db => @db,
-      :password => @password.nil? ? nil : @password.value,
-      :ssl => @ssl
-    }
-
-    return connectionParams.merge(baseParams)
+    params
   end
 
   def new_redis_instance
@@ -174,9 +170,12 @@ def queue_event(msg, output_queue, channel=nil)
 
   # private
   def list_stop
-    return if @redis.nil? || !@redis.connected?
+    redis = @redis # might change during method invocation
+    return if redis.nil? || !redis.connected?
 
-    @redis.quit rescue nil
+    redis.quit rescue nil # does client.disconnect internally
+    # check if input retried while executing
+    list_stop unless redis.equal? @redis
     @redis = nil
   end
 
@@ -186,15 +185,9 @@ def list_runner(output_queue)
       begin
         @redis ||= connect
         @list_method.call(@redis, output_queue)
-      rescue ::Redis::BaseError => e
-        info = { message: e.message, exception: e.class }
-        info[:backtrace] = e.backtrace if @logger.debug?
-        @logger.warn("Redis connection problem", info)
-        # Reset the redis variable to trigger reconnect
-        @redis = nil
-        # this sleep does not need to be stoppable as its
-        # in a while !stop? loop
-        sleep 1
+      rescue => e
+        log_error(e)
+        retry if reset_for_error_retry(e)
       end
     end
   end
@@ -248,18 +241,19 @@ def list_single_listener(redis, output_queue)
 
   # private
   def subscribe_stop
-    return if @redis.nil? || !@redis.connected?
-    # if its a SubscribedClient then:
-    # it does not have a disconnect method (yet)
-    if @redis.subscribed?
+    redis = @redis # might change during method invocation
+    return if redis.nil? || !redis.connected?
+
+    if redis.subscribed?
       if @data_type == 'pattern_channel'
-        @redis.punsubscribe
+        redis.punsubscribe
       else
-        @redis.unsubscribe
+        redis.unsubscribe
       end
-    else
-      @redis.disconnect!
     end
+    redis.close rescue nil # does client.disconnect
+    # check if input retried while executing
+    subscribe_stop unless redis.equal? @redis
     @redis = nil
   end
 
@@ -268,15 +262,43 @@ def redis_runner
     begin
       @redis ||= connect
       yield
-    rescue ::Redis::BaseError => e
-      @logger.warn("Redis connection problem", :exception => e)
-      # Reset the redis variable to trigger reconnect
-      @redis = nil
-      Stud.stoppable_sleep(1) { stop? }
-      retry if !stop?
+    rescue => e
+      log_error(e)
+      retry if reset_for_error_retry(e)
+    end
+  end
+
+  def log_error(e)
+    info = { message: e.message, exception: e.class }
+    info[:backtrace] = e.backtrace if @logger.debug?
+
+    case e
+    when ::Redis::TimeoutError
+      # expected for channels in case no data is available
+      @logger.debug("Redis timeout, retrying", info)
+    when ::Redis::BaseConnectionError, ::Redis::ProtocolError
+      @logger.warn("Redis connection error", info)
+    when ::Redis::BaseError
+      @logger.error("Redis error", info)
+    when ::LogStash::ShutdownSignal
+      @logger.debug("Received shutdown signal")
+    else
+      info[:backtrace] ||= e.backtrace
+      @logger.error("Unexpected error", info)
     end
   end
 
+  # @return [true] if operation is fine to retry
+  def reset_for_error_retry(e)
+    return if e.is_a?(::LogStash::ShutdownSignal)
+
+    # Reset the redis variable to trigger reconnect
+    @redis = nil
+
+    Stud.stoppable_sleep(1) { stop? }
+    !stop? # retry if not stop-ing
+  end
+
   # private
   def channel_runner(output_queue)
     redis_runner do
@@ -324,6 +346,4 @@ def pattern_channel_listener(output_queue)
     end
   end
 
-# end
-
 end end end # Redis Inputs  LogStash
diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec
index 030338f..520cacd 100755
--- a/logstash-input-redis.gemspec
+++ b/logstash-input-redis.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-input-redis'
-  s.version         = '3.6.1'
+  s.version         = '3.7.0'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = "Reads events from a Redis instance"
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb
index 45a6318..d070f13 100755
--- a/spec/inputs/redis_spec.rb
+++ b/spec/inputs/redis_spec.rb
@@ -17,11 +17,15 @@ def populate(key, event_count)
 end
 
 def process(conf, event_count)
-  events = input(conf) do |pipeline, queue|
-    event_count.times.map{queue.pop}
+  events = input(conf) do |_, queue|
+    sleep 0.1 until queue.size >= event_count
+    queue.size.times.map { queue.pop }
   end
-
-  expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a)
+  # due multiple workers we get events out-of-order in the output
+  events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') }
+  expect(events[0].get('sequence')).to eq(0)
+  expect(events[100].get('sequence')).to eq(100)
+  expect(events[1000].get('sequence')).to eq(1000)
 end
 
 # integration tests ---------------------
@@ -31,7 +35,6 @@ def process(conf, event_count)
   it "should read events from a list" do
     key = SecureRandom.hex
     event_count = 1000 + rand(50)
-    # event_count = 100
     conf = <<-CONFIG
       input {
         redis {
@@ -163,7 +166,6 @@ def process(conf, event_count)
       allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
         expect(command[0]).to eql :blpop
         expect(command[1]).to eql ['foo', 0]
-        expect(command[2]).to eql 1
       end.and_return ['foo', "{\"foo1\":\"bar\""], nil
 
       tt = Thread.new do
@@ -178,6 +180,69 @@ def process(conf, event_count)
       expect( queue.size ).to be > 0
     end
 
+    it 'keeps running when a connection error occurs' do
+      raised = false
+      allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
+        expect(command[0]).to eql :blpop
+        unless raised
+          raised = true
+          raise Redis::CannotConnectError.new('test')
+        end
+        ['foo', "{\"after\":\"raise\"}"]
+      end
+
+      expect(subject.logger).to receive(:warn).with('Redis connection error',
+                                                    hash_including(:message=>"test", :exception=>Redis::CannotConnectError)
+      ).and_call_original
+
+      tt = Thread.new do
+        sleep 2.0 # allow for retry (sleep) after handle_error
+        subject.do_stop
+      end
+
+      subject.run(queue)
+
+      tt.join
+
+      try(3) { expect( queue.size ).to be > 0 }
+    end
+
+    context 'error handling' do
+
+      let(:config) do
+        super().merge 'batch_count' => 2
+      end
+
+      it 'keeps running when a (non-Redis) io error occurs' do
+        raised = false
+        allow(subject).to receive(:connect).and_return redis = double('redis')
+        allow(redis).to receive(:blpop).and_return nil
+        expect(redis).to receive(:evalsha) do
+          unless raised
+            raised = true
+            raise IOError.new('closed stream')
+          end
+          []
+        end.at_least(1)
+        redis
+        allow(subject).to receive(:stop)
+
+        expect(subject.logger).to receive(:error).with('Unexpected error',
+                                                       hash_including(:message=>'closed stream', :exception=>IOError)
+        ).and_call_original
+
+        tt = Thread.new do
+          sleep 2.0 # allow for retry (sleep) after handle_error
+          subject.do_stop
+        end
+
+        subject.run(queue)
+
+        tt.join
+      end
+
+    end
+
     context "when the batch size is greater than 1" do
       let(:batch_count) { 10 }
 
@@ -233,9 +298,6 @@ def process(conf, event_count)
     end
 
     it 'multiple close calls, calls to redis once' do
-      # subject.use_redis(redis)
-      # allow(redis).to receive(:blpop).and_return(['foo', 'l1'])
-      # expect(redis).to receive(:connected?).and_return(connected.last)
       allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false
       # allow_any_instance_of( Redis::Client ).to receive(:disconnect)
       quit_calls.each do |call|
@@ -249,6 +311,9 @@ def process(conf, event_count)
   end
 
   context 'for the subscribe data_types' do
+
+    before { subject.register }
+
     def run_it_thread(inst)
       Thread.new(inst) do |subj|
         subj.run(queue)
@@ -289,6 +354,8 @@ def close_thread(inst, rt)
       let(:data_type) { 'channel' }
       let(:quit_calls) { [:unsubscribe, :connection] }
 
+      before { subject.register }
+
       context 'mocked redis' do
         it 'multiple stop calls, calls to redis once', type: :mocked do
           subject.do_stop
@@ -367,6 +434,10 @@ def close_thread(inst, rt)
 
     ["list", "channel", "pattern_channel"].each do |data_type|
       context data_type do
+        # TODO pending
+        # redis-rb ends up in a read wait loop since we do not use subscribe_with_timeout
+        next unless data_type == 'list'
+
         it_behaves_like "an interruptible input plugin", :redis => true do
           let(:config) { { 'key' => 'foo', 'data_type' => data_type } }
         end

From 35c954f8b3e23585a3753d970c6a25298386270e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io>
Date: Fri, 7 May 2021 14:21:25 +0300
Subject: [PATCH 10/11] Update pattern list tests according to recent
 refactorings

---
 spec/inputs/redis_spec.rb | 42 ++++++++++++++++++++-------------------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb
index de7bdf9..f354839 100755
--- a/spec/inputs/redis_spec.rb
+++ b/spec/inputs/redis_spec.rb
@@ -114,8 +114,9 @@ def process(conf, event_count)
   let(:queue) { Queue.new }
 
   let(:data_type) { 'list' }
+  let(:redis_key) { 'foo' }
   let(:batch_count) { 1 }
-  let(:config) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} }
+  let(:config) { {'key' => redis_key, 'data_type' => data_type, 'batch_count' => batch_count} }
   let(:quit_calls) { [:quit] }
 
   subject do
@@ -354,57 +355,58 @@ def process(conf, event_count)
 
   context 'runtime for pattern_list data_type' do
     let(:data_type) { 'pattern_list' }
-    let(:key) { 'foo.*' }
+    let(:redis_key) { 'foo.*' }
+
     before do
       subject.register
+      allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true
+      allow_any_instance_of( Redis::Client ).to receive(:disconnect)
+      allow_any_instance_of( Redis ).to receive(:quit)
       subject.init_threadpool
     end
 
+    after do
+      subject.stop
+    end
+
     context 'close when redis is unset' do
       let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] }
 
       it 'does not attempt to quit' do
-        allow(redis).to receive(:nil?).and_return(true)
+        allow_any_instance_of( Redis::Client ).to receive(:nil?).and_return(true)
         quit_calls.each do |call|
-          expect(redis).not_to receive(call)
+          expect_any_instance_of( Redis::Client ).not_to receive(call)
         end
         expect {subject.do_stop}.not_to raise_error
       end
     end
 
     it 'calling the run method, adds events to the queue' do
-      expect(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
-      expect(redis).to receive(:lpop).at_least(:once).and_return('l1')
-
-      allow(redis).to receive(:connected?).and_return(connected.last)
-      allow(redis).to receive(:quit)
+      expect_any_instance_of( Redis ).to receive(:keys).at_least(:once).with(redis_key).and_return ['foo.bar']
+      expect_any_instance_of( Redis ).to receive(:lpop).at_least(:once).with('foo.bar').and_return 'l1'
 
       tt = Thread.new do
         end_by = Time.now + 3
-        while accumulator.size < 1 and Time.now <= end_by
-          sleep 0.1
-        end
+        sleep 0.1 until queue.size > 0 or Time.now > end_by
         subject.do_stop
       end
 
-      subject.run(accumulator)
+      subject.run(queue)
 
       tt.join
 
-      expect(accumulator.size).to be > 0
+      expect(queue.size).to be > 0
     end
 
     it 'multiple close calls, calls to redis once' do
-      subject.use_redis(redis)
-      allow(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
-      allow(redis).to receive(:lpop).and_return('l1')
-      expect(redis).to receive(:connected?).and_return(connected.last)
+      allow_any_instance_of( Redis ).to receive(:keys).with(redis_key).and_return(['foo.bar'])
+      allow_any_instance_of( Redis ).to receive(:lpop).with('foo.bar').and_return('l1')
+
       quit_calls.each do |call|
-        expect(redis).to receive(call).at_most(:once)
+        allow_any_instance_of( Redis ).to receive(call).at_most(:once)
       end
 
       subject.do_stop
-      connected.push(false) #can't use let block here so push to array
       expect {subject.do_stop}.not_to raise_error
       subject.do_stop
     end

From 40c52e9f3899c747e393a9ea4b5190d79588e916 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Juuso=20M=C3=A4yr=C3=A4nen?= <juuso@smartly.io>
Date: Fri, 7 May 2021 14:47:32 +0300
Subject: [PATCH 11/11] Fix integration tests

---
 spec/inputs/redis_spec.rb | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb
index f354839..78c3213 100755
--- a/spec/inputs/redis_spec.rb
+++ b/spec/inputs/redis_spec.rb
@@ -16,11 +16,17 @@ def populate(key, event_count)
   end
 end
 
-def process(conf, event_count)
+def wait_events(conf, event_count)
   events = input(conf) do |_, queue|
     sleep 0.1 until queue.size >= event_count
     queue.size.times.map { queue.pop }
   end
+  expect(events.size).to eq event_count
+  events
+end
+
+def process(conf, event_count)
+  events = wait_events(conf, event_count)
   # due multiple workers we get events out-of-order in the output
   events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') }
   expect(events[0].get('sequence')).to eq(0)
@@ -73,7 +79,7 @@ def process(conf, event_count)
       input {
         redis {
           type => "blah"
-          key => "#{key}.*"
+          key => "#{key_base}.*"
           data_type => "pattern_list"
           batch_count => 1
         }
@@ -85,7 +91,7 @@ def process(conf, event_count)
       total_event_count += event_count
       populate("#{key_base}.#{idx}", event_count)
     end
-    process(conf, total_event_count)
+    wait_events(conf, total_event_count)
   end
 
   it "should read events from a list pattern using batch_count (default 125)" do
@@ -94,7 +100,7 @@ def process(conf, event_count)
       input {
         redis {
           type => "blah"
-          key => "#{key}.*"
+          key => "#{key_base}.*"
           data_type => "pattern_list"
           batch_count => 125
         }
@@ -106,7 +112,7 @@ def process(conf, event_count)
       total_event_count += event_count
       populate("#{key_base}.#{idx}", event_count)
     end
-    process(conf, total_event_count)
+    wait_events(conf, total_event_count)
   end
 end