Skip to content

Commit

Permalink
Clean all trailing whitespases
Browse files Browse the repository at this point in the history
  • Loading branch information
dsnipe committed Apr 8, 2016
1 parent 150ecc0 commit 2b111c1
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 115 deletions.
4 changes: 2 additions & 2 deletions lib/sqewer.rb
Expand Up @@ -6,7 +6,7 @@ module Sqewer
require path
end
end

# Loads a particular Sqewer extension that is not loaded
# automatically during the gem require.
#
Expand All @@ -15,7 +15,7 @@ def self.require_extension(extension_name)
path = File.join("sqewer", "extensions", extension_name)
require_relative path
end

# Shortcut access to Submitter#submit.
#
# @see {Sqewer::Submitter#submit!}
Expand Down
4 changes: 2 additions & 2 deletions lib/sqewer/atomic_counter.rb
Expand Up @@ -5,14 +5,14 @@ class Sqewer::AtomicCounter
def initialize
@m, @v = Mutex.new, 0
end

# Returns the current value of the counter
#
# @return [Fixnum] the current value of the counter
def to_i
@m.synchronize { @v + 0 }
end

# Increments the counter
#
# @return [Fixnum] the current value of the counter
Expand Down
6 changes: 3 additions & 3 deletions lib/sqewer/cli.rb
Expand Up @@ -20,7 +20,7 @@ def start(worker = Sqewer::Worker.default)
# Signal not supported
end
end

begin
worker.start
# The worker is non-blocking, so in the main CLI process we select() on the signal
Expand All @@ -34,7 +34,7 @@ def start(worker = Sqewer::Worker.default)
exit 1
end
end

def handle_signal(worker, sig)
case sig
when 'USR1', 'TERM'
Expand All @@ -46,6 +46,6 @@ def handle_signal(worker, sig)
raise Interrupt
end
end

extend self
end
32 changes: 16 additions & 16 deletions lib/sqewer/connection.rb
Expand Up @@ -8,34 +8,34 @@
class Sqewer::Connection
DEFAULT_TIMEOUT_SECONDS = 5
BATCH_RECEIVE_SIZE = 10

# A wrapper for most important properties of the received message
class Message < Struct.new(:receipt_handle, :body)
def inspect
body.inspect
end

def has_body?
body && !body.empty?
end
end

# Returns the default adapter, connected to the queue set via the `SQS_QUEUE_URL`
# environment variable.
def self.default
new(ENV.fetch('SQS_QUEUE_URL'))
rescue KeyError => e
raise "SQS_QUEUE_URL not set in the environment. This is the queue URL that the default that Sqewer uses"
end

# Initializes a new adapter, with access to the SQS queue at the given URL.
#
# @param queue_url[String] the SQS queue URL (the URL can be copied from your AWS console)
def initialize(queue_url)
require 'aws-sdk'
@queue_url = queue_url
end

# Receive at most 10 messages from the queue, and return the array of Message objects.
#
# @return [Array<Message>] an array of Message objects
Expand All @@ -46,7 +46,7 @@ def receive_messages
Message.new(message.receipt_handle, message.body)
end
end

# Send a message to the backing queue
#
# @param message_body[String] the message to send
Expand All @@ -56,7 +56,7 @@ def receive_messages
def send_message(message_body, **kwargs_for_send)
send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end

# Stores the messages for the SQS queue (both deletes and sends), and yields them in allowed batch sizes
class MessageBuffer < Struct.new(:messages)
MAX_RECORDS = 10
Expand All @@ -67,7 +67,7 @@ def each_batch
messages.each_slice(MAX_RECORDS){|batch| yield(batch)}
end
end

# Saves the messages to send to the SQS queue
class SendBuffer < MessageBuffer
def send_message(message_body, **kwargs_for_send)
Expand All @@ -78,7 +78,7 @@ def send_message(message_body, **kwargs_for_send)
messages << m
end
end

# Saves the receipt handles to batch-delete from the SQS queue
class DeleteBuffer < MessageBuffer
def delete_message(receipt_handle)
Expand All @@ -88,7 +88,7 @@ def delete_message(receipt_handle)
messages << m
end
end

# Send multiple messages. If any messages fail to send, an exception will be raised.
#
# @yield [#send_message] the object you can send messages through (will be flushed at method return)
Expand All @@ -105,23 +105,23 @@ def send_multiple_messages
end
end
end

# Deletes a message after it has been succesfully decoded and processed
#
# @param message_identifier[String] the ID of the message to delete. For SQS, it is the receipt handle
# @return [void]
def delete_message(message_identifier)
delete_multiple_messages {|via| via.delete_message(message_identifier) }
end

# Deletes multiple messages after they all have been succesfully decoded and processed.
#
# @yield [#delete_message] an object you can delete an individual message through
# @return [void]
def delete_multiple_messages
buffer = DeleteBuffer.new
yield(buffer)

buffer.each_batch do | batch |
resp = client.delete_message_batch(queue_url: @queue_url, entries: batch)
failed = resp.failed
Expand All @@ -131,9 +131,9 @@ def delete_multiple_messages
end
end
end

private

class RetryWrapper < Struct.new(:sqs_client)
MAX_RETRIES = 1000
# Provide retrying wrappers for all the methods of Aws::SQS::Client that we actually use
Expand All @@ -153,7 +153,7 @@ class RetryWrapper < Struct.new(:sqs_client)
end
end
end

def client
@client ||= RetryWrapper.new(Aws::SQS::Client.new)
end
Expand Down
8 changes: 4 additions & 4 deletions lib/sqewer/connection_messagebox.rb
Expand Up @@ -23,7 +23,7 @@ def initialize(connection)
@sends = []
@mux = Mutex.new
end

# Saves the given body and the keyword arguments (such as delay_seconds) to be sent into the queue.
# If there are more sends in the same flush, they will be batched using batched deletes.G
#
Expand All @@ -33,7 +33,7 @@ def send_message(message_body, **kwargs_for_send)
@sends << [message_body, kwargs_for_send]
}
end

# Saves the given identifier to be deleted from the queue. If there are more
# deletes in the same flush, they will be batched using batched deletes.
#
Expand All @@ -43,7 +43,7 @@ def delete_message(message_identifier)
@deletes << message_identifier
}
end

# Flushes all the accumulated commands to the queue connection.
# First the message sends are going to be flushed, then the message deletes.
# All of those will use batching where possible.
Expand All @@ -52,7 +52,7 @@ def flush!
@connection.send_multiple_messages do | buffer |
@sends.each { |body, kwargs| buffer.send_message(body, **kwargs) }
end

@connection.delete_multiple_messages do | buffer |
@deletes.each { |id| buffer.delete_message(id) }
end
Expand Down
10 changes: 5 additions & 5 deletions lib/sqewer/execution_context.rb
Expand Up @@ -14,29 +14,29 @@ def initialize(submitter, extra_variables={})
@params = {}
extra_variables.each_pair{|k, v| self[k] = v }
end

# Submits one or more jobs to the queue
#
# @see {Sqewer::Submitter#submit!}
def submit!(job, **execution_options)
@submitter.submit!(job, **execution_options)
end

# Sets a key in the execution environment
#
# @param key[#to_s] the key to set
# @param value the value to set
def []=(key, value)
@params[key.to_s] = value
end

# Returns a key of the execution environment by name
#
# @param key[#to_s] the key to get
def [](key)
@params[key.to_s]
end

# Returns a key of the execution environment, or executes the given block
# if the key is not set
#
Expand All @@ -45,7 +45,7 @@ def [](key)
def fetch(key, &blk)
@params.fetch(key.to_s, &blk)
end

# Returns the logger set in the execution environment, or
# the NullLogger if no logger is set. Can be used to supply
# a logger prefixed with job parameters per job.
Expand Down
6 changes: 3 additions & 3 deletions lib/sqewer/extensions/appsignal_wrapper.rb
Expand Up @@ -7,17 +7,17 @@ class AppsignalWrapper
# Unserialize the job
def around_deserialization(serializer, msg_id, msg_payload)
return yield unless (defined?(Appsignal) && Appsignal.active?)

Appsignal.monitor_transaction('perform_job.demarshal',
:class => serializer.class.to_s, :params => {:recepit_handle => msg_id}, :method => 'deserialize') do
yield
end
end

# Run the job with Appsignal monitoring.
def around_execution(job, context)
return yield unless (defined?(Appsignal) && Appsignal.active?)

Appsignal.monitor_transaction('perform_job.sqewer',
:class => job.class.to_s, :params => job.to_h, :method => 'run') do |t|
context['appsignal.transaction'] = t
Expand Down
14 changes: 7 additions & 7 deletions lib/sqewer/middleware_stack.rb
@@ -1,18 +1,18 @@
# Allows arbitrary wrapping of the job deserialization and job execution procedures
class Sqewer::MiddlewareStack

# Returns the default middleware stack, which is empty (an instance of None).
#
# @return [MiddlewareStack] the default empty stack
def self.default
@instance ||= new
end

# Creates a new MiddlewareStack. Once created, handlers can be added using `:<<`
def initialize
@handlers = []
end

# Adds a handler. The handler should respond to :around_deserialization and #around_execution.
#
# @param handler[#around_deserializarion, #around_execution] The middleware item to insert
Expand All @@ -21,21 +21,21 @@ def <<(handler)
@handlers << handler
# TODO: cache the wrapping proc
end

def around_execution(job, context, &inner_block)
return yield if @handlers.empty?

responders = @handlers.select{|e| e.respond_to?(:around_execution) }
responders.reverse.inject(inner_block) {|outer_block, middleware_object|
->{
->{
middleware_object.public_send(:around_execution, job, context, &outer_block)
}
}.call
end

def around_deserialization(serializer, message_id, message_body, &inner_block)
return yield if @handlers.empty?

responders = @handlers.select{|e| e.respond_to?(:around_deserialization) }
responders.reverse.inject(inner_block) {|outer_block, middleware_object|
->{ middleware_object.public_send(:around_deserialization, serializer, message_id, message_body, &outer_block) }
Expand Down
2 changes: 1 addition & 1 deletion lib/sqewer/null_logger.rb
Expand Up @@ -4,6 +4,6 @@ module Sqewer::NullLogger
(Logger.instance_methods- Object.instance_methods).each do | null_method |
define_method(null_method){|*a| }
end

extend self
end

0 comments on commit 2b111c1

Please sign in to comment.