Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DLQ (RedrivePolicy) Functionality #37

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions lib/fake_sqs/actions/receive_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,30 @@ def initialize(options = {})

def call(name, params)
queue = @queues.get(name)
messages = queue.receive_message(params)
filtered_attribute_names = []
params.select{|k,v | k =~ /AttributeName\.\d+/}.each do |key, value|
filtered_attribute_names << value
end
messages = queue.receive_message(params.merge(queues: @queues))
@responder.call :ReceiveMessage do |xml|
messages.each do |receipt, message|
xml.Message do
xml.MessageId message.id
xml.ReceiptHandle receipt
xml.MD5OfBody message.md5
xml.Body message.body
message.attributes.each do |name, value|
if filtered_attribute_names.include?("All") || filtered_attribute_names.include?(name)
xml.Attribute do
xml.Name name
xml.Value value
end
end
end
end
end
end
end

end
end
end
end
18 changes: 14 additions & 4 deletions lib/fake_sqs/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,28 @@
module FakeSQS
class Message

attr_reader :body, :id, :md5
attr_reader :body, :id, :md5, :approximate_receive_count,
:sender_id, :approximate_first_receive_timestamp, :sent_timestamp
attr_accessor :visibility_timeout

def initialize(options = {})
@body = options.fetch("MessageBody")
@id = options.fetch("Id") { SecureRandom.uuid }
@md5 = options.fetch("MD5") { Digest::MD5.hexdigest(@body) }
@sender_id = options.fetch("SenderId") { SecureRandom.uuid.delete('-').upcase[0...21] }
@approximate_receive_count = 0
@sent_timestamp = Time.now.to_i * 1000
end

def expire!
self.visibility_timeout = nil
end

def receive!
@approximate_first_receive_timestamp ||= Time.now.to_i * 1000
@approximate_receive_count += 1
end

def expired?( limit = Time.now )
self.visibility_timeout.nil? || self.visibility_timeout < limit
end
Expand All @@ -26,9 +35,10 @@ def expire_at(seconds)

def attributes
{
"MessageBody" => body,
"Id" => id,
"MD5" => md5,
"SenderId" => sender_id,
"ApproximateFirstReceiveTimestamp" => approximate_first_receive_timestamp,
"ApproximateReceiveCount"=> approximate_receive_count,
"SentTimestamp"=> sent_timestamp
}
end

Expand Down
26 changes: 20 additions & 6 deletions lib/fake_sqs/queue.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'securerandom'
require 'fake_sqs/collection_view'

require 'json'
module FakeSQS

MessageNotInflight = Class.new(RuntimeError)
Expand Down Expand Up @@ -45,7 +45,7 @@ def attributes

def send_message(options = {})
with_lock do
message = message_factory.new(options)
message = options.fetch(:message){ message_factory.new(options) }
@messages << message
message
end
Expand All @@ -65,10 +65,13 @@ def receive_message(options = {})

actual_amount.times do
message = @messages.delete_at(rand(size))
message.expire_at(default_visibility_timeout)
receipt = generate_receipt
@messages_in_flight[receipt] = message
result[receipt] = message
unless check_message_for_dlq(message, options)
message.expire_at(default_visibility_timeout)
message.receive!
receipt = generate_receipt
@messages_in_flight[receipt] = message
result[receipt] = message
end
end
end

Expand Down Expand Up @@ -115,6 +118,17 @@ def change_message_visibility(receipt, visibility)
end
end

def check_message_for_dlq(message, options={})
if redrive_policy = queue_attributes["RedrivePolicy"] && JSON.parse(queue_attributes["RedrivePolicy"])
dlq = options[:queues].list.find{|queue| queue.arn == redrive_policy["deadLetterTargetArn"]}
if dlq && message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i
dlq.send_message(message: message)
message.expire!
true
end
end
end

def delete_message(receipt)
with_lock do
@messages_in_flight.delete(receipt)
Expand Down
88 changes: 85 additions & 3 deletions spec/acceptance/message_actions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,55 @@
)

response = sqs.receive_message(
queue_url: queue_url,
queue_url: queue_url
)

expect(response.messages.size).to eq 1

expect(response.messages.first.body).to eq body
end

specify "ReceiveMessage with attribute_names parameters" do
body = "test 123"

sqs.send_message(
queue_url: queue_url,
message_body: body
)

sent_time = Time.now.to_i * 1000

response = sqs.receive_message(
queue_url: queue_url,
attribute_names: ["All"]
)

received_time = Time.now.to_i * 1000

expect(response.messages.first.attributes.reject{|k,v| k == "SenderId"}).to eq({
"SentTimestamp" => sent_time.to_s,
"ApproximateReceiveCount" => "1",
"ApproximateFirstReceiveTimestamp" => received_time.to_s
})
expect(response.messages.first.attributes["SenderId"]).to be_kind_of(String)
expire_message(response.messages.first)

response = sqs.receive_message(
queue_url: queue_url
)
expect(response.messages.first.attributes).to eq({})
expire_message(response.messages.first)

response = sqs.receive_message(
queue_url: queue_url,
attribute_names: ["SentTimestamp", "ApproximateReceiveCount", "ApproximateFirstReceiveTimestamp"]
)
expect(response.messages.first.attributes).to eq({
"SentTimestamp" => sent_time.to_s,
"ApproximateReceiveCount" => "3",
"ApproximateFirstReceiveTimestamp" => received_time.to_s
})
end

specify "DeleteMessage" do
sqs.send_message(
queue_url: queue_url,
Expand Down Expand Up @@ -193,7 +234,9 @@
)
expect(nothing.messages.size).to eq 0

sleep(5)
# Changed from sleep 5 to sleep 7 due to race conditions in Travis build
# see https://github.com/iain/fake_sqs/pull/32
sleep(7)

same_message = sqs.receive_message(
queue_url: queue_url,
Expand Down Expand Up @@ -228,8 +271,47 @@
}.to raise_error(Aws::SQS::Errors::MessageNotInflight)
end

specify 'should be moved to configured DLQ after maxReceiveCount if RedrivePolicy is set' do
dlq_queue_url = sqs.create_queue(queue_name: "TestSourceQueueDLQ").queue_url

dlq_arn = sqs.get_queue_attributes(queue_url: dlq_queue_url).attributes.fetch("QueueArn")
sqs.set_queue_attributes(
queue_url: queue_url,
attributes: {
"RedrivePolicy" => "{\"deadLetterTargetArn\":\"#{dlq_arn}\",\"maxReceiveCount\":2}"
}
)

message_id = sqs.send_message(
queue_url: queue_url,
message_body: "test",
).message_id


2.times do
message = sqs.receive_message(queue_url: queue_url)
expect(message.messages.size).to eq(1)
expect(message.messages.first.message_id).to eq(message_id)
expire_message(message.messages.first)
end

expect(sqs.receive_message(queue_url: queue_url).messages.size).to eq(0)

message = sqs.receive_message(queue_url: dlq_queue_url)
expect(message.messages.size).to eq(1)
expect(message.messages.first.message_id).to eq(message_id)
end

def let_messages_in_flight_expire
$fake_sqs.expire
end

def expire_message(message)
sqs.change_message_visibility(
queue_url: queue_url,
receipt_handle: message.receipt_handle,
visibility_timeout: 0
)
end

end
28 changes: 28 additions & 0 deletions spec/unit/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ def new(options = {})
send_message(options)
end

it "should set the message's SentTimestamp attribute" do
expect(send_message.attributes["SentTimestamp"]).to eq (Time.now.to_i * 1000)
end

it "should set the SenderId of the sender" do
sender_id = send_message.attributes["SenderId"]
expect(sender_id).to be_a String
expect(sender_id.length).to eq 21
end
end

describe "#receive_message" do
Expand Down Expand Up @@ -116,6 +125,25 @@ def new(options = {})
expect(receive_message).to eq({})
end

it "should increment the ApproximateReceiveCount" do
sent_message = send_message
expect(sent_message.attributes["ApproximateReceiveCount"]).to eq 0
queue.change_message_visibility(receive_message.keys.first, 0)
expect(sent_message.attributes["ApproximateReceiveCount"]).to eq 1
receive_message
expect(sent_message.attributes["ApproximateReceiveCount"]).to eq 2
end

it "should set the ApproximateFirstReceiveTimestamp only when the message is first received" do
sent_message = send_message
expect(sent_message.attributes["ApproximateFirstReceiveTimestamp"]).to eq nil
receive_time = (Time.now.to_i * 1000)
queue.change_message_visibility(receive_message.keys.first, 0)
expect(sent_message.attributes["ApproximateFirstReceiveTimestamp"]).to eq receive_time
sleep 1
receive_message
expect(sent_message.attributes["ApproximateFirstReceiveTimestamp"]).to eq receive_time
end
end

describe "#delete_message" do
Expand Down