Skip to content
31 changes: 24 additions & 7 deletions lib/ruote/postgres/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
require 'ruote/storage/base'
require 'ruote/postgres/version'


module Ruote
module Postgres
CONNECTION_ERRORS = [
Expand Down Expand Up @@ -112,8 +111,9 @@ def initialize(pg, options={})
@mutex = Mutex.new
@pg = pg

@table = options.fetch('pg_table_name', :documents).to_sym
@abort_on_connection_error = options.fetch('abort_on_connection_error', true)
@table = options.fetch('pg_table_name', :documents).to_sym
@abort_on_connection_error = options.fetch('abort_on_connection_error', true)
@retries_on_connection_error = options.fetch('retries_on_connection_error', 0).to_i

replace_engine_configuration(options)
end
Expand Down Expand Up @@ -297,15 +297,32 @@ def has_json?
server_version[0] >= 9 && server_version[1] >= 2
end

def reconnect
$stderr.puts "[RP] #{Time.now} RECONNECT"
@pg.reset
end

def safe_pg(&block)
retries ||= 0
@mutex.synchronize do
yield
end
rescue *CONNECTION_ERRORS => e
if @abort_on_connection_error
abort "ruote-postgres fatal error: #{e.class.name} #{e.message}\n#{e.backtrace.join("\n")}"
else
raise e
$stderr.puts "[RP] #{Time.now} CONNECTION_ERROR => Retries #{retries} out of #{@retries_on_connection_error}"
$stderr.puts e
result = if retries < @retries_on_connection_error
retries += 1
sleep 0.5
reconnect && retry
end

$stderr.puts result
unless result
if @abort_on_connection_error
abort "ruote-postgres fatal error: #{e.class.name} #{e.message}\n#{e.backtrace.join("\n")}"
else
raise e
end
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions spec/support/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ def db_connect()
PG.connect(config)
end

def db_unable_to_send_connect()
PGconn.connect_start( '127.0.0.1', 54320, "", "", "me", "xxxx", "somedb" )
end

def columns(pg, table)
pg.exec(%{SELECT attname FROM pg_attribute, pg_type
WHERE typname = '#{table}'
Expand Down
36 changes: 36 additions & 0 deletions spec/unit/storage_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@
end
end

describe "connection errors" do
subject do
Ruote::Postgres::Storage.new(pg, storage_options.merge({'abort_on_connection_error' => false})).tap do |rps|
rps.instance_variable_set :@pg, db_unable_to_send_connect
end
end

context "triggers UnableToSend" do
let(:storage_options) { { 'retries_on_connection_error' => 0 } }

it "raises the PG error" do
expect { subject.get('msgs', '1') }.to raise_error(PG::UnableToSend)
end
end

context "retries on time on UnableToSend" do
let(:storage_options) { { 'retries_on_connection_error' => 1 } }

it "rescues the PG error" do
expect(subject).to receive(:db_connect).and_return(db_connect)

expect { subject.get('msgs', '1') }.not_to raise_error
end
end

context "retries many times on UnableToSend" do
let(:storage_options) { { 'retries_on_connection_error' => 10 } }

it "raises the PG error" do
expect(subject).to receive(:reconnect).exactly(10).times.and_return(db_unable_to_send_connect)

expect { subject.get('msgs', '1') }.to raise_error(PG::UnableToSend)
end
end
end

describe "interface" do
subject { Ruote::Postgres::Storage.new(pg) }

Expand Down