Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: add basic instrumentation to defer queue #19824

Merged
merged 1 commit into from Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions lib/scheduler/defer.rb
Expand Up @@ -4,15 +4,18 @@
module Scheduler
module Deferrable
DEFAULT_TIMEOUT ||= 90
STATS_CACHE_SIZE ||= 100

def initialize
@async = !Rails.env.test?
@queue = Queue.new
@mutex = Mutex.new
@stats_mutex = Mutex.new
@paused = false
@thread = nil
@reactor = nil
@timeout = DEFAULT_TIMEOUT
@stats = LruRedux::ThreadSafeCache.new(STATS_CACHE_SIZE)
end

def timeout=(t)
Expand All @@ -23,6 +26,10 @@ def length
@queue.length
end

def stats
@stats_mutex.synchronize { @stats.to_a }
end

def pause
stop!
@paused = true
Expand All @@ -38,6 +45,11 @@ def async=(val)
end

def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, &blk)
@stats_mutex.synchronize do
stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
stats[:queued] += 1
end

if @async
start_thread if !@thread&.alive? && !@paused
@queue << [db, blk, desc]
Expand Down Expand Up @@ -74,6 +86,7 @@ def start_thread
# using non_block to match Ruby #deq
def do_work(non_block = false)
db, job, desc = @queue.deq(non_block)
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
db ||= RailsMultisite::ConnectionManagement::DEFAULT

RailsMultisite::ConnectionManagement.with_connection(db) do
Expand All @@ -84,6 +97,10 @@ def do_work(non_block = false)
end if !non_block
job.call
rescue => ex
@stats_mutex.synchronize do
stats = @stats[desc]
stats[:errors] += 1 if stats
end
Discourse.handle_job_exception(ex, message: "Running deferred code '#{desc}'")
ensure
warning_job&.cancel
Expand All @@ -93,6 +110,13 @@ def do_work(non_block = false)
Discourse.handle_job_exception(ex, message: "Processing deferred code queue")
ensure
ActiveRecord::Base.connection_handler.clear_active_connections!
@stats_mutex.synchronize do
stats = @stats[desc]
if stats
stats[:finished] += 1
stats[:duration] += Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end
end
end
end

Expand Down
26 changes: 26 additions & 0 deletions spec/lib/scheduler/defer_spec.rb
Expand Up @@ -18,6 +18,32 @@ def wait_for(timeout, &blk)

after { @defer.stop! }

it "supports basic instrumentation" do
@defer.later("first") {}
@defer.later("first") {}
@defer.later("second") {}
@defer.later("bad") { raise "boom" }

wait_for(200) { @defer.length == 0 }

stats = Hash[@defer.stats]

expect(stats["first"][:queued]).to eq(2)
expect(stats["first"][:finished]).to eq(2)
expect(stats["first"][:errors]).to eq(0)
expect(stats["first"][:duration]).to be > 0

expect(stats["second"][:queued]).to eq(1)
expect(stats["second"][:finished]).to eq(1)
expect(stats["second"][:errors]).to eq(0)
expect(stats["second"][:duration]).to be > 0

expect(stats["bad"][:queued]).to eq(1)
expect(stats["bad"][:finished]).to eq(1)
expect(stats["bad"][:duration]).to be > 0
expect(stats["bad"][:errors]).to eq(1)
end

it "supports timeout reporting" do
@defer.timeout = 0.05

Expand Down