Add further AWS operations required for image crawling
mperham committed Jan 3, 2010
1 parent f7be7f4 commit 67525fd
Showing 5 changed files with 95 additions and 17 deletions.
7 changes: 3 additions & 4 deletions qanat/config/arguments.rb
@@ -6,7 +6,6 @@
 # Ruby hash that is later accessible through
 # DaemonKit.arguments.options and can be used in your daemon process.
 
-# Here is an example:
-# opts.on('-f', '--foo FOO', 'Set foo') do |foo|
-#   @options[:foo] = foo
-# end
+opts.on('-q', '--queue NAME', 'Name of SQS to process') do |n|
+  @options[:queue_name] = n
+end
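
With the template's example stub replaced, the daemon takes its queue name at start-up instead of hard-coding it. A minimal sketch of how the flag is consumed (the invocation line is illustrative):

  # ruby libexec/qanat-daemon.rb run -- -q image-crawl
  queue_name = DaemonKit.arguments.options[:queue_name]
  raise ArgumentError, "no queue given, pass -q NAME" unless queue_name
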
42 changes: 38 additions & 4 deletions qanat/lib/s3.rb
@@ -1,3 +1,5 @@
+require 'mime/types'
+
 module S3
   USE_100_CONTINUE_PUT_SIZE = 1_000_000
   DEFAULT_HOST = 's3.amazonaws.com'
@@ -24,30 +26,62 @@ def put(key, data=nil, headers={})
                              data)
     code = result.response_header.status
     if code != 200
-      raise ArgumentError, "S3 put failed: #{code} #{result.response}"
+      raise RuntimeError, "S3 put failed: #{code} #{result.response}"
     end
     code == 200
   end
 
   def get(key, headers={}, &block)
     result = async_operation(:get, headers.merge(:key => CGI::escape(key)))
     code = result.response_header.status
     if code == 404
       return nil
     end
     if code != 200
-      raise ArgumentError, "S3 get failed: #{result.response}"
+      raise RuntimeError, "S3 get failed: #{result.response}"
     end
     result.response
   end
 
+  def head(key, headers={})
+    result = async_operation(:head, headers.merge(:key => CGI::escape(key)))
+    code = result.response_header.status
+    if code == 404
+      return nil
+    end
+    # Return the response headers so callers such as put_file can read the
+    # stored md5sum metadata; a bare true/false is not usable for that.
+    result.response_header
+  end
+
+  def delete(key, headers={})
+    result = async_operation(:delete, headers.merge(:key => CGI::escape(key)))
+    code = result.response_header.status
+    code == 200
+  end
+
+  def put_file(file_path, data)
+    digest = Digest::MD5.hexdigest(data)
+    headers = head(file_path)
+    if headers and headers[MD5SUM] == digest
+      logger.info "[S3] Skipping upload of #{file_path}, unchanged..."
+      # skip push to S3
+    else
+      logger.info "[S3] Pushing #{file_path}"
+      options = { MD5SUM => digest }
+      type = guess_mimetype(file_path)
+      options["Content-Type"] = type if !type.blank?
+      options["Content-Encoding"] = 'gzip' if file_path =~ /\.gz$/
+      put(file_path, data, { 'x-amz-acl' => 'public-read' }.merge(options))
+    end
+    file_path
+  end
+
   private
 
+  MD5SUM = 'x-amz-meta-md5sum'
+
+  def guess_mimetype(filepath)
+    MIME::Types.type_for(filepath)[0].to_s
+  end
+
   def async_operation(method, headers={}, body=nil)
     f = Fiber.current
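
Taken together, head, delete, and put_file make the module a small change-aware uploader: put_file hashes the payload, compares it with the x-amz-meta-md5sum recorded on a previous upload, and only PUTs when the bytes differ. A rough usage sketch, assuming the module is mixed into a bucket-style class elsewhere in qanat (Bucket and the file name are illustrative):

require 'eventmachine'
require 'fiber'

EM.run do
  Fiber.new do
    bucket = Bucket.new('crawl-images')          # hypothetical class that includes S3
    data   = File.read('images/logo.png.gz')
    bucket.put_file('images/logo.png.gz', data)  # first call uploads the object
    bucket.put_file('images/logo.png.gz', data)  # second call is skipped: md5sum matches
    EM.stop
  end.resume
end

Storing the digest as object metadata trades one cheap HEAD request for a potentially large PUT, which pays off when a crawler revisits mostly unchanged images.
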
2 changes: 1 addition & 1 deletion qanat/lib/sdb.rb
@@ -41,7 +41,7 @@ def get(id_or_array)
       logger.error "SDB got an error response: #{code} #{http.response}"
       return nil
     end
-    { 'id' => id_or_array }.merge(to_attributes(http.response))
+    to_attributes(http.response)
   end
 
   def put(id, attribs)

def put(id, attribs)
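
The SDB change narrows get to the item's attributes only; callers that need the item name must carry it themselves. A before/after sketch (item name and attributes are invented for illustration):

attrs = sdb.get('img-42')
# before this commit: { 'id' => 'img-42', 'url' => '...', 'status' => 'fetched' }
# after this commit:  { 'url' => '...', 'status' => 'fetched' }
record = { 'id' => 'img-42' }.merge(attrs) if attrs  # reattach the id when needed
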
34 changes: 31 additions & 3 deletions qanat/lib/sqs.rb
@@ -24,15 +24,43 @@ def poll(concurrency, &block)
     end
   end
 
+  def push(msg)
+    request_hash = generate_request_hash("SendMessage", 'MessageBody' => msg)
+    http = async_operation(:post, @uri, request_hash, :timeout => timeout)
+    code = http.response_header.status
+    if code != 200
+      logger.error "SQS send_message returned an error response: #{code} #{http.response}"
+    end
+  end
+
   private
 
-  def url_for(name)
+  def create(name)
+    request_hash = generate_request_hash("CreateQueue", 'QueueName' => name)
+    http = async_operation(:post, DEFAULT_HOST, request_hash, :timeout => timeout)
+    code = http.response_header.status
+    if code != 200
+      logger.error "SQS create_queue returned an error response: #{code} #{http.response}"
+    end
+  end
+
+  def url_for(name, recur=false)
     raise ArgumentError, "No queue given" if !name || name.strip == ''
     request_hash = generate_request_hash("ListQueues", 'QueueNamePrefix' => name)
     http = async_operation(:get, DEFAULT_HOST, request_hash, :timeout => timeout)
     code = http.response_header.status
     if code == 200
       doc = Nokogiri::XML(http.response)
-      url = doc.xpath('//xmlns:QueueUrl').first.content
+      tag = doc.xpath('//xmlns:QueueUrl').first
+      if !tag
+        if !recur
+          create(name)
+          return url_for(name, true)
+        else
+          raise RuntimeError, "Unable to create queue '#{name}'"
+        end
+      end
+      url = tag.content
       logger.info "Queue #{name} at #{url}"
       return url
     end
@@ -74,7 +102,7 @@ def receive_msg(count=1, &block)
           end
         end
       else
-        logger.info "Queue #{@name} is empty"
+        logger.info "Queue #{@uri} is empty"
         Fiber.sleep(5)
       end
     else
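
push gives producers a one-call way onto the queue, and url_for now provisions missing queues: when ListQueues returns no match it calls create once and retries itself, with the recur flag preventing an infinite loop if creation keeps failing. A usage sketch, assuming SQS::Queue.new resolves its URL through url_for (the queue name and payload are illustrative):

require 'yaml'

queue = SQS::Queue.new('image-crawl')  # looks up the queue URL, creating the queue if absent
msg   = { :msg_type => :fetch_image, :url => 'http://example.com/a.png' }.to_yaml
queue.push(msg)                        # logs an error on any non-200 response
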
27 changes: 22 additions & 5 deletions qanat/libexec/qanat-daemon.rb
@@ -3,14 +3,31 @@
 # of strange things might start happening...
 DaemonKit::Application.running! do |config|
   # Trap signals with blocks or procs
-  # config.trap( 'INT' ) do
-  #   # do something clever
-  # end
-  # config.trap( 'TERM', Proc.new { puts 'Going down' } )
+  config.trap( 'HUP' ) do
+    # Dump the REE stack traces
+    p caller_for_all_threads if Object.respond_to? :caller_for_all_threads
+  end
+  config.trap('TERM') do
+    # TODO print out the number of messages waiting to be processed
+  end
 end
 
+Qanat.run do
+  DaemonKit.logger.info "start"
+
+  Fiber.new do
+    sqs = SQS::Queue.new(DaemonKit.arguments.options[:queue_name])
+    sqs.poll(5) do |msg|
+      DaemonKit.logger.info "Processing #{msg}"
+
+      obj = YAML::load(msg)
+      # NOTE: `priority` is not otherwise defined in this scope; assume it
+      # travels inside the message payload.
+      dispatch(obj, obj[:priority])
+    end
+  end.resume
+end
+
 def dispatch(msg, priority)
-  notify_upon_exception('jobber', msg) do |hash|
+  notify_upon_exception('qanat', msg) do |hash|
     name = hash.fetch(:msg_type).to_s.camelize
     profile hash.inspect do
       name.constantize.new.process(hash, priority)
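
dispatch leans on a naming convention: the message's :msg_type is camelized and constantized into a handler class that responds to process. A sketch of a conforming handler (the class and its body are invented for illustration):

# A message of { :msg_type => :fetch_image, :url => '...' } routes to FetchImage.
class FetchImage
  def process(hash, priority)
    DaemonKit.logger.info "fetching #{hash[:url]} (priority #{priority.inspect})"
    # download, then S3 put_file and SDB bookkeeping would go here
  end
end
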
