Skip to content

Commit

Permalink
Summary of worker handled messages in async example
Browse files Browse the repository at this point in the history
  • Loading branch information
bpardee committed Oct 25, 2012
1 parent f5d6abd commit a32e4af
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions examples/async/reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,44 @@
puts 'Press enter to start and enter to finish'
$stdin.gets

finish_mutex = Mutex.new
reader = NSQ.create_reader(:nsqd_tcp_addresses => '127.0.0.1:4150')

x_subscriber = reader.subscribe('test_xy', 'x', :max_in_flight => x_worker_count)
y_subscriber = reader.subscribe('test_xy', 'y', :max_in_flight => y_worker_count)
z_subscriber = reader.subscribe('test_z', 'z', :max_in_flight => z_worker_count)

threads = []
[[x_subscriber, x_worker_count, 'x'], [y_subscriber, y_worker_count, 'y'], [z_subscriber, z_worker_count, 'z']].each do |subscriber, count, char|
threads += count.times.map do |i|
Thread.new(i, subscriber, char) do |i, subscriber, char|
message_count = 0
class MyThread < Thread
attr_accessor :message_count
def initialize(index, subscriber, char)
@index = index
super do |i, subscriber, char|
@message_count = 0
subscriber.run do |message|
print char
eval message.body
message_count += 1
end
if ARGV[0] == '-v'
finish_mutex.synchronize { puts '%s[%02d] handled %4d messages' % [char, i, message_count]}
else
print char.upcase
print char
@message_count += 1
end
print char.upcase
end
end
end

threads = {}
[[x_subscriber, x_worker_count, 'x'], [y_subscriber, y_worker_count, 'y'], [z_subscriber, z_worker_count, 'z']].each do |subscriber, count, char|
threads[char] = count.times.map do |i|
MyThread.new(i, subscriber, char)
end
end

main_thread = Thread.new do
reader.run
end
$stdin.gets
puts 'Exiting...'
reader.stop
main_thread.join
threads.each(&:join)
threads.each_value { |arr| arr.each(&:join) }
puts
threads.each do |char, arr|
puts "#{char} - #{arr.map(&:message_count).join(' ')}"
end

0 comments on commit a32e4af

Please sign in to comment.