Skip to content

Commit

Permalink
Merge pull request #83 from salemove/make_exchange_name_configurable_…
Browse files Browse the repository at this point in the history
…for_tap_into

Make exchange name configurable for tap into consumer
  • Loading branch information
meticulesque committed Jun 16, 2023
2 parents b999434 + 79f7711 commit a4c199f
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 4 deletions.
12 changes: 10 additions & 2 deletions lib/freddy.rb
Expand Up @@ -117,8 +117,16 @@ def respond_to(destination, &callback)
# `:ack` simply acknowledges the message and re-raises the exception. `:reject`
# rejects the message without requeueing it. `:requeue` rejects the message with
# `requeue` flag.
#
# @yield [message] Yields received message to the block
# @option options [String] :exchange_name
# Exchange to bind to. Default is `freddy-topic`.
#
# @yield [message] Yields received message to the block.
# @yieldparam [Object] payload
# Yields the received message's payload.
# @yieldparam [String] routing_key
# Yields the received message's routing key.
# @yieldparam [Time] timestamp
# Yields received message's timestamp.
#
# @return [#shutdown]
#
Expand Down
8 changes: 6 additions & 2 deletions lib/freddy/consumers/tap_into_consumer.rb
Expand Up @@ -29,7 +29,7 @@ def consume(&block)
private

def create_queue
topic_exchange = @channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)
topic_exchange = @channel.topic(exchange_name)

queue =
if group
Expand All @@ -48,7 +48,7 @@ def create_queue
def process_message(_queue, delivery)
@consume_thread_pool.post do
delivery.in_span do
yield delivery.payload, delivery.routing_key
yield delivery.payload, delivery.routing_key, delivery.timestamp
@channel.acknowledge(delivery.tag)
end
rescue StandardError
Expand Down Expand Up @@ -76,6 +76,10 @@ def durable?
def on_exception
@options.fetch(:on_exception, :ack)
end

def exchange_name
@options.fetch(:exchange_name, Freddy::FREDDY_TOPIC_EXCHANGE_NAME)
end
end
end
end
4 changes: 4 additions & 0 deletions lib/freddy/delivery.rb
Expand Up @@ -24,6 +24,10 @@ def reply_to
@metadata.reply_to
end

def timestamp
@metadata[:timestamp]
end

def in_span(&block)
name = "#{Tracing.span_destination(@exchange, @routing_key)} process"
kind = OpenTelemetry::Trace::SpanKind::CONSUMER
Expand Down
44 changes: 44 additions & 0 deletions spec/integration/tap_into_with_exchange_spec.rb
@@ -0,0 +1,44 @@
require 'spec_helper'

describe 'Tapping into with exchange identifier' do
let(:freddy) { Freddy.build(logger, **config) }

let(:connection) { Freddy::Adapters.determine.connect(config) }
let(:topic) { 'test_topic_exchange' }
let(:channel) { connection.create_channel }
let(:message_payload) { { test: 'test' }.to_json }
let(:expected_payload) { { test: 'test' } }
let(:publish_timestamp) { Time.now.to_i }

after do
connection.close
freddy.close
end

it 'receives message' do
freddy.tap_into('pattern.*', exchange_name: topic) do |payload, _routing_key, timestamp|
@received_payload = payload
@received_timestamp = timestamp
end

channel.topic(topic).publish(message_payload, { routing_key: 'pattern.random', timestamp: publish_timestamp })

wait_for { @received_payload }
wait_for { @received_timestamp }

expect(@received_payload).to eq(expected_payload)
expect(@received_timestamp.to_i).to eq(publish_timestamp)
end

it 'receives message with nil timestamp when timestamp is not published' do
received_timestamp = 0
freddy.tap_into('pattern.*', exchange_name: topic) do |_payload, _routing_key, timestamp|
received_timestamp = timestamp
end

channel.topic(topic).publish(message_payload, { routing_key: 'pattern.random' })
default_sleep

expect(received_timestamp).to eq(nil)
end
end

0 comments on commit a4c199f

Please sign in to comment.