Skip to content

Commit

Permalink
Merge branch 'fix/DEX-2092/v6-improvements' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2092] fix: per box-based poll rate-limiter + dynamic brpop timeout calculation

Closes DEX-2092

See merge request nstmrt/rubygems/outbox!84
  • Loading branch information
ysatarov committed Mar 28, 2024
2 parents 8789cb8 + 38c81c9 commit 9f57cb5
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 6 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased] - yyyy-mm-dd

## [6.0.1] - 2024-03-28

### Added
- `PollThrottler::RateLimited` is now per box-based

### Fixed
- `BRPOP` timeout default for queue processing

## [6.0.0] - 2024-02-04

### Added
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/engine.rb
Expand Up @@ -48,7 +48,7 @@ class Engine < Rails::Engine
pc.threads_count = 4
pc.general_timeout = 120
pc.cutoff_timeout = 60
pc.brpop_delay = 2
pc.brpop_delay = 1
end

c.database_switcher = "Sbmt::Outbox::DatabaseSwitcher"
Expand Down
27 changes: 24 additions & 3 deletions lib/sbmt/outbox/v2/poll_throttler/rate_limited.rb
Expand Up @@ -8,15 +8,36 @@ module Outbox
module V2
module PollThrottler
class RateLimited < Base
attr_reader :queues

def initialize(limit: nil, interval: nil, balanced: true)
@queue = Limiter::RateQueue.new(limit, interval: interval, balanced: balanced)
@limit = limit
@interval = interval
@balanced = balanced
@queues = {}
@mutex = Mutex.new
end

def wait(_worker_num, _poll_task, _task_result)
@queue.shift
def wait(_worker_num, poll_task, _task_result)
queue_for(poll_task).shift

Success(Sbmt::Outbox::V2::Throttler::THROTTLE_STATUS)
end

private

def queue_for(task)
key = task.item_class.box_name
return @queues[key] if @queues.key?(key)

@mutex.synchronize do
return @queues[key] if @queues.key?(key)

@queues[key] = Limiter::RateQueue.new(
@limit, interval: @interval, balanced: @balanced
)
end
end
end
end
end
Expand Down
10 changes: 9 additions & 1 deletion lib/sbmt/outbox/v2/processor.rb
Expand Up @@ -12,6 +12,8 @@ class Processor < BoxProcessor
delegate :processor_config, :batch_process_middlewares, :logger, to: "Sbmt::Outbox"
attr_reader :lock_timeout, :brpop_delay

REDIS_BRPOP_MIN_DELAY = 0.1

def initialize(
boxes,
threads_count: nil,
Expand All @@ -20,7 +22,7 @@ def initialize(
redis: nil
)
@lock_timeout = lock_timeout || processor_config.general_timeout
@brpop_delay = brpop_delay || processor_config.brpop_delay
@brpop_delay = brpop_delay || redis_brpop_delay(boxes.count, processor_config.brpop_delay)

super(boxes: boxes, threads_count: threads_count || processor_config.threads_count, name: "processor", redis: redis)
end
Expand Down Expand Up @@ -95,6 +97,12 @@ def fetch_redis_job(scheduled_task)
def redis_block_timeout
redis.read_timeout + brpop_delay
end

def redis_brpop_delay(boxes_count, default_delay)
return default_delay if boxes_count == 1

REDIS_BRPOP_MIN_DELAY
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/version.rb
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module Outbox
VERSION = "6.0.0"
VERSION = "6.0.1"
end
end
35 changes: 35 additions & 0 deletions spec/lib/sbmt/outbox/v2/poll_throttler/rate_limited_spec.rb
@@ -0,0 +1,35 @@
# frozen_string_literal: true

require "sbmt/outbox/v2/poller"
require "sbmt/outbox/v2/poll_throttler/rate_limited"

describe Sbmt::Outbox::V2::PollThrottler::RateLimited do
let(:limit) { 60 }
let(:interval) { 60 }

let(:throttler) { described_class.new(limit: limit, interval: interval) }
let(:task_inbox) do
Sbmt::Outbox::V2::Tasks::Poll.new(
item_class: InboxItem, worker_name: "poller", partition: 0, buckets: [0, 2]
)
end
let(:task_outbox) do
Sbmt::Outbox::V2::Tasks::Poll.new(
item_class: OutboxItem, worker_name: "poller", partition: 0, buckets: [0, 2]
)
end

it "uses different queues for item classes" do
expect do
throttler.call(0, task_inbox, Sbmt::Outbox::V2::ThreadPool::PROCESSED)
throttler.call(0, task_outbox, Sbmt::Outbox::V2::ThreadPool::PROCESSED)
throttler.call(0, task_inbox, Sbmt::Outbox::V2::ThreadPool::PROCESSED)
throttler.call(0, task_outbox, Sbmt::Outbox::V2::ThreadPool::PROCESSED)
end.to increment_yabeda_counter(Yabeda.box_worker.poll_throttling_counter).with_tags(status: "throttle", throttler: "Sbmt::Outbox::V2::PollThrottler::RateLimited", name: "inbox_item", type: :inbox, worker_name: "poller", worker_version: 2).by(2)
.and increment_yabeda_counter(Yabeda.box_worker.poll_throttling_counter).with_tags(status: "throttle", throttler: "Sbmt::Outbox::V2::PollThrottler::RateLimited", name: "outbox_item", type: :outbox, worker_name: "poller", worker_version: 2).by(2)
.and measure_yabeda_histogram(Yabeda.box_worker.poll_throttling_runtime).with_tags(throttler: "Sbmt::Outbox::V2::PollThrottler::RateLimited", name: "inbox_item", type: :inbox, worker_name: "poller", worker_version: 2)
.and measure_yabeda_histogram(Yabeda.box_worker.poll_throttling_runtime).with_tags(throttler: "Sbmt::Outbox::V2::PollThrottler::RateLimited", name: "outbox_item", type: :outbox, worker_name: "poller", worker_version: 2)

expect(throttler.queues.count).to eq(2)
end
end

0 comments on commit 9f57cb5

Please sign in to comment.