Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ERP-121 Provide a DSL to specify routing key and queue_name. Deprecat…

…e the constants.
  • Loading branch information...
commit 6099cc93da51f43e5153552186021a72ee7690a3 1 parent 12c8d15
@tusharr tusharr authored
View
24 README.md
@@ -56,8 +56,8 @@ You can declare a subscriber class as follows
```ruby
class UserWelcomeSubscriber < HonestPubsub::Server::ClientWorker
- REQUEST_KEY = "user_created" # The message prefix that you're subscribing to
- QUEUE = "user_welcome_emails" # The queue needs to be unique per subscriber
+ subscribe_to "user_created" # The message prefix that you're subscribing to
+ # subscribe_to "user_created", on: 'welcome_emails_queue' # If you want to specify the queue name
def perform(context, payload)
# context is a hash that was put into HonestPubsub::Context
@@ -67,24 +67,10 @@ class UserWelcomeSubscriber < HonestPubsub::Server::ClientWorker
end
```
-You also need a subscriber server to run the subscriber classes
+You also run subscribers in a separate process to consume the message using the provided executable
+`bundle exec start_subscribers`
-
-`bin/subscribers`
-
-```ruby
-#!/usr/bin/env ruby
-require 'rubygems'
-require File.expand_path('../../config/boot.rb', __FILE__)
-require File.expand_path('../../config/application', __FILE__)
-require 'honest_pubsub/server'
-require_relative '../app/subscribers/user_welcome_subscriber'
-server = ::HonestPubsub::Server::SubscriberServer.new(ARGV)
-server.start
-```
-
-and execute it from the command line as follows:
-`bin/subscriber user_welcome_subscriber`
+You can execute `bundle exec start_subscribers --help` to see all the various options that it providers
## Contributing
View
2  lib/honest_pubsub/configuration.rb
@@ -31,7 +31,7 @@ def self.application_name
@application_name ||= if defined?(Rails)
Rails.application.class.name.to_s.gsub("::Application", '')
else
- $0.to_s
+ 'honest_pubsub'
end.downcase
end
end
View
26 lib/honest_pubsub/server/client_worker.rb
@@ -5,13 +5,25 @@ module HonestPubsub
module Server
class ClientWorker
@@registered_subscribers = []
- class_attribute :payload_validators, :error_handlers
+ class_attribute :payload_validators, :error_handlers, :subscribed_key, :subscribed_queue
attr_accessor :delivery_routing_data, :delivery_properties
def self.inherited(klass)
@@registered_subscribers << klass
end
+ # Specify the routing key that the subscriber class should listen to.
+ # @param [String] routing_key_name The routing key to subscribe to. Must be characters only separated by periods (.)
+ # @param [Hash] options Allowed option is :on to optionally specify a queue. If not provided, queue name is generated from the routing key
+ def self.subscribe_to(routing_key_name, options = {})
+ options.assert_valid_keys(:on)
+ unless validate_routing_key_name(routing_key_name)
+ raise ArgumentError.new("#{routing_key_name} is not supported. Only lower case characters separated by periods are allowed.")
+ end
+ self.subscribed_key = routing_key_name
+ self.subscribed_queue = generated_queue_name(routing_key_name, options[:on])
+ end
+
# Sets the validator for payload
#
# @param validator The validator to use for validating the payload.
@@ -54,6 +66,18 @@ def valid_payload?(payload)
}
end
+ private
+
+ def self.validate_routing_key_name(key)
+ return true if key.blank?
+ key.match(/\A([a-z]+\.?)*([a-z]+)\Z/).present?
+ end
+
+ def self.generated_queue_name(routing_key, queue_name)
+ return queue_name if queue_name.present?
+ [ HonestPubsub::Configuration.application_name.to_s.gsub(/[^\w\_]/, ''), routing_key.gsub(".", '_') ].reject(&:blank?).join('_')
+ end
+
end
end
end
View
45 lib/honest_pubsub/server/subscriber_server.rb
@@ -18,7 +18,7 @@ class SubscriberServer
include Celluloid
def initialize(subscribers)
- @workers = subscribers.map { |subscriber| create_worker(subscriber) }
+ @workers = subscribers.map { |subscriber| create_queue_listeners(subscriber) }
end
def start
@@ -29,8 +29,8 @@ def start
worker.start
end
- thread = Thread.current
- interrupts = ["HUP","INT", "QUIT","ABRT","TERM"]
+ thread = Thread.current
+ interrupts = ["HUP", "INT", "QUIT", "ABRT", "TERM"]
interrupts.each do |signal_name|
Signal.trap(signal_name) {
puts "Processing #{signal_name}"
@@ -47,7 +47,7 @@ def start
begin
STDOUT.puts "Tearing down subscriber for #{worker.worker_class.name}"
worker.shutdown
- rescue =>e
+ rescue => e
::HonestPubsub::Logger.new.log_service("all_services", :warn, "#{worker.worker_class.name} - did not tear down correctly. Error - #{e.message}")
end
end
@@ -57,9 +57,40 @@ def start
private
- def create_worker(subscriber)
- STDOUT.puts "Setting up request_key: #{subscriber::REQUEST_KEY} and queue:#{subscriber::QUEUE}"
- ClientQueueListener.new(subscriber, subscriber::REQUEST_KEY, subscriber::QUEUE)
+ def warn(message)
+ old_behavior = ActiveSupport::Deprecation.behavior
+ ActiveSupport::Deprecation.behavior = [:stderr, :log]
+ ActiveSupport::Deprecation.warn(message)
+ ensure
+ ActiveSupport::Deprecation.behavior = old_behavior
+ end
+
+ def create_queue_listeners(subscriber)
+ routing_key = subscriber.subscribed_key
+ subscribed_queue_name = subscriber.subscribed_queue
+
+ if routing_key.blank? && subscriber.const_defined?(:REQUEST_KEY)
+ routing_key = subscriber::REQUEST_KEY
+ warn("Declaring #{subscriber.name}::REQUEST_KEY for routing key is deprecated. Use subscribe_to routing_key.")
+ end
+
+ if subscribed_queue_name.blank? && subscriber.const_defined?(:QUEUE)
+ subscribed_queue_name = subscriber::QUEUE
+ warn("Declaring #{subscriber.name}::QUEUE for queue name is deprecated. Use subscribe_to routing_key, on: queue_name. #{subscriber.name}")
+ end
+
+
+
+ if routing_key.blank?
+ raise ArgumentError.new("Routing key must be provided in #{subscriber.name} using subscribe_to")
+ end
+
+ if subscribed_queue_name.blank?
+ raise ArgumentError.new("Queue Name must be provided in #{subscriber.name} using subscribe_to")
+ end
+
+ STDOUT.puts "Setting up listener for request_key: #{routing_key} and queue:#{subscribed_queue_name}"
+ ClientQueueListener.new(subscriber, routing_key, subscribed_queue_name)
end
end
end
View
61 spec/honest_pubsub/server/client_worker_spec.rb
@@ -1,6 +1,9 @@
require 'spec_helper'
describe HonestPubsub::Server::ClientWorker do
+ before(:each) { Object.const_set :Klass, Class.new(HonestPubsub::Server::ClientWorker) }
+ after(:each) { Object.send :remove_const, :Klass }
+
describe "#routing_key" do
subject { HonestPubsub::Server::ClientWorker.new(routing_data, properties).routing_key }
@@ -15,16 +18,11 @@
describe "self.validate_payload_with" do
before do
- Object.const_set :Klass, Class.new(HonestPubsub::Server::ClientWorker)
validators.each do |validator|
Klass.validates_payload_with validator
end
end
- after do
- Object.send :remove_const, :Klass
- end
-
context "One validator" do
let(:validators){ [lambda{|payload| true}] }
it {
@@ -41,16 +39,11 @@
subject { Klass.new({},{}).valid_payload?({}) }
before do
- Object.const_set :Klass, Class.new(HonestPubsub::Server::ClientWorker)
validators.each do |validator|
Klass.validates_payload_with validator
end
end
- after do
- Object.send :remove_const, :Klass
- end
-
context "Two validators returning true" do
let(:validators){ [lambda{|payload| true}, lambda{|payload| true}] }
it { should be true }
@@ -66,4 +59,52 @@
it { should be true }
end
end
+
+ describe "#validate_routing_key_name" do
+ it { expect(Klass.send(:validate_routing_key_name, 'abcdef')).to eq(true) }
+ it { expect(Klass.send(:validate_routing_key_name, '')).to eq(true) }
+ it { expect(Klass.send(:validate_routing_key_name, 'abcdef.abcdef')).to eq(true) }
+ it { expect(Klass.send(:validate_routing_key_name, 'abcdef.abcdef.asd')).to eq(true) }
+ it { expect(Klass.send(:validate_routing_key_name, 'abcdef.abcdef/')).to eq(false) }
+ it { expect(Klass.send(:validate_routing_key_name, 'abcdef.abcdef.a123')).to eq(false) }
+ it { expect(Klass.send(:validate_routing_key_name, 'abcdef.')).to eq(false) }
+ it { expect(Klass.send(:validate_routing_key_name, 'abcAf')).to eq(false) }
+ end
+
+ describe "#generated_queue_name" do
+ before(:each) { allow(HonestPubsub::Configuration).to receive(:application_name).and_return("app_name")}
+
+ it { expect(Klass.send(:generated_queue_name, 'abcdef', 'queue_name')).to eq('queue_name') }
+ it { expect(Klass.send(:generated_queue_name, 'abcdef', nil)).to eq('app_name_abcdef') }
+ it { expect(Klass.send(:generated_queue_name, 'abcdef.abcd', nil)).to eq('app_name_abcdef_abcd') }
+ it { expect(Klass.send(:generated_queue_name, 'a.b.c', nil)).to eq('app_name_a_b_c') }
+ end
+
+ describe '.subscribe_to' do
+ before(:each) { allow(HonestPubsub::Configuration).to receive(:application_name).and_return("app_name")}
+ context 'routing_key without queue_name' do
+ before { Klass.subscribe_to 'a.b.c' }
+ it 'sets the routing key and queue name' do
+ expect(Klass.subscribed_key).to eq('a.b.c')
@ricaurte Owner

I like the above tests, but these are doing multiple expects in an it again

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ expect(Klass.subscribed_queue).to eq('app_name_a_b_c')
+ end
+ end
+
+ context 'routing_key with queue_name' do
+ before { Klass.subscribe_to 'a.b.c', on: 'foo_bar' }
+ it 'sets the routing key and queue name' do
+ expect(Klass.subscribed_key).to eq('a.b.c')
+ expect(Klass.subscribed_queue).to eq('foo_bar')
+ end
+ end
+
+ context 'invalid routing key' do
+ before { }
+ it 'sets the routing key and queue name' do
+ expect {
+ Klass.subscribe_to 'a.b.c.', on: 'foo_bar'
+ }.to raise_error(ArgumentError)
+ end
+ end
+ end
end
View
1  spec/spec_helper.rb
@@ -30,6 +30,7 @@
# Some test subscriber classes to make testing easier
class MyTestSubscriber1 < HonestPubsub::Server::ClientWorker
+
def perform(payload)
end
Please sign in to comment.
Something went wrong with that request. Please try again.