Permalink
Browse files

Some cleanup

  • Loading branch information...
1 parent 19e4392 commit 7decb83a22f2691bb5d6e5faec7f7d6334a04342 @bpardee bpardee committed Oct 24, 2012
Showing with 236 additions and 75 deletions.
  1. +90 −0 README.md
  2. +16 −5 examples/simple_reader.rb
  3. +5 −0 lib/nsq.rb
  4. +42 −0 lib/nsq/backoff_timer.rb
  5. +10 −5 lib/nsq/connection.rb
  6. +1 −1 lib/nsq/loggable.rb
  7. +30 −0 lib/nsq/message.rb
  8. +5 −59 lib/nsq/reader.rb
  9. +10 −5 lib/nsq/subscriber.rb
  10. +27 −0 test/backoff_timer_test.rb
View
@@ -0,0 +1,90 @@
+# ruby_nsq
+
+https://github.com/ClarityServices/ruby_nsq
+
+## Description:
+
+Ruby client for the [NSQ](https://github.com/bitly/nsq) realtime message processing system.
+
+## Install:
+
+ gem install ruby_nsq
+
+## Usage:
+
+See [examples](https://github.com/ClarityServices/ruby_nsq/tree/master/examples)
+
+Simple example for synchronous message handling:
+```
+require 'nsq'
+
+reader = NSQ.create_reader(:nsqd_tcp_addresses => '127.0.0.1:4150')
+# Subscribe to topic=test channel=simple
+reader.subscribe('test', 'simple') do |message|
+ # If this block raises an exception, then the message will be requeued.
+ puts "Read #{message.body}"
+end
+reader.run # Doesn't return until reader.stop is called
+puts 'Reader stopped'
+```
+
+NOTE: Not yet implemented!
+Advanced example demonstrating asynchronous handling of messages on multiple threads:
+```
+require 'nsq'
+
+foo_worker_count = 50
+bar_worker_count = 30
+baz_worker_count = 20
+
+reader = NSQ.create_reader(:nsqd_tcp_addresses => '127.0.0.1:4150')
+
+foo_queue = reader.subscribe('test', 'foo', :max_in_flight => foo_worker_count)
+bar_queue = reader.subscribe('test2', 'bar', :max_in_flight => bar_worker_count)
+baz_queue = reader.subscribe('test2', 'baz', :max_in_flight => baz_worker_count)
+
+foo_threads = foo_worker_count.times.map do |i|
+ Thread.new(i) do |i|
+ # API #1
+ until foo_queue.stopped?
+ message = foo_queue.read
+ if message
+ begin
+ puts 'Foo[%02d] read: %s' % i, message.body
+ sleep 1 # Dummy processing of message
+ # Give success status so message won't be requeued
+ message.success!
+ rescue Exception => e
+ # Message will be requeued
+ message.failure!
+ end
+ end
+ end
+
+ # API #2 (Equivalent to above)
+ foo_queue.run do |message|
+ puts 'Foo[%02d] read: %s' % i, message.body
+ sleep 1 # Dummy processing of message
+ end
+
+ puts 'Foo[%02d] thread exiting' % i
+ end
+end
+
+bar_threads = ... Same kind of thing as above ...
+baz_threads = ... Same kind of thing as above ...
+
+reader.run # Doesn't return until reader.stop is called
+puts 'Reader stopped'
+foo_threads.each(&:join)
+bar_threads.each(&:join)
+baz_threads.each(&:join)
+```
+
+## TODO:
+
+* No block subscribe calls that will return queue
+
+* Ready logic
+
+* Backoff for connections and failed messages.
View
@@ -1,8 +1,19 @@
require 'nsq'
+require 'logger'
-reader = NSQ::Reader.new(:nsqd_tcp_addresses => '127.0.0.1:4150')
-reader.subscribe('test', 'simple') do |id, timestamp, attempts, body|
- puts "id=#{id} ts=#{timestamp} attempts=#{attempts} body=#{body}"
+puts 'Press enter to start and enter to finish'
+gets
+reader = NSQ.create_reader(
+ :nsqd_tcp_addresses => '127.0.0.1:4150',
+ :logger_level => Logger::DEBUG
+)
+thread = Thread.new do
+ reader.subscribe('test', 'simple') do |message|
+ puts "Read #{message.body}"
+ end
+ reader.run
+ puts 'Reader exiting'
end
-at_exit { reader.stop }
-reader.run
+gets
+reader.stop
+thread.join
View
@@ -1,4 +1,5 @@
require 'nsq/loggable'
+require 'nsq/message'
require 'nsq/reader'
require 'nsq/subscriber'
require 'nsq/connection'
@@ -12,6 +13,10 @@ module NSQ
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2
+ def self.create_reader(options, &block)
+ Reader.new(options, &block)
+ end
+
def self.assert_topic_and_channel_valid(topic, channel)
raise "Invalid topic #{topic}" unless valid_topic_name?(topic)
raise "Invalid channel #{channel}" unless valid_channel_name?(channel)
View
@@ -0,0 +1,42 @@
+# Stolen from pynsq library since somebodies thought about this a lot more than me
+module NSQ
+ # This is a timer that is smart about backing off exponentially when there are problems
+ class BackoffTimer
+
+ attr_reader :min_interval, :max_interval, :short_interval, :long_interval
+
+ def initialize(min_interval, max_interval, ratio=0.25, short_length=10, long_length=250)
+ @min_interval = min_interval.to_f
+ @max_interval = max_interval.to_f
+ ratio = ratio.to_f
+
+ @max_short_timer = (@max_interval - @min_interval) * ratio
+ @max_long_timer = (@max_interval - @min_interval) * (1.0 - ratio)
+ @short_unit = @max_short_timer / short_length
+ @long_unit = @max_long_timer / long_length
+
+ @short_interval = 0.0
+ @long_interval = 0.0
+ end
+
+ # Update the timer to reflect a successful call
+ def success
+ @short_interval -= @short_unit
+ @long_interval -= @long_unit
+ @short_interval = [@short_interval, 0.0].max
+ @long_interval = [@long_interval, 0.0].max
+ end
+
+ # Update the timer to reflect a failed call
+ def failure
+ @short_interval += @short_unit
+ @long_interval += @long_unit
+ @short_interval = [@short_interval, @max_short_timer].min
+ @long_interval = [@long_interval, @max_long_timer].min
+ end
+
+ def interval
+ @min_interval + @short_interval + @long_interval
+ end
+ end
+end
View
@@ -34,7 +34,7 @@ def close
@selector.deregister(@socket)
write "CLS\n"
@socket.close
- rescue
+ rescue Exception => e
ensure
@socket = nil
end
@@ -70,9 +70,9 @@ def do_connect
end
def read_messages
- NSQ.logger.debug("Before read buffer=#{@buffer.inspect}")
+ #NSQ.logger.debug("Before read buffer=#{@buffer.inspect}")
@buffer << @socket.read_nonblock(4096)
- NSQ.logger.debug("After read buffer=#{@buffer.inspect}")
+ #NSQ.logger.debug("After read buffer=#{@buffer.inspect}")
while @buffer.length >= 8
size, frame = @buffer.unpack('NN')
break if @buffer.length < 4+size
@@ -92,10 +92,11 @@ def read_messages
when NSQ::FRAME_TYPE_MESSAGE
raise "Bad message: #{@buffer.inspect}" if size < 34
ts_hi, ts_lo, attempts, id = @buffer.unpack('@8NNna16')
- timestamp = Time.at((ts_hi * 2**32 + ts_lo) / 1000000000.0)
body = @buffer[34, size-30]
+ message = Message.new(self, id, ts_hi, ts_lo, attempts, body)
@buffer = @buffer[(4+size)..-1]
- @subscriber.handle_message(self, id, timestamp, attempts, body)
+ NSQ.logger.debug {"#{self}: Read message=#{message}"}
+ @subscriber.handle_message(self, message)
else
raise "Unrecognized message frame: #{frame} buffer=#{@buffer.inspect}"
end
@@ -112,5 +113,9 @@ def write(msg)
NSQ.logger.debug {"#{@name}: Sending #{msg.inspect}"}
@socket.write(msg)
end
+
+ def to_s
+ @name
+ end
end
end
View
@@ -12,7 +12,7 @@ def rails_logger
def default_logger
require 'logger'
l = Logger.new($stdout)
- l.level = Logger::DEBUG
+ l.level = Logger::INFO
l
end
View
@@ -0,0 +1,30 @@
+module NSQ
+ class Message
+ attr_reader :connection, :id, :attempts, :body
+
+ def initialize(connection, id, timestamp_high, timestamp_low, attempts, body)
+ @connection = connection
+ @id = id
+ @timestamp_high = timestamp_high
+ @timestamp_low = timestamp_low
+ @attempts = attempts
+ @body = body
+ end
+
+ def timestamp
+ Time.at((@timestamp_high * 2**32 + @timestamp_low) / 1000000000.0)
+ end
+
+ def success!
+
+ end
+
+ def failure!
+
+ end
+
+ def to_s
+ "#{connection} id=#{id} timestamp=#{timestamp} attempts=#{attempts} body=#{body}"
+ end
+ end
+end
View
@@ -4,62 +4,6 @@
require 'nio'
#require 'thread_safe'
-#NSQ base reader class.
-#
-#This receives messages from nsqd and calls task methods to process that message
-#
-#It handles the logic for backing off on retries and giving up on a message
-#
-#ex.
-# import nsq
-#
-# def task1(message):
-# print message
-# return True
-#
-# def task2(message):
-# print message
-# return True
-#
-# all_tasks = {"task1": task1, "task2": task2}
-# r = nsq.Reader(all_tasks, lookupd_http_addresses=['127.0.0.1:4161'],
-# topic="nsq_reader", channel="asdf", lookupd_poll_interval=15)
-# nsq.run()
-
-#import logging
-#import os
-#import ujson as json
-#import time
-#import signal
-#import socket
-#import functools
-#import urllib
-#
-#import tornado.options
-#import tornado.ioloop
-#import tornado.httpclient
-#
-#import BackoffTimer
-#import nsq
-#import async
-#
-#
-#tornado.options.define('heartbeat_file', type=str, default=None, help="path to a file to touch for heartbeats")
-
-# Reader provides a loop that calls each task provided by ``all_tasks`` up to ``max_tries``
-# requeueing on any failures with increasing multiples of ``requeue_delay`` between subsequent
-# tries of each message.
-# options:
-# preprocess_method - defines an optional method that can alter the message data before
-# other task functions are called.
-# validate_method - defines an optional method that returns a boolean as to weather or not
-# this message should be processed.
-# all_tasks - defines the a mapping of tasks and functions that individually will be called
-# with the message data.
-# ``async`` determines whether handlers will do asynchronous processing. If set to True, handlers
-# must accept a keyword argument called "finisher" that will be a callable used to signal message
-# completion, with a boolean argument indicating success
-
module NSQ
class Reader
attr_reader :name, :long_id, :short_id
@@ -74,12 +18,13 @@ def initialize(options={})
@long_id = options[:long_id] || Socket.gethostname
@short_id = options[:short_id] || @long_id.split('.')[0]
NSQ.logger = options[:logger] if options[:logger]
+ NSQ.logger.level = options[:logger_level] if options[:logger_level]
- @selector = NIO::Selector.new
+ @selector = ::NIO::Selector.new
@topic_count = Hash.new(0)
@subscribers = {}
@subscriber_mutex = Monitor.new
- @name = "#{@long_id}-#{@short_id}"
+ @name = "#{@long_id}:#{@short_id}"
raise 'Must pass either option :nsqd_tcp_addresses or :lookupd_http_addresses' if @nsqd_tcp_addresses.empty? && @lookupd_http_addresses.empty?
@@ -131,11 +76,12 @@ def run
end
def stop
+ NSQ.logger.info("#{self}: Reader stopping...")
@stopped = true
+ @selector.wakeup
@subscriber_mutex.synchronize do
@subscribers.each_value {|subscriber| subscriber.close}
end
- @selector.wakeup
end
def to_s
View
@@ -52,22 +52,23 @@ def handle_connection(connection)
def handle_heartbeat(connection)
end
- def handle_message(connection, id, timestamp, attempts, body)
- @block.call(id, timestamp, attempts, body)
- connection.send_finish(id)
+ def handle_message(connection, message)
+ @block.call(message)
+ connection.send_finish(message.id)
rescue Exception => e
NSQ.logger.error("#{connection.name}: Exception during handle_message: #{e.message}\n\t#{e.backtrace.join("\n\t")}")
if @max_tries && attempts >= @max_tries
NSQ.logger.warning("#{connection.name}: Giving up on message after #{@max_tries} tries: #{body.inspect}")
- connection.send_finish(id)
+ connection.send_finish(message.id)
else
- connection.send_requeue(id, attempts * @requeue_delay)
+ connection.send_requeue(message.id, attempts * @requeue_delay)
end
ensure
handle_ready_count(connection)
end
def handle_ready_count(connection)
+ # TODO: Need to add 25% logic
connection.send_ready(self.connection_max_in_flight)
end
@@ -82,5 +83,9 @@ def handle_io_error(connection, exception)
connection.close
connection.connect
end
+
+ def to_s
+ @name
+ end
end
end
Oops, something went wrong.

0 comments on commit 7decb83

Please sign in to comment.