Skip to content

Commit

Permalink
Migrate sqs xml to nokogiri
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Dec 13, 2009
1 parent ae6dcda commit 9a6ffc0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 51 deletions.
59 changes: 24 additions & 35 deletions qanat/lib/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def fiber_sleep(sec)
end

module SQS
DEFAULT_HOST = "queue.amazonaws.com"
API_VERSION = "2008-01-01"
DEFAULT_HOST = "queue.amazonaws.com"
API_VERSION = "2008-01-01"

def self.run(&block)
# Ensure graceful shutdown of the connection to the broker
Expand All @@ -32,7 +32,7 @@ def self.run(&block)
end

class Queue
REQUEST_TTL = 30
REQUEST_TTL = 30

include Amazon::Authentication

Expand Down Expand Up @@ -70,29 +70,29 @@ def receive_msg(count=1, &block)
'VisibilityTimeout' => 600)
http = async_operation(:get, request_hash, :timeout => timeout)
code = http.response_header.status
doc = parse_response(http.response)
msgs = doc.find('//sqs:Message')
if msgs.size > 0
msgs.each do |msg|
handle_el = msg.find_first('//sqs:ReceiptHandle')
(logger.info msg; next) if !handle_el
if code == 200
doc = Nokogiri::XML(http.response)
msgs = doc.xpath('//xmlns:Message')
if msgs.size > 0
msgs.each do |msg|
handle_el = msg.at_xpath('.//xmlns:ReceiptHandle')
(logger.info msg; next) if !handle_el

handle = msg.find_first('//sqs:ReceiptHandle').content.strip
message_id = msg.find_first('//sqs:MessageId').content.strip
checksum = msg.find_first('//sqs:MD5OfBody').content.strip
body = msg.find_first('//sqs:Body').content.strip
handle = msg.at_xpath('.//xmlns:ReceiptHandle').content
message_id = msg.at_xpath('.//xmlns:MessageId').content
checksum = msg.at_xpath('.//xmlns:MD5OfBody').content
body = msg.at_xpath('.//xmlns:Body').content

if checksum != Digest::MD5.hexdigest(body)
logger.info "SQS message does not match checksum, ignoring..."
else
logger.info "Queued message, SQS message id is: #{message_id}"
block.call body
delete_msg(handle)
if checksum != Digest::MD5.hexdigest(body)
logger.info "SQS message does not match checksum, ignoring..."
else
block.call body
delete_msg(handle)
end
end
else
logger.info "Queue #{@name} is empty"
end
elsif code == 200
logger.info "Queue #{@name} is empty"
fiber_sleep(5)
else
logger.error "SQS returned an error response: #{code} #{http.response}"
# TODO parse the response and print something useful
Expand All @@ -116,12 +116,8 @@ def async_operation(method, parameters, opts)
end

def default_parameters
request_hash = { "Expires" => (Time.now + REQUEST_TTL).utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
"Version" => API_VERSION }
end

def default_prefix
'sqs'
request_hash = { "Expires" => (Time.now + REQUEST_TTL).utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
"Version" => API_VERSION }
end

def logger
Expand All @@ -131,12 +127,5 @@ def logger
def timeout
Integer(@config['timeout'])
end

def parse_response(string)
parser = XML::Parser.string(string)
doc = parser.parse
doc.root.namespaces.default_prefix = default_prefix
return doc
end
end
end
32 changes: 16 additions & 16 deletions qanat/libexec/qanat-daemon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,23 @@ def notify_upon_exception(name, ctx)
SQS.run do
DaemonKit.logger.info "start"

sdb = Simpledb::Database.new('images-staging')
IMAGE_SETS.each_with_index do |images, idx|
Fiber.new do
images.each do |iid|
p [idx, sdb.get(iid)]
end
end.resume
end

# sqs = SQS::Queue.new('test')
#
# sqs.poll(5) do |msg|
# DaemonKit.logger.info "Processing #{msg}"
#
# # obj = YAML::load(msg)
# # dispatch(obj, priority)
# sdb = Simpledb::Database.new('images-staging')
# IMAGE_SETS.each_with_index do |images, idx|
# Fiber.new do
# images.each do |iid|
# p [idx, sdb.get(iid)]
# end
# end.resume
# end

sqs = SQS::Queue.new('test')

sqs.poll(5) do |msg|
DaemonKit.logger.info "Processing #{msg}"

# obj = YAML::load(msg)
# dispatch(obj, priority)
end

end

0 comments on commit 9a6ffc0

Please sign in to comment.