diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index c26c7e9..35a1f31 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -2,8 +2,6 @@ name: Downstream on: workflow_dispatch: - branches: - - trunk push: branches: - trunk diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 10a7a7e..d5d0eee 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,14 +9,14 @@ jobs: - uses: actions/checkout@v1 - uses: ruby/setup-ruby@v1 with: - ruby-version: 2.6.3 + ruby-version: 2.6.9 - name: Cache Gems - uses: actions/cache@v1 + uses: actions/cache@v4 with: path: vendor/bundle - key: ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }} + key: ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }} restore-keys: | - ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }} + ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }} - name: Install Gems run: | sudo gem install bundler -v '1.17.3' diff --git a/.ruby-version b/.ruby-version index 338a5b5..d48d370 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -2.6.6 +2.6.9 diff --git a/Gemfile.lock b/Gemfile.lock index 0d80874..cd3e940 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -55,7 +55,7 @@ GEM timers (~> 4.0.0) coderay (1.1.0) concurrent-ruby (1.1.6) - crass (1.0.4) + crass (1.0.6) diff-lcs (1.3) erubis (2.7.0) ffi (1.10.0) @@ -85,20 +85,21 @@ GEM celluloid (~> 0.16.0) rb-fsevent (>= 0.9.3) rb-inotify (>= 0.9) - loofah (2.2.2) + loofah (2.25.0) crass (~> 1.0.2) - nokogiri (>= 1.5.9) + nokogiri (>= 1.12.0) lumberjack (1.0.9) mail (2.6.3) mime-types (>= 1.16, < 3) method_source (0.8.2) mime-types (2.99.3) - mini_portile2 (2.4.0) + mini_portile2 (2.8.9) minitest (5.14.0) multi_json (1.13.1) nenv (0.2.0) - nokogiri (1.9.1) - mini_portile2 (~> 2.4.0) + nokogiri (1.13.10) + mini_portile2 (~> 2.8.0) + racc (~> 1.4) notiffany (0.0.6) nenv (~> 0.1) shellany (~> 0.0) @@ -111,6 +112,7 @@ GEM pry-remote (0.1.8) pry (~> 0.9) slop (~> 3.0) + racc (1.8.1) rack (1.6.9) rack-test (0.6.3) rack (>= 1.0) diff --git a/lib/acapi.rb b/lib/acapi.rb index 91b5089..4c039f4 100644 --- a/lib/acapi.rb +++ b/lib/acapi.rb @@ -2,6 +2,8 @@ require "active_support" require "acapi/config" +require "acapi/errors" + require "acapi/notifiers" require "acapi/publisher" require "acapi/subscriber" diff --git a/lib/acapi/amqp/client.rb b/lib/acapi/amqp/client.rb index 2adade5..51613c9 100644 --- a/lib/acapi/amqp/client.rb +++ b/lib/acapi/amqp/client.rb @@ -12,6 +12,9 @@ def initialize(chan, q) @argument_errors = [] @bad_argument_queue = "acapi.error.middleware.service.bad_arguments" @processing_failed_queue = "acapi.error.middleware.service.processing_failed" + @republish_channel = @channel.connection.create_channel + @republish_channel.confirm_select + @republish_queue = @republish_channel.queue(@queue.name, @queue.options) @exit_after_work = false end @@ -102,7 +105,8 @@ def subscribe(opts = {}) publish_processing_failed(delivery_info, properties, payload, e) else new_properties = redelivery_properties(existing_retry_count, delivery_info, properties) - queue.publish(payload, new_properties) + @republish_queue.publish(payload, new_properties) + @republish_channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message republication could not be confirmed") channel.acknowledge(delivery_info.delivery_tag, false) end rescue => e diff --git a/lib/acapi/amqp/messaging_exchange_topology.rb b/lib/acapi/amqp/messaging_exchange_topology.rb index ae6a572..e74d899 100644 --- a/lib/acapi/amqp/messaging_exchange_topology.rb +++ b/lib/acapi/amqp/messaging_exchange_topology.rb @@ -4,14 +4,14 @@ module Acapi module Amqp class MessagingExchangeTopology - def self.ensure_topology_exists(connection_string) - topology = new(connection_string) + def self.ensure_topology_exists(connection_settings) + topology = new(connection_settings) topology.setup topology.close end - def initialize(connection_string) - @connection = Bunny.new(connection_string, :heartbeat => 15) + def initialize(connection_settings) + @connection = Bunny.new(connection_settings) @connection.start @channel = @connection.create_channel end diff --git a/lib/acapi/amqp/requestor.rb b/lib/acapi/amqp/requestor.rb index 8c6173c..69704f2 100644 --- a/lib/acapi/amqp/requestor.rb +++ b/lib/acapi/amqp/requestor.rb @@ -11,10 +11,13 @@ def initialize(conn) def request(properties, payload, timeout = 15) delivery_info, r_props, r_payload = [nil, nil, nil] channel = @connection.create_channel + p_channel = @connection.create_channel temp_queue = channel.queue("", :exclusive => true) channel.prefetch(1) - request_exchange = channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true) + p_channel.confirm_select + request_exchange = p_channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true) request_exchange.publish(payload, properties.dup.merge({ :reply_to => temp_queue.name, :persistent => true })) + p_channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed") delivery_info, r_props, r_payload = [nil, nil, nil] begin Timeout::timeout(timeout) do @@ -26,6 +29,7 @@ def request(properties, payload, timeout = 15) end ensure temp_queue.delete + p_channel.close channel.close end [delivery_info, r_props, r_payload] diff --git a/lib/acapi/amqp/responder.rb b/lib/acapi/amqp/responder.rb index dae0ac4..9a78a81 100644 --- a/lib/acapi/amqp/responder.rb +++ b/lib/acapi/amqp/responder.rb @@ -3,8 +3,10 @@ module Amqp module Responder def with_response_exchange(connection) channel = connection.create_channel + channel.confirm_select publish_exchange = channel.default_exchange yield publish_exchange + channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed") channel.close end end diff --git a/lib/acapi/amqp_event_worker.rb b/lib/acapi/amqp_event_worker.rb index 91c0d86..e61d9ce 100644 --- a/lib/acapi/amqp_event_worker.rb +++ b/lib/acapi/amqp_event_worker.rb @@ -68,11 +68,11 @@ def self.run pid_file_location = File.join(File.expand_path(Rails.root), "pids", "sneakers.pid") worker_classes = Rails.application.config.acapi.sneakers_worker_classes ensure_messaging_exchanges + connection = Bunny.new(Rails.application.config.acapi.to_connection_settings) Sneakers.configure( :workers => worker_classes.length, - :amqp => Rails.application.config.acapi.remote_broker_uri, + :connection => connection, :start_worker_delay => 0.2, - :heartbeat => 5, :log => STDOUT, :pid_path => pid_file_location, :handler => Sneakers::Handlers::Maxretry, @@ -88,7 +88,7 @@ def self.run end def self.ensure_messaging_exchanges - ::Acapi::Amqp::MessagingExchangeTopology.ensure_topology_exists(Rails.application.config.acapi.remote_broker_uri) + ::Acapi::Amqp::MessagingExchangeTopology.ensure_topology_exists(Rails.application.config.acapi.to_connection_settings) end end end diff --git a/lib/acapi/errors.rb b/lib/acapi/errors.rb new file mode 100644 index 0000000..faddaca --- /dev/null +++ b/lib/acapi/errors.rb @@ -0,0 +1,7 @@ +require "acapi/errors/publish_confirmation_failed_error" +require "acapi/errors/remote_connection_unspecified_error" + +module Acapi + module Errors + end +end \ No newline at end of file diff --git a/lib/acapi/errors/acapi_error.rb b/lib/acapi/errors/acapi_error.rb deleted file mode 100644 index f04a793..0000000 --- a/lib/acapi/errors/acapi_error.rb +++ /dev/null @@ -1,7 +0,0 @@ -module Acapi - module Errors - - class AcapiError < StandardError - end - end -end \ No newline at end of file diff --git a/lib/acapi/errors/pub_sub_error.rb b/lib/acapi/errors/pub_sub_error.rb deleted file mode 100644 index a8ddf59..0000000 --- a/lib/acapi/errors/pub_sub_error.rb +++ /dev/null @@ -1,7 +0,0 @@ -module Acapi - module Errors - - class PubSubError < AcapiError - end - end -end \ No newline at end of file diff --git a/lib/acapi/errors/publish_confirmation_failed_error.rb b/lib/acapi/errors/publish_confirmation_failed_error.rb new file mode 100644 index 0000000..489dcc9 --- /dev/null +++ b/lib/acapi/errors/publish_confirmation_failed_error.rb @@ -0,0 +1,5 @@ +module Acapi + module Errors + class PublishConfirmationFailedError < StandardError; end + end +end \ No newline at end of file diff --git a/lib/acapi/errors/remote_connection_unspecified_error.rb b/lib/acapi/errors/remote_connection_unspecified_error.rb new file mode 100644 index 0000000..f8a2db3 --- /dev/null +++ b/lib/acapi/errors/remote_connection_unspecified_error.rb @@ -0,0 +1,5 @@ +module Acapi + module Errors + class RemoteConnectionUnspecifiedError < StandardError; end + end +end \ No newline at end of file diff --git a/lib/acapi/local_amqp_publisher.rb b/lib/acapi/local_amqp_publisher.rb index 3fc38fb..be35e63 100644 --- a/lib/acapi/local_amqp_publisher.rb +++ b/lib/acapi/local_amqp_publisher.rb @@ -61,18 +61,25 @@ def log(name, started, finished, unique_id, data = {}) end msg = Acapi::Amqp::OutMessage.new(@app_id, name, finished, finished, unique_id, data) @exchange.publish(*msg.to_message_properties) + @p_channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed") end def open_connection_if_needed return if @connection.present? && @connection.connected? - @connection = Bunny.new + @connection = Bunny.new(connection_url) @connection.start @channel = @connection.create_channel @queue = @channel.queue(QUEUE_NAME, {:durable => true}) - @exchange = @channel.fanout(EXCHANGE_NAME, {:durable => true}) + @p_channel = @connection.create_channel + @p_channel.confirm_select + @exchange = @p_channel.fanout(EXCHANGE_NAME, {:durable => true}) @queue.bind(@exchange, {}) end + def connection_url + Rails.application.config.acapi.to_connection_settings + end + def reconnect! disconnect! end @@ -83,6 +90,10 @@ def disconnect! @connection.close rescue Timeout::Error end + @queue = nil + @channel = nil + @p_channel = nil + @exchange = nil @connection = nil end end diff --git a/lib/acapi/publishers/upstream_event_publisher.rb b/lib/acapi/publishers/upstream_event_publisher.rb index 2fd5e97..d240a75 100644 --- a/lib/acapi/publishers/upstream_event_publisher.rb +++ b/lib/acapi/publishers/upstream_event_publisher.rb @@ -18,10 +18,9 @@ def run if @after_fork @after_fork.call end - bunny_url = Rails.application.config.acapi.remote_broker_uri event_q_name = Rails.application.config.acapi.remote_event_queue app_id = Rails.application.config.acapi.app_id - conn = Bunny.new(bunny_url, :heartbeat => 15) + conn = Bunny.new(Rails.application.config.acapi.to_connection_settings) conn.start chan = conn.create_channel chan.prefetch(1) diff --git a/lib/acapi/railties/amqp_configuration_options.rb b/lib/acapi/railties/amqp_configuration_options.rb index 2de38d2..0a03208 100644 --- a/lib/acapi/railties/amqp_configuration_options.rb +++ b/lib/acapi/railties/amqp_configuration_options.rb @@ -1,10 +1,71 @@ +require 'uri' + module Acapi + class ClusterSettings + attr_accessor :hosts + attr_accessor :port + attr_accessor :username + attr_accessor :password + + def to_connection_settings + { + :hosts => @hosts, + :port => @port || 5672, + :username => @username || "guest", + :password => @password || "guest", + :heartbeat => 10 + } + end + end + class ConfigurationSettings attr_accessor :remote_broker_uri attr_accessor :remote_event_queue attr_accessor :remote_request_exchange attr_accessor :hbx_id attr_accessor :environment_name + + def clear! + @remote_broker_uri = nil + @remote_event_queue = nil + @remote_request_exchange = nil + @hbx_id = nil + @environment_name = nil + @cluster = nil + end + + def empty_connection_settings? + remote_broker_uri.blank? && @cluster.blank? + end + + def cluster + @cluster ||= Acapi::ClusterSettings.new + yield @cluster if block_given? + @cluster + end + + def to_connection_settings + raise ::Acapi::Errors::RemoteConnectionUnspecifiedError.new("No remote broker connection specified") if empty_connection_settings? + @connection_settings_hash ||= encode_connection_settings + end + + def encode_connection_settings + if @cluster.blank? + uri = URI.parse(remote_broker_uri) + port_value = uri.port.blank? ? 5672 : uri.port + user_value = uri.user.blank? ? "guest" : uri.user + password_value = uri.password.blank? ? "guest" : uri.password + { + :host => uri.host, + :port => port_value, + :username => user_value, + :password => password_value, + :heartbeat => 10 + } + else + cluster.to_connection_settings + end + end end end @@ -28,12 +89,12 @@ module Railties class AmqpConfigurationSettings < Rails::Railtie config.after_initialize do |app| app_id = Rails.application.config.acapi.app_id - setting = Rails.application.config.acapi.remote_broker_uri + setting = Rails.application.config.acapi r_exchange = Rails.application.config.acapi.remote_request_exchange - if !setting + if Rails.application.config.acapi.empty_connection_settings? disable_requestor else - boot_requestor(app_id, setting, r_exchange) + boot_requestor(app_id, setting.to_connection_settings, r_exchange) end end diff --git a/lib/acapi/requestor.rb b/lib/acapi/requestor.rb index 3df0216..2a9601f 100644 --- a/lib/acapi/requestor.rb +++ b/lib/acapi/requestor.rb @@ -32,15 +32,14 @@ def request(req_name, payload,timeout=1) end def open_connection_for_request - if !@connection - @connection = Bunny.new(@uri, :heartbeat => 15) - @connection.start - end + return if @connection.present? && @connection.connected? + @connection = Bunny.new(@uri) + @connection.start end def reconnect! disconnect! - @connection = Bunny.new(@uri, :heartbeat => 15) + @connection = Bunny.new(@uri) @connection.start end diff --git a/lib/acapi/sneakers_extensions.rb b/lib/acapi/sneakers_extensions.rb index 039d412..b61e9b7 100644 --- a/lib/acapi/sneakers_extensions.rb +++ b/lib/acapi/sneakers_extensions.rb @@ -17,7 +17,7 @@ def with_confirmed_channel begin chan.confirm_select yield chan - chan.wait_for_confirms + chan.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed") ensure chan.close end diff --git a/spec/lib/acapi/local_amqp_publisher_spec.rb b/spec/lib/acapi/local_amqp_publisher_spec.rb index 0502b70..8859bf9 100644 --- a/spec/lib/acapi/local_amqp_publisher_spec.rb +++ b/spec/lib/acapi/local_amqp_publisher_spec.rb @@ -4,6 +4,13 @@ let(:forwarding_queue_name) { "acapi.queue.events.local" } let(:forwarding_exchange_name ) { "acapi.exchange.events.local" } + before :each do + Rails.application.config.acapi.remote_broker_uri = "amqp://localhost:5672" + end + + after :each do + Rails.application.config.acapi.clear! + end describe "which publishes messages" do let(:session) { instance_double("Bunny::Session") } @@ -28,11 +35,19 @@ } } before :each do - allow(Bunny).to receive(:new).and_return(session) + allow(Bunny).to receive(:new).with({ + :heartbeat => 10, + :host=>"localhost", + :password=>"guest", + :port=>5672, + :username=>"guest" + }).and_return(session) allow(session).to receive(:start) allow(session).to receive(:create_channel).and_return(channel) allow(channel).to receive(:queue).with(forwarding_queue_name, {:durable => true}).and_return(queue) allow(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange) + allow(channel).to receive(:confirm_select) + allow(channel).to receive(:wait_for_confirms).and_return(true) allow(queue).to receive(:bind).with(exchange, {}) end @@ -123,10 +138,18 @@ it "supports reconnection for after_fork" do #publish to force the connection allow(exchange).to receive(:publish) + allow(channel).to receive(:confirm_select) + allow(channel).to receive(:wait_for_confirms).and_return(true) expect(session).to receive(:close) - expect(Bunny).to receive(:new).and_return(session) + expect(Bunny).to receive(:new).with({ + :heartbeat => 10, + :host=>"localhost", + :password=>"guest", + :port=>5672, + :username=>"guest" + }).and_return(session) expect(session).to receive(:start) - expect(session).to receive(:create_channel).and_return(channel) + allow(session).to receive(:create_channel).and_return(channel) expect(channel).to receive(:queue).with(forwarding_queue_name, {:durable=> true}).and_return(queue) expect(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange) expect(queue).to receive(:bind).with(exchange, {}) diff --git a/spec/lib/acapi/subscribers/logger_spec.rb b/spec/lib/acapi/subscribers/logger_spec.rb index d5627fe..761273d 100644 --- a/spec/lib/acapi/subscribers/logger_spec.rb +++ b/spec/lib/acapi/subscribers/logger_spec.rb @@ -20,7 +20,7 @@ allow(Acapi::LocalAmqpPublisher).to receive(:log).and_raise("error") Acapi::Subscribers::Logger.register("acapi.logger") - expect{logger("hello")}.to raise_error + expect{logger("hello")}.to raise_error(StandardError) end end end