Bulk processing for workers #82

Closed · wants to merge 2 commits
44 changes: 44 additions & 0 deletions lib/qu/backend/batch.rb
@@ -0,0 +1,44 @@
require 'qu/batch_payload'
require 'qu/backend/wrapper'

module Qu
module Backend

# backend that takes messages in bulk for processing
class Batch
include Wrapper

def_delegators :@backend, :size, :clear, :push

def complete(payload)
@backend.batch_complete(payload.payloads)
end

def abort(payload)
@backend.batch_abort(payload.payloads)
end

def fail(payload)
@backend.batch_fail(payload.payloads)
end

def pop(queue_name = 'default')
payloads = @backend.batch_pop( queue_name, 10 ) # size should be configurable
if payloads && !payloads.empty?
result = payloads.group_by { |payload| payload.klass }.map do |klass,payloads|
Qu::BatchPayload.new( :queue => queue_name, :klass => klass, :payloads => payloads )
end

current = result.shift

result.each do |payload|
@backend.batch_push(payload.payloads)
Collaborator: So if we have 1 queue for 2 jobs, each job has 5 items, and we do a pop for the batch backend, we end up with 5 of job 1 as a batch payload and job 2 gets pushed back on?

Collaborator (author): I think so; I couldn't think of a better solution here. Given that batches have a specific usage, I'd say users shouldn't mix work between two different jobs here. I did it like this mostly so it won't lose data, but I think every batch job should get its own queue.

end

current
end
end

end
end
end
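
To make the pop behaviour discussed in the thread above concrete, here is a rough sketch (not part of the diff; plain_backend stands in for any backend that implements batch_pop and batch_push):

# Suppose the 'default' queue already holds 5 SimpleNumericJob and 5 OtherNumericJob payloads.
batch_backend = Qu::Backend::Batch.new(plain_backend)

payload = batch_backend.pop('default')
payload.class          # => Qu::BatchPayload
payload.klass          # => SimpleNumericJob (the first group when payloads are grouped by klass)
payload.payloads.size  # => 5
# The 5 OtherNumericJob payloads are batch_pushed back onto the queue rather than dropped.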
17 changes: 2 additions & 15 deletions lib/qu/backend/instrumented.rb
@@ -1,26 +1,13 @@
require 'forwardable'
require 'qu/backend/wrapper'

module Qu
module Backend
# Internal: Backend that wraps all backends with instrumentation.
class Instrumented < Base
extend Forwardable
include Wrapper
Collaborator: I like this fine.

include Qu::Instrumenter

def self.wrap(backend)
if backend.nil?
backend
else
new(backend)
end
end

def_delegators :@backend, :connection, :connection=

def initialize(backend)
@backend = backend
end

def push(payload)
instrument("push.#{InstrumentationNamespace}") { |ipayload|
ipayload[:payload] = payload
88 changes: 88 additions & 0 deletions lib/qu/backend/spec.rb
@@ -5,6 +5,20 @@ class CustomQueue < Qu::Job
queue :custom
end

class SimpleNumericJob < Qu::Job
attr_reader :numbers
def initialize(*numbers)
@numbers = numbers
end
end

class OtherNumericJob < Qu::Job
attr_reader :numbers
def initialize(*numbers)
@numbers = numbers
end
end

shared_examples_for 'a backend interface' do
let(:payload) { Qu::Payload.new(:klass => SimpleJob) }

@@ -175,3 +189,77 @@ class CustomQueue < Qu::Job
end
end
end

shared_examples_for 'a batch capable backend' do

def create_payloads(size)
(1..size).map do |number|
Qu::Payload.new( :klass => SimpleNumericJob, :queue => 'default', :args => number )
end
end

def push_messages(size)
payloads = create_payloads(size)
subject.batch_push(payloads)
end

describe 'pushing many messages' do

it 'should push them all at once' do
expect(subject.size).to eq(0)

numbers = create_payloads(10)

subject.batch_push( numbers )
expect(subject.size).to eq(10)
end

it 'should push and all messages' do
Collaborator: Should this be something like it "should pop in batches" or something?

push_messages(10)

result = subject.batch_pop('default', 10).map { |payload| payload.args }

expect(result.sort).to eq((1..10).to_a)
expect(subject.size).to eq(0)
end

end

describe 'when completing many messages' do

it 'should complete all payloads' do
push_messages(10)

numbers = subject.batch_pop('default', 10)
expect(subject.messages_not_visible).to eq(10)
Collaborator: Forcing a batch backend to know about the messages not visible doesn't seem like a good idea. It feels directly tied to SQS more than to batching in general. I would focus more on the batch operations and find a way to test the overall behavior rather than introspecting like this and forcing batch backends to be smarter.

Collaborator (author): Yeah, that's too specific.


subject.batch_complete(numbers)
expect(subject.size).to eq(0)
expect(subject.messages_not_visible).to eq(0)
end

end

describe 'batch abort' do
it 'should abort all messages' do
push_messages(10)

numbers = subject.batch_pop('default', 10)
expect(subject.size).to eq(0)
subject.batch_abort(numbers)
expect(subject.size).to eq(10)
end
end

describe 'batch fail' do
it 'should fail all messages' do
push_messages(10)

numbers = subject.batch_pop('default', 10)
expect(subject.size).to eq(0)
subject.batch_abort(numbers)
Collaborator: This is the same as the previous 'batch abort' describe block, right? Typo or am I missing something?

Collaborator (author): Yeah, they both do the same thing. I couldn't figure out a way to write tests for them that aren't exactly the same; I think these should probably become one shared example.

expect(subject.size).to eq(10)
end
end

end
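
For orientation, a backend spec would opt into these shared examples roughly like this (the wiring below is not part of this diff):

describe Qu::Backend::SQS do
  it_behaves_like 'a batch capable backend'
end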
97 changes: 78 additions & 19 deletions lib/qu/backend/sqs.rb
@@ -4,43 +4,57 @@
module Qu
module Backend
class SQS < Base
def push(payload)
# id does not really matter for sqs as they have ids already so i'm just
# sending something relatively unique for errors and what not
payload.id = Digest::SHA1.hexdigest(payload.to_s + Time.now.to_s)

queue = begin
connection.queues.named(payload.queue)
rescue ::AWS::SQS::Errors::NonExistentQueue
connection.queues.create(payload.queue)
end

queue.send_message(dump(payload.attributes_for_push))
def push(payload)
message = find_or_create_queue(payload.queue).send_message(generate_dump(payload))
payload.message = message
payload
end

def batch_push(payloads)
map_by_queue(payloads) do |queue,group|
messages = group.map { |payload| generate_dump(payload) }
find_or_create_queue(queue).batch_send(*messages)
group
end.flatten
end

def complete(payload)
payload.message.delete if payload.message
end

def batch_complete(payloads)
map_by_queue(payloads) do |queue,group|
receipts = group.map { |payload| payload.message.handle }
connection.queues.named(queue).batch_delete(*receipts)
end
end

def abort(payload)
payload.message.visibility_timeout = 0 if payload.message
end

def fail(payload)
payload.message.visibility_timeout = 0 if payload.message
def batch_abort(payload)
map_by_queue(payload) do |queue,group|
connection.queues.named(queue).batch_change_visibility(0, *group.map(&:message))
end
end

alias fail abort
alias batch_fail batch_abort

def pop(queue_name = 'default')
begin
queue = connection.queues.named(queue_name)
create_payload(queue.receive_message)
rescue ::AWS::SQS::Errors::NonExistentQueue
end
end

if message = queue.receive_message
doc = load(message.body)
payload = Payload.new(doc)
payload.message = message
return payload
end
def batch_pop( queue_name = 'default', limit = 10 )
begin
queue = connection.queues.named(queue_name)
queue.receive_messages( :limit => limit ).map { |message| create_payload(message) }
rescue ::AWS::SQS::Errors::NonExistentQueue
end
end
@@ -73,6 +87,51 @@ def connection
@connection ||= ::AWS::SQS.new
end

# private api
def messages_not_visible(queue_name = 'default')
connection.queues.named(queue_name).approximate_number_of_messages_not_visible
end

private

def find_or_create_queue(queue_name)
Collaborator: I like this. Would take this in a separate pull.

begin
connection.queues.named(queue_name)
rescue ::AWS::SQS::Errors::NonExistentQueue
connection.queues.create(queue_name)
end
end

def create_payload(message)
if message
doc = load(message.body)
payload = Payload.new(doc)
payload.message = message
payload
end
end

def generate_dump(payload)
dump(set_message_id(payload).attributes_for_push)
end

def map_by_queue( payloads )
return unless payloads
begin
payloads.group_by { |p| p.queue }.map do |queue,group|
yield(queue,group)
end
rescue ::AWS::SQS::Errors::NonExistentQueue
end
end

def set_message_id(payload)
# id does not really matter for sqs as they have ids already so i'm just
# sending something relatively unique for errors and what not
payload.id = Digest::SHA1.hexdigest(payload.to_s + Time.now.to_s)
payload
end

end
end
end
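
As a usage sketch for the new batch methods on the SQS backend (assumes AWS credentials are configured and SimpleNumericJob from the spec above; not part of the pull):

backend = Qu::Backend::SQS.new
payloads = (1..10).map { |n| Qu::Payload.new(:klass => SimpleNumericJob, :queue => 'default', :args => n) }

backend.batch_push(payloads)               # one batch_send per queue
popped = backend.batch_pop('default', 10)  # up to 10 messages in a single receive
backend.batch_complete(popped)             # batch-deletes by receipt handle, per queue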
29 changes: 29 additions & 0 deletions lib/qu/backend/wrapper.rb
@@ -0,0 +1,29 @@
module Qu
module Backend
module Wrapper
extend Forwardable
def_delegators :@backend, :connection, :connection=

def self.included( base )
base.extend(ClassMethods, Forwardable)
end

def initialize(backend)
@backend = backend
end

module ClassMethods

def wrap(backend)
if backend.nil?
backend
else
new(backend)
end
end

end

end
end
end
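
A minimal sketch of what including Wrapper gives a decorator backend (the Passthrough class is hypothetical, purely to show the pattern):

class Passthrough
  include Qu::Backend::Wrapper   # provides .wrap, #initialize(backend), and connection delegation

  def push(payload)
    @backend.push(payload)       # delegate to the wrapped backend
  end
end

backend = Passthrough.wrap(Qu::Backend::SQS.new)  # wrap returns nil untouched when given nil
backend.connection                                # delegated to the wrapped SQS backend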
12 changes: 12 additions & 0 deletions lib/qu/batch_payload.rb
@@ -0,0 +1,12 @@
require 'qu/payload'

module Qu
class BatchPayload < Payload

def initialize( options )
super
self.args = self.payloads.map(&:args).flatten
end

end
end
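
A small illustration of what BatchPayload does with its payloads (values are made up):

p1 = Qu::Payload.new(:klass => SimpleNumericJob, :queue => 'default', :args => [1, 2])
p2 = Qu::Payload.new(:klass => SimpleNumericJob, :queue => 'default', :args => [3])

batch = Qu::BatchPayload.new(:queue => 'default', :klass => SimpleNumericJob, :payloads => [p1, p2])
batch.args  # => [1, 2, 3] -- the args of all wrapped payloads, flattened into one list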