Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Commit

Permalink
Fixed timing bug in batch publishing, fixes #4
Browse files Browse the repository at this point in the history
  • Loading branch information
iconara committed Jun 18, 2012
1 parent a5a734e commit cf54d36
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
7 changes: 5 additions & 2 deletions lib/autobahn/publisher.rb
Expand Up @@ -55,11 +55,12 @@ def start!
if @batch_options[:timeout] && @batch_options[:timeout] > 0
@scheduler = Concurrency::Executors.new_single_thread_scheduled_executor(Concurrency::NamingDaemonThreadFactory.new('batch_drainer'))
@drainer_task = @scheduler.schedule_with_fixed_delay(
method(:force_drain).to_proc,
method(:maybe_force_drain).to_proc,
@batch_options[:timeout],
@batch_options[:timeout],
Concurrency::TimeUnit::SECONDS
)
@last_drain = Time.now
end
self
end
Expand Down Expand Up @@ -101,9 +102,11 @@ def drain_batch
end
end
@publisher.publish(batch)
@last_drain = Time.now
end

def force_drain
def maybe_force_drain
return if (Time.now - @last_drain) < @batch_options[:timeout]
drain
drain_batch
end
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/autobahn_intg_spec.rb
Expand Up @@ -150,7 +150,7 @@ def counting_down(n, options={})
it 'sends a batch after a timeout, even if it is not full' do
@publisher.publish('hello' => 'world')
@publisher.publish('foo' => 'bar')
sleep(@batch_timeout + 0.5)
sleep(@batch_timeout * 2)
message = @queues.map { |q| h, m = q.get; m }.compact.first
@encoder.decode(message).should == [{'hello' => 'world'}, {'foo' => 'bar'}]
end
Expand Down

0 comments on commit cf54d36

Please sign in to comment.