Skip to content

Commit

Permalink
Clean up examples and preliminary publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
bpardee committed Oct 31, 2012
1 parent 0305f40 commit b0b9ca0
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 44 deletions.
6 changes: 6 additions & 0 deletions History.md
@@ -1,6 +1,12 @@
Changelog
=====================

0.0.2
-----

- Fix timestamp
- More documentation

0.0.1
-----

Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -18,7 +18,7 @@ Simple example for synchronous message handling:
```
require 'nsq'
reader = NSQ.create_reader(:nsqd_tcp_addresses => '127.0.0.1:4150')
reader = NSQ::Reader.new(:nsqd_tcp_addresses => '127.0.0.1:4150')
# Subscribe to topic=test channel=simple
reader.subscribe('test', 'simple') do |message|
# If this block raises an exception, then the message will be requeued.
Expand All @@ -36,7 +36,7 @@ foo_worker_count = 50
bar_worker_count = 30
baz_worker_count = 20
reader = NSQ.create_reader(:nsqd_tcp_addresses => '127.0.0.1:4150')
reader = NSQ::Reader.new(:nsqd_tcp_addresses => '127.0.0.1:4150')
foo_subscriber = reader.subscribe('test', 'foo', :max_in_flight => foo_worker_count)
bar_subscriber = reader.subscribe('test2', 'bar', :max_in_flight => bar_worker_count)
Expand Down
6 changes: 6 additions & 0 deletions examples/async/README
@@ -0,0 +1,6 @@
# Start up reader
bundle exec ./reader.rb
Hit <RETURN> to initialize the Reader and <RETURN> again when you want to stop the reader

# Run publisher with no arguments to get usage info:
bundle exec ./publisher.rb
21 changes: 21 additions & 0 deletions examples/async/publisher.rb
@@ -0,0 +1,21 @@
#!/usr/bin/env ruby

require 'nsq'

if ARGV.length != 3
$stderr.puts "bundle exec ./publisher.rb <topic> <count> <eval-string>"
$stderr.puts " where <topic> is either test_xy or test_z"
$stderr.puts " and <eval-string> could be something like 'sleep rand(100)/10.0'"
$stderr.puts " Example: bundle exec ./publisher.rb test_xy 500 'sleep rand(100)/10.0'"
$stderr.puts " or: bundle exec ./publisher.rb test_z 5000 nil"
exit 1
end
topic = ARGV[0]
count = ARGV[1].to_i
eval_string = ARGV[2]

NSQ::Publisher.new('localhost', 4150) do |publisher|
count.times do
publisher.publish(topic, eval_string)
end
end
4 changes: 2 additions & 2 deletions examples/async/reader.rb
Expand Up @@ -11,7 +11,7 @@
puts 'Press enter to start and enter to finish'
$stdin.gets

reader = NSQ.create_reader(
reader = NSQ::Reader.new(
:nsqd_tcp_addresses => '127.0.0.1:4150',
#:logger_level => Logger::DEBUG
)
Expand Down Expand Up @@ -54,7 +54,7 @@ def initialize(index, subscriber, char)
puts
puts "Summary of worker message counts"
threads.each do |char, arr|
puts "#{char} - #{arr.map(&:message_count).join(' ')}"
puts "#{char} - #{arr.map(&:message_count).join(' ')} total=#{arr.map(&:message_count).inject(:+)}"
end
}
$stdin.gets
17 changes: 0 additions & 17 deletions examples/async/writer.rb

This file was deleted.

3 changes: 2 additions & 1 deletion examples/simple/README
@@ -1,4 +1,5 @@
# Start up reader
bundle exec ./reader.rb

# From another terminal, send a message:
curl -d 'hello world' 'http://127.0.0.1:4151/put?topic=test'
bundle exec ./publisher.rb hello world
13 changes: 13 additions & 0 deletions examples/simple/publisher.rb
@@ -0,0 +1,13 @@
#!/usr/bin/env ruby

require 'nsq'

if ARGV.length == 0
$stderr.puts "bundle exec ./publisher.rb <message>*"
$stderr.puts " Example: bundle exec ./publisher.rb hello world"
exit 1
end

NSQ::Publisher.new('localhost', 4150) do |publisher|
publisher.publish('test', ARGV.join(' '))
end
4 changes: 2 additions & 2 deletions examples/simple/reader.rb
Expand Up @@ -6,14 +6,14 @@
# Cntl-c doesn't run at_exit under jruby
puts 'Press enter to start and enter to finish'
gets
reader = NSQ.create_reader(
reader = NSQ::Reader.new(
:nsqd_tcp_addresses => '127.0.0.1:4150',
#:logger_level => Logger::DEBUG
)
thread = Thread.new do
begin
reader.subscribe('test', 'simple') do |message|
puts "Read #{message.body}"
puts "Read: #{message.body.inspect}"
end
reader.run
rescue Exception => e
Expand Down
21 changes: 4 additions & 17 deletions lib/nsq.rb
Expand Up @@ -2,10 +2,12 @@
require 'nsq/message'
require 'nsq/reader'
require 'nsq/subscriber'
require 'nsq/publisher'
require 'nsq/queue_subscriber'
require 'nsq/connection'
require 'nsq/backoff_timer'
require 'nsq/timer'
require 'nsq/util'

module NSQ
extend NSQ::Loggable
Expand All @@ -16,23 +18,8 @@ module NSQ
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2

# Create a NSQ::Reader used for subscribing to topics and channels.
# Refer to NSQ::Reader::new for available options.
def self.create_reader(options, &block)
def self.create_reader(options, &block) #:nodoc:
NSQ.logger.info('NSQ#create_reader has been deprecated, please use NSQ::Reader#new instead')
Reader.new(options, &block)
end


def self.assert_topic_and_channel_valid(topic, channel) #:nodoc:
raise "Invalid topic #{topic}" unless valid_topic_name?(topic)
raise "Invalid channel #{channel}" unless valid_channel_name?(channel)
end

def self.valid_topic_name?(topic) #:nodoc:
!!topic.match(/^[\.a-zA-Z0-9_-]+$/)
end

def self.valid_channel_name?(channel) #:nodoc:
!!channel.match(/^[\.a-zA-Z0-9_-]+(#ephemeral)?$/)
end
end
40 changes: 40 additions & 0 deletions lib/nsq/publisher.rb
@@ -0,0 +1,40 @@
require 'socket'

module NSQ
class Publisher
def initialize(host, port, options={}, &block)
@socket = TCPSocket.open(host, port)
@socket.write(MAGIC_V2)
@response_timeout = options[:response_timeout] || 5
yield self if block_given?
ensure
close if block_given?
end

def publish(topic, message)
buf = ['PUB ', topic, "\n", message.length, message].pack('a*a*a*Na*')
@socket.write(buf)
response = ''
loop do
response += @socket.recv(4096)
size, frame, msg = response.unpack('NNa*')
if response.length == size+4
case msg
when 'OK' then return
when 'E_INVALID' then raise 'Invalid message'
when 'E_BAD_TOPIC' then raise 'Bad topic'
when 'E_BAD_MESSAGE' then raise 'Bad message'
when 'E_PUT_FAILED' then raise 'Put failed'
else raise "Unknown PUB response: #{msg}"
end
elsif response.length > size+4
raise "Unexpected PUB response - Expected size = #{size} actual size = #{response.length-4}: message=#{msg}"
end
end
end

def close
@socket.close
end
end
end
2 changes: 1 addition & 1 deletion lib/nsq/reader.rb
Expand Up @@ -67,7 +67,7 @@ def initialize(options={})
# Refer to Subscriber::new for the options that can be passed to this method.
#
def subscribe(topic, channel, options={}, &block)
NSQ.assert_topic_and_channel_valid(topic, channel)
Util.assert_topic_and_channel_valid(topic, channel)
subscriber = nil
name = "#{topic}:#{channel}"
@subscriber_mutex.synchronize do
Expand Down
17 changes: 17 additions & 0 deletions lib/nsq/util.rb
@@ -0,0 +1,17 @@
module NSQ
module Util

def self.assert_topic_and_channel_valid(topic, channel) #:nodoc:
raise "Invalid topic #{topic}" unless valid_topic_name?(topic)
raise "Invalid channel #{channel}" unless valid_channel_name?(channel)
end

def self.valid_topic_name?(topic) #:nodoc:
!!topic.match(/^[\.a-zA-Z0-9_-]+$/)
end

def self.valid_channel_name?(channel) #:nodoc:
!!channel.match(/^[\.a-zA-Z0-9_-]+(#ephemeral)?$/)
end
end
end
3 changes: 1 addition & 2 deletions ruby_nsq.gemspec
Expand Up @@ -10,8 +10,7 @@ Gem::Specification.new do |s|
s.test_files = Dir["test/**/*"]

s.add_dependency 'nio4r'
#s.add_dependency 'http_parser.rb'
#s.add_dependency 'thread_safe'
s.add_dependency 'resilient_socket'
s.add_development_dependency 'rdoc'
s.add_development_dependency 'minitest'
s.add_development_dependency 'turn'
Expand Down

0 comments on commit b0b9ca0

Please sign in to comment.