From 38df1045d2479058712b2e0efa066c3b4c98b4e4 Mon Sep 17 00:00:00 2001 From: David Balatero Date: Wed, 17 Jun 2009 17:28:12 -0700 Subject: [PATCH] Fixed concurrency problems in window counter. Rewrote WindowCounter to be much more efficient. [#11: resolved] --- lib/queue_stick/window_counter.rb | 54 +++++++++---------------- spec/queue_stick/window_counter_spec.rb | 22 +++++++++- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/lib/queue_stick/window_counter.rb b/lib/queue_stick/window_counter.rb index 78c1836..8033314 100644 --- a/lib/queue_stick/window_counter.rb +++ b/lib/queue_stick/window_counter.rb @@ -1,3 +1,5 @@ +require 'thread' + module QueueStick class WindowCounter attr_reader :name @@ -8,60 +10,40 @@ def initialize(name, window) 'counter requires a name' if name.nil? raise ArgumentError, 'counter requires a time window' if window.nil? + @name = name + @window = window - # Converted to seconds. - @window = window * 60 - @count = 0 - - @counts = [] - @timings = [] + @counts = Hash.new { |h, k| h[k] = 0 } + @mutex = Mutex.new end - # TODO(dbalatero): break this up? def count - to_delete = [] + current_minute = self.class.current_minute + range = (current_minute - @window)..current_minute sum = 0 - threshold = threshold_time - - @counts.each_index do |index| - if @timings[index] < threshold - to_delete << index - else - sum += @counts[index] + @mutex.synchronize do + range.each do |key| + sum += @counts[key] end end - - to_delete.each do |index| - @counts.delete_at(index) - @timings.delete_at(index) - end - sum end def increment!(by = 1) - current = Time.now - current -= current.sec - timing = @timings.last - - if timing and - self.class.times_have_same_minute?(current, timing) - @counts[@counts.size - 1] += by - else - @counts << by - @timings << current + current_minute = self.class.current_minute + @mutex.synchronize do + @counts[current_minute] += by end end private - def self.times_have_same_minute?(time1, time2) - time1.to_i / 60 == time2.to_i / 60 + def self.current_minute + minute_for(Time.now) end - def threshold_time - current = Time.now - current - (current.sec + @window) + 60 + def self.minute_for(time) + (time.to_f / 60).round end end end diff --git a/spec/queue_stick/window_counter_spec.rb b/spec/queue_stick/window_counter_spec.rb index b7ab57b..99d1180 100644 --- a/spec/queue_stick/window_counter_spec.rb +++ b/spec/queue_stick/window_counter_spec.rb @@ -33,6 +33,26 @@ counter = QueueStick::WindowCounter.new(:test, 5) counter.count.should == 0 end + + # see lighthouse ticket #11 + it "should not flip the count back to 0 randomly with multiple threads contending" do + counter = QueueStick::WindowCounter.new(:test, 1) + threads = [] + + # 16 increments, 4 threads + 4.times do + threads << Thread.new(counter) do |c| + 4.times do + c.count + c.increment! + c.count + end + end + end + threads.each { |thread| thread.join } + + counter.count.should == 16 + end end describe "increment!" do @@ -73,7 +93,7 @@ @counter.count.should == 33 Time.stub!(:now). - and_return(Time.mktime(2009, 5, 18, 22, 36, 0, 0)) + and_return(Time.mktime(2009, 5, 18, 22, 37, 31, 0)) @counter.count.should == 0 end end