From 927762ed61289c1ac6785b6dedf0fcf458c2769d Mon Sep 17 00:00:00 2001 From: nritholtz Date: Thu, 5 Nov 2015 11:29:11 -0500 Subject: [PATCH 1/2] Add attributes to Message object as per SQS Message struct Return attributes as part of receive_message call with optional attribute_names parameter --- lib/fake_sqs/actions/receive_message.rb | 15 ++++++- lib/fake_sqs/message.rb | 18 +++++++-- lib/fake_sqs/queue.rb | 1 + spec/acceptance/message_actions_spec.rb | 53 ++++++++++++++++++++++++- spec/unit/queue_spec.rb | 28 +++++++++++++ 5 files changed, 107 insertions(+), 8 deletions(-) diff --git a/lib/fake_sqs/actions/receive_message.rb b/lib/fake_sqs/actions/receive_message.rb index 55d3e55..d70f337 100644 --- a/lib/fake_sqs/actions/receive_message.rb +++ b/lib/fake_sqs/actions/receive_message.rb @@ -10,6 +10,10 @@ def initialize(options = {}) def call(name, params) queue = @queues.get(name) + filtered_attribute_names = [] + params.select{|k,v | k =~ /AttributeName\.\d+/}.each do |key, value| + filtered_attribute_names << value + end messages = queue.receive_message(params) @responder.call :ReceiveMessage do |xml| messages.each do |receipt, message| @@ -18,11 +22,18 @@ def call(name, params) xml.ReceiptHandle receipt xml.MD5OfBody message.md5 xml.Body message.body + message.attributes.each do |name, value| + if filtered_attribute_names.include?("All") || filtered_attribute_names.include?(name) + xml.Attribute do + xml.Name name + xml.Value value + end + end + end end end end end - end end -end +end \ No newline at end of file diff --git a/lib/fake_sqs/message.rb b/lib/fake_sqs/message.rb index 225147c..6cca714 100644 --- a/lib/fake_sqs/message.rb +++ b/lib/fake_sqs/message.rb @@ -3,19 +3,28 @@ module FakeSQS class Message - attr_reader :body, :id, :md5 + attr_reader :body, :id, :md5, :approximate_receive_count, + :sender_id, :approximate_first_receive_timestamp, :sent_timestamp attr_accessor :visibility_timeout def initialize(options = {}) @body = options.fetch("MessageBody") @id = options.fetch("Id") { SecureRandom.uuid } @md5 = options.fetch("MD5") { Digest::MD5.hexdigest(@body) } + @sender_id = options.fetch("SenderId") { SecureRandom.uuid.delete('-').upcase[0...21] } + @approximate_receive_count = 0 + @sent_timestamp = Time.now.to_i * 1000 end def expire! self.visibility_timeout = nil end + def receive! + @approximate_first_receive_timestamp ||= Time.now.to_i * 1000 + @approximate_receive_count += 1 + end + def expired?( limit = Time.now ) self.visibility_timeout.nil? || self.visibility_timeout < limit end @@ -26,9 +35,10 @@ def expire_at(seconds) def attributes { - "MessageBody" => body, - "Id" => id, - "MD5" => md5, + "SenderId" => sender_id, + "ApproximateFirstReceiveTimestamp" => approximate_first_receive_timestamp, + "ApproximateReceiveCount"=> approximate_receive_count, + "SentTimestamp"=> sent_timestamp } end diff --git a/lib/fake_sqs/queue.rb b/lib/fake_sqs/queue.rb index c6ea9fe..5c3b731 100644 --- a/lib/fake_sqs/queue.rb +++ b/lib/fake_sqs/queue.rb @@ -66,6 +66,7 @@ def receive_message(options = {}) actual_amount.times do message = @messages.delete_at(rand(size)) message.expire_at(default_visibility_timeout) + message.receive! receipt = generate_receipt @messages_in_flight[receipt] = message result[receipt] = message diff --git a/spec/acceptance/message_actions_spec.rb b/spec/acceptance/message_actions_spec.rb index 8008023..4b177ef 100644 --- a/spec/acceptance/message_actions_spec.rb +++ b/spec/acceptance/message_actions_spec.rb @@ -35,14 +35,55 @@ ) response = sqs.receive_message( - queue_url: queue_url, + queue_url: queue_url ) expect(response.messages.size).to eq 1 - expect(response.messages.first.body).to eq body end + specify "ReceiveMessage with attribute_names parameters" do + body = "test 123" + + sqs.send_message( + queue_url: queue_url, + message_body: body + ) + + sent_time = Time.now.to_i * 1000 + + response = sqs.receive_message( + queue_url: queue_url, + attribute_names: ["All"] + ) + + received_time = Time.now.to_i * 1000 + + expect(response.messages.first.attributes.reject{|k,v| k == "SenderId"}).to eq({ + "SentTimestamp" => sent_time.to_s, + "ApproximateReceiveCount" => "1", + "ApproximateFirstReceiveTimestamp" => received_time.to_s + }) + expect(response.messages.first.attributes["SenderId"]).to be_kind_of(String) + expire_message(response.messages.first) + + response = sqs.receive_message( + queue_url: queue_url + ) + expect(response.messages.first.attributes).to eq({}) + expire_message(response.messages.first) + + response = sqs.receive_message( + queue_url: queue_url, + attribute_names: ["SentTimestamp", "ApproximateReceiveCount", "ApproximateFirstReceiveTimestamp"] + ) + expect(response.messages.first.attributes).to eq({ + "SentTimestamp" => sent_time.to_s, + "ApproximateReceiveCount" => "3", + "ApproximateFirstReceiveTimestamp" => received_time.to_s + }) + end + specify "DeleteMessage" do sqs.send_message( queue_url: queue_url, @@ -232,4 +273,12 @@ def let_messages_in_flight_expire $fake_sqs.expire end + def expire_message(message) + sqs.change_message_visibility( + queue_url: queue_url, + receipt_handle: message.receipt_handle, + visibility_timeout: 0 + ) + end + end diff --git a/spec/unit/queue_spec.rb b/spec/unit/queue_spec.rb index 88438e0..b81a541 100644 --- a/spec/unit/queue_spec.rb +++ b/spec/unit/queue_spec.rb @@ -32,6 +32,15 @@ def new(options = {}) send_message(options) end + it "should set the message's SentTimestamp attribute" do + expect(send_message.attributes["SentTimestamp"]).to eq (Time.now.to_i * 1000) + end + + it "should set the SenderId of the sender" do + sender_id = send_message.attributes["SenderId"] + expect(sender_id).to be_a String + expect(sender_id.length).to eq 21 + end end describe "#receive_message" do @@ -116,6 +125,25 @@ def new(options = {}) expect(receive_message).to eq({}) end + it "should increment the ApproximateReceiveCount" do + sent_message = send_message + expect(sent_message.attributes["ApproximateReceiveCount"]).to eq 0 + queue.change_message_visibility(receive_message.keys.first, 0) + expect(sent_message.attributes["ApproximateReceiveCount"]).to eq 1 + receive_message + expect(sent_message.attributes["ApproximateReceiveCount"]).to eq 2 + end + + it "should set the ApproximateFirstReceiveTimestamp only when the message is first received" do + sent_message = send_message + expect(sent_message.attributes["ApproximateFirstReceiveTimestamp"]).to eq nil + receive_time = (Time.now.to_i * 1000) + queue.change_message_visibility(receive_message.keys.first, 0) + expect(sent_message.attributes["ApproximateFirstReceiveTimestamp"]).to eq receive_time + sleep 1 + receive_message + expect(sent_message.attributes["ApproximateFirstReceiveTimestamp"]).to eq receive_time + end end describe "#delete_message" do From f9fcb8a22174d41f305702f5bc48d88060a2b355 Mon Sep 17 00:00:00 2001 From: nritholtz Date: Mon, 23 Nov 2015 11:27:39 -0500 Subject: [PATCH 2/2] Implement DLQ (RedrivePolicy) Functionality --- lib/fake_sqs/actions/receive_message.rb | 2 +- lib/fake_sqs/queue.rb | 27 ++++++++++++++----- spec/acceptance/message_actions_spec.rb | 35 ++++++++++++++++++++++++- 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/lib/fake_sqs/actions/receive_message.rb b/lib/fake_sqs/actions/receive_message.rb index d70f337..b624587 100644 --- a/lib/fake_sqs/actions/receive_message.rb +++ b/lib/fake_sqs/actions/receive_message.rb @@ -14,7 +14,7 @@ def call(name, params) params.select{|k,v | k =~ /AttributeName\.\d+/}.each do |key, value| filtered_attribute_names << value end - messages = queue.receive_message(params) + messages = queue.receive_message(params.merge(queues: @queues)) @responder.call :ReceiveMessage do |xml| messages.each do |receipt, message| xml.Message do diff --git a/lib/fake_sqs/queue.rb b/lib/fake_sqs/queue.rb index 5c3b731..556a95e 100644 --- a/lib/fake_sqs/queue.rb +++ b/lib/fake_sqs/queue.rb @@ -1,6 +1,6 @@ require 'securerandom' require 'fake_sqs/collection_view' - +require 'json' module FakeSQS MessageNotInflight = Class.new(RuntimeError) @@ -45,7 +45,7 @@ def attributes def send_message(options = {}) with_lock do - message = message_factory.new(options) + message = options.fetch(:message){ message_factory.new(options) } @messages << message message end @@ -65,11 +65,13 @@ def receive_message(options = {}) actual_amount.times do message = @messages.delete_at(rand(size)) - message.expire_at(default_visibility_timeout) - message.receive! - receipt = generate_receipt - @messages_in_flight[receipt] = message - result[receipt] = message + unless check_message_for_dlq(message, options) + message.expire_at(default_visibility_timeout) + message.receive! + receipt = generate_receipt + @messages_in_flight[receipt] = message + result[receipt] = message + end end end @@ -116,6 +118,17 @@ def change_message_visibility(receipt, visibility) end end + def check_message_for_dlq(message, options={}) + if redrive_policy = queue_attributes["RedrivePolicy"] && JSON.parse(queue_attributes["RedrivePolicy"]) + dlq = options[:queues].list.find{|queue| queue.arn == redrive_policy["deadLetterTargetArn"]} + if dlq && message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i + dlq.send_message(message: message) + message.expire! + true + end + end + end + def delete_message(receipt) with_lock do @messages_in_flight.delete(receipt) diff --git a/spec/acceptance/message_actions_spec.rb b/spec/acceptance/message_actions_spec.rb index 4b177ef..ba2fabb 100644 --- a/spec/acceptance/message_actions_spec.rb +++ b/spec/acceptance/message_actions_spec.rb @@ -234,7 +234,9 @@ ) expect(nothing.messages.size).to eq 0 - sleep(5) + # Changed from sleep 5 to sleep 7 due to race conditions in Travis build + # see https://github.com/iain/fake_sqs/pull/32 + sleep(7) same_message = sqs.receive_message( queue_url: queue_url, @@ -269,6 +271,37 @@ }.to raise_error(Aws::SQS::Errors::MessageNotInflight) end + specify 'should be moved to configured DLQ after maxReceiveCount if RedrivePolicy is set' do + dlq_queue_url = sqs.create_queue(queue_name: "TestSourceQueueDLQ").queue_url + + dlq_arn = sqs.get_queue_attributes(queue_url: dlq_queue_url).attributes.fetch("QueueArn") + sqs.set_queue_attributes( + queue_url: queue_url, + attributes: { + "RedrivePolicy" => "{\"deadLetterTargetArn\":\"#{dlq_arn}\",\"maxReceiveCount\":2}" + } + ) + + message_id = sqs.send_message( + queue_url: queue_url, + message_body: "test", + ).message_id + + + 2.times do + message = sqs.receive_message(queue_url: queue_url) + expect(message.messages.size).to eq(1) + expect(message.messages.first.message_id).to eq(message_id) + expire_message(message.messages.first) + end + + expect(sqs.receive_message(queue_url: queue_url).messages.size).to eq(0) + + message = sqs.receive_message(queue_url: dlq_queue_url) + expect(message.messages.size).to eq(1) + expect(message.messages.first.message_id).to eq(message_id) + end + def let_messages_in_flight_expire $fake_sqs.expire end