Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ name: Downstream

on:
workflow_dispatch:
branches:
- trunk
push:
branches:
- trunk
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ jobs:
- uses: actions/checkout@v1
- uses: ruby/setup-ruby@v1
with:
ruby-version: 2.6.3
ruby-version: 2.6.9
- name: Cache Gems
uses: actions/cache@v1
uses: actions/cache@v4
with:
path: vendor/bundle
key: ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
key: ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
restore-keys: |
${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
- name: Install Gems
run: |
sudo gem install bundler -v '1.17.3'
Expand Down
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.6
2.6.9
14 changes: 8 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ GEM
timers (~> 4.0.0)
coderay (1.1.0)
concurrent-ruby (1.1.6)
crass (1.0.4)
crass (1.0.6)
diff-lcs (1.3)
erubis (2.7.0)
ffi (1.10.0)
Expand Down Expand Up @@ -85,20 +85,21 @@ GEM
celluloid (~> 0.16.0)
rb-fsevent (>= 0.9.3)
rb-inotify (>= 0.9)
loofah (2.2.2)
loofah (2.25.0)
crass (~> 1.0.2)
nokogiri (>= 1.5.9)
nokogiri (>= 1.12.0)
lumberjack (1.0.9)
mail (2.6.3)
mime-types (>= 1.16, < 3)
method_source (0.8.2)
mime-types (2.99.3)
mini_portile2 (2.4.0)
mini_portile2 (2.8.9)
minitest (5.14.0)
multi_json (1.13.1)
nenv (0.2.0)
nokogiri (1.9.1)
mini_portile2 (~> 2.4.0)
nokogiri (1.13.10)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
notiffany (0.0.6)
nenv (~> 0.1)
shellany (~> 0.0)
Expand All @@ -111,6 +112,7 @@ GEM
pry-remote (0.1.8)
pry (~> 0.9)
slop (~> 3.0)
racc (1.8.1)
rack (1.6.9)
rack-test (0.6.3)
rack (>= 1.0)
Expand Down
2 changes: 2 additions & 0 deletions lib/acapi.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
require "active_support"

require "acapi/config"
require "acapi/errors"

require "acapi/notifiers"
require "acapi/publisher"
require "acapi/subscriber"
Expand Down
6 changes: 5 additions & 1 deletion lib/acapi/amqp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ def initialize(chan, q)
@argument_errors = []
@bad_argument_queue = "acapi.error.middleware.service.bad_arguments"
@processing_failed_queue = "acapi.error.middleware.service.processing_failed"
@republish_channel = @channel.connection.create_channel
@republish_channel.confirm_select
@republish_queue = @republish_channel.queue(@queue.name, @queue.options)
@exit_after_work = false
end

Expand Down Expand Up @@ -102,7 +105,8 @@ def subscribe(opts = {})
publish_processing_failed(delivery_info, properties, payload, e)
else
new_properties = redelivery_properties(existing_retry_count, delivery_info, properties)
queue.publish(payload, new_properties)
@republish_queue.publish(payload, new_properties)
@republish_channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message republication could not be confirmed")
channel.acknowledge(delivery_info.delivery_tag, false)
end
rescue => e
Expand Down
8 changes: 4 additions & 4 deletions lib/acapi/amqp/messaging_exchange_topology.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ module Acapi
module Amqp
class MessagingExchangeTopology

def self.ensure_topology_exists(connection_string)
topology = new(connection_string)
def self.ensure_topology_exists(connection_settings)
topology = new(connection_settings)
topology.setup
topology.close
end

def initialize(connection_string)
@connection = Bunny.new(connection_string, :heartbeat => 15)
def initialize(connection_settings)
@connection = Bunny.new(connection_settings)
@connection.start
@channel = @connection.create_channel
end
Expand Down
6 changes: 5 additions & 1 deletion lib/acapi/amqp/requestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ def initialize(conn)
def request(properties, payload, timeout = 15)
delivery_info, r_props, r_payload = [nil, nil, nil]
channel = @connection.create_channel
p_channel = @connection.create_channel
temp_queue = channel.queue("", :exclusive => true)
channel.prefetch(1)
request_exchange = channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
p_channel.confirm_select
request_exchange = p_channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
request_exchange.publish(payload, properties.dup.merge({ :reply_to => temp_queue.name, :persistent => true }))
p_channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed")
delivery_info, r_props, r_payload = [nil, nil, nil]
begin
Timeout::timeout(timeout) do
Expand All @@ -26,6 +29,7 @@ def request(properties, payload, timeout = 15)
end
ensure
temp_queue.delete
p_channel.close
channel.close
end
[delivery_info, r_props, r_payload]
Expand Down
2 changes: 2 additions & 0 deletions lib/acapi/amqp/responder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ module Amqp
module Responder
def with_response_exchange(connection)
channel = connection.create_channel
channel.confirm_select
publish_exchange = channel.default_exchange
yield publish_exchange
channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed")
channel.close
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/acapi/amqp_event_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ def self.run
pid_file_location = File.join(File.expand_path(Rails.root), "pids", "sneakers.pid")
worker_classes = Rails.application.config.acapi.sneakers_worker_classes
ensure_messaging_exchanges
connection = Bunny.new(Rails.application.config.acapi.to_connection_settings)
Sneakers.configure(
:workers => worker_classes.length,
:amqp => Rails.application.config.acapi.remote_broker_uri,
:connection => connection,
:start_worker_delay => 0.2,
:heartbeat => 5,
:log => STDOUT,
:pid_path => pid_file_location,
:handler => Sneakers::Handlers::Maxretry,
Expand All @@ -88,7 +88,7 @@ def self.run
end

def self.ensure_messaging_exchanges
::Acapi::Amqp::MessagingExchangeTopology.ensure_topology_exists(Rails.application.config.acapi.remote_broker_uri)
::Acapi::Amqp::MessagingExchangeTopology.ensure_topology_exists(Rails.application.config.acapi.to_connection_settings)
end
end
end
7 changes: 7 additions & 0 deletions lib/acapi/errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require "acapi/errors/publish_confirmation_failed_error"
require "acapi/errors/remote_connection_unspecified_error"

module Acapi
module Errors
end
end
7 changes: 0 additions & 7 deletions lib/acapi/errors/acapi_error.rb

This file was deleted.

7 changes: 0 additions & 7 deletions lib/acapi/errors/pub_sub_error.rb

This file was deleted.

5 changes: 5 additions & 0 deletions lib/acapi/errors/publish_confirmation_failed_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module Acapi
module Errors
class PublishConfirmationFailedError < StandardError; end
end
end
5 changes: 5 additions & 0 deletions lib/acapi/errors/remote_connection_unspecified_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module Acapi
module Errors
class RemoteConnectionUnspecifiedError < StandardError; end
end
end
15 changes: 13 additions & 2 deletions lib/acapi/local_amqp_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,25 @@ def log(name, started, finished, unique_id, data = {})
end
msg = Acapi::Amqp::OutMessage.new(@app_id, name, finished, finished, unique_id, data)
@exchange.publish(*msg.to_message_properties)
@p_channel.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed")
end

def open_connection_if_needed
return if @connection.present? && @connection.connected?
@connection = Bunny.new
@connection = Bunny.new(connection_url)
@connection.start
@channel = @connection.create_channel
@queue = @channel.queue(QUEUE_NAME, {:durable => true})
@exchange = @channel.fanout(EXCHANGE_NAME, {:durable => true})
@p_channel = @connection.create_channel
@p_channel.confirm_select
@exchange = @p_channel.fanout(EXCHANGE_NAME, {:durable => true})
@queue.bind(@exchange, {})
end
Comment on lines 67 to 77
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The disconnect method doesn't clean up the new @p_channel instance variable introduced in this PR. When disconnect is called, @p_channel should also be set to nil to prevent potential issues on reconnection. Additionally, @channel and @eXchange should also be cleared to maintain consistency.

Copilot uses AI. Check for mistakes.

def connection_url
Rails.application.config.acapi.to_connection_settings
end

def reconnect!
disconnect!
end
Expand All @@ -83,6 +90,10 @@ def disconnect!
@connection.close
rescue Timeout::Error
end
@queue = nil
@channel = nil
@p_channel = nil
@exchange = nil
@connection = nil
end
end
Expand Down
3 changes: 1 addition & 2 deletions lib/acapi/publishers/upstream_event_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ def run
if @after_fork
@after_fork.call
end
bunny_url = Rails.application.config.acapi.remote_broker_uri
event_q_name = Rails.application.config.acapi.remote_event_queue
app_id = Rails.application.config.acapi.app_id
conn = Bunny.new(bunny_url, :heartbeat => 15)
conn = Bunny.new(Rails.application.config.acapi.to_connection_settings)
conn.start
chan = conn.create_channel
chan.prefetch(1)
Expand Down
67 changes: 64 additions & 3 deletions lib/acapi/railties/amqp_configuration_options.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,71 @@
require 'uri'

module Acapi
class ClusterSettings
attr_accessor :hosts
attr_accessor :port
attr_accessor :username
attr_accessor :password

def to_connection_settings
{
:hosts => @hosts,
:port => @port || 5672,
:username => @username || "guest",
:password => @password || "guest",
:heartbeat => 10
}
end
end

class ConfigurationSettings
attr_accessor :remote_broker_uri
attr_accessor :remote_event_queue
attr_accessor :remote_request_exchange
attr_accessor :hbx_id
attr_accessor :environment_name

def clear!
@remote_broker_uri = nil
@remote_event_queue = nil
@remote_request_exchange = nil
@hbx_id = nil
@environment_name = nil
@cluster = nil
end

def empty_connection_settings?
remote_broker_uri.blank? && @cluster.blank?
end

def cluster
@cluster ||= Acapi::ClusterSettings.new
yield @cluster if block_given?
@cluster
end

def to_connection_settings
raise ::Acapi::Errors::RemoteConnectionUnspecifiedError.new("No remote broker connection specified") if empty_connection_settings?
@connection_settings_hash ||= encode_connection_settings
end

def encode_connection_settings
if @cluster.blank?
uri = URI.parse(remote_broker_uri)
port_value = uri.port.blank? ? 5672 : uri.port
user_value = uri.user.blank? ? "guest" : uri.user
password_value = uri.password.blank? ? "guest" : uri.password
{
:host => uri.host,
:port => port_value,
:username => user_value,
:password => password_value,
:heartbeat => 10
}
else
cluster.to_connection_settings
end
end
end
end

Expand All @@ -28,12 +89,12 @@ module Railties
class AmqpConfigurationSettings < Rails::Railtie
config.after_initialize do |app|
app_id = Rails.application.config.acapi.app_id
setting = Rails.application.config.acapi.remote_broker_uri
setting = Rails.application.config.acapi
r_exchange = Rails.application.config.acapi.remote_request_exchange
if !setting
if Rails.application.config.acapi.empty_connection_settings?
disable_requestor
else
boot_requestor(app_id, setting, r_exchange)
boot_requestor(app_id, setting.to_connection_settings, r_exchange)
end
end

Expand Down
9 changes: 4 additions & 5 deletions lib/acapi/requestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ def request(req_name, payload,timeout=1)
end

def open_connection_for_request
if !@connection
@connection = Bunny.new(@uri, :heartbeat => 15)
@connection.start
end
return if @connection.present? && @connection.connected?
@connection = Bunny.new(@uri)
@connection.start
Comment on lines +36 to +37
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter name uri is misleading. After the changes in lib/acapi/railties/amqp_configuration_options.rb, the boot_requestor method now passes setting.to_connection_settings (a hash) instead of a URI string. The parameter should be renamed to something like connection_settings or connection_params to better reflect that it now receives a hash rather than a URI string.

Copilot uses AI. Check for mistakes.
end

def reconnect!
disconnect!
@connection = Bunny.new(@uri, :heartbeat => 15)
@connection = Bunny.new(@uri)
@connection.start
end

Expand Down
2 changes: 1 addition & 1 deletion lib/acapi/sneakers_extensions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def with_confirmed_channel
begin
chan.confirm_select
yield chan
chan.wait_for_confirms
chan.wait_for_confirms || raise(Acapi::Errors::PublishConfirmationFailedError, "message publication could not be confirmed")
ensure
chan.close
end
Expand Down
Loading
Loading