Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

First working version with redis and mongo backends

  • Loading branch information...
commit 56fbcdc1a4adde691a5769d39485b5e4e885cda2 0 parents
@bkeepers authored
4 .gitignore
@@ -0,0 +1,4 @@
+*.gem
+.bundle
+Gemfile.lock
+pkg/*
19 Gemfile
@@ -0,0 +1,19 @@
+source "http://rubygems.org"
+gemspec :name => 'qu'
+
+group :mongo do
+ gemspec :name => 'qu-mongo', :development_group => :mongo
+end
+
+group :redis do
+ gemspec :name => 'qu-redis', :development_group => :redis
+end
+
+group :test do
+ gem 'SystemTimer', :platform => :mri_18
+ gem 'ruby-debug', :platform => :mri_18
+ gem 'rake'
+ gem 'rspec', '~> 2.0'
+ gem 'guard-rspec'
+ gem 'guard-bundler'
+end
12 Guardfile
@@ -0,0 +1,12 @@
+guard 'rspec', :version => 2 do
+ watch(%r{^spec/.+_spec\.rb$})
+ watch(%r{^lib/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" }
+ watch(%r{^lib/qu/backend/spec\.rb$}) { |m| "spec/qu/backend" }
+ watch('spec/spec_helper.rb') { "spec" }
+ watch(%r{^spec/support/(.+)\.rb$}) { "spec" }
+end
+
+guard 'bundler' do
+ watch('Gemfile')
+ watch(/^.+\.gemspec/)
+end
47 README.md
@@ -0,0 +1,47 @@
+## Features
+
+* Multiple backends (redis, mongo, sql)
+* Resque-like API
+*
+
+## Installation
+
+ gem 'qu-redis'
+
+ Qu.configure do |c|
+ c.connection = Redis.new
+ c.fork = false
+ c.poll = 20 # for backends that need to poll instead of blocking
+ end
+
+## API
+
+ Qu.length('presentations')
+ Qu.work_off # work off all jobs until there are none
+ Qu.clear
+
+ class ProcessPresentation < Qu::Job.new(:presentation_id)
+ queue :mailers
+
+ def perform
+ presentation = Presentation.find(presentation_id)
+ # work here
+ end
+ end
+
+ job_id = Qu.enqueue ProcessPresentation, @presentation.id
+
+ ProcessPresentation.create(:presentation_id => 1)
+
+ Qu::Worker.new(*%w(presentations slides *)).start # or work_off
+
+## ToDo
+
+* worker.work
+* add job back on queue when worker dies
+* use queue specified in job class
+* configurable exception handling
+* callbacks (enqueue, process, error)
+* make poll timer configurable
+* logger
+* autoconfigure heroku connections
24 Rakefile
@@ -0,0 +1,24 @@
+require 'rspec/core/rake_task'
+
+desc "Run all specs"
+RSpec::Core::RakeTask.new(:spec) do |t|
+ t.rspec_opts = %w[--color]
+ t.verbose = false
+end
+
+namespace :spec do
+ Backends = %w(mongo redis)
+
+ Backends.each do |backend|
+ desc "Run specs for #{backend} backend"
+ RSpec::Core::RakeTask.new(backend) do |t|
+ t.rspec_opts = %w[--color]
+ t.verbose = false
+ t.pattern = "spec/qu/backend/#{backend}_spec.rb"
+ end
+ end
+
+ task :backends => Backends
+end
+
+task :default => :spec
4 lib/qu-mongo.rb
@@ -0,0 +1,4 @@
+require 'qu'
+require 'qu/backend/mongo'
+
+Qu.backend = Qu::Backend::Mongo.new
4 lib/qu-redis.rb
@@ -0,0 +1,4 @@
+require 'qu'
+require 'qu/backend/redis'
+
+Qu.backend = Qu::Backend::Redis.new
22 lib/qu.rb
@@ -0,0 +1,22 @@
+require 'qu/version'
+require 'qu/job'
+require 'qu/backend/base'
+
+require 'forwardable'
+
+module Qu
+ autoload :Worker, 'qu/worker'
+
+ extend SingleForwardable
+ extend self
+
+ def_delegators :backend, :enqueue, :length, :queues, :reserve
+
+ def backend=(backend)
+ @backend = backend
+ end
+
+ def backend
+ @backend ||= Backend::Redis.new
+ end
+end
18 lib/qu/backend/base.rb
@@ -0,0 +1,18 @@
+require 'multi_json'
+
+module Qu
+ module Backend
+ class Base
+
+ private
+
+ def encode(data)
+ MultiJson.encode(data)
+ end
+
+ def decode(data)
+ MultiJson.decode(data)
+ end
+ end
+ end
+end
68 lib/qu/backend/mongo.rb
@@ -0,0 +1,68 @@
+require 'mongo'
+
+module Qu
+ module Backend
+ class Mongo < Base
+ def database
+ @database ||= ::Mongo::Connection.new.db('qu')
+ end
+
+ def clear(queue = queues)
+ Array(queue).each do |q|
+ jobs(q).drop
+ self[:queues].remove({:name => q})
+ end
+ end
+
+ def queues
+ self[:queues].find.map {|doc| doc['name'] }
+ end
+
+ def length(queue)
+ jobs(queue).count
+ end
+
+ def enqueue(klass, *args)
+ id = BSON::ObjectId.new
+ jobs(klass.queue).insert({:_id => id, :class => klass.to_s, :args => args})
+ self[:queues].update({:name => klass.queue}, {:name => klass.queue}, :upsert => true)
+ id
+ end
+
+ def reserve(worker, options = {:block => true})
+ worker.queues.each do |queue|
+ begin
+ doc = jobs(queue).find_and_modify(:remove => true)
+ return Job.load(doc['_id'], doc['class'], doc['args'])
+ rescue ::Mongo::OperationFailure
+ # No jobs in the queue
+ end
+ end
+
+ if options[:block]
+ sleep 5
+ retry
+ end
+ end
+
+ def release(job)
+
+ end
+
+ def delete(job)
+
+ end
+
+
+ private
+
+ def jobs(queue)
+ self["queue:#{queue}"]
+ end
+
+ def [](name)
+ database["qu.#{name}"]
+ end
+ end
+ end
+end
58 lib/qu/backend/redis.rb
@@ -0,0 +1,58 @@
+require 'redis'
+
+module Qu
+ module Backend
+ class Redis < Base
+ def redis
+ @redis ||= ::Redis.connect
+ end
+
+ def enqueue(klass, *args)
+ data = encode('class' => klass.to_s, 'args' => args)
+ id = unique_id(data)
+ redis.set("job:#{id}", data)
+ redis.rpush("queue:#{klass.queue}", id)
+ redis.sadd('queues', klass.queue)
+ id
+ end
+
+ def length(queue)
+ redis.llen("queue:#{queue}")
+ end
+
+ def clear(queue = queues)
+ Array(queue).each do |q|
+ redis.srem('queues', q)
+ redis.del("queue:#{q}")
+ end
+ end
+
+ def queues
+ Array(redis.smembers('queues'))
+ end
+
+ def reserve(worker, options = {:block => true})
+ queues = worker.queues.map {|q| "queue:#{q}" }
+
+ if options[:block]
+ id = redis.blpop(*queues.push(0))[1]
+ else
+ queues.detect {|queue| id = redis.lpop(queue) }
+ end
+
+ if id
+ data = decode(redis.get("job:#{id}"))
+ redis.del("job:#{id}")
+ Job.load(id, data['class'], data['args'])
+ end
+ end
+
+ private
+
+ def unique_id(data)
+ Digest::MD5.hexdigest("#{Time.now.to_f} - #{rand} - #{data}")
+ end
+
+ end
+ end
+end
124 lib/qu/backend/spec.rb
@@ -0,0 +1,124 @@
+class SimpleJob < Qu::Job
+ def perform
+ end
+end
+
+class CustomQueue < Qu::Job
+ queue :custom
+end
+
+shared_examples_for 'a backend' do
+ let(:job) { SimpleJob }
+
+ before(:all) do
+ Qu.backend = described_class.new
+ end
+
+ before do
+ subject.clear
+ end
+
+ describe 'enqueue' do
+ it 'should return a job id' do
+ subject.enqueue(job).should_not be_nil
+ end
+
+ it 'should add a job to the queue' do
+ subject.enqueue(job)
+ subject.length(job.queue).should == 1
+ end
+
+ it 'should add queue to list of queues' do
+ subject.queues.should == []
+ subject.enqueue job
+ subject.queues.should == [job.queue]
+ end
+
+ it 'should assign a different job id for the same job enqueue multiple times' do
+ id = subject.enqueue(job)
+ subject.enqueue(job.clone).should_not == id
+ end
+ end
+
+ describe 'clear' do
+ it 'should clear jobs for given queue' do
+ subject.enqueue job
+ subject.length(job.queue).should == 1
+ subject.clear(job.queue)
+ subject.length(job.queue).should == 0
+ subject.queues.should_not include(job.queue)
+ end
+
+ it 'should not clear jobs for a different queue' do
+ subject.enqueue job
+ subject.clear('other')
+ subject.length(job.queue).should == 1
+ end
+
+ it 'should clear all queues without any args' do
+ subject.enqueue job
+ job.stub!(:queue).and_return('other')
+ subject.enqueue job
+ subject.length(job.queue).should == 1
+ subject.length('other').should == 1
+ subject.clear
+ subject.length(job.queue).should == 0
+ subject.length('other').should == 0
+ end
+ end
+
+ describe 'reserve' do
+ let(:worker) { Qu::Worker.new(job.queue) }
+
+ before do
+ @id = subject.enqueue job
+ end
+
+ it 'should return next job' do
+ subject.reserve(worker).id.should == @id
+ end
+
+ it 'should not return an already reserved job' do
+ another_job = SimpleJob
+ subject.enqueue another_job
+
+ subject.reserve(worker).id.should_not == subject.reserve(worker).id
+ end
+
+ it 'should return next job in given queues' do
+ subject.enqueue SimpleJob
+ job_id = subject.enqueue CustomQueue
+ subject.enqueue SimpleJob
+
+ worker = Qu::Worker.new('custom', 'default')
+
+ subject.reserve(worker).id.should == job_id
+ end
+
+ it 'should not return job from different queue' do
+ worker = Qu::Worker.new('video')
+ timeout { subject.reserve(worker) }.should be_nil
+ end
+
+ it 'should block by default if no jobs available' do
+ subject.clear
+ timeout(1) do
+ subject.reserve(worker)
+ fail("#reserve should block")
+ end
+ end
+
+ it 'should not block if :block option is set to false' do
+ timeout(1) do
+ subject.reserve(worker, :block => false)
+ true
+ end.should be_true
+ end
+
+ def timeout(count = 0.1, &block)
+ SystemTimer.timeout(count, &block)
+ rescue Timeout::Error
+ nil
+ end
+ end
+end
38 lib/qu/job.rb
@@ -0,0 +1,38 @@
+module Qu
+ class Job
+ attr_accessor :id
+
+ def self.queue(name = nil)
+ @queue = name.to_s if name
+ @queue ||= 'default'
+ end
+
+ def self.with(*params)
+ Class.new(Qu::Job) do
+ attr_reader *params
+
+ class_eval <<-EOF, __FILE__, __LINE__
+ def initialize(#{params.join(', ')})
+ #{params.map {|p| "@#{p}"}.join(', ')} = #{params.join(', ')}
+ end
+ EOF
+ end
+ end
+
+ def self.load(id, class_name, args)
+ constantize(class_name).new(*args).tap {|job| job.id = id }
+ end
+
+ protected
+
+ def self.constantize(class_name)
+ constant = Object
+ class_name.split('::').each do |name|
+ constant = constant.const_get(name) || constant.const_missing(name)
+ end
+ constant
+ end
+
+
+ end
+end
3  lib/qu/version.rb
@@ -0,0 +1,3 @@
+module Qu
+ VERSION = "0.1.0"
+end
22 lib/qu/worker.rb
@@ -0,0 +1,22 @@
+module Qu
+ class Worker
+ def initialize(*queues)
+ @queues = queues.flatten
+ end
+
+ def queues
+ @queues.map {|q| q == '*' ? Qu.queues.sort : q }.flatten.uniq
+ end
+
+ def work_off
+ while job = Qu.reserve(self, :block => false)
+ job.perform
+ end
+ end
+
+ def work
+ job = Qu.reserve(self)
+ job.perform
+ end
+ end
+end
19 qu-mongo.gemspec
@@ -0,0 +1,19 @@
+# -*- encoding: utf-8 -*-
+$:.push File.expand_path("../lib", __FILE__)
+require "qu/version"
+
+Gem::Specification.new do |s|
+ s.name = "qu-mongo"
+ s.version = Qu::VERSION
+ s.authors = ["Brandon Keepers"]
+ s.email = ["brandon@opensoul.org"]
+ s.homepage = "http://github.com/bkeepers/qu"
+ s.summary = "Mongo backend for qu"
+ s.description = "Mongo backend for qu"
+
+ s.files = `git ls-files -- lib | grep mongo`.split("\n")
+ s.require_paths = ["lib"]
+
+ s.add_dependency 'mongo'
+ s.add_dependency 'qu', Qu::VERSION
+end
19 qu-redis.gemspec
@@ -0,0 +1,19 @@
+# -*- encoding: utf-8 -*-
+$:.push File.expand_path("../lib", __FILE__)
+require "qu/version"
+
+Gem::Specification.new do |s|
+ s.name = "qu-redis"
+ s.version = Qu::VERSION
+ s.authors = ["Brandon Keepers"]
+ s.email = ["brandon@opensoul.org"]
+ s.homepage = "http://github.com/bkeepers/qu"
+ s.summary = "Redis backend for qu"
+ s.description = "Redis backend for qu"
+
+ s.files = `git ls-files -- lib | grep redis`.split("\n")
+ s.require_paths = ["lib"]
+
+ s.add_dependency 'redis'
+ s.add_dependency 'qu', Qu::VERSION
+end
20 qu.gemspec
@@ -0,0 +1,20 @@
+# -*- encoding: utf-8 -*-
+$:.push File.expand_path("../lib", __FILE__)
+require "qu/version"
+
+Gem::Specification.new do |s|
+ s.name = "qu"
+ s.version = Qu::VERSION
+ s.authors = ["Brandon Keepers"]
+ s.email = ["brandon@opensoul.org"]
+ s.homepage = ""
+ s.summary = %q{}
+ s.description = %q{}
+
+ s.files = `git ls-files | grep -v 'redis\|mongo'`.split("\n")
+ s.test_files = `git ls-files -- spec | grep -v 'redis\|mongo'`.split("\n")
+ s.executables = `git ls-files -- bin`.split("\n").map{ |f| File.basename(f) }
+ s.require_paths = ["lib"]
+
+ s.add_dependency 'multi_json'
+end
6 spec/qu/backend/mongo_spec.rb
@@ -0,0 +1,6 @@
+require 'spec_helper'
+require 'qu-mongo'
+
+describe Qu::Backend::Mongo do
+ it_should_behave_like 'a backend'
+end
6 spec/qu/backend/redis_spec.rb
@@ -0,0 +1,6 @@
+require 'spec_helper'
+require 'qu-redis'
+
+describe Qu::Backend::Redis do
+ it_should_behave_like 'a backend'
+end
72 spec/qu/job_spec.rb
@@ -0,0 +1,72 @@
+require 'spec_helper'
+
+describe Qu::Job do
+ class MyJob < Qu::Job.with(:arg)
+ queue :custom
+ end
+
+ describe '.queue' do
+ it 'should default to "default"' do
+ Qu::Job.queue.should == 'default'
+ end
+
+ it 'should allow setting a queue' do
+ MyJob.queue.should == 'custom'
+ end
+ end
+
+ describe '.with' do
+ subject { Qu::Job.with(:arg1, :arg2) }
+
+ it 'should create a subclass of job' do
+ subject.should < Qu::Job
+ end
+
+ it 'should define attr readers for args' do
+ lambda { subject.instance_method('arg1') }.should_not raise_error(NameError)
+ lambda { subject.instance_method('arg2') }.should_not raise_error(NameError)
+ end
+
+ it 'should define an initializer for the args' do
+ subject.instance_method('initialize').arity.should == 2
+ subject.new('a', 'b').arg1.should == 'a'
+ end
+ end
+
+ describe '.load' do
+ it 'should initialize a job of the given class' do
+ Qu::Job.load('1', 'MyJob', [1]).should be_instance_of(MyJob)
+ end
+
+ it 'should find namespaced jobs' do
+ Qu::Job.load('1', 'Qu::Job', []).should be_instance_of(Qu::Job)
+ end
+
+ it 'should set job id' do
+ Qu::Job.load('987', 'MyJob', [1]).id.should == '987'
+ end
+
+ it 'should initialize with with args' do
+ Qu::Job.load('1', 'MyJob', ['a']).arg.should == 'a'
+ end
+ end
+
+ # describe 'encode' do
+ # it 'should json encode the class and args' do
+ # MultiJson.decode(subject.encode).should == {'klass' => 'SimpleJob', 'args' => [1, 2]}
+ # end
+ # end
+ #
+ # describe 'attributes' do
+ # it 'should return a hash of attributes' do
+ # subject.attributes.should == {'klass' => 'SimpleJob', 'args' => [1, 2]}
+ # end
+ # end
+ #
+ # describe 'perform' do
+ # it 'should call .perform on the class' do
+ # SimpleJob.should_receive(:perform).with(1, 2)
+ # subject.perform
+ # end
+ # end
+end
46 spec/qu/worker_spec.rb
@@ -0,0 +1,46 @@
+require 'spec_helper'
+
+describe Qu::Worker do
+ let(:job) { SimpleJob.new }
+
+ describe 'queues' do
+ before do
+ Qu.stub!(:queues).and_return(%w(c a b))
+ end
+
+ it 'should use all queues from backend with an asterisk' do
+ Qu::Worker.new('*').queues.should == %w(a b c)
+ end
+
+ it 'should append other queues with an asterisk' do
+ Qu::Worker.new('b', '*').queues.should == %w(b a c)
+ end
+
+ it 'should properly handle queues passed as an array to the initializer' do
+ Qu::Worker.new(%w(b *)).queues.should == %w(b a c)
+ end
+ end
+
+ describe 'work' do
+ before do
+ Qu.stub!(:reserve).and_return(job)
+ end
+
+ it 'should reserve a job' do
+ Qu.should_receive(:reserve).with(subject)
+ subject.work
+ end
+
+ it 'should perform the job' do
+ job.should_receive(:perform)
+ subject.work
+ end
+ end
+
+ describe 'work_off' do
+ it 'should work all jobs off the queue' do
+ Qu.should_receive(:reserve).with(subject, :block => false).and_return(job, job, job, nil)
+ subject.work_off
+ end
+ end
+end
10 spec/qu_spec.rb
@@ -0,0 +1,10 @@
+require 'spec_helper'
+
+describe Qu do
+ describe 'enqueue' do
+ it 'should call enqueue on the backend with a job' do
+ Qu.backend.should_receive(:enqueue).with(SimpleJob, 'a', 'b')
+ Qu.enqueue SimpleJob, 'a', 'b'
+ end
+ end
+end
4 spec/spec_helper.rb
@@ -0,0 +1,4 @@
+require 'bundler'
+Bundler.require :default, :test
+require 'qu'
+require 'qu/backend/spec'
Please sign in to comment.
Something went wrong with that request. Please try again.