Skip to content

Commit

Permalink
reorg and cleaning up API
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanmoran committed Apr 10, 2011
1 parent 2894fc9 commit e76d4de
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 100 deletions.
2 changes: 1 addition & 1 deletion examples/resque-pubsub.rb
Expand Up @@ -4,7 +4,7 @@
rails_env = ENV['RAILS_ENV'] || 'development'

REDIS_CONFIG = YAML.load_file(rails_root + '/config/redis.yml')[rails_env]
Resque.redis = REDIS_CONFIG["host"] + ':' + REDIS_CONFIG["port"].to_s
Resque.redis = REDIS_CONFIG['host'] + ':' + REDIS_CONFIG["port"].to_s

namespace = ENV['RESQUE_NAMESPACE'] || 'mynamespace' # Edit this to use a different namespace for Resque in the app
Resque.redis.namespace = namespace
Expand Down
8 changes: 2 additions & 6 deletions lib/resque-pubsub.rb
@@ -1,6 +1,2 @@
require 'resque/plugins/pubsub/subscriber'
require 'resque/plugins/pubsub/publisher'
require 'resque/plugins/pubsub/broker'
self.send(:include, Resque::Plugins::Pubsub::Subscriber)
self.send(:include, Resque::Plugins::Pubsub::Publisher)

require 'resque'
Dir.glob(File.expand_path('resque/plugins/pubsub/*', File.dirname(__FILE__))).each { |filename| require filename }
4 changes: 2 additions & 2 deletions lib/resque/plugins/pubsub/broker.rb
Expand Up @@ -17,9 +17,9 @@ def self.perform(topic, message)
subscribers = Exchange.redis.smembers("#{topic}_subscribers")
subscribers.each do |s|
sinfo = JSON.parse(s)
puts "distributing to #{sinfo.inspect}"
puts "[Broker] distributing to #{sinfo.inspect}"
Broker.redis.sadd("#{sinfo['namespace']}:queues", "fanout:#{topic}")
Broker.redis.rpush("#{sinfo['namespace']}:queue:fanout:#{topic}", {:class=> sinfo["class"], :args=>[message]}.to_json)
Broker.redis.rpush("#{sinfo['namespace']}:queue:fanout:#{topic}", { :class => sinfo['class'], :args => [message] }.to_json)
end
end

Expand Down
20 changes: 9 additions & 11 deletions lib/resque/plugins/pubsub/exchange.rb
@@ -1,4 +1,3 @@
require 'resque'
module Resque
module Plugins
module Pubsub
Expand All @@ -11,27 +10,26 @@ def self.redis
return @redis if @redis
client_to_copy = Resque.redis.client
redis_new = Redis.new(:host => client_to_copy.host, :port => client_to_copy.port, :thread_safe => true, :db => client_to_copy.db)
puts "making a redis in exchange, namespace will be #{@@pubsub_namespace}"
@redis = Redis::Namespace.new(@@pubsub_namespace || "resque:pubsub", :redis => redis_new)
puts "[Exchange] making a redis in exchange, namespace will be #{@@pubsub_namespace}"
@redis = Redis::Namespace.new(@@pubsub_namespace || 'resque:pubsub', :redis => redis_new)
end

@queue = :subscription_requests

def self.perform(subscription_info)
puts "handling a subscription on the exchange"
puts "requested subscription is #{subscription_info.inspect}"
puts "namespace is #{Exchange.redis.namespace}"
redis = Exchange.redis
redis.sadd("#{subscription_info["topic"]}_subscribers", { :class => subscription_info["class"], :namespace => subscription_info["namespace"] }.to_json)
puts '[Exchange] handling a subscription on the exchange'
puts "[Exchange] requested subscription is #{subscription_info.inspect}"
puts "[Exchange] namespace is #{Exchange.redis.namespace}"
Exchange.redis.sadd("#{subscription_info["topic"]}_subscribers", { :class => subscription_info['class'], :namespace => subscription_info['namespace'] }.to_json)
end

def self.pubsub_namespace
@@pubsub_namespace
end

def self.pubsub_namespace=(n)
@@pubsub_namespace = n
@redis.client.disconnect
def self.pubsub_namespace=(namespace)
@@pubsub_namespace = namespace
@redis.client.disconnect if @redis
@redis = nil
end

Expand Down
9 changes: 5 additions & 4 deletions lib/resque/plugins/pubsub/publisher.rb
@@ -1,4 +1,3 @@
require 'resque/plugins/pubsub/exchange'
module Resque
module Plugins
module Pubsub
Expand All @@ -9,11 +8,13 @@ def self.included(base)
end

module InstanceMethods

def publish(topic, message)
puts "Publisher publishing #{message} in #{topic}"
Exchange.redis.sadd(:queues, "messages")
Exchange.redis.rpush("queue:messages", { :class => 'Resque::Plugins::Pubsub::Broker', :args => [topic, message] }.to_json)
puts "[#{self.class.to_s}] publishing #{message} in #{topic}"
Exchange.redis.sadd(:queues, 'messages')
Exchange.redis.rpush('queue:messages', { :class => 'Resque::Plugins::Pubsub::Broker', :args => [topic, message] }.to_json)
end

end

end
Expand Down
9 changes: 5 additions & 4 deletions lib/resque/plugins/pubsub/subscriber.rb
@@ -1,29 +1,30 @@
require 'resque/plugins/pubsub/exchange'
module Resque
module Plugins
module Pubsub
module Subscriber

def self.included(base)
base.extend ClassMethods
base.send(:extend, ClassMethods)
end

module ClassMethods

def subscribe(topic, options={})
@queue = "fanout:#{topic}"
reader_method = options[:reader_method] || "read_#{topic}_message"
module_eval <<-"end_eval"
def self.perform(message)
self.send("#{reader_method}", message)
self.send("#{reader_method.to_s}", message)
end
end_eval
options[:namespace] = Resque.redis.namespace
options[:topic] = topic
options[:class] = self.to_s
puts "Subscriber subscribing with #{options.inspect}"
puts "[#{self.to_s}] subscribing with #{options.inspect}"
Exchange.redis.sadd(:queues, :subscription_requests)
Exchange.redis.rpush("queue:subscription_requests", { :class => 'Resque::Plugins::Pubsub::Exchange', :args => [options] }.to_json)
end

end

end
Expand Down
17 changes: 17 additions & 0 deletions test/fixtures/test_custom_subscriber.rb
@@ -0,0 +1,17 @@
class TestCustomSubscriber
include Resque::Plugins::Pubsub::Subscriber

subscribe 'test_custom_topic', :reader_method => :simple

@@last_message = nil

def self.simple(message)
puts "[#{self.to_s}] got test custom topic message: #{message.inspect}"
@@last_message = message
end

def self.last_message
@@last_message
end

end
3 changes: 3 additions & 0 deletions test/fixtures/test_publisher.rb
@@ -0,0 +1,3 @@
class TestPublisher
include Resque::Plugins::Pubsub::Publisher
end
14 changes: 7 additions & 7 deletions test/test_subscriber.rb → test/fixtures/test_subscriber.rb
@@ -1,17 +1,17 @@
class TestSubscriber
require 'resque-pubsub'
include Resque::Plugins::Pubsub::Subscriber

subscribe 'test_topic'

@@last_message = nil

def self.read_test_topic_message(message)
puts "got test topic message: #{message.inspect}"
puts "[#{self.to_s}] got test topic message: #{message.inspect}"
@@last_message = message
end

def self.last_message
@@last_message
end

end
64 changes: 27 additions & 37 deletions test/pubsub_test.rb
Expand Up @@ -4,6 +4,7 @@ class PubsubTest < Test::Unit::TestCase

def setup
$success = $lock_failed = $lock_expired = 0
Resque.redis.namespace = nil
Resque.redis.flushall
Resque.redis.namespace = 'test_pubsub'
end
Expand All @@ -15,71 +16,60 @@ def test_lint
end

def test_all
# Just loading the "TestSubscriber" class is enough to subscribe
require 'test_publisher'
require 'test_subscriber'
TestSubscriber.subscribe('test_topic')
# Now process the subscription
Resque.redis.namespace = 'resque:pubsub'
@worker = Resque::Worker.new(:subscription_requests)
@worker.process
Resque::Worker.new(:subscription_requests).process
# Check that the subscription is in the subscribers list
assert subscription_exists(Resque.redis.smembers("test_topic_subscribers"), "TestSubscriber", "test_pubsub")
assert_equal true, subscription_exists(Resque.redis.smembers('test_topic_subscribers'), 'TestSubscriber', 'test_pubsub')

p = TestPublisher.new
p.publish("test_topic", "Test message")
p.publish('test_topic', 'Test message')
# Run Resque for the broker
Resque.redis.namespace = 'resque:pubsub'
@worker = Resque::Worker.new(:messages)
@worker.process
Resque::Worker.new(:messages).process
# Check that the message queue has been populated
Resque.redis.namespace = 'test_pubsub'
assert Resque.redis.keys.include?('queue:fanout:test_topic')
assert Resque.redis.llen('queue:fanout:test_topic') == 1
assert_equal true, Resque.redis.keys.include?('queue:fanout:test_topic')
assert_equal 1, Resque.redis.llen('queue:fanout:test_topic')

# Now run the subscriber Resque
@worker = Resque::Worker.new('fanout:test_topic')
@worker.process
assert Resque.redis.llen('fanout:test_topic') == 0
assert TestSubscriber.last_message == 'Test message'
Resque::Worker.new('fanout:test_topic').process
assert_equal 0, Resque.redis.llen('queue:fanout:test_topic')
assert_equal 'Test message', TestSubscriber.last_message
end

def test_configuration_options
# Configure the pubsub namespace
require 'resque-pubsub'
Resque::Plugins::Pubsub::Exchange.pubsub_namespace = 'resque:custom_space'
puts "namespace is set to #{Resque::Plugins::Pubsub::Exchange.pubsub_namespace}"
require 'test_publisher'
require 'test_subscriber'
require 'test_subscriber_custom'
TestCustomSubscriber.subscribe('test_custom_topic', :reader_method => :simple)
# Now process the subscription
Resque.redis.namespace = 'resque:custom_space'
@worker = Resque::Worker.new(:subscription_requests)
@worker.process
Resque::Worker.new(:subscription_requests).process
# Check that the subscription is in the subscribers list
assert subscription_exists(Resque.redis.smembers("test_custom_topic_subscribers"), "TestSubscriberCustom", "test_pubsub")
p = TestPublisher.new
p.publish("test_custom_topic", "Test custom message")
assert_equal true, subscription_exists(Resque.redis.smembers('test_custom_topic_subscribers'), 'TestCustomSubscriber', 'test_pubsub')

TestPublisher.new.publish('test_custom_topic', 'Test custom message')
# Run Resque for the broker
Resque.redis.namespace = 'resque:custom_space'
@worker = Resque::Worker.new(:messages)
@worker.process
Resque::Worker.new(:messages).process
# Check that the message queue has been populated
Resque.redis.namespace = 'test_pubsub'
assert Resque.redis.keys.include?('queue:fanout:test_custom_topic')
assert Resque.redis.llen('queue:fanout:test_custom_topic') == 1
assert_equal true, Resque.redis.keys.include?('queue:fanout:test_custom_topic')
assert_equal 1, Resque.redis.llen('queue:fanout:test_custom_topic')

# Now run the subscriber Resque
@worker = Resque::Worker.new('fanout:test_custom_topic')
@worker.process
assert Resque.redis.llen('fanout:test_custom_topic') == 0
assert TestSubscriberCustom.last_message == 'Test custom message'
# Also make sure TestSubscriber DIDN'T get the message
assert TestSubscriber.last_message != 'Test custom message'
Resque::Worker.new('fanout:test_custom_topic').process
assert_equal 0, Resque.redis.llen('queue:fanout:test_custom_topic')
assert_equal 'Test custom message', TestCustomSubscriber.last_message
assert_not_equal 'Test custom message', TestSubscriber.last_message
end

private

def subscription_exists(subscribers, klass, namespace)
subscribers.inject(false) do |result, s|
sinfo = JSON.parse(s)
result = result || (sinfo["class"] == klass && sinfo['namespace'] == namespace)
result = result || (sinfo['class'] == klass && sinfo['namespace'] == namespace)
end
end

Expand Down
15 changes: 8 additions & 7 deletions test/test_helper.rb
Expand Up @@ -10,15 +10,13 @@

require 'resque-pubsub'

##
# make sure we can run redis
if !system("which redis-server")
if !system('which redis-server')
puts '', "** can't find `redis-server` in your path"
puts "** try running `sudo rake install`"
abort ''
end

##
# start our own redis when the tests start,
# kill it when they end
at_exit do
Expand All @@ -31,12 +29,15 @@
end

pid = `ps -e -o pid,command | grep [r]edis-test`.split(" ")[0]
puts "Killing test redis server..."
puts 'Killing test redis server...'
`rm -f #{dir}/dump.rdb`
Process.kill("KILL", pid.to_i)
Process.kill('KILL', pid.to_i)
exit exit_code
end

puts "Starting redis for testing at localhost:9736..."
puts 'Starting redis for testing at localhost:9736...'
`redis-server #{dir}/redis-test.conf`
Resque.redis = '127.0.0.1:9736'
Resque.redis = '127.0.0.1:9736'
Resque.redis.namespace = 'test_pubsub'

Dir.glob(File.expand_path(dir + '/fixtures/*')).each { |filename| require filename }
4 changes: 0 additions & 4 deletions test/test_publisher.rb

This file was deleted.

17 changes: 0 additions & 17 deletions test/test_subscriber_custom.rb

This file was deleted.

0 comments on commit e76d4de

Please sign in to comment.