Skip to content

Commit

Permalink
Backoff logic for READY (completely untested). Let QueueSubscriber's …
Browse files Browse the repository at this point in the history
…finish up work before closing socket.
  • Loading branch information
bpardee committed Oct 26, 2012
1 parent f7e2e0b commit 4dc89c6
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 48 deletions.
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ baz_threads.each(&:join)

## TODO:

* Ready logic

* Backoff for connections and failed messages.

* Fix timestamp

* Implement lookupd
Expand Down
34 changes: 34 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env rake
begin
require 'bundler/setup'
rescue LoadError
puts 'You must `gem install bundler` and `bundle install` to run rake tasks'
end
begin
require 'rdoc/task'
rescue LoadError
require 'rdoc/rdoc'
require 'rake/rdoctask'
RDoc::Task = Rake::RDocTask
end

RDoc::Task.new(:rdoc) do |rdoc|
rdoc.rdoc_dir = 'rdoc'
rdoc.title = 'ruby_nsq'
rdoc.options << '--line-numbers'
rdoc.rdoc_files.include('README.md')
rdoc.rdoc_files.include('lib/**/*.rb')
end

Bundler::GemHelper.install_tasks

require 'rake/testtask'

Rake::TestTask.new(:test) do |t|
t.libs << 'lib'
t.libs << 'test'
t.pattern = 'test/**/*_test.rb'
t.verbose = false
end

task :default => :test
8 changes: 7 additions & 1 deletion examples/async/reader.rb
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env ruby

require 'nsq'
require 'thread'
require 'logger'

x_worker_count = 50
y_worker_count = 30
Expand All @@ -8,7 +11,10 @@
puts 'Press enter to start and enter to finish'
$stdin.gets

reader = NSQ.create_reader(:nsqd_tcp_addresses => '127.0.0.1:4150')
reader = NSQ.create_reader(
:nsqd_tcp_addresses => '127.0.0.1:4150',
:logger_level => Logger::DEBUG
)

x_subscriber = reader.subscribe('test_xy', 'x', :max_in_flight => x_worker_count)
y_subscriber = reader.subscribe('test_xy', 'y', :max_in_flight => y_worker_count)
Expand Down
6 changes: 4 additions & 2 deletions examples/async/writer.rb
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/usr/bin/env ruby

if ARGV.length != 3
$stderr.puts "ruby writer.rb <topic> <count> <eval-string>"
$stderr.puts " where <topic> is either test_xy or test_z"
$stderr.puts " and <eval-string> could be something like 'sleep rand(100)/10.0'"
$stderr.puts " Example: ruby writer.rb test_xy 500 'sleep rand(100)/10.0'"
$stderr.puts " or: ruby writer.rb test_z 5000 nil"
$stderr.puts " Example: ./writer.rb test_xy 500 'sleep rand(100)/10.0'"
$stderr.puts " or: ./writer.rb test_z 5000 nil"
exit 1
end
topic = ARGV[0]
Expand Down
2 changes: 1 addition & 1 deletion examples/simple/reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
gets
reader = NSQ.create_reader(
:nsqd_tcp_addresses => '127.0.0.1:4150',
:logger_level => Logger::DEBUG
#:logger_level => Logger::DEBUG
)
thread = Thread.new do
begin
Expand Down
59 changes: 43 additions & 16 deletions lib/nsq/connection.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'monitor'
require 'thread' #Mutex

module NSQ
class Connection
Expand All @@ -12,6 +13,8 @@ def initialize(reader, subscriber, host, port)
@port = port
@name = "#{subscriber.name}:#{host}:#{port}"
@write_monitor = Monitor.new
@ready_mutex = Mutex.new
@sending_ready = false

# Connect states :init, :interval, :connecting, :connected, :closed
@connect_state = :init
Expand All @@ -24,23 +27,38 @@ def initialize(reader, subscriber, host, port)
connect
end

def send_init(topic, channel, short_id, long_id, ready_count)
def send_init(topic, channel, short_id, long_id)
write NSQ::MAGIC_V2
write "SUB #{topic} #{channel} #{short_id} #{long_id}\n"
self.send_ready(ready_count)
self.send_ready
end

def send_ready(count)
@ready_count = count
write "RDY #{count}\n"
def send_ready
@ready_count = @subscriber.ready_count
write "RDY #{@ready_count}\n" unless @subscriber.stopped?
@sending_ready = false
end

def send_finish(id)
def send_finish(id, success)
write "FIN #{id}\n"
@ready_mutex.synchronize do
@ready_count -= 1
if success
@ready_backoff_timer.success
else
@ready_backoff_timer.failure
end
check_ready
end
end

def send_requeue(id, time_ms)
write "REQ #{id} #{time_ms}\n"
@ready_mutex.synchronize do
@ready_count -= 1
@ready_backoff_timer.failure
check_ready
end
end

def reset
Expand All @@ -54,8 +72,8 @@ def reset
interval = @connection_backoff_timer.interval
if interval > 0
@connect_state = :interval
NSQ.logger.debug {"#{self}: Reattempting connection in #{interval} seconds"}
@reader.add_timeout(interval) do
NSQ.logger.debug {"#{self}: Reattempting connection in #{interval} seconds"}
connect
end
else
Expand Down Expand Up @@ -95,14 +113,6 @@ def connect
do_connect
end

def message_success!

end

def message_failure!

end

private

def do_connect
Expand All @@ -129,6 +139,21 @@ def do_connect
end
end

def check_ready
if !@sending_ready && @ready_count <= @subscriber.ready_threshold
interval = @ready_backoff_timer.interval
if interval == 0.0
send_ready
else
NSQ.logger.debug {"#{self}: Delaying READY for #{interval} seconds"}
@sending_ready = true
@reader.add_timeout(interval) do
send_ready
end
end
end
end

def read_messages
@buffer << @socket.read_nonblock(4096)
while @buffer.length >= 8
Expand Down Expand Up @@ -170,7 +195,9 @@ def send_nop
def write(msg)
NSQ.logger.debug {"#{@name}: Sending #{msg.inspect}"}
# We should only ever have one reader but we can have multiple writers
@write_monitor.synchronize { @socket.write(msg) if verify_connect_state?(:connected) }
@write_monitor.synchronize do
@socket.write(msg) if verify_connect_state?(:connected)
end
end

def to_s
Expand Down
21 changes: 18 additions & 3 deletions lib/nsq/queue_subscriber.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
require 'thread' #Mutex

module NSQ
class QueueSubscriber < Subscriber
def initialize(reader, topic, channel, options)
super
@queue = Queue.new
@queue = Queue.new
@run_mutex = Mutex.new
@run_count = 0
end

def ready_count
# Return the minimum of Subscriber#ready_count and the amount of space left in the queue
[super, self.max_in_flight - @queue.size].min
end

def handle_message(connection, message)
@queue << [connection, message]
end

def run(&block)
@run_mutex.synchronize { @run_count += 1}
until @stopped
pair = @queue.pop
if pair == :stop
Expand All @@ -19,12 +29,17 @@ def run(&block)
connection, message = pair
process_message(connection, message, &block)
end
ensure
@run_mutex.synchronize { @run_count -= 1}
end

def stop
super
# Give the threads something to popd
@stopped = true
# Give the threads something to pop
@queue << :stop
# TODO: Put a max time on this so we don't potentially hang
sleep 1 while @run_count > 0
super
end
end
end
31 changes: 14 additions & 17 deletions lib/nsq/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def initialize(reader, topic, channel, options, &block)
@channel = channel
@block = block
@max_tries = options[:max_tries]
@max_in_flight = options[:max_in_flight] || 1
@max_in_flight = (options[:max_in_flight] || 1).to_i
@requeue_delay = (options[:requeue_delay] || 90).to_i * 1000
@connection_hash = {}

Expand Down Expand Up @@ -42,10 +42,16 @@ def create_connection_backoff_timer
BackoffTimer.new(@connection_min_interval, @connection_max_interval, @connection_ratio, @connection_short_length, @connection_long_length)
end

def connection_max_in_flight
# TODO: Maybe think about this a little more
val = @max_in_flight / [@connection_hash.size, 1].max
[val, 1].max
# Threshold for a connection where it's time to send a new READY message
def ready_threshold
@max_in_flight / @connection_hash.size / 4
end

# The actual value for the READY message
def ready_count
# TODO: Should we take into account the last_ready_count minus the number of messages sent since then?
# Rounding up!
(@max_in_flight + @connection_hash.size - 1) / @connection_hash.size
end

def connection_count
Expand Down Expand Up @@ -75,7 +81,7 @@ def stopped?
end

def handle_connection(connection)
connection.send_init(@topic, @channel, @reader.short_id, @reader.long_id, self.connection_max_in_flight)
connection.send_init(@topic, @channel, @reader.short_id, @reader.long_id)
end

def handle_heartbeat(connection)
Expand All @@ -87,24 +93,15 @@ def handle_message(connection, message)

def process_message(connection, message, &block)
yield message
connection.send_finish(message.id)
connection.message_success!
connection.send_finish(message.id, true)
rescue Exception => e
NSQ.logger.error("#{connection.name}: Exception during handle_message: #{e.message}\n\t#{e.backtrace.join("\n\t")}")
if @max_tries && attempts >= @max_tries
NSQ.logger.warning("#{connection.name}: Giving up on message after #{@max_tries} tries: #{body.inspect}")
connection.send_finish(message.id)
connection.send_finish(message.id, false)
else
connection.send_requeue(message.id, attempts * @requeue_delay)
end
connection.message_failure!
ensure
handle_ready_count(connection)
end

def handle_ready_count(connection)
# TODO: Need to add 25% logic
connection.send_ready(self.connection_max_in_flight)
end

def handle_frame_error(connection, error_message)
Expand Down
7 changes: 3 additions & 4 deletions ruby_nsq.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ Gem::Specification.new do |s|
s.authors = ['Brad Pardee']
s.email = ['bradpardee@gmail.com']
s.homepage = 'http://github.com/ClarityServices/ruby_nsq'
s.files = Dir["{lib}/**/*"] + %w(LICENSE.txt Rakefile History.md README.md)
s.files = Dir["{lib,examples}/**/*"] + %w(LICENSE.txt Rakefile History.md README.md)
s.test_files = Dir["test/**/*"]

s.add_dependency 'resilient_socket'
s.add_dependency 'nio4r'
s.add_dependency 'http_parser.rb'
s.add_dependency 'thread_safe'
#s.add_dependency 'http_parser.rb'
#s.add_dependency 'thread_safe'
s.add_development_dependency 'rdoc'
s.add_development_dependency 'minitest'
s.add_development_dependency 'turn'
Expand Down

0 comments on commit 4dc89c6

Please sign in to comment.