Skip to content

Commit

Permalink
Simple Queue Service
Browse files Browse the repository at this point in the history
  • Loading branch information
SFEley committed Feb 21, 2012
1 parent b671d32 commit 42e8a0f
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 51 deletions.
11 changes: 8 additions & 3 deletions Gemfile
Expand Up @@ -3,6 +3,11 @@ source "http://rubygems.org"
# Specify your gem's dependencies in em-aws.gemspec
gemspec

# 2012-02-06:
# Use latest Webmock until the hash_including functionality is released
gem "webmock", git: 'https://github.com/bblimke/webmock.git'
group :development do
# 2012-02-06:
# Use latest Webmock until the hash_including functionality is released
gem "webmock", git: 'https://github.com/bblimke/webmock.git'
gem "guard-rspec"
gem "growl"
end

10 changes: 10 additions & 0 deletions Guardfile
@@ -0,0 +1,10 @@
# A sample Guardfile
# More info at https://github.com/guard/guard#readme

guard 'rspec', version: 2, cli: '-c -f doc' do
watch(%r{^spec/.+_spec\.rb$})
watch(%r{^lib/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" }
watch('spec/spec_helper.rb') { "spec" }
watch(%r{^spec/support/(.+)\.rb$}) { "spec" }
end

25 changes: 22 additions & 3 deletions README.markdown
Expand Up @@ -83,19 +83,38 @@ All request blocks are passed a **Response** object. If the query succeeded (i.

If the query failed (usually with a status in the 400s), the `errback` blocks are called. The response is a subclass of **FailureResponse** and contains the error `:code` and `:message` returned by Amazon. Attempting to reference other attributes raises an exception with the same information.

## Miscellany ##
## General Notes ##

The following behavior is true for all AWS services:

* **EM::AWS** uses HTTP POST by default for all Query Protocol calls. It is possible to override this by passing `method: :get` on service object initialization, but this will limit the amount of data that can be passed.
* SSL is enabled by default. You can disable it globally with `EM::AWS.ssl = false` or locally by passing `ssl: false` on service object initialization.
* XML response values that include lists of `<member>` elements will be flattened into arrays.
* XML response values that include `<key>` and `<value>` pairs will be flattened into Ruby hashes.
* Some services (e.g. SQS) allow you to set multiple attributes in a single call. Passing a hash as the value of a query parameter will automatically expand to the appropriate `Attribute.1.Name`, `Attribute.1.Value`, etc. parameters.
* Do not taunt Happy Fun Ball.
* Network errors and Amazon HTTP 500 errors are automatically retried; the number of attempts can be set with the `EM::AWS.retries` attribute. (The default is 10.)
* The retry delay follows a Fibonacci sequence: the first two retries are 1 second apart, then 2 seconds, then 3, then 5, etc. A full cycle of 10 retries thus takes 143 seconds. If the error is not resolved by that time, it will be returned as a **FailureResponse**.
* If any query receives a `Throttling` response from Amazon, it will also be retried, and subsequent calls to the same service will be subject to a 1 second delay. The delay will expire if two minutes pass without a throttling error.

## SQS ##

The Simple Queue Service behaves differently from most other Amazon services, in that most calls must be made to a _queue URL_ rather than a root path. This must be supplied on initialization of the **EM::AWS::SQS** object. If you already know the URL of the queue you want to work with, you can simply pass it with the `:url` parameter:

queue = EM::AWS::SQS.new url: 'https://sqs.us-east-1.amazonaws.com/1234567890/My-Interesting-Queue'

If you know a queue's name but not its URL, you can use the `.get` class method to call 'GetQueueUrl' and create the proper SQS object:

queue = EM::AWS::SQS.get 'My-Interesting-Queue'

You can also create a queue that doesn't exist yet using the `.create` class method, passing any optional attributes as a hash:

queue = EM::AWS::SQS.create 'My-Interesting-Queue',
visibility_timeout: 120,
maximum_message_size: 8192

(If a queue with that name already exists, the `.create` class method has the same net effect as `.get`, except that Amazon will return an error if you pass any attributes that are different from the ones already set.)





45 changes: 45 additions & 0 deletions Rakefile
@@ -1 +1,46 @@
require "bundler/gem_tasks"
$:.unshift File.join(File.dirname(__FILE__), 'lib')
require 'em-aws'


namespace :clean do
EM::AWS.aws_access_key_id = ENV['AWS_ACCESS_KEY_ID']
EM::AWS.aws_secret_access_key = ENV['AWS_SECRET_ACCESS_KEY']

desc "Deletes any leftover SQS queues (EM-AWS-Test-Queue-*)"
task :queues do
EM.run do
q = EM::Queue.new
sqs = EM::AWS::SQS.new
puts "Retrieving test queues..."
list = sqs.list_queues(queue_name_prefix: 'EM-AWS-Test-Queue')
list.callback {|r| q.push *Array(r[:queue_url])}
list.errback do |r|
puts "ERROR: #{r.error}"
EM.stop
end

EM.add_periodic_timer(0.1) do
q.pop do |url|
puts "Deleting #{url}..."
queue = EM::AWS::SQS.new url: url
del = queue.delete_queue
del.callback {|r| puts " --Deleted #{url}"}
del.errback do |r|
puts " **ERROR: #{r.error} on #{url}"
q.push url
end
end
end

EM.add_periodic_timer(5) do
if q.empty?
puts "All queues deleted."
EM.stop
end
end

end
end

end
2 changes: 1 addition & 1 deletion lib/em-aws/query.rb
Expand Up @@ -19,7 +19,7 @@ module Query
def initialize(options = {})
super
@method = options.delete(:method) || :post
@signer = SignatureV2.new(aws_access_key_id, aws_secret_access_key, method, endpoint) if aws_access_key_id && aws_secret_access_key
@signer = SignatureV2.new(aws_access_key_id, aws_secret_access_key, method, url) if aws_access_key_id && aws_secret_access_key
end

def call(action, params = {}, &block)
Expand Down
6 changes: 5 additions & 1 deletion lib/em-aws/query/response_parser.rb
Expand Up @@ -68,7 +68,11 @@ def collapse_stack(value)
@stack.last[value[:key]] = value[:value]
when Symbol
if @stack.last.is_a?(Hash)
@stack.last[element] = value
if @stack.last.has_key?(element)
@stack.last[element] = Array(@stack.last[element]) << value
else
@stack.last[element] = value
end
else
raise "I don't know how to add #{element} to #{@stack}"
end
Expand Down
4 changes: 2 additions & 2 deletions lib/em-aws/query/signature_v2.rb
Expand Up @@ -10,11 +10,11 @@ module Query
class SignatureV2
attr_reader :access_key, :secret_key, :method, :host, :path

def initialize(access_key, secret_key, method, endpoint)
def initialize(access_key, secret_key, method, url)
@access_key = access_key
@secret_key = secret_key
@method = method
if endpoint =~ %r!^(https?://)?([^/]+)(/[^?]*)!
if url =~ %r!^(https?://)?([^/]+)(/[^?]*)!
@host, @path = $2, $3
end
end
Expand Down
13 changes: 7 additions & 6 deletions lib/em-aws/service.rb
Expand Up @@ -17,6 +17,7 @@ class Service
:aws_secret_access_key,
:region,
:ssl,
:path,
:options

def initialize(options = {})
Expand All @@ -29,25 +30,25 @@ def initialize(options = {})
else
@ssl = EventMachine::AWS.ssl
end
@endpoint = options.delete(:endpoint)
@url = options.delete(:url)
@options = options
end

def service
self.class.name[/.*::(?<class>.+)/, :class].downcase
end

def endpoint
@endpoint ||= "#{ssl ? 'https' : 'http'}://#{service}.#{region}.amazonaws.com/"
def url
@url ||= "#{ssl ? 'https' : 'http'}://#{service}.#{region}.amazonaws.com/#{path}"
end

protected
private

def send_request(request, &block)
if request.method == :get
http_request = EventMachine::HttpRequest.new(endpoint).get query: request.params
http_request = EventMachine::HttpRequest.new(self.url).get query: request.params
else
http_request = EventMachine::HttpRequest.new(endpoint).send request.method, body: request.params
http_request = EventMachine::HttpRequest.new(self.url).send request.method, body: request.params
end

http_request.errback do |raw_response|
Expand Down
33 changes: 33 additions & 0 deletions lib/em-aws/sqs.rb
Expand Up @@ -2,8 +2,41 @@

module EventMachine
module AWS

# NOTE: If you want to work with an individual queue, be sure to specify either the
# :url parameter for the queue URL or the :queue_name.
class SQS < Service
include Query

API_VERSION='2011-10-01'

def queue_name
url[/https?:\/\/.*?\/(.+)/,1]
end

# Retrieves an SQS object by queue name. Returns nil if the queue can't be found.
def self.get(name)
url, retriever = nil, self.new
retriever.get_queue_url(queue_name: name) {|r| url = r.queue_url}
if url
self.new url: url
else
nil
end
end

# Creates a queue by name and returns an SQS object pointing to it. This operation
# is idempotent (i.e. will return the same object) if the queue name already exists,
# so long as no attributes are different.
def self.create(name)
url, creator = nil, self.new
creator.create_queue(queue_name: name) {|r| url = r.queue_url}
if url
self.new url: url
else
nil
end
end
end
end
end
4 changes: 4 additions & 0 deletions spec/em-aws/query/query_result_spec.rb
Expand Up @@ -26,6 +26,10 @@
subject.topics.first.should == {topic_arn: 'arn:aws:sns:us-east-1:429167422711:EM-AWS-Test-Topic'}
end

it "treats multiple elements with the same name as an array" do
subject.plural_thing.should == [17, 'hello', 9.2]
end

it "handles key/value entry pairs" do
subject[:attributes].should have(3).keys
subject.attributes['Foo'].should == 'Bar'
Expand Down
61 changes: 42 additions & 19 deletions spec/em-aws/sqs_spec.rb
@@ -1,30 +1,53 @@
require_relative '../spec_helper'

describe EventMachine::AWS::SQS, :live do
subject {EM::AWS::SQS.new}
describe EventMachine::AWS::SQS do

before(:all) do
queue_name = "EM-AWS-Test-Queue-#{Time.now.to_i}"
create_response = subject.create_queue(queue_name: queue_name)
create_response.should be_success
@queue = create_response.queue_url
@queue_name = "EM-AWS-Test-Queue-#{Time.now.to_i}"
end

it_behaves_like "an AWS Query"

it "can create a queue" do
@topic.should =~ /^arn:aws:sns:.*#{@test_topic}$/
end
it_behaves_like "an AWS Query"

it "can retrieve a list of queues" do
response = subject.list_queues
response.queues.should include({queue_url: @topic})
it "derives the queue name from the URL" do
this = EM::AWS::SQS::new url: 'http://dummy.amazonaws.com/fake-queue-name'
this.queue_name.should == 'fake-queue-name'
end

after(:all) do
delete_response = subject.delete_queue(topic_arn: @topic)
delete_response.should be_success

context "operations", :live do
before(:all) do
@queue = EM::AWS::SQS.create @queue_name
sleep 60
end

subject { @queue }

it "points to the proper queue" do
subject.url.should =~ /http.*\/#{@queue_name}$/
end

it "can retrieve a list of queues" do
response = subject.list_queues
response.queue_url.should include(subject.url)
end

it "can get the queue by name" do
queue = EM::AWS::SQS.get @queue_name
queue.url.should == subject.url
end

it "can set attributes on the queue" do
subject.set_queue_attributes attribute: {maximum_message_size: 1024}
sleep 10
response = subject.get_queue_attributes attribute_name: [:maximum_message_size]
response.attribute[:maximum_message_size].should == 1024
end


after(:all) do
sleep 10
delete_response = subject.delete_queue
delete_response.should be_success
end
end


end
3 changes: 3 additions & 0 deletions spec/support/dummy_http_response.rb
Expand Up @@ -19,6 +19,9 @@ def response
<TopicArn>arn:aws:sns:us-east-1:429167422711:bigthink_alarms</TopicArn>
</member>
</Topics>
<PluralThing>17</PluralThing>
<PluralThing>hello</PluralThing>
<PluralThing>9.2</PluralThing>
<Attributes>
<entry>
<key>Foo</key>
Expand Down

0 comments on commit 42e8a0f

Please sign in to comment.